Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pdr remove dynamic #238

Closed
wants to merge 10 commits into from
2 changes: 1 addition & 1 deletion plugins/pdr_deterministic_plugin/.ci/do_add_plugin.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash -x
export SERVER_HOST=$SERVER_HOST
expect << EOF
spawn ssh admin@${SERVER_HOST}
spawn ssh -o StrictHostKeyChecking=no admin@${SERVER_HOST}
expect "Password:*"
send -- "admin\r"
expect "> "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namehost=$(echo $HOSTNAME)
export SERVER_HOST=$SERVER_HOST
export PASSWORD=$PASSWORD
expect << EOF
spawn ssh admin@${SERVER_HOST}
spawn ssh -o StrictHostKeyChecking=no admin@${SERVER_HOST}
expect "Password:*"
send -- "admin\r"
expect "> "
Expand Down
2 changes: 1 addition & 1 deletion plugins/pdr_deterministic_plugin/.ci/do_load_plugin.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash -x
export SERVER_HOST=$SERVER_HOST
expect << EOF
spawn ssh admin@${SERVER_HOST}
spawn ssh -o StrictHostKeyChecking=no admin@${SERVER_HOST}
expect "Password:*"
send -- "admin\r"
expect "> "
Expand Down
2 changes: 1 addition & 1 deletion plugins/pdr_deterministic_plugin/.ci/do_remove_plugin.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash -x
export SERVER_HOST=$SERVER_HOST
expect << EOF
spawn ssh admin@${SERVER_HOST}
spawn ssh -o StrictHostKeyChecking=no admin@${SERVER_HOST}
expect "Password:*"
send -- "admin\r"
expect "> "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ DEISOLATE_CONSIDER_TIME=300
AUTOMATIC_DEISOLATE=True
# if set to false, the plugin will not perform deisolation
DO_DEISOLATION=True
DYNAMIC_WAIT_TIME=30
# number of times to check if a dynamic session is unresponsive before restarting it
DYNAMIC_UNRESPONSIVE_LIMIT=3

[Metrics]
# in Celsius
Expand Down
12 changes: 6 additions & 6 deletions plugins/pdr_deterministic_plugin/tests/simulation_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
lock = Lock()

PHY_EFF_ERROR = "phy_effective_errors"
PHY_SYMBOL_ERROR = "phy_symbol_errors"
PHY_SYMBOL_ERROR = "Symbol_Errors"
RCV_PACKETS_COUNTER = "PortRcvPktsExtended"
RCV_ERRORS_COUNTER = "PortRcvErrorsExtended"
LINK_DOWN_COUNTER = "LinkDownedCounterExtended"
RCV_REMOTE_PHY_ERROR_COUNTER = "PortRcvRemotePhysicalErrorsExtended"
TEMP_COUNTER = "CableInfo.Temperature"
RCV_ERRORS_COUNTER = "PortRcvErrors"
LINK_DOWN_COUNTER = "Link_Down_IB"
RCV_REMOTE_PHY_ERROR_COUNTER = "PortRcvRemotePhysicalErrors"
TEMP_COUNTER = "Module_Temperature"
FEC_MODE = "fec_mode_active"
ENDPOINT_CONFIG = {}

Expand Down Expand Up @@ -192,7 +192,7 @@ def start_server(port:str,changes_intervals:int, run_forever:bool):
t.daemon = True
t.start()
counters_names = list(counters.keys())
header = ['timestamp', 'source_id,tag,node_guid,port_guid,port_num'] + counters_names
header = ['timestamp', 'source_id,tag,Node_GUID,port_guid,Port_Number'] + counters_names
endpoint['data'] = ""
while True:
# lock.acquire()
Expand Down
19 changes: 10 additions & 9 deletions plugins/pdr_deterministic_plugin/ufm_sim_web_service/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class PDRConstants(object):
SWITCH_TO_HOST_ISOLATION = "SWITCH_TO_HOST_ISOLATION"
TEST_MODE = "TEST_MODE"
TEST_MODE_PORT = 9090
DYNAMIC_UNRESPONSIVE_LIMIT = "DYNAMIC_UNRESPONSIVE_LIMIT"
SECONDARY_TELEMETRY_PORT = 9002

