Skip to content

Commit

Permalink
replace dynamic telmetry with the secondary telemetry
Browse files Browse the repository at this point in the history
Signed-off-by: Elad Gershon <egershon@nvidia.com>
  • Loading branch information
egershonNvidia committed Aug 14, 2024
1 parent 92541da commit 8ebc25d
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ DEISOLATE_CONSIDER_TIME=300
AUTOMATIC_DEISOLATE=True
# if set to false, the plugin will not perform deisolation
DO_DEISOLATION=True
DYNAMIC_WAIT_TIME=30
# number of times to check if a dynamic session is unresponsive before restarting it
DYNAMIC_UNRESPONSIVE_LIMIT=3

[Metrics]
# in Celsius
Expand Down
10 changes: 5 additions & 5 deletions plugins/pdr_deterministic_plugin/tests/simulation_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
lock = Lock()

PHY_EFF_ERROR = "phy_effective_errors"
PHY_SYMBOL_ERROR = "phy_symbol_errors"
PHY_SYMBOL_ERROR = "Symbol_Errors"
RCV_PACKETS_COUNTER = "PortRcvPktsExtended"
RCV_ERRORS_COUNTER = "PortRcvErrorsExtended"
LINK_DOWN_COUNTER = "LinkDownedCounterExtended"
RCV_REMOTE_PHY_ERROR_COUNTER = "PortRcvRemotePhysicalErrorsExtended"
TEMP_COUNTER = "CableInfo.Temperature"
RCV_ERRORS_COUNTER = "PortRcvErrors"
LINK_DOWN_COUNTER = "Link_Down_IB"
RCV_REMOTE_PHY_ERROR_COUNTER = "PortRcvRemotePhysicalErrors"
TEMP_COUNTER = "Module_Temperature"
FEC_MODE = "fec_mode_active"
ENDPOINT_CONFIG = {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class PDRConstants(object):
SWITCH_TO_HOST_ISOLATION = "SWITCH_TO_HOST_ISOLATION"
TEST_MODE = "TEST_MODE"
TEST_MODE_PORT = 9090
DYNAMIC_UNRESPONSIVE_LIMIT = "DYNAMIC_UNRESPONSIVE_LIMIT"
SECONDARY_TELEMETRY_PORT = 9002

GET_SESSION_DATA_REST = "/monitoring/session/0/data"
POST_EVENT_REST = "/app/events/external_event"
Expand All @@ -53,8 +53,7 @@ class PDRConstants(object):
GET_ACTIVE_PORTS_REST = "/resources/ports?active=true"
API_HEALTHY_PORTS = "healthy_ports"
API_ISOLATED_PORTS = "isolated_ports"
DYNAMIC_SESSION_REST = "/app/telemetry/instances/%s"
STATUS_DYNAMIC_SESSION_REST = "/app/telemetry/instances/status"
SECONDARY_INSTANCE = "low_freq_debug"

EXTERNAL_EVENT_ERROR = 554
EXTERNAL_EVENT_ALERT = 553
Expand All @@ -70,12 +69,12 @@ class PDRConstants(object):
CONF_USERNAME = 'admin'
CONF_PASSWORD = 'password'

TEMP_COUNTER = "CableInfo.Temperature"
ERRORS_COUNTER = "errors"
ERRORS_COUNTER = "Symbol_Errors"
RCV_PACKETS_COUNTER = "PortRcvPktsExtended"
RCV_ERRORS_COUNTER = "PortRcvErrorsExtended"
RCV_REMOTE_PHY_ERROR_COUNTER = "PortRcvRemotePhysicalErrorsExtended"
LNK_DOWNED_COUNTER = "LinkDownedCounterExtended"
RCV_ERRORS_COUNTER = "PortRcvErrors"
RCV_REMOTE_PHY_ERROR_COUNTER = "PortRcvRemotePhysicalErrors"
TEMP_COUNTER = "Module_Temperature"
LNK_DOWNED_COUNTER = "Link_Down_IB"

PHY_RAW_ERROR_LANE0 = "phy_raw_errors_lane0"
PHY_RAW_ERROR_LANE1 = "phy_raw_errors_lane1"
Expand Down Expand Up @@ -109,6 +108,5 @@ class PDRConstants(object):
STATE_ISOLATED = "isolated"
STATE_TREATED = "treated"

PDR_DYNAMIC_NAME = "pdr_dynamic"
# intervals in seconds for testing ber values and corresponding thresholds
BER_THRESHOLDS_INTERVALS = [(125 * 60, 3), (12 * 60, 2.88)]
142 changes: 15 additions & 127 deletions plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,9 @@
from exclude_list import ExcludeList

from constants import PDRConstants as Constants
from ufm_communication_mgr import DynamicSessionState, UFMCommunicator
from ufm_communication_mgr import UFMCommunicator
# should actually be persistent and thread safe dictionary pf PortStates


class DynamicTelemetryUnresponsive(Exception):
"""
Exception raised when the dynamic telemetry is unresponsive.
"""
pass


class PortData(object):
"""
Represents the port data.
Expand Down Expand Up @@ -156,7 +148,7 @@ def get_counter(counter_name, row, default=0):
or its value is NaN.
"""
try:
val = row.get(counter_name) if (row.get(counter_name) is not None and not pd.isna(row.get(counter_name))) else default
val = row.get(counter_name) if (row.get(counter_name) is not None and not numpy.isnan(row.get(counter_name))) else default
except Exception as e:
return default
return val
Expand Down Expand Up @@ -194,13 +186,11 @@ def __init__(self, ufm_client: UFMCommunicator, logger):
self.do_deisolate = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.DO_DEISOLATION)
self.deisolate_consider_time = pdr_config.getint(Constants.CONF_ISOLATION,Constants.DEISOLATE_CONSIDER_TIME)
self.automatic_deisolate = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.AUTOMATIC_DEISOLATE)
self.dynamic_wait_time = pdr_config.getint(Constants.CONF_ISOLATION,"DYNAMIC_WAIT_TIME")
self.temp_check = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.CONFIGURED_TEMP_CHECK)
self.link_down_isolation = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.LINK_DOWN_ISOLATION)
self.switch_hca_isolation = pdr_config.getboolean(Constants.CONF_ISOLATION,Constants.SWITCH_TO_HOST_ISOLATION)
self.test_mode = pdr_config.getboolean(Constants.CONF_COMMON,Constants.TEST_MODE, fallback=False)
self.test_iteration = 0
self.dynamic_unresponsive_limit = pdr_config.getint(Constants.CONF_ISOLATION,Constants.DYNAMIC_UNRESPONSIVE_LIMIT, fallback=3)
# Take from Conf
self.logger = logger
self.ber_intervals = Constants.BER_THRESHOLDS_INTERVALS if not self.test_mode else [[0.5 * 60, 3]]
Expand All @@ -227,13 +217,6 @@ def __init__(self, ufm_client: UFMCommunicator, logger):
Constants.LNK_DOWNED_COUNTER,
]

