forked from tjarrettveracode/veracode-mitigation-copier
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathMitigationCopier.py
313 lines (256 loc) · 14.5 KB
/
MitigationCopier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
import sys
import argparse
import logging
import json
import datetime
import anticrlf
from veracode_api_py.api import VeracodeAPI as vapi, Applications, Findings
from veracode_api_py.constants import Constants
log = logging.getLogger(__name__)


def setup_logger():
    """Send this module's log records to the file MitigationCopier.log.

    A UTF-8 file handler is created, given an anticrlf.LogFormatter with the
    timestamp / level / function name / message layout, and attached to the
    module logger, whose level is set to INFO.
    """
    file_handler = logging.FileHandler('MitigationCopier.log', encoding='utf8')
    record_layout = '%(asctime)s - %(levelname)s - %(funcName)s - %(message)s'
    file_handler.setFormatter(anticrlf.LogFormatter(record_layout))

    # same logger object as the module-level ``log``
    module_logger = logging.getLogger(__name__)
    module_logger.addHandler(file_handler)
    module_logger.setLevel(logging.INFO)
def creds_expire_days_warning():
    """Print a notice when the API credentials expire in fewer than 7 days.

    The expiry timestamp is read from the 'expiration_ts' entry of the
    credentials returned by the Veracode API wrapper.
    """
    credentials = vapi().get_creds()
    expires_at = datetime.datetime.strptime(credentials['expiration_ts'], "%Y-%m-%dT%H:%M:%S.%f%z")
    # the parsed timestamp carries a timezone, so "now" must be timezone-aware too
    time_left = expires_at - datetime.datetime.now().astimezone()
    if time_left.days >= 7:
        return
    print('These API credentials expire ', credentials['expiration_ts'])
def prompt_for_app(prompt_text):
    """Ask the user for an application name and return the selected application's GUID.

    Args:
        prompt_text: Text shown when asking for the application name.

    Returns:
        The 'guid' of the single match, or of the entry the user picks when
        several applications match. An empty string is returned when nothing
        matches or the user's choice is not a valid list number.
    """
    search_term = input(prompt_text)
    candidates = Applications().get_by_name(search_term)
    candidate_count = len(candidates)

    if candidate_count == 0:
        print("No matches were found!")
        return ""

    if candidate_count == 1:
        return candidates[0].get('guid')

    # several matches: show a numbered menu (1-based) and let the user pick
    print("Please choose an application:")
    for number, candidate in enumerate(candidates, start=1):
        print("{}) {}".format(number, candidate["profile"]["name"]))
    answer = input("Enter number: ")

    try:
        picked = int(answer)
    except ValueError:
        return ""
    if 0 < picked <= candidate_count:
        return candidates[picked - 1].get('guid')
    return ""
def get_app_guid_from_legacy_id(app_id):
    """Translate a legacy application ID into the application's GUID.

    Args:
        app_id: Legacy ID passed to the Applications lookup.

    Returns:
        The 'guid' of the first application in the lookup response, or None
        when the lookup itself returns None.
    """
    response = Applications().get(legacy_id=app_id)
    if response is not None:
        first_application = response['_embedded']['applications'][0]
        return first_application['guid']
    return None
def get_application_name(guid):
    """Return the profile name of the application identified by ``guid``."""
    profile = Applications().get(guid)['profile']
    return profile['name']
def get_findings_by_type(app_guid, scan_type='STATIC', sandbox_guid=None):
    """Fetch annotated findings of one scan type for an application.

    Args:
        app_guid: GUID of the application to query.
        scan_type: 'STATIC' or 'DYNAMIC'; any other value yields no findings.
        sandbox_guid: Sandbox to query; only forwarded for STATIC requests.

    Returns:
        Whatever the Findings API returns for the request, or an empty list
        for an unsupported scan type.
    """
    if scan_type == 'STATIC':
        return Findings().get_findings(app_guid, scantype=scan_type, annot='TRUE', sandbox=sandbox_guid)
    if scan_type == 'DYNAMIC':
        # the dynamic request is made without a sandbox argument
        return Findings().get_findings(app_guid, scantype=scan_type, annot='TRUE')
    return []
def logprint(log_msg):
    """Write ``log_msg`` to the module log at INFO level, then echo it to stdout."""
    log.info(log_msg)
    print(log_msg)
