Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue:#4043165: Create a new telemetry_collector class #235

Merged
merged 30 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c729b59
replace dynamic telmetry with the secondary telemetry
egershonNvidia Aug 14, 2024
6eb60da
fixed vitaly comment
egershonNvidia Aug 14, 2024
0e3eebf
fixed CI issue and
egershonNvidia Aug 14, 2024
58c2b9d
fix CI and adding constants
egershonNvidia Aug 14, 2024
651c51a
move get_telemetry to it's own class
egershonNvidia Aug 14, 2024
5e8ce58
added comments and fixed dror comments
egershonNvidia Aug 14, 2024
e388ac5
added fix for the connection between runner and machine
egershonNvidia Aug 14, 2024
65982e2
rename file telemetry collector
egershonNvidia Aug 14, 2024
dad6b6b
removed Telemetry collector
egershonNvidia Aug 15, 2024
f28263f
put telemetry collection in it own class
egershonNvidia Aug 19, 2024
d15a14e
added the class
egershonNvidia Aug 19, 2024
bd0147b
Merge branch 'main' into pdr_telemetry
egershonNvidia Sep 4, 2024
b4ad16d
add save location
egershonNvidia Sep 4, 2024
0729ecb
added data store and telemetry collector classes
egershonNvidia Sep 10, 2024
46120dc
Merge branch 'main' into pdr_telemetry
egershonNvidia Sep 10, 2024
8eb2577
replace the ufm client get telemetry
egershonNvidia Sep 10, 2024
0e7c507
fixed crashes
egershonNvidia Sep 10, 2024
63f38c3
remove constant changes
egershonNvidia Sep 10, 2024
df3bbb3
added a test getting the file
egershonNvidia Sep 16, 2024
99dbffd
Merge branch 'main' into pdr_telemetry
egershonNvidia Sep 16, 2024
4769aca
added a test to ensure that we have maximum logs.
egershonNvidia Sep 17, 2024
94b9705
fixed creation of log
egershonNvidia Sep 17, 2024
88f823b
another edit for the test
egershonNvidia Sep 17, 2024
b8011ef
copying only abs
egershonNvidia Sep 17, 2024
3e94cce
fixed test
egershonNvidia Sep 17, 2024
cef733a
the length of file is 0
egershonNvidia Sep 17, 2024
4e8e662
the len of files it 0
egershonNvidia Sep 17, 2024
416c00d
revert the sort order to take the last item
egershonNvidia Sep 17, 2024
c204de5
fixed the maximum log test
egershonNvidia Sep 17, 2024
4709eba
fixed grammar mistake
egershonNvidia Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 58 additions & 5 deletions plugins/pdr_deterministic_plugin/tests/simulation_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
import copy
import argparse
import random
from os.path import exists
from os.path import exists,join
from collections import OrderedDict
import requests
from utils.utils import Utils
import pandas as pd
from pathlib import Path
from datetime import datetime,timedelta

lock = Lock()

Expand All @@ -32,6 +35,8 @@
RCV_REMOTE_PHY_ERROR_COUNTER = "PortRcvRemotePhysicalErrors"
TEMP_COUNTER = "Module_Temperature"
FEC_MODE = "fec_mode_active"
TELEMETRY_DATASTORE_LOCATION = "/opt/ufm/ufm_plugin_pdr_deterministic/datastore"
MAX_LOG_FILES = 10
ENDPOINT_CONFIG = {}

EXCLUDE_PORT_LONG_TIME = "ExcludePortForLongTime"
Expand Down Expand Up @@ -70,6 +75,8 @@ def do_GET(self): # pylint: disable=invalid-name
data = endpoint['data']
self.wfile.write(data.encode())

OUTPUT_FILE_FORMAT = "%Y_%m_%d_%H_%M_%S.csv"

