From 13420a2b07c2dafb376b577f8aad6b6be5b4df8e Mon Sep 17 00:00:00 2001 From: bensteUEM Date: Tue, 8 Oct 2024 17:32:14 +0200 Subject: [PATCH] optimized logging G and reduced complexity SIM #102 --- churchtools_api/churchtools_api.py | 45 ++++++++--------- churchtools_api/churchtools_api_abstract.py | 34 ++++++------- churchtools_api/events.py | 55 ++++++++++++--------- churchtools_api/files.py | 17 +++---- churchtools_api/groups.py | 12 ++--- churchtools_api/persons.py | 22 ++++----- churchtools_api/songs.py | 9 ++-- pyproject.toml | 2 - tests/test_churchtools_api.py | 4 +- 9 files changed, 99 insertions(+), 101 deletions(-) diff --git a/churchtools_api/churchtools_api.py b/churchtools_api/churchtools_api.py index 087d8e2..63b3914 100644 --- a/churchtools_api/churchtools_api.py +++ b/churchtools_api/churchtools_api.py @@ -90,14 +90,14 @@ def login_ct_rest_api(self, **kwargs): if response.status_code == 200: response_content = json.loads(response.content) logger.info( - "Token Login Successful as {}".format( - response_content["data"]["email"], - ), + "Token Login Successful as %s", + response_content["data"]["email"], ) self.session.headers["CSRF-Token"] = self.get_ct_csrf_token() return json.loads(response.content)["data"]["id"] logger.warning( - f"Token Login failed with {response.content.decode()}", + "Token Login failed with %s", + response.content.decode(), ) return False @@ -110,12 +110,11 @@ def login_ct_rest_api(self, **kwargs): if response.status_code == 200: response_content = json.loads(response.content) person = self.who_am_i() - logger.info( - "User/Password Login Successful as {}".format(person["email"]), - ) + logger.info("User/Password Login Successful as %s", person["email"]) return person["id"] logger.warning( - f"User/Password Login failed with {response.content.decode()}", + "User/Password Login failed with %s", + response.content.decode(), ) return False return None @@ -132,10 +131,11 @@ def get_ct_csrf_token(self): response = self.session.get(url=url) if response.status_code == 200: csrf_token = json.loads(response.content)["data"] - logger.info(f"CSRF Token erfolgreich abgerufen {csrf_token}") + logger.debug("CSRF Token erfolgreich abgerufen %s", csrf_token) return csrf_token logger.warning( - f"CSRF Token not updated because of Response {response.content.decode()}", + "CSRF Token not updated because of Response %s", + response.content.decode(), ) return None @@ -150,15 +150,11 @@ def who_am_i(self): if response.status_code == 200: response_content = json.loads(response.content) if "email" in response_content["data"]: - logger.info("Who am I as {}".format(response_content["data"]["email"])) + logger.info("Who am I as %s", response_content["data"]["email"]) return response_content["data"] - logger.warning( - "User might not be logged in? {}".format(response_content["data"]), - ) + logger.warning("User might not be logged in? %s", response_content["data"]) return False - logger.warning( - f"Checking who am i failed with {response.status_code}", - ) + logger.warning("Checking who am i failed with %s", response.status_code) return False def check_connection_ajax(self) -> bool: @@ -173,7 +169,8 @@ def check_connection_ajax(self) -> bool: logger.debug("Response AJAX Connection successful") return True logger.debug( - f"Response AJAX Connection failed with {json.load(response.content)}", + "Response AJAX Connection failed with %s", + json.load(response.content), ) return False @@ -188,7 +185,7 @@ def get_global_permissions(self) -> dict: response_content = json.loads(response.content) response_data = response_content["data"].copy() logger.debug( - f"First response of Global Permissions successful {response_content}", + "First response of Global Permissions successful %s", response_content ) return response_data @@ -225,10 +222,11 @@ def get_services(self, **kwargs): response_data = result logger.debug( - f"Services load successful with {len(response_data)} entries", + "Services load successful with %s entries", + len(response_data), ) return response_data - logger.info(f"Services requested failed: {response.status_code}") + logger.info("Services requested failed: %s", response.status_code) return None def get_tags(self, type="songs"): @@ -250,7 +248,7 @@ def get_tags(self, type="songs"): if response.status_code == 200: response_content = json.loads(response.content) response_content["data"].copy() - logger.debug(f"SongTags load successful {response_content}") + logger.debug("SongTags load successful %s", response_content) return response_content["data"] logger.warning( @@ -277,8 +275,7 @@ def get_options(self) -> dict: if response.status_code == 200: response_content = json.loads(response.content) response_data = response_content["data"].copy() - - logger.debug(f"SongTags load successful {response_content}") + logger.debug("SongTags load successful %s", response_content) return {item["name"]: item for item in response_data} logger.warning( "%s Something went wrong fetching Song-tags: %s", diff --git a/churchtools_api/churchtools_api_abstract.py b/churchtools_api/churchtools_api_abstract.py index 91d8b6c..37f21f6 100644 --- a/churchtools_api/churchtools_api_abstract.py +++ b/churchtools_api/churchtools_api_abstract.py @@ -35,22 +35,20 @@ def combine_paginated_response_data( """ response_data = response_content["data"].copy() - if meta := response_content.get("meta"): - if pagination := meta.get("pagination"): - for page in range(pagination["current"], pagination["lastPage"]): - logger.debug( - "running paginated request for page {} of {}".format( - page + 1, - pagination["lastPage"], - ), - ) - new_param = {"page": page + 1} - if kwargs.get("params"): - kwargs["params"].update(new_param) - else: - kwargs["params"] = new_param - - response = self.session.get(url=url, **kwargs) - response_content = json.loads(response.content) - response_data.extend(response_content["data"]) + if pagination := response_content.get("meta", {}).get("pagination"): + for page in range(pagination["current"], pagination["lastPage"]): + logger.debug( + "running paginated request for page %s of %s", + page + 1, + pagination["lastPage"], + ) + new_param = {"page": page + 1} + if kwargs.get("params"): + kwargs["params"].update(new_param) + else: + kwargs["params"] = new_param + + response = self.session.get(url=url, **kwargs) + response_content = json.loads(response.content) + response_data.extend(response_content["data"]) return response_data diff --git a/churchtools_api/events.py b/churchtools_api/events.py index 35ab7b7..5ed3fc8 100644 --- a/churchtools_api/events.py +++ b/churchtools_api/events.py @@ -162,14 +162,16 @@ def get_AllEventData_ajax(self, eventId) -> dict: response_content = json.loads(response.content) if len(response_content["data"]) > 0: response_data = response_content["data"][str(eventId)] - logger.debug(f"AJAX Event data len= {len(response_data)}") + logger.debug("AJAX Event data len=%s", len(response_data)) return response_data logger.info( - f"AJAX All Event data not successful - no event found: {response.status_code}", + "AJAX All Event data not successful - no event found:%s", + response.status_code, ) return None logger.info( - f"AJAX All Event data not successful: {response.status_code}", + "AJAX All Event data not successful: %s", + response.status_code, ) return None @@ -244,12 +246,13 @@ def set_event_services_counts_ajax(self, eventId, serviceId, servicesCount) -> b # Generate form specific data item_id = 0 data = {"id": eventId, "func": "addOrRemoveServiceToEvent"} - for serviceIdRow, serviceCount in servicesOfServiceGroup.items(): + for item_id, (serviceIdRow, serviceCount) in enumerate( + servicesOfServiceGroup.items() + ): data[f"col{item_id}"] = serviceIdRow if serviceCount > 0: data[f"val{item_id}"] = "checked" data[f"count{item_id}"] = serviceCount - item_id += 1 response = self.session.post(url=url, headers=headers, params=params, data=data) @@ -266,11 +269,14 @@ def set_event_services_counts_ajax(self, eventId, serviceId, servicesCount) -> b if number_match and response_success: return True logger.warning( - f"Request was successful but serviceId {serviceId} not changed to count {servicesCount} ", + "Request was successful but serviceId %s not changed to count %s ", + serviceId, + servicesCount, ) return False logger.info( - f"set_event_services_counts_ajax not successful: {response.status_code}", + "set_event_services_counts_ajax not successful: %s", + response.status_code, ) return False @@ -317,12 +323,15 @@ def set_event_admins_ajax(self, eventId: int, admin_ids: list) -> bool: response_content = json.loads(response.content) response_data = response_content["status"] == "success" logger.debug( - f"Setting Admin IDs {admin_ids} for event {eventId} success", + "Setting Admin IDs %s for event %s success", admin_ids, eventId ) return response_data logger.info( - f"Setting Admin IDs {admin_ids} for event {eventId} failed with : {response.status_code}", + "Setting Admin IDs %s for event %s failed with : %s", + admin_ids, + eventId, + response.status_code, ) return False @@ -340,11 +349,12 @@ def get_event_agenda(self, eventId: int) -> list: if response.status_code == 200: response_content = json.loads(response.content) response_data = response_content["data"].copy() - logger.debug(f"Agenda load successful {len(response_content)} items") + logger.debug("Agenda load successful %s items", len(response_content)) return response_data logger.info( - f"Event requested that does not have an agenda with status: {response.status_code}", + "Event requested that does not have an agenda with status: %s", + response.status_code, ) return None @@ -427,7 +437,7 @@ def export_event_agenda( if response.status_code == 200: response_content = json.loads(response.content) agenda_data = response_content["data"].copy() - logger.debug(f"Agenda package found {response_content}") + logger.debug("Agenda package found %s", response_content) result_ok = self.file_download_from_url( "{}/{}".format(self.domain, agenda_data["url"]), target_path, @@ -435,9 +445,7 @@ def export_event_agenda( if result_ok: logger.debug("download finished") else: - logger.warning( - f"export of event_agenda failed: {response.status_code}", - ) + logger.warning("export of event_agenda failed: %s", response.status_code) return result_ok @@ -452,7 +460,7 @@ def get_event_agenda_docx(self, agenda, **kwargs): """ excludeBeforeEvent = kwargs.get("excludeBeforeEvent", False) - logger.debug("Trying to get agenda for: " + agenda["name"]) + logger.debug("Trying to get agenda for: %s", agenda["name"]) document = docx.Document() heading = agenda["name"] @@ -478,12 +486,10 @@ def get_event_agenda_docx(self, agenda, **kwargs): document.add_heading(item["title"], level=1) continue - if ( - pre_event_last_item - ): # helper for event start heading which is not part of the ct_api - if not item["isBeforeEvent"]: - pre_event_last_item = False - document.add_heading("Eventstart", level=1) + # helper for event start heading which is not part of the ct_api + if pre_event_last_item and not item["isBeforeEvent"]: + pre_event_last_item = False + document.add_heading("Eventstart", level=1) agenda_item += 1 @@ -584,10 +590,11 @@ def get_event_masterdata(self, **kwargs) -> list | list[list] | dict | list[dict if kwargs.get("returnAsDict"): response_data2 = response_data.copy() response_data = {item["id"]: item for item in response_data2} - logger.debug(f"Event Masterdata load successful {response_data}") + logger.debug("Event Masterdata load successful len=%s", response_data) return response_data logger.info( - f"Event Masterdata requested failed: {response.status_code}", + "Event Masterdata requested failed: %s", + response.status_code, ) return None diff --git a/churchtools_api/files.py b/churchtools_api/files.py index 36891d6..469af56 100644 --- a/churchtools_api/files.py +++ b/churchtools_api/files.py @@ -47,7 +47,7 @@ def file_upload( url = f"{self.domain}/api/files/{domain_type}/{domain_identifier}" if overwrite: - logger.debug(f"deleting old file {source_file} before new upload") + logger.debug("deleting old file %s before new upload", source_file) delete_file_name = ( source_file.name.split("/")[-1] if custom_file_name is None @@ -60,9 +60,7 @@ def file_upload( if custom_file_name is None: files = {"files[]": (source_file.name.split("/")[-1], source_file)} elif "/" in custom_file_name: - logger.warning( - f"/ in file name ({custom_file_name}) will fail upload!", - ) + logger.warning("/ in file name (%s) will fail upload!", custom_file_name) files = {} else: files = {"files[]": (custom_file_name, source_file)} @@ -86,7 +84,7 @@ def file_upload( if response.status_code == 200: try: response_content = json.loads(response.content) - logger.debug(f"Upload successful {response_content}") + logger.debug("Upload successful len=%s", response_content) return True except (json.JSONDecodeError, TypeError, UnicodeDecodeError): logger.warning(response.content.decode()) @@ -166,7 +164,8 @@ def file_download( response_content = json.loads(response.content) arrangement_files = response_content["data"].copy() logger.debug( - f"SongArrangement-Files load successful {response_content}", + "SongArrangement-Files load successful len=%s", + response_content, ) file_found = False @@ -177,13 +176,13 @@ def file_download( break if file_found: - logger.debug(f"Found File: {filename}") + logger.debug("Found File: %s", filename) # Build path OS independent fileUrl = str(file["fileUrl"]) path_file = os.sep.join([target_path, filename]) StateOK = self.file_download_from_url(fileUrl, path_file) else: - logger.warning(f"File {filename} does not exist") + logger.warning("File %s does not exist", filename) return StateOK logger.warning( @@ -214,7 +213,7 @@ def file_download_from_url(self, file_url: str, target_path: str) -> bool: # and set chunk_size parameter to None. # if chunk: f.write(chunk) - logger.debug(f"Download of {file_url} successful") + logger.debug("Download of %s successful",file_url) return True logger.warning( "%s Something went wrong during file_download: %s", diff --git a/churchtools_api/groups.py b/churchtools_api/groups.py index 72eff54..5ad15be 100644 --- a/churchtools_api/groups.py +++ b/churchtools_api/groups.py @@ -69,7 +69,7 @@ def get_groups_hierarchies(self): response_content = json.loads(response.content) response_data = response_content["data"].copy() logger.debug( - f"First response of Groups Hierarchies successful {response_content}", + "First response of Groups Hierarchies successful %s",response_content, ) return {group["groupId"]: group for group in response_data} @@ -104,7 +104,7 @@ def get_group_statistics(self, group_id: int) -> dict: headers=headers, ) logger.debug( - f"First response of Group Statistics successful {response_content}", + "First response of Group Statistics successful len=%s",response_content, ) return response_data logger.warning( @@ -168,7 +168,7 @@ def create_group( headers=headers, ) logger.debug( - f"First response of Create Group successful {response_content}", + "First response of Create Group successful len=%s",response_content, ) return response_data @@ -201,7 +201,7 @@ def update_group(self, group_id: int, data: dict) -> dict: response_content = json.loads(response.content) response_data = response_content["data"].copy() logger.debug( - f"First response of Update Group successful {response_content}", + "First response of Update Group successful len=%s",response_content, ) return response_data @@ -253,7 +253,7 @@ def get_grouptypes(self, **kwargs): response_content = json.loads(response.content) response_data = response_content["data"].copy() logger.debug( - f"First response of Grouptypes successful {response_content}", + "First response of Grouptypes successful len=%s",response_content, ) if isinstance(response_data, list): result = {group["id"]: group for group in response_data} @@ -281,7 +281,7 @@ def get_group_permissions(self, group_id: int): response_content = json.loads(response.content) response_data = response_content["data"].copy() logger.debug( - f"First response of Group Permissions successful {response_content}", + "First response of Group Permissions successful len=%s",response_content, ) return response_data logger.warning( diff --git a/churchtools_api/persons.py b/churchtools_api/persons.py index dce008a..7937d75 100644 --- a/churchtools_api/persons.py +++ b/churchtools_api/persons.py @@ -46,13 +46,14 @@ def get_persons(self, **kwargs) -> list[dict]: response_data = response_content["data"].copy() logger.debug( - f"First response of GET Persons successful {response_content}", + "len of first response of GET Persons successful len(%s)" ,response_content, ) if len(response_data) == 0: logger.warning( - f"Requesting ct_users {params} returned an empty response - " + "Requesting ct_users %s returned an empty response - " "make sure the user has correct permissions", + params ) response_data = self.combine_paginated_response_data( @@ -65,16 +66,15 @@ def get_persons(self, **kwargs) -> list[dict]: [response_data] if isinstance(response_data, dict) else response_data ) - if "returnAsDict" in kwargs and "serviceId" not in kwargs: - if kwargs["returnAsDict"]: - result = {} - for item in response_data: - result[item["id"]] = item - response_data = result + if kwargs.get("returnAsDict") and "serviceId" not in kwargs: + result = {} + for item in response_data: + result[item["id"]] = item + response_data = result - logger.debug(f"Persons load successful {response_data}") + logger.debug("Persons load successful %s",response_data) return response_data - logger.info(f"Persons requested failed: {response.status_code}") + logger.info("Persons requested failed: %s",response.status_code) return None def get_persons_masterdata( @@ -98,7 +98,7 @@ def get_persons_masterdata( if response.status_code == 200: response_content = json.loads(response.content) response_data = response_content["data"].copy() - logger.debug(f"Person Masterdata load successful {response_data}") + logger.debug("Person Masterdata load successful len=%s",response_data) return response_data logger.warning( diff --git a/churchtools_api/songs.py b/churchtools_api/songs.py index 5aedbf8..cfe4633 100644 --- a/churchtools_api/songs.py +++ b/churchtools_api/songs.py @@ -45,11 +45,10 @@ def get_songs(self, **kwargs) -> list[dict]: if "song_id" in kwargs: logger.info( - "Did not find song ({}) with CODE {}".format( + "Did not find song (%s) with CODE %s", kwargs["song_id"], response.status_code, - ), - ) + ) return None logger.warning( "%s Something went wrong fetching songs: %s", @@ -151,10 +150,10 @@ def create_song( # noqa: PLR0913 if response.status_code == 200: response_content = json.loads(response.content) new_id = int(response_content["data"]) - logger.debug(f"Song created successful with ID={new_id}") + logger.debug("Song created successful with ID=%s",new_id) return new_id - logger.info(f"Creating song failed with {response.status_code}") + logger.info("Creating song failed with %s",response.status_code) return None def edit_song( # noqa: PLR0913 diff --git a/pyproject.toml b/pyproject.toml index 18c019a..17553be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,8 +80,6 @@ ignore = [ "PTH110","PTH107","PTH123","PTH118","PTH112","PTH103", #Path changes "SIM115", #context manager for files - "SIM102","SIM113", - "G001","G003", "G004", "A001","A002", "TRY300", diff --git a/tests/test_churchtools_api.py b/tests/test_churchtools_api.py index f09c4e4..3280cbc 100644 --- a/tests/test_churchtools_api.py +++ b/tests/test_churchtools_api.py @@ -175,7 +175,7 @@ def test_get_persons_sex_id(self) -> None: assert result == EXPECTED_RESULT def test_get_songs(self) -> None: - """1. Test requests all songs and checks that result has more than 10 elements (hence default pagination works) + """1. Test requests all songs and checks that result has more than 50 elements (hence default pagination works) 2. Test requests song 2034 and checks that result matches "sample". IMPORTANT - This test method and the parameters used depend on the target system! @@ -214,7 +214,7 @@ def test_get_song_category_map(self) -> None: assert song_catgegory_dict["Test"] == 13 def test_get_groups(self) -> None: - """1. Test requests all groups and checks that result has more than 10 elements (hence default pagination works) + """1. Test requests all groups and checks that result has more than 50 elements (hence default pagination works) 2. Test requests group 103 and checks that result matches Test song. IMPORTANT - This test method and the parameters used depend on the target system!