GET_SESSION_DATA_REST = "/monitoring/session/0/data"
POST_EVENT_REST = "/app/events/external_event"
Expand All @@ -53,8 +53,7 @@ class PDRConstants(object):
GET_ACTIVE_PORTS_REST = "/resources/ports?active=true"
API_HEALTHY_PORTS = "healthy_ports"
API_ISOLATED_PORTS = "isolated_ports"
DYNAMIC_SESSION_REST = "/app/telemetry/instances/%s"
STATUS_DYNAMIC_SESSION_REST = "/app/telemetry/instances/status"
SECONDARY_INSTANCE = "low_freq_debug"

EXTERNAL_EVENT_ERROR = 554
EXTERNAL_EVENT_ALERT = 553
Expand All @@ -70,12 +69,12 @@ class PDRConstants(object):
CONF_USERNAME = 'admin'
CONF_PASSWORD = 'password'

TEMP_COUNTER = "CableInfo.Temperature"
ERRORS_COUNTER = "errors"
ERRORS_COUNTER = "Symbol_Errors"
RCV_PACKETS_COUNTER = "PortRcvPktsExtended"
RCV_ERRORS_COUNTER = "PortRcvErrorsExtended"
RCV_REMOTE_PHY_ERROR_COUNTER = "PortRcvRemotePhysicalErrorsExtended"
LNK_DOWNED_COUNTER = "LinkDownedCounterExtended"
RCV_ERRORS_COUNTER = "PortRcvErrors"
RCV_REMOTE_PHY_ERROR_COUNTER = "PortRcvRemotePhysicalErrors"
TEMP_COUNTER = "Module_Temperature"
LNK_DOWNED_COUNTER = "Link_Down_IB"

PHY_RAW_ERROR_LANE0 = "phy_raw_errors_lane0"
PHY_RAW_ERROR_LANE1 = "phy_raw_errors_lane1"
Expand All @@ -98,6 +97,9 @@ class PDRConstants(object):
NODE_TYPE_OTHER = "other"
BER_TELEMETRY = "ber_telemetry"

NODE_GUID = "Node_GUID"
PORT_NUMBER = "Port_Number"

ISSUE_PDR = "pdr"
ISSUE_BER = "ber"
ISSUE_PDR_BER = "pdr&ber"
Expand All @@ -109,6 +111,5 @@ class PDRConstants(object):
STATE_ISOLATED = "isolated"
STATE_TREATED = "treated"

PDR_DYNAMIC_NAME = "pdr_dynamic"
# intervals in seconds for testing ber values and corresponding thresholds
BER_THRESHOLDS_INTERVALS = [(125 * 60, 3), (12 * 60, 2.88)]
143 changes: 17 additions & 126 deletions plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,10 @@
from exclude_list import ExcludeList

from constants import PDRConstants as Constants
from ufm_communication_mgr import DynamicSessionState, UFMCommunicator
from telemetry_collector import TelemetryCollector
from ufm_communication_mgr import UFMCommunicator
# should actually be persistent and thread safe dictionary pf PortStates


class DynamicTelemetryUnresponsive(Exception):
"""
Exception raised when the dynamic telemetry is unresponsive.
"""
pass


