diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/api/__init__.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/api/__init__.py index 7285e76fb..c2e04e83a 100644 --- a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/api/__init__.py +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/api/__init__.py @@ -11,6 +11,9 @@ # class InvalidRequest(Exception): + """ + and exception of invalid request + """ def __init__(self, message): - Exception.__init__(self,message) \ No newline at end of file + Exception.__init__(self,message) diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/api/pdr_plugin_api.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/api/pdr_plugin_api.py index c8a0c22e8..ee6b0fef4 100644 --- a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/api/pdr_plugin_api.py +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/api/pdr_plugin_api.py @@ -28,7 +28,7 @@ def __init__(self, isolation_mgr): """ Initialize a new instance of the PDRPluginAPI class. """ - super(PDRPluginAPI, self).__init__() + super().__init__() self.isolation_mgr = isolation_mgr @@ -48,7 +48,8 @@ def get_excluded_ports(self): Return ports from exclude list as comma separated port names """ items = self.isolation_mgr.exclude_list.items() - formatted_items = [f"{item.port_name}: {'infinite' if item.ttl_seconds == 0 else int(max(0, item.remove_time - time.time()))}" for item in items] + formatted_items = [f"{item.port_name}: {'infinite' if item.ttl_seconds == 0 else int(max(0, item.remove_time - time.time()))}" + for item in items] response = EOL.join(formatted_items) + ('' if not formatted_items else EOL) return response, HTTPStatus.OK @@ -61,7 +62,7 @@ def exclude_ports(self): """ try: - pairs = self.get_request_data() + pairs = PDRPluginAPI.get_request_data() except (JSONDecodeError, ValueError): return ERROR_INCORRECT_INPUT_FORMAT + EOL, HTTPStatus.BAD_REQUEST @@ -71,7 +72,7 @@ def exclude_ports(self): response = "" for pair in pairs: if pair: - port_name = self.fix_port_name(pair[0]) + port_name = PDRPluginAPI.fix_port_name(pair[0]) ttl = 0 if len(pair) == 1 else int(pair[1]) self.isolation_mgr.exclude_list.add(port_name, ttl) if ttl == 0: @@ -80,7 +81,7 @@ def exclude_ports(self): response += f"Port {port_name} added to exclude list for {ttl} seconds" response += self.get_port_warning(port_name) + EOL - + return response, HTTPStatus.OK @@ -91,7 +92,7 @@ def include_ports(self): Example: ["0c42a10300756a04_1","98039b03006c73ba_2"] """ try: - port_names = self.get_request_data() + port_names = PDRPluginAPI.get_request_data() except (JSONDecodeError, ValueError): return ERROR_INCORRECT_INPUT_FORMAT + EOL, HTTPStatus.BAD_REQUEST @@ -100,7 +101,7 @@ def include_ports(self): response = "" for port_name in port_names: - port_name = self.fix_port_name(port_name) + port_name = PDRPluginAPI.fix_port_name(port_name) if self.isolation_mgr.exclude_list.remove(port_name): response += f"Port {port_name} removed from exclude list" else: @@ -110,20 +111,19 @@ def include_ports(self): return response, HTTPStatus.OK - - def get_request_data(self): + @staticmethod + def get_request_data(): """ Deserialize request json data into object """ if request.is_json: # Directly convert JSON data into Python object return request.get_json() - else: - # Attempt to load plain data text as JSON - return json.loads(request.get_data(as_text=True)) - + # Attempt to load plain data text as JSON + return json.loads(request.get_data(as_text=True)) - def fix_port_name(self, port_name): + @staticmethod + def fix_port_name(port_name): """ Try to fix common user mistakes for input port names Return fixed port name diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/constants.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/constants.py index 31097e62e..23d120924 100644 --- a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/constants.py +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/constants.py @@ -12,7 +12,10 @@ import logging -class PDRConstants(object): +class PDRConstants(): + """ + The constants of the PDR plugin. + """ CONF_FILE = "/config/pdr_deterministic.conf" LOG_FILE = '/log/pdr_deterministic_plugin.log' diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/exclude_list.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/exclude_list.py index 24129abc8..50f75b916 100644 --- a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/exclude_list.py +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/exclude_list.py @@ -13,7 +13,7 @@ import time import threading -class ExcludeListItem(object): +class ExcludeListItem(): """ Represents details of excluded port. @@ -33,7 +33,7 @@ def __init__(self, port_name, ttl_seconds): self.remove_time = 0 if ttl_seconds == 0 else time.time() + ttl_seconds -class ExcludeList(object): +class ExcludeList(): """ Implements list for excluded ports. @@ -71,7 +71,7 @@ def add(self, port_name, ttl_seconds = 0): def contains(self, port_name): """ Check if port exists. - Remove the port if its remove time is reached. + Remove the port if its remove time is reached. :param port_name: The name of the port. :return: True if the port still exists, False otherwise. """ @@ -81,10 +81,10 @@ def contains(self, port_name): if data.remove_time == 0 or time.time() < data.remove_time: # Excluded port return True - else: - # The time is expired, so remove port from the list - self.__dict.pop(port_name) - self.__logger.info(f"Port {port_name} automatically removed from exclude list after {data.ttl_seconds} seconds") + + # The time is expired, so remove port from the list + self.__dict.pop(port_name) + self.__logger.info(f"Port {port_name} automatically removed from exclude list after {data.ttl_seconds} seconds") return False @@ -98,8 +98,7 @@ def remove(self, port_name): self.__dict.pop(port_name) self.__logger.info(f"Port {port_name} removed from exclude list") return True - else: - return False + return False def refresh(self): diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_algo.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_algo.py index b5272c5d5..6b0c0d00a 100644 --- a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_algo.py +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_algo.py @@ -18,9 +18,7 @@ from isolation_mgr import IsolationMgr from ufm_communication_mgr import UFMCommunicator from api.pdr_plugin_api import PDRPluginAPI -from twisted.web.wsgi import WSGIResource from twisted.internet import reactor -from twisted.web import server from utils.flask_server import run_api from utils.flask_server.base_flask_api_app import BaseFlaskAPIApp from utils.utils import Utils @@ -32,7 +30,7 @@ def create_logger(log_file): :param file: name of the file :return: """ - format_str = "%(asctime)-15s UFM-PDR_deterministic-plugin-{0} Machine: {1} %(levelname)-7s: %(message)s".format(log_file,'localhost') + format_str = f"%(asctime)-15s UFM-PDR_deterministic-plugin-{log_file} Machine: localhost %(levelname)-7s: %(message)s" if not os.path.exists(log_file): os.makedirs('/'.join(log_file.split('/')[:-1]), exist_ok=True) logger = logging.getLogger(log_file) @@ -79,9 +77,9 @@ def main(): ufm_port = config_parser.getint(Constants.CONF_LOGGING, Constants.CONF_INTERNAL_PORT) ufm_client = UFMCommunicator("127.0.0.1", ufm_port) logger = create_logger(Constants.LOG_FILE) - + algo_loop = IsolationMgr(ufm_client, logger) - reactor.callInThread(algo_loop.main_flow) + reactor.callInThread(algo_loop.main_flow) # pylint: disable=no-member try: plugin_port = Utils.get_plugin_port( @@ -95,10 +93,10 @@ def main(): app = BaseFlaskAPIApp(routes) run_api(app=app, port_number=int(plugin_port)) - except Exception as ex: + except Exception as ex: # pylint: disable=broad-except print(f'Failed to run the app: {str(ex)}') - + #optional second phase # rest_server = RESTserver() # rest_server.serve() diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py index 20804fb00..be33957cc 100644 --- a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py @@ -16,7 +16,6 @@ import http import configparser import math -import json import pandas as pd import numpy from exclude_list import ExcludeList @@ -25,11 +24,11 @@ from ufm_communication_mgr import UFMCommunicator # should actually be persistent and thread safe dictionary pf PortStates -class PortData(object): +class PortData(): #pylint: disable=too-many-instance-attributes """ Represents the port data. """ - def __init__(self, port_name=None, port_num=None, peer=None, node_type=None, active_speed=None, port_width=None, port_guid=None): + def __init__(self, port_name=None, port_num=None, peer=None, node_type=None, active_speed=None, port_width=None, port_guid=None): #pylint: disable=too-many-arguments """ Initialize a new instance of the PortData class. @@ -57,7 +56,7 @@ def __init__(self, port_name=None, port_num=None, peer=None, node_type=None, act -class PortState(object): +class PortState(): """ Represents the state of a port. @@ -117,7 +116,7 @@ def get_change_time(self): return self.change_time -class Issue(object): +class Issue(): """ Represents an issue that occurred on a specific port. @@ -138,7 +137,7 @@ def __init__(self, port, cause): def get_counter(counter_name, row, default=0): """ - Get the value of a specific counter from a row of data. If the counter is not present + Get the value of a specific counter from a row of data. If the counter is not present or its value is NaN, return a default value. :param counter_name: The name of the counter to get. @@ -149,7 +148,7 @@ def get_counter(counter_name, row, default=0): """ try: val = row.get(counter_name) if (row.get(counter_name) is not None and not pd.isna(row.get(counter_name))) else default - except Exception as e: + except (KeyError,ValueError,TypeError): return default return val @@ -159,7 +158,7 @@ def get_timestamp_seconds(row): ''' return row.get(Constants.TIMESTAMP) / 1000.0 / 1000.0 -class IsolationMgr: +class IsolationMgr:#pylint: disable=too-many-instance-attributes,too-many-public-methods ''' This class is responsible for managing the isolation of ports based on the telemetry data ''' @@ -167,9 +166,9 @@ class IsolationMgr: def __init__(self, ufm_client: UFMCommunicator, logger): self.ufm_client = ufm_client # {port_name: PortState} - self.ports_states = dict() + self.ports_states = {} # {port_name: telemetry_data} - self.ports_data = dict() + self.ports_data = {} self.ufm_latest_isolation_state = [] pdr_config = configparser.ConfigParser() @@ -219,22 +218,23 @@ def __init__(self, ufm_client: UFMCommunicator, logger): self.exclude_list = ExcludeList(self.logger) - def calc_max_ber_wait_time(self, min_threshold): - """ - Calculates the maximum wait time for Bit Error Rate (BER) based on the given minimum threshold. + @staticmethod + def calc_max_ber_wait_time(min_threshold): + """ + Calculates the maximum wait time for Bit Error Rate (BER) based on the given minimum threshold. - Args: - min_threshold (float): The minimum threshold for BER. + Args: + min_threshold (float): The minimum threshold for BER. - Returns: - float: The maximum wait time in seconds. - """ - # min speed EDR = 32 Gb/s - min_speed, min_width = 32 * 1024 * 1024 * 1024, 1 - min_port_rate = min_speed * min_width - min_bits = float(format(float(min_threshold), '.0e').replace('-', '')) - min_sec_to_wait = min_bits / min_port_rate - return min_sec_to_wait + Returns: + float: The maximum wait time in seconds. + """ + # min speed EDR = 32 Gb/s + min_speed, min_width = 32 * 1024 * 1024 * 1024, 1 + min_port_rate = min_speed * min_width + min_bits = float(format(float(min_threshold), '.0e').replace('-', '')) + min_sec_to_wait = min_bits / min_port_rate + return min_sec_to_wait def is_out_of_operating_conf(self, port_name): """ @@ -249,7 +249,7 @@ def is_out_of_operating_conf(self, port_name): port_obj = self.ports_data.get(port_name) if not port_obj: self.logger.warning(f"Port {port_name} not found in ports data in calculation of oonoc port") - return + return False temp = port_obj.counters_values.get(Constants.TEMP_COUNTER) if temp and temp > self.tmax: return True @@ -327,7 +327,7 @@ def eval_deisolate(self, port_name): self.ports_states[port_name].update(Constants.STATE_ISOLATED, cause) return # we need some time after the change in state - elif datetime.now() >= self.ports_states[port_name].get_change_time() + timedelta(seconds=self.deisolate_consider_time): + if datetime.now() >= self.ports_states[port_name].get_change_time() + timedelta(seconds=self.deisolate_consider_time): port_obj = self.ports_data.get(port_name) port_state = self.ports_states.get(port_name) if port_state.cause == Constants.ISSUE_BER: @@ -342,7 +342,7 @@ def eval_deisolate(self, port_name): return # port is clean now - de-isolate it - # using UFM "mark as healthy" API - PUT /ufmRestV2/app/unhealthy_ports + # using UFM "mark as healthy" API - PUT /ufmRestV2/app/unhealthy_ports # { # "ports": [ # "e41d2d0300062380_3" @@ -352,7 +352,8 @@ def eval_deisolate(self, port_name): if not self.dry_run: ret = self.ufm_client.deisolate_port(port_name) if not ret or ret.status_code != http.HTTPStatus.OK: - self.logger.warning("Failed deisolating port: %s with cause: %s... status_code= %s", port_name, self.ports_states[port_name].cause, ret.status_code) + self.logger.warning("Failed deisolating port: %s with cause: %s... status_code= %s",\ + port_name, self.ports_states[port_name].cause, ret.status_code) return self.ports_states.pop(port_name) log_message = f"Deisolated port: {port_name}. dry_run: {self.dry_run}" @@ -360,7 +361,7 @@ def eval_deisolate(self, port_name): if not self.test_mode: self.ufm_client.send_event(log_message, event_id=Constants.EXTERNAL_EVENT_NOTICE, external_event_name="Deisolating Port") - def get_rate(self, port_obj, counter_name, new_val, timestamp): + def get_rate(self,port_obj, counter_name, new_val, timestamp): #pylint: disable=no-self-use """ Calculate the rate of the counter """ @@ -401,7 +402,8 @@ def find_peer_row_for_port(self, port_obj, ports_counters): if ports_counters[Constants.NODE_GUID].iloc[0].startswith('0x') and not peer_guid.startswith('0x'): peer_guid = f'0x{peer_guid}' #TODO check for a way to save peer row in data structure for performance - peer_row_list = ports_counters.loc[(ports_counters[Constants.NODE_GUID] == peer_guid) & (ports_counters[Constants.PORT_NUMBER] == int(peer_num))] + peer_row_list = ports_counters.loc[(ports_counters[Constants.NODE_GUID] == peer_guid) &\ + (ports_counters[Constants.PORT_NUMBER] == int(peer_num))] if peer_row_list.empty: self.logger.warning(f"Peer port {port_obj.peer} not found in ports data") return None @@ -416,7 +418,7 @@ def calc_error_rate(self, port_obj, row, timestamp): rcv_remote_phy_error = get_counter(Constants.RCV_REMOTE_PHY_ERROR_COUNTER, row) errors = rcv_error + rcv_remote_phy_error error_rate = self.get_rate_and_update(port_obj, Constants.ERRORS_COUNTER, errors, timestamp) - return error_rate + return error_rate def check_pdr_issue(self, port_obj, row, timestamp): """ @@ -432,7 +434,7 @@ def check_pdr_issue(self, port_obj, row, timestamp): return Issue(port_obj.port_name, Constants.ISSUE_PDR) return None - def check_temp_issue(self, port_obj, row, timestamp): + def check_temp_issue(self, port_obj, row): """ Check if the port passed the temperature threshold and return an issue """ @@ -442,7 +444,7 @@ def check_temp_issue(self, port_obj, row, timestamp): if cable_temp is not None and not pd.isna(cable_temp): if cable_temp in ["NA", "N/A", "", "0C", "0"]: return None - cable_temp = int(cable_temp.split("C")[0]) if type(cable_temp) == str else cable_temp + cable_temp = int(cable_temp.split("C")[0]) if isinstance(cable_temp,str) else cable_temp old_cable_temp = port_obj.counters_values.get(Constants.TEMP_COUNTER, 0) port_obj.counters_values[Constants.TEMP_COUNTER] = cable_temp # Check temperature condition @@ -505,7 +507,7 @@ def check_ber_issue(self, port_obj, row, timestamp): if symbol_ber_val is not None: ber_data = { Constants.TIMESTAMP : timestamp, - Constants.SYMBOL_BER : symbol_ber_val, + Constants.SYMBOL_BER : symbol_ber_val, } port_obj.ber_tele_data.loc[len(port_obj.ber_tele_data)] = ber_data port_obj.last_symbol_ber_timestamp = timestamp @@ -518,7 +520,8 @@ def check_ber_issue(self, port_obj, row, timestamp): for (interval, threshold) in self.ber_intervals: symbol_ber_rate = self.calc_ber_rates(port_obj.port_name, port_obj.active_speed, port_obj.port_width, interval) if symbol_ber_rate and symbol_ber_rate > threshold: - self.logger.info(f"Isolation issue ({Constants.ISSUE_BER}) detected for port {port_obj.port_name} (speed: {port_obj.active_speed}, width: {port_obj.port_width}): " + self.logger.info(f"Isolation issue ({Constants.ISSUE_BER}) detected for port {port_obj.port_name}" + f"(speed: {port_obj.active_speed}, width: {port_obj.port_width}): " f"symbol ber rate ({symbol_ber_rate}) is higher than threshold ({threshold})") return Issue(port_obj.port_name, Constants.ISSUE_BER) return None @@ -545,13 +548,13 @@ def read_next_set_of_high_ber_or_pdr_ports(self): if not port_obj: if get_counter(Constants.RCV_PACKETS_COUNTER,row,0) == 0: # meaning it is down port continue - self.logger.warning("Port {0} not found in ports data".format(port_name)) + self.logger.warning("Port %s not found in ports data",port_name) continue # Converting from micro seconds to seconds. timestamp = get_timestamp_seconds(row) #TODO add logs regarding the exact telemetry value leading to the decision pdr_issue = self.check_pdr_issue(port_obj, row, timestamp) - temp_issue = self.check_temp_issue(port_obj, row, timestamp) + temp_issue = self.check_temp_issue(port_obj, row) link_downed_issue = self.check_link_down_issue(port_obj, row, timestamp, ports_counters) ber_issue = self.check_ber_issue(port_obj, row, timestamp) port_obj.last_timestamp = timestamp @@ -565,7 +568,7 @@ def read_next_set_of_high_ber_or_pdr_ports(self): issues[port_name] = ber_issue return issues - def calc_symbol_ber_rate(self, port_name, port_speed, port_width, col_name, time_delta): + def calc_symbol_ber_rate(self, port_name, port_speed, port_width, col_name, time_delta):#pylint: disable=too-many-arguments,too-many-locals """ calculate the symbol BER rate for a given port given the time delta """ @@ -592,10 +595,12 @@ def calc_symbol_ber_rate(self, port_name, port_speed, port_width, col_name, time # Calculate the delta of 'symbol_ber' delta = port_obj.last_symbol_ber_val - comparison_sample[Constants.SYMBOL_BER] actual_speed = self.speed_types.get(port_speed, 100000) - return delta / ((port_obj.last_symbol_ber_timestamp - comparison_df.loc[comparison_idx][Constants.TIMESTAMP]) * actual_speed * port_width * 1024 * 1024 * 1024) + return delta / ((port_obj.last_symbol_ber_timestamp - + comparison_df.loc[comparison_idx][Constants.TIMESTAMP]) * + actual_speed * port_width * 1024 * 1024 * 1024) - except Exception as e: - self.logger.error(f"Error calculating {col_name}, error: {e}") + except (KeyError,ValueError,TypeError) as exception_error: + self.logger.error(f"Error calculating {col_name}, error: {exception_error}") return 0 def calc_ber_rates(self, port_name, port_speed, port_width, time_delta): @@ -666,7 +671,7 @@ def update_port_metadata(self, port_name, port): def update_ports_data(self): """ Updates the ports data by retrieving metadata from the UFM client. - + Returns: bool: True if ports data is updated, False otherwise. """ @@ -707,6 +712,7 @@ def get_port_metadata(self, port_name): if port_width: port_width = int(port_width.strip('x')) return port_speed, port_width + return None, None def set_ports_as_treated(self, ports_dict): @@ -723,7 +729,7 @@ def set_ports_as_treated(self, ports_dict): port_state = self.ports_states.get(port) if port_state and state == Constants.STATE_TREATED: port_state.state = state - + def get_isolation_state(self): """ Retrieves the isolation state of the ports. @@ -732,7 +738,7 @@ def get_isolation_state(self): None: If the test mode is enabled. List[str]: A list of isolated ports if available. """ - + if self.test_mode: # I don't want to get to the isolated ports because we simulating everything.. return @@ -764,7 +770,7 @@ def get_requested_guids(self): requested_guids = [{"guid": sys_guid, "ports": ports} for sys_guid, ports in guids.items()] return requested_guids - def main_flow(self): + def main_flow(self): #pylint: disable=too-many-branches """ Executes the main flow of the Isolation Manager. @@ -782,7 +788,7 @@ def main_flow(self): self.logger.info("Isolation Manager initialized, starting isolation loop") self.get_ports_metadata() self.logger.info("Retrieved ports metadata") - while(True): + while True: try: t_begin = time.time() self.exclude_list.refresh() @@ -794,14 +800,15 @@ def main_flow(self): self.test_iteration += 1 try: issues = self.read_next_set_of_high_ber_or_pdr_ports() - except (KeyError,) as e: - self.logger.error(f"failed to read information with error {e}") + except (KeyError,TypeError,ValueError) as exception_error: + self.logger.error(f"failed to read information with error {exception_error}") if len(issues) > self.max_num_isolate: # UFM send external event - event_msg = "got too many ports detected as unhealthy: %d, skipping isolation" % len(issues) + event_msg = f"got too many ports detected as unhealthy: {len(issues)}, skipping isolation" self.logger.warning(event_msg) if not self.test_mode: - self.ufm_client.send_event(event_msg, event_id=Constants.EXTERNAL_EVENT_ALERT, external_event_name="Skipping isolation") + self.ufm_client.send_event(event_msg, event_id=Constants.EXTERNAL_EVENT_ALERT, + external_event_name="Skipping isolation") # deal with reported new issues else: @@ -815,7 +822,7 @@ def main_flow(self): for port_state in list(self.ports_states.values()): state = port_state.get_state() cause = port_state.get_cause() - # EZ: it is a state that say that some maintenance was done to the link + # EZ: it is a state that say that some maintenance was done to the link # so need to re-evaluate if to return it to service if self.automatic_deisolate or cause == Constants.ISSUE_OONOC or state == Constants.STATE_TREATED: self.eval_deisolate(port_state.name) @@ -823,10 +830,10 @@ def main_flow(self): if ports_updated: self.update_telemetry_session() t_end = time.time() - except Exception as e: + except Exception as exception: #pylint: disable=broad-except self.logger.warning("Error in main loop") - self.logger.warning(e) + self.logger.warning(exception) traceback_err = traceback.format_exc() self.logger.warning(traceback_err) - t_end = time.time() + t_end = time.time() time.sleep(max(1, self.interval - (t_end - t_begin))) diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/telemetry_collector.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/telemetry_collector.py new file mode 100644 index 000000000..6c8ed22de --- /dev/null +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/telemetry_collector.py @@ -0,0 +1,27 @@ +import logging +import urllib +import pandas as pd +from constants import PDRConstants as Constants + +class TelemetryCollector: + """ + Represent Telemetry collector which send DataFrame once telemetry is called. + """ + def __init__(self,test_mode) -> None: + self.test_mode=test_mode + + def get_telemetry(self): + """ + get the telemetry from secondary telemetry, if it in test mode it get from the simulation + return DataFrame of the telemetry + """ + if self.test_mode: + url = "http://127.0.0.1:9090/csv/xcset/simulated_telemetry" + else: + url = f"http://127.0.0.1:{Constants.SECONDARY_TELEMETRY_PORT}/csv/xcset/{Constants.SECONDARY_INSTANCE}" + try: + telemetry_data = pd.read_csv(url) + except (pd.errors.ParserError, pd.errors.EmptyDataError, urllib.error.URLError) as exception_error: + logging.error("Failed to get telemetry data from UFM, fetched url= %s. Error: %s",url,exception_error) + telemetry_data = None + return telemetry_data diff --git a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/ufm_communication_mgr.py b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/ufm_communication_mgr.py index 3f2cfe33e..0c6bd21b4 100644 --- a/plugins/pdr_deterministic_plugin/ufm_sim_web_service/ufm_communication_mgr.py +++ b/plugins/pdr_deterministic_plugin/ufm_sim_web_service/ufm_communication_mgr.py @@ -10,16 +10,15 @@ # provided with the software product. # -from enum import Enum -import urllib.error -from constants import PDRConstants as Constants -import requests import logging -import urllib import http -import pandas as pd +import requests +from constants import PDRConstants as Constants class UFMCommunicator: + """ + communicate with the UFM, send actions to the UFM, see that ports isolated. + """ def __init__(self, host='127.0.0.1', ufm_port=8000): #TODO: read from conf @@ -27,21 +26,21 @@ def __init__(self, host='127.0.0.1', ufm_port=8000): self.ufm_protocol = "http" self.headers = {"X-Remote-User": "ufmsystem"} #self.suffix = None - self._host = "{0}:{1}".format(host, self.internal_port) - + self._host = f"{host}:{self.internal_port}" + def get_request(self, uri, headers=None): request = self.ufm_protocol + '://' + self._host + uri if not headers: headers = self.headers try: response = requests.get(request, verify=False, headers=headers) - logging.info("UFM API Request Status: {}, URL: {}".format(response.status_code, request)) + logging.info("UFM API Request Status: %s, URL: %s",response.status_code, request) if response.status_code == http.client.OK: return response.json() - except ConnectionRefusedError as e: - logging.error(f"failed to get data from {request} with error {e}") - return - + except ConnectionRefusedError as connection_error: + logging.error("failed to get data from %s with error %s",request,connection_error) + return None + def send_request(self, uri, data, method=Constants.POST_METHOD, headers=None): request = self.ufm_protocol + '://' + self._host + uri if not headers: @@ -52,27 +51,11 @@ def send_request(self, uri, data, method=Constants.POST_METHOD, headers=None): response = requests.put(url=request, json=data, verify=False, headers=headers) elif method == Constants.DELETE_METHOD: response = requests.delete(url=request, verify=False, headers=headers) - logging.info("UFM API Request Status: {}, URL: {}".format(response.status_code, request)) + logging.info("UFM API Request Status: %s, URL: %s",response.status_code, request) return response - - def get_telemetry(self,test_mode): - """ - get the telemetry from secondary telemetry, if it in test mode it get from the simulation - return DataFrame of the telemetry - """ - if test_mode: - url = f"http://127.0.0.1:9090/csv/xcset/simulated_telemetry" - else: - url = f"http://127.0.0.1:{Constants.SECONDARY_TELEMETRY_PORT}/csv/xcset/{Constants.SECONDARY_INSTANCE}" - try: - telemetry_data = pd.read_csv(url) - except (pd.errors.ParserError, pd.errors.EmptyDataError, urllib.error.URLError) as e: - logging.error(f"Failed to get telemetry data from UFM, fetched url={url}. Error: {e}") - telemetry_data = None - return telemetry_data - - def send_event(self, message, event_id=Constants.EXTERNAL_EVENT_NOTICE, external_event_name="PDR Plugin Event", external_event_type="PDR Plugin Event"): + def send_event(self, message, event_id=Constants.EXTERNAL_EVENT_NOTICE, + external_event_name="PDR Plugin Event", external_event_type="PDR Plugin Event"): data = { "event_id": event_id, "description": message, @@ -82,8 +65,8 @@ def send_event(self, message, event_id=Constants.EXTERNAL_EVENT_NOTICE, external } ret = self.send_request(Constants.POST_EVENT_REST, data) - if ret: - return True + if ret: + return True return False def get_isolated_ports(self): @@ -113,9 +96,9 @@ def deisolate_port(self, port_name): "ports_policy": "HEALTHY", } return self.send_request(Constants.ISOLATION_REST, data, method=Constants.PUT_METHOD) - + def get_ports_metadata(self): return self.get_request(Constants.GET_ACTIVE_PORTS_REST) def get_port_metadata(self, port_name): - return self.get_request("%s/%s" % (Constants.GET_PORTS_REST, port_name)) + return self.get_request(f"{Constants.GET_PORTS_REST}/ {port_name}")