Skip to content

Commit

Permalink
issue:4043499: Implement XDR Support for TFS plugin (#259)
Browse files Browse the repository at this point in the history
  • Loading branch information
ananalaghbar authored Sep 26, 2024
1 parent 8f41b36 commit 5d9a699
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 18 deletions.
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@
path = plugins/UFM_NDT_Plugin/ufm_sim_web_service/ndt-ui/sms-ui-suite
url = git@github.com:Mellanox-lab/sw_mng_ui.git
branch = standalone-suite
[submodule "ufm_sdk_tools"]
path = ufm_sdk_tools
url = git@github.com:Mellanox/ufm_sdk_tools.git
branch = main
1 change: 1 addition & 0 deletions plugins/fluentd_telemetry_plugin/build/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ ARG SRC_BASE_DIR=fluentd_telemetry_plugin

COPY ${SRC_BASE_DIR}/ ${BASE_PATH}/${SRC_BASE_DIR}/
COPY utils/ ${BASE_PATH}/utils/
COPY ufm_sdk_tools/ ${BASE_PATH}/ufm_sdk_tools/

COPY ${SRC_BASE_DIR}/conf/supervisord.conf /etc/supervisor/conf.d/
COPY ${SRC_BASE_DIR}/scripts/init.sh ${SRC_BASE_DIR}/scripts/deinit.sh /
Expand Down
1 change: 1 addition & 0 deletions plugins/fluentd_telemetry_plugin/build/docker_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ pushd ${SCRIPT_DIR}
BUILD_DIR=$(create_out_dir)
cp Dockerfile ${BUILD_DIR}
cp -r ../../../utils ${BUILD_DIR}
cp -r ../../../ufm_sdk_tools ${BUILD_DIR}
cp -r ../../fluentd_telemetry_plugin ${BUILD_DIR}

echo "BUILD_DIR : [${BUILD_DIR}]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ port = 9001
url = csv/metrics
interval = 30
message_tag_name =
xdr_mode=False
xdr_ports_types=legacy;aggregated;plane

[fluentd-endpoint]
host =
Expand Down
19 changes: 12 additions & 7 deletions plugins/fluentd_telemetry_plugin/src/api/conf_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def _get_routes(self):
self.update_streaming_attributes: {'urls': ["/attributes"], 'methods': ["POST"]}
}