class PortData(object):
"""
Represents the port data.
Expand Down Expand Up @@ -194,15 +187,14 @@ def __init__(self, ufm_client: UFMCommunicator, logger):
self.do_deisolate = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.DO_DEISOLATION)
self.deisolate_consider_time = pdr_config.getint(Constants.CONF_ISOLATION,Constants.DEISOLATE_CONSIDER_TIME)
self.automatic_deisolate = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.AUTOMATIC_DEISOLATE)
self.dynamic_wait_time = pdr_config.getint(Constants.CONF_ISOLATION,"DYNAMIC_WAIT_TIME")
self.temp_check = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.CONFIGURED_TEMP_CHECK)
self.link_down_isolation = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.LINK_DOWN_ISOLATION)
self.switch_hca_isolation = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.SWITCH_TO_HOST_ISOLATION)
self.test_mode = pdr_config.getboolean(Constants.CONF_COMMON,Constants.TEST_MODE, fallback=False)
self.test_iteration = 0
self.dynamic_unresponsive_limit = pdr_config.getint(Constants.CONF_ISOLATION,Constants.DYNAMIC_UNRESPONSIVE_LIMIT, fallback=3)
# Take from Conf
self.logger = logger
self.telemetry_collector = TelemetryCollector(self.test_mode)
self.ber_intervals = Constants.BER_THRESHOLDS_INTERVALS if not self.test_mode else [[0.5 * 60, 3]]
intervals = [x[0] for x in self.ber_intervals]
self.min_ber_wait_time = min(intervals)
Expand All @@ -227,13 +219,6 @@ def __init__(self, ufm_client: UFMCommunicator, logger):
Constants.LNK_DOWNED_COUNTER,
]

# bring telemetry data on disabled ports
self.dynamic_extra_configuration = {
"plugin_env_CLX_EXPORT_API_ENABLE_DOWN_PORT_COUNTERS": "1",
"plugin_env_CLX_EXPORT_API_ENABLE_DOWN_PHY": "1",
"arg_11": ""
}

self.exclude_list = ExcludeList(self.logger)

def calc_max_ber_wait_time(self, min_threshold):
Expand Down Expand Up @@ -415,10 +400,10 @@ def find_peer_row_for_port(self, port_obj, ports_counters):
return None
peer_guid, peer_num = port_obj.peer.split('_')
# Fix peer guid format for future search
if ports_counters['port_guid'].iloc[0].startswith('0x') and not peer_guid.startswith('0x'):
if ports_counters[Constants.NODE_GUID].iloc[0].startswith('0x') and not peer_guid.startswith('0x'):
peer_guid = f'0x{peer_guid}'
#TODO check for a way to save peer row in data structure for performance
peer_row_list = ports_counters.loc[(ports_counters['port_guid'] == peer_guid) & (ports_counters['port_num'] == int(peer_num))]
peer_row_list = ports_counters.loc[(ports_counters[Constants.NODE_GUID] == peer_guid) & (ports_counters[Constants.PORT_NUMBER] == int(peer_num))]
if peer_row_list.empty:
self.logger.warning(f"Peer port {port_obj.peer} not found in ports data")
return None
Expand Down Expand Up @@ -459,10 +444,8 @@ def check_temp_issue(self, port_obj, row, timestamp):
if cable_temp is not None and not pd.isna(cable_temp):
if cable_temp in ["NA", "N/A", "", "0C", "0"]:
return None
# Get new and saved temperature values
cable_temp = int(cable_temp.split("C")[0]) if isinstance(cable_temp, str) else cable_temp
old_cable_temp = port_obj.counters_values.get(Constants.TEMP_COUNTER);
# Save new temperature value
cable_temp = int(cable_temp.split("C")[0]) if type(cable_temp) == str else cable_temp
old_cable_temp = port_obj.counters_values.get(Constants.TEMP_COUNTER, 0)
port_obj.counters_values[Constants.TEMP_COUNTER] = cable_temp
# Check temperature condition
if cable_temp and (cable_temp > self.tmax):
Expand Down Expand Up @@ -542,17 +525,17 @@ def check_ber_issue(self, port_obj, row, timestamp):
return Issue(port_obj.port_name, Constants.ISSUE_BER)
return None

def read_next_set_of_high_ber_or_pdr_ports(self, endpoint_port):
def read_next_set_of_high_ber_or_pdr_ports(self):
"""
Read the next set of ports and check if they have high BER, PDR, temperature or link downed issues
"""
issues = {}
ports_counters = self.ufm_client.get_telemetry(endpoint_port, Constants.PDR_DYNAMIC_NAME,self.test_mode)
ports_counters = self.telemetry_collector.get_telemetry()
if ports_counters is None:
self.logger.error("Couldn't retrieve telemetry data")
raise DynamicTelemetryUnresponsive
for index, row in ports_counters.iterrows():
port_name = f"{row.get('port_guid', '').split('x')[-1]}_{row.get('port_num', '')}"
return {}
for _, row in ports_counters.iterrows():
port_name = f"{row.get(Constants.NODE_GUID, '').split('x')[-1]}_{row.get(Constants.PORT_NUMBER, '')}"
if self.exclude_list.contains(port_name):
# The port is excluded from analysis
continue
Expand Down Expand Up @@ -766,35 +749,6 @@ def get_isolation_state(self):
port_state.update(Constants.STATE_ISOLATED, Constants.ISSUE_OONOC)
self.ports_states[port] = port_state

def start_telemetry_session(self):
"""
Starts a telemetry session.