def _filter_by_resolution_status(findings, id_list, resolution_status):
    """Return the findings whose resolution status equals ``resolution_status``.

    When ``id_list`` is not None the findings are first narrowed to those whose
    'issue_id' appears in it. IDs are compared by their text form: the IDs
    typed on the command line are strings, and comparing them directly to a
    non-string 'issue_id' would never match.
    """
    if id_list is not None:
        # same logger object as the module-level ``log``
        logging.getLogger(__name__).info(
            'Only copying the following findings provided in id_list: {}'.format(id_list))
        wanted_ids = {str(issue_id) for issue_id in id_list}
        findings = [f for f in findings if str(f['issue_id']) in wanted_ids]
    return [f for f in findings
            if f['finding_status']['resolution_status'] == resolution_status]


def filter_approved(findings, id_list):
    """Return the findings with an APPROVED mitigation.

    Args:
        findings: Finding dicts carrying 'issue_id' and
            'finding_status' -> 'resolution_status'.
        id_list: Optional collection of issue IDs (strings or integers) to
            restrict the result to; None disables the restriction.

    Returns:
        A new list holding the matching finding dicts (the same objects, not copies).
    """
    return _filter_by_resolution_status(findings, id_list, 'APPROVED')


def filter_proposed(findings, id_list):
    """Return the findings with a PROPOSED mitigation.

    Args:
        findings: Finding dicts carrying 'issue_id' and
            'finding_status' -> 'resolution_status'.
        id_list: Optional collection of issue IDs (strings or integers) to
            restrict the result to; None disables the restriction.

    Returns:
        A new list holding the matching finding dicts (the same objects, not copies).
    """
    return _filter_by_resolution_status(findings, id_list, 'PROPOSED')
def format_file_path(file_path):
    """Return ``file_path`` with any TeamCity work-directory prefix removed.

    Special case: TeamCity build agents place sources under a directory such as
    ``teamcity/buildagent/work/d2a72efd0db7f7d7``. Everything up to and
    including that directory (and the '/' after it) is dropped, whether the
    marker is found at the very start of the path or further in.

    Args:
        file_path: Path reported for the finding; may be None.

    Returns:
        '' when ``file_path`` is None, the path after the work directory when
        the TeamCity marker is present, otherwise ``file_path`` unchanged.
    """
    if file_path is None:
        return ''

    marker = 'teamcity/buildagent/work/'
    hash_length = 16  # random hash that names the work directory
    marker_at = file_path.find(marker)
    if marker_at < 0:
        return file_path
    # skip the marker, the hash value and the '/' that follows it
    return file_path[marker_at + len(marker) + hash_length + 1:]
def create_match_format_policy(app_guid, sandbox_guid, policy_findings, finding_type):
    """Flatten findings into the dictionaries used when matching flaws.

    Args:
        app_guid: GUID of the application the findings belong to.
        sandbox_guid: Sandbox GUID; recorded for STATIC findings only.
        policy_findings: Finding dicts as returned by the findings API.
        finding_type: 'STATIC' or 'DYNAMIC'; any other value yields an empty list.

    Returns:
        One dict per finding holding the matching attributes plus the original
        finding under the 'finding' key.
    """
    if finding_type == 'STATIC':
        static_records = []
        for pf in policy_findings:
            record = {'app_guid': app_guid, 'sandbox_guid': sandbox_guid}
            record['id'] = pf['issue_id']
            record['resolution'] = pf['finding_status']['resolution']
            details = pf['finding_details']
            record['cwe'] = details['cwe']['id']
            record['procedure'] = details.get('procedure')
            record['relative_location'] = details.get('relative_location')
            record['source_file'] = format_file_path(details.get('file_path'))
            record['line'] = details.get('file_line_number')
            record['finding'] = pf
            static_records.append(record)
        return static_records

    if finding_type == 'DYNAMIC':
        dynamic_records = []
        for pf in policy_findings:
            record = {'app_guid': app_guid}
            record['id'] = pf['issue_id']
            record['resolution'] = pf['finding_status']['resolution']
            details = pf['finding_details']
            record['cwe'] = details['cwe']['id']
            record['path'] = details['path']
            # vulnerable_parameter may not be populated for some info leak findings
            record['vulnerable_parameter'] = details.get('vulnerable_parameter', '')
            record['finding'] = pf
            dynamic_records.append(record)
        return dynamic_records

    return []
