-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathvcpipemit.py
205 lines (157 loc) · 8.1 KB
/
vcpipemit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import sys
import argparse
import logging
import datetime
import os
import json
from uuid import UUID
import anticrlf
from veracode_api_py import VeracodeAPI as vapi, Applications, Findings
# Number of lines a pipeline finding may sit above or below a mitigated
# finding and still be treated as the same flaw (allows for code movement).
LINE_NUMBER_SLOP = 3

# Module-level logger; setup_logger() attaches the file handler to it.
log = logging.getLogger(__name__)

# File extensions accepted by allowed_file() for the --results argument.
ALLOWED_EXTENSIONS = {'json'}
def setup_logger():
    """Attach a UTF-8 file handler to this module's logger at INFO level.

    Records are written to ``vcpipmit.log`` in the current working directory
    and are formatted through ``anticrlf.LogFormatter``.
    """
    log_format = '%(asctime)s - %(levelname)s - %(funcName)s - %(message)s'

    file_handler = logging.FileHandler('vcpipmit.log', encoding='utf8')
    file_handler.setFormatter(anticrlf.LogFormatter(log_format))

    module_logger = logging.getLogger(__name__)
    module_logger.setLevel(logging.INFO)
    module_logger.addHandler(file_handler)
def creds_expire_days_warning():
    """Print a notice when the API credentials expire in fewer than 7 days.

    The expiration timestamp is read from ``vapi().get_creds()`` under the
    ``expiration_ts`` key and parsed as an offset-aware datetime.
    """
    credentials = vapi().get_creds()
    expiration_ts = credentials['expiration_ts']
    expires_at = datetime.datetime.strptime(expiration_ts, "%Y-%m-%dT%H:%M:%S.%f%z")

    # The parsed timestamp carries a UTC offset, so "now" must be aware too.
    now = datetime.datetime.now().astimezone()
    remaining = expires_at - now

    if remaining.days < 7:
        print('These API credentials expire ', expiration_ts)
def prompt_for_app(prompt_text):
    """Interactively resolve an application guid from a name search.

    Prompts with *prompt_text*, looks the entered text up through
    ``vapi().get_app_by_name`` and:

    * prints a message and returns ``""`` when nothing matched;
    * returns the guid of the only candidate when exactly one matched;
    * otherwise prints a numbered menu and returns the guid of the chosen
      entry, or ``""`` when the answer is not a number within the menu range.
    """
    search_term = input(prompt_text)
    candidates = vapi().get_app_by_name(search_term)

    if len(candidates) == 0:
        print("No matches were found!")
        return ""

    if len(candidates) == 1:
        return candidates[0].get('guid')

    print("Please choose an application:")
    for number, candidate in enumerate(candidates, start=1):
        print("{}) {}".format(number, candidate["profile"]["name"]))

    answer = input("Enter number: ")
    try:
        choice = int(answer)
    except ValueError:
        # Non-numeric answer: treat as "no selection".
        return ""

    if 0 < choice <= len(candidates):
        return candidates[choice - 1].get('guid')
    return ""
