Skip to content

Commit

Permalink
Fix bug to add exploits to GHSA-only vulns
Browse files Browse the repository at this point in the history
Before, exploits would not be added to vulnerabilities, whose
source is only the GHSA, even if some exploits were stored via the
vuln's CVE alias. This commit fixes that.
  • Loading branch information
ra1nb0rn committed Nov 8, 2024
1 parent 72d49db commit 056a1a1
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 36 deletions.
101 changes: 65 additions & 36 deletions search_vulns.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,51 @@ def is_more_specific_cpe_contained(vuln_cpe, cve_cpes):
return False


def retrieve_exploit_references(db_cursor, vuln_id, init_exploits=None, add_init_edbids=True, add_other_exploit_refs=True):
"""Retrieve exploit references for vuln_id and add to init_exploits"""

exploit_refs = [] if not init_exploits else init_exploits

if add_init_edbids and vuln_id.startswith('CVE-'):
db_cursor.execute('SELECT edb_ids FROM cve WHERE cve_id = ?', (vuln_id,))
edb_ids = db_cursor.fetchone()
if edb_ids and edb_ids[0]:
edb_ids = edb_ids[0].strip()
if edb_ids:
for edb_id in edb_ids.split(","):
exploit_refs.append("https://www.exploit-db.com/exploits/%s" % edb_id)

if add_other_exploit_refs and vuln_id.startswith('CVE-'):
# from NVD
query = 'SELECT exploit_ref FROM nvd_exploits_refs_view WHERE cve_id = ?'
nvd_exploit_refs = ''
db_cursor.execute(query, (vuln_id,))
if db_cursor:
nvd_exploit_refs = db_cursor.fetchall()
if nvd_exploit_refs:
for nvd_exploit_ref in nvd_exploit_refs:
if (nvd_exploit_ref[0] not in exploit_refs and
nvd_exploit_ref[0] + '/' not in exploit_refs and
nvd_exploit_ref[0][:-1] not in exploit_refs):
exploit_refs.append(nvd_exploit_ref[0])

# from PoC-in-Github
query = 'SELECT reference FROM cve_poc_in_github_map WHERE cve_id = ?'
poc_in_github_refs = ''
db_cursor.execute(query, (vuln_id,))
if db_cursor:
poc_in_github_refs = db_cursor.fetchall()
if poc_in_github_refs:
for poc_in_github_ref in poc_in_github_refs:
if (poc_in_github_ref[0] not in exploit_refs and
poc_in_github_ref[0] + '/' not in exploit_refs and
poc_in_github_ref[0][:-1] not in exploit_refs and
poc_in_github_ref[0] + '.git' not in exploit_refs):
exploit_refs.append(poc_in_github_ref[0])

return exploit_refs


def get_vuln_details(db_cursor, vulns, add_other_exploit_refs):
'''Collect more detailed information about the given vulns and return it'''

Expand All @@ -160,7 +205,7 @@ def get_vuln_details(db_cursor, vulns, add_other_exploit_refs):
if queried_info:
edb_ids, descr, publ, last_mod, cvss_ver, score, vector, cisa_known_exploited = queried_info
else:
edb_ids, publ, last_mod, cvss_ver, vector, cisa_known_exploited = '', '', '', '', '', False
publ, last_mod, cvss_ver, vector, cisa_known_exploited = '', '', '', '', False
score, descr = "-1.0", "NOT FOUND"
match_reason = "not_found"
if cvss_ver:
Expand All @@ -170,44 +215,20 @@ def get_vuln_details(db_cursor, vulns, add_other_exploit_refs):
"cvss": str(float(score)), "cvss_vec": vector, "vuln_match_reason": match_reason,
"cisa_known_exploited": bool(cisa_known_exploited), "aliases": [], "sources": [source]}

if not queried_info:
continue

# add exploit references
exploit_refs = []
edb_ids = edb_ids.strip()
if edb_ids:
detailed_vulns[vuln_id]["exploits"] = []
for edb_id in edb_ids.split(","):
detailed_vulns[vuln_id]["exploits"].append("https://www.exploit-db.com/exploits/%s" % edb_id)

