-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathacm_handler.py
151 lines (125 loc) · 4.86 KB
/
acm_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# MIT Licensed, Copyright (c) 2015 Ryan Scott Brown <sb@ryansb.com>
import boto3
import hashlib
import logging
import time
from botocore.exceptions import ClientError
import cfn_resource
# Root logger (no name passed to getLogger) with its level set to DEBUG.
log = logging.getLogger()
log.setLevel(logging.DEBUG)
# ACM client created once at import time; the functions in this module use it
# for every certificate API call.
acm = boto3.client('acm')
def _find_issued_cert_arn(domain):
    """Return the ARN of an ISSUED certificate whose DomainName is ``domain``.

    Walks every page of ``list_certificates`` (following ``NextToken``) so a
    match is not missed when the account holds more certificates than fit in
    one response. Returns None when no issued certificate matches.
    """
    kwargs = {'CertificateStatuses': ['ISSUED']}
    while True:
        resp = acm.list_certificates(**kwargs)
        for cert in resp.get('CertificateSummaryList', []):
            if cert.get('DomainName') == domain:
                return cert['CertificateArn']
        next_token = resp.get('NextToken')
        if not next_token:
            return None
        kwargs['NextToken'] = next_token


def await_validation(domain, context):
    """Poll ACM until a certificate for ``domain`` is issued or time runs low.

    Args:
        domain: primary domain name to look for among ISSUED certificates.
        context: Lambda context object; only
            ``get_remaining_time_in_millis()`` is used.

    Returns:
        The certificate ARN once an issued certificate for ``domain`` is
        found, or None if fewer than 10 seconds of execution time remain
        before one shows up.
    """
    # as long as we have at least 10 seconds left
    while context.get_remaining_time_in_millis() > 10000:
        # Sleep first: a freshly requested cert cannot be issued immediately.
        time.sleep(5)
        arn = _find_issued_cert_arn(domain)
        if arn is not None:
            log.info("Certificate has been issued for domain %s, ARN: %s",
                     domain, arn)
            return arn
        log.info("Awaiting cert for domain %s", domain)

    log.warning("Timed out waiting for cert for domain %s", domain)
    return None
def _failed_response(reason):
    """Log ``reason`` and wrap it in a FAILED custom-resource response dict."""
    log.error(reason)
    return {
        'Status': 'FAILED',
        'Reason': reason,
        'PhysicalResourceId': 'could-not-create',
        'Data': {},
    }


def check_properties(event):
    """Validate the resource properties carried by a CloudFormation event.

    Args:
        event: the custom-resource event dict; its ``ResourceProperties``
            entry must contain a non-empty list under ``Domains``.

    Returns:
        None when the properties are acceptable, otherwise a response dict
        with ``Status`` set to ``'FAILED'`` and a ``Reason`` describing the
        problem.
    """
    # A missing/empty ResourceProperties is reported as a missing property
    # instead of surfacing as a KeyError.
    properties = event.get('ResourceProperties') or {}

    for required in ('Domains', ):
        if properties.get(required) is None:
            return _failed_response(
                "ERROR: No property '%s' on event %s" % (required, event))

    dom = properties['Domains']
    log.info("Got domains %s", dom)

    if not isinstance(dom, list):
        return _failed_response(
            "ERROR: Domains is not a list in event %s" % event)
    if not dom:
        # Distinguish "wrong type" from "right type but nothing in it".
        return _failed_response(
            "ERROR: Domains is an empty list in event %s" % event)

    return None
