From 1da00fba3ba87fcc66349fa88ec900821c1ed00c Mon Sep 17 00:00:00 2001 From: Arthur Chan Date: Mon, 17 Jun 2024 12:45:18 +0000 Subject: [PATCH 1/2] Add typing to routes.py Signed-off-by: Arthur Chan --- .../app/webapp/routes.py | 157 ++++++++++-------- 1 file changed, 84 insertions(+), 73 deletions(-) diff --git a/tools/web-fuzzing-introspection/app/webapp/routes.py b/tools/web-fuzzing-introspection/app/webapp/routes.py index 31c7899e0..63ea51149 100644 --- a/tools/web-fuzzing-introspection/app/webapp/routes.py +++ b/tools/web-fuzzing-introspection/app/webapp/routes.py @@ -19,14 +19,12 @@ import json import signal -from flask import Blueprint, render_template, request, redirect - -#from app.site import models -from . import models +from flask import Blueprint, render_template, request, redirect, BaseResponse +from typing import Dict, List, Tuple, Any, Optional, Callable, Union +from . import models, data_storage # Use these during testing. #from app.site import test_data -from . import data_storage blueprint = Blueprint('site', __name__, template_folder='templates') @@ -36,23 +34,25 @@ local_oss_fuzz = '' -def get_introspector_report_url_base(project_name, datestr): +def get_introspector_report_url_base(project_name: str, datestr: str) -> str: base_url = 'https://storage.googleapis.com/oss-fuzz-introspector/{0}/inspector-report/{1}/' project_url = base_url.format(project_name, datestr.replace("-", "")) return project_url -def get_introspector_report_url_source_base(project_name, datestr): +def get_introspector_report_url_source_base(project_name: str, + datestr: str) -> str: return get_introspector_report_url_base(project_name, datestr) + "source-code" -def get_introspector_url(project_name, datestr): +def get_introspector_url(project_name: str, datestr: str) -> str: return get_introspector_report_url_base(project_name, datestr) + "fuzz_report.html" -def get_coverage_report_url(project_name, datestr, language): +def get_coverage_report_url(project_name: str, datestr: str, + language: str) -> str: if language == 'java' or language == 'python' or language == 'go': file_report = "index.html" else: @@ -63,8 +63,8 @@ def get_coverage_report_url(project_name, datestr, language): return project_url -def extract_introspector_raw_source_code(project_name, date_str, target_file): - +def extract_introspector_raw_source_code(project_name: str, date_str: str, + target_file: str) -> Optional[str]: if is_local: src_location = os.path.join(local_oss_fuzz, 'build', 'out', project_name, 'inspector', @@ -87,13 +87,14 @@ def extract_introspector_raw_source_code(project_name, date_str, target_file): return raw_source -def extract_lines_from_source_code(project_name, - date_str, - target_file, - line_begin, - line_end, - print_line_numbers=False, - sanity_check_function_end=False): +def extract_lines_from_source_code( + project_name: str, + date_str: str, + target_file: str, + line_begin: int, + line_end: int, + print_line_numbers: bool = False, + sanity_check_function_end: bool = False) -> Optional[str]: raw_source = extract_introspector_raw_source_code(project_name, date_str, target_file) if raw_source is None: @@ -147,7 +148,7 @@ def extract_lines_from_source_code(project_name, return return_source -def get_functions_of_interest(project_name): +def get_functions_of_interest(project_name: str) -> List[models.Function]: all_functions = data_storage.get_functions() all_functions = all_functions + data_storage.get_constructors() @@ -169,7 +170,7 @@ def get_functions_of_interest(project_name): return sorted_functions_of_interest -def get_frontpage_summary_stats(): +def get_frontpage_summary_stats() -> models.DBSummary: # Get total number of projects all_projects = data_storage.get_projects() @@ -204,7 +205,7 @@ def get_frontpage_summary_stats(): return db_summary -def get_project_with_name(project_name): +def get_project_with_name(project_name: str) -> Optional[models.Project]: all_projects = data_storage.get_projects() for project in all_projects: if project.name == project_name: @@ -214,7 +215,8 @@ def get_project_with_name(project_name): return None -def get_fuction_with_name(function_name, project_name): +def get_fuction_with_name(function_name: str, + project_name: str) -> models.Function: all_functions = data_storage.get_functions() for function in all_functions: if function.name == function_name and function.project == project_name: @@ -224,7 +226,8 @@ def get_fuction_with_name(function_name, project_name): return all_functions[0] -def get_all_related_functions(primary_function): +def get_all_related_functions( + primary_function: models.Function) -> List[models.Function]: all_functions = data_storage.get_functions() related_functions = [] for function in all_functions: @@ -237,7 +240,7 @@ def get_all_related_functions(primary_function): @blueprint.route('/') -def index(): +def index() -> str: db_summary = get_frontpage_summary_stats() db_timestamps = data_storage.DB_TIMESTAMPS print("Length of timestamps: %d" % (len(db_timestamps))) @@ -272,7 +275,7 @@ def index(): @blueprint.route('/function-profile', methods=['GET']) -def function_profile(): +def function_profile() -> str: function_profile = get_fuction_with_name( request.args.get('function', 'none'), request.args.get('project', 'none')) @@ -285,7 +288,7 @@ def function_profile(): @blueprint.route('/project-profile', methods=['GET']) -def project_profile(): +def project_profile() -> Union[str, BaseResponse]: #print(request.args.get('project', 'none')) target_project_name = request.args.get('project', 'none') @@ -418,7 +421,7 @@ def project_profile(): @blueprint.route('/function-search') -def function_search(): +def function_search() -> str: info_msg = None MAX_MATCHES_TO_DISPLAY = 900 query = request.args.get('q', '') @@ -466,7 +469,7 @@ def function_search(): @blueprint.route('/projects-overview') -def projects_overview(): +def projects_overview() -> str: # Get statistics of the project project_statistics = data_storage.PROJECT_TIMESTAMPS latest_coverage_profiles = dict() @@ -479,7 +482,8 @@ def projects_overview(): all_projects=latest_coverage_profiles.values()) -def oracle_3(all_functions, all_projects): +def oracle_3(all_functions: List[models.Function], + all_projects: List[models.Project]) -> List[models.Function]: """Filters fucntions that: - "have far reach but low coverage and are likely easy to trigger" @@ -490,7 +494,7 @@ def oracle_3(all_functions, all_projects): - at least one argument. """ functions_of_interest = [] - projects_added = dict() + projects_added: Dict[str, List[models.Function]] = dict() for function in all_functions: if (function.runtime_code_coverage < 20.0 @@ -533,9 +537,11 @@ def oracle_3(all_functions, all_projects): return functions_of_interest -def oracle_1(all_functions, all_projects, max_project_count=5): +def oracle_1(all_functions: List[models.Function], + all_projects: List[models.Project], + max_project_count: int = 5) -> List[models.Function]: tmp_list = [] - project_count = dict() + project_count: Dict[str, int] = dict() for function in all_functions: interesting_fuzz_keywords = { 'deserialize', @@ -593,7 +599,7 @@ def oracle_1(all_functions, all_projects, max_project_count=5): return functions_to_display -def match_easy_fuzz_arguments(function): +def match_easy_fuzz_arguments(function: models.Function) -> bool: debug_args = function.debug_data.get('args') if not debug_args: return False @@ -611,7 +617,7 @@ def match_easy_fuzz_arguments(function): return False -def is_static(target_function) -> bool: +def is_static(target_function: models.Function) -> bool: """Returns True if a function is determined to be static and False otherwise, including if undecided.""" @@ -668,12 +674,12 @@ def is_static(target_function) -> bool: return False -def oracle_2(all_functions, - all_projects, - only_functions_with_xrefs=False, - no_static_functions=False): +def oracle_2(all_functions: List[models.Function], + all_projects: List[models.Project], + only_functions_with_xrefs: bool=False, + no_static_functions: bool=False) -> Optional[models.Project]: tmp_list = [] - project_count = dict() + project_count: Dict[str, int] = dict() if len(all_projects) == 1: project_to_target = all_projects[0] else: @@ -730,15 +736,16 @@ def oracle_2(all_functions, @blueprint.route('/target_oracle') -def target_oracle(): +def target_oracle() -> str: all_projects = data_storage.get_projects() all_functions = data_storage.get_functions() functions_to_display = [] total_funcs = set() - oracle_pairs = [(oracle_1, "heuristic 1"), (oracle_2, "heuristic 2"), - (oracle_3, "heuristic 3")] + oracle_pairs: List[Tuple[Callable, str]] = [(oracle_1, "heuristic 1"), + (oracle_2, "heuristic 2"), + (oracle_3, "heuristic 3")] for oracle, heuristic_name in oracle_pairs: func_targets = oracle(all_functions, all_projects) for func in func_targets: @@ -765,7 +772,7 @@ def target_oracle(): @blueprint.route('/indexing-overview') -def indexing_overview(): +def indexing_overview() -> str: build_status = data_storage.get_build_status() languages_summarised = dict() @@ -794,17 +801,17 @@ def indexing_overview(): @blueprint.route('/about') -def about(): +def about() -> str: return render_template('about.html', gtag=gtag) @blueprint.route('/api') -def api(): +def api() -> str: return render_template('api.html', gtag=gtag) @blueprint.route('/api/annotated-cfg') -def api_annotated_cfg(): +def api_annotated_cfg() -> Dict[str, Any]: project_name = request.args.get('project', None) if project_name is None: return {'result': 'error', 'msg': 'Please provide project name'} @@ -834,7 +841,7 @@ def api_annotated_cfg(): @blueprint.route('/api/project-summary') -def api_project_summary(): +def api_project_summary() -> Dict[str, Any]: project_name = request.args.get('project', None) if project_name is None: return {'result': 'error', 'msg': 'Please provide project name'} @@ -858,7 +865,7 @@ def api_project_summary(): @blueprint.route('/api/branch-blockers') -def branch_blockers(): +def branch_blockers() -> Dict[str, Any]: project_name = request.args.get('project', None) if project_name is None: return {'result': 'error', 'msg': 'Please provide project name'} @@ -895,7 +902,8 @@ def branch_blockers(): return {'result': 'success', 'project_blockers': project_blockers} -def get_function_from_func_signature(func_signature, project_name): +def get_function_from_func_signature( + func_signature: str, project_name: str) -> Optional[models.Function]: all_functions = data_storage.get_functions() for function in all_functions: if function.project == project_name and function.func_signature == func_signature: @@ -904,7 +912,7 @@ def get_function_from_func_signature(func_signature, project_name): @blueprint.route('/api/all-cross-references') -def api_cross_references(): +def api_cross_references() -> Dict[str, Any]: """Returns a json representation of all the functions in a given project""" project_name = request.args.get('project', None) if project_name is None: @@ -952,7 +960,7 @@ def api_cross_references(): @blueprint.route('/api/all-functions') -def api_project_all_functions(): +def api_project_all_functions() -> Dict[str, Any]: """Returns a json representation of all the functions in a given project""" project_name = request.args.get('project', None) if project_name is None: @@ -970,7 +978,7 @@ def api_project_all_functions(): @blueprint.route('/api/all-jvm-constructors') -def api_project_all_jvm_constructors(): +def api_project_all_jvm_constructors() -> Dict[str, Any]: """Returns a json representation of all the functions in a given project""" project_name = request.args.get('project', None) if project_name is None: @@ -986,7 +994,8 @@ def api_project_all_jvm_constructors(): return {'result': 'success', 'functions': list_to_return} -def _convert_function_return_list(project_functions): +def _convert_function_return_list( + project_functions: List[models.Function]) -> List[Dict[str, Any]]: """Convert a function list to something we can return""" list_to_return = [] for function in project_functions: @@ -1016,7 +1025,7 @@ def _convert_function_return_list(project_functions): @blueprint.route('/api/project-source-code') -def api_project_source_code(): +def api_project_source_code() -> Dict[str, Any]: """Returns a json representation of all the functions in a given project""" project_name = request.args.get('project', None) if project_name is None: @@ -1079,7 +1088,7 @@ def api_project_source_code(): @blueprint.route('/api/type-info') -def api_type_info(): +def api_type_info() -> Dict[str, Any]: """Returns a json representation of all the functions in a given project""" project_name = request.args.get('project', None) if project_name is None: @@ -1151,7 +1160,7 @@ def api_function_signature(): @blueprint.route('/api/function-source-code') -def api_function_source_code(): +def api_function_source_code() -> Dict[str, Any]: """Returns a json representation of all the functions in a given project""" project_name = request.args.get('project', None) if project_name is None: @@ -1214,7 +1223,8 @@ def api_function_source_code(): } -def get_build_status_of_project(project_name): +def get_build_status_of_project( + project_name: str) -> Optional[models.BuildStatus]: build_status = data_storage.get_build_status() for bs in build_status: @@ -1225,9 +1235,9 @@ def get_build_status_of_project(project_name): @blueprint.route('/api/easy-params-far-reach') -def api_oracle_2(): +def api_oracle_2() -> Dict[str, Any]: """API for getting fuzz targets with easy fuzzable arguments.""" - err_msgs = list() + err_msgs: List[str] = list() project_name = request.args.get('project', None) if project_name is None: return { @@ -1286,8 +1296,8 @@ def api_oracle_2(): @blueprint.route('/api/far-reach-low-cov-fuzz-keyword') -def api_oracle_1(): - err_msgs = list() +def api_oracle_1() -> Dict[str, Any]: + err_msgs: List[str] = list() project_name = request.args.get('project', None) if project_name is None: return { @@ -1336,7 +1346,7 @@ def api_oracle_1(): @blueprint.route('/api/project-repository') -def project_repository(): +def project_repository() -> Dict[str, Any]: project_name = request.args.get('project', None) if project_name is None: return { @@ -1359,7 +1369,7 @@ def project_repository(): @blueprint.route('/api/far-reach-but-low-coverage') -def far_reach_but_low_coverage(): +def far_reach_but_low_coverage() -> Dict[str, Any]: err_msgs = list() project_name = request.args.get('project', None) if project_name is None: @@ -1518,7 +1528,7 @@ def get_full_recursive_types(debug_type_dictionary, resulting_types, @blueprint.route('/api/shutdown') -def shutdown(): +def shutdown() -> Dict[str, str]: """Shuts down the server, only if it's local.""" if is_local or allow_shutdown: sig = getattr(signal, "SIGKILL", signal.SIGTERM) @@ -1529,7 +1539,7 @@ def shutdown(): @blueprint.route('/api/all-header-files') -def all_project_header_files(): +def all_project_header_files() -> Dict[str, Any]: project = request.args.get('project', None) if project is None: return { @@ -1548,7 +1558,7 @@ def all_project_header_files(): @blueprint.route('/api/addr-to-recursive-dwarf-info') -def type_at_addr(): +def type_at_addr() -> Dict[str, Any]: # Temporary disabling this API because of size limit. # @arthurscchan 14/6/2024 return {'result': 'error', 'extended_msgs': ['Temporary disabled']} @@ -1574,7 +1584,7 @@ def type_at_addr(): with open(type_map, 'r') as f: type_map_dict = json.load(f) - resulting_types = dict() + resulting_types: Dict[str, Any] = dict() print("Getting types") get_full_recursive_types(type_map_dict, resulting_types, addr) @@ -1582,11 +1592,11 @@ def type_at_addr(): @blueprint.route('/api/function-target-oracle') -def api_all_interesting_function_targets(): +def api_all_interesting_function_targets() -> Dict[str, Any]: """Returns a list of function targets based on analysis of all functions in all OSS-Fuzz projects (assuming they have introspetor builds) using several different heuristics.""" - result_dict = dict() + result_dict: Dict[str, Any] = dict() # Get the list of all oracles that we have all_projects = data_storage.get_projects() @@ -1596,8 +1606,9 @@ def api_all_interesting_function_targets(): functions_to_display = [] total_funcs = set() - oracle_pairs = [(oracle_1, "heuristic 1"), (oracle_2, "heuristic 2"), - (oracle_3, "heuristic 3")] + oracle_pairs: List[Tuple[Callable, str]] = [(oracle_1, "heuristic 1"), + (oracle_2, "heuristic 2"), + (oracle_3, "heuristic 3")] for oracle, heuristic_name in oracle_pairs: func_targets = oracle(all_functions, all_projects) for func in func_targets: @@ -1635,8 +1646,8 @@ def api_all_interesting_function_targets(): @blueprint.route('/api/sample-cross-references') -def sample_cross_references(): - """Returns a list of strings with functions that call into a given +def sample_cross_references() -> Dict[str, Any]: + """Returns a list of strings with functions that call into a given target function.""" project_name = request.args.get('project', None) if project_name is None: From fa1a888224dc98d133b39ce504e277c734751232 Mon Sep 17 00:00:00 2001 From: Arthur Chan Date: Mon, 17 Jun 2024 15:01:33 +0000 Subject: [PATCH 2/2] Fix return type Signed-off-by: Arthur Chan --- tools/web-fuzzing-introspection/app/webapp/routes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/web-fuzzing-introspection/app/webapp/routes.py b/tools/web-fuzzing-introspection/app/webapp/routes.py index 63ea51149..5e66f3eae 100644 --- a/tools/web-fuzzing-introspection/app/webapp/routes.py +++ b/tools/web-fuzzing-introspection/app/webapp/routes.py @@ -676,8 +676,8 @@ def is_static(target_function: models.Function) -> bool: def oracle_2(all_functions: List[models.Function], all_projects: List[models.Project], - only_functions_with_xrefs: bool=False, - no_static_functions: bool=False) -> Optional[models.Project]: + only_functions_with_xrefs: bool = False, + no_static_functions: bool = False) -> List[models.Function]: tmp_list = [] project_count: Dict[str, int] = dict() if len(all_projects) == 1: