From 61a7afece4a85b3a575d4e1242a82b8882da5fc4 Mon Sep 17 00:00:00 2001 From: bensteUEM Date: Tue, 8 Oct 2024 20:51:32 +0200 Subject: [PATCH] fixing PTH SIM and TRY lint issues #102 --- churchtools_api/events.py | 11 ++--- churchtools_api/files.py | 70 ++++++++++++++-------------- generate_pyproj.py | 3 +- pyproject.toml | 6 +-- tests/test_churchtools_api.py | 11 ++--- tests/test_churchtools_api_events.py | 10 ++-- 6 files changed, 54 insertions(+), 57 deletions(-) diff --git a/churchtools_api/events.py b/churchtools_api/events.py index bce46cf..4bab312 100644 --- a/churchtools_api/events.py +++ b/churchtools_api/events.py @@ -2,8 +2,8 @@ import json import logging -import os from datetime import datetime, timedelta +from pathlib import Path import docx @@ -400,18 +400,15 @@ def export_event_agenda( # call or just a folder is_zip = target_path.lower().endswith(".zip") if not is_zip: - folder_exists = os.path.isdir(target_path) - # If folder doesn't exist, then create it. - if not folder_exists: - os.makedirs(target_path) - logger.debug("created folder : %s", target_path) + target_path = Path(target_path) + target_path.mkdir(parents=True, exist_ok=True) if "eventId" in kwargs: new_file_name = "{}_{}.zip".format(agenda["name"], target_format) else: new_file_name = f"{target_format}_agendaId_{agendaId}.zip" - target_path = os.sep.join([target_path, new_file_name]) + target_path = target_path/new_file_name url = f"{self.domain}/api/agendas/{agendaId}/export" # NOTE the stream=True parameter below diff --git a/churchtools_api/files.py b/churchtools_api/files.py index 469af56..c06f8d8 100644 --- a/churchtools_api/files.py +++ b/churchtools_api/files.py @@ -1,6 +1,6 @@ import json import logging -import os +from pathlib import Path from typing import Optional from churchtools_api.churchtools_api_abstract import ChurchToolsApiAbstract @@ -42,31 +42,32 @@ def file_upload( Returns: if successful. """ - source_file = open(source_filepath, "rb") - - url = f"{self.domain}/api/files/{domain_type}/{domain_identifier}" - - if overwrite: - logger.debug("deleting old file %s before new upload", source_file) - delete_file_name = ( - source_file.name.split("/")[-1] - if custom_file_name is None - else custom_file_name - ) - self.file_delete(domain_type, domain_identifier, delete_file_name) - - # add files as files form data with dict using 'files[]' as key and - # (tuple of filename and fileobject) - if custom_file_name is None: - files = {"files[]": (source_file.name.split("/")[-1], source_file)} - elif "/" in custom_file_name: - logger.warning("/ in file name (%s) will fail upload!", custom_file_name) - files = {} - else: - files = {"files[]": (custom_file_name, source_file)} + source_filepath = Path(source_filepath) + with source_filepath.open("rb") as source_file: + url = f"{self.domain}/api/files/{domain_type}/{domain_identifier}" + + if overwrite: + logger.debug("deleting old file %s before new upload", source_file) + delete_file_name = ( + source_file.name.split("/")[-1] + if custom_file_name is None + else custom_file_name + ) + self.file_delete(domain_type, domain_identifier, delete_file_name) + + # add files as files form data with dict using 'files[]' as key and + # (tuple of filename and fileobject) + if custom_file_name is None: + files = {"files[]": (source_file.name.split("/")[-1], source_file)} + elif "/" in custom_file_name: + logger.warning( + "/ in file name (%s) will fail upload!", custom_file_name + ) + files = {} + else: + files = {"files[]": (custom_file_name, source_file)} - response = self.session.post(url=url, files=files) - source_file.close() + response = self.session.post(url=url, files=files) """ # Issues with HEADERS in Request module when using non standard 'files[]' key in POST Request @@ -85,10 +86,11 @@ def file_upload( try: response_content = json.loads(response.content) logger.debug("Upload successful len=%s", response_content) - return True except (json.JSONDecodeError, TypeError, UnicodeDecodeError): logger.warning(response.content.decode()) return False + else: + return True else: logger.warning(response.content.decode()) return False @@ -150,12 +152,10 @@ def file_download( if successful. """ StateOK = False - CHECK_FOLDER = os.path.isdir(target_path) - # If folder doesn't exist, then create it. - if not CHECK_FOLDER: - os.makedirs(target_path) - logger.info("created folder : %s", target_path) + target_path = Path(target_path) + target_path.mkdir(parents=True, exist_ok=True) + url = f"{self.domain}/api/files/{domain_type}/{domain_identifier}" response = self.session.get(url=url) @@ -179,7 +179,7 @@ def file_download( logger.debug("Found File: %s", filename) # Build path OS independent fileUrl = str(file["fileUrl"]) - path_file = os.sep.join([target_path, filename]) + path_file = target_path / filename StateOK = self.file_download_from_url(fileUrl, path_file) else: logger.warning("File %s does not exist", filename) @@ -205,15 +205,17 @@ def file_download_from_url(self, file_url: str, target_path: str) -> bool: if successful. """ # NOTE the stream=True parameter below + + target_path = Path(target_path) with self.session.get(url=file_url, stream=True) as response: if response.status_code == 200: - with open(target_path, "wb") as f: + with target_path.open("wb") as f: for chunk in response.iter_content(chunk_size=8192): # If you have chunk encoded response uncomment if # and set chunk_size parameter to None. # if chunk: f.write(chunk) - logger.debug("Download of %s successful",file_url) + logger.debug("Download of %s successful", file_url) return True logger.warning( "%s Something went wrong during file_download: %s", diff --git a/generate_pyproj.py b/generate_pyproj.py index 13d7e3b..48761c5 100644 --- a/generate_pyproj.py +++ b/generate_pyproj.py @@ -1,4 +1,5 @@ import importlib.util +from pathlib import Path import tomli_w @@ -111,5 +112,5 @@ }, } -with open("pyproject.toml", "wb") as toml_file: +with Path.open("pyproject.toml", "wb") as toml_file: tomli_w.dump(pyproject_toml_content, toml_file) diff --git a/pyproject.toml b/pyproject.toml index 9f05e69..7b58291 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,11 +76,7 @@ ignore = [ "E501", #line too long "DTZ001","DTZ005","DTZ007","DTZ002", #datetime timezone - "PLR2004", #magic values - "PTH110","PTH107","PTH123","PTH118","PTH112","PTH103", #Path changes - "SIM115", #context manager for files - - "TRY300", + "PLR2004", #magic values "COM812", "ISC001", #disabled for formatter compatibility diff --git a/tests/test_churchtools_api.py b/tests/test_churchtools_api.py index 3280cbc..0d0e026 100644 --- a/tests/test_churchtools_api.py +++ b/tests/test_churchtools_api.py @@ -743,18 +743,17 @@ def test_file_download(self) -> None: self.api.file_upload("samples/test.txt", "song_arrangement", test_id) - filePath = "downloads/test.txt" - if os.path.exists(filePath): - os.remove(filePath) + filePath = Path("downloads/test.txt") + + filePath.unlink(missing_ok=True) self.api.file_download("test.txt", "song_arrangement", test_id) - with open(filePath) as file: + with filePath.open() as file: download_text = file.read() assert download_text == "TEST CONTENT" self.api.file_delete("song_arrangement", test_id, "test.txt") - if os.path.exists(filePath): - os.remove(filePath) + filePath.unlink() if __name__ == "__main__": diff --git a/tests/test_churchtools_api_events.py b/tests/test_churchtools_api_events.py index 13ffa02..bc327c8 100644 --- a/tests/test_churchtools_api_events.py +++ b/tests/test_churchtools_api_events.py @@ -301,10 +301,12 @@ def test_export_event_agenda(self) -> None: assert len(captured.records) == 1 assert not download_result - if os.path.exists("downloads"): - for file in os.listdir("downloads"): - os.remove("downloads/" + file) - assert len(os.listdir("downloads")) == 0 + download_dir = Path("downloads") + for root, dirs, files in download_dir.walk(top_down=False): + for name in files: + (root / name).unlink() + for name in dirs: + (root / name).rmdir() download_result = self.api.export_event_agenda("SONG_BEAMER", agendaId=agendaId) assert download_result