diff --git a/graph/common.py b/graph/common.py index 7a9978d..e6b81ea 100644 --- a/graph/common.py +++ b/graph/common.py @@ -3,6 +3,6 @@ def fatal(reason): """Print the error and exit 1.""" - sys.stderr.write("Fatal: {}\n".format(reason)) + sys.stderr.write(f"Fatal: {reason}\n") sys.stderr.flush() sys.exit(1) diff --git a/graph/graph.py b/graph/graph.py index b367752..4fcadd7 100644 --- a/graph/graph.py +++ b/graph/graph.py @@ -1,4 +1,4 @@ -from typing import Optional +from __future__ import annotations import matplotlib import matplotlib.pyplot as plt @@ -36,7 +36,7 @@ def __init__( square=False, show_source_file=None, ) -> None: - self.ax2: Optional[Axes] = None + self.ax2: Axes | None = None self.args = args self.fig, self.ax = plt.subplots() self.dpi = 100 diff --git a/graph/trace.py b/graph/trace.py index b4bae4f..33cd509 100644 --- a/graph/trace.py +++ b/graph/trace.py @@ -132,7 +132,7 @@ def get_metric_unit(self, metric_type: Metrics): def get_all_metrics(self, metric_type: Metrics, filter=None) -> list[MonitorMetric]: """Return all metrics of a given type.""" metrics = [] - for _, metric in self.get_monitoring_metric(metric_type).items(): + for metric in self.get_monitoring_metric(metric_type).values(): for component_name, component in metric.items(): if not filter: metrics.append(component) @@ -349,7 +349,7 @@ def get_psu_power(self): power = 0 if psus: power = [0] * len(psus[next(iter(psus))].get_samples()) - for _, psu in psus.items(): + for psu in psus.values(): count = 0 for value in psu.get_mean(): power[count] = power[count] + value diff --git a/hwbench/bench/benchmark.py b/hwbench/bench/benchmark.py index f792b3f..cc8c56d 100644 --- a/hwbench/bench/benchmark.py +++ b/hwbench/bench/benchmark.py @@ -112,28 +112,20 @@ def pre_run(self): p = self.parameters cpu_location = "" if p.get_pinned_cpu(): - if isinstance(p.get_pinned_cpu(), (int, str)): - cpu_location = " on CPU {:3d}".format(p.get_pinned_cpu()) + if isinstance(p.get_pinned_cpu(), int | str): + cpu_location = f" on CPU {p.get_pinned_cpu():3d}" elif isinstance(p.get_pinned_cpu(), list): - cpu_location = " on CPU [{}]".format(h.cpu_list_to_range(p.get_pinned_cpu())) + cpu_location = f" on CPU [{h.cpu_list_to_range(p.get_pinned_cpu())}]" else: - h.fatal("Unsupported get_pinned_cpu() format :{}".format(type(p.get_pinned_cpu()))) + h.fatal(f"Unsupported get_pinned_cpu() format :{type(p.get_pinned_cpu())}") monitoring = "" if self.parameters.get_monitoring(): monitoring = "(M)" print( - "[{}] {}/{}/{}{}: {:3d} stressor{} for {}s{}".format( - p.get_name(), - self.engine_module.get_engine().get_name(), - self.engine_module.get_name(), - p.get_engine_module_parameter(), - monitoring, - p.get_engine_instances_count(), - cpu_location, - p.get_runtime(), - status, - ) + f"[{p.get_name()}] {self.engine_module.get_engine().get_name()}/" + f"{self.engine_module.get_name()}/{p.get_engine_module_parameter()}{monitoring}: " + f"{p.get_engine_instances_count():3d} stressor{cpu_location} for {p.get_runtime()}s{status}" ) def post_run(self, run): diff --git a/hwbench/bench/benchmarks.py b/hwbench/bench/benchmarks.py index b9da53e..4971829 100644 --- a/hwbench/bench/benchmarks.py +++ b/hwbench/bench/benchmarks.py @@ -1,7 +1,8 @@ +from __future__ import annotations + import datetime import time from datetime import timedelta -from typing import Optional from ..environment.hardware import BaseHardware from ..utils import helpers as h @@ -101,7 +102,7 @@ def parse_jobs_config(self, validate_parameters=True): h.fatal("hosting_cpu_cores is not module hosting_cpu_cores_scaling !") pinned_cpu = [] while len(hosting_cpu_cores): - for step in range(steps): + for _step in range(steps): for cpu in hosting_cpu_cores.pop(): pinned_cpu.append(cpu) self.__schedule_benchmarks( @@ -111,7 +112,7 @@ def parse_jobs_config(self, validate_parameters=True): validate_parameters, ) elif hosting_cpu_cores_scaling == "iterate": - for iteration in range(len(hosting_cpu_cores)): + for _iteration in range(len(hosting_cpu_cores)): # Pick the last CPU of the list pinned_cpu = hosting_cpu_cores.pop() self.__schedule_benchmarks(job, stressor_range_scaling, pinned_cpu, validate_parameters) @@ -242,7 +243,7 @@ def run(self): print(f"hwbench: [{bench_name}]: started at {datetime.datetime.utcnow()}") # Save each benchmark result - results["{}_{}".format(benchmark.get_parameters().get_name(), benchmark.get_job_number())] = benchmark.run() + results[f"{benchmark.get_parameters().get_name()}_{benchmark.get_job_number()}"] = benchmark.run() return results def dump(self): @@ -272,7 +273,7 @@ def dump(self): print(f"cmdline={' '.join(em.run_cmd(param))}", file=f) print("", file=f) - def get_monitoring(self) -> Optional[Monitoring]: + def get_monitoring(self) -> Monitoring | None: """Return the monitoring object""" return self.monitoring diff --git a/hwbench/bench/engine.py b/hwbench/bench/engine.py index 0b9b5c4..f4179c8 100644 --- a/hwbench/bench/engine.py +++ b/hwbench/bench/engine.py @@ -1,6 +1,7 @@ +from __future__ import annotations + import abc import pathlib -from typing import Optional from ..utils.external import External from ..utils.helpers import fatal @@ -41,7 +42,9 @@ def run(self, params: BenchmarkParameters): class EngineBase(External): - def __init__(self, name: str, binary: str, modules: dict[str, EngineModuleBase] = {}): + def __init__(self, name: str, binary: str, modules: dict[str, EngineModuleBase] | None = None): + if modules is None: + modules = {} External.__init__(self, pathlib.Path("")) self.engine_name = name self.binary = binary @@ -69,7 +72,7 @@ def add_module(self, engine_module: EngineModuleBase): def get_modules(self) -> dict[str, EngineModuleBase]: return self.modules - def get_module(self, module_name: str) -> Optional[EngineModuleBase]: + def get_module(self, module_name: str) -> EngineModuleBase | None: return self.modules.get(module_name) def module_exists(self, module_name) -> bool: diff --git a/hwbench/bench/monitoring.py b/hwbench/bench/monitoring.py index c35f10c..9a8d798 100644 --- a/hwbench/bench/monitoring.py +++ b/hwbench/bench/monitoring.py @@ -123,7 +123,7 @@ def __compact(self): # Do not compact metadata if metric_name in MonitoringMetadata.list_str(): continue - for _, component in metric_type.items(): + for component in metric_type.values(): for metric_name, metric in component.items(): metric.compact() diff --git a/hwbench/bench/test_benchmarks_common.py b/hwbench/bench/test_benchmarks_common.py index 98108e9..5c8e394 100644 --- a/hwbench/bench/test_benchmarks_common.py +++ b/hwbench/bench/test_benchmarks_common.py @@ -66,7 +66,7 @@ def parse_jobs_config(self, validate_parameters=True): with patch("hwbench.environment.turbostat.Turbostat.check_version") as cv: cv.return_value = True with patch("hwbench.environment.turbostat.Turbostat.run") as ts: - with open("hwbench/tests/parsing/turbostat/run", "r") as f: + with open("hwbench/tests/parsing/turbostat/run") as f: ts.return_value = ast.literal_eval(f.read()) return self.benches.parse_jobs_config(validate_parameters) diff --git a/hwbench/config/config.py b/hwbench/config/config.py index 7944302..925d1a1 100644 --- a/hwbench/config/config.py +++ b/hwbench/config/config.py @@ -86,7 +86,7 @@ def get_engine(self, section_name) -> str: def load_engine(self, engine_name) -> EngineBase: """Return the engine from type.""" - module = importlib.import_module("..engines.{}".format(engine_name), package="hwbench.engines") + module = importlib.import_module(f"..engines.{engine_name}", package="hwbench.engines") return module.Engine() def get_engine_module(self, section_name) -> str: @@ -220,11 +220,11 @@ def validate_section(self, section_name): """Validate section of a config file.""" for directive in self.get_section(section_name): if not self.is_valid_keyword(directive): - h.fatal("job {}: invalid keyword {}".format(section_name, directive)) + h.fatal(f"job {section_name}: invalid keyword {directive}") # Execute the validations_ from config_syntax file # It will validate the syntax of this particular function. # An invalid syntax is fatal and halts the program - validate_function = getattr(config_syntax, "validate_{}".format(directive)) + validate_function = getattr(config_syntax, f"validate_{directive}") message = validate_function(self, section_name, self.get_section(section_name)[directive]) if message: h.fatal(f"Job {section_name}: keyword {directive} : {message}") diff --git a/hwbench/config/test_parse.py b/hwbench/config/test_parse.py index 30873a7..33c334c 100644 --- a/hwbench/config/test_parse.py +++ b/hwbench/config/test_parse.py @@ -44,7 +44,7 @@ def test_keywords(self): iba.return_value = True self.get_jobs_config().validate_sections() except Exception as exc: - assert False, f"'validate_sections' detected a syntax error {exc}" + raise AssertionError(f"'validate_sections' detected a syntax error {exc}") def test_defaults(self): """Check if default values are properly set.""" diff --git a/hwbench/engines/spike.py b/hwbench/engines/spike.py index 8b32755..fcc1605 100644 --- a/hwbench/engines/spike.py +++ b/hwbench/engines/spike.py @@ -134,7 +134,9 @@ def get_fans_speed(self): ) return sum([fan.get_values()[-1] for _, fan in raw_fans.items()]) - def __spawn_stressor(self, additional_args=[], wait_stressor=False): + def __spawn_stressor(self, additional_args=None, wait_stressor=False): + if additional_args is None: + additional_args = [] args = [ self.engine_module.engine.get_binary(), "-c", diff --git a/hwbench/engines/stressng.py b/hwbench/engines/stressng.py index a9eacef..805ab2e 100644 --- a/hwbench/engines/stressng.py +++ b/hwbench/engines/stressng.py @@ -1,5 +1,6 @@ +from __future__ import annotations + import re -from typing import Optional from ..bench.benchmark import ExternalBench from ..bench.engine import EngineBase, EngineModuleBase @@ -61,7 +62,7 @@ def version_minor(self) -> int: return int(self.version.split(b".")[2]) return 0 - def get_version(self) -> Optional[str]: + def get_version(self) -> str | None: if self.version: return self.version.decode("utf-8") return None diff --git a/hwbench/engines/stressng_stream.py b/hwbench/engines/stressng_stream.py index cd63844..c50ff30 100644 --- a/hwbench/engines/stressng_stream.py +++ b/hwbench/engines/stressng_stream.py @@ -1,5 +1,7 @@ +from __future__ import annotations + import re -from typing import Any, Optional +from typing import Any from ..bench.parameters import BenchmarkParameters from .stressng import EngineBase, EngineModulePinnable, StressNG @@ -23,7 +25,7 @@ def run_cmd(self) -> list[str]: str(self.parameters.get_engine_instances_count()), ] - self.stream_l3_size: Optional[int] = None + self.stream_l3_size: int | None = None if self.stream_l3_size is not None: ret.extend(["--stream-l3-size", str(self.stream_l3_size)]) return ret diff --git a/hwbench/engines/stressng_vnni.py b/hwbench/engines/stressng_vnni.py index 228caf3..ac8ef23 100644 --- a/hwbench/engines/stressng_vnni.py +++ b/hwbench/engines/stressng_vnni.py @@ -1,5 +1,5 @@ -from collections.abc import Iterable -from typing import Callable, NamedTuple +from collections.abc import Callable, Iterable +from typing import NamedTuple from ..bench.parameters import BenchmarkParameters from ..environment.hardware import BaseHardware diff --git a/hwbench/environment/base.py b/hwbench/environment/base.py index 738db49..46de2e4 100644 --- a/hwbench/environment/base.py +++ b/hwbench/environment/base.py @@ -2,7 +2,6 @@ import pathlib from abc import ABC, abstractmethod -from typing import Optional # This is the interface of Environment @@ -13,5 +12,5 @@ def __init__(self, out_dir: pathlib.Path): pass @abstractmethod - def dump(self) -> dict[str, Optional[str | int] | dict]: + def dump(self) -> dict[str, str | int | None | dict]: return {} diff --git a/hwbench/environment/cpu.py b/hwbench/environment/cpu.py index 247858e..18d43ee 100644 --- a/hwbench/environment/cpu.py +++ b/hwbench/environment/cpu.py @@ -1,4 +1,4 @@ -from typing import Optional +from __future__ import annotations from .cpu_cores import CPU_CORES from .cpu_info import CPU_INFO @@ -54,7 +54,7 @@ def get_peer_siblings(self, logical_cpu) -> list[int]: """Return the list of logical cores running on the same physical core.""" return self.cpu_cores.get_peer_siblings(logical_cpu) - def get_peer_sibling(self, logical_cpu) -> Optional[int]: + def get_peer_sibling(self, logical_cpu) -> int | None: """Return sibling of a logical core.""" return self.cpu_cores.get_peer_sibling(logical_cpu) diff --git a/hwbench/environment/cpu_cores.py b/hwbench/environment/cpu_cores.py index 90c81bf..001199f 100644 --- a/hwbench/environment/cpu_cores.py +++ b/hwbench/environment/cpu_cores.py @@ -1,5 +1,6 @@ +from __future__ import annotations + import pathlib -from typing import Optional from ..utils.external import External @@ -81,7 +82,7 @@ def get_peer_siblings(self, logical_cpu) -> list[int]: return self.get_cores(socket, core) return [] - def get_peer_sibling(self, logical_cpu) -> Optional[int]: + def get_peer_sibling(self, logical_cpu) -> int | None: """Return sibling of a logical core.""" # Let's find the associated core/ht of a given logical_cpu for core in self.get_peer_siblings(logical_cpu): diff --git a/hwbench/environment/dmi.py b/hwbench/environment/dmi.py index 8ef3252..679c84b 100644 --- a/hwbench/environment/dmi.py +++ b/hwbench/environment/dmi.py @@ -2,7 +2,6 @@ import os import pathlib -from typing import Optional from ..utils.archive import create_tar_from_directory, extract_file_from_tar from ..utils.external import External @@ -18,19 +17,19 @@ def __init__(self, out_dir: pathlib.Path): create_tar_from_directory(self.SYS_DMI, pathlib.Path(self.tarfilename.as_posix())) @staticmethod - def bytes_to_dmi_info(payload: Optional[bytes]) -> Optional[str]: + def bytes_to_dmi_info(payload: bytes | None) -> str | None: if payload is None: return None return payload.decode("utf-8", "strict").replace("\n", "") @staticmethod - def extract_dmi_payload(tarfile: pathlib.Path, file: str, root_path=SYS_DMI) -> Optional[bytes]: + def extract_dmi_payload(tarfile: pathlib.Path, file: str, root_path=SYS_DMI) -> bytes | None: return extract_file_from_tar(tarfile.as_posix(), os.path.join(root_path, file)) - def info(self, name: str) -> Optional[str]: + def info(self, name: str) -> str | None: return self.bytes_to_dmi_info(self.extract_dmi_payload(self.tarfilename, name)) - def dump(self) -> dict[str, Optional[str | int] | dict]: + def dump(self) -> dict[str, str | int | None | dict]: return { "vendor": self.info("sys_vendor"), "product": self.info("product_name"), diff --git a/hwbench/environment/hardware.py b/hwbench/environment/hardware.py index 72ee6b4..dda8a2e 100644 --- a/hwbench/environment/hardware.py +++ b/hwbench/environment/hardware.py @@ -2,7 +2,6 @@ import pathlib from abc import abstractmethod -from typing import Optional from ..utils.external import External_Simple from .base import BaseEnvironment @@ -49,7 +48,7 @@ def __init__(self, out_dir: pathlib.Path, monitoring_config): DmidecodeRaw(out_dir).run() External_Simple(self.out_dir, ["ipmitool", "sdr"], "ipmitool-sdr") - def dump(self) -> dict[str, Optional[str | int] | dict]: + def dump(self) -> dict[str, str | int | None | dict]: dump = { "dmi": self.dmi.dump(), "cpu": self.cpu.dump(), diff --git a/hwbench/environment/mock.py b/hwbench/environment/mock.py index 960c3d1..51d12ed 100644 --- a/hwbench/environment/mock.py +++ b/hwbench/environment/mock.py @@ -1,9 +1,13 @@ +from __future__ import annotations + from .hardware import Hardware from .vendors.mock import MockVendor class MockHardware(Hardware): - def __init__(self, flags: list[str] = [], cores: int = 0, cpu=None): + def __init__(self, flags: list[str] | None = None, cores: int = 0, cpu=None): + if flags is None: + flags = [] self.cpu = cpu self.flags = flags self.cores = cores diff --git a/hwbench/environment/test_parse.py b/hwbench/environment/test_parse.py index 9ad188e..383ff3b 100644 --- a/hwbench/environment/test_parse.py +++ b/hwbench/environment/test_parse.py @@ -8,7 +8,7 @@ path = pathlib.Path("") -class TestParseCPU(object): +class TestParseCPU: def test_ami_aptio(self): d = pathlib.Path("./hwbench/tests/parsing/ami_aptio/v5") print(f"parsing test {d.name}") diff --git a/hwbench/environment/test_vendors.py b/hwbench/environment/test_vendors.py index 8706c23..76e3121 100644 --- a/hwbench/environment/test_vendors.py +++ b/hwbench/environment/test_vendors.py @@ -83,7 +83,7 @@ def tearDown(self): def sample(self, name): """Return the samples for this test.""" output = None - file = open(self.__get_samples_file_name(name), "r") + file = open(self.__get_samples_file_name(name)) output = file.readlines() # If the file is empty but json output is requested, let's return an empty string if not len(output): @@ -118,17 +118,17 @@ def generic_power_output(self): def generic_test(self, expected_output, func): for pc in func: if pc not in expected_output.keys(): - assert False, f"Missing Physical Context '{pc}' in expected_output" + raise AssertionError(f"Missing Physical Context '{pc}' in expected_output") for sensor in func[pc]: if sensor not in expected_output[pc]: - assert False, f"Missing sensor '{sensor}' in '{pc}'" + raise AssertionError(f"Missing sensor '{sensor}' in '{pc}'") if func[pc][sensor] != expected_output[pc][sensor]: print( f"name: |{func[pc][sensor].get_name()}| vs |{expected_output[pc][sensor].get_name()}|\n" f"value:|{func[pc][sensor].get_values()}| vs |{expected_output[pc][sensor].get_values()}|\n" f"unit: |{func[pc][sensor].get_unit()}| vs |{expected_output[pc][sensor].get_unit()}|" ) - assert False, "Metrics do not match" + raise AssertionError("Metrics do not match") def generic_thermal_test(self, expected_output): return self.generic_test(expected_output, self.get_vendor().get_bmc().read_thermals({})) diff --git a/hwbench/environment/turbostat.py b/hwbench/environment/turbostat.py index cf07102..1184b51 100644 --- a/hwbench/environment/turbostat.py +++ b/hwbench/environment/turbostat.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import os import re import subprocess @@ -35,9 +37,13 @@ class Turbostat: def __init__( self, hardware: BaseHardware, - freq_metrics: dict[str, dict[str, dict[str, MonitorMetric]]] = {}, - power_metrics: dict[str, dict[str, dict[str, MonitorMetric]]] = {}, + freq_metrics: dict[str, dict[str, dict[str, MonitorMetric]]] | None = None, + power_metrics: dict[str, dict[str, dict[str, MonitorMetric]]] | None = None, ): + if power_metrics is None: + power_metrics = {} + if freq_metrics is None: + freq_metrics = {} self.__output = None self.cores_count = 0 self.sensor_list = { diff --git a/hwbench/environment/vendors/bmc.py b/hwbench/environment/vendors/bmc.py index 3db5ea4..9bcf93f 100644 --- a/hwbench/environment/vendors/bmc.py +++ b/hwbench/environment/vendors/bmc.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import functools import os.path import pathlib @@ -131,8 +133,12 @@ def get_thermal(self) -> dict: return next(iter(th.values())) # return only element return {} # return nothing if there are more than 1 elements - def read_thermals(self, thermals: dict[str, dict[str, Temperature]] = {}) -> dict[str, dict[str, Temperature]]: + def read_thermals( + self, thermals: dict[str, dict[str, Temperature]] | None = None + ) -> dict[str, dict[str, Temperature]]: """Return thermals from server""" + if thermals is None: + thermals = {} th = self._get_thermals() for chassis, thermal in th.items(): prefix = "" @@ -152,9 +158,11 @@ def read_thermals(self, thermals: dict[str, dict[str, Temperature]] = {}) -> dic ) return thermals - def read_fans(self, fans: dict[str, dict[str, MonitorMetric]] = {}) -> dict[str, dict[str, MonitorMetric]]: + def read_fans(self, fans: dict[str, dict[str, MonitorMetric]] | None = None) -> dict[str, dict[str, MonitorMetric]]: """Return fans from server""" # Generic for now, could be override by vendors + if fans is None: + fans = {} if str(FanContext.FAN) not in fans: fans[str(FanContext.FAN)] = {} # type: ignore[no-redef] for f in self.get_thermal().get("Fans", []): @@ -178,10 +186,12 @@ def get_power(self): return {} # return nothing if there are more than 1 elements def read_power_consumption( - self, power_consumption: dict[str, dict[str, Power]] = {} + self, power_consumption: dict[str, dict[str, Power]] | None = None ) -> dict[str, dict[str, Power]]: """Return power consumption from server""" # Generic for now, could be override by vendors + if power_consumption is None: + power_consumption = {} if str(PowerContext.BMC) not in power_consumption: power_consumption[str(PowerContext.BMC)] = {str(PowerCategories.SERVER): Power(str(PowerCategories.SERVER))} # type: ignore[no-redef] @@ -190,9 +200,13 @@ def read_power_consumption( power_consumption[str(PowerContext.BMC)][str(PowerCategories.SERVER)].add(power) return power_consumption - def read_power_supplies(self, power_supplies: dict[str, dict[str, Power]] = {}) -> dict[str, dict[str, Power]]: + def read_power_supplies( + self, power_supplies: dict[str, dict[str, Power]] | None = None + ) -> dict[str, dict[str, Power]]: """Return power supplies power from server""" # Generic for now, could be override by vendors + if power_supplies is None: + power_supplies = {} if str(PowerContext.BMC) not in power_supplies: power_supplies[str(PowerContext.BMC)] = {} # type: ignore[no-redef] for psu in self.get_power().get("PowerSupplies", []): diff --git a/hwbench/environment/vendors/dell/dell.py b/hwbench/environment/vendors/dell/dell.py index 10ef678..5df32f9 100644 --- a/hwbench/environment/vendors/dell/dell.py +++ b/hwbench/environment/vendors/dell/dell.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from typing import cast from ....bench.monitoring_structs import ( @@ -19,7 +21,11 @@ class IDRAC(BMC): def get_thermal(self): return self.get_redfish_url("/redfish/v1/Chassis/System.Embedded.1/Thermal") - def read_thermals(self, thermals: dict[str, dict[str, Temperature]] = {}) -> dict[str, dict[str, Temperature]]: + def read_thermals( + self, thermals: dict[str, dict[str, Temperature]] | None = None + ) -> dict[str, dict[str, Temperature]]: + if thermals is None: + thermals = {} for t in self.get_thermal().get("Temperatures"): if t["ReadingCelsius"] is None or t["ReadingCelsius"] <= 0: continue @@ -68,7 +74,9 @@ def get_oem_system(self): self.oem_endpoint = new_oem_endpoint return oem - def read_power_consumption(self, power_consumption: dict[str, dict[str, Power]] = {}): + def read_power_consumption(self, power_consumption: dict[str, dict[str, Power]] | None = None): + if power_consumption is None: + power_consumption = {} power_consumption = super().read_power_consumption(power_consumption) oem_system = self.get_oem_system() if "ServerPwr.1.SCViewSledPwr" in oem_system["Attributes"]: diff --git a/hwbench/environment/vendors/detect.py b/hwbench/environment/vendors/detect.py index 3ae40d6..45a1605 100644 --- a/hwbench/environment/vendors/detect.py +++ b/hwbench/environment/vendors/detect.py @@ -23,4 +23,4 @@ def first_matching_vendor(out_dir: pathlib.Path, dmi: DmiSys, monitoring_config_ # If the vendor matched, it may need to prepare some stuff v.prepare() return v - assert False, "Unreachable: the GenericVendor should have been selected" + raise AssertionError("Unreachable: the GenericVendor should have been selected") diff --git a/hwbench/environment/vendors/hpe/hpe.py b/hwbench/environment/vendors/hpe/hpe.py index 44e1fea..78ab209 100644 --- a/hwbench/environment/vendors/hpe/hpe.py +++ b/hwbench/environment/vendors/hpe/hpe.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging import pathlib import re @@ -38,7 +40,11 @@ def get_url(self) -> str: def get_thermal(self): return self.get_redfish_url("/redfish/v1/Chassis/1/Thermal") - def read_thermals(self, thermals: dict[str, dict[str, Temperature]] = {}) -> dict[str, dict[str, Temperature]]: + def read_thermals( + self, thermals: dict[str, dict[str, Temperature]] | None = None + ) -> dict[str, dict[str, Temperature]]: + if thermals is None: + thermals = {} for t in self.get_thermal().get("Temperatures"): if t["ReadingCelsius"] <= 0: continue @@ -88,8 +94,12 @@ def get_power(self): def __warn_psu(self, psu_number, message): logging.error(f"PSU {psu_number}: {message}") - def read_power_supplies(self, power_supplies: dict[str, dict[str, Power]] = {}) -> dict[str, dict[str, Power]]: + def read_power_supplies( + self, power_supplies: dict[str, dict[str, Power]] | None = None + ) -> dict[str, dict[str, Power]]: """Return power supplies power from server""" + if power_supplies is None: + power_supplies = {} if str(PowerContext.BMC) not in power_supplies: power_supplies[str(PowerContext.BMC)] = {} # type: ignore[no-redef] for psu in self.get_power().get("PowerSupplies"): @@ -124,7 +134,9 @@ def read_power_supplies(self, power_supplies: dict[str, dict[str, Power]] = {}) return power_supplies - def read_power_consumption(self, power_consumption: dict[str, dict[str, Power]] = {}): + def read_power_consumption(self, power_consumption: dict[str, dict[str, Power]] | None = None): + if power_consumption is None: + power_consumption = {} oem_chassis = self.get_oem_chassis() # If server is not in a chassis, the default parsing is good diff --git a/hwbench/environment/vendors/hpe/ilorest.py b/hwbench/environment/vendors/hpe/ilorest.py index 0d20a03..d552e4c 100644 --- a/hwbench/environment/vendors/hpe/ilorest.py +++ b/hwbench/environment/vendors/hpe/ilorest.py @@ -87,18 +87,18 @@ def login(self, last_try=False): # We cannot login because of CreateLimitReachedForResource # Let's reset the bmc and retry if return_code == 32: - h.fatal("Cannot login to local ilo, return_code = {}".format(return_code)) + h.fatal(f"Cannot login to local ilo, return_code = {return_code}") elif return_code == 64: h.fatal("Cannot login to local ilo", details="BMC is missing") elif return_code == 0: self.logged = True return True else: - h.fatal("Cannot login to local ilo, return_code = {}".format(return_code)) + h.fatal(f"Cannot login to local ilo, return_code = {return_code}") def raw_get(self, endpoint, to_json=False): """Perform a raw get.""" - command = "rawget /redfish/v1{}".format(endpoint) + command = f"rawget /redfish/v1{endpoint}" return_code, rawget = self.__ilorest(command) if return_code != 0: raise subprocess.CalledProcessError(returncode=return_code, cmd=command) @@ -108,11 +108,11 @@ def raw_get(self, endpoint, to_json=False): def get(self, endpoint, select=None, filter=None, to_json=False): """Perform a get.""" - command = "get {}".format(endpoint) + command = f"get {endpoint}" if select: - command += " --select {}".format(select) + command += f" --select {select}" if filter: - command += ' --filter "{}"'.format(filter) + command += f' --filter "{filter}"' command += " -j" return_code, get = self.__ilorest(command) if return_code != 0: @@ -125,9 +125,9 @@ def list(self, select, filter=None, to_json=False): """Perform a get.""" command = "list " if select: - command += " --select {}".format(select) + command += f" --select {select}" if filter: - command += ' --filter "{}"'.format(filter) + command += f' --filter "{filter}"' command += " -j" return_code, get = self.__ilorest(command) if return_code != 0: diff --git a/hwbench/environment/vendors/mock.py b/hwbench/environment/vendors/mock.py index e7cbc0e..d15264a 100644 --- a/hwbench/environment/vendors/mock.py +++ b/hwbench/environment/vendors/mock.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from typing import cast from ...bench.monitoring_structs import ( @@ -20,8 +22,12 @@ def detect(self): self.firmware_version = "1.0.0" self.model = "MockedBMC" - def read_thermals(self, thermals: dict[str, dict[str, Temperature]] = {}) -> dict[str, dict[str, Temperature]]: + def read_thermals( + self, thermals: dict[str, dict[str, Temperature]] | None = None + ) -> dict[str, dict[str, Temperature]]: # Let's add a faked thermal metric + if thermals is None: + thermals = {} name = "CPU1" super().add_monitoring_value( @@ -33,8 +39,10 @@ def read_thermals(self, thermals: dict[str, dict[str, Temperature]] = {}) -> dic ) return thermals - def read_fans(self, fans: dict[str, dict[str, MonitorMetric]] = {}) -> dict[str, dict[str, MonitorMetric]]: + def read_fans(self, fans: dict[str, dict[str, MonitorMetric]] | None = None) -> dict[str, dict[str, MonitorMetric]]: # Let's add a faked fans metric + if fans is None: + fans = {} name = "Fan1" super().add_monitoring_value( cast(dict[str, dict[str, MonitorMetric]], fans), @@ -46,9 +54,11 @@ def read_fans(self, fans: dict[str, dict[str, MonitorMetric]] = {}) -> dict[str, return fans def read_power_consumption( - self, power_consumption: dict[str, dict[str, Power]] = {} + self, power_consumption: dict[str, dict[str, Power]] | None = None ) -> dict[str, dict[str, Power]]: # Let's add a faked power metric + if power_consumption is None: + power_consumption = {} name = str(PowerCategories.CHASSIS) super().add_monitoring_value( cast(dict[str, dict[str, MonitorMetric]], power_consumption), @@ -59,8 +69,12 @@ def read_power_consumption( ) return power_consumption - def read_power_supplies(self, power_supplies: dict[str, dict[str, Power]] = {}) -> dict[str, dict[str, Power]]: + def read_power_supplies( + self, power_supplies: dict[str, dict[str, Power]] | None = None + ) -> dict[str, dict[str, Power]]: # Let's add a faked power supplies + if power_supplies is None: + power_supplies = {} status = "PS1 status" name = "PS1" super().add_monitoring_value( diff --git a/hwbench/environment/vendors/monitoring_device.py b/hwbench/environment/vendors/monitoring_device.py index aac2fbe..e707d95 100644 --- a/hwbench/environment/vendors/monitoring_device.py +++ b/hwbench/environment/vendors/monitoring_device.py @@ -105,13 +105,13 @@ def connect_redfish(self, username: str, password: str, device_url: str): self.redfish_obj.login() self.logged = True except json.decoder.JSONDecodeError: - h.fatal("JSONDecodeError on {}".format(device_url)) + h.fatal(f"JSONDecodeError on {device_url}") except redfish.rest.v1.RetriesExhaustedError: - h.fatal("RetriesExhaustedError on {}".format(device_url)) + h.fatal(f"RetriesExhaustedError on {device_url}") except redfish.rest.v1.BadRequestError: - h.fatal("BadRequestError on {}".format(device_url)) + h.fatal(f"BadRequestError on {device_url}") except redfish.rest.v1.InvalidCredentialsError: - h.fatal("Invalid credentials for {}".format(device_url)) + h.fatal(f"Invalid credentials for {device_url}") except Exception as exception: h.fatal(type(exception)) diff --git a/hwbench/environment/vendors/pdu.py b/hwbench/environment/vendors/pdu.py index b2de477..84886ba 100644 --- a/hwbench/environment/vendors/pdu.py +++ b/hwbench/environment/vendors/pdu.py @@ -1,7 +1,6 @@ -from ...bench.monitoring_structs import ( - Power, - PowerContext, -) +from __future__ import annotations + +from ...bench.monitoring_structs import Power, PowerContext from ...utils import helpers as h from .monitoring_device import MonitoringDevice @@ -38,10 +37,12 @@ def get_power(self): return {} def read_power_consumption( - self, power_consumption: dict[str, dict[str, Power]] = {} + self, power_consumption: dict[str, dict[str, Power]] | None = None ) -> dict[str, dict[str, Power]]: """Return power consumption from server""" # Generic for now, could be override by vendors + if power_consumption is None: + power_consumption = {} if str(PowerContext.PDU) not in power_consumption: power_consumption[str(PowerContext.PDU)] = {} # type: ignore[no-redef] diff --git a/hwbench/environment/vendors/pdus/__init__.py b/hwbench/environment/vendors/pdus/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hwbench/environment/vendors/pdus/raritan.py b/hwbench/environment/vendors/pdus/raritan.py index f01ec4e..ef213b8 100644 --- a/hwbench/environment/vendors/pdus/raritan.py +++ b/hwbench/environment/vendors/pdus/raritan.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from ....bench.monitoring_structs import Power, PowerContext from ....utils import helpers as h from ..pdu import PDU @@ -31,9 +33,11 @@ def get_power(self): return self.get_redfish_url(f"/redfish/v1/PowerEquipment/RackPDUs/1/Outlets/{self.outlet}/") def read_power_consumption( - self, power_consumption: dict[str, dict[str, Power]] = {} + self, power_consumption: dict[str, dict[str, Power]] | None = None ) -> dict[str, dict[str, Power]]: """Return power consumption from pdu""" + if power_consumption is None: + power_consumption = {} power_consumption = super().read_power_consumption(power_consumption) power_consumption[str(PowerContext.PDU)][self.get_name()].add(self.get_power().get("PowerWatts")["Reading"]) return power_consumption diff --git a/hwbench/environment/vendors/vendor.py b/hwbench/environment/vendors/vendor.py index 7b44332..6705fc1 100644 --- a/hwbench/environment/vendors/vendor.py +++ b/hwbench/environment/vendors/vendor.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import configparser import os from abc import ABC, abstractmethod @@ -41,7 +43,7 @@ def _load_vendor(self, directory: str, vendor: str): vendor_modulename = f"hwbench.environment.vendors.{directory}.{vendor}" if not find_spec(vendor_modulename): - h.fatal("cannot_find vendor module {}".format(vendor_modulename)) + h.fatal(f"cannot_find vendor module {vendor_modulename}") return import_module(vendor_modulename) @@ -67,8 +69,10 @@ def get_pdus(self) -> list[PDU]: """Return a list of PDUs object""" return self.pdus - def find_monitoring_sections(self, section_type: str, sections_list=[], max_sections=0): + def find_monitoring_sections(self, section_type: str, sections_list: list | None = None, max_sections=0): """Return sections of section_type from the monitoring configuration file""" + if sections_list is None: + sections_list = [] sections = [] if not self.get_monitoring_config_filename(): h.fatal("Missing monitoring configuration file, please use -m option.") diff --git a/hwbench/tests/__init__.py b/hwbench/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hwbench/tests/test_env.py b/hwbench/tests/test_env.py index 5a69ef1..33edbfd 100644 --- a/hwbench/tests/test_env.py +++ b/hwbench/tests/test_env.py @@ -1,3 +1,3 @@ -class TestEnv(object): +class TestEnv: def test_dummy(self): print("dummy test") diff --git a/hwbench/tuning/power_profile.py b/hwbench/tuning/power_profile.py index f0968c0..54b48a6 100644 --- a/hwbench/tuning/power_profile.py +++ b/hwbench/tuning/power_profile.py @@ -21,7 +21,7 @@ def run(self) -> None: log.info("skip PerformancePowerProfile as no cpu governor detected") return pattern = re.compile("cpu[0-9]+") - for rootpath, dirnames, filenames in os.walk("/sys/devices/system/cpu"): + for rootpath, dirnames, _filenames in os.walk("/sys/devices/system/cpu"): for dirname in dirnames: if pattern.match(dirname): file = pathlib.Path(rootpath).joinpath(dirname, "cpufreq", "scaling_governor") diff --git a/hwbench/tuning/scheduler.py b/hwbench/tuning/scheduler.py index 6d5ab23..271c316 100644 --- a/hwbench/tuning/scheduler.py +++ b/hwbench/tuning/scheduler.py @@ -11,7 +11,7 @@ def __init__(self, out_dir, scheduler): def run(self): log = tunninglog() - for rootpath, dirnames, filenames in os.walk("/sys/block"): + for rootpath, dirnames, _filenames in os.walk("/sys/block"): for dirname in dirnames: diskdir = pathlib.Path(rootpath) / dirname file = diskdir / "queue/scheduler" diff --git a/hwbench/utils/archive.py b/hwbench/utils/archive.py index 59a8d49..98da6b8 100644 --- a/hwbench/utils/archive.py +++ b/hwbench/utils/archive.py @@ -1,9 +1,10 @@ +from __future__ import annotations + import errno import io import os import pathlib import tarfile -from typing import Optional def create_tar_from_directory(dir: str, tarfilename: pathlib.Path) -> None: @@ -11,7 +12,7 @@ def create_tar_from_directory(dir: str, tarfilename: pathlib.Path) -> None: following the symlinks.""" # may raise tarfile.ReadError if tarfilename is not a tar file tarfd = tarfile.open(tarfilename, "x") - for rootpath, dirnames, filenames in os.walk(dir): + for rootpath, _dirnames, filenames in os.walk(dir): for filename in filenames: file = pathlib.Path(rootpath) / filename try: @@ -27,7 +28,7 @@ def create_tar_from_directory(dir: str, tarfilename: pathlib.Path) -> None: return None -def extract_file_from_tar(tarfilename: str, filename: str) -> Optional[bytes]: +def extract_file_from_tar(tarfilename: str, filename: str) -> bytes | None: """return a specific file in a tar archive as bytes if the file exists.""" # may raise tarfile.ReadError if tarfilename is not a tar file diff --git a/hwbench/utils/external.py b/hwbench/utils/external.py index 63b7b08..b96735b 100644 --- a/hwbench/utils/external.py +++ b/hwbench/utils/external.py @@ -89,7 +89,7 @@ def run_cmd(self) -> list[str]: return self.cmd_list def parse_version(self, stdout: bytes, _stderr: bytes) -> bytes: - return bytes() + return b"" def parse_cmd(self, stdout: bytes, stderr: bytes): return {} diff --git a/hwbench/utils/helpers.py b/hwbench/utils/helpers.py index 651d347..501545c 100644 --- a/hwbench/utils/helpers.py +++ b/hwbench/utils/helpers.py @@ -1,9 +1,11 @@ +from __future__ import annotations + import datetime import logging import sys from datetime import timedelta from shutil import which -from typing import NoReturn, Optional +from typing import NoReturn def fatal(message) -> NoReturn: @@ -42,7 +44,7 @@ def cpu_list_to_range(cpu_list: list[int]) -> str: """ cpu_list.sort() output: list[str] = [] - previous_entry: Optional[int] = cpu_list[0] + previous_entry: int | None = cpu_list[0] for i in range(1, len(cpu_list)): current_entry = cpu_list[i]