diff --git a/.github/workflows/tfs_plugin_ci_workflow.yml b/.github/workflows/tfs_plugin_ci_workflow.yml new file mode 100644 index 000000000..23410a53f --- /dev/null +++ b/.github/workflows/tfs_plugin_ci_workflow.yml @@ -0,0 +1,24 @@ +name: TFS Plugin CI Workflow + +on: + push: + paths: + - 'plugins/fluentd_telemetry_plugin/**' +jobs: + pylint: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@main + + - name: Set up Python + uses: actions/setup-python@main + with: + python-version: 3.9 + + - name: Install dependencies + run: | + pip install -r plugins/fluentd_telemetry_plugin/requirements.txt + pip install pylint + - name: Run PyLint + run: pylint --rcfile=plugins/fluentd_telemetry_plugin/.pylintrc plugins/fluentd_telemetry_plugin diff --git a/plugins/fluentd_telemetry_plugin/.pylintrc b/plugins/fluentd_telemetry_plugin/.pylintrc new file mode 100644 index 000000000..7694ac1d4 --- /dev/null +++ b/plugins/fluentd_telemetry_plugin/.pylintrc @@ -0,0 +1,17 @@ +[MASTER] +init-hook="import os, sys; sys.path.append(os.path.join(os.getcwd(), 'plugins', 'fluentd_telemetry_plugin', 'src')); sys.path.append(os.path.join(os.getcwd(), 'utils'))" + +[MAIN] +max-public-methods=100 + +[DESIGN] +max-attributes=10 + +[MESSAGES CONTROL] +disable=missing-module-docstring,missing-function-docstring,fixme + +[FORMAT] +max-line-length=140 + +[BASIC] +min-public-methods=0 \ No newline at end of file diff --git a/plugins/fluentd_telemetry_plugin/requirements.txt b/plugins/fluentd_telemetry_plugin/requirements.txt index c57a5e855..a7a69a228 100644 --- a/plugins/fluentd_telemetry_plugin/requirements.txt +++ b/plugins/fluentd_telemetry_plugin/requirements.txt @@ -8,4 +8,4 @@ twisted tzlocal<3.0 jsonschema prometheus_client -msgpack \ No newline at end of file +msgpack diff --git a/plugins/fluentd_telemetry_plugin/src/__init__.py b/plugins/fluentd_telemetry_plugin/src/__init__.py new file mode 100644 index 000000000..cfc21cdb1 --- /dev/null +++ b/plugins/fluentd_telemetry_plugin/src/__init__.py @@ -0,0 +1,15 @@ +""" +@copyright: + Copyright (C) Mellanox Technologies Ltd. 2014-2020. ALL RIGHTS RESERVED. + + This software product is a proprietary product of Mellanox Technologies + Ltd. (the "Company") and all right, title, and interest in and to the + software product, including all associated intellectual property rights, + are and shall remain exclusively with the Company. + + This software product is governed by the End User License Agreement + provided with the software product. 
+ +@author: Anan Al-Aghbar +@date: May 9, 2024 +""" diff --git a/plugins/fluentd_telemetry_plugin/src/api/__init__.py b/plugins/fluentd_telemetry_plugin/src/api/__init__.py index c134de4b4..db9a38f80 100644 --- a/plugins/fluentd_telemetry_plugin/src/api/__init__.py +++ b/plugins/fluentd_telemetry_plugin/src/api/__init__.py @@ -1,4 +1,5 @@ class InvalidConfRequest(Exception): + """InvalidConfRequest Exception class""" def __init__(self, message): - Exception.__init__(self,message) \ No newline at end of file + Exception.__init__(self, message) diff --git a/plugins/fluentd_telemetry_plugin/src/api/base_api.py b/plugins/fluentd_telemetry_plugin/src/api/base_api.py index 939dc9e03..3d76e17b9 100644 --- a/plugins/fluentd_telemetry_plugin/src/api/base_api.py +++ b/plugins/fluentd_telemetry_plugin/src/api/base_api.py @@ -1,11 +1,10 @@ -from flask import Flask -from flask_restful import Api from http import HTTPStatus from functools import partial -from utils.json_schema_validator import ValidationError, SchemaValidationError +from flask import Flask +from flask_restful import Api from web_service_error_messages import \ - no_running_streaming_instance,\ - streaming_already_running + NO_RUNNING_STREAMING_INSTANCE,\ + STREAMING_ALREADY_RUNNING from streaming_scheduler import \ NoRunningStreamingInstance,\ @@ -13,8 +12,12 @@ from api import InvalidConfRequest +# pylint: disable=no-name-in-module,import-error +from utils.json_schema_validator import ValidationError, SchemaValidationError + class BaseAPIApplication: + """BaseAPIApplication API Flask Class Base""" def __init__(self): self.app = Flask(__name__) @@ -26,9 +29,9 @@ def __init__(self): def _get_error_handlers(self): return [ (NoRunningStreamingInstance, - lambda e: (no_running_streaming_instance, HTTPStatus.BAD_REQUEST)), + lambda e: (NO_RUNNING_STREAMING_INSTANCE, HTTPStatus.BAD_REQUEST)), (StreamingAlreadyRunning, - lambda e: (streaming_already_running, HTTPStatus.BAD_REQUEST)), + lambda e: (STREAMING_ALREADY_RUNNING, HTTPStatus.BAD_REQUEST)), (InvalidConfRequest, lambda e: (str(e), HTTPStatus.BAD_REQUEST)), (ValidationError, @@ -41,8 +44,8 @@ def _get_error_handlers(self): def _add_error_handlers(self): hdlrs = self._get_error_handlers() - for code_or_exception, f in hdlrs: - self.app.register_error_handler(code_or_exception, f) + for code_or_exception, callback in hdlrs: + self.app.register_error_handler(code_or_exception, callback) @property def application(self): diff --git a/plugins/fluentd_telemetry_plugin/src/api/conf_api.py b/plugins/fluentd_telemetry_plugin/src/api/conf_api.py index cbd99096b..4dbd2da59 100644 --- a/plugins/fluentd_telemetry_plugin/src/api/conf_api.py +++ b/plugins/fluentd_telemetry_plugin/src/api/conf_api.py @@ -1,19 +1,23 @@ import re import json import logging +from http import HTTPStatus from flask import make_response, request from api import InvalidConfRequest from api.base_api import BaseAPIApplication from streamer import UFMTelemetryStreaming from streaming_scheduler import StreamingScheduler + +# pylint: disable=no-name-in-module,import-error from utils.json_schema_validator import validate_schema from utils.utils import Utils class StreamingConfigurationsAPI(BaseAPIApplication): + """StreamingConfigurationsAPI class""" def __init__(self, conf): - super(StreamingConfigurationsAPI, self).__init__() + super(StreamingConfigurationsAPI, self).__init__() # pylint: disable=super-with-arguments self.conf = conf self.scheduler = StreamingScheduler.getInstance() self.streamer = 
UFMTelemetryStreaming.getInstance() @@ -26,10 +30,10 @@ def __init__(self, conf): def _get_routes(self): return { - self.get: dict(urls=["/"], methods=["GET"]), - self.post: dict(urls=["/"], methods=["POST"]), - self.get_streaming_attributes: dict(urls=["/attributes"], methods=["GET"]), - self.update_streaming_attributes: dict(urls=["/attributes"], methods=["POST"]) + self.get: {'urls': ["/"], 'methods': ["GET"]}, + self.post: {'urls': ["/"], 'methods': ["POST"]}, + self.get_streaming_attributes: {'urls': ["/attributes"], 'methods': ["GET"]}, + self.update_streaming_attributes: {'urls': ["/attributes"], 'methods': ["POST"]} } def _set_new_conf(self): @@ -68,7 +72,7 @@ def _set_new_conf(self): def _validate_required_configurations_on_enable(self): # just checking the required attributes # if one of the attributes below was missing it will throw an exception - fluentd_host = self.conf.get_fluentd_host() + return self.conf.get_fluentd_host() def post(self): # validate the new conf json @@ -89,15 +93,15 @@ def post(self): self.conf.update_config_file(self.conf.config_file) raise ex - def get(self): + def get(self): # pylint: disable=too-many-locals, too-many-branches try: - with open(Utils.get_absolute_path(self.conf_schema_path)) as json_data: + with open(Utils.get_absolute_path(self.conf_schema_path), encoding='utf-8') as json_data: schema = json.load(json_data) properties = schema.get('properties', None) if properties is None: raise InvalidConfRequest("Failed to get the configurations schema properties") conf_dict = {} - for section in self.conf.get_conf_sections(): + for section in self.conf.get_conf_sections(): # pylint: disable=too-many-nested-blocks section_properties = properties.get(section, None) if section_properties is None: raise InvalidConfRequest("Failed to get the configurations schema for the section: " + section) @@ -116,13 +120,12 @@ def get(self): f'under the section: {section}') item_type = item_type.get('type', None) item_values = item_value.split(",") - for i in range(len(item_values)): + for i, value in enumerate(item_values): try: arr_element_obj = conf_dict[section][i] except IndexError: arr_element_obj = {} conf_dict[section].append(arr_element_obj) - value = item_values[i] arr_element_obj[item_key] = Utils.convert_str_to_type(value, item_type) conf_dict[section][i] = arr_element_obj elif section_type == "object": @@ -143,8 +146,10 @@ def get(self): raise InvalidConfRequest(f'Failed to get the configurations, unsupported type ' f'{section_type}: for the section: {section}') return make_response(conf_dict) - except InvalidConfRequest as e: - logging.error("Error occurred while getting the current streaming configurations: " + str(e)) + except InvalidConfRequest as ex: + err_msg = f"Error occurred while getting the current streaming configurations: {str(ex)}" + logging.error(err_msg) + return make_response(err_msg, HTTPStatus.INTERNAL_SERVER_ERROR) def get_streaming_attributes(self): return make_response(self.streamer.streaming_attributes) diff --git a/plugins/fluentd_telemetry_plugin/src/api/streaming_monitoring_stats_api.py b/plugins/fluentd_telemetry_plugin/src/api/streaming_monitoring_stats_api.py index 0e592ff8d..fd67e65b2 100644 --- a/plugins/fluentd_telemetry_plugin/src/api/streaming_monitoring_stats_api.py +++ b/plugins/fluentd_telemetry_plugin/src/api/streaming_monitoring_stats_api.py @@ -18,13 +18,15 @@ class StreamingMonitoringStatsAPI(BaseAPIApplication): + """StreamingMonitoringStatsAPI API class""" + def __init__(self): - 
super(StreamingMonitoringStatsAPI, self).__init__() + super(StreamingMonitoringStatsAPI, self).__init__() # pylint: disable=super-with-arguments self.streamer = UFMTelemetryStreaming.getInstance() def _get_routes(self): return { - self.get: dict(urls=["/"], methods=["GET"]) + self.get: {'urls': ["/"], 'methods': ["GET"]} } def get(self): diff --git a/plugins/fluentd_telemetry_plugin/src/app.py b/plugins/fluentd_telemetry_plugin/src/app.py index 06351f6ec..7862d6639 100644 --- a/plugins/fluentd_telemetry_plugin/src/app.py +++ b/plugins/fluentd_telemetry_plugin/src/app.py @@ -17,11 +17,8 @@ import sys sys.path.append(os.getcwd()) +# pylint: disable=wrong-import-position import logging -from utils.flask_server import run_api -from utils.args_parser import ArgsParser -from utils.logger import Logger -from utils.utils import Utils from web_service import UFMTelemetryFluentdStreamingAPI from streamer import \ UFMTelemetryStreaming,\ @@ -29,6 +26,12 @@ UFMTelemetryConstants from streaming_scheduler import StreamingScheduler +# pylint: disable=no-name-in-module,import-error +from utils.flask_server import run_api +from utils.args_parser import ArgsParser +from utils.logger import Logger +from utils.utils import Utils + def _init_logs(config_parser): # init logs configs @@ -44,14 +47,14 @@ def _init_logs(config_parser): # init app config parser & load config files args = ArgsParser.parse_args("UFM Telemetry Streaming to fluentd", UFMTelemetryConstants.args_list) - config_parser = UFMTelemetryStreamingConfigParser(args) + _config_parser = UFMTelemetryStreamingConfigParser(args) - _init_logs(config_parser) + _init_logs(_config_parser) - streamer = None + STREAMER = None try: - streamer = UFMTelemetryStreaming.getInstance(config_parser) - if config_parser.get_enable_streaming_flag(): + STREAMER = UFMTelemetryStreaming.getInstance(_config_parser) + if _config_parser.get_enable_streaming_flag(): scheduler = StreamingScheduler.getInstance() job_id = scheduler.start_streaming() logging.info("Streaming has been started successfully") @@ -59,16 +62,17 @@ def _init_logs(config_parser): logging.warning("Streaming was not started, need to enable the streaming & set the required configurations") except ValueError as ex: - logging.warning("Streaming was not started, need to enable the streaming & set the required configurations. "+ str(ex)) - except Exception as ex: - logging.error(f'Streaming was not started due to the following error: {str(ex)}') + logging.warning("Streaming was not started, need to enable the streaming & set the required configurations. 
%s" + , str(ex)) + except Exception as ex: # pylint: disable=broad-except + logging.error('Streaming was not started due to the following error: %s', str(ex)) - if streamer: + if STREAMER: try: - app = UFMTelemetryFluentdStreamingAPI(config_parser) + app = UFMTelemetryFluentdStreamingAPI(_config_parser) port = Utils.get_plugin_port('/config/tfs_httpd_proxy.conf', 8981) run_api(app=app, port_number=int(port)) - except Exception as ex: - logging.error(f'Streaming server was not started due to the following error: {str(ex)}') + except Exception as ex: # pylint: disable=broad-except + logging.error('Streaming server was not started due to the following error: %s', str(ex)) else: - logging.error(f'Streaming server was not started.') + logging.error('Streaming server was not started.') diff --git a/plugins/fluentd_telemetry_plugin/src/fluentbit_writer.py b/plugins/fluentd_telemetry_plugin/src/fluentbit_writer.py index f2c191046..59495654a 100644 --- a/plugins/fluentd_telemetry_plugin/src/fluentbit_writer.py +++ b/plugins/fluentd_telemetry_plugin/src/fluentbit_writer.py @@ -15,13 +15,16 @@ @author: Anan Al-Aghbar @date: Mar 10, 2024 -This file originally is based on the clxcli writer https://gitlab-master.nvidia.com/telemetry/Collectx/-/blob/master/server/exporter/fluentbit_writer.py +This file originally is based on the clxcli writer +https://gitlab-master.nvidia.com/telemetry/Collectx/-/blob/master/server/exporter/fluentbit_writer.py """ -import msgpack import time import datetime -from ctypes import * -from timeit import default_timer as timer +from ctypes import Structure, CDLL, POINTER, pointer,\ + cast, c_char_p, c_void_p, c_int +import msgpack + +# pylint: disable=no-name-in-module,import-error from utils.logger import Logger, LOG_LEVELS from utils.fluentd.fluent import asyncsender as asycsender from utils.utils import Utils @@ -39,32 +42,37 @@ def str2c_char_ptr(str_val): def load_api_lib_from_path(path): if not path: - return [None, None] + return None try: api_lib = CDLL(path) - Logger.log_message("opened raw_msgpack API lib: %s" % api_lib, LOG_LEVELS.DEBUG) - return [api_lib, path] - except Exception as ex: + Logger.log_message(f'opened raw_msgpack API lib: {api_lib}', LOG_LEVELS.DEBUG) + return api_lib + except Exception as ex: # pylint: disable=broad-except Logger.log_message(f'Failed to load the API: {path}, due to the error: {str(ex)}', LOG_LEVELS.DEBUG) - return [None, None] + return None class LoadFBLibFailure(Exception): - pass + """LoadFBLibFailure Exception""" class InitFBLibFailure(Exception): - pass + """InitFBLibFailure Exception""" class ParamPair(Structure): + """ParamPair Structure class""" _fields_ = [("name", c_char_p), ("val", c_char_p)] class PluginParams(Structure): + """ + PluginParams Structure class + for initializing and managing the FB plugin's params + """ _fields_ = [("param_count", c_char_p), ("params", POINTER(ParamPair))] - def __init__(self, params_count, in_plugin_params): + def __init__(self, params_count, in_plugin_params): # pylint: disable=super-init-not-called elems = (ParamPair * params_count)() self.params = cast(elems, POINTER(ParamPair)) self.param_count = params_count @@ -76,10 +84,15 @@ def __init__(self, params_count, in_plugin_params): i += 1 -class FluentBitCWriter(object): +class FluentBitCWriter: + """ + FluentBitCWriter class + Wrapper class for sending the FB messages via a given plugin + """ def __init__(self, context): self.initialized = False + self.raw_msgpack_api_ctx = None self.lib = None self.lib_path = None @@ -172,7 +185,7 @@ 
def close_connection(self): def init_fb_writer(host, port, tag_prefix, timeout=120, use_c=True): if use_c: - [lib, lib_path] = load_api_lib_from_path(LIB_RAW_MSGPACK_API_SO_PATH) + lib = load_api_lib_from_path(LIB_RAW_MSGPACK_API_SO_PATH) ctx = { 'plugin_name': 'forward', 'plugin_host': host, @@ -182,28 +195,28 @@ def init_fb_writer(host, port, tag_prefix, timeout=120, use_c=True): 'tag_prefix': tag_prefix } fluent_writer = FluentBitCWriter(ctx) - init_msg = f'Fluent sender is initialized in C' + init_msg = 'Fluent sender is initialized in C' else: # use the fluentd's python sender fluent_writer = asycsender.FluentSender(tag_prefix, host, int(port), timeout=timeout) - init_msg = f'Fluent sender is initialized in Python' + init_msg = 'Fluent sender is initialized in Python' Logger.log_message(init_msg, LOG_LEVELS.DEBUG) return fluent_writer if __name__ == '__main__': # Example on how to set & use the FB writer - _use_c = True - _host = DEFAULT_FB_HOST - _port = DEFAULT_FB_PORT - _tag = 'UFM_Telemetry_Streaming' + _USE_C = True + _HOST = DEFAULT_FB_HOST + _PORT = DEFAULT_FB_PORT + _TAG = 'UFM_Telemetry_Streaming' msg_record = Utils.read_json_from_file('../tests/message_samples/small_telemetry.json') - writer = init_fb_writer(_host, _port, _tag, use_c=_use_c) + writer = init_fb_writer(_HOST, _PORT, _TAG, use_c=_USE_C) ##### print(f"Start streaming on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S%z')}, " - f"{'Using the C writer' if _use_c else 'Using the Python writer'}..") + f"{'Using the C writer' if _USE_C else 'Using the Python writer'}..") ################## - writer.write(_tag, msg_record) - if _use_c: + writer.write(_TAG, msg_record) + if _USE_C: time.sleep(30) print('Streaming is completed') diff --git a/plugins/fluentd_telemetry_plugin/src/monitor_streaming_mgr.py b/plugins/fluentd_telemetry_plugin/src/monitor_streaming_mgr.py index 0995104f3..0ce8b9196 100644 --- a/plugins/fluentd_telemetry_plugin/src/monitor_streaming_mgr.py +++ b/plugins/fluentd_telemetry_plugin/src/monitor_streaming_mgr.py @@ -17,6 +17,11 @@ class MonitorStreamingMgr: + """ + MonitorStreamingMgr class + To manage the streaming statistics metrics/attributes + that will be used to monitor the streaming pipeline + """ def __init__(self): @@ -67,4 +72,4 @@ def update_streaming_metrics(self, endpoint, **kwargs): metric_obj.labels(endpoint).set(value) def get_streaming_metrics_text(self): - return generate_latest(self.metrics_registry) \ No newline at end of file + return generate_latest(self.metrics_registry) diff --git a/plugins/fluentd_telemetry_plugin/src/streamer.py b/plugins/fluentd_telemetry_plugin/src/streamer.py index c2669f22a..a1251d688 100644 --- a/plugins/fluentd_telemetry_plugin/src/streamer.py +++ b/plugins/fluentd_telemetry_plugin/src/streamer.py @@ -16,29 +16,28 @@ @date: Nov 23, 2021 """ import os -import sys -from datetime import time - -sys.path.append(os.getcwd()) +import time import json import gzip -import requests import logging -import time import datetime -from requests.exceptions import ConnectionError +import requests +from requests.exceptions import ConnectionError # pylint: disable=redefined-builtin from prometheus_client.parser import text_string_to_metric_families from fluentbit_writer import init_fb_writer -from utils.utils import Utils +from monitor_streaming_mgr import MonitorStreamingMgr +# pylint: disable=no-name-in-module,import-error +from utils.utils import Utils from utils.args_parser import ArgsParser from utils.config_parser import ConfigParser from 
utils.logger import Logger, LOG_LEVELS from utils.singleton import Singleton -from monitor_streaming_mgr import MonitorStreamingMgr class UFMTelemetryConstants: + """UFMTelemetryConstants Class""" + PLUGIN_NAME = "UFM_Telemetry_Streaming" args_list = [ @@ -92,6 +91,11 @@ class UFMTelemetryConstants: class UFMTelemetryStreamingConfigParser(ConfigParser): + """ + UFMTelemetryStreamingConfigParser class to manage + the TFS configurations + """ + # for debugging #config_file = "../conf/fluentd_telemetry_plugin.cfg" @@ -216,29 +220,34 @@ def get_meta_fields(self): "value": value }) else: - logging.warning("The meta field type : {} is not from the supported types list [alias, add]".format(meta_field_type)) - return aliases,custom + logging.warning("The meta field type: %s is not in the supported types list [alias, add]", + meta_field_type) + return aliases, custom class UFMTelemetryStreaming(Singleton): + """ + UFMTelemetryStreaming class + to manage/control the streaming + """ - def __init__(self, config_parser): + def __init__(self, conf_parser): - self.config_parser = config_parser + self.config_parser = conf_parser self.last_streamed_data_sample_timestamp = None self.port_id_keys = ['node_guid', 'Node_GUID', 'port_guid', 'port_num', 'Port_Number', 'Port'] self.port_constants_keys = { - 'timestamp': 'timestamp', 'source_id': 'source_id', 'tag': 'tag', 'node_guid': 'node_guid', 'port_guid': 'port_guid', - 'port_num': 'port_num', 'node_description': 'node_description','m_label': 'm_label', 'port_label': 'port_label', 'status_message': 'status_message', + 'timestamp': 'timestamp', 'source_id': 'source_id', 'tag': 'tag', + 'node_guid': 'node_guid', 'port_guid': 'port_guid', + 'port_num': 'port_num', 'node_description': 'node_description', + 'm_label': 'm_label', 'port_label': 'port_label', 'status_message': 'status_message', 'Port_Number': 'Port_Number', 'Node_GUID': 'Node_GUID', 'Device_ID': 'Device_ID', 'device_id': 'Device_ID', 'mvcr_sensor_name': 'mvcr_sensor_name', 'mtmp_sensor_name': 'mtmp_sensor_name', 'switch_serial_number': 'switch_serial_number', 'switch_part_number': 'switch_part_number' } self.last_streamed_data_sample_per_port = {} - self.TIMESTAMP_CSV_FIELD_KEY = 'timestamp' - self.streaming_metrics_mgr = MonitorStreamingMgr() self.streaming_attributes_file = "/config/tfs_streaming_attributes.json" # this path on the docker @@ -273,13 +282,14 @@ def ufm_telemetry_endpoints(self): intervals = self.streaming_interval.split(splitter) msg_tags = self.fluentd_msg_tag.split(splitter) endpoints = [] - for i in range(len(hosts)): + for i, value in enumerate(hosts): endpoints.append({ - self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_HOST: hosts[i], + self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_HOST: value, self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_PORT: ports[i], self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_URL: urls[i], self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_INTERVAL: intervals[i], - self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_MSG_TAG_NAME: msg_tags[i] if msg_tags[i] else f'{hosts[i]}:{ports[i]}/{urls[i]}', + self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_MSG_TAG_NAME: + msg_tags[i] if msg_tags[i] else f'{value}:{ports[i]}/{urls[i]}' }) return endpoints @@ -335,9 +345,9 @@ def fluent_sender(self): def _get_metrics(self, _host, _port, _url, msg_tag): _host = f'[{_host}]' if Utils.is_ipv6_address(_host) else _host url = f'http://{_host}:{_port}/{_url}' - logging.info(f'Send UFM Telemetry Endpoint Request, Method: GET, URL: {url}') +
logging.info('Send UFM Telemetry Endpoint Request, Method: GET, URL: %s', url) try: - response = requests.get(url) + response = requests.get(url) # pylint: disable=missing-timeout response.raise_for_status() actual_content_size = len(response.content) expected_content_size = int(response.headers.get('Content-Length', actual_content_size)) @@ -357,8 +367,8 @@ def _get_metrics(self, _host, _port, _url, msg_tag): self.streaming_metrics_mgr.telemetry_received_response_size_bytes_key: actual_content_size }) return response.text - except Exception as e: - logging.error(e) + except Exception as ex: # pylint: disable=broad-except + logging.error(ex) return None def _append_meta_fields_to_dict(self, dic): @@ -370,7 +380,7 @@ def _append_meta_fields_to_dict(self, dic): value = dic.get(alias_key, None) if value is None: logging.warning( - "The alias : {} does not exist in the telemetry response keys: {}".format(alias_key, str(keys))) + "The alias: %s does not exist in the telemetry response keys: %s", alias_key, str(keys)) continue dic[alias_value] = value for custom_field in custom_meta_fields: @@ -404,7 +414,7 @@ def _get_filtered_counters(self, counters): modified_keys[i] = attr_obj.get('name', key) return modified_keys - def _parse_telemetry_csv_metrics_to_json_with_delta(self, data): + def _parse_telemetry_csv_metrics_to_json_with_delta(self, data): # pylint: disable=too-many-locals """ :desc: parsed the data csv input & convert it to list of ports records each record contains key[s]:value[s] for the port's counters @@ -429,8 +439,8 @@ def _parse_telemetry_csv_metrics_to_json_with_delta(self, data): output = [] port_id_keys_indices = [] - for pIDKey in self.port_id_keys: - port_id_keys_indices += [i for i, x in enumerate(keys) if x == pIDKey] + for port_id_key in self.port_id_keys: + port_id_keys_indices += [i for i, x in enumerate(keys) if x == port_id_key] modified_keys = self._get_filtered_counters(keys) available_keys_indices = modified_keys.keys() @@ -513,7 +523,7 @@ def _parse_telemetry_csv_metrics_to_json(self, data): return self._parse_telemetry_csv_metrics_to_json_with_delta(data) return self._parse_telemetry_csv_metrics_to_json_without_delta(data) - def _parse_telemetry_prometheus_metrics_to_json(self, data): + def _parse_telemetry_prometheus_metrics_to_json(self, data): # pylint: disable=too-many-locals,too-many-branches elements_dict = {} timestamp = current_port_values = None num_of_counters = 0 @@ -521,9 +531,9 @@ if len(family.samples): timestamp = family.samples[0].timestamp for sample in family.samples: - id = port_key = ":".join([sample.labels.get(key, '') for key in self.port_id_keys]) - id += f':{str(sample.timestamp)}' - current_row = elements_dict.get(id, {}) + uid = port_key = ":".join([sample.labels.get(key, '') for key in self.port_id_keys]) + uid += f':{str(sample.timestamp)}' + current_row = elements_dict.get(uid, {}) if self.stream_only_new_samples: current_port_values = self.last_streamed_data_sample_per_port.get(port_key, {}) @@ -553,10 +563,9 @@ if attr_obj and attr_obj.get('enabled', False) and len(value): current_row[attr_obj.get("name", key)] = value current_num_of_counters = len(current_row) - if current_num_of_counters > num_of_counters: - num_of_counters = current_num_of_counters + num_of_counters = max(num_of_counters, current_num_of_counters) current_row = self._append_meta_fields_to_dict(current_row) - elements_dict[id] = current_row +
elements_dict[uid] = current_row #### if self.stream_only_new_samples: self.last_streamed_data_sample_per_port[port_key] = current_port_values @@ -564,8 +573,9 @@ def _parse_telemetry_prometheus_metrics_to_json(self, data): return list(elements_dict.values()), timestamp, num_of_counters def _stream_data_to_fluentd(self, data_to_stream, fluentd_msg_tag=''): - logging.info(f'Streaming to Fluentd IP: {self.fluentd_host} port: {self.fluentd_port} timeout: {self.fluentd_timeout}') - st = time.time() + logging.info('Streaming to Fluentd IP: %s port: %s timeout: %s', + self.fluentd_host, self.fluentd_port, self.fluentd_timeout) + start_time = time.time() try: fluentd_message = { "timestamp": datetime.datetime.fromtimestamp(int(time.time())).strftime('%Y-%m-%d %H:%M:%S'), @@ -578,6 +588,8 @@ def _stream_data_to_fluentd(self, data_to_stream, fluentd_msg_tag=''): _fluentd_host = self.fluentd_host _fluentd_host = f'[{_fluentd_host}]' if Utils.is_ipv6_address(_fluentd_host) else _fluentd_host compressed = gzip.compress(json.dumps(fluentd_message).encode('utf-8')) + + # pylint: disable=missing-timeout res = requests.post( url=f'http://{_fluentd_host}:{self.fluentd_port}/' f'{UFMTelemetryConstants.PLUGIN_NAME}.{fluentd_msg_tag}', @@ -587,22 +599,22 @@ def _stream_data_to_fluentd(self, data_to_stream, fluentd_msg_tag=''): else: plugin_fluent_protocol = 'FORWARD' self.fluent_sender.write(fluentd_msg_tag, fluentd_message) - et = time.time() - streaming_time = round(et-st, 6) + end_time = time.time() + streaming_time = round(end_time-start_time, 6) self.streaming_metrics_mgr.update_streaming_metrics(fluentd_msg_tag, **{ self.streaming_metrics_mgr.streaming_time_seconds_key: streaming_time }) - logging.info(f'Finished Streaming to Fluentd Host: {self.fluentd_host} port: {self.fluentd_port} in ' - f'{streaming_time} Seconds using {plugin_fluent_protocol} plugin protocol') - except ConnectionError as e: - logging.error(f'Failed to connect to stream destination due to the error :{str(e)}') - except Exception as e: - logging.error(f'Failed to stream the data due to the error: {str(e)}') + logging.info('Finished Streaming to Fluentd Host: %s port: %s in %.2f Seconds using %s plugin protocol', + self.fluentd_host, self.fluentd_port, streaming_time, plugin_fluent_protocol) + except ConnectionError as ex: + logging.error('Failed to connect to stream destination due to the error : %s', str(ex)) + except Exception as ex: # pylint: disable=broad-except + logging.error('Failed to stream the data due to the error: %s', str(ex)) def _check_data_prometheus_format(self, telemetry_data): return telemetry_data and telemetry_data.startswith('#') - def stream_data(self, telemetry_endpoint): + def stream_data(self, telemetry_endpoint): # pylint: disable=too-many-locals _host = telemetry_endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_HOST) _port = telemetry_endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_PORT) _url = telemetry_endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_URL) @@ -611,14 +623,14 @@ def stream_data(self, telemetry_endpoint): if telemetry_data: try: ufm_telemetry_is_prometheus_format = self._check_data_prometheus_format(telemetry_data) - logging.info(f'Start Processing The Received Response From {msg_tag}') - st = time.time() + logging.info('Start Processing The Received Response From %s', msg_tag) + start_time = time.time() data_to_stream, new_data_timestamp, num_of_counters = self._parse_telemetry_prometheus_metrics_to_json(telemetry_data) \ if 
ufm_telemetry_is_prometheus_format else \ self._parse_telemetry_csv_metrics_to_json(telemetry_data) - et = time.time() + end_time = time.time() data_len = len(data_to_stream) - resp_process_time = round(et - st, 6) + resp_process_time = round(end_time - start_time, 6) self.streaming_metrics_mgr.update_streaming_metrics(msg_tag, **{ self.streaming_metrics_mgr.telemetry_response_process_time_seconds_key: resp_process_time }) @@ -629,11 +641,9 @@ def stream_data(self, telemetry_endpoint): self.streaming_metrics_mgr.num_of_streamed_ports_in_last_msg_key: data_len, self.streaming_metrics_mgr.num_of_processed_counters_in_last_msg_key: num_of_counters }) - logging.info( - f'Processing of endpoint {msg_tag} Completed In: ' - f'{resp_process_time} Seconds. ' - f'({data_len}) Ports, ' - f'({num_of_counters}) Counters Were Handled') + logging.info('Processing of endpoint %s Completed In: %.2f Seconds. ' + '(%d) Ports, (%d) Counters Were Handled', + msg_tag, resp_process_time, data_len, num_of_counters) if self.bulk_streaming_flag: self._stream_data_to_fluentd(data_to_stream, msg_tag) else: @@ -641,10 +651,10 @@ def stream_data(self, telemetry_endpoint): self._stream_data_to_fluentd(row, msg_tag) self.last_streamed_data_sample_timestamp = new_data_timestamp elif self.stream_only_new_samples: - logging.info(f"No new samples in endpoint {msg_tag}, nothing to stream") + logging.info('No new samples in endpoint %s, nothing to stream', msg_tag) - except Exception as e: - logging.error("Exception occurred during parsing telemetry data: "+ str(e)) + except Exception as ex: # pylint: disable=broad-except + logging.error("Exception occurred during parsing telemetry data: %s", str(ex)) else: logging.error("Failed to get the telemetry data metrics") @@ -656,18 +666,19 @@ def _add_streaming_attribute(self, attribute): 'enabled': True } - def init_streaming_attributes(self): + def init_streaming_attributes(self): # pylint: disable=too-many-locals Logger.log_message('Updating The streaming attributes', LOG_LEVELS.DEBUG) # load the saved attributes self.streaming_attributes = self._get_saved_streaming_attributes() telemetry_endpoints = self.ufm_telemetry_endpoints processed_endpoints = {} - for endpoint in telemetry_endpoints: + for endpoint in telemetry_endpoints: # pylint: disable=too-many-nested-blocks _host = endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_HOST) _port = endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_PORT) _url = endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_URL) _msg_tag = endpoint.get(self.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_MSG_TAG_NAME) - endpoint_id = f'{_host}:{_port}:{_url.split("?")[0]}' # the ID of the endpoint is the full URL without filters like the shading,etc... + # the ID of the endpoint is the full URL without filters like sharding, etc.
+ endpoint_id = f'{_host}:{_port}:{_url.split("?")[0]}' is_processed = processed_endpoints.get(endpoint_id) if not is_processed: telemetry_data = self._get_metrics(_host, _port, _url, _msg_tag) @@ -713,10 +724,10 @@ def _convert_str_to_num(self, str_val): if __name__ == "__main__": # init app args - args = ArgsParser.parse_args("UFM Telemetry Streaming to fluentd", UFMTelemetryConstants.args_list) + _args = ArgsParser.parse_args("UFM Telemetry Streaming to fluentd", UFMTelemetryConstants.args_list) # init app config parser & load config files - config_parser = UFMTelemetryStreamingConfigParser(args) + config_parser = UFMTelemetryStreamingConfigParser(_args) # init logs configs logs_file_name = config_parser.get_logs_file_name() @@ -727,7 +738,7 @@ def _convert_str_to_num(self, str_val): telemetry_streaming = UFMTelemetryStreaming(config_parser) - #streaming_scheduler = StreamingScheduler.getInstance() - #streaming_scheduler.start_streaming(telemetry_streaming.stream_data, telemetry_streaming.streaming_interval) + # streaming_scheduler = StreamingScheduler.getInstance() + # streaming_scheduler.start_streaming(telemetry_streaming.stream_data, telemetry_streaming.streaming_interval) - telemetry_streaming.stream_data() + # telemetry_streaming.stream_data() diff --git a/plugins/fluentd_telemetry_plugin/src/streaming_scheduler.py b/plugins/fluentd_telemetry_plugin/src/streaming_scheduler.py index 7b235a76b..9e8cc93b8 100644 --- a/plugins/fluentd_telemetry_plugin/src/streaming_scheduler.py +++ b/plugins/fluentd_telemetry_plugin/src/streaming_scheduler.py @@ -14,27 +14,30 @@ @date: Jan 25, 2022 """ +from datetime import datetime from streamer import UFMTelemetryStreaming -from utils.singleton import Singleton from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.schedulers.base import STATE_RUNNING -from datetime import datetime +# pylint: disable=no-name-in-module,import-error +from utils.singleton import Singleton class StreamingAlreadyRunning(Exception): - pass + """StreamingAlreadyRunning Exception class""" class NoRunningStreamingInstance(Exception): - pass + """NoRunningStreamingInstance Exception class""" class StreamingScheduler(Singleton): + """ + StreamingScheduler class + """ + def __init__(self): self.scheduler = BackgroundScheduler() self.streaming_jobs = None - pass def start_streaming(self, update_attributes=False): streamer = UFMTelemetryStreaming.getInstance() @@ -46,7 +49,9 @@ def start_streaming(self, update_attributes=False): for telemetry_endpoint in streamer.ufm_telemetry_endpoints: interval = int(telemetry_endpoint[streamer.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_INTERVAL]) streaming_job = self.scheduler.add_job(streamer.stream_data, 'interval', - name=telemetry_endpoint[streamer.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_MSG_TAG_NAME], + name=telemetry_endpoint[ + streamer.config_parser.UFM_TELEMETRY_ENDPOINT_SECTION_MSG_TAG_NAME + ], args=[telemetry_endpoint], seconds=interval, next_run_time=datetime.now()) @@ -62,6 +67,7 @@ def stop_streaming(self): self.scheduler.remove_job(job.id) self.streaming_jobs = None return True + return False def get_streaming_state(self): return self.scheduler.state diff --git a/plugins/fluentd_telemetry_plugin/src/web_service.py b/plugins/fluentd_telemetry_plugin/src/web_service.py index 9c9f7fb4b..226039f70 100644 --- a/plugins/fluentd_telemetry_plugin/src/web_service.py +++ b/plugins/fluentd_telemetry_plugin/src/web_service.py @@ -23,12 +23,17 @@ class 
UFMTelemetryFluentdStreamingAPI(DispatcherMiddleware): + """ + UFMTelemetryFluentdStreamingAPI: Flask API wrapper class extending DispatcherMiddleware + """ - def __init__(self,config_parser): + def __init__(self, config_parser): frontend = BaseAPIApplication() self.streaming_conf = StreamingConfigurationsAPI(config_parser) self.streaming_stats = StreamingMonitoringStatsAPI() + + # pylint: disable=super-with-arguments super(UFMTelemetryFluentdStreamingAPI, self).__init__( frontend.application, { "/conf": self.streaming_conf.application, diff --git a/plugins/fluentd_telemetry_plugin/src/web_service_error_messages.py b/plugins/fluentd_telemetry_plugin/src/web_service_error_messages.py index 8cc523e74..914250e89 100644 --- a/plugins/fluentd_telemetry_plugin/src/web_service_error_messages.py +++ b/plugins/fluentd_telemetry_plugin/src/web_service_error_messages.py @@ -1,2 +1,2 @@ -no_running_streaming_instance = 'No running streaming job instance' -streaming_already_running = 'Streaming job is already running' +NO_RUNNING_STREAMING_INSTANCE = 'No running streaming job instance' +STREAMING_ALREADY_RUNNING = 'Streaming job is already running'
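The CI workflow above reduces to a single pylint invocation; a minimal sketch of reproducing the same check locally, assuming pylint and the plugin's requirements are installed and the repository root is the working directory:

# Reproduce the CI lint step locally; assumes `pip install pylint` plus
# plugins/fluentd_telemetry_plugin/requirements.txt are installed.
from pylint.lint import Run

Run(
    ["--rcfile=plugins/fluentd_telemetry_plugin/.pylintrc",
     "plugins/fluentd_telemetry_plugin"],
    exit=False,  # report problems without sys.exit(), unlike the CLI default
)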
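The error-handler table in base_api.py relies on Flask converting a (body, status) tuple into a response. A minimal self-contained sketch of that pattern, with the exception and message mirroring the patched modules (the /stop route is hypothetical):

from http import HTTPStatus
from flask import Flask

NO_RUNNING_STREAMING_INSTANCE = 'No running streaming job instance'

class NoRunningStreamingInstance(Exception):
    """Raised when a stop is requested but no streaming job is active."""

app = Flask(__name__)
# Same shape as _add_error_handlers(): Flask passes the raised exception
# to the callback and turns the (body, status) tuple into a response.
app.register_error_handler(
    NoRunningStreamingInstance,
    lambda e: (NO_RUNNING_STREAMING_INSTANCE, HTTPStatus.BAD_REQUEST))

@app.route('/stop')
def stop():
    raise NoRunningStreamingInstance()  # client receives 400 + the message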
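_parse_telemetry_prometheus_metrics_to_json iterates the families and samples produced by prometheus_client's text parser. A sketch on fabricated exposition text (the counter name and labels are invented; real UFM telemetry responses differ):

from prometheus_client.parser import text_string_to_metric_families

FAKE_EXPOSITION = (
    '# TYPE port_xmit_data counter\n'
    'port_xmit_data{node_guid="0xabc",port_num="1"} 123.0\n'
    'port_xmit_data{node_guid="0xabc",port_num="2"} 456.0\n'
)

for family in text_string_to_metric_families(FAKE_EXPOSITION):
    for sample in family.samples:
        # the streamer joins labels such as node_guid/port_num into a port key
        print(sample.name, sample.labels, sample.value, sample.timestamp)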
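start_streaming() in streaming_scheduler.py registers one APScheduler interval job per telemetry endpoint and fires it immediately via next_run_time. A reduced sketch of that scheduling pattern (the job body, interval, and tag name are placeholders):

import time
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler

def poll_endpoint():
    print('polling one telemetry endpoint...')

scheduler = BackgroundScheduler()
# As in start_streaming(): an 'interval' trigger per endpoint, with
# next_run_time=datetime.now() so the first poll is not delayed.
job = scheduler.add_job(poll_endpoint, 'interval', seconds=30,
                        name='endpoint-msg-tag', next_run_time=datetime.now())
scheduler.start()
time.sleep(5)
scheduler.remove_job(job.id)  # stop_streaming() removes jobs by id the same way
scheduler.shutdown()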