diff --git a/churchtools_api/groups.py b/churchtools_api/groups.py index af2029d..16222bd 100644 --- a/churchtools_api/groups.py +++ b/churchtools_api/groups.py @@ -81,6 +81,75 @@ def get_groups_hierarchies(self): "Something went wrong fetching groups hierarchies: {}".format( response.status_code)) + def get_group_statistics(self, group_id: int): + """ + Get statistics for the given group + :param group_id: required group_id + :return: dict with statistics + :rtype: dict + """ + url = self.domain + '/api/groups/{}/statistics'.format(group_id) + headers = { + 'accept': 'application/json' + } + response = self.session.get(url=url, headers=headers) + + if response.status_code == 200: + response_content = json.loads(response.content) + response_data = response_content['data'].copy() + logging.debug( + "First response of Group Statistics successful {}".format(response_content)) + return response_data + else: + logging.warning( + "Something went wrong fetching group statistics: {}".format( + response.status_code)) + + def create_group(self, name: str, group_status_id: int, grouptype_id: int, **kwargs): + """ + Create a new group + :param name: str: required name + :param group_status_id: int: required status id + :param grouptype_id: int: required grouptype id + :keyword campus_id: int: optional campus id + :keyword superior_group_id: int: optional superior group id + :keyword force: bool: set to force create if a group with this name + already exists + :return: dict with created group + :rtype: dict + """ + url = self.domain + '/api/groups' + headers = { + 'accept': 'application/json' + } + data = { + "groupStatusId": group_status_id, + "groupTypeId": grouptype_id, + "name": name, + } + + if 'campus_id' in kwargs.keys(): + data['campusId'] = kwargs['campus_id'] + + if 'force' in kwargs.keys(): + data['force'] = kwargs['force'] + + if 'superior_group_id' in kwargs.keys(): + data['superiorGroupId'] = kwargs['superior_group_id'] + + response = self.session.post(url=url, headers=headers, data=data) + + if response.status_code == 201: + response_content = json.loads(response.content) + response_data = response_content['data'].copy() + logging.debug( + "First response of Create Group successful {}".format(response_content)) + + return response_data + else: + logging.warning( + "Something went wrong with creating group: {}".format(response.status_code)) + def update_group(self, group_id: int, data: dict): """ Update a field of the given group @@ -110,6 +179,24 @@ def update_group(self, group_id: int, data: dict): "Something went wrong updating group: {}".format( response.status_code)) + def delete_group(self, group_id: int): + """ + Delete the given group + :param group_id: int: required group_id + :return: True if successful + :rtype: bool + """ + url = self.domain + '/api/groups/{}'.format(group_id) + response = self.session.delete(url=url) + + if response.status_code == 204: + logging.debug("First response of Delete Group successful") + return True + else: + logging.warning( + "Something went wrong deleting group: {}".format( + response.status_code)) + def get_grouptypes(self, **kwargs): """ Get list of all grouptypes @@ -164,3 +251,167 @@ def get_group_permissions(self, group_id: int): logging.warning( "Something went wrong fetching group permissions: {}".format( response.status_code)) + + def get_group_members(self, group_id: int, **kwargs): + """ + Get list of members for the given group + :param group_id: int: required group id + :keyword role_ids: list[int]: optional filter list of role ids + :return: list of group member dicts + :rtype: list[dict] + """ + url = self.domain + '/api/groups/{}/members'.format(group_id) + headers = { + 'accept': 'application/json' + } + params = {} + + if 'role_ids' in kwargs.keys(): + params['role_ids[]'] = kwargs['role_ids'] + + response = self.session.get(url=url, headers=headers, params=params) + + if response.status_code == 200: + response_content = json.loads(response.content) + response_data = response_content['data'].copy() + logging.debug( + "First response of Group Members successful {}".format(response_content)) + + if 'meta' not in response_content.keys(): # Shortcut without Pagination + return response_data + + # Long part extending results with pagination + while response_content['meta']['pagination']['current'] \ + < response_content['meta']['pagination']['lastPage']: + logging.info("page {} of {}".format(response_content['meta']['pagination']['current'], + response_content['meta']['pagination']['lastPage'])) + params['page'] = response_content['meta']['pagination']['current'] + 1 + response = self.session.get( + url=url, headers=headers, params=params) + response_content = json.loads(response.content) + response_data.extend(response_content['data']) + + return response_data + else: + logging.warning( + "Something went wrong fetching group members: {}".format(response.status_code)) + + def add_group_member(self, group_id: int, person_id: int, **kwargs): + """ + Add a member to a group + :param group_id: int: required group id + :param person_id: int: required person id + :keyword grouptype_role_id: int: optional grouptype role id + :keyword group_member_status: str: optional member status + :return: dict with group member + :rtype: dict + """ + url = self.domain + \ + '/api/groups/{}/members/{}'.format(group_id, person_id) + headers = { + 'accept': 'application/json', + } + + data = {} + if 'grouptype_role_id' in kwargs.keys(): + data['groupTypeRoleId'] = kwargs['grouptype_role_id'] + if 'group_member_status' in kwargs.keys(): + data['group_member_status'] = kwargs['group_member_status'] + + response = self.session.put( + url=url, data=data, headers=headers) + + if response.status_code == 200: + response_content = json.loads(response.content) + # For unknown reasons the endpoint returns a list of items instead + # of a single item as specified in the API documentation. + response_data = response_content['data'][0].copy() + logging.debug( + "First response of Add Group Member successful {}".format(response_content)) + + return response_data + else: + logging.warning( + "Something went wrong adding group member: {}".format(response.status_code)) + + def remove_group_member(self, group_id: int, person_id: int): + """ + Remove the given group member + :param group_id: int: required group id + :param person_id: int: required person id + :return: True if successful + :rtype: bool + """ + url = self.domain + \ + '/api/groups/{}/members/{}'.format(group_id, person_id) + response = self.session.delete(url=url) + + if response.status_code == 204: + logging.debug("First response of Remove Group member successful") + return True + else: + logging.warning( + "Something went wrong removing group member: {}".format( + response.status_code)) + + def get_group_roles(self, group_id: int): + """ + Get list of all roles for the given group + :param group_id: int: required group id + :return: list with group roles dicts + :rtype: list[dict] + """ + url = self.domain + '/api/groups/{}/roles'.format(group_id) + headers = { + 'accept': 'application/json' + } + response = self.session.get(url=url, headers=headers) + + if response.status_code == 200: + response_content = json.loads(response.content) + response_data = response_content['data'].copy() + logging.debug( + "First response of Group Roles successful {}".format(response_content)) + + return response_data + else: + logging.warning( + "Something went wrong fetching group roles: {}".format(response.status_code)) + + def add_parent_group(self, group_id: int, parent_group_id: int): + """ + Add a parent group for a group + :param group_id: int: required group id + :param parent_group_id: int: required parent group id + :return: True if successful + :rtype: bool + """ + url = self.domain + \ + '/api/groups/{}/parents/{}'.format(group_id, parent_group_id) + response = self.session.put(url=url) + + if response.status_code == 201: + logging.debug("First response of Add Parent Group successful") + return True + else: + logging.warning("Something went wrong adding parent group: {}".format( + response.status_code)) + + def remove_parent_group(self, group_id: int, parent_group_id: int): + """ + REmove a parent group from a group + :param group_id: int: required group id + :param parent_group_id: int: required parent group id + :return: True if successful + :rtype: bool + """ + url = self.domain + \ + '/api/groups/{}/parents/{}'.format(group_id, parent_group_id) + response = self.session.delete(url=url) + + if response.status_code == 204: + logging.debug("First response of Remove Parent Group successful") + return True + else: + logging.warning("Something went wrong removing parent group: {}".format( + response.status_code)) diff --git a/tests/test_churchtools_api.py b/tests/test_churchtools_api.py index ebeda19..27c9c3a 100644 --- a/tests/test_churchtools_api.py +++ b/tests/test_churchtools_api.py @@ -192,6 +192,18 @@ def test_get_groups_hierarchies(self): self.assertTrue('parents' in hierarchy) self.assertTrue('children' in hierarchy) + def test_get_group_statistics(self): + """ + Checks that the statistics for a group can be retrieved and certain keys + exist in the dict. + :return: + """ + stats = self.api.get_group_statistics(group_id=103) + self.assertIsNotNone(stats) + self.assertIn('unfiltered', stats) + self.assertIn('freePlaces', stats['unfiltered']) + self.assertIn('takenPlaces', stats['unfiltered']) + def test_get_grouptypes(self): """ 1. Check that the list of grouptypes can be retrieved and each element contains the keys 'id' and 'name'. @@ -227,6 +239,74 @@ def test_get_group_permissions(self): self.assertEqual(permissions['churchdb']['+see group'], 2) self.assertTrue(permissions['churchdb']['+edit group infos']) + def test_create_and_delete_group(self): + """ + IMPORTANT - This test method and the parameters used depend on the + target system! + 1. Checks if groups can be created with minimal and optional parameters. + 2. Checks if a group can be created with a name of an existing group + only when the 'force' parameter is set. + 3. Checks if groups can be deleted. + :return: + """ + grouptype_id = 2 + group_status_id = 1 + campus_id = 0 + group1 = self.api.create_group( + "TestGroup", group_status_id=group_status_id, grouptype_id=grouptype_id + ) + self.assertIsNotNone(group1) + self.assertEqual(group1['name'], "TestGroup") + self.assertEqual(group1['information']['groupTypeId'], grouptype_id) + self.assertEqual( + group1['information']['groupStatusId'], + group_status_id + ) + + group2 = self.api.create_group( + "TestGroup With Campus And Superior", + group_status_id=group_status_id, + grouptype_id=grouptype_id, + campus_id=campus_id, + superior_group_id=group1['id'] + ) + self.assertIsNotNone(group2) + self.assertEqual(group2['name'], "TestGroup With Campus And Superior") + self.assertEqual(group2['information']['groupTypeId'], grouptype_id) + self.assertEqual( + group2['information']['groupStatusId'], + group_status_id + ) + self.assertEqual(group2['information']['campusId'], campus_id) + + group3 = self.api.create_group( + "TestGroup", group_status_id=group_status_id, + grouptype_id=grouptype_id + ) + self.assertIsNone(group3) + + group3 = self.api.create_group( + "TestGroup", group_status_id=group_status_id, + grouptype_id=grouptype_id, + force=True + ) + self.assertIsNotNone(group3) + self.assertEqual(group3['name'], "TestGroup") + self.assertEqual(group3['information']['groupTypeId'], grouptype_id) + self.assertEqual( + group3['information']['groupStatusId'], + group_status_id + ) + + ret = self.api.delete_group(group_id=group1['id']) + self.assertTrue(ret) + + ret = self.api.delete_group(group_id=group2['id']) + self.assertTrue(ret) + + ret = self.api.delete_group(group_id=group3['id']) + self.assertTrue(ret) + def test_update_group(self): """ IMPORTANT - This test method and the parameters used depend on the target system! @@ -246,6 +326,90 @@ def test_update_group(self): group = self.api.get_groups(group_id=test_group_id) self.assertEqual(group['information']['note'], '') + def test_get_group_members(self): + """ + IMPORTANT - This test method and the parameters used depend on the target system! + Checks if group members can be retrieved from the group and filtering + for role ids works. + :return: + """ + test_group_id = 103 + grouptype_role_id = 2 + members = self.api.get_group_members(group_id=test_group_id) + self.assertIsNotNone(members) + self.assertNotEqual(members, []) + for member in members: + self.assertIn('personId', member) + + members = self.api.get_group_members( + group_id=test_group_id, + role_ids=[grouptype_role_id] + ) + self.assertIsNotNone(members) + self.assertNotEqual(members, []) + for member in members: + self.assertIn('personId', member) + self.assertEqual(member['groupTypeRoleId'], grouptype_role_id) + + def test_add_and_remove_group_members(self): + """ + IMPORTANT - This test method and the parameters used depend on the target system! + Checks if a group member can be added to and removed from a group. + :return: + """ + test_group_id = 103 + test_person_id = 1 + grouptype_role_id = 2 + member = self.api.add_group_member( + group_id=test_group_id, + person_id=test_person_id, + grouptype_role_id=grouptype_role_id, + group_member_status='active' + ) + self.assertIsNotNone(member) + self.assertEqual(member['personId'], test_person_id) + self.assertEqual(member['groupTypeRoleId'], grouptype_role_id) + self.assertEqual(member['groupMemberStatus'], 'active') + + ret = self.api.remove_group_member( + group_id=test_group_id, + person_id=test_person_id + ) + self.assertTrue(ret) + + def test_get_group_roles(self): + """ + Checks if group roles can be retrieved from a group + :return: + """ + test_group_id = 103 + roles = self.api.get_group_roles(group_id=test_group_id) + self.assertIsNotNone(roles) + self.assertNotEqual(roles, []) + for role in roles: + self.assertIn('id', role) + self.assertIn('groupTypeId', role) + self.assertIn('name', role) + + def test_add_and_remove_parent_group(self): + """ + Checks if a parent group can be added to and removed from a group + :return: + """ + test_group_id = 103 + test_parent_group_id = 104 + ret = self.api.add_parent_group( + group_id=test_group_id, + parent_group_id=test_parent_group_id + ) + self.assertTrue(ret) + + ret = self.api.remove_parent_group( + group_id=test_group_id, + parent_group_id=test_parent_group_id + ) + self.assertTrue(ret) + def test_get_global_permissions(self): """ IMPORTANT - This test method and the parameters used depend on the target system!