def format_application_name(guid, app_name, sandbox_guid=None):
    """Build the label used in log and console messages for an app or sandbox.

    Args:
        guid: Application GUID.
        app_name: Application name.
        sandbox_guid: When given, the label describes this sandbox inside the application.

    Returns:
        The formatted description string.
    """
    if sandbox_guid is not None:
        return 'sandbox {} in application {} (guid: {})'.format(sandbox_guid, app_name, guid)
    return 'application {} (guid: {})'.format(app_name, guid)
# Apply one copied annotation (action + comment) to flaw `flaw_id` of application `to_app_guid`
# through Findings().add_annotation, optionally inside sandbox `sandbox_guid`.
# Returns None in every case; nothing is sent for CONFORMS/DEVIATES actions, nor for
# APPROVED/PROPOSED actions when `propose_only` is set.
# NOTE(review): this copy of the file has lost its indentation, so the nesting described in the
# comments below is inferred from the statement order - confirm against the upstream file.
def update_mitigation_info_rest(to_app_guid,flaw_id,action,comment,sandbox_guid=None, propose_only=False):
# validate length of comment argument, gracefully handle overage
if len(comment) > 2048:
comment = comment[0:2048]
# CONFORMS / DEVIATES actions are not copied: log a warning and stop
if action == 'CONFORMS' or action == 'DEVIATES':
log.warning('Cannot copy {} mitigation for Flaw ID {} in {}'.format(action,flaw_id,to_app_guid))
return
elif action == 'APPROVED' or action == 'PROPOSED':
# with propose_only set, the approval step is logged and skipped
if propose_only:
log.info('propose_only set to True; skipping applying approval for flaw_id {}'.format(flaw_id))
return
# translate the action name through the library's ANNOT_TYPE lookup
# NOTE(review): unclear whether this runs for every action or only inside the
# APPROVED/PROPOSED branch above; the two readings differ for actions missing from ANNOT_TYPE - TODO confirm
action = Constants.ANNOT_TYPE[action]
# add_annotation is given a list of flaw IDs, so wrap the single ID
flaw_id_list = [flaw_id]
# only pass the sandbox argument when a sandbox was requested
if sandbox_guid==None:
Findings().add_annotation(to_app_guid,flaw_id_list,comment,action)
else:
Findings().add_annotation(to_app_guid,flaw_id_list,comment,action,sandbox=sandbox_guid)
log.info(
'Updated mitigation information to {} for Flaw ID {} in {}'.format(action, str(flaw_id_list), to_app_guid))
def set_in_memory_flaw_to_approved(findings_to, to_id):
    """Mark the in-memory target finding ``to_id`` as APPROVED.

    Updating the status in memory means that, if the flaw is found as a match
    for multiple flaws, its mitigations are only copied once. Entries lacking
    an 'id' or 'finding' key are left untouched.
    """
    for entry in findings_to:
        has_required_keys = "id" in entry and "finding" in entry
        if has_required_keys and entry["id"] == to_id:
            entry['finding']['finding_status']['resolution_status'] = 'APPROVED'
def set_in_memory_flaw_to_proposed(findings_to, to_id):
    """Mark the in-memory target finding ``to_id`` as PROPOSED.

    Updating the status in memory means that, if the flaw is found as a match
    for multiple flaws, its mitigations are only copied once. Entries lacking
    an 'id' or 'finding' key are left untouched.
    """
    for entry in findings_to:
        has_required_keys = "id" in entry and "finding" in entry
        if has_required_keys and entry["id"] == to_id:
            entry['finding']['finding_status']['resolution_status'] = 'PROPOSED'
