diff --git a/hwbench/bench/test_benchmarks.py b/hwbench/bench/test_benchmarks.py index aaa0b10..e1a2b6d 100644 --- a/hwbench/bench/test_benchmarks.py +++ b/hwbench/bench/test_benchmarks.py @@ -10,7 +10,7 @@ def __init__(self, *args, **kwargs): # We need to patch list_module_parameters() function # to avoid considering the local stress-ng binary with patch( - "hwbench.engines.stressng.EngineModuleCpu.list_module_parameters" + "hwbench.engines.stressng_cpu.EngineModuleCpu.list_module_parameters" ) as p: print(pathlib.Path(".")) p.return_value = ( @@ -98,7 +98,7 @@ def get_monitoring_members(metric: Metrics): def test_stream_short(self): with patch( - "hwbench.engines.stressng.EngineModuleCpu.list_module_parameters" + "hwbench.engines.stressng_cpu.EngineModuleCpu.list_module_parameters" ) as p: p.return_value = ( pathlib.Path("./hwbench/tests/parsing/stressngmethods/v17/stdout") diff --git a/hwbench/config/test_parse.py b/hwbench/config/test_parse.py index 7ed4d12..70d285c 100644 --- a/hwbench/config/test_parse.py +++ b/hwbench/config/test_parse.py @@ -8,7 +8,7 @@ class TestParseConfig(tbc.TestCommon): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) with patch( - "hwbench.engines.stressng.EngineModuleCpu.list_module_parameters" + "hwbench.engines.stressng_cpu.EngineModuleCpu.list_module_parameters" ) as p: p.return_value = ( pathlib.Path("./hwbench/tests/parsing/stressngmethods/v17/stdout") @@ -40,7 +40,7 @@ def test_keywords(self): # We need to patch list_module_parameters() function # to avoid considering the local stress-ng binary with patch( - "hwbench.engines.stressng.EngineModuleCpu.list_module_parameters" + "hwbench.engines.stressng_cpu.EngineModuleCpu.list_module_parameters" ) as p: p.return_value = ( pathlib.Path("./hwbench/tests/parsing/stressngmethods/v17/stdout") @@ -56,7 +56,7 @@ def test_keywords(self): def test_defaults(self): """Check if default values are properly set.""" with patch( - "hwbench.engines.stressng.EngineModuleCpu.list_module_parameters" + "hwbench.engines.stressng_cpu.EngineModuleCpu.list_module_parameters" ) as p: p.return_value = ( pathlib.Path("./hwbench/tests/parsing/stressngmethods/v17/stdout") diff --git a/hwbench/engines/stressng.py b/hwbench/engines/stressng.py index c866c69..e2a3518 100644 --- a/hwbench/engines/stressng.py +++ b/hwbench/engines/stressng.py @@ -1,12 +1,6 @@ -import os import re -import subprocess -from collections.abc import Iterable -from typing import NamedTuple, Callable +from typing import Optional -from typing import Optional, Any - -from ..environment.hardware import BaseHardware from ..bench.parameters import BenchmarkParameters from ..bench.engine import EngineBase, EngineModuleBase from ..bench.benchmark import ExternalBench @@ -29,135 +23,16 @@ def validate_module_parameters(self, params: BenchmarkParameters): return "" -class EngineModuleQsort(EngineModulePinnable): - """This class implements the Qsort EngineModuleBase for StressNG""" - - def __init__(self, engine: EngineBase, engine_module_name: str): - super().__init__(engine, engine_module_name) - self.engine_module_name = engine_module_name - self.add_module_parameter("qsort") - - def run_cmd(self, p: BenchmarkParameters): - return StressNGQsort(self, p).run_cmd() - - def run(self, p: BenchmarkParameters): - return StressNGQsort(self, p).run() - - def fully_skipped_job(self, p) -> bool: - return StressNGQsort(self, p).fully_skipped_job() - - -class EngineModuleMemrate(EngineModulePinnable): - """This class implements the Memrate EngineModuleBase for StressNG""" - - def __init__(self, engine: EngineBase, engine_module_name: str): - super().__init__(engine, engine_module_name) - self.engine_module_name = engine_module_name - self.add_module_parameter("memrate") - - def run_cmd(self, p: BenchmarkParameters): - return StressNGMemrate(self, p).run_cmd() - - def run(self, p: BenchmarkParameters): - return StressNGMemrate(self, p).run() - - def fully_skipped_job(self, p) -> bool: - return StressNGMemrate(self, p).fully_skipped_job() - - -class EngineModuleStream(EngineModulePinnable): - """This class implements the Stream EngineModuleBase for StressNG""" - - def __init__(self, engine: EngineBase, engine_module_name: str): - super().__init__(engine, engine_module_name) - self.engine_module_name = engine_module_name - self.add_module_parameter("stream") - - def run_cmd(self, p: BenchmarkParameters): - return StressNGStream(self, p).run_cmd() - - def run(self, p: BenchmarkParameters): - return StressNGStream(self, p).run() - - def validate_module_parameters(self, params: BenchmarkParameters): - msg = super().validate_module_parameters(params) - if params.get_runtime() < 5: - return f"{msg}; StressNGStream needs at least a 5s of run time" - return msg - - def fully_skipped_job(self, p) -> bool: - return StressNGStream(self, p).fully_skipped_job() - - -class EngineModuleVNNI(EngineModulePinnable): - """This class implements the VNNI EngineModuleBase for StressNG""" - - def __init__(self, engine: EngineBase, engine_module_name: str): - super().__init__(engine, engine_module_name) - self.engine_module_name = engine_module_name - self.methods = StressNGVNNIMethods() - for method in self.methods.enumerate(): - self.add_module_parameter(method) - - def fully_skipped_job(self, p) -> bool: - return StressNGVNNI(self, p).fully_skipped_job() - - def run_cmd(self, p: BenchmarkParameters): - return StressNGVNNI(self, p).run_cmd() - - def run(self, p: BenchmarkParameters): - return StressNGVNNI(self, p).run() - - def validate_module_parameters(self, params: BenchmarkParameters): - msg = super().validate_module_parameters(params) - method = params.get_engine_module_parameter() - if not self.methods.available(method): - msg += f"Unknown method {method}\n" - if not self.methods.cpu_check(method, params.get_hw()): - print(f"WARNING: CPU does not support method {method}, perf will be 0") - return msg - - -class EngineModuleCpu(EngineModulePinnable): - """This class implements the EngineModuleBase for StressNG""" - - def __init__(self, engine: EngineBase, engine_module_name: str): - super().__init__(engine, engine_module_name) - self.engine_module_name = engine_module_name - self.load_module_parameter() - - def list_module_parameters(self): - english_env = os.environ.copy() - english_env["LC_ALL"] = "C" - cmd_out = subprocess.run( - [self.engine.get_binary(), "--cpu-method", "list"], - capture_output=True, - env=english_env, - stdin=subprocess.DEVNULL, - ) - return (cmd_out.stdout or cmd_out.stderr).split(b":", 1) - - def load_module_parameter(self): - out = self.list_module_parameters() - methods = out[1].decode("utf-8").split() - methods.remove("all") - for method in methods: - self.add_module_parameter(method) - - def run_cmd(self, p: BenchmarkParameters): - return StressNGCPU(self, p).run_cmd() - - def run(self, p: BenchmarkParameters): - return StressNGCPU(self, p).run() - - def fully_skipped_job(self, p) -> bool: - return StressNGCPU(self, p).fully_skipped_job() - - class Engine(EngineBase): """The main stressn2 class.""" def __init__(self): + from .stressng_cpu import EngineModuleCpu + from .stressng_qsort import EngineModuleQsort + from .stressng_memrate import EngineModuleMemrate + from .stressng_stream import EngineModuleStream + from .stressng_vnni import EngineModuleVNNI + super().__init__("stressng", "stress-ng") self.add_module(EngineModuleCpu(self, "cpu")) self.add_module(EngineModuleQsort(self, "qsort")) @@ -294,336 +169,3 @@ def empty_result(self): "effective_runtime": 0, "skipped": True, } - - -class StressNGCPU(StressNG): - """The StressNG CPU stressor.""" - - def run_cmd(self) -> list[str]: - # Let's build the command line to run the tool - return super().run_cmd() + [ - "--cpu", - str(self.parameters.get_engine_instances_count()), - "--cpu-method", - self.parameters.get_engine_module_parameter(), - ] - - -class StressNGQsort(StressNG): - """The StressNG Qsort CPU stressor.""" - - def run_cmd(self) -> list[str]: - # Let's build the command line to run the tool - return super().run_cmd() + [ - "--qsort", - str(self.parameters.get_engine_instances_count()), - ] - - -class StressNGStream(StressNG): - """The StressNG STREAM memory stressor.""" - - def run_cmd(self) -> list[str]: - # TODO: handle get_pinned_cpu ; it does not necessarily make sense for this - # benchmark, but it could be revisited once we support pinning on multiple CPUs. - skip = self.need_skip_because_version() - if skip: - return skip - ret: list[str] = [ - self.engine_module.get_engine().get_binary(), - "--timeout", - str(self.parameters.get_runtime()), - "--metrics", - "--stream", - str(self.parameters.get_engine_instances_count()), - ] - - self.stream_l3_size: Optional[int] = None - if self.stream_l3_size is not None: - ret.extend(["--stream-l3-size", str(self.stream_l3_size)]) - return ret - - def empty_result(self): - detail_size = self.parameters.get_engine_instances_count() - return { - "detail": { - "read": [0] * detail_size, - "write": [0] * detail_size, - "Mflop/s": [0] * detail_size, - }, - "avg_read": 0.0, - "avg_write": 0.0, - "avg_Mflop/s": 0.0, - "avg_total": 0.0, - "sum_read": 0.0, - "sum_write": 0.0, - "sum_Mflop/s": 0.0, - "sum_total": 0.0, - "skipped": True, - "effective_runtime": 0.0, - } - - def parse_cmd(self, stdout: bytes, stderr: bytes) -> dict[str, Any]: - if self.skip: - return self.parameters.get_result_format() | self.empty_result() - detail_rate = re.compile(r"\] stream: memory rate: ") - summary_rate = re.compile(r"\] stream ") - detail_parse = re.compile( - r"memory rate: (?P[0-9\.]+) MB read/sec, " - r"(?P[0-9\.]+) MB write/sec, " - r"(?P[0-9\.]+) double precision Mflop/sec " - r"\(instance (?P[0-9]+)\)" - ) - summary_parse = re.compile( - r"stream\s+(?P[0-9\.]+) " - r"(?:(?:MB per sec memory)|(?:Mflop per sec \(double precision\))) " - r"(?Pread|write|compute) rate" - ) - - out = (stdout or stderr).splitlines() - - detail = [str(line) for line in out if detail_rate.search(str(line))] - summary = [str(line) for line in out if summary_rate.search(str(line))] - - detail_size = self.parameters.get_engine_instances_count() or len(detail) - - ret: dict[str, Any] = { - "detail": { - "read": [0] * detail_size, - "write": [0] * detail_size, - "Mflop/s": [0] * detail_size, - }, - "avg_read": 0.0, - "avg_write": 0.0, - "avg_Mflop/s": 0.0, - "avg_total": 0.0, - "sum_read": 0.0, - "sum_write": 0.0, - "sum_Mflop/s": 0.0, - "sum_total": 0.0, - "effective_runtime": 0.0, - } - - for line in detail: - matches = detail_parse.search(line) - if matches is not None: - r = matches.groupdict() - instance = int(r["instance"]) - ret["detail"]["read"][instance] = float(r["read"]) - ret["detail"]["write"][instance] = float(r["write"]) - ret["detail"]["Mflop/s"][instance] = float(r["flop"]) - ret["sum_read"] += float(r["read"]) - ret["sum_write"] += float(r["write"]) - ret["sum_Mflop/s"] += float(r["flop"]) - - for line in summary: - matches = summary_parse.search(line) - if matches is not None: - r = matches.groupdict() - source = r["source"] - if source == "read" or source == "write": - ret[f"avg_{source}"] = float(r["rate"]) - elif source == "compute": - ret["avg_Mflop/s"] = float(r["rate"]) - - stats = self.stats_parse().search(line) - if stats: - ret["effective_runtime"] = float(stats["real_time"]) - - # Let's build the grand total of read + write - ret["sum_total"] = ret["sum_read"] + ret["sum_write"] - ret["avg_total"] = ret["avg_read"] + ret["avg_write"] - - return ret | self.parameters.get_result_format() - - -class StressNGVNNIMethods: - class Method(NamedTuple): - check: Callable[[BaseHardware], bool] - parameters: list[str] - - def __init__(self): - M = StressNGVNNIMethods.Method - - def check_support(flag: str) -> Callable[[BaseHardware], bool]: - def check(hw: BaseHardware) -> bool: - return flag in hw.cpu_flags() - - return check - - # We don't enumerate with stress-ng --vnni-method list because we still need to - # be able to check for compatibility. - self.methods = { - # tests generic methods - "noavx_vpaddb": M(lambda _: True, ["--vnni-method", "vpaddb"]), - "noavx_vpdpbusd": M(lambda _: True, ["--vnni-method", "vpdpbusd"]), - "noavx_vpdpwssd": M(lambda _: True, ["--vnni-method", "vpdpwssd"]), - # avx512 bw, relatively common - "avx_vpaddb512": M( - check_support("avx512bw"), - ["--vnni-method", "vpaddb512", "--vnni-intrinsic"], - ), - # avx512 vnni, Xeon 3rd gen+ and Zen4 + - "avx_vpdpbusd512": M( - check_support("avx512_vnni"), - ["--vnni-method", "vpdpbusd512", "--vnni-intrinsic"], - ), - "avx_vpdpwssd512": M( - check_support("avx512_vnni"), - ["--vnni-method", "vpdpwssd512", "--vnni-intrinsic"], - ), - # avx vnni, Xeon 4th gen+) - "avx_vpaddb128": M( - check_support("avx_vnni"), - ["--vnni-method", "vpaddb128", "--vnni-intrinsic"], - ), - "avx_vpaddb256": M( - check_support("avx_vnni"), - ["--vnni-method", "vpaddb256", "--vnni-intrinsic"], - ), - "avx_vpdpbusd128": M( - check_support("avx_vnni"), - ["--vnni-method", "vpdpbusd128", "--vnni-intrinsic"], - ), - "avx_vpdpbusd256": M( - check_support("avx_vnni"), - ["--vnni-method", "vpdpbusd256", "--vnni-intrinsic"], - ), - "avx_vpdpwssd128": M( - check_support("avx_vnni"), - ["--vnni-method", "vpdpwssd128", "--vnni-intrinsic"], - ), - "avx_vpdpwssd256": M( - check_support("avx_vnni"), - ["--vnni-method", "vpdpwssd256", "--vnni-intrinsic"], - ), - } - - def enumerate(self) -> Iterable[str]: - return self.methods.keys() - - def available(self, method: str) -> bool: - return method in self.methods - - def cpu_check(self, method: str, hw: BaseHardware) -> bool: - return self.methods[method].check(hw) - - def parameters(self, method: str) -> list[str]: - for name, sm in self.methods.items(): - if name == method: - return sm.parameters - return [] - - def enumerate_cpu(self, hw: BaseHardware) -> Iterable[str]: - """list available methods for the cpu in hw""" - return filter(lambda m: self.cpu_check(m, hw), self.enumerate()) - - -class StressNGVNNI(StressNG): - def __init__( - self, engine_module: EngineModuleVNNI, parameters: BenchmarkParameters - ): - super().__init__(engine_module, parameters) - self.method = parameters.get_engine_module_parameter() - self.methods = engine_module.methods - if not self.methods.available(self.method): - raise LookupError(f"Unknown method {self.method}") - if not self.methods.cpu_check(self.method, parameters.get_hw()): - self.skip = True - - def run_cmd(self) -> list[str]: - skip = self.need_skip_because_version() - if skip: - return skip - return ( - super().run_cmd() - + [ - "--vnni", - str(self.parameters.get_engine_instances_count()), - ] - + self.methods.parameters(self.method) - ) - - def parse_cmd(self, stdout: bytes, stderr: bytes): - return super().parse_cmd(stdout, stderr) - - -class StressNGMemrate(StressNG): - """The StressNG Memrate memory stressor.""" - - def run_cmd(self) -> list[str]: - # TODO: handle get_pinned_cpu ; it does not necessarily make sense for this - # benchmark, but it could be revisited once we support pinning on multiple CPUs. - skip = self.need_skip_because_version() - if skip: - return skip - return super().run_cmd() + [ - "--memrate", - str(self.parameters.get_engine_instances_count()), - "--memrate-flush", - ] - - def empty_result(self): - ret = {} - for method in [ - "read1024", - "read128", - "read128pf", - "read16", - "read256", - "read32", - "read512", - "read64", - "read64pf", - "read8", - "write1024", - "write128", - "write128nt", - "write16", - "write16stod", - "write256", - "write32", - "write32nt", - "write32stow", - "write512", - "write64", - "write64nt", - "write64stoq", - "write8", - "write8stob", - ]: - ret[method] = { - "avg_speed": 0.0, - "sum_speed": 0.0, - "effective_runtime": 0.0, - } - ret["skipped"] = True - return ret - - def parse_cmd(self, stdout: bytes, stderr: bytes) -> dict[str, Any]: - if self.skip: - return self.parameters.get_result_format() | self.empty_result() - summary_parse = re.compile(r"memrate .*") - summary_parse_perf = re.compile( - r"memrate .* (?P[0-9\.]+) (?P[a-z0-9]+) MB per sec .*$" - ) - out = (stdout or stderr).splitlines() - - summary = [str(line) for line in out if summary_parse.search(str(line))] - - ret = {} - - for line in summary: - stats = self.stats_parse().search(line) - if stats: - ret["effective_runtime"] = float(stats["real_time"]) - matches = summary_parse_perf.search(line) - if matches is not None: - r = matches.groupdict() - test = r["test"] - ret[test] = { - "avg_speed": float(r["speed"]), - "sum_speed": float(r["speed"]) - * self.parameters.get_engine_instances_count(), - } # type: ignore[assignment] - return ret | self.parameters.get_result_format() diff --git a/hwbench/engines/stressng_cpu.py b/hwbench/engines/stressng_cpu.py new file mode 100644 index 0000000..a153e72 --- /dev/null +++ b/hwbench/engines/stressng_cpu.py @@ -0,0 +1,53 @@ +import os +import subprocess +from ..bench.parameters import BenchmarkParameters +from .stressng import EngineBase, EngineModulePinnable, StressNG + + +class StressNGCPU(StressNG): + """The StressNG CPU stressor.""" + + def run_cmd(self) -> list[str]: + # Let's build the command line to run the tool + return super().run_cmd() + [ + "--cpu", + str(self.parameters.get_engine_instances_count()), + "--cpu-method", + self.parameters.get_engine_module_parameter(), + ] + + +class EngineModuleCpu(EngineModulePinnable): + """This class implements the EngineModuleBase for StressNG""" + + def __init__(self, engine: EngineBase, engine_module_name: str): + super().__init__(engine, engine_module_name) + self.engine_module_name = engine_module_name + self.load_module_parameter() + + def list_module_parameters(self): + english_env = os.environ.copy() + english_env["LC_ALL"] = "C" + cmd_out = subprocess.run( + [self.engine.get_binary(), "--cpu-method", "list"], + capture_output=True, + env=english_env, + stdin=subprocess.DEVNULL, + ) + return (cmd_out.stdout or cmd_out.stderr).split(b":", 1) + + def load_module_parameter(self): + out = self.list_module_parameters() + methods = out[1].decode("utf-8").split() + methods.remove("all") + for method in methods: + self.add_module_parameter(method) + + def run_cmd(self, p: BenchmarkParameters): + return StressNGCPU(self, p).run_cmd() + + def run(self, p: BenchmarkParameters): + return StressNGCPU(self, p).run() + + def fully_skipped_job(self, p) -> bool: + return StressNGCPU(self, p).fully_skipped_job() diff --git a/hwbench/engines/stressng_memrate.py b/hwbench/engines/stressng_memrate.py new file mode 100644 index 0000000..5434eb9 --- /dev/null +++ b/hwbench/engines/stressng_memrate.py @@ -0,0 +1,103 @@ +import re +from typing import Any +from ..bench.parameters import BenchmarkParameters +from .stressng import EngineBase, EngineModulePinnable, StressNG + + +class StressNGMemrate(StressNG): + """The StressNG Memrate memory stressor.""" + + def run_cmd(self) -> list[str]: + # TODO: handle get_pinned_cpu ; it does not necessarily make sense for this + # benchmark, but it could be revisited once we support pinning on multiple CPUs. + skip = self.need_skip_because_version() + if skip: + return skip + return super().run_cmd() + [ + "--memrate", + str(self.parameters.get_engine_instances_count()), + "--memrate-flush", + ] + + def empty_result(self): + ret = {} + for method in [ + "read1024", + "read128", + "read128pf", + "read16", + "read256", + "read32", + "read512", + "read64", + "read64pf", + "read8", + "write1024", + "write128", + "write128nt", + "write16", + "write16stod", + "write256", + "write32", + "write32nt", + "write32stow", + "write512", + "write64", + "write64nt", + "write64stoq", + "write8", + "write8stob", + ]: + ret[method] = { + "avg_speed": 0.0, + "sum_speed": 0.0, + "effective_runtime": 0.0, + } + ret["skipped"] = True + return ret + + def parse_cmd(self, stdout: bytes, stderr: bytes) -> dict[str, Any]: + if self.skip: + return self.parameters.get_result_format() | self.empty_result() + summary_parse = re.compile(r"memrate .*") + summary_parse_perf = re.compile( + r"memrate .* (?P[0-9\.]+) (?P[a-z0-9]+) MB per sec .*$" + ) + out = (stdout or stderr).splitlines() + + summary = [str(line) for line in out if summary_parse.search(str(line))] + + ret = {} + + for line in summary: + stats = self.stats_parse().search(line) + if stats: + ret["effective_runtime"] = float(stats["real_time"]) + matches = summary_parse_perf.search(line) + if matches is not None: + r = matches.groupdict() + test = r["test"] + ret[test] = { + "avg_speed": float(r["speed"]), + "sum_speed": float(r["speed"]) + * self.parameters.get_engine_instances_count(), + } # type: ignore[assignment] + return ret | self.parameters.get_result_format() + + +class EngineModuleMemrate(EngineModulePinnable): + """This class implements the Memrate EngineModuleBase for StressNG""" + + def __init__(self, engine: EngineBase, engine_module_name: str): + super().__init__(engine, engine_module_name) + self.engine_module_name = engine_module_name + self.add_module_parameter("memrate") + + def run_cmd(self, p: BenchmarkParameters): + return StressNGMemrate(self, p).run_cmd() + + def run(self, p: BenchmarkParameters): + return StressNGMemrate(self, p).run() + + def fully_skipped_job(self, p) -> bool: + return StressNGMemrate(self, p).fully_skipped_job() diff --git a/hwbench/engines/stressng_qsort.py b/hwbench/engines/stressng_qsort.py new file mode 100644 index 0000000..02ac484 --- /dev/null +++ b/hwbench/engines/stressng_qsort.py @@ -0,0 +1,31 @@ +from ..bench.parameters import BenchmarkParameters +from .stressng import EngineBase, EngineModulePinnable, StressNG + + +class StressNGQsort(StressNG): + """The StressNG Qsort CPU stressor.""" + + def run_cmd(self) -> list[str]: + # Let's build the command line to run the tool + return super().run_cmd() + [ + "--qsort", + str(self.parameters.get_engine_instances_count()), + ] + + +class EngineModuleQsort(EngineModulePinnable): + """This class implements the Qsort EngineModuleBase for StressNG""" + + def __init__(self, engine: EngineBase, engine_module_name: str): + super().__init__(engine, engine_module_name) + self.engine_module_name = engine_module_name + self.add_module_parameter("qsort") + + def run_cmd(self, p: BenchmarkParameters): + return StressNGQsort(self, p).run_cmd() + + def run(self, p: BenchmarkParameters): + return StressNGQsort(self, p).run() + + def fully_skipped_job(self, p) -> bool: + return StressNGQsort(self, p).fully_skipped_job() diff --git a/hwbench/engines/stressng_stream.py b/hwbench/engines/stressng_stream.py new file mode 100644 index 0000000..cde3883 --- /dev/null +++ b/hwbench/engines/stressng_stream.py @@ -0,0 +1,146 @@ +import re +from typing import Optional, Any + +from ..bench.parameters import BenchmarkParameters +from .stressng import EngineBase, EngineModulePinnable, StressNG + + +class StressNGStream(StressNG): + """The StressNG STREAM memory stressor.""" + + def run_cmd(self) -> list[str]: + # TODO: handle get_pinned_cpu ; it does not necessarily make sense for this + # benchmark, but it could be revisited once we support pinning on multiple CPUs. + skip = self.need_skip_because_version() + if skip: + return skip + ret: list[str] = [ + self.engine_module.get_engine().get_binary(), + "--timeout", + str(self.parameters.get_runtime()), + "--metrics", + "--stream", + str(self.parameters.get_engine_instances_count()), + ] + + self.stream_l3_size: Optional[int] = None + if self.stream_l3_size is not None: + ret.extend(["--stream-l3-size", str(self.stream_l3_size)]) + return ret + + def empty_result(self): + detail_size = self.parameters.get_engine_instances_count() + return { + "detail": { + "read": [0] * detail_size, + "write": [0] * detail_size, + "Mflop/s": [0] * detail_size, + }, + "avg_read": 0.0, + "avg_write": 0.0, + "avg_Mflop/s": 0.0, + "avg_total": 0.0, + "sum_read": 0.0, + "sum_write": 0.0, + "sum_Mflop/s": 0.0, + "sum_total": 0.0, + "skipped": True, + "effective_runtime": 0.0, + } + + def parse_cmd(self, stdout: bytes, stderr: bytes) -> dict[str, Any]: + if self.skip: + return self.parameters.get_result_format() | self.empty_result() + detail_rate = re.compile(r"\] stream: memory rate: ") + summary_rate = re.compile(r"\] stream ") + detail_parse = re.compile( + r"memory rate: (?P[0-9\.]+) MB read/sec, " + r"(?P[0-9\.]+) MB write/sec, " + r"(?P[0-9\.]+) double precision Mflop/sec " + r"\(instance (?P[0-9]+)\)" + ) + summary_parse = re.compile( + r"stream\s+(?P[0-9\.]+) " + r"(?:(?:MB per sec memory)|(?:Mflop per sec \(double precision\))) " + r"(?Pread|write|compute) rate" + ) + + out = (stdout or stderr).splitlines() + + detail = [str(line) for line in out if detail_rate.search(str(line))] + summary = [str(line) for line in out if summary_rate.search(str(line))] + + detail_size = self.parameters.get_engine_instances_count() or len(detail) + + ret: dict[str, Any] = { + "detail": { + "read": [0] * detail_size, + "write": [0] * detail_size, + "Mflop/s": [0] * detail_size, + }, + "avg_read": 0.0, + "avg_write": 0.0, + "avg_Mflop/s": 0.0, + "avg_total": 0.0, + "sum_read": 0.0, + "sum_write": 0.0, + "sum_Mflop/s": 0.0, + "sum_total": 0.0, + "effective_runtime": 0.0, + } + + for line in detail: + matches = detail_parse.search(line) + if matches is not None: + r = matches.groupdict() + instance = int(r["instance"]) + ret["detail"]["read"][instance] = float(r["read"]) + ret["detail"]["write"][instance] = float(r["write"]) + ret["detail"]["Mflop/s"][instance] = float(r["flop"]) + ret["sum_read"] += float(r["read"]) + ret["sum_write"] += float(r["write"]) + ret["sum_Mflop/s"] += float(r["flop"]) + + for line in summary: + matches = summary_parse.search(line) + if matches is not None: + r = matches.groupdict() + source = r["source"] + if source == "read" or source == "write": + ret[f"avg_{source}"] = float(r["rate"]) + elif source == "compute": + ret["avg_Mflop/s"] = float(r["rate"]) + + stats = self.stats_parse().search(line) + if stats: + ret["effective_runtime"] = float(stats["real_time"]) + + # Let's build the grand total of read + write + ret["sum_total"] = ret["sum_read"] + ret["sum_write"] + ret["avg_total"] = ret["avg_read"] + ret["avg_write"] + + return ret | self.parameters.get_result_format() + + +class EngineModuleStream(EngineModulePinnable): + """This class implements the Stream EngineModuleBase for StressNG""" + + def __init__(self, engine: EngineBase, engine_module_name: str): + super().__init__(engine, engine_module_name) + self.engine_module_name = engine_module_name + self.add_module_parameter("stream") + + def run_cmd(self, p: BenchmarkParameters): + return StressNGStream(self, p).run_cmd() + + def run(self, p: BenchmarkParameters): + return StressNGStream(self, p).run() + + def validate_module_parameters(self, params: BenchmarkParameters): + msg = super().validate_module_parameters(params) + if params.get_runtime() < 5: + return f"{msg}; StressNGStream needs at least a 5s of run time" + return msg + + def fully_skipped_job(self, p) -> bool: + return StressNGStream(self, p).fully_skipped_job() diff --git a/hwbench/engines/stressng_vnni.py b/hwbench/engines/stressng_vnni.py new file mode 100644 index 0000000..d9dbf17 --- /dev/null +++ b/hwbench/engines/stressng_vnni.py @@ -0,0 +1,145 @@ +from collections.abc import Iterable +from typing import NamedTuple, Callable +from ..bench.parameters import BenchmarkParameters +from ..environment.hardware import BaseHardware +from .stressng import EngineBase, EngineModulePinnable, StressNG + + +class StressNGVNNIMethods: + class Method(NamedTuple): + check: Callable[[BaseHardware], bool] + parameters: list[str] + + def __init__(self): + M = StressNGVNNIMethods.Method + + def check_support(flag: str) -> Callable[[BaseHardware], bool]: + def check(hw: BaseHardware) -> bool: + return flag in hw.cpu_flags() + + return check + + # We don't enumerate with stress-ng --vnni-method list because we still need to + # be able to check for compatibility. + self.methods = { + # tests generic methods + "noavx_vpaddb": M(lambda _: True, ["--vnni-method", "vpaddb"]), + "noavx_vpdpbusd": M(lambda _: True, ["--vnni-method", "vpdpbusd"]), + "noavx_vpdpwssd": M(lambda _: True, ["--vnni-method", "vpdpwssd"]), + # avx512 bw, relatively common + "avx_vpaddb512": M( + check_support("avx512bw"), + ["--vnni-method", "vpaddb512", "--vnni-intrinsic"], + ), + # avx512 vnni, Xeon 3rd gen+ and Zen4 + + "avx_vpdpbusd512": M( + check_support("avx512_vnni"), + ["--vnni-method", "vpdpbusd512", "--vnni-intrinsic"], + ), + "avx_vpdpwssd512": M( + check_support("avx512_vnni"), + ["--vnni-method", "vpdpwssd512", "--vnni-intrinsic"], + ), + # avx vnni, Xeon 4th gen+) + "avx_vpaddb128": M( + check_support("avx_vnni"), + ["--vnni-method", "vpaddb128", "--vnni-intrinsic"], + ), + "avx_vpaddb256": M( + check_support("avx_vnni"), + ["--vnni-method", "vpaddb256", "--vnni-intrinsic"], + ), + "avx_vpdpbusd128": M( + check_support("avx_vnni"), + ["--vnni-method", "vpdpbusd128", "--vnni-intrinsic"], + ), + "avx_vpdpbusd256": M( + check_support("avx_vnni"), + ["--vnni-method", "vpdpbusd256", "--vnni-intrinsic"], + ), + "avx_vpdpwssd128": M( + check_support("avx_vnni"), + ["--vnni-method", "vpdpwssd128", "--vnni-intrinsic"], + ), + "avx_vpdpwssd256": M( + check_support("avx_vnni"), + ["--vnni-method", "vpdpwssd256", "--vnni-intrinsic"], + ), + } + + def enumerate(self) -> Iterable[str]: + return self.methods.keys() + + def available(self, method: str) -> bool: + return method in self.methods + + def cpu_check(self, method: str, hw: BaseHardware) -> bool: + return self.methods[method].check(hw) + + def parameters(self, method: str) -> list[str]: + for name, sm in self.methods.items(): + if name == method: + return sm.parameters + return [] + + def enumerate_cpu(self, hw: BaseHardware) -> Iterable[str]: + """list available methods for the cpu in hw""" + return filter(lambda m: self.cpu_check(m, hw), self.enumerate()) + + +class EngineModuleVNNI(EngineModulePinnable): + """This class implements the VNNI EngineModuleBase for StressNG""" + + def __init__(self, engine: EngineBase, engine_module_name: str): + super().__init__(engine, engine_module_name) + self.engine_module_name = engine_module_name + self.methods = StressNGVNNIMethods() + for method in self.methods.enumerate(): + self.add_module_parameter(method) + + def fully_skipped_job(self, p) -> bool: + return StressNGVNNI(self, p).fully_skipped_job() + + def run_cmd(self, p: BenchmarkParameters): + return StressNGVNNI(self, p).run_cmd() + + def run(self, p: BenchmarkParameters): + return StressNGVNNI(self, p).run() + + def validate_module_parameters(self, params: BenchmarkParameters): + msg = super().validate_module_parameters(params) + method = params.get_engine_module_parameter() + if not self.methods.available(method): + msg += f"Unknown method {method}\n" + if not self.methods.cpu_check(method, params.get_hw()): + print(f"WARNING: CPU does not support method {method}, perf will be 0") + return msg + + +class StressNGVNNI(StressNG): + def __init__( + self, engine_module: EngineModuleVNNI, parameters: BenchmarkParameters + ): + super().__init__(engine_module, parameters) + self.method = parameters.get_engine_module_parameter() + self.methods = engine_module.methods + if not self.methods.available(self.method): + raise LookupError(f"Unknown method {self.method}") + if not self.methods.cpu_check(self.method, parameters.get_hw()): + self.skip = True + + def run_cmd(self) -> list[str]: + skip = self.need_skip_because_version() + if skip: + return skip + return ( + super().run_cmd() + + [ + "--vnni", + str(self.parameters.get_engine_instances_count()), + ] + + self.methods.parameters(self.method) + ) + + def parse_cmd(self, stdout: bytes, stderr: bytes): + return super().parse_cmd(stdout, stderr) diff --git a/hwbench/engines/test_parse.py b/hwbench/engines/test_parse.py index 656df8f..5dfcd75 100644 --- a/hwbench/engines/test_parse.py +++ b/hwbench/engines/test_parse.py @@ -6,17 +6,10 @@ from ..bench.parameters import BenchmarkParameters from ..environment.mock import MockHardware from .stressng import Engine as StressNG -from .stressng import ( - StressNGQsort, - StressNGMemrate, - StressNGStream, - EngineModuleQsort, - EngineModuleMemrate, - EngineModuleStream, - EngineModuleVNNI, - StressNGVNNIMethods, - StressNGVNNI, -) +from .stressng_qsort import EngineModuleQsort, StressNGQsort +from .stressng_memrate import EngineModuleMemrate, StressNGMemrate +from .stressng_stream import EngineModuleStream, StressNGStream +from .stressng_vnni import EngineModuleVNNI, StressNGVNNIMethods, StressNGVNNI def mock_engine(version: str) -> StressNG: @@ -25,7 +18,7 @@ def mock_engine(version: str) -> StressNG: with patch("hwbench.utils.helpers.is_binary_available") as iba: iba.return_value = True with patch( - "hwbench.engines.stressng.EngineModuleCpu.list_module_parameters" + "hwbench.engines.stressng_cpu.EngineModuleCpu.list_module_parameters" ) as p: p.return_value = ( pathlib.Path(