# Resource object from cfn_resource. Its .create/.update/.delete attributes are
# applied as decorators to the functions below (presumably registering each one
# for the matching CloudFormation request type -- confirm in cfn_resource).
handler = cfn_resource.Resource()
@handler.create
def create_cert(event, context):
    """Request a new ACM certificate for the domains listed in the event.

    Args:
        event: custom-resource event dict. Reads ``ResourceProperties``
            (``Domains``, optional ``ValidationOptions`` and ``Await``) plus
            ``StackId`` and ``LogicalResourceId`` for the idempotency token.
        context: Lambda context object, passed through to
            ``await_validation`` when ``Await`` is set.

    Returns:
        A FAILED response dict from ``check_properties`` when the properties
        are invalid, otherwise a SUCCESS response whose
        ``PhysicalResourceId`` is the ARN returned by ACM.
    """
    props = event['ResourceProperties']
    prop_errors = check_properties(event)
    if prop_errors:
        return prop_errors
    domains = props['Domains']

    # take a hash of the Stack & resource ID to make a request token
    token_source = 'cfn-{StackId}-{LogicalResourceId}'.format(**event)
    # hashlib needs bytes; passing a text string raises TypeError on Python 3.
    id_token = hashlib.md5(token_source.encode('utf-8')).hexdigest()

    kwargs = {
        'DomainName': domains[0],
        # truncated to 30 hex characters to stay within ACM's limit on the
        # idempotency token length
        'IdempotencyToken': id_token[:30],
    }

    if len(domains) > 1:
        # add alternative names if the user wants more names
        # wildcards are allowed
        kwargs['SubjectAlternativeNames'] = domains[1:]

    validation_options = props.get('ValidationOptions')
    if validation_options:
        # TODO validate format of this parameter
        # List of domain validation options. Looks like:
        # {
        #     "DomainName": "test.foo.com",
        #     "ValidationDomain": "foo.com",
        # }
        kwargs['DomainValidationOptions'] = validation_options

    response = acm.request_certificate(**kwargs)

    if props.get('Await', False):
        # Best-effort wait: the request is reported as successful even if the
        # certificate has not been issued before time runs out.
        await_validation(domains[0], context)

    return {
        'Status': 'SUCCESS',
        'Reason': 'Cert request created successfully',
        'PhysicalResourceId': response['CertificateArn'],
        'Data': {},
    }
@handler.update
def update_certificate(event, context):
    """Handle an update by reusing or replacing the existing certificate.

    A new certificate is requested via ``create_cert`` when the current
    physical id is not an ACM ARN, when ACM cannot describe it, or when the
    requested domains differ from the certificate's subject alternative
    names (in which case the old certificate is deleted first, best-effort).

    Args:
        event: custom-resource event dict; reads ``ResourceProperties`` and
            ``PhysicalResourceId``.
        context: Lambda context object, forwarded to ``await_validation`` and
            ``create_cert``.

    Returns:
        A FAILED response from ``check_properties``, the response of
        ``create_cert``, or a SUCCESS response carrying the unchanged
        physical id when nothing needs to be done.
    """
    props = event['ResourceProperties']
    prop_errors = check_properties(event)
    if prop_errors:
        return prop_errors
    domains = props['Domains']

    arn = event['PhysicalResourceId']
    if not arn.startswith('arn:aws:acm:'):
        # Placeholder id from an earlier failed create: nothing to update.
        return create_cert(event, context)

    try:
        cert = acm.describe_certificate(CertificateArn=arn)
    except ClientError:
        # cert doesn't exist! make it
        return create_cert(event, context)

    if cert['Certificate']['Status'] == 'PENDING_VALIDATION' and props.get('Await', False):
        # cert isn't yet valid, wait as long as we can until it is
        await_validation(domains[0], context)

    if sorted(domains) != sorted(cert['Certificate']['SubjectAlternativeNames']):
        # domain names have changed, need to delete & rebuild
        try:
            acm.delete_certificate(CertificateArn=arn)
        except Exception:
            # Best-effort: a failed delete must not block the replacement.
            log.exception('Failure deleting cert with arn %s', arn)
        return create_cert(event, context)

    return {
        'Status': 'SUCCESS',
        'Reason': 'Nothing to do, we think',
        # Report the unchanged id explicitly, as the delete handler does.
        'PhysicalResourceId': arn,
        'Data': {},
    }
@handler.delete
def delete_certificate(event, context):
    """Delete the certificate named by the event's physical resource id.

    Deletion is best-effort: the response always has ``Status`` set to
    ``'SUCCESS'``. When the ACM call fails, the exception is logged and a
    ``Reason`` is added to the response.

    Args:
        event: custom-resource event dict; reads ``PhysicalResourceId``.
        context: Lambda context object (unused).

    Returns:
        A response dict with ``Status``, ``PhysicalResourceId`` and ``Data``,
        plus ``Reason`` if the delete call raised.
    """
    arn = event['PhysicalResourceId']
    resp = {
        'Status': 'SUCCESS',
        'PhysicalResourceId': arn,
        'Data': {},
    }

    if not arn.startswith('arn:aws:acm:'):
        # Ids such as 'could-not-create' never referred to a real
        # certificate, so there is nothing to ask ACM to delete.
        log.info('Physical resource id %s is not an ACM ARN, nothing to delete', arn)
        return resp

    try:
        acm.delete_certificate(CertificateArn=arn)
    except Exception:
        log.exception('Failure deleting cert with arn %s', arn)
        resp['Reason'] = 'Some exception was raised while deleting the cert'
    return resp