# bring telemetry data on disabled ports
self.dynamic_extra_configuration = {
"plugin_env_CLX_EXPORT_API_ENABLE_DOWN_PORT_COUNTERS": "1",
"plugin_env_CLX_EXPORT_API_ENABLE_DOWN_PHY": "1",
"arg_11": ""
}

self.exclude_list = ExcludeList(self.logger)

def calc_max_ber_wait_time(self, min_threshold):
Expand Down Expand Up @@ -456,13 +439,11 @@ def check_temp_issue(self, port_obj, row, timestamp):
if not self.temp_check:
return None
cable_temp = get_counter(Constants.TEMP_COUNTER, row, default=None)
if cable_temp is not None and not pd.isna(cable_temp):
if cable_temp in ["NA", "N/A", "", "0C", "0"]:
if cable_temp is not None and not numpy.isnan(cable_temp):
if cable_temp in ["NA", "N/A", "", "0C"]:
return None
# Get new and saved temperature values
cable_temp = int(cable_temp.split("C")[0]) if isinstance(cable_temp, str) else cable_temp
old_cable_temp = port_obj.counters_values.get(Constants.TEMP_COUNTER);
# Save new temperature value
cable_temp = int(cable_temp.split("C")[0]) if type(cable_temp) == str else cable_temp
old_cable_temp = port_obj.counters_values.get(Constants.TEMP_COUNTER, 0)
port_obj.counters_values[Constants.TEMP_COUNTER] = cable_temp
# Check temperature condition
if cable_temp and (cable_temp > self.tmax):
Expand Down Expand Up @@ -542,17 +523,16 @@ def check_ber_issue(self, port_obj, row, timestamp):
return Issue(port_obj.port_name, Constants.ISSUE_BER)
return None

