task:4151977 Log analyzer analyze telemetry logs #277

Merged: 9 commits, Nov 18, 2024
Changes from 3 commits
19 changes: 17 additions & 2 deletions plugins/ufm_log_analyzer_plugin/src/loganalyze/log_analyzer.py
@@ -44,6 +44,7 @@
from loganalyze.log_analyzers.console_log_analyzer import ConsoleLogAnalyzer
from loganalyze.log_analyzers.rest_api_log_analyzer import RestApiAnalyzer
from loganalyze.log_analyzers.link_flapping_analyzer import LinkFlappingAnalyzer
from loganalyze.log_analyzers.ibdiagnet2_port_counters_analyzer import Ibdiagnet2PortCountersAnalyzer

from loganalyze.pdf_creator import PDFCreator
from loganalyze.utils.common import delete_files_by_types
@@ -252,7 +253,7 @@ def create_analyzer(parsed_args, full_extracted_logs_list,
in the full report.
Returns the created analyzer
"""
if log_name in full_extracted_logs_list:
if any(os.path.basename(log) == log_name for log in full_extracted_logs_list):
log_csvs = get_files_in_dest_by_type(parsed_args.destination,
log_name,
parsed_args.extract_level)
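For context, a minimal standalone sketch (not part of this PR) of why the basename comparison is needed once the extracted-log list holds full paths rather than bare file names:

import os

# Hypothetical paths, for illustration only
full_extracted_logs_list = ["/tmp/extracted/ufm_logs/rest_api.log", "/tmp/extracted/event.log"]
log_name = "rest_api.log"

print(log_name in full_extracted_logs_list)  # False: a bare name never equals a full path
print(any(os.path.basename(log) == log_name for log in full_extracted_logs_list))  # True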
@@ -305,7 +306,7 @@ def create_analyzer(parsed_args, full_extracted_logs_list,
log.LOGGER.debug("Starting analyzing the data")
partial_create_analyzer = partial(create_analyzer,
parsed_args=args,
full_extracted_logs_list=full_logs_list,
full_extracted_logs_list=logs_to_work_with,
ufm_top_analyzer_obj=ufm_top_analyzer)
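As a rough illustration of the functools.partial pattern used here (the names below are toy stand-ins, not the plugin's real classes), the shared arguments are bound once and each analyzer is then created with only its log name and class:

from functools import partial

def create_analyzer(parsed_args, full_extracted_logs_list, ufm_top_analyzer_obj, log_name, analyzer_clc):
    # Toy stand-in: report which analyzer class would handle which log
    return f"{analyzer_clc.__name__} for {log_name}"

class DummyAnalyzer:
    pass

partial_create_analyzer = partial(create_analyzer,
                                  parsed_args=None,
                                  full_extracted_logs_list=["rest_api.log"],
                                  ufm_top_analyzer_obj=None)
print(partial_create_analyzer(log_name="rest_api.log", analyzer_clc=DummyAnalyzer))  # DummyAnalyzer for rest_api.log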

# Creating the analyzer for each log
@@ -328,6 +329,12 @@ def create_analyzer(parsed_args, full_extracted_logs_list,

rest_api_log_analyzer = partial_create_analyzer(log_name="rest_api.log",
analyzer_clc=RestApiAnalyzer)

ibdianget_2_ports_primary_analyzer = partial_create_analyzer(log_name="ufm_logs_ibdiagnet2_port_counters.log",
analyzer_clc=Ibdiagnet2PortCountersAnalyzer)

ibdianget_2_ports_secondary_analyzer = partial_create_analyzer(log_name="secondary_telemetry_ibdiagnet2_port_counters.log",
analyzer_clc=Ibdiagnet2PortCountersAnalyzer)
second_telemetry_samples = get_files_in_dest_by_type(args.destination,
"secondary_",
1000,
@@ -380,6 +387,9 @@ def create_analyzer(parsed_args, full_extracted_logs_list,
text_to_show_in_pdf += os.linesep + os.linesep + "More than 5 events burst over a minute:" \
+ os.linesep + critical_events_text

# Adding telemetry stats to the PDF
for cur_telemetry in [ibdianget_2_ports_primary_analyzer, ibdianget_2_ports_secondary_analyzer]:
text_to_show_in_pdf += cur_telemetry.text_to_show_in_pdf
# PDF creator gets all the images and text to add to the report
pdf = PDFCreator(pdf_path, pdf_header, png_images, text_to_show_in_pdf)
pdf.created_pdf()
@@ -388,6 +398,11 @@ def create_analyzer(parsed_args, full_extracted_logs_list,
for image, title in images_and_title_to_present:
log.LOGGER.info(f"{title}: {image}")
log.LOGGER.info(f"Summary PDF was created! you can open here at {pdf_path}")

if args.interactive:
import IPython
IPython.embed()
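A minimal sketch of this optional interactive hook (assumes the ipython package is installed; the surrounding names are illustrative): embed() opens a shell with access to the local variables at that point in the run.

def run_analysis(interactive=False):
    results = {"core_dumps": 0}  # placeholder for whatever the run produced
    if interactive:
        import IPython
        IPython.embed()  # inspect 'results' and other locals, then exit to continue
    return results

run_analysis(interactive=False)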

# Clean up some unneeded files created during the run
files_types_to_delete = set()
files_types_to_delete.add("png") #png images created for PDF report
@@ -12,6 +12,7 @@
# pylint: disable=missing-function-docstring
# pylint: disable=missing-module-docstring

import logging
import os
import csv
import shutil
@@ -21,15 +22,17 @@
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
import matplotlib.dates as mdates

from loganalyze.log_analyzers.constants import DataConstants
import loganalyze.logger as log
logging.getLogger('matplotlib').setLevel(logging.ERROR) # This makes sure the user does not see warnings from plotting
matplotlib.use('Agg') # This allows running the tool on headless servers without a graphics card

pd.set_option("display.max_colwidth", None)
warnings.filterwarnings("ignore")


class BaseImageCreator:
# Setting the graph time interval to 1 hour
# This is out side of the constructor since
@@ -47,7 +50,7 @@ def __init__(self, dest_image_path):
self._funcs_for_analysis = set()

def _save_data_based_on_timestamp(
self, data_to_plot, x_label, y_label, title
self, data_to_plot, x_label, y_label, title, large_sample=False
):
with plt.ion():
log.LOGGER.debug(f"saving {title}")
@@ -61,7 +64,10 @@ def _save_data_based_on_timestamp(
# Set the locator to show ticks every hour and the formatter to
# include both date and time
ax = plt.gca()
ax.xaxis.set_major_locator(mdates.HourLocator())
if large_sample:
ax.xaxis.set_major_locator(mdates.HourLocator(interval=24)) # Show major tick labels every 24 hours
else:
ax.xaxis.set_major_locator(mdates.HourLocator())
ax.xaxis.set_minor_locator(
mdates.MinuteLocator(interval=15)
) # Add minor ticks every 15 minutes
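A small standalone sketch of the large_sample behaviour (illustrative data; assumes matplotlib is installed): with interval=24 the major ticks thin out to one per day, which keeps multi-day plots readable.

from datetime import datetime, timedelta
import matplotlib
matplotlib.use("Agg")  # headless backend, as configured above
import matplotlib.dates as mdates
import matplotlib.pyplot as plt

timestamps = [datetime(2024, 11, 1) + timedelta(hours=i) for i in range(5 * 24)]
plt.plot(timestamps, range(len(timestamps)))
ax = plt.gca()
ax.xaxis.set_major_locator(mdates.HourLocator(interval=24))  # one major tick every 24 hours
ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
plt.savefig("major_ticks_example.png")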
@@ -0,0 +1,176 @@
#
# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED.
#
# This software product is a proprietary product of Nvidia Corporation and its affiliates
# (the "Company") and all right, title, and interest in and to the software
# product, including all associated intellectual property rights, are and
# shall remain exclusively with the Company.
#
# This software product is governed by the End User License Agreement
# provided with the software product.
#

import os
from typing import List
import warnings
import pandas as pd
from loganalyze.log_analyzers.base_analyzer import BaseAnalyzer


class Ibdiagnet2PortCountersAnalyzer(BaseAnalyzer):
def __init__(self, logs_csvs: List[str], hours: int, dest_image_path: str, sort_timestamp=False):
super().__init__(logs_csvs, hours, dest_image_path, sort_timestamp)
self._iteration_time_data = None
self._iteration_time_stats = None
self.text_to_show_in_pdf = ""
# This will make sure all the extra columns are parsed as nullable integers (Int64)
extra_columns = ['extra1', 'extra2', 'extra3', 'extra4', 'extra5']
for col in extra_columns:
self._log_data_sorted[col] = pd.to_numeric(
self._log_data_sorted[col],
errors='coerce'
).astype('Int64')
self._funcs_for_analysis = {self.plot_iteration_time_over_time}
# Based on the log path, decide if this is primary or secondary telemetry
if "ufm_logs" in logs_csvs[0]:
self.telemetry_type = "primary"
elif "secondary_telemetry" in logs_csvs[0]:
self.telemetry_type = "secondary"
else:
self.telemetry_type = "Unknown_telemetry_type"

def get_collectx_versions(self):
unique_collectx_versions = self._log_data_sorted[self._log_data_sorted['type'] == 'collectx_version']['data'].unique()
return unique_collectx_versions

def get_number_of_switches_and_ports(self):
"""
Generate summary statistics for 'total_devices_ports' data.
This function calculates the average, maximum, and minimum counts
for switches, CAs, routers, and ports.
"""
filtered_data = self._log_data_sorted[self._log_data_sorted['type'] == 'total_devices_ports']

ports_numbers_columns = ['extra1', 'extra3', 'extra5']
filtered_data['extra135'] = pd.to_numeric(
filtered_data[ports_numbers_columns].stack(), errors='coerce'
).groupby(level=0).sum(min_count=1)

columns_of_interest = ['data', 'extra2', 'extra4', 'extra135']
column_mapping = {
'data': 'Number of Switches',
'extra2': 'CAs',
'extra4': 'Routers',
'extra135': 'Ports'
}

summary_stats = []

for col in columns_of_interest:
numeric_col = pd.to_numeric(filtered_data[col], errors='coerce')
non_zero_col = numeric_col[numeric_col != 0]

avg = round(non_zero_col.mean()) if not non_zero_col.empty else 0
max_val = int(non_zero_col.max()) if not non_zero_col.empty else 0
min_val = int(non_zero_col.min()) if not non_zero_col.empty else 0
count = int(non_zero_col.count())

summary_stats.append({
'Category': column_mapping.get(col, col),
'Average': avg,
'Maximum': max_val,
'Minimum': min_val,
'Total Rows (Non-Zero)': count
})

summary_df = pd.DataFrame(summary_stats)

return summary_df

def analyze_iteration_time(self, threshold=0.15):
"""
Analyze rows where 'type' is 'iteration_time'.
Keep only 'type', 'timestamp', and 'data' columns.
Calculate statistics for the 'data' column, including timestamps for max and min.
Rows with values below the threshold are filtered out before the statistics are computed.

Parameters:
- threshold (float): Minimum value to consider for analysis. Default is 0.15 seconds.
"""
filtered_data = self._log_data_sorted[self._log_data_sorted['type'] == 'iteration_time']
filtered_data = filtered_data[['type', 'timestamp', 'data']]
filtered_data['data'] = pd.to_numeric(filtered_data['data'], errors='coerce')

filtered_data = filtered_data[filtered_data['data'] >= threshold]
filtered_data['timestamp'] = pd.to_datetime(filtered_data['timestamp'], errors='coerce')
filtered_data = filtered_data.dropna(subset=['timestamp'])

if not filtered_data['data'].empty:
average = filtered_data['data'].mean()
max_value = filtered_data['data'].max()
min_value = filtered_data['data'].min()

max_timestamp = filtered_data.loc[filtered_data['data'] == max_value, 'timestamp'].iloc[0]
min_timestamp = filtered_data.loc[filtered_data['data'] == min_value, 'timestamp'].iloc[0]
else:
average = max_value = min_value = 0.0
max_timestamp = min_timestamp = None

stats = {
'Average': average,
'Maximum': max_value,
'Max Timestamp': max_timestamp,
'Minimum': min_value,
'Min Timestamp': min_timestamp,
'Total Rows': filtered_data['data'].count()
}
stats_df = pd.DataFrame([stats])
self._iteration_time_data = filtered_data
self._iteration_time_stats = stats_df
return stats_df

def get_last_iterations_time_stats(self):
return self._iteration_time_stats

def plot_iteration_time_over_time(self):
if self._iteration_time_data is None:
self.analyze_iteration_time()

self._iteration_time_data.set_index('timestamp', inplace=True)

# Plot the data using the existing method
with warnings.catch_warnings():
warnings.filterwarnings("ignore", ".*Locator attempting to generate.*")
self._save_data_based_on_timestamp(
data_to_plot=self._iteration_time_data['data'],
x_label='Timestamp',
y_label='Iteration Time (s)',
title=f'{self.telemetry_type} Iteration Time',
large_sample=True)

def get_number_of_core_dumps(self):
core_dumps = self._log_data_sorted[self._log_data_sorted['type'] == 'timeout_dump_core']
return len(core_dumps)

def full_analysis(self):
txt_for_pdf = os.linesep + os.linesep
txt_for_pdf += f"{self.telemetry_type} info: {os.linesep}"
txt_for_pdf += f"Found the following collectx version(s):{os.linesep}"
for collectx_version in self.get_collectx_versions():
txt_for_pdf += f"{collectx_version}, "
txt_for_pdf += os.linesep
txt_for_pdf += f"Found {self.get_number_of_core_dumps()} core dumps{os.linesep}"
txt_for_pdf += str(self.get_number_of_switches_and_ports())
iteration_stats = self.get_last_iterations_time_stats()
if iteration_stats is None:
self.analyze_iteration_time()
iteration_stats = self.get_last_iterations_time_stats()
txt_for_pdf += f"Iteration time stats:{os.linesep}"
txt_for_pdf += str(iteration_stats)
self.text_to_show_in_pdf = txt_for_pdf
print(f"stats for {self.telemetry_type}:")
print(self.get_last_iterations_time_stats())
print(self.get_number_of_switches_and_ports())
print(f"Collectx versions {self.get_collectx_versions()}")

return super().full_analysis()
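The per-row port total in get_number_of_switches_and_ports relies on a stack/groupby idiom; a toy sketch (illustrative data only) of how it sums several string columns while tolerating non-numeric values:

import pandas as pd

# Toy frame mimicking the extra1/extra3/extra5 port columns, which arrive as strings
df = pd.DataFrame({"extra1": ["10", "4"], "extra3": ["2", None], "extra5": ["x", "1"]})
ports = pd.to_numeric(df[["extra1", "extra3", "extra5"]].stack(), errors="coerce") \
          .groupby(level=0).sum(min_count=1)
print(ports)  # row 0 -> 12.0 ('x' coerced to NaN), row 1 -> 5.0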
@@ -20,28 +20,34 @@

TIMEOUT_DUMP_CORE_REGEX = re.compile(r"^timeout: the monitored command dumped core$")

TOTAL_SWITCH_PORTS_REGEX = re.compile(r"^.*Total switches\/ports \[(\d+)\/(\d+)\]\,.*$")
TOTAL_SWITCH_PORTS_REGEX = re.compile(r"^.*Total switches\/ports \[(\d+)\/(\d+)\]\, CAs\/ports \[(\d+)\/(\d+)\]\, Routers\/ports \[(\d+)\/(\d+)\]\s*$")

COLLECTX_VERSION_REGEX = re.compile(r"^\[ExportAPI\] Collectx version ([\d\.]+)$")

def iteration_time(match: Match):
iteration_time_sec = match.group(1)
timestamp = match.group(2)
return ("iteration_time", timestamp, iteration_time_sec, None)
return ("iteration_time", timestamp, iteration_time_sec, None, None, None, None, None)

def timeout_dump_core(_: Match):
return ("timeout_dump_core", None, None, None)
return ("timeout_dump_core", None, None, None, None, None, None, None)

def total_switch_ports(match: Match):
total_switches = match.group(1)
total_ports = match.group(2)
return ("total_switch_ports", None, total_switches, total_ports)
total_switch_ports = match.group(2)
total_cas = match.group(3)
total_cas_ports = match.group(4)
total_routers = match.group(5)
total_routers_ports = match.group(6)
return ("total_devices_ports", None, total_switches, total_switch_ports,\
total_cas, total_cas_ports,\
total_routers, total_routers_ports)

def collectx_version(match:Match):
collectx_version_str = match.group(1)
return ("collectx_version", None, collectx_version_str, None)
return ("collectx_version", None, collectx_version_str, None, None, None, None, None)

ibdiagnet2_headers = ("type", "timestamp", "data", "extra")
ibdiagnet2_headers = ("type", "timestamp", "data", "extra1", "extra2", "extra3", "extra4", "extra5")

ibdiagnet2_primary_log_regex_cls = \
RegexAndHandlers("ufm_logs_ibdiagnet2_port_counters.log", ibdiagnet2_headers)
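A quick standalone check of the widened TOTAL_SWITCH_PORTS_REGEX against a line shaped to match it (the sample line is illustrative, not copied from a real ibdiagnet log):

import re

TOTAL_SWITCH_PORTS_REGEX = re.compile(
    r"^.*Total switches\/ports \[(\d+)\/(\d+)\]\, CAs\/ports \[(\d+)\/(\d+)\]\, Routers\/ports \[(\d+)\/(\d+)\]\s*$"
)
line = "Total switches/ports [12/432], CAs/ports [200/400], Routers/ports [2/8]"
print(TOTAL_SWITCH_PORTS_REGEX.match(line).groups())
# -> ('12', '432', '200', '400', '2', '8')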