# add other exploit references
if add_other_exploit_refs:
# from NVD
query = 'SELECT exploit_ref FROM nvd_exploits_refs_view WHERE cve_id = ?'
nvd_exploit_refs = ''
db_cursor.execute(query, (vuln_id,))
if db_cursor:
nvd_exploit_refs = db_cursor.fetchall()
if nvd_exploit_refs:
if "exploits" not in detailed_vulns[vuln_id]:
detailed_vulns[vuln_id]["exploits"] = []
for nvd_exploit_ref in nvd_exploit_refs:
if (nvd_exploit_ref[0] not in detailed_vulns[vuln_id]["exploits"] and
nvd_exploit_ref[0] + '/' not in detailed_vulns[vuln_id]["exploits"] and
nvd_exploit_ref[0][:-1] not in detailed_vulns[vuln_id]["exploits"]):
detailed_vulns[vuln_id]["exploits"].append(nvd_exploit_ref[0])

# from PoC-in-Github
query = 'SELECT reference FROM cve_poc_in_github_map WHERE cve_id = ?'
poc_in_github_refs = ''
db_cursor.execute(query, (vuln_id,))
if db_cursor:
poc_in_github_refs = db_cursor.fetchall()
if poc_in_github_refs:
if "exploits" not in detailed_vulns[vuln_id]:
detailed_vulns[vuln_id]["exploits"] = []
for poc_in_github_ref in poc_in_github_refs:
if (poc_in_github_ref[0] not in detailed_vulns[vuln_id]["exploits"] and
poc_in_github_ref[0] + '/' not in detailed_vulns[vuln_id]["exploits"] and
poc_in_github_ref[0][:-1] not in detailed_vulns[vuln_id]["exploits"] and
poc_in_github_ref[0] + '.git' not in detailed_vulns[vuln_id]["exploits"]):
detailed_vulns[vuln_id]["exploits"].append(poc_in_github_ref[0])
exploit_refs.append("https://www.exploit-db.com/exploits/%s" % edb_id)
exploit_refs = retrieve_exploit_references(db_cursor, vuln_id, exploit_refs, False, add_other_exploit_refs)

if exploit_refs:
detailed_vulns[vuln_id]["exploits"] = exploit_refs

elif source == 'ghsa':
query = 'SELECT aliases, description, published, last_modified, cvss_version, base_score, vector FROM ghsa WHERE ghsa_id = ?'
db_cursor.execute(query, (vuln_id,))
Expand All @@ -229,6 +250,14 @@ def get_vuln_details(db_cursor, vulns, add_other_exploit_refs):
"cvss": str(float(score)), "cvss_vec": vector, "vuln_match_reason": match_reason,
"aliases": aliases, "sources": [source]}

# add exploit references
exploit_refs = []
for alias_vuln_id in aliases:
exploit_refs = retrieve_exploit_references(db_cursor, alias_vuln_id, exploit_refs, True, add_other_exploit_refs)

if exploit_refs:
detailed_vulns[vuln_id]['exploits'] = exploit_refs

return detailed_vulns


Expand Down
13 changes: 13 additions & 0 deletions tests/test_exploit_completeness.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ def test_search_hitachi_replication_manager_86500(self):
result_exploits.append(exploit)
self.assertEqual(set(expected_exploits), set(result_exploits))

def test_search_ghsa_only(self):
self.maxDiff = None
query = 'GHSA-6c3j-c64m-qhgq'
result = search_vulns.search_vulns(query=query, add_other_exploit_refs=True)
expected_exploits = ['https://snyk.io/vuln/SNYK-JS-JQUERY-174006', 'https://github.com/DanielRuf/snyk-js-jquery-174006', 'https://github.com/DanielRuf/snyk-js-jquery-565129', 'https://github.com/Snorlyd/https-nj.gov---CVE-2019-11358', 'https://github.com/bitnesswise/jquery-prototype-pollution-fix', 'https://github.com/chrisneagu/FTC-Skystone-Dark-Angels-Romania-2020', 'https://github.com/isacaya/CVE-2019-11358']
result_exploits = []
for cve in result[query]['vulns']:
data = result[query]['vulns'].get(cve)
if 'exploits' in data:
for exploit in data['exploits']:
result_exploits.append(exploit)
self.assertEqual(set(expected_exploits), set(result_exploits))


if __name__ == '__main__':
unittest.main()

0 comments on commit 056a1a1

Please sign in to comment.