diff --git a/docs/api_reference/cim_compliance_report.md b/docs/api_reference/cim_compliance_report.md new file mode 100644 index 000000000..67b2b45fb --- /dev/null +++ b/docs/api_reference/cim_compliance_report.md @@ -0,0 +1,21 @@ +# CimComplianceReport + +::: pytest_splunk_addon.cim_compliance + handler: python + +## CIMReportGenerator + +::: pytest_splunk_addon.cim_compliance.cim_report_generator + handler: python + + +## MarkDownReport + +::: pytest_splunk_addon.cim_compliance.markdown_report + handler: python + + +## MarkDownTable + +::: pytest_splunk_addon.cim_compliance.markdown_table + handler: python diff --git a/mkdocs.yml b/mkdocs.yml index 7a986292f..218ca7b67 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -74,5 +74,6 @@ nav: - AppTestGenerator: "api_reference/app_test_generator.md" - DataGenerator: "api_reference/sample_generation.md" - EventIngestor: "api_reference/event_ingestion.md" + - CimComplianceReport: "api_reference/cim_compliance_report.md" - Contributing: "contributing.md" - Troubleshooting: "troubleshoot.md" diff --git a/pytest_splunk_addon/addon_parser/fields.py b/pytest_splunk_addon/addon_parser/fields.py index a0826bf9c..8312a3134 100644 --- a/pytest_splunk_addon/addon_parser/fields.py +++ b/pytest_splunk_addon/addon_parser/fields.py @@ -28,6 +28,7 @@ class Field(object): * name (str): name of the field * type (str): Field type. Supported [required, conditional, optional] + * multi_value (bool): True if field is multi value field * expected_values (list): The field should have this expected values * negative_values (list): The field should not have negative values * condition (spl): The field should only be checked if the condition satisfies diff --git a/pytest_splunk_addon/addon_parser/props_parser.py b/pytest_splunk_addon/addon_parser/props_parser.py index a44e831be..73c3a4350 100644 --- a/pytest_splunk_addon/addon_parser/props_parser.py +++ b/pytest_splunk_addon/addon_parser/props_parser.py @@ -182,8 +182,8 @@ def _get_extract_fields(self, name: str, value: str): EXTRACT-one = regex with (?.*) Args: - name: key in the configuration settings - value: value of the respective name in the configuration + name (str): key in the configuration settings + value (str): value of the respective name in the configuration Regex: Parse the fields from a regex. Examples, @@ -220,8 +220,8 @@ def _get_eval_fields(self, name, value): EVAL-action = if(isnull(action), "unknown", action) Args: - name: key in the configuration settings - value: value of the respective name in the configuration + name (str): key in the configuration settings + value (str): value of the respective name in the configuration Yields: generator of fields @@ -240,8 +240,8 @@ def _get_fieldalias_fields(self, name: str, value: str): FIELDALIAS-class = source AS dest, sc2 AS dest2 Args: - name: key in the configuration settings - value: value of the respective name in the configuration + name (str): key in the configuration settings + value (str): value of the respective name in the configuration Regex: Description: @@ -274,8 +274,8 @@ def _get_report_fields(self, name: str, value: str): transforms.conf and returns the list Args: - name: key in the configuration settings - value: value of the respective name in the configuration + name (str): key in the configuration settings + value (str): value of the respective name in the configuration Yields: generator of (transform_stanza ,fields) parsed from transforms.conf @@ -294,8 +294,8 @@ def _get_lookup_fields(self, name: str, value: str): Extracts the lookup fields Args: - name: key in the configuration settings - value: value of the respective name in the configuration + name (str): key in the configuration settings + value (str): value of the respective name in the configuration Returns: List of lookup fields diff --git a/pytest_splunk_addon/cim_compliance/cim_report_generator.py b/pytest_splunk_addon/cim_compliance/cim_report_generator.py index 5da6ae190..1ee37514c 100644 --- a/pytest_splunk_addon/cim_compliance/cim_report_generator.py +++ b/pytest_splunk_addon/cim_compliance/cim_report_generator.py @@ -55,7 +55,7 @@ class CIMReportGenerator(object): """ - Generate the Report + Generate the Report. data format:: [ @@ -67,6 +67,9 @@ class CIMReportGenerator(object): "status": "pass"/"fail" } ] + + Args: + data (list): List of dictionaries with specified format. """ def __init__(self, data=[], report_class=MarkDownReport): @@ -78,7 +81,7 @@ def add_data(self, data): adds data to object property. Args: - data(list): List of dictionaries with specified format. + data (list): List of dictionaries with specified format. """ self.data.append(data) @@ -87,8 +90,8 @@ def _group_by(self, keys, data=None): Function to generate group of data using Keys provided Args: - keys(list): Contains keys to group data by. - data(list): list of dictionaries with specified format. + keys (list): Contains keys to group data by. + data (list): list of dictionaries with specified format. Yields: data_set.DataSet: data set object mapped with the tags @@ -105,8 +108,8 @@ def _get_count_by(self, keys, data=None): Function to generate count of data using Keys provided Args: - keys(list): Contains keys to generate count by. - data(list): list of dictionaries with specified format. + keys (list): Contains keys to generate count by. + data (list): list of dictionaries with specified format. Yields: data_set.DataSet: data set object mapped with the tags @@ -123,7 +126,7 @@ def pass_count(counter): Function to Get count in Pass/Total format. Args: - counter(collections.Counter): Contains counts of passing/failing Testcases. + counter (collections.Counter): Contains counts of passing/failing Testcases. Yields: String: string with pass/total format. @@ -139,7 +142,7 @@ def fail_count(counter): Function to Get count in Fail/Total format. Args: - counter(collections.Counter): Contains counts of passing/failing Testcases. + counter (collections.Counter): Contains counts of passing/failing Testcases. Yields: String: string with fail/total format. @@ -231,7 +234,9 @@ def generate_field_summary_table(self): del field_summary_table def generate_skip_tests_table(self): - """ """ + """ + Displays summary of the skipped tests + """ skipped_tests = list(filter(lambda d: d["status"] == "skipped", self.data)) if skipped_tests: skipped_tests_table = MarkdownTable( @@ -255,7 +260,7 @@ def generate_report(self, report_path): Function to generate report from the stored data. Args: - report_path(string): Path to create the report. + report_path (string): Path to create the report. """ self.report_generator.set_title("CIM AUDIT REPORT") self.data.sort( diff --git a/pytest_splunk_addon/cim_compliance/markdown_report.py b/pytest_splunk_addon/cim_compliance/markdown_report.py index 36f998093..22e9b6152 100644 --- a/pytest_splunk_addon/cim_compliance/markdown_report.py +++ b/pytest_splunk_addon/cim_compliance/markdown_report.py @@ -20,6 +20,10 @@ class MarkDownReport(CIMReport): + """ + Generate the markdown content + """ + def __init__(self): self.markdown_str = "" self.note_str = "" @@ -29,7 +33,7 @@ def set_title(self, title_string): Function to set title of a report Args: - title_string(string): String containing title for report. + title_string (str): String containing title for report. """ self.title_str = "# {} \n".format(title_string) @@ -38,7 +42,7 @@ def add_section_title(self, section_title): Function to add new section to report Args: - section_title(string): String containing title for new Section. + section_title (str): String containing title for new Section. """ self.markdown_str += "\n## {}\n".format(section_title) @@ -47,7 +51,7 @@ def add_section_description(self, description): Adds description string to the section Args: - description(str): Description string. + description (str): Description string. """ self.markdown_str += "\n**Description:** " + description + "\n" @@ -56,7 +60,7 @@ def add_section_note(self, section_note): Function to set Note in a report Args: - section_note(string): String containing note for report. + section_note (str): String containing note for report. """ self.note_str = "## Note: {} \n".format(section_note) @@ -65,7 +69,7 @@ def add_table(self, table_string): Function to add a table to the Report. Args: - table_string(string): Stringified table. + table_string (str): Stringified table. """ self.markdown_str += table_string @@ -74,7 +78,7 @@ def write(self, path): Function to add a table to the Report. Args: - path(string) : path to store report file. + path (str) : path to store report file. """ with open(path, "w") as report: report.write(self.title_str) diff --git a/pytest_splunk_addon/cim_compliance/markdown_table.py b/pytest_splunk_addon/cim_compliance/markdown_table.py index 81741ccbf..63e84b9bd 100644 --- a/pytest_splunk_addon/cim_compliance/markdown_table.py +++ b/pytest_splunk_addon/cim_compliance/markdown_table.py @@ -20,6 +20,14 @@ class MarkdownTable(BaseTable): + """ + Generate table in markdown format + + Args: + table_title (str): Title of the table + header_list (list(str)): List of header names + """ + def __init__(self, table_title, header_list): self.table_title = self.__set_title(table_title) self.table_headers = self.__set_headers(header_list) @@ -32,7 +40,7 @@ def __set_title(self, title): Adds Title string to the table Args: - title(str): Title string. + title (str): Title string. """ return "### {}".format(title) if title else "" @@ -41,7 +49,7 @@ def __set_headers(self, header_list): Sets the header column for the table. Args: - header_list(list): Contains list of column headers. + header_list (list): Contains list of column headers. """ header_str = "\n" helper_str = "" @@ -55,7 +63,7 @@ def set_description(self, description): Adds description string to the table Args: - description(str): Description string. + description (str): Description string. """ self.table_description = "\n {} \n".format(description) @@ -64,7 +72,7 @@ def add_row(self, value_list): Expects a list of row values to be added in the table Args: - value_list(list): Contains list of row values + value_list (list): Contains list of row values """ row_element = "" for each_value in value_list: @@ -76,7 +84,7 @@ def set_note(self, note_str): It adds the note at the end of the table Args: - note_str(str): Note string to be added. + note_str (str): Note string to be added. """ self.table_note = "\n*NOTE: {} *\n ".format(note_str) diff --git a/pytest_splunk_addon/cim_compliance/plugin.py b/pytest_splunk_addon/cim_compliance/plugin.py index 5f95855bd..8328c6d64 100644 --- a/pytest_splunk_addon/cim_compliance/plugin.py +++ b/pytest_splunk_addon/cim_compliance/plugin.py @@ -38,6 +38,9 @@ def pytest_sessionfinish(self, session): def pytest_runtest_logreport(self, report): """ Collect the data to be added into the report. + + Args: + report (TestReport): test report object """ if report.when == "call" and "test_cim_required_fields" in report.nodeid: data_dict = {} diff --git a/pytest_splunk_addon/cim_tests/data_model.py b/pytest_splunk_addon/cim_tests/data_model.py index 49efdad75..a46822dd7 100644 --- a/pytest_splunk_addon/cim_tests/data_model.py +++ b/pytest_splunk_addon/cim_tests/data_model.py @@ -42,8 +42,8 @@ def _get_mapped_datasets(self, addon_tags, data_sets, mapped_datasets=[]): If the parent data_set is mapped, check the child data_sets too Args: - addon_tags(list): Contains tags mapped to a stanza - data_sets(list): list of data sets to check with + addon_tags (list): Contains tags mapped to a stanza + data_sets (list): list of data sets to check with Yields: data_set.DataSet: data set object mapped with the tags @@ -62,7 +62,7 @@ def get_mapped_datasets(self, addon_tags): Get all mapped dataSets for an Add-on's tags stanza Args: - addon_tags(list): Contains tags mapped to a stanza + addon_tags (list): Contains tags mapped to a stanza Yields: data_set.DataSet: data set object mapped with the tags diff --git a/pytest_splunk_addon/cim_tests/data_model_handler.py b/pytest_splunk_addon/cim_tests/data_model_handler.py index e8dc4f589..cbb60cd69 100644 --- a/pytest_splunk_addon/cim_tests/data_model_handler.py +++ b/pytest_splunk_addon/cim_tests/data_model_handler.py @@ -79,6 +79,9 @@ def load_data_models(self, data_model_path): """ Parse all the data model JSON files one by one + Args: + data_model_path (str): path to the datamodel schema json file + Yields: (cim_tests.data_model.DataModel): parsed data model object """ diff --git a/pytest_splunk_addon/cim_tests/data_set.py b/pytest_splunk_addon/cim_tests/data_set.py index a79ce6021..5fa55d605 100644 --- a/pytest_splunk_addon/cim_tests/data_set.py +++ b/pytest_splunk_addon/cim_tests/data_set.py @@ -25,7 +25,8 @@ class DataSet(object): Handles a single data set Args: - data_set_json(dict): Json of a single DataSet + data_set_json (dict): Json of a single DataSet + data_model (str): Name of the data model """ def __init__(self, data_set_json, data_model): @@ -55,8 +56,8 @@ def load_dataset(cls, dataset_list, data_model): Parse all the fields from the data_model_json Args: - dataset_list(list): Contains list of datasets - data_model: Name of the data model + dataset_list (list): Contains list of datasets + data_model (str): Name of the data model Yields: data_set.DataSet: Dataset object for the given list @@ -77,6 +78,9 @@ def _parse_constraint(cls, constraint_search): def _parse_fields_cluster(self, fields_clusters): """ Parse all the fields from the data_model_json + + Args: + fields_clusters (list(list(str))): list of field clusters, each field cluster contains list of field names which are expected to be together """ parsed_fields_clusters = [] for each_cluster in fields_clusters: @@ -90,6 +94,9 @@ def _parse_fields_cluster(self, fields_clusters): def match_tags(self, addon_tag_list): """ Check if the tags are mapped with this data set + + Args: + addon_tag_list (list): list of tags defined in the addon """ for each_tag_group in self.tags: if set(each_tag_group).issubset(set(addon_tag_list)): diff --git a/pytest_splunk_addon/cim_tests/test_generator.py b/pytest_splunk_addon/cim_tests/test_generator.py index b61cbf398..7e0e2194d 100644 --- a/pytest_splunk_addon/cim_tests/test_generator.py +++ b/pytest_splunk_addon/cim_tests/test_generator.py @@ -65,6 +65,7 @@ def __init__( def generate_tests(self, fixture): """ Generate the test cases based on the fixture provided + supported fixtures: * splunk_searchtime_cim_fields @@ -72,7 +73,7 @@ def generate_tests(self, fixture): * splunk_searchtime_cim_fields_not_extracted Args: - fixture(str): fixture name + fixture (str): fixture name """ if fixture.endswith("fields"): @@ -259,6 +260,13 @@ def generate_mapped_datamodel_tests(self): ) def generate_recommended_fields_tests(self): + """ + Generates the tests to check all the recommended cim fields of the mapped data model are extracted. + + 1. Get a list of fields defined in cim_fields and missing_recommended_fields + 2. combine the fields list with the defined exceptions + 3. yield object with datamodel, dataset, cim_version and list of fields + """ for event in self.tokenized_events: if not event.requirement_test_data: continue diff --git a/pytest_splunk_addon/cim_tests/test_templates.py b/pytest_splunk_addon/cim_tests/test_templates.py index 5324c1015..788b3abf1 100644 --- a/pytest_splunk_addon/cim_tests/test_templates.py +++ b/pytest_splunk_addon/cim_tests/test_templates.py @@ -29,7 +29,7 @@ class CIMTestTemplates(object): Test scenarios to check the CIM compatibility of an Add-on Supported Test scenarios: - - The eventtype should exctract all required fields of data model + - The eventtype should extract all required fields of data model - One eventtype should not be mapped with more than one data model - Field Cluster should be verified (should be included with required field test) - Verify if CIM installed or not @@ -56,6 +56,13 @@ def test_cim_required_fields( - Check that each required field is extracted in all of the events mapped with the data model. - Check that if there are inter dependent fields, either all fields should be extracted or none of them should be extracted. + + Args: + splunk_search_util (SearchUtil): Object that helps to search on Splunk. + splunk_ingest_data (fixture): Ensure data was ingested before running test + splunk_setup (fixture): Ensure that test environment was set up before running test + splunk_searchtime_cim_fields (fixture): Pytest parameter to test required cim field extraction + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. """ cim_data_set = splunk_searchtime_cim_fields["data_set"] @@ -163,6 +170,13 @@ def test_cim_fields_not_allowed_in_search( """ This test case checks the event_count for the cim fields of type ["not_allowed_in_search_and_props", "not_allowed_in_search"]. - Expected event_count for these fields is zero. + + Args: + splunk_ingest_data (fixture): Ensure data was ingested before running test + splunk_search_util (SearchUtil): Object that helps to search on Splunk. + splunk_setup (fixture): Ensure that test environment was set up before running test + splunk_searchtime_cim_fields_not_allowed_in_search (fixture): Object which contain list of fields not_allowed_in_search + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. """ cim_dataset = splunk_searchtime_cim_fields_not_allowed_in_search["data_set"] cim_fields = splunk_searchtime_cim_fields_not_allowed_in_search["fields"] @@ -258,6 +272,12 @@ def test_cim_fields_not_allowed_in_props( ): """ This testcase checks for cim field of type ["not_allowed_in_search_and_props", "not_allowed_in_props"] if an extraction is defined in the configuration file. + + Args: + splunk_ingest_data (fixture): Ensure data was ingested before running test + splunk_setup (fixture): Ensure that test environment was set up before running test + splunk_searchtime_cim_fields_not_allowed_in_props (fixture): Object which contain list of fields not allowed in props + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. """ result_str = ( "The field extractions are not allowed in the configuration files" @@ -294,8 +314,10 @@ def test_eventtype_mapped_multiple_cim_datamodel( Args: splunk_search_util (SearchUtil): Object that helps to search on Splunk. - splunk_searchtime_cim_mapped_datamodel: Object which contain eventtype list - record_property (fixture): Document facts of test cases. + splunk_ingest_data (fixture): Ensure data was ingested before running test + splunk_setup (fixture): Ensure that test environment was set up before running test + splunk_searchtime_cim_mapped_datamodel (fixture): Object which contain eventtype list + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. caplog (fixture): fixture to capture logs. """ @@ -491,6 +513,13 @@ def test_eventtype_mapped_multiple_cim_datamodel( def test_cim_fields_recommended( self, splunk_dm_recommended_fields, splunk_searchtime_cim_fields_recommended ): + """ + This test case check that all the recommended cim fields of datamodel mapped with event are extracted. + + Args: + splunk_dm_recommended_fields (fixture): function which gets recommended fields for given datamodel + splunk_searchtime_cim_fields_recommended (fixture): pytest parameters to test. + """ datamodel = splunk_searchtime_cim_fields_recommended["datamodel"] datasets = splunk_searchtime_cim_fields_recommended["datasets"] fields = splunk_searchtime_cim_fields_recommended["fields"] diff --git a/pytest_splunk_addon/event_ingestors/hec_metric_ingestor.py b/pytest_splunk_addon/event_ingestors/hec_metric_ingestor.py index 4f53b124d..3aafcc237 100644 --- a/pytest_splunk_addon/event_ingestors/hec_metric_ingestor.py +++ b/pytest_splunk_addon/event_ingestors/hec_metric_ingestor.py @@ -38,7 +38,7 @@ def __init__(self, required_configs): init method for the class Args: - required_configs(dict): { + required_configs (dict): { hec_uri: {splunk_hec_scheme}://{splunk_host}:{hec_port}/services/collector, session_headers(dict): { "Authorization": f"Splunk ", diff --git a/pytest_splunk_addon/event_ingestors/hec_raw_ingestor.py b/pytest_splunk_addon/event_ingestors/hec_raw_ingestor.py index 8df4ceccc..cd952f4e7 100644 --- a/pytest_splunk_addon/event_ingestors/hec_raw_ingestor.py +++ b/pytest_splunk_addon/event_ingestors/hec_raw_ingestor.py @@ -42,7 +42,7 @@ class HECRawEventIngestor(EventIngestor): Args: - required_configs(dict): Dictionary containing hec_uri and session headers + required_configs (dict): Dictionary containing hec_uri and session headers """ def __init__(self, required_configs): diff --git a/pytest_splunk_addon/event_ingestors/ingestor_helper.py b/pytest_splunk_addon/event_ingestors/ingestor_helper.py index e11c8917e..2f9c9e78b 100644 --- a/pytest_splunk_addon/event_ingestors/ingestor_helper.py +++ b/pytest_splunk_addon/event_ingestors/ingestor_helper.py @@ -35,6 +35,10 @@ class IngestorHelper(object): def get_event_ingestor(cls, input_type, ingest_meta_data): """ Based on the input_type of the event, it returns an appropriate ingestor. + + Args: + input_type (str): input_type defined in pytest-splunk-addon-data.conf + ingest_meta_data (dict): Dictionary of required meta_data. """ ingest_methods = { "modinput": HECEventIngestor, @@ -85,10 +89,11 @@ def ingest_events( """ Events are ingested in the splunk. Args: - ingest_meta_data(dict): Dictionary of required meta_data. - addon_path: Path to Splunk app package. - config_path: Path to pytest-splunk-addon-sample-generator.conf. - bulk_event_ingestion(bool): Boolean param for bulk event ingestion. + ingest_meta_data (dict): Dictionary of required meta_data. + addon_path (str): Path to Splunk app package. + config_path (str): Path to pytest-splunk-addon-data.conf + thread_count (int): number of threads to use for ingestion + store_events (bool): Boolean param for generating json files with tokenised events """ sample_generator = SampleXdistGenerator(addon_path, config_path) store_sample = sample_generator.get_samples(store_events) diff --git a/pytest_splunk_addon/fields_tests/sample_parser.py b/pytest_splunk_addon/fields_tests/sample_parser.py index 52b2b7335..bdf5119d6 100644 --- a/pytest_splunk_addon/fields_tests/sample_parser.py +++ b/pytest_splunk_addon/fields_tests/sample_parser.py @@ -26,6 +26,15 @@ def parse_sample_files(folder_path): + """ + Parse the sample files + + Args: + folder_path (str): path to the sample files + + Yields: + EventXML: object of EventXML + """ if os.path.isdir(folder_path): for file in os.listdir(folder_path): if file.endswith(".log") or file.endswith(".xml"): @@ -37,6 +46,15 @@ def parse_sample_files(folder_path): def parse_file(filename): + """ + Parse the xml sample file to get the list of events + + Args: + filename (str): file name of the sample xml + + Yields: + Element: xml element for event + """ try: tree = ET.parse(filename) except ParseError: @@ -50,11 +68,33 @@ def parse_file(filename): class XMLParser: + """ + Class for parsing the xml samples + """ + def extract_transport_tag(self, event): + """ + Function to get the transport type for the event + + Args: + event (Element): xml element of the event + + Returns: + str: transport type of the given event + """ for transport in event.iter("transport"): return str(transport.get("type")) def strip_syslog_header(self, raw_event): + """ + Function to strip the syslog header from the raw event + + Args: + raw_event (str): raw event string + + Returns: + str: raw event with stripped syslog header + """ # remove leading space chars raw_event = raw_event.strip() CEF_format_match = re.search( @@ -92,8 +132,13 @@ def strip_syslog_header(self, raw_event): def get_event(self, root): """ - Input: Root of the xml file Function to return raw event string + + Args: + root (Element): root of the xml file + + Returns: + str: raw event """ event = None for raw in root.iter("raw"): @@ -102,8 +147,13 @@ def get_event(self, root): def get_models(self, root): """ - Input: Root of the xml file Function to return list of models in each event of the log file + + Args: + root (Element): root of the xml file + + Returns: + model_list list(str): list of datamodel names """ model_list = [] for model in root.iter("model"): @@ -112,8 +162,13 @@ def get_models(self, root): def split_model(self, model): """ - Input: Root of the xml file - Function to return list of models in each event of the log file + Function to parse the data model name defined in sample file + + Args: + model (str): name of the datamodel + + Returns: + str: name of the data model """ model_name = model.split(":", 2) if len(model_name) == 3: @@ -133,26 +188,30 @@ def split_model(self, model): return model_dataset_subdaset - def get_event(self, root): - """ - Input: Root of the xml file - Function to return raw event string - """ - event = None - for raw in root.iter("raw"): - event = raw.text - return event - def get_root(self, filename): """ - Input: Filename ending with .log extension - Function to return raw event string + Function to get root element of sample file + + Args: + filename (str): name of the sample file + + Returns: + Element: root xml element of sample file """ tree = ET.parse(filename) root = tree.getroot() return root def check_xml_format(self, file_name): + """ + Validates the xml format of the sample file + + Args: + file_name (str): name of the sample file + + Returns: + bool: True if the provided sample file is valid xml + """ if ET.parse(file_name): return True else: @@ -161,6 +220,17 @@ def check_xml_format(self, file_name): # extract_params_transport def extract_params(self, event): + """ + Extracts the host, source and sourcetype fields from sample + + Args: + event (Element): xml element of the sample event + + Returns: + str: host of the event + str: source of the event + str: sourcetype of the event + """ host, source, source_type = "", "", "" for transport in event.iter("transport"): if transport.get("host"): @@ -179,9 +249,14 @@ def escape_host_src_srctype(self, host, source, sourcetype): def escape_char_event(self, event): """ - Input: Event getting parsed Function to escape special characters in Splunk https://docs.splunk.com/Documentation/StyleGuide/current/StyleGuide/Specialcharacters + + Args: + event (str): raw event + + Returns: + str: event with escaped special chars """ escape_splunk_chars = [ "`", @@ -234,6 +309,25 @@ def escape_char_event(self, event): class EventXML: + """ + Class to handle xml element of event + + * transport_type (str): transport type of the event + * event_string (str): raw event + * name (str): name of the sample event + * models (list(str)): datamodels mapped with the event + * tags_to_check (list(str)): list of tag names + * list_model_dataset_subdataset (list(str)): list of datasets mapped with event + * host (str): host of the event + * source (str): source of the event + * sourcetype (str): sourcetype of the event + * cim_fields (dict): key-value pairs for cim_fields defined for the event + * exceptions (dict): key-value pairs for exceptions defined for the event + + Args: + event_tag(Element): xml element of the event + """ + transport_types = [ "modinput", "Modinput", @@ -269,11 +363,23 @@ def __init__(self, event_tag): self.exceptions = self.extract_key_value_xml("exceptions") def get_transport_type(self): + """ + Function to get the transport type of the event + + Raises: + ValueError: if transport type defined for the event is not supported + """ tt = self.xml_parser.extract_transport_tag(self.event_tag) if tt not in EventXML.all_transport_types: raise ValueError(f"Not supported transport type for {self.event_tag}") def get_model_list(self): + """ + Function to get the list of datamodels mapped with the event + + Returns: + list(str): list of datamodel names + """ list_model_dataset_subdataset = [] for model in self.models: model = model.replace(" ", "_") @@ -284,6 +390,15 @@ def get_model_list(self): return list_model_dataset_subdataset def get_event_string(self): + """ + Function to get the raw event + + Raises: + ValueError: if transport type of event is syslog and event does not match the supported syslog format + + Returns: + str: escaped raw event with syslog headers stripped + """ unescaped_event = self.xml_parser.get_event(self.event_tag) if self.transport_type.lower() == "syslog": stripped_event = self.xml_parser.strip_syslog_header(unescaped_event) @@ -296,6 +411,14 @@ def get_event_string(self): return self.xml_parser.escape_char_event(unescaped_event) def get_basic_fields(self): + """ + Function to get the escaped host, source and sourcetype for the event + + Returns: + str: escaped value of host + str: escaped value of source + str: escaped value of sourcetype + """ if self.transport_types in EventXML.transport_types: host, source, sourcetype = self.xml_parser.extract_params(self.event_tag) return self.xml_parser.escape_host_src_srctype(host, source, sourcetype) @@ -310,12 +433,27 @@ def get_transport_type_params(self): } def get_tags_to_check(self): + """ + Function to get the list of tags for the datamodel mapped with the event + + Returns: + (list(str)): list of tag names + """ tags = [] for model in self.models: tags += dict_datamodel_tag[model.replace(" ", "_").replace(":", "_")] return list(set(tags)) def extract_key_value_xml(self, _type): + """ + Function to generate the dict object with the fields key-value + + Args: + _type (str): type of the fields (cim_fields, exceptions) + + Returns: + dict: key-value pairs for fields defined for the event + """ key_value_dict = {} for type_fields in self.event_tag.iter(_type): for fields in type_fields.iter("field"): diff --git a/pytest_splunk_addon/fields_tests/test_generator.py b/pytest_splunk_addon/fields_tests/test_generator.py index 46b4ef4bb..68d81029a 100644 --- a/pytest_splunk_addon/fields_tests/test_generator.py +++ b/pytest_splunk_addon/fields_tests/test_generator.py @@ -40,6 +40,7 @@ class FieldTestGenerator(object): Args: app_path (str): Path of the app package + tokenized_events (list): list of tokenized events field_bank (str): Path of the fields Json file """ @@ -63,9 +64,7 @@ def generate_tests(self, fixture): * splunk_searchtime_fields_requirements Args: - fixture(str): fixture name - sample_generator(SampleGenerator): sample objects generator - store_events(bool): variable to define if events should be stored + fixture (str): fixture name """ if fixture.endswith("positive"): @@ -155,7 +154,7 @@ def generate_tag_tests(self): def generate_requirements_datamodels_tests(self): """ - Generate test case for tags + Generate test case for datamodels Yields: pytest.params for the test templates @@ -224,7 +223,7 @@ def generate_savedsearches_tests(self): def generate_requirements_tests(self): """ Generate test cases for fields defined for datamodel - These function generates tests previously covered by requirement tests + This function generates tests previously covered by requirement tests Yields: pytest.params for the test templates diff --git a/pytest_splunk_addon/fields_tests/test_templates.py b/pytest_splunk_addon/fields_tests/test_templates.py index 109ab77a2..0ea0031ba 100644 --- a/pytest_splunk_addon/fields_tests/test_templates.py +++ b/pytest_splunk_addon/fields_tests/test_templates.py @@ -50,7 +50,7 @@ def test_splunk_internal_errors( Args: splunk_search_util (SearchUtil): Object that helps to search on Splunk. ignore_internal_errors (fixture): common list of errors to be ignored - record_property (fixture): Document facts of test cases. + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. caplog (fixture): fixture to capture logs. """ search = """ @@ -91,9 +91,10 @@ def test_props_fields( Args: splunk_search_util (SearchUtil): Object that helps to search on Splunk. - splunk_searchtime_fields_positive (fixture): Test for stanza field. - record_property (fixture): Document facts of test cases. - caplog (fixture): fixture to capture logs. + splunk_ingest_data (fixture): Ensure data was ingested before running test + splunk_setup (fixture): Ensure that test environment was set up before running test + splunk_searchtime_fields_positive (fixture): fields data of the event to be tested + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. """ # Search Query @@ -154,9 +155,10 @@ def test_requirements_fields( Args: splunk_search_util (SearchUtil): Object that helps to search on Splunk. - splunk_searchtime_fields_positive (fixture): Test for stanza field. - record_property (fixture): Document facts of test cases. - caplog (fixture): fixture to capture logs. + splunk_ingest_data (fixture): Ensure data was ingested before running test + splunk_setup (fixture): Ensure that test environment was set up before running test + splunk_searchtime_fields_requirements (fixture): fields data of the event to be tested + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. """ # Search Query @@ -239,14 +241,11 @@ def test_props_fields_no_dash_not_empty( This test case checks negative scenario for the field value. Args: - splunk_search_util (SearchUtil): - Object that helps to search on Splunk. - splunk_searchtime_fields_negative (fixture): - Test for stanza field. - record_property (fixture): - Document facts of test cases. - caplog (fixture): - fixture to capture logs. + splunk_search_util (SearchUtil): Object that helps to search on Splunk. + splunk_ingest_data (fixture): Ensure data was ingested before running test + splunk_setup (fixture): Ensure that test environment was set up before running test + splunk_searchtime_fields_negative (fixture): fields data of the event to be tested + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. """ # Search Query @@ -318,10 +317,11 @@ def test_tags( and also checks that a tag is not assigned to the event if disabled. Args: - splunk_search_util (splunksplwrapper.SearchUtil.SearchUtil): - object that helps to search on Splunk. + splunk_search_util (splunksplwrapper.SearchUtil.SearchUtil): object that helps to search on Splunk. + splunk_ingest_data (fixture): Ensure data was ingested before running test + splunk_setup (fixture): Ensure that test environment was set up before running test splunk_searchtime_fields_tags (fixture): pytest parameters to test. - record_property (fixture): pytest fixture to document facts of test cases. + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. caplog (fixture): fixture to capture logs. """ @@ -386,10 +386,10 @@ def test_datamodels( Args: splunk_search_util (splunksplwrapper.SearchUtil.SearchUtil): object that helps to search on Splunk. - splunk_ingest_data (fixture): Unused but required to ensure data was ingested before running test - splunk_setup (fixture): Unused but required to ensure that test environment was set up before running test + splunk_ingest_data (fixture): Ensure data was ingested before running test + splunk_setup (fixture): Ensure that test environment was set up before running test splunk_searchtime_fields_datamodels (fixture): pytest parameters to test. - record_property (fixture): pytest fixture to document facts of test cases. + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. caplog (fixture): fixture to capture logs. """ esacaped_event = splunk_searchtime_fields_datamodels["stanza"] @@ -484,14 +484,13 @@ def test_eventtype( Tests if all eventtypes in eventtypes.conf are generated in Splunk. Args: - splunk_search_util (fixture): - Fixture to create a simple connection to Splunk via SplunkSDK - splunk_searchtime_fields_eventtypes (fixture): - Fixture containing list of eventtypes - record_property (fixture): - Used to add user properties to test report - caplog (fixture): - Access and control log capturing + splunk_search_util (splunksplwrapper.SearchUtil.SearchUtil): + object that helps to search on Splunk. + splunk_ingest_data (fixture): Ensure data was ingested before running test + splunk_setup (fixture): Ensure that test environment was set up before running test + splunk_searchtime_fields_eventtypes (fixture): Fixture containing list of eventtypes + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. + caplog (fixture): Access and control log capturing Returns: Asserts whether test case passes or fails. @@ -543,14 +542,13 @@ def test_savedsearches( Tests if all savedsearches in savedsearches.conf are being executed properly to generate proper results. Args: - splunk_search_util (fixture): - Fixture to create a simple connection to Splunk via SplunkSDK - splunk_searchtime_fields_savedsearches (fixture): - Fixture containing list of savedsearches - record_property (fixture): - Used to add user properties to test report - caplog (fixture): - Access and control log capturing + splunk_search_util (splunksplwrapper.SearchUtil.SearchUtil): + object that helps to search on Splunk. + splunk_ingest_data (fixture): Ensure data was ingested before running test + splunk_setup (fixture): Ensure that test environment was set up before running test + splunk_searchtime_fields_savedsearches (fixture): Fixture containing list of savedsearches + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. + caplog (fixture): Access and control log capturing Returns: Asserts whether test case passes or fails. diff --git a/pytest_splunk_addon/index_tests/test_generator.py b/pytest_splunk_addon/index_tests/test_generator.py index ae989990c..e81168b7a 100644 --- a/pytest_splunk_addon/index_tests/test_generator.py +++ b/pytest_splunk_addon/index_tests/test_generator.py @@ -37,6 +37,7 @@ def generate_tests(self, store_events, app_path, config_path, test_type): Generates the test cases based on test_type Args: + store_events (bool): variable to define if events should be stored app_path (str): Path of the app package config_path (str): Path of package which contains pytest-splunk-addon-data.conf test_type (str): Type of test case @@ -61,9 +62,7 @@ def generate_tests(self, store_events, app_path, config_path, test_type): yield from self.generate_line_breaker_tests(tokenized_events) else: - for tokenized_event in tokenized_events: - identifier_key = tokenized_event.metadata.get("identifier") hosts = self.get_hosts(tokenized_event) @@ -108,6 +107,9 @@ def generate_line_breaker_tests(self, tokenized_events): """ Generates test case for testing line breaker + Args: + tokenized_events (list): list of tokenized events + Yields: pytest.params for the test templates """ diff --git a/pytest_splunk_addon/index_tests/test_templates.py b/pytest_splunk_addon/index_tests/test_templates.py index 84c5729b5..c080ed766 100644 --- a/pytest_splunk_addon/index_tests/test_templates.py +++ b/pytest_splunk_addon/index_tests/test_templates.py @@ -62,9 +62,10 @@ def test_indextime_key_fields( Args: splunk_search_util (SearchUtil): Object that helps to search on Splunk. - splunk_ingest_data (fixture): To ingest data into splunk. + splunk_ingest_data (fixture): Ensure data was ingested before running test + splunk_setup (fixture): Ensure that test environment was set up before running test splunk_indextime_key_fields (fixture): Test for key fields - record_property (fixture): Document facts of test cases. + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. caplog (fixture): fixture to capture logs. """ @@ -193,9 +194,10 @@ def test_indextime_time( Args: splunk_search_util (SearchUtil): Object that helps to search on Splunk. - splunk_ingest_data (fixture): To ingest data into splunk. + splunk_ingest_data (fixture): Ensure data was ingested before running test + splunk_setup (fixture): Ensure that test environment was set up before running test splunk_indextime_time (fixture): Test for _time field - record_property (fixture): Document facts of test cases. + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. caplog (fixture): fixture to capture logs. """ index_list = ( @@ -280,9 +282,10 @@ def test_indextime_line_breaker( Args: splunk_search_util (SearchUtil): Object that helps to search on Splunk. - splunk_ingest_data (fixture): To ingest data into splunk. + splunk_ingest_data (fixture): Ensure data was ingested before running test + splunk_setup (fixture): Ensure that test environment was set up before running test splunk_indextime_line_breaker (fixture): Test for event count - record_property (fixture): Document facts of test cases. + record_property (fixture): Document facts of test cases to provide more info in the test failure reports. caplog (fixture): fixture to capture logs. """ expected_events_count = int( diff --git a/pytest_splunk_addon/sample_generation/pytest_splunk_addon_data_parser.py b/pytest_splunk_addon/sample_generation/pytest_splunk_addon_data_parser.py index 738c2ba65..1bcffd64b 100644 --- a/pytest_splunk_addon/sample_generation/pytest_splunk_addon_data_parser.py +++ b/pytest_splunk_addon/sample_generation/pytest_splunk_addon_data_parser.py @@ -34,7 +34,8 @@ class PytestSplunkAddonDataParser: This class parses pytest-splunk-addon-data.conf file. Args: - addon_path: Path to the Splunk App + addon_path (str): Path to the Splunk App + config_path (str): Path to the pytest-splunk-addon-data.conf """ conf_name = " " @@ -48,6 +49,12 @@ def __init__(self, addon_path: str, config_path: str): self._path_to_samples = self._get_path_to_samples() def _get_path_to_samples(self): + """ + Function to get the path to the samples folder + + Returns: + str: path to the samples folder + """ if os.path.exists(os.path.join(self.config_path, "samples")): LOGGER.info( "Samples path is: {}".format(os.path.join(self.config_path, "samples")) @@ -92,7 +99,7 @@ def get_sample_stanzas(self): Converts a stanza in pytest-splunk-addon-data.conf to an object of SampleStanza. Returns: - List of SampleStanza objects. + list: List of SampleStanza objects. """ _psa_data = self._get_psa_data_stanzas() self._check_samples() @@ -125,7 +132,7 @@ def _get_psa_data_stanzas(self): } Return: - Dictionary representing pytest-splunk-addon-data.conf in the above format. + dict: Dictionary representing pytest-splunk-addon-data.conf in the above format. """ psa_data_dict = {} schema = XMLSchema(SCHEMA_PATH) @@ -173,8 +180,16 @@ def _check_samples(self): def test_unicode_char(filename): + """ + Function to check if the file contains unicode chars + + Args: + filename (str): name of the file to check + + Raises: + ValueError: if file contains unicode chars + """ invalid = False - # pattern = re.compile("[^\x00-\x7F]") #do ot want to replace printable chars like €¢ etc pattern = re.compile( "[\u200B-\u200E\uFEFF\u202c\u202D\u2063\u2062]" ) # zero width characters diff --git a/pytest_splunk_addon/sample_generation/rule.py b/pytest_splunk_addon/sample_generation/rule.py index 8aeb5ac1a..24a9b04b6 100644 --- a/pytest_splunk_addon/sample_generation/rule.py +++ b/pytest_splunk_addon/sample_generation/rule.py @@ -51,7 +51,7 @@ def raise_warning(warning_string): To raise a pytest user warning along with a log. Args: - warning_string(str): warning string + warning_string (str): warning string """ LOGGER.warning(warning_string) warnings.warn(UserWarning(warning_string)) @@ -259,7 +259,6 @@ def get_rule_replacement_values(self, sample, value_list, rule): rule (str): fieldname i.e. host, src, user, dvc etc Returns: - index_list (list): list of mapped columns(int) as per value_list csv_row (list): list of replacement values for the rule. """ csv_row = [] diff --git a/pytest_splunk_addon/sample_generation/sample_event.py b/pytest_splunk_addon/sample_generation/sample_event.py index 01cbb8680..2bf3cde7e 100644 --- a/pytest_splunk_addon/sample_generation/sample_event.py +++ b/pytest_splunk_addon/sample_generation/sample_event.py @@ -356,6 +356,14 @@ def register_field_value(self, field, token_values): self.key_fields.setdefault(field, []).append(str(token_values.key)) def update_requirement_test_field(self, field, token, token_values): + """ + Function to update field value for requirement_test_data as per the token replacement + + Args: + field (str): name of the field + token (str): name of the token + token_values (list/str): Token value(s) which are replaced in the key fields + """ if field != "_time": if ( self.requirement_test_data is not None diff --git a/pytest_splunk_addon/sample_generation/sample_generator.py b/pytest_splunk_addon/sample_generation/sample_generator.py index fb6d3a230..2efc6f6de 100644 --- a/pytest_splunk_addon/sample_generation/sample_generator.py +++ b/pytest_splunk_addon/sample_generation/sample_generator.py @@ -22,21 +22,18 @@ class SampleGenerator(object): """ - Main Class - Generate sample objects + Main Class to generate sample objects + + Args: + addon_path (str): path to the addon + config_path (str): Path to the pytest-splunk-addon-data.conf + process_count (num): generate {no} process for execution """ sample_stanzas = [] conf_name = " " def __init__(self, addon_path, config_path=None, process_count=4): - """ - init method for the class - - Args: - addon_path(str): path to the addon - process_count(no): generate {no} process for execution - """ self.addon_path = addon_path self.process_count = process_count self.config_path = config_path diff --git a/pytest_splunk_addon/sample_generation/sample_xdist_generator.py b/pytest_splunk_addon/sample_generation/sample_xdist_generator.py index 7f797146c..d33b46a3a 100644 --- a/pytest_splunk_addon/sample_generation/sample_xdist_generator.py +++ b/pytest_splunk_addon/sample_generation/sample_xdist_generator.py @@ -24,12 +24,30 @@ class SampleXdistGenerator: + """ + This class handles sample generation + + Args: + addon_path (str): path to the addon + config_path (str): Path to the pytest-splunk-addon-data.conf + process_count (num): generate {no} process for execution + """ + def __init__(self, addon_path, config_path=None, process_count=4): self.addon_path = addon_path self.process_count = process_count self.config_path = config_path def get_samples(self, store_events): + """ + Function to generate samples + + Args: + store_events (bool): variable to define if events should be stored + + Returns: + dict: dictionary with conf_name and tokenized events + """ if self.tokenized_event_source == "pregenerated": with open(self.event_path, "rb") as file_obj: store_sample = pickle.load(file_obj) @@ -76,6 +94,12 @@ def get_samples(self, store_events): return store_sample def store_events(self, tokenized_events): + """ + Function to store tokenized events in json file + + Args: + tokenized_events (list): list of tokenized events + """ if not os.path.exists(os.path.join(os.getcwd(), ".tokenized_events")): os.makedirs(os.path.join(os.getcwd(), ".tokenized_events")) tokenized_samples_dict = {} diff --git a/pytest_splunk_addon/sample_generation/time_parser.py b/pytest_splunk_addon/sample_generation/time_parser.py index e9d67ec0a..3615928b5 100644 --- a/pytest_splunk_addon/sample_generation/time_parser.py +++ b/pytest_splunk_addon/sample_generation/time_parser.py @@ -28,13 +28,14 @@ def __init__(self): def convert_to_time(self, sign, num, unit): """ converts splunk time into datetime format for earliest and latest - args : - sign : to increase or decrease time - num : time value - unit : unit of time eg: seconds,minuits etc - returns : - datetime formated time + Args : + sign (str): to increase or decrease time + num (num): time value + unit (str): unit of time eg: seconds,minutes etc + + Returns : + datetime formatted time """ num = int(num) unittime = None @@ -136,15 +137,14 @@ def convert_to_time(self, sign, num, unit): def get_timezone_time(self, random_time, timezone_time): """ - Converts timezone formated time into datetime object for earliest and latest + Converts timezone formatted time into datetime object for earliest and latest Args: - sign to increase or decrease time - hrs : hours in timezone - mins : minutes in timezone + random_time (datetime): datetime object + timezone_time (str): timezone time string - returns: - datetime formated time + Returns: + datetime formatted time """ sign, hrs, mins = re.match(r"([+-])(\d\d)(\d\d)", timezone_time).groups() diff --git a/pytest_splunk_addon/splunk.py b/pytest_splunk_addon/splunk.py index d81c03d3f..da77dd882 100644 --- a/pytest_splunk_addon/splunk.py +++ b/pytest_splunk_addon/splunk.py @@ -371,7 +371,7 @@ def splunk_search_util(splunk, request): Returns: splunksplwrapper.SearchUtil.SearchUtil: The SearchUtil object """ - LOGGER.info("Initializing SearchUtil for the Splunk instace.") + LOGGER.info("Initializing SearchUtil for the Splunk instance.") cloud_splunk = CloudSplunk( splunkd_host=splunk["host"], splunkd_port=splunk["port"], @@ -381,7 +381,7 @@ def splunk_search_util(splunk, request): conn = cloud_splunk.create_logged_in_connector() jobs = Jobs(conn) - LOGGER.info("initialized SearchUtil for the Splunk instace.") + LOGGER.info("initialized SearchUtil for the Splunk instance.") search_util = SearchUtil(jobs, LOGGER) search_util.search_index = request.config.getoption("search_index") search_util.search_retry = request.config.getoption("search_retry") @@ -393,7 +393,7 @@ def splunk_search_util(splunk, request): @pytest.fixture(scope="session") def ignore_internal_errors(request): """ - This fixture generates a common list of errors which are suppossed + This fixture generates a common list of errors which are supposed to be ignored in test_splunk_internal_errors. Returns: diff --git a/pytest_splunk_addon/tools/cim_field_report.py b/pytest_splunk_addon/tools/cim_field_report.py index b5a55a356..b51ff48e2 100644 --- a/pytest_splunk_addon/tools/cim_field_report.py +++ b/pytest_splunk_addon/tools/cim_field_report.py @@ -217,7 +217,7 @@ def get_punct_by_eventtype(jobs, eventtypes, config): list list of tuples of 2 elements, representing collected unique pairs of eventtype+punct None - if exception taks places during splunk search request + if exception take place during splunk search request """ start = time.time() @@ -258,7 +258,7 @@ def get_field_names(jobs, eventtypes, config): list collected field names None - if exception taks places during splunk search request + if exception take place during splunk search request """ start = time.time()