From 8ebc25d65da481229ecc8b30c2b740b0c501c533 Mon Sep 17 00:00:00 2001 From: Elad Gershon Date: Wed, 14 Aug 2024 10:07:46 +0300 Subject: [PATCH] replace dynamic telemetry with the secondary telemetry Signed-off-by: Elad Gershon --- .../build/config/pdr_deterministic.conf | 3 - .../tests/simulation_telemetry.py | 10 +- .../ufm_sim_web_service/constants.py | 16 +- .../ufm_sim_web_service/isolation_mgr.py | 142 ++---------------- .../ufm_communication_mgr.py | 48 +----- 5 files changed, 28 insertions(+), 191 deletions(-) diff --git a/plugins/pdr_deterministic_plugin/build/config/pdr_deterministic.conf b/plugins/pdr_deterministic_plugin/build/config/pdr_deterministic.conf index c0ca1b8ae..a4535b2ce 100644 --- a/plugins/pdr_deterministic_plugin/build/config/pdr_deterministic.conf +++ b/plugins/pdr_deterministic_plugin/build/config/pdr_deterministic.conf @@ -27,9 +27,6 @@ DEISOLATE_CONSIDER_TIME=300 AUTOMATIC_DEISOLATE=True # if set to false, the plugin will not perform deisolation DO_DEISOLATION=True -DYNAMIC_WAIT_TIME=30 -# number of times to check if a dynamic session is unresponsive before restarting it -DYNAMIC_UNRESPONSIVE_LIMIT=3 [Metrics] # in Celsius diff --git a/plugins/pdr_deterministic_plugin/tests/simulation_telemetry.py b/plugins/pdr_deterministic_plugin/tests/simulation_telemetry.py index 59023bf39..8784510ca 100755 --- a/plugins/pdr_deterministic_plugin/tests/simulation_telemetry.py +++ b/plugins/pdr_deterministic_plugin/tests/simulation_telemetry.py @@ -25,12 +25,12 @@ lock = Lock() PHY_EFF_ERROR = "phy_effective_errors" -PHY_SYMBOL_ERROR = "phy_symbol_errors" +PHY_SYMBOL_ERROR = "Symbol_Errors" RCV_PACKETS_COUNTER = "PortRcvPktsExtended" -RCV_ERRORS_COUNTER = "PortRcvErrorsExtended" -LINK_DOWN_COUNTER = "LinkDownedCounterExtended" -RCV_REMOTE_PHY_ERROR_COUNTER = 
"PortRcvRemotePhysicalErrors" +TEMP_COUNTER = "Module_Temperature" FEC_MODE = "fec_mode_active" ENDPOINT_CONFIG = {} diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/constants.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/constants.py index 87e3dc92d..547578a52 100644 --- a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/constants.py +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/constants.py @@ -43,7 +43,7 @@ class PDRConstants(object): SWITCH_TO_HOST_ISOLATION = "SWITCH_TO_HOST_ISOLATION" TEST_MODE = "TEST_MODE" TEST_MODE_PORT = 9090 - DYNAMIC_UNRESPONSIVE_LIMIT = "DYNAMIC_UNRESPONSIVE_LIMIT" + SECONDARY_TELEMETRY_PORT = 9002 GET_SESSION_DATA_REST = "/monitoring/session/0/data" POST_EVENT_REST = "/app/events/external_event" @@ -53,8 +53,7 @@ class PDRConstants(object): GET_ACTIVE_PORTS_REST = "/resources/ports?active=true" API_HEALTHY_PORTS = "healthy_ports" API_ISOLATED_PORTS = "isolated_ports" - DYNAMIC_SESSION_REST = "/app/telemetry/instances/%s" - STATUS_DYNAMIC_SESSION_REST = "/app/telemetry/instances/status" + SECONDARY_INSTANCE = "low_freq_debug" EXTERNAL_EVENT_ERROR = 554 EXTERNAL_EVENT_ALERT = 553 @@ -70,12 +69,12 @@ class PDRConstants(object): CONF_USERNAME = 'admin' CONF_PASSWORD = 'password' - TEMP_COUNTER = "CableInfo.Temperature" - ERRORS_COUNTER = "errors" + ERRORS_COUNTER = "Symbol_Errors" RCV_PACKETS_COUNTER = "PortRcvPktsExtended" - RCV_ERRORS_COUNTER = "PortRcvErrorsExtended" - RCV_REMOTE_PHY_ERROR_COUNTER = "PortRcvRemotePhysicalErrorsExtended" - LNK_DOWNED_COUNTER = "LinkDownedCounterExtended" + RCV_ERRORS_COUNTER = "PortRcvErrors" + RCV_REMOTE_PHY_ERROR_COUNTER = "PortRcvRemotePhysicalErrors" + TEMP_COUNTER = "Module_Temperature" + LNK_DOWNED_COUNTER = "Link_Down_IB" PHY_RAW_ERROR_LANE0 = "phy_raw_errors_lane0" PHY_RAW_ERROR_LANE1 = "phy_raw_errors_lane1" @@ -109,6 +108,5 @@ class PDRConstants(object): STATE_ISOLATED = "isolated" STATE_TREATED = "treated" - PDR_DYNAMIC_NAME = "pdr_dynamic" # 
intervals in seconds for testing ber values and corresponding thresholds BER_THRESHOLDS_INTERVALS = [(125 * 60, 3), (12 * 60, 2.88)] diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py index c5eb8d632..38ab99093 100644 --- a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py @@ -22,17 +22,9 @@ from exclude_list import ExcludeList from constants import PDRConstants as Constants -from ufm_communication_mgr import DynamicSessionState, UFMCommunicator +from ufm_communication_mgr import UFMCommunicator # should actually be persistent and thread safe dictionary pf PortStates - -class DynamicTelemetryUnresponsive(Exception): - """ - Exception raised when the dynamic telemetry is unresponsive. - """ - pass - - class PortData(object): """ Represents the port data. @@ -156,7 +148,7 @@ def get_counter(counter_name, row, default=0): or its value is NaN. 
""" try: - val = row.get(counter_name) if (row.get(counter_name) is not None and not pd.isna(row.get(counter_name))) else default + val = row.get(counter_name) if (row.get(counter_name) is not None and not numpy.isnan(row.get(counter_name))) else default except Exception as e: return default return val @@ -194,13 +186,11 @@ def __init__(self, ufm_client: UFMCommunicator, logger): self.do_deisolate = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.DO_DEISOLATION) self.deisolate_consider_time = pdr_config.getint(Constants.CONF_ISOLATION,Constants.DEISOLATE_CONSIDER_TIME) self.automatic_deisolate = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.AUTOMATIC_DEISOLATE) - self.dynamic_wait_time = pdr_config.getint(Constants.CONF_ISOLATION,"DYNAMIC_WAIT_TIME") self.temp_check = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.CONFIGURED_TEMP_CHECK) self.link_down_isolation = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.LINK_DOWN_ISOLATION) self.switch_hca_isolation = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.SWITCH_TO_HOST_ISOLATION) self.test_mode = pdr_config.getboolean(Constants.CONF_COMMON,Constants.TEST_MODE, fallback=False) self.test_iteration = 0 - self.dynamic_unresponsive_limit = pdr_config.getint(Constants.CONF_ISOLATION,Constants.DYNAMIC_UNRESPONSIVE_LIMIT, fallback=3) # Take from Conf self.logger = logger self.ber_intervals = Constants.BER_THRESHOLDS_INTERVALS if not self.test_mode else [[0.5 * 60, 3]] @@ -227,13 +217,6 @@ def __init__(self, ufm_client: UFMCommunicator, logger): Constants.LNK_DOWNED_COUNTER, ] - # bring telemetry data on disabled ports - self.dynamic_extra_configuration = { - "plugin_env_CLX_EXPORT_API_ENABLE_DOWN_PORT_COUNTERS": "1", - "plugin_env_CLX_EXPORT_API_ENABLE_DOWN_PHY": "1", - "arg_11": "" - } - self.exclude_list = ExcludeList(self.logger) def calc_max_ber_wait_time(self, min_threshold): @@ -456,13 +439,11 @@ def check_temp_issue(self, port_obj, row, timestamp): if not 
self.temp_check: return None cable_temp = get_counter(Constants.TEMP_COUNTER, row, default=None) - if cable_temp is not None and not pd.isna(cable_temp): - if cable_temp in ["NA", "N/A", "", "0C", "0"]: + if cable_temp is not None and not numpy.isnan(cable_temp): + if cable_temp in ["NA", "N/A", "", "0C"]: return None - # Get new and saved temperature values - cable_temp = int(cable_temp.split("C")[0]) if isinstance(cable_temp, str) else cable_temp - old_cable_temp = port_obj.counters_values.get(Constants.TEMP_COUNTER); - # Save new temperature value + cable_temp = int(cable_temp.split("C")[0]) if type(cable_temp) == str else cable_temp + old_cable_temp = port_obj.counters_values.get(Constants.TEMP_COUNTER, 0) port_obj.counters_values[Constants.TEMP_COUNTER] = cable_temp # Check temperature condition if cable_temp and (cable_temp > self.tmax): @@ -542,17 +523,16 @@ def check_ber_issue(self, port_obj, row, timestamp): return Issue(port_obj.port_name, Constants.ISSUE_BER) return None - def read_next_set_of_high_ber_or_pdr_ports(self, endpoint_port): + def read_next_set_of_high_ber_or_pdr_ports(self): """ Read the next set of ports and check if they have high BER, PDR, temperature or link downed issues """ issues = {} - ports_counters = self.ufm_client.get_telemetry(endpoint_port, Constants.PDR_DYNAMIC_NAME,self.test_mode) + ports_counters = self.ufm_client.get_telemetry(Constants.SECONDARY_TELEMETRY_PORT, Constants.SECONDARY_INSTANCE, self.test_mode) if ports_counters is None: self.logger.error("Couldn't retrieve telemetry data") - raise DynamicTelemetryUnresponsive - for index, row in ports_counters.iterrows(): - port_name = f"{row.get('port_guid', '').split('x')[-1]}_{row.get('port_num', '')}" + for _, row in ports_counters.iterrows(): + port_name = f"{row.get('Node_GUID', '').split('x')[-1]}_{row.get('Port_Number', '')}" if self.exclude_list.contains(port_name): # The port is excluded from analysis continue @@ -766,35 +746,6 @@ def get_isolation_state(self): 
port_state.update(Constants.STATE_ISOLATED, Constants.ISSUE_OONOC) self.ports_states[port] = port_state - def start_telemetry_session(self): - """ - Starts a telemetry session. - - Returns: - str: The port number if the dynamic session is started successfully, False otherwise. - """ - self.logger.info("Starting telemetry session") - guids = self.get_requested_guids() - response = self.ufm_client.start_dynamic_session(Constants.PDR_DYNAMIC_NAME, self.telemetry_counters, self.interval, guids, self.dynamic_extra_configuration) - if response and response.status_code == http.HTTPStatus.ACCEPTED: - port = str(int(response.content)) - else: - self.logger.error(f"Failed to start dynamic session: {response}") - return False - return port - - def update_telemetry_session(self): - """ - Updates the telemetry session by requesting and updating the dynamic session with the specified interval and guids. - - Returns: - The response from the UFM client after updating the dynamic session. - """ - self.logger.info("Updating telemetry session") - guids = self.get_requested_guids() - response = self.ufm_client.update_dynamic_session(Constants.PDR_DYNAMIC_NAME, self.interval, guids) - return response - def get_requested_guids(self): """ Get the requested GUIDs and their corresponding ports. @@ -812,63 +763,13 @@ def get_requested_guids(self): requested_guids = [{"guid": sys_guid, "ports": ports} for sys_guid, ports in guids.items()] return requested_guids - # this function create dynamic telemetry and returns the port of this telemetry - def run_telemetry_get_port(self): - """ - Runs the telemetry and returns the endpoint port. - - If the test mode is enabled, it returns the test mode port. - Otherwise, it waits for the dynamic session to start, starts the telemetry session, - and retrieves the endpoint port. - - Returns: - int: The endpoint port for the telemetry. - - Raises: - Exception: If an error occurs during the process. 
- """ - if self.test_mode: - return Constants.TEST_MODE_PORT - try: - while True: - session_state = self.ufm_client.get_dynamic_session_state(Constants.PDR_DYNAMIC_NAME) - if session_state == DynamicSessionState.RUNNING: - # Telemetry session is running - break - if session_state == DynamicSessionState.NONE: - # Start new session - self.logger.info("Waiting for dynamic session to start") - endpoint_port = self.start_telemetry_session() - time.sleep(self.dynamic_wait_time) - else: - # Stop inactive session - self.logger.info("Waiting for inactive dynamic session to stop") - self.ufm_client.stop_dynamic_session(Constants.PDR_DYNAMIC_NAME) - time.sleep(self.dynamic_wait_time) - except Exception as e: - self.ufm_client.stop_dynamic_session(Constants.PDR_DYNAMIC_NAME) - time.sleep(self.dynamic_wait_time) - endpoint_port = self.ufm_client.dynamic_session_get_port(Constants.PDR_DYNAMIC_NAME) - return endpoint_port - - def restart_telemetry_session(self): - """ - Restart the dynamic telemetry session and return the new endpoint port - """ - self.logger.info("Restarting telemetry session") - self.ufm_client.stop_dynamic_session(Constants.PDR_DYNAMIC_NAME) - time.sleep(self.dynamic_wait_time) - endpoint_port = self.run_telemetry_get_port() - return endpoint_port - def main_flow(self): """ Executes the main flow of the Isolation Manager. This method synchronizes with the telemetry clock, retrieves ports metadata, - starts the telemetry session, and continuously retrieves telemetry data to - determine the states of the ports. It handles dynamic telemetry unresponsiveness, - skips isolation if too many ports are detected as unhealthy, and evaluates + continuously retrieves telemetry data from secondary telemetry to + determine the states of the ports. skips isolation if too many ports are detected as unhealthy, and evaluates isolation and deisolation for reported issues and ports with specific causes. 
Args: @@ -880,9 +781,6 @@ def main_flow(self): self.logger.info("Isolation Manager initialized, starting isolation loop") self.get_ports_metadata() self.logger.info("Retrieved ports metadata") - endpoint_port = self.run_telemetry_get_port() - self.logger.info("telemetry session started") - dynamic_telemetry_unresponsive_count = 0 while(True): try: t_begin = time.time() @@ -894,15 +792,9 @@ def main_flow(self): self.logger.info(f"Retrieving test mode telemetry data to determine ports' states: iteration {self.test_iteration}") self.test_iteration += 1 try: - issues = self.read_next_set_of_high_ber_or_pdr_ports(endpoint_port) - except DynamicTelemetryUnresponsive: - dynamic_telemetry_unresponsive_count += 1 - if dynamic_telemetry_unresponsive_count > self.dynamic_unresponsive_limit: - self.logger.error(f"Dynamic telemetry is unresponsive for {dynamic_telemetry_unresponsive_count} times, restarting telemetry session...") - endpoint_port = self.restart_telemetry_session() - dynamic_telemetry_unresponsive_count = 0 - self.test_iteration = 0 - continue + issues = self.read_next_set_of_high_ber_or_pdr_ports() + except (KeyError,) as e: + self.logger.error(f"failed to read information with error {e}") if len(issues) > self.max_num_isolate: # UFM send external event event_msg = "got too many ports detected as unhealthy: %d, skipping isolation" % len(issues) @@ -937,7 +829,3 @@ def main_flow(self): self.logger.warning(traceback_err) t_end = time.time() time.sleep(max(1, self.interval - (t_end - t_begin))) - -# this is a callback for API exposed by this code - second phase -# def work_reportingd(port): -# PORTS_STATE[port].update(Constants.STATE_TREATED, Constants.ISSUE_INIT) diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/ufm_communication_mgr.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/ufm_communication_mgr.py index 1b38a0928..4c5ae9275 100644 --- a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/ufm_communication_mgr.py +++ 
b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/ufm_communication_mgr.py @@ -18,14 +18,6 @@ import http import pandas as pd -class DynamicSessionState(Enum): - """ - States of telemetry session instance - """ - NONE = 0 - INACTIVE = 1 - RUNNING = 2 - class UFMCommunicator: def __init__(self, host='127.0.0.1', ufm_port=8000): @@ -67,7 +59,7 @@ def get_telemetry(self, port, instance_name,test_mode): url = f"http://127.0.0.1:{port}/csv/xcset/{instance_name}" try: telemetry_data = pd.read_csv(url) - except Exception as e: + except (pd.errors.ParserError, pd.errors.EmptyDataError) as e: logging.error(f"Failed to get telemetry data from UFM, fetched url={url}. Error: {e}") telemetry_data = None return telemetry_data @@ -119,41 +111,3 @@ def get_ports_metadata(self): def get_port_metadata(self, port_name): return self.get_request("%s/%s" % (Constants.GET_PORTS_REST, port_name)) - - def start_dynamic_session(self, instance_name, counters, sample_rate, guids, extra_configuration=None): - data = { - "counters": counters, - "sample_rate": sample_rate, - "requested_guids": guids, - "is_registered_discovery": False - } - if extra_configuration: - data["configuration"] = extra_configuration - return self.send_request(Constants.DYNAMIC_SESSION_REST % instance_name, data, method=Constants.POST_METHOD) - - def update_dynamic_session(self, instance_name, sample_rate, guids): - data = { - "sample_rate": sample_rate, - "requested_guids": guids - } - return self.send_request(Constants.DYNAMIC_SESSION_REST % instance_name, data, method=Constants.PUT_METHOD) - - def get_dynamic_session_state(self, instance_name): - response = self.get_request(Constants.STATUS_DYNAMIC_SESSION_REST) - if response: - instance_status = response.get(instance_name) - if instance_status: - if instance_status.get("status") == "running": - return DynamicSessionState.RUNNING - else: - return DynamicSessionState.INACTIVE - return DynamicSessionState.NONE - - def stop_dynamic_session(self, instance_name): - 
data = {} - return self.send_request(Constants.DYNAMIC_SESSION_REST % instance_name, data, method=Constants.DELETE_METHOD) - - def dynamic_session_get_port(self, instance_name): - data = self.get_request(Constants.DYNAMIC_SESSION_REST % instance_name) - if data: - return data.get("endpoint_port")