def match_for_scan_type(from_app_guid, to_app_guid, dry_run, scan_type='STATIC', from_sandbox_guid=None,
                        to_sandbox_guid=None, propose_only=False, id_list=None, fuzzy_match=False):
    """Copy mitigations of one scan type from the "from" application to the "to" application.

    Each target finding that has no approved or proposed mitigation yet is
    matched against the source findings that do have one (restricted to
    ``id_list`` when given). The annotations of the matching source finding are
    replayed on the target, oldest first.

    Args:
        from_app_guid: GUID of the application to copy mitigations from.
        to_app_guid: GUID of the application to copy mitigations to.
        dry_run: When true, matches are logged but no annotation is written.
        scan_type: 'STATIC' or 'DYNAMIC'.
        from_sandbox_guid: Optional source sandbox GUID.
        to_sandbox_guid: Optional target sandbox GUID.
        propose_only: Forwarded to update_mitigation_info_rest so approvals are skipped.
        id_list: Optional issue IDs limiting which source findings are used;
            None means no restriction.
        fuzzy_match: Forwarded to the findings matcher as allow_fuzzy_match.

    Returns:
        The number of target flaws for which a source match was processed;
        0 when there is nothing to copy from or to.
    """
    results_from_app_name = get_application_name(from_app_guid)
    formatted_from = format_application_name(from_app_guid, results_from_app_name, from_sandbox_guid)
    logprint('Getting {} findings for {}'.format(scan_type.lower(), formatted_from))
    findings_from = get_findings_by_type(from_app_guid, scan_type=scan_type, sandbox_guid=from_sandbox_guid)
    count_from = len(findings_from)
    logprint('Found {} {} findings in "from" {}'.format(count_from, scan_type.lower(), formatted_from))
    if count_from == 0:
        return 0  # no source findings to copy!

    findings_from_approved = filter_approved(findings_from, id_list)
    findings_from_proposed = filter_proposed(findings_from, id_list)
    # Only source findings that carry a mitigation (and pass the id_list filter) are match
    # candidates; give up only when neither kind exists.
    mitigated_findings_from = findings_from_approved + findings_from_proposed
    if not mitigated_findings_from:
        logprint('No approved or proposed findings in "from" {}. Exiting.'.format(formatted_from))
        return 0

    results_to_app_name = get_application_name(to_app_guid)
    formatted_to = format_application_name(to_app_guid, results_to_app_name, to_sandbox_guid)
    logprint('Getting {} findings for {}'.format(scan_type.lower(), formatted_to))
    findings_to = get_findings_by_type(to_app_guid, scan_type=scan_type, sandbox_guid=to_sandbox_guid)
    count_to = len(findings_to)
    logprint('Found {} {} findings in "to" {}'.format(count_to, scan_type.lower(), formatted_to))
    if count_to == 0:
        return 0  # no destination findings to mitigate!

    # CREATE LIST OF UNIQUE VALUES FOR BUILD COPYING TO
    # (each entry references the same finding dict that is iterated below)
    copy_array_to = create_match_format_policy(app_guid=to_app_guid, sandbox_guid=to_sandbox_guid,
                                               policy_findings=findings_to, finding_type=scan_type)

    # number of target flaws a mitigation was applied to
    counter = 0

    # look for a match for each finding in the TO list and apply mitigations of the matching flaw, if found
    for this_to_finding in findings_to:
        to_id = this_to_finding['issue_id']
        to_status = this_to_finding['finding_status']['resolution_status']

        if to_status == 'APPROVED':
            logprint('Flaw ID {} in {} already has an accepted mitigation; skipped.'.format(to_id, formatted_to))
            continue
        if to_status == 'PROPOSED':
            logprint('Flaw ID {} in {} already has a proposed mitigation; skipped.'.format(to_id, formatted_to))
            continue

        match = Findings().match(this_to_finding, mitigated_findings_from, approved_matches_only=False,
                                 allow_fuzzy_match=fuzzy_match)
        if match is None:
            log.info('No approved or proposed match found for finding {} in {}'.format(to_id, formatted_from))
            continue

        from_id = match.get('id')
        log.info('Source flaw {} in {} has a possible target match in flaw {} in {}.'.format(
            from_id, formatted_from, to_id, formatted_to))

        mitigation_list = match['finding'].get('annotations')
        if mitigation_list is None:
            mitigation_list = []
            logprint('{} annotations for flaw ID {} in {}...'.format(len(mitigation_list), to_id, formatted_to))
        else:
            logprint('Applying {} annotations for flaw ID {} in {}...'.format(
                len(mitigation_list), to_id, formatted_to))

        for mitigation_action in reversed(mitigation_list):  # findings API puts most recent action first
            proposal_action = mitigation_action['action']
            proposal_comment = '(COPIED FROM APP {}) {}'.format(from_app_guid, mitigation_action['comment'])
            if not dry_run:
                update_mitigation_info_rest(to_app_guid, to_id, proposal_action, proposal_comment,
                                            to_sandbox_guid, propose_only)

        # flag the target finding in memory so it is skipped if it comes up again
        set_in_memory_flaw_to_approved(copy_array_to, to_id)
        set_in_memory_flaw_to_proposed(copy_array_to, to_id)
        counter += 1

    print('[*] Updated {} flaws in {}. See log file for details.'.format(str(counter), formatted_to))
    return counter