def allowed_file(filename):
    """Return True when *filename* ends in an extension from ALLOWED_EXTENSIONS.

    The extension is the text after the last ``.`` and is compared
    case-insensitively; names without a ``.`` are rejected.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1]
    return extension.lower() in ALLOWED_EXTENSIONS
def is_valid_uuid(uuid_to_test, version=4):
    """Return True when *uuid_to_test* is a canonical UUID string.

    Parameters:
        uuid_to_test: candidate value; anything that is not a ``str``
            (for example ``None`` when no guid was supplied) is rejected.
        version: UUID version passed to the ``UUID`` constructor.

    Returns:
        ``True`` only when the string parses as a UUID and equals the
        parsed UUID's own ``str()`` form, ``False`` otherwise.
    """
    # UUID() raises TypeError/AttributeError (not ValueError) for None or
    # other non-string input, so reject those up front instead of crashing.
    if not isinstance(uuid_to_test, str):
        return False

    try:
        uuid_obj = UUID(uuid_to_test, version=version)
    except ValueError:
        return False

    # Round-trip comparison: rejects strings that parse but are not already
    # in the form str(UUID) produces.
    return str(uuid_obj) == uuid_to_test
def get_app_findings(appguid,sandboxguid=None):
    """Retrieve the findings of an application (optionally of one sandbox).

    Calls ``Findings().get_findings`` with ``scan_type`` set to ``STATIC``
    and ``annot=True``. Progress is printed and logged.

    Parameters:
        appguid: guid of the application to query.
        sandboxguid: optional sandbox guid forwarded as ``sandbox``.

    Returns:
        Whatever ``get_findings`` returned for the request.
    """
    announcement = "Getting findings for application {}".format(appguid)
    print(announcement)
    log.info(announcement)

    static_only = {'scan_type': 'STATIC'}
    findings = Findings().get_findings(
        app=appguid,
        annot=True,
        request_params=static_only,
        sandbox=sandboxguid,
    )

    finding_count = len(findings)
    log.info('Got {} findings for app guid {} and sandbox guid {}'.format(finding_count,appguid,sandboxguid))
    return findings
def get_mitigated_findings(all_findings):
    """Return the findings whose ``resolution_status`` is ``APPROVED``.

    Each finding is expected to carry ``finding_status.resolution_status``;
    the input order is preserved.
    """
    return [
        finding
        for finding in all_findings
        if finding['finding_status']['resolution_status'] == 'APPROVED'
    ]
def get_results_findings(results_file):
    """Load the findings list from a Pipeline Scan results JSON file.

    Parameters:
        results_file: path of the JSON results file.

    Returns:
        A new list with the entries of the top-level ``findings`` key, or an
        empty list when that key is absent or null.

    Raises:
        OSError: when the file cannot be opened.
        json.JSONDecodeError: when the file is not valid JSON.
    """
    # JSON is UTF-8 by specification; do not rely on the platform's default
    # text encoding (which differs on Windows).
    with open(results_file, encoding='utf-8') as f:
        data = json.load(f)

    # "or []" also covers an explicit null, which would otherwise make the
    # list construction fail.
    rfindings = list(data.get('findings') or [])

    log.info('The results file {} contains {} findings'.format(results_file, len(rfindings)))
    return rfindings
def create_match_format_pipeline(pipeline_findings):
    """Reduce pipeline findings to the fields used for matching.

    For every finding the result holds a dict with ``cwe`` (``cwe_id``
    converted to ``int``), ``source_file`` and ``line`` (both taken from
    ``files.source_file``).
    """
    match_records = []
    for finding in pipeline_findings:
        cwe = int(finding['cwe_id'])
        location = finding['files']['source_file']
        match_records.append({
            'cwe': cwe,
            'source_file': location['file'],
            'line': location['line'],
        })
    return match_records
def create_match_format_policy(policy_findings):
    """Reduce policy/sandbox findings to the fields used for matching.

    Each result dict holds ``id`` (``issue_id``), ``resolution``
    (``finding_status.resolution``) and ``cwe``, ``source_file`` and ``line``
    taken from ``finding_details``.
    """
    match_records = []
    for finding in policy_findings:
        issue_id = finding['issue_id']
        resolution = finding['finding_status']['resolution']
        details = finding['finding_details']
        match_records.append({
            'id': issue_id,
            'resolution': resolution,
            'cwe': details['cwe']['id'],
            'source_file': details['file_path'],
            'line': details['file_line_number'],
        })
    return match_records
def get_matched_findings(appguid, mitigated_findings, pipeline_findings, sandboxguid=None):
    """Pair mitigated findings with the pipeline findings they correspond to.

    A pipeline finding matches a mitigated finding when the CWE ids are
    equal, the pipeline file path occurs inside the mitigated finding's file
    path, and the line numbers differ by at most ``LINE_NUMBER_SLOP``.

    Parameters:
        appguid: guid recorded as ``source_app`` on every matched finding.
        mitigated_findings: findings in the policy/sandbox format accepted by
            ``create_match_format_policy``.
        pipeline_findings: findings from a Pipeline Scan results file; the
            matched dicts are annotated in place with an ``origin`` entry.
        sandboxguid: accepted for interface compatibility; not used here.

    Returns:
        The matched pipeline findings, each appearing at most once.
    """
    candidate_findings = []
    # id() of pipeline findings that already have an origin. Without this, two
    # mitigated findings close together would both claim the same pipeline
    # finding: it would be written to the baseline twice and its origin
    # overwritten, while a neighbouring pipeline finding stayed unmatched.
    claimed = set()

    for thisf in create_match_format_policy(mitigated_findings):
        match = None
        for pf in pipeline_findings:
            if id(pf) in claimed:
                continue
            location = pf['files']['source_file']
            # Allow some movement of the line number relative to the
            # mitigated finding, as the code may have changed. Adjust
            # LINE_NUMBER_SLOP for a more or less precise match, but do not
            # broaden it too far or the wrong finding might be matched.
            if (thisf['cwe'] == int(pf['cwe_id'])
                    and thisf['source_file'].find(location['file']) > -1
                    and (location['line'] - LINE_NUMBER_SLOP)
                    <= thisf['line']
                    <= (location['line'] + LINE_NUMBER_SLOP)):
                match = pf
                break

        if match is None:
            continue

        claimed.add(id(match))
        match['origin'] = {
            'source_app': appguid,
            'source_id': thisf['id'],
            'resolution': thisf['resolution'],
            'comment': 'Migrated from mitigated policy or sandbox finding',
        }
        candidate_findings.append(match)
        log.debug('Matched pipeline finding {} to mitigated finding {}'.format(match['issue_id'],thisf['id']))

    return candidate_findings
def process_matched_findings(baselinefilename, matched_findings):
    """Write *matched_findings* to *baselinefilename* as a baseline JSON file.

    The file content is a JSON object with a single ``findings`` key,
    indented by four spaces. An existing file is overwritten.
    """
    payload = json.dumps({'findings': matched_findings}, indent=4)

    # newline='' disables newline translation, so the output is written
    # exactly as json.dumps produced it.
    with open(baselinefilename, "w", newline='') as baseline_file:
        baseline_file.write(payload)
def get_app_by_name(appname, verbose):
    """Return the guid of the application whose profile name equals *appname*.

    Parameters:
        appname: exact application profile name to look for.
        verbose: when truthy, print the raw lookup result.

    Returns:
        The ``guid`` of the first application with an exactly matching name.

    Exits the process with status 1, after printing an error, when no
    application matches exactly.
    """
    applications = Applications().get_by_name(appname)
    if verbose:
        print(applications)

    # The lookup may return several applications; only an exact profile-name
    # match is accepted.
    for application in applications or []:
        if application["profile"]["name"] == appname:
            return application["guid"]

    print(f"ERROR: unable to find application named {appname}")
    sys.exit(1)
def main():
    """Command-line entry point: build a baseline file from mitigated findings.

    Steps:
      1. Parse the arguments and set up file logging.
      2. Warn when the API credentials are close to expiring.
      3. Resolve the application guid from --applicationguid, --prompt or
         --applicationname (a name, when given, overrides the other two).
      4. Fetch the application's (or sandbox's) findings and keep the
         approved mitigations.
      5. Match them against the findings of the --results file and write the
         matches to the output file.

    Returns early, after printing a message, when the results file name or
    the application guid is invalid.
    """
    parser = argparse.ArgumentParser(
        description='Create a Pipeline Scan baseline file from the approved mitigations '
                    'of a Veracode application profile or sandbox.')
    parser.add_argument('-a', '--applicationguid', help='Applications guid from which to retrieve mitigated findings')
    parser.add_argument('-an', '--applicationname', help='Applications name from which to retrieve mitigated findings')
    parser.add_argument('-p', '--prompt', action='store_true', help='Prompt for application using partial match search')
    parser.add_argument('-rf', '--results', help='Location of a Pipeline Scan results file from which the baseline file will be created.', required=True)
    parser.add_argument('-s', '--sandboxguid', help='Sandbox guid from which to retrieve mitigated findings in the application specified above')
    parser.add_argument('-of', '--outputfilename', help='Name for the file to generate')
    # nargs='?' lets -d be used as a plain flag while still accepting the
    # "-d VALUE" form that the option previously required.
    parser.add_argument('-d', '--debug', nargs='?', const=True, default=False,
                        help='Print the raw application lookup result when resolving --applicationname')
    args = parser.parse_args()

    appguid = args.applicationguid
    appname = args.applicationname
    rf = args.results
    sandboxguid = args.sandboxguid
    prompt = args.prompt
    outputfilename = args.outputfilename
    verbose = args.debug

    setup_logger()

    # CHECK FOR CREDENTIALS EXPIRATION
    creds_expire_days_warning()

    if not allowed_file(rf):
        print('{} is an invalid filename. --results must point to a json file.'.format(rf))
        return

    if prompt:
        appguid = prompt_for_app('Enter the application name from which to copy mitigations: ')

    if appname:
        appguid = get_app_by_name(appname, verbose)

    # appguid is None when no application option was supplied; reject it here
    # rather than handing a non-string to the UUID check.
    if not appguid or not is_valid_uuid(appguid):
        print('{} is an invalid application guid. Please supply a valid UUID.'.format(appguid))
        return

    if not outputfilename:
        outputfilename = 'baseline-{}.json'.format(appguid)

    all_findings = get_app_findings(appguid, sandboxguid)
    mitigated_findings = get_mitigated_findings(all_findings)
    pipeline_findings = get_results_findings(rf)

    matched_findings = get_matched_findings(appguid, mitigated_findings, pipeline_findings, sandboxguid)
    process_matched_findings(outputfilename, matched_findings)

    # Report what was actually written to the baseline, not how many
    # mitigations were considered.
    status = "Processed {} matched findings. See log file for details".format(len(matched_findings))
    print(status)
    log.info(status)
# Run the command-line workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()