DIFFERENT_DEFAULT_VALUES = {
# because the plugin reads the meta data to know the first temperature and we cannot stream the metadata.
TEMP_COUNTER:"5",
Expand Down Expand Up @@ -194,6 +201,7 @@ def start_server(port:str,changes_intervals:int, run_forever:bool):
counters_names = list(counters.keys())
header = ['timestamp', 'source_id,tag,Node_GUID,port_guid,Port_Number'] + counters_names
endpoint['data'] = ""
setup_log_test = False
while True:
# lock.acquire()
data = []
Expand All @@ -217,8 +225,29 @@ def start_server(port:str,changes_intervals:int, run_forever:bool):
if not run_forever and ENDPOINT_CONFIG["ITERATION_TIME"] > MAX_ITERATIONS:
# after all the tests are done, we need to stop the simulator and check the logs
return
if not setup_log_test:
setup_log_test = create_telemetries_logs()
time.sleep(changes_intervals)

def create_telemetries_logs():
    """
    Pre-populate the datastore with roughly twice the allowed number of
    telemetry log files, as a setup step for the log-retention test.

    Returns True when both the 'abs' and 'delta' folders had a sample file
    to copy from, False otherwise.
    """
    all_created = True
    for location in ('abs', 'delta'):
        folder = join(TELEMETRY_DATASTORE_LOCATION, location)
        csv_files = list(Path(folder).glob("*.csv"))
        if not csv_files:
            print("didnt create files because there are no files in the folder")
            all_created = False
            continue
        sample = pd.read_csv(csv_files[0])
        # Back-date the copies so the logs the server itself creates stay the
        # most recent ones; a separate test checks for recent logs.
        for days_back in range(1, MAX_LOG_FILES * 2):
            stamp = (datetime.now() - timedelta(days=days_back)).strftime(OUTPUT_FILE_FORMAT)
            sample.to_csv(join(folder, stamp))
    return all_created

def excluded_ports_simulation(endpoint):
"""
Perform operations on exclusion port for current iteration
Expand All @@ -229,7 +258,6 @@ def excluded_ports_simulation(endpoint):
for port_index in range(len(rows)):
port_name = endpoint["Ports_names"][port_index]
iteration = ENDPOINT_CONFIG["ITERATION_TIME"]

# Process remove operation
if find_value(port_index, INCLUDE_PORT, iteration, None) is not None:
# Remove from exclusion list
Expand Down Expand Up @@ -339,7 +367,32 @@ def check_logs(config):
break
if len(lines) == 0:
print("Could not find log file in " + str(location_logs_can_be))
return 1
return 1

saved_files_tests = True

for option_location in ['abs','delta']:
input_path_dir = Path(join(TELEMETRY_DATASTORE_LOCATION,option_location))
files = list(input_path_dir.glob("*.csv"))

if len(files) == 0: # testing we have files there
print(f"There is no files in the datastore location:{join(TELEMETRY_DATASTORE_LOCATION,option_location)}")
egershonNvidia marked this conversation as resolved.
Show resolved Hide resolved
saved_files_tests = False
continue
print_test_result("there are only 10 items in the set, The test copy in each iterations more logs.",
egershonNvidia marked this conversation as resolved.
Show resolved Hide resolved
len(files), MAX_LOG_FILES)
if len(files) != MAX_LOG_FILES:
saved_files_tests = False
files.sort(key=lambda p: p.name)
latest_file = files[0].name
saved_time = datetime.strptime(latest_file,OUTPUT_FILE_FORMAT)
different_time = datetime.now() - saved_time
print_test_result(f"the latest file saved at {join(TELEMETRY_DATASTORE_LOCATION,option_location)}\
before test running, more than 5 minutes {different_time.total_seconds() > 300}",
different_time.total_seconds() > 300, False)
if different_time.total_seconds() > 300:
egershonNvidia marked this conversation as resolved.
Show resolved Hide resolved
saved_files_tests = False
continue
# if a you want to add more tests, please add more guids and test on other indices.

ports_should_be_isolated_indices = list(set([x[1] for x in POSITIVE_DATA_TEST]))
Expand Down Expand Up @@ -373,7 +426,7 @@ def check_logs(config):
break
print_test_result(f"port {port_name} (index: {p}) which check {tested_counter} should not be in the logs", found, False, "negative")

all_pass = number_of_failed_positive_tests == 0 and number_of_failed_negative_tests == 0
all_pass = number_of_failed_positive_tests == 0 and number_of_failed_negative_tests == 0 and saved_files_tests
return 0 if all_pass else 1

def main():
Expand Down Expand Up @@ -417,7 +470,7 @@ def main():

if not validate_simulation_data():
return 1

port = args.endpoint_port
url = f'http://0.0.0.0:{port}{args.url_suffix}'
print(f'---Starting endpoint {url}')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class PDRConstants():
SWITCH_TO_HOST_ISOLATION = "SWITCH_TO_HOST_ISOLATION"
TEST_MODE = "TEST_MODE"
TEST_MODE_PORT = 9090
SECONDARY_TELEMETRY_PORT = 9002

GET_SESSION_DATA_REST = "/monitoring/session/0/data"
POST_EVENT_REST = "/app/events/external_event"
Expand All @@ -56,7 +55,6 @@ class PDRConstants():
GET_ACTIVE_PORTS_REST = "/resources/ports?active=true"
API_HEALTHY_PORTS = "healthy_ports"
API_ISOLATED_PORTS = "isolated_ports"
SECONDARY_INSTANCE = "low_freq_debug"

TIMEOUT = 60

Expand Down
89 changes: 89 additions & 0 deletions plugins/pdr_deterministic_plugin/ufm_sim_web_service/data_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from os.path import join,exists
from os import remove,makedirs
from pathlib import Path
from datetime import datetime
import pandas as pd

class DataStore:
    """
    Manage the on-disk store of saved telemetry samples ("abs" and "delta"
    CSV files) and remove old files on request.
    """
    # strftime pattern used as the CSV file name, e.g. 2024_09_18_12_00_00.csv
    OUTPUT_FILE_FORMAT = "%Y_%m_%d_%H_%M_%S.csv"
    # retention limit per folder; older files beyond this count are deleted
    AMOUNT_FILES_TO_KEEP = 10
    BASE_PATH = "/opt/ufm/ufm_plugin_pdr_deterministic/datastore"
    ABS_PATH = "abs"
    DELTA_PATH = "delta"
    TAR_SUFFIX = "*.csv"

    def __init__(self,logger) -> None:
        """
        Ensure both datastore folders exist.
        :param logger: logger used for info/error reporting
        """
        self.logger = logger
        # Create each folder unconditionally with exist_ok: the previous
        # `if not exists(BASE_PATH)` guard skipped creation entirely when the
        # base folder existed but a subfolder ('abs' or 'delta') was missing.
        makedirs(self._folder_abs(), exist_ok=True)
        makedirs(self._folder_delta(), exist_ok=True)

    def _folder_abs(self) -> str:
        """Full path of the folder holding absolute-value telemetry."""
        return join(self.BASE_PATH,self.ABS_PATH)

    def _folder_delta(self) -> str:
        """Full path of the folder holding delta telemetry."""
        return join(self.BASE_PATH,self.DELTA_PATH)

    def _get_filename(self) -> str:
        """Timestamped CSV file name for the current moment (local time)."""
        return datetime.now().strftime(self.OUTPUT_FILE_FORMAT)

    def get_filename_abs(self) -> str:
        """
        return a filename for abs data
        """
        return join(self._folder_abs(),self._get_filename())

    def get_filename_delta(self) -> str:
        """
        return a filename for delta data
        """
        return join(self._folder_delta(),self._get_filename())

    def _get_files_to_remove(self, data_path:str, suffix:str, to_keep:int) -> list:
        """
        Return the names of the oldest files beyond the `to_keep` newest,
        searching `data_path` for files matching `suffix`.
        Relies on the timestamp file-name format sorting chronologically.
        """
        input_path_dir = Path(data_path)
        files = sorted(input_path_dir.glob(suffix), key=lambda p: p.name)
        names = [str(p) for p in files]
        if len(names) > to_keep:
            return names[:len(names) - to_keep]
        return []

    def clean_old_files(self) -> None:
        """
        search for the both locations to clean the old files.
        """
        for data_path in [self._folder_abs(),self._folder_delta()]:
            files = self._get_files_to_remove(data_path,self.TAR_SUFFIX,self.AMOUNT_FILES_TO_KEEP)
            if len(files) > 0:
                self._remove_files(files)

    def _remove_files(self, files: list) -> None:
        """
        Delete a list of files
        :param files: (List) List of files to be removed
        :return: None
        """
        # lazy %-style args keep logging consistent with the error call below
        self.logger.info("removing %d old files", len(files))
        for file in files:
            try:
                # EAFP: attempt removal directly; a concurrent delete between
                # an exists() check and remove() would raise anyway.
                remove(file)
            except FileNotFoundError:
                pass
            except OSError as exc:
                self.logger.error("failed to remove file %s [%s]", file, exc)

    def save(self, dataframe:pd.DataFrame, file_name:str) -> None:
        """
        save dataframe to the file name
        """
        self.logger.info("saving data to %s", file_name)
        # NOTE(review): to_csv writes the index as an extra column — confirm
        # downstream readers expect that before adding index=False.
        dataframe.to_csv(file_name)
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
from pdr_algorithm import PortData, IsolatedPort, PDRAlgorithm

from constants import PDRConstants as Constants
from telemetry_collector import TelemetryCollector
from ufm_communication_mgr import UFMCommunicator
# should actually be persistent and thread safe dictionary pf IsolatedPorts
from data_store import DataStore

#pylint: disable=too-many-instance-attributes
class IsolationMgr:
Expand Down Expand Up @@ -48,6 +49,8 @@ def __init__(self, ufm_client: UFMCommunicator, logger):

self.test_iteration = 0
self.logger = logger
self.data_store = DataStore(self.logger)
self.telemetry_collector = TelemetryCollector(self.test_mode,logger,self.data_store)
self.exclude_list = ExcludeList(self.logger)
self.pdr_alg = PDRAlgorithm(self.ufm_client, self.exclude_list, self.logger, pdr_config)

Expand Down Expand Up @@ -277,7 +280,7 @@ def main_flow(self):
self.logger.info(f"Retrieving test mode telemetry data to determine ports' states: iteration {self.test_iteration}")
self.test_iteration += 1
try:
ports_counters = self.ufm_client.get_telemetry(self.test_mode)
ports_counters = self.telemetry_collector.get_telemetry()
if ports_counters is None:
self.logger.error("Couldn't retrieve telemetry data")
else:
Expand Down Expand Up @@ -309,6 +312,7 @@ def main_flow(self):
for isolated_port in list(self.isolated_ports.values()):
if self.pdr_alg.check_deisolation_conditions(isolated_port):
self.eval_deisolate(isolated_port.name)

self.update_ports_data()
t_end = time.time()
#pylint: disable=broad-except
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ def analyze_telemetry_data(self, ports_data, ports_counters):
issues[port_name] = ber_issue
# If out of operating conditions we'll overwrite the cause
if self.temp_check and self.is_out_of_operating_conf(port_name):
issues[port_name] = Constants.ISSUE_OONOC
issues[port_name].cause = Constants.ISSUE_OONOC
return list(issues.values())

def check_deisolation_conditions(self, isolated_port):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import urllib
import urllib.error

import pandas as pd

from data_store import DataStore

class TelemetryCollector:
    """
    Represent Telemetry collector which send DataFrame once telemetry is called.
    Calls data store class for save the abs or delta data: the first successful
    sample is saved as "abs", later samples are saved as "delta" against the
    previously fetched sample.
    """
    # columns identifying a port across samples; rows are matched on these
    BASED_COLUMNS = ["Node_GUID", "port_guid", "Port_Number"]
    # columns copied as-is rather than diffed
    KEY = BASED_COLUMNS + ["timestamp","tag"]
    SECONDARY_TELEMETRY_PORT = 9002
    SECONDARY_INSTANCE = "low_freq_debug"

    def __init__(self,test_mode:bool,logger,data_store:DataStore) -> None:
        """
        :param test_mode: when True, fetch from the local simulator endpoint
        :param logger: logger used for info/error reporting
        :param data_store: DataStore that persists abs/delta samples
        """
        self.test_mode = test_mode
        self.logger = logger
        # last successfully fetched sample; deltas are computed against it
        self.previous_telemetry_data = None
        self.data_store = data_store

    def get_telemetry(self):
        """
        get the telemetry from secondary telemetry, if it in test mode it get from the simulation
        return DataFrame of the telemetry, or None when fetching fails
        """
        if self.test_mode:
            url = "http://127.0.0.1:9090/csv/xcset/simulated_telemetry"
        else:
            url = f"http://127.0.0.1:{self.SECONDARY_TELEMETRY_PORT}/csv/xcset/{self.SECONDARY_INSTANCE}"
        try:
            self.logger.info(f"collecting telemetry from {url}.")
            telemetry_data = pd.read_csv(url)
        except (pd.errors.ParserError, pd.errors.EmptyDataError, urllib.error.URLError) as connection_error:
            self.logger.error("failed to get telemetry data from UFM, fetched url=%s. Error: %s",url,connection_error)
            telemetry_data = None
        if self.previous_telemetry_data is not None and telemetry_data is not None:
            delta = self._get_delta(self.previous_telemetry_data,telemetry_data)
            # when we want to keep only delta
            if len(delta) > 0:
                self.data_store.save(delta,self.data_store.get_filename_delta())
            # Advance the baseline: without this the baseline was never
            # updated, so every delta was computed against the very first
            # sample instead of the previous one.
            self.previous_telemetry_data = telemetry_data
        elif telemetry_data is not None:
            # when we want to keep the abs
            self.previous_telemetry_data = telemetry_data
            self.data_store.save(telemetry_data,self.data_store.get_filename_abs())
        return telemetry_data

    def _get_delta(self, first_df: pd.DataFrame, second_df:pd.DataFrame):
        """
        Return a DataFrame shaped like second_df where each non-key,
        non-string column holds (second - first) per port; key and string
        columns are copied from second_df. Returns an empty DataFrame when
        the two samples share no ports.
        """
        merged_df = pd.merge(second_df, first_df, on=self.BASED_COLUMNS, how='inner', suffixes=('', '_x'))
        if merged_df.empty:
            # guard: iat[0, index] below would raise IndexError on no rows
            return pd.DataFrame()
        delta_dataframe = pd.DataFrame()
        # NOTE(review): assumes the inner merge keeps second_df's row order
        # and row count (true when both samples cover the same ports) —
        # otherwise key columns copied from second_df may misalign; verify.
        for index,col in enumerate(second_df.columns):
            if col not in self.KEY and not isinstance(merged_df.iat[0,index],str):
                col_x = col + "_x"
                delta_dataframe[col] = merged_df[col] - merged_df[col_x]
            else:
                delta_dataframe[col] = second_df[col]
        return delta_dataframe
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,10 @@
# provided with the software product.
#

import urllib.error
import urllib
import logging
import http
from constants import PDRConstants as Constants
import requests
import pandas as pd

class UFMCommunicator:
"""
Expand Down Expand Up @@ -59,22 +56,6 @@ def send_request(self, uri, data, method=Constants.POST_METHOD, headers=None):
logging.info("UFM API Request Status: %s, URL: %s",response.status_code, request)
return response

def get_telemetry(self,test_mode):
"""
get the telemetry from secondary telemetry, if it in test mode it get from the simulation
return DataFrame of the telemetry
"""
if test_mode:
url = "http://127.0.0.1:9090/csv/xcset/simulated_telemetry"
else:
url = f"http://127.0.0.1:{Constants.SECONDARY_TELEMETRY_PORT}/csv/xcset/{Constants.SECONDARY_INSTANCE}"
try:
telemetry_data = pd.read_csv(url)
except (pd.errors.ParserError, pd.errors.EmptyDataError, urllib.error.URLError) as error:
logging.error("Failed to get telemetry data from UFM, fetched url=%s. Error: %s",url,error)
telemetry_data = None
return telemetry_data

def send_event(self, message, event_id=Constants.EXTERNAL_EVENT_NOTICE,
external_event_name="PDR Plugin Event", external_event_type="PDR Plugin Event"):
data = {
Expand Down
Loading