From 5d9a699e8accc754a7e3c33efbd967c9b7a5fcfa Mon Sep 17 00:00:00 2001 From: ananalaghbar <79898567+ananalaghbar@users.noreply.github.com> Date: Thu, 26 Sep 2024 17:10:38 +0300 Subject: [PATCH] issue:4043499: Implement XDR Support for TFS plugin (#259) --- .gitmodules | 4 + .../fluentd_telemetry_plugin/build/Dockerfile | 1 + .../build/docker_build.sh | 1 + .../conf/fluentd_telemetry_plugin.cfg | 2 + .../src/api/conf_api.py | 19 ++-- .../src/schemas/set_conf.schema.json | 16 ++- .../fluentd_telemetry_plugin/src/streamer.py | 101 ++++++++++++++++-- ufm_sdk_tools | 1 + utils/utils.py | 4 +- 9 files changed, 131 insertions(+), 18 deletions(-) create mode 160000 ufm_sdk_tools diff --git a/.gitmodules b/.gitmodules index dd922c4ab..188561b24 100644 --- a/.gitmodules +++ b/.gitmodules @@ -10,3 +10,7 @@ path = plugins/UFM_NDT_Plugin/ufm_sim_web_service/ndt-ui/sms-ui-suite url = git@github.com:Mellanox-lab/sw_mng_ui.git branch = standalone-suite +[submodule "ufm_sdk_tools"] + path = ufm_sdk_tools + url = git@github.com:Mellanox/ufm_sdk_tools.git + branch = main diff --git a/plugins/fluentd_telemetry_plugin/build/Dockerfile b/plugins/fluentd_telemetry_plugin/build/Dockerfile index 4ec6a45c6..9045285fe 100644 --- a/plugins/fluentd_telemetry_plugin/build/Dockerfile +++ b/plugins/fluentd_telemetry_plugin/build/Dockerfile @@ -8,6 +8,7 @@ ARG SRC_BASE_DIR=fluentd_telemetry_plugin COPY ${SRC_BASE_DIR}/ ${BASE_PATH}/${SRC_BASE_DIR}/ COPY utils/ ${BASE_PATH}/utils/ +COPY ufm_sdk_tools/ ${BASE_PATH}/ufm_sdk_tools/ COPY ${SRC_BASE_DIR}/conf/supervisord.conf /etc/supervisor/conf.d/ COPY ${SRC_BASE_DIR}/scripts/init.sh ${SRC_BASE_DIR}/scripts/deinit.sh / diff --git a/plugins/fluentd_telemetry_plugin/build/docker_build.sh b/plugins/fluentd_telemetry_plugin/build/docker_build.sh index 3555b06fb..9caf8baac 100755 --- a/plugins/fluentd_telemetry_plugin/build/docker_build.sh +++ b/plugins/fluentd_telemetry_plugin/build/docker_build.sh @@ -102,6 +102,7 @@ pushd ${SCRIPT_DIR} BUILD_DIR=$(create_out_dir) cp Dockerfile ${BUILD_DIR} cp -r ../../../utils ${BUILD_DIR} +cp -r ../../../ufm_sdk_tools ${BUILD_DIR} cp -r ../../fluentd_telemetry_plugin ${BUILD_DIR} echo "BUILD_DIR : [${BUILD_DIR}]" diff --git a/plugins/fluentd_telemetry_plugin/conf/fluentd_telemetry_plugin.cfg b/plugins/fluentd_telemetry_plugin/conf/fluentd_telemetry_plugin.cfg index 9b0800472..9288988d9 100644 --- a/plugins/fluentd_telemetry_plugin/conf/fluentd_telemetry_plugin.cfg +++ b/plugins/fluentd_telemetry_plugin/conf/fluentd_telemetry_plugin.cfg @@ -4,6 +4,8 @@ port = 9001 url = csv/metrics interval = 30 message_tag_name = +xdr_mode=False +xdr_ports_types=legacy;aggregated;plane [fluentd-endpoint] host = diff --git a/plugins/fluentd_telemetry_plugin/src/api/conf_api.py b/plugins/fluentd_telemetry_plugin/src/api/conf_api.py index 4dbd2da59..c5fb1165e 100644 --- a/plugins/fluentd_telemetry_plugin/src/api/conf_api.py +++ b/plugins/fluentd_telemetry_plugin/src/api/conf_api.py @@ -36,7 +36,7 @@ def _get_routes(self): self.update_streaming_attributes: {'urls': ["/attributes"], 'methods': ["POST"]} } - def _set_new_conf(self): + def _set_new_conf(self): # pylint: disable=too-many-branches new_conf = request.json sections = self.conf.get_conf_sections() for section, section_items in new_conf.items(): @@ -60,7 +60,10 @@ def _set_new_conf(self): new_section_data = {} for endpoint in section_items: for endpoint_item in endpoint_obj_keys: - new_section_data[endpoint_item] = f'{new_section_data.get(endpoint_item, "")},{endpoint.get(endpoint_item,"")}' + endpoint_item_value = endpoint.get(endpoint_item,"") + if endpoint_item == self.conf.UFM_TELEMETRY_ENDPOINT_SECTION_XDR_PORTS_TYPE: + endpoint_item_value = self.conf.UFM_TELEMETRY_ENDPOINT_SECTION_XDR_PORTS_TYPE_SPLITTER.join(endpoint_item_value) + new_section_data[endpoint_item] = f'{new_section_data.get(endpoint_item, "")},{endpoint_item_value}' for endpoint_item in endpoint_obj_keys: self.conf.set_item_value(section, endpoint_item, new_section_data.get(endpoint_item)[1:].strip()) else: @@ -110,23 +113,25 @@ def get(self): # pylint: disable=too-many-locals, too-many-branches if section_type == "array": # in case the section_type is array, we need to collect # the array elements from the saved comma separated strings + section_value_splitter = section_properties.get('splitter', ',') section_properties = section_properties.get('items', {}).get("properties", None) if section_properties: conf_dict[section] = [] for item_key, item_value in section_items: - item_type = section_properties.get(item_key, None) - if item_type is None: + item = section_properties.get(item_key, None) + if item is None: raise InvalidConfRequest(f'Failed to get the configurations schema for the item {item_key} ' f'under the section: {section}') - item_type = item_type.get('type', None) - item_values = item_value.split(",") + item_type = item.get('type', None) + item_value_splitter = item.get('splitter', ';') + item_values = item_value.split(section_value_splitter) for i, value in enumerate(item_values): try: arr_element_obj = conf_dict[section][i] except IndexError: arr_element_obj = {} conf_dict[section].append(arr_element_obj) - arr_element_obj[item_key] = Utils.convert_str_to_type(value, item_type) + arr_element_obj[item_key] = Utils.convert_str_to_type(value, item_type, item_value_splitter) conf_dict[section][i] = arr_element_obj elif section_type == "object": section_properties = section_properties.get('properties', None) diff --git a/plugins/fluentd_telemetry_plugin/src/schemas/set_conf.schema.json b/plugins/fluentd_telemetry_plugin/src/schemas/set_conf.schema.json index f037793d2..39876d78f 100644 --- a/plugins/fluentd_telemetry_plugin/src/schemas/set_conf.schema.json +++ b/plugins/fluentd_telemetry_plugin/src/schemas/set_conf.schema.json @@ -37,7 +37,20 @@ "interval": { "type": "integer", "minimum": 1, - "err_message": "[streaming - interval] attribute should be an integer greater than 0" + "err_message": "[ufm-telemetry-endpoint - interval] attribute should be an integer greater than 0" + }, + "xdr_mode": { + "type": "boolean" + }, + "xdr_ports_types": { + "type": "array", + "splitter": ";", + "items": { + "type": "string", + "enum": ["legacy", "aggregated", "plane"] + }, + "err_message": "[ufm-telemetry-endpoint - xdr_ports_types] attribute should be list consists of at least one of the following types: ['legacy', 'aggregated', 'plane']", + "minItems": 1 }, "message_tag_name": { "type": "string" @@ -51,6 +64,7 @@ ], "additionalProperties": false }, + "splitter": ",", "minItems": 1 }, "fluentd-endpoint": { diff --git a/plugins/fluentd_telemetry_plugin/src/streamer.py b/plugins/fluentd_telemetry_plugin/src/streamer.py index a1251d688..5afb91d00 100644 --- a/plugins/fluentd_telemetry_plugin/src/streamer.py +++ b/plugins/fluentd_telemetry_plugin/src/streamer.py @@ -21,6 +21,8 @@ import gzip import logging import datetime +from typing import List + import requests from requests.exceptions import ConnectionError # pylint: disable=redefined-builtin from prometheus_client.parser import text_string_to_metric_families @@ -33,6 +35,7 @@ from utils.config_parser import ConfigParser from utils.logger import Logger, LOG_LEVELS from utils.singleton import Singleton +from ufm_sdk_tools.src.xdr_utils import PortType,prepare_port_type_http_telemetry_filter class UFMTelemetryConstants: @@ -50,6 +53,16 @@ class UFMTelemetryConstants: },{ "name": '--ufm_telemetry_url', "help": "URL of UFM Telemetry endpoint" + },{ + "name": '--ufm_telemetry_xdr_mode', + "help": "Telemetry XDR mode flag, " + "i.e., if True, the enabled ports types in `xdr_ports_types` " + "will be collected from the telemetry and streamed to fluentd" + },{ + "name": '--ufm_telemetry_xdr_ports_types', + "help": "Telemetry XDR ports types, " + "i.e., List of XDR ports types that should be collected and streamed, " + "separated by `;`. For example legacy;aggregated;plane" },{ "name": '--streaming_interval', "help": "Interval for telemetry streaming in seconds" @@ -107,6 +120,9 @@ class UFMTelemetryStreamingConfigParser(ConfigParser): UFM_TELEMETRY_ENDPOINT_SECTION_URL = "url" UFM_TELEMETRY_ENDPOINT_SECTION_INTERVAL = "interval" UFM_TELEMETRY_ENDPOINT_SECTION_MSG_TAG_NAME = "message_tag_name" + UFM_TELEMETRY_ENDPOINT_SECTION_XDR_MODE = "xdr_mode" + UFM_TELEMETRY_ENDPOINT_SECTION_XDR_PORTS_TYPE = "xdr_ports_types" + UFM_TELEMETRY_ENDPOINT_SECTION_XDR_PORTS_TYPE_SPLITTER = ";" FLUENTD_ENDPOINT_SECTION = "fluentd-endpoint" FLUENTD_ENDPOINT_SECTION_HOST = "host" @@ -143,6 +159,18 @@ def get_telemetry_url(self): self.UFM_TELEMETRY_ENDPOINT_SECTION_URL, "csv/metrics") + def get_ufm_telemetry_xdr_mode_flag(self): + return self.get_config_value(self.args.ufm_telemetry_xdr_mode, + self.UFM_TELEMETRY_ENDPOINT_SECTION, + self.UFM_TELEMETRY_ENDPOINT_SECTION_XDR_MODE, + "False") + + def get_ufm_telemetry_xdr_ports_types(self): + return self.get_config_value(self.args.ufm_telemetry_xdr_ports_types, + self.UFM_TELEMETRY_ENDPOINT_SECTION, + self.UFM_TELEMETRY_ENDPOINT_SECTION_XDR_PORTS_TYPE, + "legacy;aggregated;plane") + def get_streaming_interval(self): return self.get_config_value(self.args.streaming_interval, self.UFM_TELEMETRY_ENDPOINT_SECTION, @@ -225,6 +253,7 @@ def get_meta_fields(self): return aliases, custom +#pylint: disable=too-many-instance-attributes class UFMTelemetryStreaming(Singleton): """ UFMTelemetryStreaming class @@ -236,10 +265,13 @@ def __init__(self, conf_parser): self.config_parser = conf_parser self.last_streamed_data_sample_timestamp = None - self.port_id_keys = ['node_guid', 'Node_GUID', 'port_guid', 'port_num', 'Port_Number', 'Port'] + self.normal_port_id_keys = ['node_guid', 'Node_GUID', 'port_guid', 'port_num', 'Port_Number', 'Port'] + self.agg_port_id_keys = ['sys_image_guid', 'aport'] + self.port_type_key = 'port_type' self.port_constants_keys = { 'timestamp': 'timestamp', 'source_id': 'source_id', 'tag': 'tag', 'node_guid': 'node_guid', 'port_guid': 'port_guid', + 'sys_image_guid': 'sys_image_guid', 'aport': 'aport', 'port_num': 'port_num', 'node_description': 'node_description', 'm_label': 'm_label', 'port_label': 'port_label', 'status_message': 'status_message', 'Port_Number': 'Port_Number', 'Node_GUID': 'Node_GUID', 'Device_ID': 'Device_ID', 'device_id': 'Device_ID', @@ -269,6 +301,14 @@ def ufm_telemetry_port(self): def ufm_telemetry_url(self): return self.config_parser.get_telemetry_url() + @property + def ufm_telemetry_xdr_mode_flag(self): + return self.config_parser.get_ufm_telemetry_xdr_mode_flag() + + @property + def ufm_telemetry_xdr_ports_types(self): + return self.config_parser.get_ufm_telemetry_xdr_ports_types() + @property def streaming_interval(self): return self.config_parser.get_streaming_interval() @@ -279,17 +319,24 @@ def ufm_telemetry_endpoints(self): hosts = self.ufm_telemetry_host.split(splitter) ports = self.ufm_telemetry_port.split(splitter) urls = self.ufm_telemetry_url.split(splitter) + xdr_mode = self.ufm_telemetry_xdr_mode_flag.split(splitter) + xdr_ports_types = self.ufm_telemetry_xdr_ports_types.split(splitter) intervals = self.streaming_interval.split(splitter) msg_tags = self.fluentd_msg_tag.split(splitter) endpoints = [] for i, value in enumerate(hosts): + _url = self._append_filters_to_telemetry_url( + urls[i], + Utils.convert_str_to_type(xdr_mode[i], 'boolean'), + xdr_ports_types[i].split(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_XDR_PORTS_TYPE_SPLITTER) + ) endpoints.append({ self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_HOST: value, self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_PORT: ports[i], - self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_URL: urls[i], + self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_URL: _url, self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_INTERVAL: intervals[i], self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_MSG_TAG_NAME: - msg_tags[i] if msg_tags[i] else f'{value}:{ports[i]}/{urls[i]}' + msg_tags[i] if msg_tags[i] else f'{value}:{ports[i]}/{_url}' }) return endpoints @@ -342,6 +389,26 @@ def fluent_sender(self): use_c=_use_c) return self._fluent_sender + def _append_filters_to_telemetry_url(self, url: str, xdr_mode: bool, port_types: List[str]): + """ + This function constructs and appends filter parameters to the given URL if certain conditions are met. + + Parameters: + url (str): The base telemetry URL to which filters may be appended. + xdr_mode (bool): A flag indicating whether extended data record (XDR) mode is enabled. + port_types (List[str]): list of port type names used to generate filters. + + Returns: + str: The telemetry URL with appended filter parameters if applicable, or the original URL. + """ + filters = [] + if xdr_mode: + filters.append(prepare_port_type_http_telemetry_filter(port_types)) + if filters: + filters_sign = '&' if '?' in url else '?' + return f'{url}{filters_sign}{"&".join(filters)}' + return url + def _get_metrics(self, _host, _port, _url, msg_tag): _host = f'[{_host}]' if Utils.is_ipv6_address(_host) else _host url = f'http://{_host}:{_port}/{_url}' @@ -414,7 +481,7 @@ def _get_filtered_counters(self, counters): modified_keys[i] = attr_obj.get('name', key) return modified_keys - def _parse_telemetry_csv_metrics_to_json_with_delta(self, data): # pylint: disable=too-many-locals + def _parse_telemetry_csv_metrics_to_json_with_delta(self, data): # pylint: disable=too-many-locals,too-many-branches """ :desc: parsed the data csv input & convert it to list of ports records each record contains key[s]:value[s] for the port's counters @@ -438,9 +505,20 @@ def _parse_telemetry_csv_metrics_to_json_with_delta(self, data): # pylint: disa is_meta_fields_available = len(self.meta_fields[0]) or len(self.meta_fields[1]) output = [] - port_id_keys_indices = [] - for port_id_key in self.port_id_keys: - port_id_keys_indices += [i for i, x in enumerate(keys) if x == port_id_key] + normal_port_id_keys_indices = [] + aggr_port_id_keys_indices = [] + port_type_key_index = -1 + + normal_port_id_keys_set = set(self.normal_port_id_keys) + agg_port_id_keys_set = set(self.agg_port_id_keys) + + for i, key in enumerate(keys): + if key in normal_port_id_keys_set: + normal_port_id_keys_indices.append(i) + if key in agg_port_id_keys_set: + aggr_port_id_keys_indices.append(i) + if key == self.port_type_key and port_type_key_index == -1: + port_type_key_index = i modified_keys = self._get_filtered_counters(keys) available_keys_indices = modified_keys.keys() @@ -449,6 +527,11 @@ def _parse_telemetry_csv_metrics_to_json_with_delta(self, data): # pylint: disa # skip the first row since it contains the headers # skip the last row since its empty row values = row.split(UFMTelemetryConstants.CSV_ROW_ATTRS_SEPARATOR) + port_id_keys_indices = normal_port_id_keys_indices + if port_type_key_index != -1: + port_type = values[port_type_key_index] + if port_type == PortType.AGGREGATED.value: + port_id_keys_indices = aggr_port_id_keys_indices # prepare the port_key that will be used as an ID in delta port_key = ":".join([values[index] for index in port_id_keys_indices]) @@ -531,7 +614,7 @@ def _parse_telemetry_prometheus_metrics_to_json(self, data): # pylint: disable= if len(family.samples): timestamp = family.samples[0].timestamp for sample in family.samples: - uid = port_key = ":".join([sample.labels.get(key, '') for key in self.port_id_keys]) + uid = port_key = ":".join([sample.labels.get(key, '') for key in self.normal_port_id_keys]) uid += f':{str(sample.timestamp)}' current_row = elements_dict.get(uid, {}) if self.stream_only_new_samples: @@ -656,7 +739,7 @@ def stream_data(self, telemetry_endpoint): # pylint: disable=too-many-locals except Exception as ex: # pylint: disable=broad-except logging.error("Exception occurred during parsing telemetry data: %s", str(ex)) else: - logging.error("Failed to get the telemetry data metrics") + logging.error("Failed to get the telemetry data metrics for %s", _url) def _add_streaming_attribute(self, attribute): if self.streaming_attributes.get(attribute, None) is None: diff --git a/ufm_sdk_tools b/ufm_sdk_tools new file mode 160000 index 000000000..749801037 --- /dev/null +++ b/ufm_sdk_tools @@ -0,0 +1 @@ +Subproject commit 74980103703b112217d7ca2f2fa5d8adb43fb287 diff --git a/utils/utils.py b/utils/utils.py index 9933cbc44..b8094f327 100644 --- a/utils/utils.py +++ b/utils/utils.py @@ -97,10 +97,12 @@ def get_plugin_port(port_conf_file, default_port_value): return port @staticmethod - def convert_str_to_type(value, new_type): + def convert_str_to_type(value, new_type, arr_splitter=','): if isinstance(value, str): if new_type == "integer": value = int(value) elif new_type == "boolean": value = value.lower() == 'true' + elif new_type == "array": + value = value.split(arr_splitter) return value