def _set_new_conf(self):
def _set_new_conf(self): # pylint: disable=too-many-branches
new_conf = request.json
sections = self.conf.get_conf_sections()
for section, section_items in new_conf.items():
Expand All @@ -60,7 +60,10 @@ def _set_new_conf(self):
new_section_data = {}
for endpoint in section_items:
for endpoint_item in endpoint_obj_keys:
new_section_data[endpoint_item] = f'{new_section_data.get(endpoint_item, "")},{endpoint.get(endpoint_item,"")}'
endpoint_item_value = endpoint.get(endpoint_item,"")
if endpoint_item == self.conf.UFM_TELEMETRY_ENDPOINT_SECTION_XDR_PORTS_TYPE:
endpoint_item_value = self.conf.UFM_TELEMETRY_ENDPOINT_SECTION_XDR_PORTS_TYPE_SPLITTER.join(endpoint_item_value)
new_section_data[endpoint_item] = f'{new_section_data.get(endpoint_item, "")},{endpoint_item_value}'
for endpoint_item in endpoint_obj_keys:
self.conf.set_item_value(section, endpoint_item, new_section_data.get(endpoint_item)[1:].strip())
else:
Expand Down Expand Up @@ -110,23 +113,25 @@ def get(self): # pylint: disable=too-many-locals, too-many-branches
if section_type == "array":
# in case the section_type is array, we need to collect
# the array elements from the saved comma separated strings
section_value_splitter = section_properties.get('splitter', ',')
section_properties = section_properties.get('items', {}).get("properties", None)
if section_properties:
conf_dict[section] = []
for item_key, item_value in section_items:
item_type = section_properties.get(item_key, None)
if item_type is None:
item = section_properties.get(item_key, None)
if item is None:
raise InvalidConfRequest(f'Failed to get the configurations schema for the item {item_key} '
f'under the section: {section}')
item_type = item_type.get('type', None)
item_values = item_value.split(",")
item_type = item.get('type', None)
item_value_splitter = item.get('splitter', ';')
item_values = item_value.split(section_value_splitter)
for i, value in enumerate(item_values):
try:
arr_element_obj = conf_dict[section][i]
except IndexError:
arr_element_obj = {}
conf_dict[section].append(arr_element_obj)
arr_element_obj[item_key] = Utils.convert_str_to_type(value, item_type)
arr_element_obj[item_key] = Utils.convert_str_to_type(value, item_type, item_value_splitter)
conf_dict[section][i] = arr_element_obj
elif section_type == "object":
section_properties = section_properties.get('properties', None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,20 @@
"interval": {
"type": "integer",
"minimum": 1,
"err_message": "[streaming - interval] attribute should be an integer greater than 0"
"err_message": "[ufm-telemetry-endpoint - interval] attribute should be an integer greater than 0"
},
"xdr_mode": {
"type": "boolean"
},
"xdr_ports_types": {
"type": "array",
"splitter": ";",
"items": {
"type": "string",
"enum": ["legacy", "aggregated", "plane"]
},
"err_message": "[ufm-telemetry-endpoint - xdr_ports_types] attribute should be list consists of at least one of the following types: ['legacy', 'aggregated', 'plane']",
"minItems": 1
},
"message_tag_name": {
"type": "string"
Expand All @@ -51,6 +64,7 @@
],
"additionalProperties": false
},
"splitter": ",",
"minItems": 1
},
"fluentd-endpoint": {
Expand Down
101 changes: 92 additions & 9 deletions plugins/fluentd_telemetry_plugin/src/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import gzip
import logging
import datetime
from typing import List

import requests
from requests.exceptions import ConnectionError # pylint: disable=redefined-builtin
from prometheus_client.parser import text_string_to_metric_families
Expand All @@ -33,6 +35,7 @@
from utils.config_parser import ConfigParser
from utils.logger import Logger, LOG_LEVELS
from utils.singleton import Singleton
from ufm_sdk_tools.src.xdr_utils import PortType,prepare_port_type_http_telemetry_filter


class UFMTelemetryConstants:
Expand All @@ -50,6 +53,16 @@ class UFMTelemetryConstants:
},{
"name": '--ufm_telemetry_url',
"help": "URL of UFM Telemetry endpoint"
},{
"name": '--ufm_telemetry_xdr_mode',
"help": "Telemetry XDR mode flag, "
"i.e., if True, the enabled ports types in `xdr_ports_types` "
"will be collected from the telemetry and streamed to fluentd"
},{
"name": '--ufm_telemetry_xdr_ports_types',
"help": "Telemetry XDR ports types, "
"i.e., List of XDR ports types that should be collected and streamed, "
"separated by `;`. For example legacy;aggregated;plane"
},{
"name": '--streaming_interval',
"help": "Interval for telemetry streaming in seconds"
Expand Down Expand Up @@ -107,6 +120,9 @@ class UFMTelemetryStreamingConfigParser(ConfigParser):
UFM_TELEMETRY_ENDPOINT_SECTION_URL = "url"
UFM_TELEMETRY_ENDPOINT_SECTION_INTERVAL = "interval"
UFM_TELEMETRY_ENDPOINT_SECTION_MSG_TAG_NAME = "message_tag_name"
UFM_TELEMETRY_ENDPOINT_SECTION_XDR_MODE = "xdr_mode"
UFM_TELEMETRY_ENDPOINT_SECTION_XDR_PORTS_TYPE = "xdr_ports_types"
UFM_TELEMETRY_ENDPOINT_SECTION_XDR_PORTS_TYPE_SPLITTER = ";"

FLUENTD_ENDPOINT_SECTION = "fluentd-endpoint"
FLUENTD_ENDPOINT_SECTION_HOST = "host"
Expand Down Expand Up @@ -143,6 +159,18 @@ def get_telemetry_url(self):
self.UFM_TELEMETRY_ENDPOINT_SECTION_URL,
"csv/metrics")

def get_ufm_telemetry_xdr_mode_flag(self):
return self.get_config_value(self.args.ufm_telemetry_xdr_mode,
self.UFM_TELEMETRY_ENDPOINT_SECTION,
self.UFM_TELEMETRY_ENDPOINT_SECTION_XDR_MODE,
"False")

