import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# Silence the InsecureRequestWarning that urllib3 raises on every call made with verify=False.
# This only hides the warning; certificate validation is still skipped for those requests.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
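# Language ecosystems the console is asked to scan, shared by the host and container_image requests.
_SCAN_TYPES = ["base", "java", "python", "ruby", "php", "javascript", "rust", "golang", "dotnet"]


def _authenticate(api_url, api_key, verify):
    """Exchange ``api_key`` for a bearer token at ``/users/auth``.

    Returns the header dict to use on subsequent requests, or ``None`` when the
    console reports the authentication as unsuccessful (a message is printed).
    """
    headers = {"Content-Type": "application/json"}
    auth_response = requests.post("{0}/users/auth".format(api_url), json={"api_key": api_key},
                                  headers=headers, verify=verify).json()
    # .get() instead of [] so a response without "success" is treated as a failure, not a crash.
    if not auth_response.get("success"):
        print("Authentication failed")
        return None
    print("Authentication successful")
    headers["Authorization"] = "Bearer " + auth_response["data"]["access_token"]
    return headers


def _enumerate_nodes(api_url, headers, verify):
    """List host and container_image nodes via ``/enumerate`` and print a numbered menu.

    Returns a list of dicts with ``id``, ``node_name``, ``node_type`` and ``scope_id``
    in menu order; the menu number of an entry is its 1-based position in the list.
    """
    response = requests.post(
        "{0}/enumerate".format(api_url),
        json={"filters": {"type": ["host", "container_image"], "pseudo": False}, "size": 5000},
        headers=headers, verify=verify).json()
    nodes = []
    for number, node in enumerate(response.get("data", {}).get("data", []), start=1):
        if node["type"] == "container_image":
            node_name = "{0} (container_image)".format(node.get("image_name_with_tag", ""))
        else:
            node_name = "{0} (host)".format(node.get("host_name", ""))
        print("{0}: {1}".format(number, node_name))
        nodes.append({
            "id": node["id"],
            "node_name": node_name,
            "node_type": node["type"],
            "scope_id": node["scope_id"],
        })
    return nodes


def _select_nodes(nodes_list, raw_input):
    """Resolve the user's comma-separated menu numbers (or ``all``) to entries of ``nodes_list``.

    Entries that are not integers or fall outside ``1..len(nodes_list)`` are reported and
    skipped.  The range check matters: a plain ``nodes_list[int(x) - 1]`` would turn ``0``
    or a negative number into a silent selection from the end of the list.
    """
    tokens = [token.strip() for token in raw_input.split(",")]
    if "all" in tokens:
        return list(nodes_list)
    selected = []
    for token in tokens:
        try:
            number = int(token)
        except ValueError:
            print("Ignoring invalid entry: {0!r}".format(token))
            continue
        if not 1 <= number <= len(nodes_list):
            print("Ignoring out-of-range entry: {0}".format(number))
            continue
        selected.append(nodes_list[number - 1])
    return selected


def _start_scan(api_url, headers, node_type, scope_ids, verify):
    """POST a ``cve_scan_start`` node_action for one ``node_type`` and print the JSON reply."""
    response = requests.post(
        "{0}/node_action".format(api_url),
        json={
            "action": "cve_scan_start",
            "node_type": node_type,
            "node_id_list": scope_ids,
            "action_args": {"scan_type": list(_SCAN_TYPES)},
        },
        headers=headers, verify=verify)
    print(response.json())


def start_cve_scan(api_url, api_key, *, verify=False):
    """Interactively start a vulnerability scan on hosts and/or container images.

    Flow: authenticate at ``/users/auth``, list nodes from ``/enumerate``, read a selection
    from stdin, then POST one ``cve_scan_start`` node_action per selected node type.

    Args:
        api_url: Base URL of the management console API (no trailing slash).
        api_key: API key exchanged for the bearer token.
        verify: Passed straight through to ``requests`` for every call.  The default
            ``False`` keeps the original behaviour (no TLS certificate verification, so
            the api_key and bearer token are sent over an unauthenticated channel);
            pass ``True`` or the path to the console's CA bundle to verify.

    Returns:
        None.  Progress and the raw scan-start responses are printed.
    """
    headers = _authenticate(api_url, api_key, verify)
    if headers is None:
        return
    nodes_list = _enumerate_nodes(api_url, headers, verify)
    if not nodes_list:
        print("No nodes found")
        return
    print("\nEnter comma separated list of node numbers to start vulnerability scan. Eg: 1,3,4")
    print("Enter \"all\" (without quotes) to start vulnerability scan on all nodes\n")
    nodes_selected = _select_nodes(nodes_list, input("-->"))
    host_scope_ids = [node["scope_id"] for node in nodes_selected if node["node_type"] == "host"]
    container_image_scope_ids = [node["scope_id"] for node in nodes_selected
                                 if node["node_type"] == "container_image"]
    if host_scope_ids:
        print("\nStarting vulnerability scan on hosts")
        _start_scan(api_url, headers, "host", host_scope_ids, verify)
    if container_image_scope_ids:
        print("\nStarting vulnerability scan on container images")
        _start_scan(api_url, headers, "container_image", container_image_scope_ids, verify)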
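if __name__ == '__main__':
    import sys

    # Exactly two arguments: the console host/IP and the API key.  Note the key is
    # visible in the process argument list for as long as the script runs.
    if len(sys.argv) != 3:
        print("Usage: python3 start_vulnerability_scan.py <mgmt_console_ip_address> <api_key>", file=sys.stderr)
        # sys.exit rather than the site-provided exit() builtin, which is not guaranteed to exist.
        sys.exit(1)
    # The API base path is fixed; only the host part comes from the command line.
    start_cve_scan("https://{0}/deepfence/v1.5".format(sys.argv[1]), sys.argv[2])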