def main():
    """Command-line entry point: copy static, then dynamic, mitigations between two applications.

    The source and target applications come from --fromapp/--toapp (GUIDs, or
    legacy IDs with --legacy_ids) or are chosen interactively with --prompt,
    in which case the sandbox options are ignored. The run stops early with a
    message when either application is missing or a legacy ID cannot be
    resolved to a GUID.
    """
    parser = argparse.ArgumentParser(
        description='This script looks at the results set of the FROM APP. For any flaws that have an '
                    'accepted mitigation, it checks the TO APP to see if that flaw exists. If it exists, '
                    'it copies all mitigation information.')
    parser.add_argument('-f', '--fromapp', help='App GUID to copy from')
    parser.add_argument('-fs', '--fromsandbox', help='Sandbox GUID to copy from (optional)')
    parser.add_argument('-t', '--toapp', help='App GUID to copy to')
    parser.add_argument('-ts', '--tosandbox', help="Sandbox GUID to copy to (optional)")
    parser.add_argument('-p', '--prompt', action='store_true', help='Specify to prompt for the applications to copy from and to.')
    parser.add_argument('-d', '--dry_run', action='store_true', help="Log matched flaws instead of applying mitigations")
    parser.add_argument('-l', '--legacy_ids', action='store_true', help='Use legacy Veracode app IDs instead of GUIDs')
    parser.add_argument('-po', '--propose_only', action='store_true', help='Only propose mitigations, do not approve them')
    parser.add_argument('-i', '--id_list', nargs='*', help='Only copy mitigations for the flaws in the id_list')
    parser.add_argument('-fm', '--fuzzy_match', action='store_true', help='Look within a range of line numbers for a matching flaw')
    args = parser.parse_args()

    setup_logger()
    logprint('======== beginning MitigationCopier.py run ========')

    # CHECK FOR CREDENTIALS EXPIRATION
    creds_expire_days_warning()

    # SET VARIABLES FOR FROM AND TO APPS
    results_from_app_id = args.fromapp
    results_to_app_id = args.toapp
    results_from_sandbox_id = args.fromsandbox
    results_to_sandbox_id = args.tosandbox
    dry_run = args.dry_run
    propose_only = args.propose_only
    id_list = args.id_list
    fuzzy_match = args.fuzzy_match

    if args.prompt:
        results_from_app_id = prompt_for_app("Enter the application name to copy mitigations from: ")
        results_to_app_id = prompt_for_app("Enter the application name to copy mitigations to: ")
        # ignore Sandbox arguments in the Prompt case
        results_from_sandbox_id = None
        results_to_sandbox_id = None

    if results_from_app_id in (None, '') or results_to_app_id in (None, ''):
        print('You must provide an application to copy mitigations to and from.')
        return

    if args.legacy_ids:
        results_from_app_id = get_app_guid_from_legacy_id(results_from_app_id)
        results_to_app_id = get_app_guid_from_legacy_id(results_to_app_id)
        # the lookup returns None when it finds nothing; do not carry on without a GUID
        if results_from_app_id is None or results_to_app_id is None:
            print('Could not find an application for one of the legacy IDs provided.')
            return

    # get static findings and apply mitigations
    match_for_scan_type(from_app_guid=results_from_app_id, to_app_guid=results_to_app_id, dry_run=dry_run,
                        scan_type='STATIC', from_sandbox_guid=results_from_sandbox_id,
                        to_sandbox_guid=results_to_sandbox_id, propose_only=propose_only,
                        id_list=id_list, fuzzy_match=fuzzy_match)

    # dynamic findings are copied at application level, without sandboxes or fuzzy matching
    match_for_scan_type(from_app_guid=results_from_app_id, to_app_guid=results_to_app_id, dry_run=dry_run,
                        scan_type='DYNAMIC', propose_only=propose_only, id_list=id_list)


if __name__ == '__main__':
    main()