From 0902ab4822aad53fc654e3cd09fa187c440b5411 Mon Sep 17 00:00:00 2001 From: bensteUEM Date: Mon, 7 Oct 2024 17:28:29 +0200 Subject: [PATCH] Implement masterdata and gender options Fixes #110 --- churchtools_api/churchtools_api.py | 41 +++- churchtools_api/persons.py | 68 +++++-- tests/test_churchtools_api.py | 313 ++++++++++++++++++----------- 3 files changed, 286 insertions(+), 136 deletions(-) diff --git a/churchtools_api/churchtools_api.py b/churchtools_api/churchtools_api.py index 7c39e01..5e576cd 100644 --- a/churchtools_api/churchtools_api.py +++ b/churchtools_api/churchtools_api.py @@ -12,8 +12,16 @@ logger = logging.getLogger(__name__) -class ChurchToolsApi(ChurchToolsApiPersons, ChurchToolsApiEvents, ChurchToolsApiGroups, - ChurchToolsApiSongs, ChurchToolsApiFiles, ChurchToolsApiCalendar, ChurchToolsApiResources): + +class ChurchToolsApi( + ChurchToolsApiPersons, + ChurchToolsApiEvents, + ChurchToolsApiGroups, + ChurchToolsApiSongs, + ChurchToolsApiFiles, + ChurchToolsApiCalendar, + ChurchToolsApiResources, +): """Main class used to combine all api functions Args: @@ -265,3 +273,32 @@ def get_tags(self, type='songs'): else: logger.warning( "%s Something went wrong fetching Song-tags: %s",response.status_code, response.content) + + def get_options(self) -> dict: + """Helper function which returns all configurable option fields from CT. + e.g. common use is sexId + + Returns: + dict of options - named by "name" from original list response + """ + + url = self.domain + "/api/dbfields" + headers = {"accept": "application/json"} + params = { + "include[]": "options", + } + response = self.session.get(url=url, params=params, headers=headers) + + if response.status_code == 200: + response_content = json.loads(response.content) + response_data = response_content["data"].copy() + + logger.debug("SongTags load successful {}".format(response_content)) + response_dict = {item["name"]: item for item in response_data} + return response_dict + else: + logger.warning( + "%s Something went wrong fetching Song-tags: %s", + response.status_code, + response.content, + ) diff --git a/churchtools_api/persons.py b/churchtools_api/persons.py index ab1abe6..8f5b45c 100644 --- a/churchtools_api/persons.py +++ b/churchtools_api/persons.py @@ -5,6 +5,7 @@ logger = logging.getLogger(__name__) + class ChurchToolsApiPersons(ChurchToolsApiAbstract): """ Part definition of ChurchToolsApi which focuses on persons @@ -15,23 +16,29 @@ class ChurchToolsApiPersons(ChurchToolsApiAbstract): def __init__(self): super() - def get_persons(self, **kwargs): + def get_persons(self, **kwargs) -> list[dict]: """ - Function to get list of all or a person from CT - :param kwargs: optional keywords as listed - :keyword ids: list: of a ids filter - :keyword returnAsDict: bool: true if should return a dict instead of list - :return: list of user dicts - :rtype: list[dict] + Function to get list of all or a person from CT. + + Arguments: + kwargs: optional keywords as listed + + Kwargs: + ids: list: of a ids filter + returnAsDict: bool: true if should return a dict instead of list + + Permissions: + some fields e.g. sexId require "security level person" with at least level 2 (administer persons is not sufficient) + + Returns: + list of user dicts """ - url = self.domain + '/api/persons' - params = {"limit":50} #increases default pagination size - if 'ids' in kwargs.keys(): - params['ids[]'] = kwargs['ids'] - - headers = { - 'accept': 'application/json' - } + url = self.domain + "/api/persons" + params = {"limit": 50} # increases default pagination size + if "ids" in kwargs.keys(): + params["ids[]"] = kwargs["ids"] + + headers = {"accept": "application/json"} response = self.session.get(url=url, headers=headers, params=params) if response.status_code == 200: @@ -50,7 +57,7 @@ def get_persons(self, **kwargs): ) response_data = [response_data] if isinstance(response_data, dict) else response_data - if 'returnAsDict' in kwargs and not 'serviceId' in kwargs: + if 'returnAsDict' in kwargs and 'serviceId' not in kwargs: if kwargs['returnAsDict']: result = {} for item in response_data: @@ -64,3 +71,32 @@ def get_persons(self, **kwargs): "Persons requested failed: {}".format( response.status_code)) return None + + def get_persons_masterdata( + self, resultClass: str = None, returnAsDict: bool = False, **kwargs + ) -> dict[list[dict]]: + """ + Function to get the Masterdata of the persons module + This information is required to map some IDs to specific items + + Returns: + dict of lists of masterdata items each with list of dict items used as configuration + """ + url = self.domain + "/api/person/masterdata" + + headers = {"accept": "application/json"} + response = self.session.get(url=url, headers=headers) + + if response.status_code == 200: + response_content = json.loads(response.content) + response_data = response_content["data"].copy() + logger.debug("Person Masterdata load successful {}".format(response_data)) + + return response_data + else: + logger.warning( + "%s Something went wrong fetching person metadata: %s", + response.status_code, + response.content, + ) + return None diff --git a/tests/test_churchtools_api.py b/tests/test_churchtools_api.py index 296307e..ccf9644 100644 --- a/tests/test_churchtools_api.py +++ b/tests/test_churchtools_api.py @@ -5,7 +5,6 @@ import os from pathlib import Path import unittest -from datetime import datetime, timedelta from churchtools_api.churchtools_api import ChurchToolsApi @@ -19,30 +18,30 @@ log_directory.mkdir(parents=True) logging.config.dictConfig(config=logging_config) + class TestsChurchToolsApi(unittest.TestCase): def __init__(self, *args, **kwargs): super(TestsChurchToolsApi, self).__init__(*args, **kwargs) - if 'CT_TOKEN' in os.environ: - self.ct_token = os.environ['CT_TOKEN'] - self.ct_domain = os.environ['CT_DOMAIN'] - users_string = os.environ['CT_USERS'] + if "CT_TOKEN" in os.environ: + self.ct_token = os.environ["CT_TOKEN"] + self.ct_domain = os.environ["CT_DOMAIN"] + users_string = os.environ["CT_USERS"] self.ct_users = ast.literal_eval(users_string) - logger.info( - 'using connection details provided with ENV variables') + logger.info("using connection details provided with ENV variables") else: from secure.config import ct_token + self.ct_token = ct_token from secure.config import ct_domain + self.ct_domain = ct_domain from secure.config import ct_users + self.ct_users = ct_users - logger.info( - 'using connection details provided from secrets folder') + logger.info("using connection details provided from secrets folder") - self.api = ChurchToolsApi( - domain=self.ct_domain, - ct_token=self.ct_token) + self.api = ChurchToolsApi(domain=self.ct_domain, ct_token=self.ct_token) logger.info("Executing Tests RUN") def tearDown(self): @@ -61,10 +60,7 @@ def test_init_userpwd(self): self.api.session.close() username = list(self.ct_users.keys())[0] password = list(self.ct_users.values())[0] - ct_api = ChurchToolsApi( - self.ct_domain, - ct_user=username, - ct_password=password) + ct_api = ChurchToolsApi(self.ct_domain, ct_user=username, ct_password=password) self.assertIsNotNone(ct_api) ct_api.session.close() @@ -82,8 +78,7 @@ def test_login_ct_rest_api(self): password = list(self.ct_users.values())[0] if self.api.session is not None: self.api.session.close() - result = self.api.login_ct_rest_api( - ct_user=username, ct_password=password) + result = self.api.login_ct_rest_api(ct_user=username, ct_password=password) self.assertTrue(result) def test_get_ct_csrf_token(self): @@ -93,9 +88,8 @@ def test_get_ct_csrf_token(self): """ token = self.api.get_ct_csrf_token() self.assertGreater( - len(token), - 0, - "Token should be more than one letter but changes each time") + len(token), 0, "Token should be more than one letter but changes each time" + ) def test_check_connection_ajax(self): """ @@ -122,7 +116,7 @@ def test_get_persons(self): result2 = self.api.get_persons(ids=[personId]) self.assertIsInstance(result2, list) self.assertIsInstance(result2[0], dict) - self.assertEqual(result2[0]['firstName'][0:3], 'Ben') + self.assertEqual(result2[0]["firstName"][0:3], "Ben") result3 = self.api.get_persons(returnAsDict=True) self.assertIsInstance(result3, dict) @@ -130,6 +124,64 @@ def test_get_persons(self): result4 = self.api.get_persons(returnAsDict=False) self.assertIsInstance(result4, list) + def test_get_persons_masterdata(self) -> None: + """ + Tries to retrieve metadata for persons module + Expected sections equal those that were available on ELKW1610.krz.tools on 4.Oct.2024 + + Some items might be hidden in options instead of masterdata e.g. sex + """ + EXPECTED_SECTIONS = { + "roles", + "ageGroups", + "targetGroups", + "groupTypes", + "groupCategories", + "groupStatuses", + "departments", + "statuses", + "campuses", + "contactLabels", + "growPaths", + "followUps", + "followUpIntervals", + "groupMeetingTemplates", + "relationshipTypes", + } + + result = self.api.get_persons_masterdata() + self.assertIsInstance(result, dict) + self.assertEqual(EXPECTED_SECTIONS, set(result.keys())) + + for section in result.values(): + self.assertIsInstance(section, list) + for item in section: + self.assertIsInstance(item, dict) + + def test_get_options(self): + """Checks that option fields can retrieved""" + result = self.api.get_options() + self.assertIn("sex", result) + + def test_get_persons_sex_id(self): + """Tests that persons sexId can be retrieved and converted to a human readable gender + + IMPORTANT - This test method and the parameters used depend on the target system! + the hard coded sample exists on ELKW1610.KRZ.TOOLS + """ + EXPECTED_RESULT = "sex.unknown" + SAMPLE_USER_ID = 513 + + person = self.api.get_persons(ids=[SAMPLE_USER_ID]) + + gender_map = { + item["id"]: item["name"] + for item in self.api.get_options()["sex"]["options"] + } + result = gender_map[person[0]["sexId"]] + + self.assertEqual(EXPECTED_RESULT, result) + def test_get_songs(self) -> None: """ 1. Test requests all songs and checks that result has more than 10 elements (hence default pagination works) @@ -159,8 +211,8 @@ def test_get_song_ajax(self): self.assertIsInstance(song, dict) self.assertEqual(len(song), 14) - self.assertEqual(int(song['id']), SAMPLE_SONG_ID) - self.assertEqual(song['bezeichnung'], 'sample') + self.assertEqual(int(song["id"]), SAMPLE_SONG_ID) + self.assertEqual(song["bezeichnung"], "sample") def test_get_song_category_map(self): """ @@ -171,7 +223,7 @@ def test_get_song_category_map(self): """ song_catgegory_dict = self.api.get_song_category_map() - self.assertEqual(song_catgegory_dict['Test'], 13) + self.assertEqual(song_catgegory_dict["Test"], 13) def test_get_groups(self) -> None: """ @@ -204,9 +256,9 @@ def test_get_groups_hierarchies(self): hierarchies = self.api.get_groups_hierarchies() self.assertIsInstance(hierarchies, dict) for hierarchy in hierarchies.values(): - self.assertTrue('groupId' in hierarchy) - self.assertTrue('parents' in hierarchy) - self.assertTrue('children' in hierarchy) + self.assertTrue("groupId" in hierarchy) + self.assertTrue("parents" in hierarchy) + self.assertTrue("children" in hierarchy) def test_get_group_statistics(self) -> None: """ @@ -237,17 +289,17 @@ def test_get_grouptypes(self): self.assertIsInstance(grouptypes, dict) self.assertGreater(len(grouptypes), 2) for grouptype in grouptypes.values(): - self.assertTrue('id' in grouptype) - self.assertTrue('name' in grouptype) + self.assertTrue("id" in grouptype) + self.assertTrue("name" in grouptype) # one type only grouptypes = self.api.get_grouptypes(grouptype_id=2) self.assertEqual(len(grouptypes), 1) for grouptype in grouptypes.values(): - self.assertTrue('id' in grouptype) - self.assertTrue('name' in grouptype) - self.assertEqual(grouptype['id'], 2) - self.assertEqual(grouptype['name'], 'Dienst') + self.assertTrue("id" in grouptype) + self.assertTrue("name" in grouptype) + self.assertEqual(grouptype["id"], 2) + self.assertEqual(grouptype["name"], "Dienst") def test_get_group_permissions(self): """ @@ -256,8 +308,8 @@ def test_get_group_permissions(self): :return: """ permissions = self.api.get_group_permissions(group_id=103) - self.assertEqual(permissions['churchdb']['+see group'], 2) - self.assertTrue(permissions['churchdb']['+edit group infos']) + self.assertEqual(permissions["churchdb"]["+see group"], 2) + self.assertTrue(permissions["churchdb"]["+edit group infos"]) def test_create_and_delete_group(self): """IMPORTANT - This test method and the parameters used depend on the target system! @@ -417,7 +469,7 @@ def test_get_groups_members(self) -> None: ) assert len(result) == 1 - def test_add_and_remove_group_members(self)->None: + def test_add_and_remove_group_members(self) -> None: """IMPORTANT - This test method and the parameters used depend on the target system! the hard coded sample exists on ELKW1610.KRZ.TOOLS. """ @@ -479,12 +531,12 @@ def test_get_global_permissions(self): :return: """ permissions = self.api.get_global_permissions() - self.assertIn('churchcore', permissions.keys()) - self.assertIn('administer settings', permissions['churchcore'].keys()) + self.assertIn("churchcore", permissions.keys()) + self.assertIn("administer settings", permissions["churchcore"].keys()) - self.assertFalse(permissions['churchcore']['administer settings']) - self.assertFalse(permissions['churchdb']['view birthdaylist']) - self.assertTrue(permissions['churchwiki']['view']) + self.assertFalse(permissions["churchcore"]["administer settings"]) + self.assertFalse(permissions["churchdb"]["view birthdaylist"]) + self.assertTrue(permissions["churchwiki"]["view"]) def test_file_upload_replace_delete(self): """ @@ -500,91 +552,106 @@ def test_file_upload_replace_delete(self): :return: """ # 0. Clean and delete files in test - self.api.file_delete('song_arrangement', 417) + self.api.file_delete("song_arrangement", 417) song = self.api.get_songs(song_id=408)[0] self.assertEqual( - song['arrangements'][0]['id'], - 417, - 'check that default arrangement exists') - self.assertEqual(len(song['arrangements'][0] - ['files']), 0, 'check that ono files exist') + song["arrangements"][0]["id"], 417, "check that default arrangement exists" + ) + self.assertEqual( + len(song["arrangements"][0]["files"]), 0, "check that ono files exist" + ) # 1. Tries 3 uploads to the test song with ID 408 and arrangement 417 # Adds the same file again without overwrite - should exist twice - self.api.file_upload('samples/pinguin.png', "song_arrangement", 417) + self.api.file_upload("samples/pinguin.png", "song_arrangement", 417) self.api.file_upload( - 'samples/pinguin_shell.png', + "samples/pinguin_shell.png", "song_arrangement", 417, - 'pinguin_shell_rename.png') + "pinguin_shell_rename.png", + ) self.api.file_upload( - 'samples/pinguin.png', - "song_arrangement", - 417, - 'pinguin.png') + "samples/pinguin.png", "song_arrangement", 417, "pinguin.png" + ) song = self.api.get_songs(song_id=408)[0] self.assertIsInstance( - song, dict, 'Should be a single song instead of list of songs') + song, dict, "Should be a single song instead of list of songs" + ) self.assertEqual( - song['arrangements'][0]['id'], - 417, - 'check that default arrangement exsits') - self.assertEqual(len(song['arrangements'][0]['files']), - 3, 'check that only the 3 test attachments exist') - filenames = [i['name'] for i in song['arrangements'][0]['files']] - filenames_target = [ - 'pinguin.png', - 'pinguin_shell_rename.png', - 'pinguin.png'] + song["arrangements"][0]["id"], 417, "check that default arrangement exsits" + ) + self.assertEqual( + len(song["arrangements"][0]["files"]), + 3, + "check that only the 3 test attachments exist", + ) + filenames = [i["name"] for i in song["arrangements"][0]["files"]] + filenames_target = ["pinguin.png", "pinguin_shell_rename.png", "pinguin.png"] self.assertEqual(filenames, filenames_target) # 2. Reupload pinguin.png using overwrite which will remove both old # files but keep one self.api.file_upload( - 'samples/pinguin.png', + "samples/pinguin.png", "song_arrangement", 417, - 'pinguin.png', - overwrite=True) + "pinguin.png", + overwrite=True, + ) song = self.api.get_songs(song_id=408)[0] - self.assertEqual(len(song['arrangements'][0]['files']), - 2, 'check that overwrite is applied on upload') + self.assertEqual( + len(song["arrangements"][0]["files"]), + 2, + "check that overwrite is applied on upload", + ) # 3. Overwrite without existing file self.api.file_upload( - 'samples/pinguin.png', + "samples/pinguin.png", "song_arrangement", 417, - 'pinguin2.png', - overwrite=True) + "pinguin2.png", + overwrite=True, + ) song = self.api.get_songs(song_id=408)[0] - self.assertEqual(len(song['arrangements'][0]['files']), - 3, 'check that both file with overwrite of new file') + self.assertEqual( + len(song["arrangements"][0]["files"]), + 3, + "check that both file with overwrite of new file", + ) # 3.b Try overwriting again and check that number of files does not # increase self.api.file_upload( - 'samples/pinguin.png', + "samples/pinguin.png", "song_arrangement", 417, - 'pinguin.png', - overwrite=True) + "pinguin.png", + overwrite=True, + ) song = self.api.get_songs(song_id=408)[0] - self.assertEqual(len(song['arrangements'][0]['files']), - 3, 'check that still only 3 file exists') + self.assertEqual( + len(song["arrangements"][0]["files"]), + 3, + "check that still only 3 file exists", + ) # 4. Delete only one file self.api.file_delete("song_arrangement", 417, "pinguin.png") song = self.api.get_songs(song_id=408)[0] - self.assertEqual(len(song['arrangements'][0]['files']), - 2, 'check that still only 2 file exists') + self.assertEqual( + len(song["arrangements"][0]["files"]), + 2, + "check that still only 2 file exists", + ) # cleanup delete all files - self.api.file_delete('song_arrangement', 417) + self.api.file_delete("song_arrangement", 417) song = self.api.get_songs(song_id=408)[0] self.assertEqual( - len(song['arrangements'][0]['files']), 0, 'check that files are deleted') + len(song["arrangements"][0]["files"]), 0, "check that files are deleted" + ) def test_create_edit_delete_song(self): """ @@ -594,7 +661,7 @@ def test_create_edit_delete_song(self): On ELKW1610.KRZ.TOOLS songcategory_id 13 is TEST :return: """ - title = 'Test_bezeichnung1' + title = "Test_bezeichnung1" songcategory_id = 13 # 1. Create Song after and check it exists with all params @@ -603,41 +670,51 @@ def test_create_edit_delete_song(self): self.assertIsNotNone(song_id) ct_song = self.api.get_songs(song_id=song_id)[0] - self.assertEqual(ct_song['name'], title) - self.assertEqual(ct_song['author'], '') - self.assertEqual(ct_song['category']['id'], songcategory_id) + self.assertEqual(ct_song["name"], title) + self.assertEqual(ct_song["author"], "") + self.assertEqual(ct_song["category"]["id"], songcategory_id) # 2. Edit Song title and check it exists with all params - self.api.edit_song(song_id, title='Test_bezeichnung2') + self.api.edit_song(song_id, title="Test_bezeichnung2") ct_song = self.api.get_songs(song_id=song_id)[0] - self.assertEqual(ct_song['author'], '') - self.assertEqual(ct_song['name'], 'Test_bezeichnung2') + self.assertEqual(ct_song["author"], "") + self.assertEqual(ct_song["name"], "Test_bezeichnung2") # 3. Edit all fields and check it exists with all params data = { - 'bezeichnung': 'Test_bezeichnung3', - 'songcategory_id': 1, # needs to exist does not matter which because deleted later - 'author': 'Test_author', - 'copyright': 'Test_copyright', - 'ccli': 'Test_ccli', - 'practice_yn': 1, + "bezeichnung": "Test_bezeichnung3", + "songcategory_id": 1, # needs to exist does not matter which because deleted later + "author": "Test_author", + "copyright": "Test_copyright", + "ccli": "Test_ccli", + "practice_yn": 1, } - self.api.edit_song(song_id, data['songcategory_id'], data['bezeichnung'], data['author'], data['copyright'], - data['ccli'], data['practice_yn']) + self.api.edit_song( + song_id, + data["songcategory_id"], + data["bezeichnung"], + data["author"], + data["copyright"], + data["ccli"], + data["practice_yn"], + ) ct_song = self.api.get_songs(song_id=song_id)[0] - self.assertEqual(ct_song['name'], data['bezeichnung']) - self.assertEqual(ct_song['category']['id'], data['songcategory_id']) - self.assertEqual(ct_song['author'], data['author']) - self.assertEqual(ct_song['copyright'], data['copyright']) - self.assertEqual(ct_song['ccli'], data['ccli']) - self.assertEqual(ct_song['shouldPractice'], data['practice_yn']) + self.assertEqual(ct_song["name"], data["bezeichnung"]) + self.assertEqual(ct_song["category"]["id"], data["songcategory_id"]) + self.assertEqual(ct_song["author"], data["author"]) + self.assertEqual(ct_song["copyright"], data["copyright"]) + self.assertEqual(ct_song["ccli"], data["ccli"]) + self.assertEqual(ct_song["shouldPractice"], data["practice_yn"]) # Delete Song self.api.delete_song(song_id) - with self.assertLogs(level='INFO') as cm: + with self.assertLogs(level="INFO") as cm: ct_song = self.api.get_songs(song_id=song_id) messages = [ - "INFO:churchtools_api.songs:Did not find song ({}) with CODE 404".format(song_id)] + "INFO:churchtools_api.songs:Did not find song ({}) with CODE 404".format( + song_id + ) + ] self.assertEqual(messages, cm.output) self.assertIsNone(ct_song) @@ -651,7 +728,7 @@ def test_add_remove_song_tag(self): """ self.api.ajax_song_last_update = None self.assertTrue(self.api.contains_song_tag(408, 53)) - with self.assertNoLogs(level='INFO') as cm: + with self.assertNoLogs(level="INFO") as cm: response = self.api.remove_song_tag(408, 53) self.assertEqual(response.status_code, 200) @@ -659,7 +736,7 @@ def test_add_remove_song_tag(self): self.assertFalse(self.api.contains_song_tag(408, 53)) self.api.ajax_song_last_update = None - with self.assertNoLogs(level='INFO') as cm: + with self.assertNoLogs(level="INFO") as cm: response = self.api.add_song_tag(408, 53) self.assertEqual(response.status_code, 200) @@ -678,11 +755,11 @@ def test_get_songs_with_tag(self): self.api.ajax_song_last_update = None result = self.api.get_songs_by_tag(SAMPLE_TAG_ID) - result_ids = [song['id'] for song in result] + result_ids = [song["id"] for song in result] self.assertIn(SAMPLE_SONG_ID, result_ids) def test_file_download(self): - """ Test of file_download and file_download_from_url on https://elkw1610.krz.tools on any song + """Test of file_download and file_download_from_url on https://elkw1610.krz.tools on any song IDs vary depending on the server used On ELKW1610.KRZ.TOOLS song ID 762 has arrangement 774 does exist @@ -693,21 +770,21 @@ def test_file_download(self): """ test_id = 762 - self.api.file_upload('samples/test.txt', 'song_arrangement', test_id) + self.api.file_upload("samples/test.txt", "song_arrangement", test_id) - filePath = 'downloads/test.txt' + filePath = "downloads/test.txt" if os.path.exists(filePath): os.remove(filePath) - self.api.file_download('test.txt', 'song_arrangement', test_id) + self.api.file_download("test.txt", "song_arrangement", test_id) with open(filePath, "r") as file: download_text = file.read() - self.assertEqual('TEST CONTENT', download_text) + self.assertEqual("TEST CONTENT", download_text) - self.api.file_delete('song_arrangement', test_id, 'test.txt') + self.api.file_delete("song_arrangement", test_id, "test.txt") if os.path.exists(filePath): os.remove(filePath) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()