diff --git a/plugins/pdr_deterministic_plugin/tests/simulation_telemetry.py b/plugins/pdr_deterministic_plugin/tests/simulation_telemetry.py index edaff7349..80b8f4b15 100755 --- a/plugins/pdr_deterministic_plugin/tests/simulation_telemetry.py +++ b/plugins/pdr_deterministic_plugin/tests/simulation_telemetry.py @@ -17,10 +17,13 @@ import copy import argparse import random -from os.path import exists +from os.path import exists,join from collections import OrderedDict import requests from utils.utils import Utils +import pandas as pd +from pathlib import Path +from datetime import datetime,timedelta lock = Lock() @@ -32,6 +35,8 @@ RCV_REMOTE_PHY_ERROR_COUNTER = "PortRcvRemotePhysicalErrors" TEMP_COUNTER = "Module_Temperature" FEC_MODE = "fec_mode_active" +TELEMETRY_DATASTORE_LOCATION = "/opt/ufm/ufm_plugin_pdr_deterministic/datastore" +MAX_LOG_FILES = 10 ENDPOINT_CONFIG = {} EXCLUDE_PORT_LONG_TIME = "ExcludePortForLongTime" @@ -70,6 +75,8 @@ def do_GET(self): # pylint: disable=invalid-name data = endpoint['data'] self.wfile.write(data.encode()) +OUTPUT_FILE_FORMAT = "%Y_%m_%d_%H_%M_%S.csv" + DIFFERENT_DEFAULT_VALUES = { # because the plugin reads the meta data to know the first temperature and we cannot stream the metadata. 
TEMP_COUNTER:"5", @@ -194,6 +201,7 @@ def start_server(port:str,changes_intervals:int, run_forever:bool): counters_names = list(counters.keys()) header = ['timestamp', 'source_id,tag,Node_GUID,port_guid,Port_Number'] + counters_names endpoint['data'] = "" + setup_log_test = False while True: # lock.acquire() data = [] @@ -217,8 +225,28 @@ def start_server(port:str,changes_intervals:int, run_forever:bool): if not run_forever and ENDPOINT_CONFIG["ITERATION_TIME"] > MAX_ITERATIONS: # after all the tests are done, we need to stop the simulator and check the logs return + if not setup_log_test: + setup_log_test = create_telemetries_logs() time.sleep(changes_intervals) +def create_telemetries_logs(): + """ + create up to double of the amount of logs that can be in the folder. this is a setup for test. + """ + successful = True + abs_file_path = Path(join(TELEMETRY_DATASTORE_LOCATION,"abs")) + files = list(abs_file_path.glob("*.csv")) + if len(files) == 0: + print("abs file have not created yet") + return False + df = pd.read_csv(files[0]) + for option_location in ['abs','delta']: + for day in range(1,MAX_LOG_FILES*2): + # we put days back to make sure the logs that the server creates are recent, we have a test for recent logs. 
+ file_name = (datetime.now() - timedelta(days=day)).strftime(OUTPUT_FILE_FORMAT) + df.to_csv(join(TELEMETRY_DATASTORE_LOCATION,option_location,file_name)) + return successful + def excluded_ports_simulation(endpoint): """ Perform operations on exclusion port for current iteration @@ -229,7 +257,6 @@ def excluded_ports_simulation(endpoint): for port_index in range(len(rows)): port_name = endpoint["Ports_names"][port_index] iteration = ENDPOINT_CONFIG["ITERATION_TIME"] - # Process remove operation if find_value(port_index, INCLUDE_PORT, iteration, None) is not None: # Remove from exclusion list @@ -339,7 +366,31 @@ def check_logs(config): break if len(lines) == 0: print("Could not find log file in " + str(location_logs_can_be)) - return 1 + return 1 + + saved_files_tests = True + for option_location in ['abs','delta']: + input_path_dir = Path(join(TELEMETRY_DATASTORE_LOCATION,option_location)) + files = list(input_path_dir.glob("*.csv")) + + if len(files) == 0: # testing we have files there + print(f"There are no files in the datastore location:{join(TELEMETRY_DATASTORE_LOCATION,option_location)}") + saved_files_tests = False + continue + print_test_result("amount of telemetry logs in the folder equal to maximum or maximum plus 1 (did not clean the plus 1 yet).", + len(files), MAX_LOG_FILES) + if len(files) != MAX_LOG_FILES and len(files) != MAX_LOG_FILES + 1: + saved_files_tests = False + files.sort(key=lambda p: p.name,reverse=True) + latest_file = files[0].name + saved_time = datetime.strptime(latest_file,OUTPUT_FILE_FORMAT) + different_time = datetime.now() - saved_time + print_test_result(f"the latest file saved at {join(TELEMETRY_DATASTORE_LOCATION,option_location)}\ + before test running, more than 5 minutes {different_time.total_seconds() > 300}", + different_time.total_seconds() > 300, False) + if different_time.total_seconds() > 300: + saved_files_tests = False + continue # if a you want to add more tests, please add more guids and test on other indices. 
ports_should_be_isolated_indices = list(set([x[1] for x in POSITIVE_DATA_TEST])) @@ -373,7 +424,7 @@ def check_logs(config): break print_test_result(f"port {port_name} (index: {p}) which check {tested_counter} should not be in the logs", found, False, "negative") - all_pass = number_of_failed_positive_tests == 0 and number_of_failed_negative_tests == 0 + all_pass = number_of_failed_positive_tests == 0 and number_of_failed_negative_tests == 0 and saved_files_tests return 0 if all_pass else 1 def main(): @@ -417,7 +468,7 @@ def main(): if not validate_simulation_data(): return 1 - + port = args.endpoint_port url = f'http://0.0.0.0:{port}{args.url_suffix}' print(f'---Starting endpoint {url}') diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/constants.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/constants.py index fc53971ca..82a1053cb 100644 --- a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/constants.py +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/constants.py @@ -46,7 +46,6 @@ class PDRConstants(): SWITCH_TO_HOST_ISOLATION = "SWITCH_TO_HOST_ISOLATION" TEST_MODE = "TEST_MODE" TEST_MODE_PORT = 9090 - SECONDARY_TELEMETRY_PORT = 9002 GET_SESSION_DATA_REST = "/monitoring/session/0/data" POST_EVENT_REST = "/app/events/external_event" @@ -56,7 +55,6 @@ class PDRConstants(): GET_ACTIVE_PORTS_REST = "/resources/ports?active=true" API_HEALTHY_PORTS = "healthy_ports" API_ISOLATED_PORTS = "isolated_ports" - SECONDARY_INSTANCE = "low_freq_debug" TIMEOUT = 60 diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/data_store.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/data_store.py new file mode 100644 index 000000000..dba03a074 --- /dev/null +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/data_store.py @@ -0,0 +1,89 @@ +from os.path import join,exists +from os import remove,makedirs +from pathlib import Path +from datetime import datetime +import pandas as pd + +class DataStore: + """ + arrange 
the data store of the telemetries that we save, abs or delta. removes old data when needed. + """ + OUTPUT_FILE_FORMAT = "%Y_%m_%d_%H_%M_%S.csv" + AMOUNT_FILES_TO_KEEP = 10 + BASE_PATH = "/opt/ufm/ufm_plugin_pdr_deterministic/datastore" + ABS_PATH = "abs" + DELTA_PATH = "delta" + TAR_SUFFIX = "*.csv" + + def __init__(self,logger) -> None: + self.logger = logger + if not exists(self.BASE_PATH): + makedirs(self._folder_abs()) + makedirs(self._folder_delta()) + + def _folder_abs(self) -> str: + return join(self.BASE_PATH,self.ABS_PATH) + + def _folder_delta(self) -> str: + return join(self.BASE_PATH,self.DELTA_PATH) + + def _get_filename(self) -> str: + return datetime.now().strftime(self.OUTPUT_FILE_FORMAT) + + def get_filename_abs(self) -> str: + """ + return a filename for abs data + """ + return join(self._folder_abs(),self._get_filename()) + + def get_filename_delta(self) -> str: + """ + return a filename for delta data + """ + return join(self._folder_delta(),self._get_filename()) + + def _get_files_to_remove(self, data_path:str, suffix:str, to_keep:int) -> list: + """ + find the file names of the oldest files, beyond the amount of to_keep, + searching in the data_path with the suffix. + """ + files_to_remove = [] + input_path_dir = Path(data_path) + files = list(input_path_dir.glob(suffix)) + files.sort(key=lambda p: p.name) + files = [str(p) for p in files] + if len(files) > to_keep: + files_to_remove = files[:len(files)- to_keep] + return files_to_remove + + def clean_old_files(self) -> None: + """ + search both locations to clean the old files. 
+ """ + for data_path in [self._folder_abs(),self._folder_delta()]: + files = self._get_files_to_remove(data_path,self.TAR_SUFFIX,self.AMOUNT_FILES_TO_KEEP) + if len(files) > 0: + self._remove_files(files) + + def _remove_files(self, files: list) -> None: + """ + Delete a list of files + :param files: (List) List of files to be removed + :return: None + """ + self.logger.info(f"removing {len(files)} old files") + for file in files: + try: + if exists(file): + remove(file) + except FileNotFoundError: + pass + except OSError as exc: + self.logger.error("failed to remove file %s [%s]", file, exc) + + def save(self, dataframe:pd.DataFrame, file_name:str) -> None: + """ + save dataframe to the file name + """ + self.logger.info(f"saving data to {file_name}") + dataframe.to_csv(file_name) diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py index 232e39eb9..6a9e8cd55 100644 --- a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py @@ -18,8 +18,9 @@ from pdr_algorithm import PortData, IsolatedPort, PDRAlgorithm from constants import PDRConstants as Constants +from telemetry_collector import TelemetryCollector from ufm_communication_mgr import UFMCommunicator -# should actually be persistent and thread safe dictionary pf IsolatedPorts +from data_store import DataStore #pylint: disable=too-many-instance-attributes class IsolationMgr: @@ -48,6 +49,8 @@ def __init__(self, ufm_client: UFMCommunicator, logger): self.test_iteration = 0 self.logger = logger + self.data_store = DataStore(self.logger) + self.telemetry_collector = TelemetryCollector(self.test_mode,logger,self.data_store) self.exclude_list = ExcludeList(self.logger) self.pdr_alg = PDRAlgorithm(self.ufm_client, self.exclude_list, self.logger, pdr_config) @@ -268,6 +271,7 @@ def main_flow(self): t_begin = time.time() 
self.exclude_list.refresh() self.get_isolation_state() + self.data_store.clean_old_files() issues = None # Get telemetry data @@ -277,7 +281,7 @@ def main_flow(self): self.logger.info(f"Retrieving test mode telemetry data to determine ports' states: iteration {self.test_iteration}") self.test_iteration += 1 try: - ports_counters = self.ufm_client.get_telemetry(self.test_mode) + ports_counters = self.telemetry_collector.get_telemetry() if ports_counters is None: self.logger.error("Couldn't retrieve telemetry data") else: @@ -309,6 +313,7 @@ def main_flow(self): for isolated_port in list(self.isolated_ports.values()): if self.pdr_alg.check_deisolation_conditions(isolated_port): self.eval_deisolate(isolated_port.name) + self.update_ports_data() t_end = time.time() #pylint: disable=broad-except diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/pdr_algorithm.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/pdr_algorithm.py index dec6207b8..6ecff06db 100644 --- a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/pdr_algorithm.py +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/pdr_algorithm.py @@ -433,7 +433,7 @@ def analyze_telemetry_data(self, ports_data, ports_counters): issues[port_name] = ber_issue # If out of operating conditions we'll overwrite the cause if self.temp_check and self.is_out_of_operating_conf(port_name): - issues[port_name] = Constants.ISSUE_OONOC + issues[port_name].cause = Constants.ISSUE_OONOC return list(issues.values()) def check_deisolation_conditions(self, isolated_port): diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/telemetry_collector.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/telemetry_collector.py new file mode 100644 index 000000000..566b7608f --- /dev/null +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/telemetry_collector.py @@ -0,0 +1,56 @@ +import urllib +import pandas as pd +from data_store import DataStore + +class TelemetryCollector: + """ + 
Represent Telemetry collector which sends a DataFrame once telemetry is called. + Calls the data store class to save the abs or delta data. + """ + BASED_COLUMNS = ["Node_GUID", "port_guid", "Port_Number"] + KEY = BASED_COLUMNS + ["timestamp","tag"] + SECONDARY_TELEMETRY_PORT = 9002 + SECONDARY_INSTANCE = "low_freq_debug" + + def __init__(self,test_mode:bool,logger,data_store:DataStore) -> None: + self.test_mode = test_mode + self.logger = logger + self.previous_telemetry_data = None + self.data_store = data_store + + def get_telemetry(self): + """ + get the telemetry from the secondary telemetry; if it is in test mode, it gets it from the simulation + return DataFrame of the telemetry + """ + if self.test_mode: + url = "http://127.0.0.1:9090/csv/xcset/simulated_telemetry" + else: + url = f"http://127.0.0.1:{self.SECONDARY_TELEMETRY_PORT}/csv/xcset/{self.SECONDARY_INSTANCE}" + try: + self.logger.info(f"collecting telemetry from {url}.") + telemetry_data = pd.read_csv(url) + except (pd.errors.ParserError, pd.errors.EmptyDataError, urllib.error.URLError) as connection_error: + self.logger.error("failed to get telemetry data from UFM, fetched url=%s. 
Error: %s",url,connection_error) + telemetry_data = None + if self.previous_telemetry_data is not None and telemetry_data is not None: + delta = self._get_delta(self.previous_telemetry_data,telemetry_data) + # when we want to keep only delta + if len(delta) > 0: + self.data_store.save(delta,self.data_store.get_filename_delta()) + elif telemetry_data is not None: + # when we want to keep the abs + self.previous_telemetry_data = telemetry_data + self.data_store.save(telemetry_data,self.data_store.get_filename_abs()) + return telemetry_data + + def _get_delta(self, first_df: pd.DataFrame, second_df:pd.DataFrame): + merged_df = pd.merge(second_df, first_df, on=self.BASED_COLUMNS, how='inner', suffixes=('', '_x')) + delta_dataframe = pd.DataFrame() + for index,col in enumerate(second_df.columns): + if col not in self.KEY and not isinstance(merged_df.iat[0,index],str): + col_x = col + "_x" + delta_dataframe[col] = merged_df[col] - merged_df[col_x] + else: + delta_dataframe[col] = second_df[col] + return delta_dataframe diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/ufm_communication_mgr.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/ufm_communication_mgr.py index 1abe370da..f4e8fcaf0 100644 --- a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/ufm_communication_mgr.py +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/ufm_communication_mgr.py @@ -10,13 +10,10 @@ # provided with the software product. 
# -import urllib.error -import urllib import logging import http from constants import PDRConstants as Constants import requests -import pandas as pd class UFMCommunicator: """ @@ -59,22 +56,6 @@ def send_request(self, uri, data, method=Constants.POST_METHOD, headers=None): logging.info("UFM API Request Status: %s, URL: %s",response.status_code, request) return response - def get_telemetry(self,test_mode): - """ - get the telemetry from secondary telemetry, if it in test mode it get from the simulation - return DataFrame of the telemetry - """ - if test_mode: - url = "http://127.0.0.1:9090/csv/xcset/simulated_telemetry" - else: - url = f"http://127.0.0.1:{Constants.SECONDARY_TELEMETRY_PORT}/csv/xcset/{Constants.SECONDARY_INSTANCE}" - try: - telemetry_data = pd.read_csv(url) - except (pd.errors.ParserError, pd.errors.EmptyDataError, urllib.error.URLError) as error: - logging.error("Failed to get telemetry data from UFM, fetched url=%s. Error: %s",url,error) - telemetry_data = None - return telemetry_data - def send_event(self, message, event_id=Constants.EXTERNAL_EVENT_NOTICE, external_event_name="PDR Plugin Event", external_event_type="PDR Plugin Event"): data = {