This commit is contained in:
Bastian Schnorbus
2025-05-04 16:01:16 +02:00
commit 5941117b86
11 changed files with 302 additions and 0 deletions

2
scripts/helper/__init__.py Executable file
View File

@@ -0,0 +1,2 @@
from .paperless_api import PaperlessAPI
from .config import Config

29
scripts/helper/config.py Executable file
View File

@@ -0,0 +1,29 @@
import os
class Config():
def __init__(self) -> None:
self._paperless_url = os.environ["PAPERLESS_BASE_URL"]
self._auth_token = os.environ["PAPERLESS_POST_CONSUME_AUTH_TOKEN"]
self._notify_tag = os.getenv("PAPERLESS_POST_CONSUME_EXTRACTOR_NOTIFY_TAG", "Title changed")
self._expand_by_date = (os.getenv("PAPERLESS_POST_CONSUME_EXPAND_BY_DATE", 'True').lower() in ('false', '0', 'f'))
def get_paperless_url(self):
return self._paperless_url
def get_paperless_api_url(self):
return self._paperless_url + "/api"
def get_auth_token(self):
return self._auth_token
def get_notify_tag(self) -> str:
return self._notify_tag
def expand_title_by_date(self) -> bool:
return True #self._expand_by_date
def __str__(self) -> str:
return " ".join([self._paperless_url, self.get_paperless_api_url(), self._auth_token])

45
scripts/helper/paperless_api.py Executable file
View File

@@ -0,0 +1,45 @@
# TODO: The user can use anything in the standard library, installed for paperless
# or use the custom startup scripts to install additional libraries via pip
import requests
class PaperlessAPI:
def __init__(self, api_url, auth_token, timeout=5) -> None:
self._base_api_url = api_url
self._auth_token = auth_token
self._timeout = timeout
def _get_item_by_id(self, item_type, item_id):
if item_id:
response = requests.get(f"{self._base_api_url}/{item_type}/{item_id}/",
headers = {"Authorization": f"Token {self._auth_token}"})
if response.ok:
return response.json()
return {}
def get_document_by_id(self, document_id):
return self._get_item_by_id("documents", document_id)
def get_tag_id_by_name(self, tag_name):
response = requests.get(f"{self._base_api_url}/tags/",
headers = {"Authorization": f"Token {self._auth_token}"})
if response.ok:
print("tags ok")
response_json = response.json()
tag_list = []
tag_list.extend(response_json.get("results"))
fetched_tag = [tag for tag in tag_list if tag.get("name") == tag_name]
if len(fetched_tag) > 0:
print(f'fetched = {fetched_tag}')
return fetched_tag[0].get("id")
print("not found, returning none")
return None
def patch_document(self, document_id, data):
return requests.patch(f"{self._base_api_url}/documents/{document_id}/",
headers = {"Authorization": f"Token {self._auth_token}"},
data = data,
timeout = self._timeout)