def get_ufm_telemetry_xdr_ports_types(self):
return self.get_config_value(self.args.ufm_telemetry_xdr_ports_types,
self.UFM_TELEMETRY_ENDPOINT_SECTION,
self.UFM_TELEMETRY_ENDPOINT_SECTION_XDR_PORTS_TYPE,
"legacy;aggregated;plane")

def get_streaming_interval(self):
return self.get_config_value(self.args.streaming_interval,
self.UFM_TELEMETRY_ENDPOINT_SECTION,
Expand Down Expand Up @@ -225,6 +253,7 @@ def get_meta_fields(self):
return aliases, custom


#pylint: disable=too-many-instance-attributes
class UFMTelemetryStreaming(Singleton):
"""
UFMTelemetryStreaming class
Expand All @@ -236,10 +265,13 @@ def __init__(self, conf_parser):
self.config_parser = conf_parser

self.last_streamed_data_sample_timestamp = None
self.port_id_keys = ['node_guid', 'Node_GUID', 'port_guid', 'port_num', 'Port_Number', 'Port']
self.normal_port_id_keys = ['node_guid', 'Node_GUID', 'port_guid', 'port_num', 'Port_Number', 'Port']
self.agg_port_id_keys = ['sys_image_guid', 'aport']
self.port_type_key = 'port_type'
self.port_constants_keys = {
'timestamp': 'timestamp', 'source_id': 'source_id', 'tag': 'tag',
'node_guid': 'node_guid', 'port_guid': 'port_guid',
'sys_image_guid': 'sys_image_guid', 'aport': 'aport',
'port_num': 'port_num', 'node_description': 'node_description',
'm_label': 'm_label', 'port_label': 'port_label', 'status_message': 'status_message',
'Port_Number': 'Port_Number', 'Node_GUID': 'Node_GUID', 'Device_ID': 'Device_ID', 'device_id': 'Device_ID',
Expand Down Expand Up @@ -269,6 +301,14 @@ def ufm_telemetry_port(self):
def ufm_telemetry_url(self):
return self.config_parser.get_telemetry_url()

@property
def ufm_telemetry_xdr_mode_flag(self):
return self.config_parser.get_ufm_telemetry_xdr_mode_flag()

@property
def ufm_telemetry_xdr_ports_types(self):
return self.config_parser.get_ufm_telemetry_xdr_ports_types()

@property
def streaming_interval(self):
return self.config_parser.get_streaming_interval()
Expand All @@ -279,17 +319,24 @@ def ufm_telemetry_endpoints(self):
hosts = self.ufm_telemetry_host.split(splitter)
ports = self.ufm_telemetry_port.split(splitter)
urls = self.ufm_telemetry_url.split(splitter)
xdr_mode = self.ufm_telemetry_xdr_mode_flag.split(splitter)
xdr_ports_types = self.ufm_telemetry_xdr_ports_types.split(splitter)
intervals = self.streaming_interval.split(splitter)
msg_tags = self.fluentd_msg_tag.split(splitter)
endpoints = []
for i, value in enumerate(hosts):
_url = self._append_filters_to_telemetry_url(
urls[i],
Utils.convert_str_to_type(xdr_mode[i], 'boolean'),
xdr_ports_types[i].split(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_XDR_PORTS_TYPE_SPLITTER)
)
endpoints.append({
self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_HOST: value,
self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_PORT: ports[i],
self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_URL: urls[i],
self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_URL: _url,
self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_INTERVAL: intervals[i],
self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_MSG_TAG_NAME:
msg_tags[i] if msg_tags[i] else f'{value}:{ports[i]}/{urls[i]}'
msg_tags[i] if msg_tags[i] else f'{value}:{ports[i]}/{_url}'
})
return endpoints

Expand Down Expand Up @@ -342,6 +389,26 @@ def fluent_sender(self):
use_c=_use_c)
return self._fluent_sender

def _append_filters_to_telemetry_url(self, url: str, xdr_mode: bool, port_types: List[str]):
"""
This function constructs and appends filter parameters to the given URL if certain conditions are met.
Parameters:
url (str): The base telemetry URL to which filters may be appended.
xdr_mode (bool): A flag indicating whether extended data record (XDR) mode is enabled.
port_types (List[str]): list of port type names used to generate filters.
Returns:
str: The telemetry URL with appended filter parameters if applicable, or the original URL.
"""
filters = []
if xdr_mode:
filters.append(prepare_port_type_http_telemetry_filter(port_types))
if filters:
filters_sign = '&' if '?' in url else '?'
return f'{url}{filters_sign}{"&".join(filters)}'
return url

def _get_metrics(self, _host, _port, _url, msg_tag):
_host = f'[{_host}]' if Utils.is_ipv6_address(_host) else _host
url = f'http://{_host}:{_port}/{_url}'
Expand Down Expand Up @@ -414,7 +481,7 @@ def _get_filtered_counters(self, counters):
modified_keys[i] = attr_obj.get('name', key)
return modified_keys

def _parse_telemetry_csv_metrics_to_json_with_delta(self, data): # pylint: disable=too-many-locals
def _parse_telemetry_csv_metrics_to_json_with_delta(self, data): # pylint: disable=too-many-locals,too-many-branches
"""
:desc: parsed the data csv input & convert it to list of ports records
each record contains key[s]:value[s] for the port's counters
Expand All @@ -438,9 +505,20 @@ def _parse_telemetry_csv_metrics_to_json_with_delta(self, data): # pylint: disa
is_meta_fields_available = len(self.meta_fields[0]) or len(self.meta_fields[1])
output = []

port_id_keys_indices = []
for port_id_key in self.port_id_keys:
port_id_keys_indices += [i for i, x in enumerate(keys) if x == port_id_key]
normal_port_id_keys_indices = []
aggr_port_id_keys_indices = []
port_type_key_index = -1

normal_port_id_keys_set = set(self.normal_port_id_keys)
agg_port_id_keys_set = set(self.agg_port_id_keys)

for i, key in enumerate(keys):
if key in normal_port_id_keys_set:
normal_port_id_keys_indices.append(i)
if key in agg_port_id_keys_set:
aggr_port_id_keys_indices.append(i)
if key == self.port_type_key and port_type_key_index == -1:
port_type_key_index = i

modified_keys = self._get_filtered_counters(keys)
available_keys_indices = modified_keys.keys()
Expand All @@ -449,6 +527,11 @@ def _parse_telemetry_csv_metrics_to_json_with_delta(self, data): # pylint: disa
# skip the first row since it contains the headers
# skip the last row since its empty row
values = row.split(UFMTelemetryConstants.CSV_ROW_ATTRS_SEPARATOR)
port_id_keys_indices = normal_port_id_keys_indices
if port_type_key_index != -1:
port_type = values[port_type_key_index]
if port_type == PortType.AGGREGATED.value:
port_id_keys_indices = aggr_port_id_keys_indices

# prepare the port_key that will be used as an ID in delta
port_key = ":".join([values[index] for index in port_id_keys_indices])
Expand Down Expand Up @@ -531,7 +614,7 @@ def _parse_telemetry_prometheus_metrics_to_json(self, data): # pylint: disable=
if len(family.samples):
timestamp = family.samples[0].timestamp
for sample in family.samples:
uid = port_key = ":".join([sample.labels.get(key, '') for key in self.port_id_keys])
uid = port_key = ":".join([sample.labels.get(key, '') for key in self.normal_port_id_keys])
uid += f':{str(sample.timestamp)}'
current_row = elements_dict.get(uid, {})
if self.stream_only_new_samples:
Expand Down Expand Up @@ -656,7 +739,7 @@ def stream_data(self, telemetry_endpoint): # pylint: disable=too-many-locals
except Exception as ex: # pylint: disable=broad-except
logging.error("Exception occurred during parsing telemetry data: %s", str(ex))
else:
logging.error("Failed to get the telemetry data metrics")
logging.error("Failed to get the telemetry data metrics for %s", _url)

def _add_streaming_attribute(self, attribute):
if self.streaming_attributes.get(attribute, None) is None:
Expand Down
1 change: 1 addition & 0 deletions ufm_sdk_tools
Submodule ufm_sdk_tools added at 749801
4 changes: 3 additions & 1 deletion utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,12 @@ def get_plugin_port(port_conf_file, default_port_value):
return port

@staticmethod
def convert_str_to_type(value, new_type):
def convert_str_to_type(value, new_type, arr_splitter=','):
if isinstance(value, str):
if new_type == "integer":
value = int(value)
elif new_type == "boolean":
value = value.lower() == 'true'
elif new_type == "array":
value = value.split(arr_splitter)
return value

0 comments on commit 5d9a699

Please sign in to comment.