Returns:
str: The port number if the dynamic session is started successfully, False otherwise.
"""
self.logger.info("Starting telemetry session")
guids = self.get_requested_guids()
response = self.ufm_client.start_dynamic_session(Constants.PDR_DYNAMIC_NAME, self.telemetry_counters, self.interval, guids, self.dynamic_extra_configuration)
if response and response.status_code == http.HTTPStatus.ACCEPTED:
port = str(int(response.content))
else:
self.logger.error(f"Failed to start dynamic session: {response}")
return False
return port

def update_telemetry_session(self):
"""
Updates the telemetry session by requesting and updating the dynamic session with the specified interval and guids.

Returns:
The response from the UFM client after updating the dynamic session.
"""
self.logger.info("Updating telemetry session")
guids = self.get_requested_guids()
response = self.ufm_client.update_dynamic_session(Constants.PDR_DYNAMIC_NAME, self.interval, guids)
return response

def get_requested_guids(self):
"""
Get the requested GUIDs and their corresponding ports.
Expand All @@ -812,63 +766,13 @@ def get_requested_guids(self):
requested_guids = [{"guid": sys_guid, "ports": ports} for sys_guid, ports in guids.items()]
return requested_guids

# this function create dynamic telemetry and returns the port of this telemetry
def run_telemetry_get_port(self):
"""
Runs the telemetry and returns the endpoint port.

If the test mode is enabled, it returns the test mode port.
Otherwise, it waits for the dynamic session to start, starts the telemetry session,
and retrieves the endpoint port.

Returns:
int: The endpoint port for the telemetry.

Raises:
Exception: If an error occurs during the process.
"""
if self.test_mode:
return Constants.TEST_MODE_PORT
try:
while True:
session_state = self.ufm_client.get_dynamic_session_state(Constants.PDR_DYNAMIC_NAME)
if session_state == DynamicSessionState.RUNNING:
# Telemetry session is running
break
if session_state == DynamicSessionState.NONE:
# Start new session
self.logger.info("Waiting for dynamic session to start")
endpoint_port = self.start_telemetry_session()
time.sleep(self.dynamic_wait_time)
else:
# Stop inactive session
self.logger.info("Waiting for inactive dynamic session to stop")
self.ufm_client.stop_dynamic_session(Constants.PDR_DYNAMIC_NAME)
time.sleep(self.dynamic_wait_time)
except Exception as e:
self.ufm_client.stop_dynamic_session(Constants.PDR_DYNAMIC_NAME)
time.sleep(self.dynamic_wait_time)
endpoint_port = self.ufm_client.dynamic_session_get_port(Constants.PDR_DYNAMIC_NAME)
return endpoint_port