def read_next_set_of_high_ber_or_pdr_ports(self, endpoint_port):
def read_next_set_of_high_ber_or_pdr_ports(self):
"""
Read the next set of ports and check if they have high BER, PDR, temperature or link downed issues
"""
issues = {}
ports_counters = self.ufm_client.get_telemetry(endpoint_port, Constants.PDR_DYNAMIC_NAME,self.test_mode)
ports_counters = self.ufm_client.get_telemetry(Constants.SECONDARY_TELEMETRY_PORT, Constants.SECONDARY_INSTANCE, self.test_mode)
if ports_counters is None:
self.logger.error("Couldn't retrieve telemetry data")
raise DynamicTelemetryUnresponsive
for index, row in ports_counters.iterrows():
port_name = f"{row.get('port_guid', '').split('x')[-1]}_{row.get('port_num', '')}"
for _, row in ports_counters.iterrows():
port_name = f"{row.get('Node_GUID', '').split('x')[-1]}_{row.get('Port_Number', '')}"
if self.exclude_list.contains(port_name):
# The port is excluded from analysis
continue
Expand Down Expand Up @@ -766,35 +746,6 @@ def get_isolation_state(self):
port_state.update(Constants.STATE_ISOLATED, Constants.ISSUE_OONOC)
self.ports_states[port] = port_state

def start_telemetry_session(self):
"""
Starts a telemetry session.
Returns:
str: The port number if the dynamic session is started successfully, False otherwise.
"""
self.logger.info("Starting telemetry session")
guids = self.get_requested_guids()
response = self.ufm_client.start_dynamic_session(Constants.PDR_DYNAMIC_NAME, self.telemetry_counters, self.interval, guids, self.dynamic_extra_configuration)
if response and response.status_code == http.HTTPStatus.ACCEPTED:
port = str(int(response.content))
else:
self.logger.error(f"Failed to start dynamic session: {response}")
return False
return port

def update_telemetry_session(self):
"""
Updates the telemetry session by requesting and updating the dynamic session with the specified interval and guids.
Returns:
The response from the UFM client after updating the dynamic session.
"""
self.logger.info("Updating telemetry session")
guids = self.get_requested_guids()
response = self.ufm_client.update_dynamic_session(Constants.PDR_DYNAMIC_NAME, self.interval, guids)
return response

def get_requested_guids(self):
"""
Get the requested GUIDs and their corresponding ports.
Expand All @@ -812,63 +763,13 @@ def get_requested_guids(self):
requested_guids = [{"guid": sys_guid, "ports": ports} for sys_guid, ports in guids.items()]
return requested_guids

# this function create dynamic telemetry and returns the port of this telemetry
def run_telemetry_get_port(self):
"""
Runs the telemetry and returns the endpoint port.
If the test mode is enabled, it returns the test mode port.
Otherwise, it waits for the dynamic session to start, starts the telemetry session,
and retrieves the endpoint port.
Returns:
int: The endpoint port for the telemetry.
Raises:
Exception: If an error occurs during the process.
"""
if self.test_mode:
return Constants.TEST_MODE_PORT
try:
while True:
session_state = self.ufm_client.get_dynamic_session_state(Constants.PDR_DYNAMIC_NAME)
if session_state == DynamicSessionState.RUNNING:
# Telemetry session is running
break
if session_state == DynamicSessionState.NONE:
# Start new session
self.logger.info("Waiting for dynamic session to start")
endpoint_port = self.start_telemetry_session()
time.sleep(self.dynamic_wait_time)
else:
# Stop inactive session
self.logger.info("Waiting for inactive dynamic session to stop")
self.ufm_client.stop_dynamic_session(Constants.PDR_DYNAMIC_NAME)
time.sleep(self.dynamic_wait_time)
except Exception as e:
self.ufm_client.stop_dynamic_session(Constants.PDR_DYNAMIC_NAME)
time.sleep(self.dynamic_wait_time)
endpoint_port = self.ufm_client.dynamic_session_get_port(Constants.PDR_DYNAMIC_NAME)
return endpoint_port

def restart_telemetry_session(self):
"""
Restart the dynamic telemetry session and return the new endpoint port
"""
self.logger.info("Restarting telemetry session")
self.ufm_client.stop_dynamic_session(Constants.PDR_DYNAMIC_NAME)
time.sleep(self.dynamic_wait_time)
endpoint_port = self.run_telemetry_get_port()
return endpoint_port

def main_flow(self):
"""
Executes the main flow of the Isolation Manager.
This method synchronizes with the telemetry clock, retrieves ports metadata,
starts the telemetry session, and continuously retrieves telemetry data to
determine the states of the ports. It handles dynamic telemetry unresponsiveness,
skips isolation if too many ports are detected as unhealthy, and evaluates
continuously retrieves telemetry data from secondary telemetry to
determine the states of the ports. skips isolation if too many ports are detected as unhealthy, and evaluates
isolation and deisolation for reported issues and ports with specific causes.
Args:
Expand All @@ -880,9 +781,6 @@ def main_flow(self):
self.logger.info("Isolation Manager initialized, starting isolation loop")
self.get_ports_metadata()
self.logger.info("Retrieved ports metadata")
endpoint_port = self.run_telemetry_get_port()
self.logger.info("telemetry session started")
dynamic_telemetry_unresponsive_count = 0
while(True):
try:
t_begin = time.time()
Expand All @@ -894,15 +792,9 @@ def main_flow(self):
self.logger.info(f"Retrieving test mode telemetry data to determine ports' states: iteration {self.test_iteration}")
self.test_iteration += 1
try:
issues = self.read_next_set_of_high_ber_or_pdr_ports(endpoint_port)
except DynamicTelemetryUnresponsive:
dynamic_telemetry_unresponsive_count += 1
if dynamic_telemetry_unresponsive_count > self.dynamic_unresponsive_limit:
self.logger.error(f"Dynamic telemetry is unresponsive for {dynamic_telemetry_unresponsive_count} times, restarting telemetry session...")
endpoint_port = self.restart_telemetry_session()
dynamic_telemetry_unresponsive_count = 0
self.test_iteration = 0
continue
issues = self.read_next_set_of_high_ber_or_pdr_ports()
except (KeyError,) as e:
self.logger.error(f"failed to read information with error {e}")
if len(issues) > self.max_num_isolate:
# UFM send external event
event_msg = "got too many ports detected as unhealthy: %d, skipping isolation" % len(issues)
Expand Down Expand Up @@ -937,7 +829,3 @@ def main_flow(self):
self.logger.warning(traceback_err)
t_end = time.time()
time.sleep(max(1, self.interval - (t_end - t_begin)))

