From 056a1a1a10914d625ff3b7285e5dd61ad5bd251b Mon Sep 17 00:00:00 2001 From: Dustin Born Date: Sat, 9 Nov 2024 00:30:04 +0100 Subject: [PATCH] Fix bug to add exploits to GHSA-only vulns Before, exploits would not be added to vulnerabilities, whose source is only the GHSA, even if some exploits were stored via the vuln's CVE alias. This commit fixes that. --- search_vulns.py | 101 +++++++++++++++++++---------- tests/test_exploit_completeness.py | 13 ++++ 2 files changed, 78 insertions(+), 36 deletions(-) diff --git a/search_vulns.py b/search_vulns.py index 94e592f..da99fba 100755 --- a/search_vulns.py +++ b/search_vulns.py @@ -144,6 +144,51 @@ def is_more_specific_cpe_contained(vuln_cpe, cve_cpes): return False +def retrieve_exploit_references(db_cursor, vuln_id, init_exploits=None, add_init_edbids=True, add_other_exploit_refs=True): + """Retrieve exploit references for vuln_id and add to init_exploits""" + + exploit_refs = [] if not init_exploits else init_exploits + + if add_init_edbids and vuln_id.startswith('CVE-'): + db_cursor.execute('SELECT edb_ids FROM cve WHERE cve_id = ?', (vuln_id,)) + edb_ids = db_cursor.fetchone() + if edb_ids and edb_ids[0]: + edb_ids = edb_ids[0].strip() + if edb_ids: + for edb_id in edb_ids.split(","): + exploit_refs.append("https://www.exploit-db.com/exploits/%s" % edb_id) + + if add_other_exploit_refs and vuln_id.startswith('CVE-'): + # from NVD + query = 'SELECT exploit_ref FROM nvd_exploits_refs_view WHERE cve_id = ?' + nvd_exploit_refs = '' + db_cursor.execute(query, (vuln_id,)) + if db_cursor: + nvd_exploit_refs = db_cursor.fetchall() + if nvd_exploit_refs: + for nvd_exploit_ref in nvd_exploit_refs: + if (nvd_exploit_ref[0] not in exploit_refs and + nvd_exploit_ref[0] + '/' not in exploit_refs and + nvd_exploit_ref[0][:-1] not in exploit_refs): + exploit_refs.append(nvd_exploit_ref[0]) + + # from PoC-in-Github + query = 'SELECT reference FROM cve_poc_in_github_map WHERE cve_id = ?' + poc_in_github_refs = '' + db_cursor.execute(query, (vuln_id,)) + if db_cursor: + poc_in_github_refs = db_cursor.fetchall() + if poc_in_github_refs: + for poc_in_github_ref in poc_in_github_refs: + if (poc_in_github_ref[0] not in exploit_refs and + poc_in_github_ref[0] + '/' not in exploit_refs and + poc_in_github_ref[0][:-1] not in exploit_refs and + poc_in_github_ref[0] + '.git' not in exploit_refs): + exploit_refs.append(poc_in_github_ref[0]) + + return exploit_refs + + def get_vuln_details(db_cursor, vulns, add_other_exploit_refs): '''Collect more detailed information about the given vulns and return it''' @@ -160,7 +205,7 @@ def get_vuln_details(db_cursor, vulns, add_other_exploit_refs): if queried_info: edb_ids, descr, publ, last_mod, cvss_ver, score, vector, cisa_known_exploited = queried_info else: - edb_ids, publ, last_mod, cvss_ver, vector, cisa_known_exploited = '', '', '', '', '', False + publ, last_mod, cvss_ver, vector, cisa_known_exploited = '', '', '', '', False score, descr = "-1.0", "NOT FOUND" match_reason = "not_found" if cvss_ver: @@ -170,44 +215,20 @@ def get_vuln_details(db_cursor, vulns, add_other_exploit_refs): "cvss": str(float(score)), "cvss_vec": vector, "vuln_match_reason": match_reason, "cisa_known_exploited": bool(cisa_known_exploited), "aliases": [], "sources": [source]} + if not queried_info: + continue + + # add exploit references + exploit_refs = [] edb_ids = edb_ids.strip() if edb_ids: - detailed_vulns[vuln_id]["exploits"] = [] for edb_id in edb_ids.split(","): - detailed_vulns[vuln_id]["exploits"].append("https://www.exploit-db.com/exploits/%s" % edb_id) - - # add other exploit references - if add_other_exploit_refs: - # from NVD - query = 'SELECT exploit_ref FROM nvd_exploits_refs_view WHERE cve_id = ?' - nvd_exploit_refs = '' - db_cursor.execute(query, (vuln_id,)) - if db_cursor: - nvd_exploit_refs = db_cursor.fetchall() - if nvd_exploit_refs: - if "exploits" not in detailed_vulns[vuln_id]: - detailed_vulns[vuln_id]["exploits"] = [] - for nvd_exploit_ref in nvd_exploit_refs: - if (nvd_exploit_ref[0] not in detailed_vulns[vuln_id]["exploits"] and - nvd_exploit_ref[0] + '/' not in detailed_vulns[vuln_id]["exploits"] and - nvd_exploit_ref[0][:-1] not in detailed_vulns[vuln_id]["exploits"]): - detailed_vulns[vuln_id]["exploits"].append(nvd_exploit_ref[0]) - - # from PoC-in-Github - query = 'SELECT reference FROM cve_poc_in_github_map WHERE cve_id = ?' - poc_in_github_refs = '' - db_cursor.execute(query, (vuln_id,)) - if db_cursor: - poc_in_github_refs = db_cursor.fetchall() - if poc_in_github_refs: - if "exploits" not in detailed_vulns[vuln_id]: - detailed_vulns[vuln_id]["exploits"] = [] - for poc_in_github_ref in poc_in_github_refs: - if (poc_in_github_ref[0] not in detailed_vulns[vuln_id]["exploits"] and - poc_in_github_ref[0] + '/' not in detailed_vulns[vuln_id]["exploits"] and - poc_in_github_ref[0][:-1] not in detailed_vulns[vuln_id]["exploits"] and - poc_in_github_ref[0] + '.git' not in detailed_vulns[vuln_id]["exploits"]): - detailed_vulns[vuln_id]["exploits"].append(poc_in_github_ref[0]) + exploit_refs.append("https://www.exploit-db.com/exploits/%s" % edb_id) + exploit_refs = retrieve_exploit_references(db_cursor, vuln_id, exploit_refs, False, add_other_exploit_refs) + + if exploit_refs: + detailed_vulns[vuln_id]["exploits"] = exploit_refs + elif source == 'ghsa': query = 'SELECT aliases, description, published, last_modified, cvss_version, base_score, vector FROM ghsa WHERE ghsa_id = ?' db_cursor.execute(query, (vuln_id,)) @@ -229,6 +250,14 @@ def get_vuln_details(db_cursor, vulns, add_other_exploit_refs): "cvss": str(float(score)), "cvss_vec": vector, "vuln_match_reason": match_reason, "aliases": aliases, "sources": [source]} + # add exploit references + exploit_refs = [] + for alias_vuln_id in aliases: + exploit_refs = retrieve_exploit_references(db_cursor, alias_vuln_id, exploit_refs, True, add_other_exploit_refs) + + if exploit_refs: + detailed_vulns[vuln_id]['exploits'] = exploit_refs + return detailed_vulns diff --git a/tests/test_exploit_completeness.py b/tests/test_exploit_completeness.py index 94bf095..2a2184a 100755 --- a/tests/test_exploit_completeness.py +++ b/tests/test_exploit_completeness.py @@ -88,6 +88,19 @@ def test_search_hitachi_replication_manager_86500(self): result_exploits.append(exploit) self.assertEqual(set(expected_exploits), set(result_exploits)) + def test_search_ghsa_only(self): + self.maxDiff = None + query = 'GHSA-6c3j-c64m-qhgq' + result = search_vulns.search_vulns(query=query, add_other_exploit_refs=True) + expected_exploits = ['https://snyk.io/vuln/SNYK-JS-JQUERY-174006', 'https://github.com/DanielRuf/snyk-js-jquery-174006', 'https://github.com/DanielRuf/snyk-js-jquery-565129', 'https://github.com/Snorlyd/https-nj.gov---CVE-2019-11358', 'https://github.com/bitnesswise/jquery-prototype-pollution-fix', 'https://github.com/chrisneagu/FTC-Skystone-Dark-Angels-Romania-2020', 'https://github.com/isacaya/CVE-2019-11358'] + result_exploits = [] + for cve in result[query]['vulns']: + data = result[query]['vulns'].get(cve) + if 'exploits' in data: + for exploit in data['exploits']: + result_exploits.append(exploit) + self.assertEqual(set(expected_exploits), set(result_exploits)) + if __name__ == '__main__': unittest.main()