issue:#4043165: Create a new telemetry_collector class (#235)
egershonNvidia authored Sep 18, 2024
1 parent 83cdab3 commit 7501a79
Showing 7 changed files with 209 additions and 29 deletions.
61 changes: 56 additions & 5 deletions plugins/pdr_deterministic_plugin/tests/simulation_telemetry.py
@@ -17,10 +17,13 @@
import copy
import argparse
import random
from os.path import exists
from os.path import exists,join
from collections import OrderedDict
import requests
from utils.utils import Utils
import pandas as pd
from pathlib import Path
from datetime import datetime,timedelta

lock = Lock()

@@ -32,6 +35,8 @@
RCV_REMOTE_PHY_ERROR_COUNTER = "PortRcvRemotePhysicalErrors"
TEMP_COUNTER = "Module_Temperature"
FEC_MODE = "fec_mode_active"
TELEMETRY_DATASTORE_LOCATION = "/opt/ufm/ufm_plugin_pdr_deterministic/datastore"
MAX_LOG_FILES = 10
ENDPOINT_CONFIG = {}

EXCLUDE_PORT_LONG_TIME = "ExcludePortForLongTime"
@@ -70,6 +75,8 @@ def do_GET(self): # pylint: disable=invalid-name
data = endpoint['data']
self.wfile.write(data.encode())

OUTPUT_FILE_FORMAT = "%Y_%m_%d_%H_%M_%S.csv"

DIFFERENT_DEFAULT_VALUES = {
# because the plugin reads the metadata to learn the first temperature, and we cannot stream the metadata.
TEMP_COUNTER:"5",
@@ -194,6 +201,7 @@ def start_server(port:str,changes_intervals:int, run_forever:bool):
counters_names = list(counters.keys())
header = ['timestamp', 'source_id,tag,Node_GUID,port_guid,Port_Number'] + counters_names
endpoint['data'] = ""
setup_log_test = False
while True:
# lock.acquire()
data = []
@@ -217,8 +225,28 @@ def start_server(port:str,changes_intervals:int, run_forever:bool):
if not run_forever and ENDPOINT_CONFIG["ITERATION_TIME"] > MAX_ITERATIONS:
# after all the tests are done, we need to stop the simulator and check the logs
return
if not setup_log_test:
setup_log_test = create_telemetries_logs()
time.sleep(changes_intervals)

def create_telemetries_logs():
"""
Create up to double the number of log files the folder is allowed to hold. This is the setup for the log-retention test.
"""
successful = True
abs_file_path = Path(join(TELEMETRY_DATASTORE_LOCATION,"abs"))
files = list(abs_file_path.glob("*.csv"))
if len(files) == 0:
print("abs file have not created yet")
return False
df = pd.read_csv(files[0])
for option_location in ['abs','delta']:
for day in range(1,MAX_LOG_FILES*2):
# date the files back by whole days so that only the logs the server itself creates are recent; a later test checks for recent logs.
file_name = (datetime.now() - timedelta(days=day)).strftime(OUTPUT_FILE_FORMAT)
df.to_csv(join(TELEMETRY_DATASTORE_LOCATION,option_location,file_name))
return successful

def excluded_ports_simulation(endpoint):
"""
Perform operations on excluded ports for the current iteration
@@ -229,7 +257,6 @@ def excluded_ports_simulation(endpoint):
for port_index in range(len(rows)):
port_name = endpoint["Ports_names"][port_index]
iteration = ENDPOINT_CONFIG["ITERATION_TIME"]

# Process remove operation
if find_value(port_index, INCLUDE_PORT, iteration, None) is not None:
# Remove from exclusion list
@@ -339,7 +366,31 @@ def check_logs(config):
break
if len(lines) == 0:
print("Could not find log file in " + str(location_logs_can_be))
return 1
return 1

saved_files_tests = True
for option_location in ['abs','delta']:
input_path_dir = Path(join(TELEMETRY_DATASTORE_LOCATION,option_location))
files = list(input_path_dir.glob("*.csv"))

if len(files) == 0: # testing we have files there
print(f"There are no files in the datastore location:{join(TELEMETRY_DATASTORE_LOCATION,option_location)}")
saved_files_tests = False
continue
print_test_result("amount of telemetry logs in the folder equal to maximum or maximum plus 1 (did not clean the plus 1 yet).",
len(files), MAX_LOG_FILES)
if len(files) != MAX_LOG_FILES and len(files) != MAX_LOG_FILES + 1:
saved_files_tests = False
files.sort(key=lambda p: p.name,reverse=True)
latest_file = files[0].name
saved_time = datetime.strptime(latest_file,OUTPUT_FILE_FORMAT)
different_time = datetime.now() - saved_time
print_test_result(f"the latest file saved at {join(TELEMETRY_DATASTORE_LOCATION,option_location)}\
before test running, more than 5 minutes {different_time.total_seconds() > 300}",
different_time.total_seconds() > 300, False)
if different_time.total_seconds() > 300:
saved_files_tests = False
continue
# if you want to add more tests, please add more GUIDs and test other indices.

ports_should_be_isolated_indices = list(set([x[1] for x in POSITIVE_DATA_TEST]))
@@ -373,7 +424,7 @@ def check_logs(config):
break
print_test_result(f"port {port_name} (index: {p}) which check {tested_counter} should not be in the logs", found, False, "negative")

all_pass = number_of_failed_positive_tests == 0 and number_of_failed_negative_tests == 0
all_pass = number_of_failed_positive_tests == 0 and number_of_failed_negative_tests == 0 and saved_files_tests
return 0 if all_pass else 1

def main():
@@ -417,7 +468,7 @@ def main():

if not validate_simulation_data():
return 1

port = args.endpoint_port
url = f'http://0.0.0.0:{port}{args.url_suffix}'
print(f'---Starting endpoint {url}')
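The retention checks above depend on the timestamped filenames round-tripping through OUTPUT_FILE_FORMAT. A minimal standalone sketch of that round trip (same format string as the test; the three-day offset is arbitrary):

from datetime import datetime, timedelta

OUTPUT_FILE_FORMAT = "%Y_%m_%d_%H_%M_%S.csv"

# write side: create_telemetries_logs back-dates file names by whole days
name = (datetime.now() - timedelta(days=3)).strftime(OUTPUT_FILE_FORMAT)
# read side: check_logs recovers the save time from the name alone,
# then treats anything older than 300 seconds as a pre-existing file
saved_time = datetime.strptime(name, OUTPUT_FILE_FORMAT)
assert (datetime.now() - saved_time).total_seconds() > 300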
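plugins/pdr_deterministic_plugin/ufm_sim_web_service/constants.py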
@@ -46,7 +46,6 @@ class PDRConstants():
SWITCH_TO_HOST_ISOLATION = "SWITCH_TO_HOST_ISOLATION"
TEST_MODE = "TEST_MODE"
TEST_MODE_PORT = 9090
SECONDARY_TELEMETRY_PORT = 9002

GET_SESSION_DATA_REST = "/monitoring/session/0/data"
POST_EVENT_REST = "/app/events/external_event"
@@ -56,7 +55,6 @@ class PDRConstants():
GET_ACTIVE_PORTS_REST = "/resources/ports?active=true"
API_HEALTHY_PORTS = "healthy_ports"
API_ISOLATED_PORTS = "isolated_ports"
SECONDARY_INSTANCE = "low_freq_debug"

TIMEOUT = 60

89 changes: 89 additions & 0 deletions plugins/pdr_deterministic_plugin/ufm_sim_web_service/data_store.py
@@ -0,0 +1,89 @@
from os.path import join,exists
from os import remove,makedirs
from pathlib import Path
from datetime import datetime
import pandas as pd

class DataStore:
"""
Manage the datastore of saved telemetry, abs or delta, and remove old data on request.
"""
OUTPUT_FILE_FORMAT = "%Y_%m_%d_%H_%M_%S.csv"
AMOUNT_FILES_TO_KEEP = 10
BASE_PATH = "/opt/ufm/ufm_plugin_pdr_deterministic/datastore"
ABS_PATH = "abs"
DELTA_PATH = "delta"
TAR_SUFFIX = "*.csv"

def __init__(self,logger) -> None:
self.logger = logger
# exist_ok ensures a missing subfolder is created even when BASE_PATH already exists
makedirs(self._folder_abs(), exist_ok=True)
makedirs(self._folder_delta(), exist_ok=True)

def _folder_abs(self) -> str:
return join(self.BASE_PATH,self.ABS_PATH)

def _folder_delta(self) -> str:
return join(self.BASE_PATH,self.DELTA_PATH)

def _get_filename(self) -> str:
return datetime.now().strftime(self.OUTPUT_FILE_FORMAT)

def get_filename_abs(self) -> str:
"""
return a filename for abs data
"""
return join(self._folder_abs(),self._get_filename())

def get_filename_delta(self) -> str:
"""
return a filename for delta data
"""
return join(self._folder_delta(),self._get_filename())

def _get_files_to_remove(self, data_path:str, suffix:str, to_keep:int) -> list:
"""
Find the names of the oldest files beyond the newest to_keep,
searching data_path for files that match suffix.
"""
files_to_remove = []
input_path_dir = Path(data_path)
files = list(input_path_dir.glob(suffix))
files.sort(key=lambda p: p.name)
files = [str(p) for p in files]
if len(files) > to_keep:
files_to_remove = files[:len(files)- to_keep]
return files_to_remove

def clean_old_files(self) -> None:
"""
Search both folder locations and clean up the old files.
"""
for data_path in [self._folder_abs(),self._folder_delta()]:
files = self._get_files_to_remove(data_path,self.TAR_SUFFIX,self.AMOUNT_FILES_TO_KEEP)
if len(files) > 0:
self._remove_files(files)

def _remove_files(self, files: list) -> None:
"""
Delete a list of files
:param files: (List) List of files to be removed
:return: None
"""
self.logger.info(f"removing {len(files)} old files")
for file in files:
try:
if exists(file):
remove(file)
except FileNotFoundError:
pass
except OSError as exc:
self.logger.error("failed to remove file %s [%s]", file, exc)

def save(self, dataframe:pd.DataFrame, file_name:str) -> None:
"""
save dataframe to the file name
"""
self.logger.info(f"saving data to {file_name}")
dataframe.to_csv(file_name)
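For orientation, a minimal usage sketch of the class above (assumes a standard logging.Logger, write access to BASE_PATH, and an arbitrary counter column):

import logging
import pandas as pd
from data_store import DataStore

logger = logging.getLogger("pdr")
store = DataStore(logger)                     # ensures abs/ and delta/ exist under BASE_PATH
frame = pd.DataFrame({"Node_GUID": ["0x1"], "port_guid": ["0x1"],
                      "Port_Number": [1], "PortRcvErrors": [0]})
store.save(frame, store.get_filename_abs())   # timestamped CSV under .../datastore/abs
store.clean_old_files()                       # keeps only the newest AMOUNT_FILES_TO_KEEP per folder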
plugins/pdr_deterministic_plugin/ufm_sim_web_service/isolation_mgr.py
@@ -18,8 +18,9 @@
from pdr_algorithm import PortData, IsolatedPort, PDRAlgorithm

from constants import PDRConstants as Constants
from telemetry_collector import TelemetryCollector
from ufm_communication_mgr import UFMCommunicator
# should actually be a persistent and thread-safe dictionary of IsolatedPorts
from data_store import DataStore

#pylint: disable=too-many-instance-attributes
class IsolationMgr:
@@ -48,6 +49,8 @@ def __init__(self, ufm_client: UFMCommunicator, logger):

self.test_iteration = 0
self.logger = logger
self.data_store = DataStore(self.logger)
self.telemetry_collector = TelemetryCollector(self.test_mode,logger,self.data_store)
self.exclude_list = ExcludeList(self.logger)
self.pdr_alg = PDRAlgorithm(self.ufm_client, self.exclude_list, self.logger, pdr_config)

@@ -268,6 +271,7 @@ def main_flow(self):
t_begin = time.time()
self.exclude_list.refresh()
self.get_isolation_state()
self.data_store.clean_old_files()

issues = None
# Get telemetry data
@@ -277,7 +281,7 @@
self.logger.info(f"Retrieving test mode telemetry data to determine ports' states: iteration {self.test_iteration}")
self.test_iteration += 1
try:
ports_counters = self.ufm_client.get_telemetry(self.test_mode)
ports_counters = self.telemetry_collector.get_telemetry()
if ports_counters is None:
self.logger.error("Couldn't retrieve telemetry data")
else:
@@ -309,6 +313,7 @@
for isolated_port in list(self.isolated_ports.values()):
if self.pdr_alg.check_deisolation_conditions(isolated_port):
self.eval_deisolate(isolated_port.name)

self.update_ports_data()
t_end = time.time()
#pylint: disable=broad-except
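The new wiring can also be exercised outside the plugin loop; a hedged sketch (assumes test mode, with the simulated endpoint from simulation_telemetry.py listening on port 9090):

import logging
from data_store import DataStore
from telemetry_collector import TelemetryCollector

logger = logging.getLogger("pdr")
store = DataStore(logger)
collector = TelemetryCollector(True, logger, store)  # test_mode=True reads the 9090 simulator
frame = collector.get_telemetry()  # first success is saved as abs, later calls save deltas
store.clean_old_files()            # the same cleanup main_flow now runs once per iteration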
plugins/pdr_deterministic_plugin/ufm_sim_web_service/pdr_algorithm.py
@@ -433,7 +433,7 @@ def analyze_telemetry_data(self, ports_data, ports_counters):
issues[port_name] = ber_issue
# If out of operating conditions we'll overwrite the cause
if self.temp_check and self.is_out_of_operating_conf(port_name):
issues[port_name] = Constants.ISSUE_OONOC
issues[port_name].cause = Constants.ISSUE_OONOC
return list(issues.values())

def check_deisolation_conditions(self, isolated_port):
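The one-line change above fixes a type mix-up: issues[port_name] holds an issue object, not a string, so only its cause field should be overwritten. A before/after sketch (Issue is a hypothetical stand-in; the real issue class is not shown in this hunk):

class Issue:  # hypothetical stand-in for the plugin's issue object
    def __init__(self, port, cause):
        self.port = port
        self.cause = cause

issues = {"p1": Issue("p1", "ber")}
issues["p1"].cause = "oonoc"  # after the fix: update the cause, keep the object
# before the fix, issues["p1"] = "oonoc" replaced the object with a bare string,
# so a later read of issues["p1"].cause would raise AttributeError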
plugins/pdr_deterministic_plugin/ufm_sim_web_service/telemetry_collector.py
@@ -0,0 +1,56 @@
import urllib.error
import pandas as pd
from data_store import DataStore

class TelemetryCollector:
"""
Represents the telemetry collector, which returns a DataFrame each time telemetry is requested.
Calls the data store class to save the abs or delta data.
"""
BASED_COLUMNS = ["Node_GUID", "port_guid", "Port_Number"]
KEY = BASED_COLUMNS + ["timestamp","tag"]
SECONDARY_TELEMETRY_PORT = 9002
SECONDARY_INSTANCE = "low_freq_debug"

def __init__(self,test_mode:bool,logger,data_store:DataStore) -> None:
self.test_mode = test_mode
self.logger = logger
self.previous_telemetry_data = None
self.data_store = data_store

def get_telemetry(self):
"""
Get telemetry from the secondary telemetry instance; in test mode, get it from the simulation.
Return a DataFrame of the telemetry.
"""
if self.test_mode:
url = "http://127.0.0.1:9090/csv/xcset/simulated_telemetry"
else:
url = f"http://127.0.0.1:{self.SECONDARY_TELEMETRY_PORT}/csv/xcset/{self.SECONDARY_INSTANCE}"
try:
self.logger.info(f"collecting telemetry from {url}.")
telemetry_data = pd.read_csv(url)
except (pd.errors.ParserError, pd.errors.EmptyDataError, urllib.error.URLError) as connection_error:
self.logger.error("failed to get telemetry data from UFM, fetched url=%s. Error: %s",url,connection_error)
telemetry_data = None
if self.previous_telemetry_data is not None and telemetry_data is not None:
delta = self._get_delta(self.previous_telemetry_data,telemetry_data)
# persist the delta (relative to the stored abs baseline) if it is non-empty
if len(delta) > 0:
self.data_store.save(delta,self.data_store.get_filename_delta())
elif telemetry_data is not None:
# first successful sample: store it as the abs baseline and persist it
self.previous_telemetry_data = telemetry_data
self.data_store.save(telemetry_data,self.data_store.get_filename_abs())
return telemetry_data

def _get_delta(self, first_df: pd.DataFrame, second_df:pd.DataFrame):
"""
Return a DataFrame where numeric non-key columns are second_df minus first_df
(rows matched on BASED_COLUMNS); key and string columns are copied from second_df.
"""
merged_df = pd.merge(second_df, first_df, on=self.BASED_COLUMNS, how='inner', suffixes=('', '_x'))
delta_dataframe = pd.DataFrame()
for index,col in enumerate(second_df.columns):
if col not in self.KEY and not isinstance(merged_df.iat[0,index],str):
col_x = col + "_x"
delta_dataframe[col] = merged_df[col] - merged_df[col_x]
else:
delta_dataframe[col] = second_df[col]
return delta_dataframe
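A worked example of the delta computation, restated outside the class (column names follow BASED_COLUMNS and KEY; the counter column is arbitrary):

import pandas as pd

first = pd.DataFrame({"Node_GUID": ["0xA"], "port_guid": ["0xA"], "Port_Number": [1],
                      "timestamp": [100], "PortRcvErrors": [10]})
second = pd.DataFrame({"Node_GUID": ["0xA"], "port_guid": ["0xA"], "Port_Number": [1],
                       "timestamp": [200], "PortRcvErrors": [25]})
merged = pd.merge(second, first, on=["Node_GUID", "port_guid", "Port_Number"],
                  how="inner", suffixes=("", "_x"))
print((merged["PortRcvErrors"] - merged["PortRcvErrors_x"]).iloc[0])  # 15
# key columns and string columns are copied from the newer frame, not subtracted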
plugins/pdr_deterministic_plugin/ufm_sim_web_service/ufm_communication_mgr.py
@@ -10,13 +10,10 @@
# provided with the software product.
#

import urllib.error
import urllib
import logging
import http
from constants import PDRConstants as Constants
import requests
import pandas as pd

class UFMCommunicator:
"""
@@ -59,22 +56,6 @@ def send_request(self, uri, data, method=Constants.POST_METHOD, headers=None):
logging.info("UFM API Request Status: %s, URL: %s",response.status_code, request)
return response

def get_telemetry(self,test_mode):
"""
get the telemetry from secondary telemetry, if it in test mode it get from the simulation
return DataFrame of the telemetry
"""
if test_mode:
url = "http://127.0.0.1:9090/csv/xcset/simulated_telemetry"
else:
url = f"http://127.0.0.1:{Constants.SECONDARY_TELEMETRY_PORT}/csv/xcset/{Constants.SECONDARY_INSTANCE}"
try:
telemetry_data = pd.read_csv(url)
except (pd.errors.ParserError, pd.errors.EmptyDataError, urllib.error.URLError) as error:
logging.error("Failed to get telemetry data from UFM, fetched url=%s. Error: %s",url,error)
telemetry_data = None
return telemetry_data

def send_event(self, message, event_id=Constants.EXTERNAL_EVENT_NOTICE,
external_event_name="PDR Plugin Event", external_event_type="PDR Plugin Event"):
data = {