def restart_telemetry_session(self):
"""
Restart the dynamic telemetry session and return the new endpoint port
"""
self.logger.info("Restarting telemetry session")
self.ufm_client.stop_dynamic_session(Constants.PDR_DYNAMIC_NAME)
time.sleep(self.dynamic_wait_time)
endpoint_port = self.run_telemetry_get_port()
return endpoint_port

def main_flow(self):
"""
Executes the main flow of the Isolation Manager.

This method synchronizes with the telemetry clock, retrieves ports metadata,
starts the telemetry session, and continuously retrieves telemetry data to
determine the states of the ports. It handles dynamic telemetry unresponsiveness,
skips isolation if too many ports are detected as unhealthy, and evaluates
continuously retrieves telemetry data from secondary telemetry to
determine the states of the ports. skips isolation if too many ports are detected as unhealthy, and evaluates
isolation and deisolation for reported issues and ports with specific causes.

Args:
Expand All @@ -880,9 +784,6 @@ def main_flow(self):
self.logger.info("Isolation Manager initialized, starting isolation loop")
self.get_ports_metadata()
self.logger.info("Retrieved ports metadata")
endpoint_port = self.run_telemetry_get_port()
self.logger.info("telemetry session started")
dynamic_telemetry_unresponsive_count = 0
while(True):
try:
t_begin = time.time()
Expand All @@ -894,15 +795,9 @@ def main_flow(self):
self.logger.info(f"Retrieving test mode telemetry data to determine ports' states: iteration {self.test_iteration}")
self.test_iteration += 1
try:
issues = self.read_next_set_of_high_ber_or_pdr_ports(endpoint_port)
except DynamicTelemetryUnresponsive:
dynamic_telemetry_unresponsive_count += 1
if dynamic_telemetry_unresponsive_count > self.dynamic_unresponsive_limit:
self.logger.error(f"Dynamic telemetry is unresponsive for {dynamic_telemetry_unresponsive_count} times, restarting telemetry session...")
endpoint_port = self.restart_telemetry_session()
dynamic_telemetry_unresponsive_count = 0
self.test_iteration = 0
continue
issues = self.read_next_set_of_high_ber_or_pdr_ports()
except (KeyError,) as e:
self.logger.error(f"failed to read information with error {e}")
if len(issues) > self.max_num_isolate:
# UFM send external event
event_msg = "got too many ports detected as unhealthy: %d, skipping isolation" % len(issues)
Expand Down Expand Up @@ -937,7 +832,3 @@ def main_flow(self):
self.logger.warning(traceback_err)
t_end = time.time()
time.sleep(max(1, self.interval - (t_end - t_begin)))

# this is a callback for API exposed by this code - second phase
# def work_reportingd(port):
# PORTS_STATE[port].update(Constants.STATE_TREATED, Constants.ISSUE_INIT)
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import pandas as pd
from constants import PDRConstants as Constants
import logging
import urllib


class TelemetryCollector:
"""
Represent Telemetry collector which send DataFrame once telemetry is called.
"""
def __init__(self,test_mode) -> None:
self.test_mode=test_mode

def get_telemetry(self):
"""
get the telemetry from secondary telemetry, if it in test mode it get from the simulation
return DataFrame of the telemetry
"""
if self.test_mode:
url = f"http://127.0.0.1:9090/csv/xcset/simulated_telemetry"
else:
url = f"http://127.0.0.1:{Constants.SECONDARY_TELEMETRY_PORT}/csv/xcset/{Constants.SECONDARY_INSTANCE}"
try:
telemetry_data = pd.read_csv(url)
except (pd.errors.ParserError, pd.errors.EmptyDataError, urllib.error.URLError) as e:
logging.error(f"Failed to get telemetry data from UFM, fetched url={url}. Error: {e}")
telemetry_data = None
return telemetry_data
Loading