# this is a callback for API exposed by this code - second phase
# def work_reportingd(port):
# PORTS_STATE[port].update(Constants.STATE_TREATED, Constants.ISSUE_INIT)
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,6 @@
import http
import pandas as pd

class DynamicSessionState(Enum):
"""
States of telemetry session instance
"""
NONE = 0
INACTIVE = 1
RUNNING = 2

class UFMCommunicator:

def __init__(self, host='127.0.0.1', ufm_port=8000):
Expand Down Expand Up @@ -67,7 +59,7 @@ def get_telemetry(self, port, instance_name,test_mode):
url = f"http://127.0.0.1:{port}/csv/xcset/{instance_name}"
try:
telemetry_data = pd.read_csv(url)
except Exception as e:
except (pd.errors.ParserError, pd.errors.EmptyDataError) as e:
logging.error(f"Failed to get telemetry data from UFM, fetched url={url}. Error: {e}")
telemetry_data = None
return telemetry_data
Expand Down Expand Up @@ -119,41 +111,3 @@ def get_ports_metadata(self):

def get_port_metadata(self, port_name):
return self.get_request("%s/%s" % (Constants.GET_PORTS_REST, port_name))

def start_dynamic_session(self, instance_name, counters, sample_rate, guids, extra_configuration=None):
data = {
"counters": counters,
"sample_rate": sample_rate,
"requested_guids": guids,
"is_registered_discovery": False
}
if extra_configuration:
data["configuration"] = extra_configuration
return self.send_request(Constants.DYNAMIC_SESSION_REST % instance_name, data, method=Constants.POST_METHOD)

def update_dynamic_session(self, instance_name, sample_rate, guids):
data = {
"sample_rate": sample_rate,
"requested_guids": guids
}
return self.send_request(Constants.DYNAMIC_SESSION_REST % instance_name, data, method=Constants.PUT_METHOD)

def get_dynamic_session_state(self, instance_name):
response = self.get_request(Constants.STATUS_DYNAMIC_SESSION_REST)
if response:
instance_status = response.get(instance_name)
if instance_status:
if instance_status.get("status") == "running":
return DynamicSessionState.RUNNING
else:
return DynamicSessionState.INACTIVE
return DynamicSessionState.NONE

def stop_dynamic_session(self, instance_name):
data = {}
return self.send_request(Constants.DYNAMIC_SESSION_REST % instance_name, data, method=Constants.DELETE_METHOD)

def dynamic_session_get_port(self, instance_name):
data = self.get_request(Constants.DYNAMIC_SESSION_REST % instance_name)
if data:
return data.get("endpoint_port")

0 comments on commit 8ebc25d

Please sign in to comment.