-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PCE-89 Update company name and invoice contact information in AWS #3
base: main
Are you sure you want to change the base?
Changes from all commits
a508782
252f42d
542db77
5b08012
96a75cc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should the override tags be documented ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Of course :-) Will add |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,62 +1,147 @@ | ||
import os | ||
import re | ||
|
||
import json | ||
import boto3 | ||
from botocore.exceptions import ClientError | ||
from functools import cache | ||
|
||
|
||
ORG_CLIENT = boto3.client('organizations') | ||
ACCOUNT_CLIENT = boto3.client('account') | ||
|
||
ALTERNATE_CONTACTS = { | ||
'SECURITY': json.loads(os.environ.get('security_alternate_contact')), | ||
'OPERATIONS': json.loads(os.environ.get('operations_alternate_contact')), | ||
'BILLING': json.loads(os.environ.get('billing_alternate_contact')) | ||
} | ||
PRIMARY_CONTACT = json.loads(os.environ.get('primary_contact')) | ||
|
||
ORG_CLIENT = boto3.client("organizations") | ||
ACCOUNT_CLIENT = boto3.client("account") | ||
MANAGEMENT_ACCOUNT_ID = os.environ.get('management_account_id') | ||
|
||
SEC_ALTERNATE_CONTACTS = os.environ.get("security_alternate_contact") | ||
BILL_ALTERNATE_CONTACTS = os.environ.get("operations_alternate_contact") | ||
OPS_ALTERNATE_CONTACTS = os.environ.get("billing_alternate_contact") | ||
MANAGEMENT_ACCOUNT_ID = os.environ.get("management_account_id") | ||
CONTACT_TAGS = { | ||
'OPERATIONS': 'tds:contact:operations', | ||
'SECURITY': 'tds:contact:security' | ||
} | ||
|
||
DEFAULT_CONTACT_EMAIL = 'support@basefarm-orange.com' | ||
|
||
CONTACTS = [] | ||
FAILED_ACCOUNTS = [] | ||
|
||
def list_accounts(client): | ||
response = client.list_accounts() | ||
accounts = [] | ||
while response: | ||
accounts += response["Accounts"] | ||
if "NextToken" in response: | ||
response = client.list_accounts(NextToken = response["NextToken"]) | ||
else: | ||
response = None | ||
return accounts | ||
|
||
|
||
def parse_contact_types(): | ||
CONTACT_LIST = [] | ||
for contact in [SEC_ALTERNATE_CONTACTS, BILL_ALTERNATE_CONTACTS, OPS_ALTERNATE_CONTACTS]: | ||
CONTACT_LIST = re.split("=|; ", contact) | ||
list_to_dict = {CONTACT_LIST[i]: CONTACT_LIST[i + 1] for i in range(0, len(CONTACT_LIST), 2)} | ||
CONTACTS.append(list_to_dict) | ||
|
||
|
||
def put_alternate_contact(accountId): | ||
for contact in CONTACTS: | ||
try: | ||
response = ACCOUNT_CLIENT.put_alternate_contact( | ||
AccountId=accountId, | ||
AlternateContactType=contact["CONTACT_TYPE"], | ||
EmailAddress=contact["EMAIL_ADDRESS"], | ||
Name=contact["CONTACT_NAME"], | ||
PhoneNumber=contact["PHONE_NUMBER"], | ||
Title=contact["CONTACT_TITLE"], | ||
) | ||
|
||
except ClientError as error: | ||
FAILED_ACCOUNTS.append(accountId) | ||
print(error) | ||
response = client.list_accounts() | ||
accounts = [] | ||
while response: | ||
accounts += response['Accounts'] | ||
if "NextToken" in response: | ||
response = client.list_accounts(NextToken = response['NextToken']) | ||
else: | ||
response = None | ||
return accounts | ||
|
||
@cache | ||
def list_parents(client, ChildId): | ||
return client.list_parents( ChildId=ChildId) | ||
|
||
@cache | ||
def list_tags_for_resource(client, ResourceId): | ||
tags = {} | ||
response = client.list_tags_for_resource( ResourceId = ResourceId ) | ||
if 'Tags' in response: | ||
tags = {t['Key']: t['Value'] for t in response['Tags']} | ||
return tags | ||
|
||
def get_ous(client, account_id): | ||
parent_list = [ account_id ] | ||
response = list_parents(client, ChildId=account_id ) | ||
while response['Parents'][0]['Type'] != 'ROOT': | ||
parent_list.append(response['Parents'][0]['Id']) | ||
response = list_parents(client, ChildId=response['Parents'][0]['Id']) | ||
parent_list.append(response['Parents'][0]['Id']) | ||
parent_list.reverse() | ||
return parent_list | ||
|
||
def get_tag_contacts(client, resources): | ||
contacts = { | ||
'SECURITY': DEFAULT_CONTACT_EMAIL, | ||
'OPERATIONS': DEFAULT_CONTACT_EMAIL | ||
} | ||
for ResourceId in resources: | ||
tags = list_tags_for_resource(client, ResourceId ) | ||
for tag in tags: | ||
for key, value in CONTACT_TAGS.items(): | ||
if tag.startswith(value): | ||
contacts[key] = tags[tag] | ||
return contacts | ||
|
||
def get_contacts(client, account_id): | ||
return get_tag_contacts(client, get_ous(client, account_id)) | ||
|
||
# Full name is a mandatory parameter and is uniq to every account | ||
def get_full_name(accountId): | ||
try: | ||
if accountId != MANAGEMENT_ACCOUNT_ID: | ||
response = ACCOUNT_CLIENT.get_contact_information(AccountId=accountId) | ||
else: | ||
response = ACCOUNT_CLIENT.get_contact_information() | ||
return response["ContactInformation"]["FullName"] | ||
except ClientError as error: | ||
FAILED_ACCOUNTS.append(accountId) | ||
raise error # Need to abort, do not want to continue with missing account name | ||
|
||
def update_alternate_contact(accountId): | ||
optional_contacts = get_contacts(ORG_CLIENT, accountId) | ||
|
||
for type, contact in ALTERNATE_CONTACTS.items(): | ||
try: | ||
account_contact = contact.copy() | ||
if type in optional_contacts: | ||
if optional_contacts[type] == '': | ||
delete_params = { | ||
'AlternateContactType': type | ||
} | ||
if accountId != MANAGEMENT_ACCOUNT_ID: | ||
delete_params['AccountId'] = accountId | ||
print("Delete alternate contact ", type) | ||
try: | ||
ACCOUNT_CLIENT.delete_alternate_contact(**delete_params) | ||
except ClientError as error: | ||
if error.response['Error']['Code'] == 'ResourceNotFoundException': | ||
pass | ||
else: | ||
raise error | ||
pass | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree. |
||
continue | ||
else: | ||
account_contact['EmailAddress'] = optional_contacts[type] | ||
if accountId != MANAGEMENT_ACCOUNT_ID: | ||
account_contact['AccountId'] = accountId | ||
print("Setting alternate contact ", type, "to ", account_contact) | ||
ACCOUNT_CLIENT.put_alternate_contact(**account_contact) | ||
except ClientError as error: | ||
FAILED_ACCOUNTS.append(accountId) | ||
print(error) | ||
pass | ||
|
||
def update_primary_contact(accountId): | ||
try: | ||
contact = PRIMARY_CONTACT.copy() | ||
contact["FullName"] = get_full_name(accountId) | ||
print("Setting primary contact to ", contact) | ||
if accountId != MANAGEMENT_ACCOUNT_ID: | ||
ACCOUNT_CLIENT.put_contact_information( | ||
AccountId = accountId, | ||
ContactInformation=contact) | ||
else: | ||
ACCOUNT_CLIENT.put_contact_information(ContactInformation=contact) | ||
except ClientError as error: | ||
FAILED_ACCOUNTS.append(accountId) | ||
print(error) | ||
pass | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one too ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree, was in the original code. |
||
|
||
def lambda_handler(event, context): | ||
parse_contact_types() | ||
for account in list_accounts(ORG_CLIENT): | ||
if account["Status"] != "SUSPENDED" and account["Id"] != MANAGEMENT_ACCOUNT_ID: | ||
put_alternate_contact(account["Id"]) | ||
|
||
return ("Completed! Failed Accounts: ", FAILED_ACCOUNTS) | ||
for account in list_accounts(ORG_CLIENT): | ||
if account["Status"] != "SUSPENDED": | ||
print("Updating contact information for ", account["Id"]) | ||
update_primary_contact(account["Id"]) | ||
update_alternate_contact(account["Id"]) | ||
return (FAILED_ACCOUNTS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe the Usage section should be updated...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I think I must refactor the deployment in https://github.com/basefarm/bf-internal-security/tree/main/account. Move the contact administration to the master account.
This way we do not actually need the "delegated admin" support, thus there will be only one module, and usage must be updated accordingly.