From 27284672f970e79ce86d65127ad29c24948d46a0 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Wed, 21 Feb 2024 16:56:54 -0500 Subject: [PATCH 1/8] Track and report unpack performance This is a thought experiment, based off our earlier discussion of tracking an asynchronous unpacker. I thought it would be fun to see some information on tarball compression ratios, as well as tracking the min/max unpack time in accessible metadata rather than just in logs. I need to jog back to Horreum, but before diving back into the pool of Java, I took a recreational break... So I added a simple `server.unpack-perf` metadata, which is a JSON block like `{"min": , "max": , "count": }`, and then played with the report generator to get some statistics. The sample below is for a `runlocal`, with a few small-ish tarballs. The big catch in deploying this would be that none of the existing datasets will have `server.unpack-perf` until they're unpacked again, which definitely reduces the usefulness of this thought experiment. Nevertheless, I figured I might as well post it for consideration. ``` Cache report: 5 datasets currently unpacked, consuming 51.7 MB 8 datasets have been unpacked a total of 15 times The least recently used cache was referenced today, pbench-user-benchmark_example-vmstat_2018.10.24T14.38.18 The most recently used cache was referenced today, uperf_rhel8.1_4.18.0-107.el8_snap4_25gb_virt_2019.06.21T01.28.57 The smallest cache is 4.1 kB, nometadata The biggest cache is 19.6 MB, trafficgen_basic-forwarding-example_tg:trex-profile_pf:forwarding_test.json_ml:5_tt:bs__2019-08-27T14:58:38 The worst compression ratio is 22.156%, uperf_rhel8.1_4.18.0-107.el8_snap4_25gb_virt_2019.06.21T01.28.57 The best compression ratio is 96.834%, pbench-user-benchmark_example-vmstat_2018.10.24T14.38.18 The fastest cache unpack is 0.013 seconds, nometadata The slowest cache unpack is 0.078 seconds, trafficgen_basic-forwarding-example_tg:trex-profile_pf:forwarding_test.json_ml:5_tt:bs__2019-08-27T14:58:38 The fastest cache unpack streaming rate is 253.666 Mb/second, trafficgen_basic-forwarding-example_tg:trex-profile_pf:forwarding_test.json_ml:5_tt:bs__2019-08-27T14:58:38 The slowest cache unpack streaming rate is 0.133 Mb/second, nometadata ``` --- lib/pbench/cli/server/report.py | 160 +++++++++++++++--- lib/pbench/server/cache_manager.py | 62 ++++--- lib/pbench/server/database/models/datasets.py | 3 + 3 files changed, 176 insertions(+), 49 deletions(-) diff --git a/lib/pbench/cli/server/report.py b/lib/pbench/cli/server/report.py index 8702dcfc0d..45132532ba 100644 --- a/lib/pbench/cli/server/report.py +++ b/lib/pbench/cli/server/report.py @@ -23,6 +23,12 @@ # An arbitrary really big number GINORMOUS = 2**64 +# A similarly arbitrary big floating point number +GINORMOUS_FP = 1000000.0 + +# Number of bytes in a megabyte, coincidentally also a really big number +MEGABYTE_FP = 1000000.0 + class Detail: """Encapsulate generation of additional diagnostics""" @@ -217,6 +223,10 @@ def report_cache(tree: CacheManager): cached_size = 0 lacks_size = 0 bad_size = 0 + lacks_metrics = 0 + bad_metrics = 0 + unpacked_count = 0 + unpacked_times = 0 oldest_cache = time.time() * 2.0 # moderately distant future oldest_cache_name = None newest_cache = 0.0 # wayback machine @@ -225,11 +235,24 @@ def report_cache(tree: CacheManager): smallest_cache_name = None biggest_cache = 0 biggest_cache_name = None + smallest_compression = GINORMOUS + smallest_compression_name = None + biggest_compression = 0.0 + biggest_compression_name = None + 
fastest_unpack = GINORMOUS_FP + fastest_unpack_name = None + slowest_unpack = 0.0 + slowest_unpack_name = None + stream_unpack_skipped = 0 + fastest_stream_unpack = 0.0 + fastest_stream_unpack_name = None + slowest_stream_unpack = GINORMOUS_FP + slowest_stream_unpack_name = None last_ref_errors = 0 for tarball in tree.datasets.values(): watcher.update(f"({cached_count}) cache {tarball.name}") - if tarball.unpacked: + if tarball.cache: try: referenced = tarball.last_ref.stat().st_mtime except Exception as e: @@ -242,36 +265,86 @@ def report_cache(tree: CacheManager): if referenced > newest_cache: newest_cache = referenced newest_cache_name = tarball.name - cached_count += 1 - size = Metadata.getvalue(tarball.dataset, Metadata.SERVER_UNPACKED) - if not size: - detailer.error(f"{tarball.name} has no unpacked size") - lacks_size += 1 - elif not isinstance(size, int): - detailer.error( - f"{tarball.name} has non-integer unpacked size " - f"{size!r} ({type(size)})" - ) - bad_size += 1 - else: - if size < smallest_cache: - smallest_cache = size - smallest_cache_name = tarball.name - if size > biggest_cache: - biggest_cache = size - biggest_cache_name = tarball.name - cached_size += size + if tarball.unpacked: + cached_count += 1 + size = Metadata.getvalue(tarball.dataset, Metadata.SERVER_UNPACKED) + if not size: + detailer.error(f"{tarball.name} has no unpacked size") + lacks_size += 1 + elif not isinstance(size, int): + detailer.error( + f"{tarball.name} has non-integer unpacked size " + f"{size!r} ({type(size).__name__})" + ) + bad_size += 1 + else: + if size < smallest_cache: + smallest_cache = size + smallest_cache_name = tarball.name + if size > biggest_cache: + biggest_cache = size + biggest_cache_name = tarball.name + cached_size += size + + # Check compression ratios + tar_size = tarball.tarball_path.stat().st_size + ratio = float(size - tar_size) / float(size) + if ratio < smallest_compression: + smallest_compression = ratio + smallest_compression_name = tarball.name + if ratio > biggest_compression: + biggest_compression = ratio + biggest_compression_name = tarball.name + metrics = Metadata.getvalue(tarball.dataset, Metadata.SERVER_UNPACK_PERF) + if not metrics: + detailer.message(f"{tarball.name} has no unpack metrics") + lacks_metrics += 1 + elif not isinstance(metrics, dict) or {"min", "max", "count"} - set( + metrics.keys() + ): + detailer.error( + f"{tarball.name} has bad unpack metrics " + f"{metrics!r} ({type(metrics).__name__})" + ) + bad_metrics += 1 + else: + unpacked_count += 1 + unpacked_times += metrics["count"] + if metrics["min"] < fastest_unpack: + fastest_unpack = metrics["min"] + fastest_unpack_name = tarball.name + if metrics["max"] > slowest_unpack: + slowest_unpack = metrics["max"] + slowest_unpack_name = tarball.name + if size and metrics: + stream_fast = size / metrics["min"] / MEGABYTE_FP + stream_slow = size / metrics["max"] / MEGABYTE_FP + if stream_fast > fastest_stream_unpack: + fastest_stream_unpack = stream_fast + fastest_stream_unpack_name = tarball.name + if stream_slow < slowest_stream_unpack: + slowest_stream_unpack = stream_slow + slowest_stream_unpack_name = tarball.name + else: + stream_unpack_skipped += 1 oldest = datetime.datetime.fromtimestamp(oldest_cache, datetime.timezone.utc) newest = datetime.datetime.fromtimestamp(newest_cache, datetime.timezone.utc) click.echo("Cache report:") click.echo( - f" {cached_count:,d} datasets consuming " + f" {cached_count:,d} datasets currently unpacked, consuming " f"{humanize.naturalsize(cached_size)}" ) 
click.echo( - f" {lacks_size:,d} datasets have never been unpacked, " - f"{last_ref_errors:,d} are missing reference timestamps, " - f"{bad_size:,d} have bad size metadata" + f" {unpacked_count:,d} datasets have been unpacked a total of " + f"{unpacked_times:,d} times" + ) + click.echo( + " The least recently used cache was referenced " + f"{humanize.naturaldate(oldest)}, {oldest_cache_name}" + ) + click.echo( + " The most recently used cache was referenced " + f"{humanize.naturaldate(newest)}, {newest_cache_name}" ) click.echo( f" The smallest cache is {humanize.naturalsize(smallest_cache)}, " @@ -282,13 +355,44 @@ def report_cache(tree: CacheManager): f"{biggest_cache_name}" ) click.echo( - " The least recently used cache was referenced " - f"{humanize.naturaldate(oldest)}, {oldest_cache_name}" + f" The worst compression ratio is {smallest_compression:.3%}, " + f"{smallest_compression_name}" ) click.echo( - " The most recently used cache was referenced " - f"{humanize.naturaldate(newest)}, {newest_cache_name}" + f" The best compression ratio is {biggest_compression:.3%}, " + f"{biggest_compression_name}" + ) + click.echo( + f" The fastest cache unpack is {fastest_unpack:.3f} seconds, " + f"{fastest_unpack_name}" + ) + click.echo( + f" The slowest cache unpack is {slowest_unpack:.3f} seconds, " + f"{slowest_unpack_name}" ) + click.echo( + f" The fastest cache unpack streaming rate is {fastest_stream_unpack:.3f} Mb/second, " + f"{fastest_stream_unpack_name}" + ) + click.echo( + f" The slowest cache unpack streaming rate is {slowest_stream_unpack:.3f} Mb/second, " + f"{slowest_stream_unpack_name}" + ) + if lacks_size or last_ref_errors or bad_size or verifier.verify: + click.echo( + f" {lacks_size:,d} datasets have no unpacked size, " + f"{last_ref_errors:,d} are missing reference timestamps, " + f"{bad_size:,d} have bad size metadata" + ) + if lacks_metrics or bad_metrics or verifier.verify: + click.echo( + f" {lacks_metrics:,d} datasets are missing unpack metric data, " + f"{bad_metrics} have bad unpack metric data" + ) + if stream_unpack_skipped or verifier.verify: + click.echo( + f" Missing unpack performance data for {stream_unpack_skipped:,d} datasets" + ) def report_sql(): diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index 2c47401cc8..7d292692b3 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -1034,7 +1034,8 @@ def get_unpacked_size(self) -> int: Once we've unpacked it once, the size is tracked in the metadata key 'server.unpacked'. If we haven't yet unpacked it, and the dataset's - metadata.log provides a 'raw_size", use that as an estimate + metadata.log provides "run.raw_size", use that as an estimate (the + accuracy depends on relative block size compared to the server). Returns: unpacked size of the tarball, or 0 if unknown @@ -1133,27 +1134,46 @@ def get_results(self, lock: LockManager) -> Path: uend - ustart, ) - self.last_ref.touch(exist_ok=True) - - # If we have a Dataset, and haven't already done this, compute the - # unpacked size and record it in metadata so we can use it later. - ssize = time.time() - if self.dataset and not Metadata.getvalue( - self.dataset, Metadata.SERVER_UNPACKED - ): - try: - process = subprocess.run( - ["du", "-s", "-B1", str(self.unpacked)], - capture_output=True, - text=True, + # If we have a Dataset, and haven't already done this, compute the + # unpacked size and record it in metadata so we can use it later. 
+ ssize = time.time() + if self.dataset: + if not Metadata.getvalue(self.dataset, Metadata.SERVER_UNPACKED): + try: + process = subprocess.run( + ["du", "-s", "-B1", str(self.unpacked)], + capture_output=True, + text=True, + ) + if process.returncode == 0: + size = int(process.stdout.split("\t", maxsplit=1)[0]) + self.unpacked_size = size + Metadata.setvalue( + self.dataset, Metadata.SERVER_UNPACKED, size + ) + except Exception as e: + self.logger.warning("usage check failed: {}", e) + + # Update the time-to-unpack statistic. If the metadata doesn't exist, + # or somehow isn't a dict (JSONOBJECT), create a new metric: otherwise + # update the existing metric. + unpack_time = uend - ustart + metric: JSONOBJECT = Metadata.getvalue( + self.dataset, Metadata.SERVER_UNPACK_PERF ) - if process.returncode == 0: - size = int(process.stdout.split("\t", maxsplit=1)[0]) - self.unpacked_size = size - Metadata.setvalue(self.dataset, Metadata.SERVER_UNPACKED, size) - except Exception as e: - self.logger.warning("usage check failed: {}", e) - self.logger.info("{}: size update {:.3f}", self.name, time.time() - ssize) + if not isinstance(metric, dict): + metric = {"min": unpack_time, "max": unpack_time, "count": 1} + else: + # We max out at about 12 hours here, which seems safely excessive! + metric["min"] = min(metric.get("min", 1000000.0), uend - ustart) + metric["max"] = max(metric.get("max", 0), uend - ustart) + metric["count"] = metric.get("count", 0) + 1 + Metadata.setvalue(self.dataset, Metadata.SERVER_UNPACK_PERF, metric) + self.logger.info( + "{}: size update {:.3f}", self.name, time.time() - ssize + ) + + self.last_ref.touch(exist_ok=True) return self.unpacked def cache_delete(self): diff --git a/lib/pbench/server/database/models/datasets.py b/lib/pbench/server/database/models/datasets.py index d3d18785b5..c999c2be97 100644 --- a/lib/pbench/server/database/models/datasets.py +++ b/lib/pbench/server/database/models/datasets.py @@ -674,6 +674,9 @@ class Metadata(Database.Base): # UNPACKED records the size of the unpacked directory tree. SERVER_UNPACKED = "server.unpacked-size" + # UNPACK performance: {"best": , "worst": , "count": } + SERVER_UNPACK_PERF = "server.unpack-perf" + # TARBALL_PATH access path of the dataset tarball. (E.g., we could use this # to record an S3 object store key.) NOT YET USED. # From f0c491b99fee32b4783fac357405f07c97a20692 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Thu, 22 Feb 2024 20:05:53 -0500 Subject: [PATCH 2/8] Add a "reclaim" tweak The `pbench-tree-manage` utility supports a deep `ARCHIVE` tree display and also managed the periodic background cache reclamation. Curious after we saw a failed reclaim, I wanted to play with it a bit and realized that it does a full (`search`) discovery unconditionally. First, even with `--display` (which now that we have a proper report generator is rarely necessary) we probably can use the faster SQL discovery most of the time, although I added an option to select the slower `--search` discovery. More importantly, though, the cache reclaimer doesn't need a fully discovered cache manager since it takes the short-cut of examining the `/srv/pbench/cache` tree directly: so we can move the discovery into the `--display` path. 
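For context, the shortcut this change relies on is that reclaim only needs to rank cache entries by their `last_ref` timestamps under `/srv/pbench/cache`, with no SQL or ARCHIVE discovery at all. A minimal sketch of that idea (illustrative helper name; not the actual `reclaim_cache` implementation, which also handles locking and errors):

```
from pathlib import Path
from typing import List


def lru_cache_candidates(cache_root: Path = Path("/srv/pbench/cache")) -> List[Path]:
    """Return cache entry directories ordered least-recently-used first.

    Each entry directory is expected to hold a `last_ref` file whose mtime
    records the most recent access; entries without one are treated as
    never referenced and therefore reclaimed first.
    """

    def last_ref_time(entry: Path) -> float:
        try:
            return (entry / "last_ref").stat().st_mtime
        except FileNotFoundError:
            return 0.0  # never referenced: best candidate for reclaim

    return sorted((e for e in cache_root.iterdir() if e.is_dir()), key=last_ref_time)
```

Because nothing here depends on a discovered cache manager, the full discovery can be deferred to the `--display` path, as the diff below does.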
--- lib/pbench/cli/server/tree_manage.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/lib/pbench/cli/server/tree_manage.py b/lib/pbench/cli/server/tree_manage.py index 4c29ae905d..bbedaa3635 100644 --- a/lib/pbench/cli/server/tree_manage.py +++ b/lib/pbench/cli/server/tree_manage.py @@ -57,9 +57,18 @@ def print_tree(tree: CacheManager): is_flag=False, help="Reclaim cached data to maintain specified free space", ) +@click.option( + "--search", + is_flag=True, + help="Do an exhaustive search of ARCHIVE tree rather than using SQL", +) @common_options def tree_manage( - context: object, display: bool, reclaim_percent: float, reclaim_size: str + context: object, + display: bool, + reclaim_percent: float, + reclaim_size: str, + search: bool, ): """ Discover, display, and manipulate the on-disk representation of controllers @@ -76,14 +85,15 @@ def tree_manage( lifetime: Number of hours to retain unused cache before reclaim reclaim-percent: Reclaim cached data to free specified % on drive reclaim-size: Reclame cached data to free specified size on drive + search: Discover cache with a full disk search """ logger = None try: config = config_setup(context) logger = get_pbench_logger("pbench-tree-manager", config) cache_m = CacheManager(config, logger) - cache_m.full_discovery() if display: + cache_m.full_discovery(search=search) print_tree(cache_m) rv = 0 if reclaim_percent or reclaim_size: From 8b070c75fda960a913aea5dddb22ea124adbecd8 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Thu, 29 Feb 2024 16:15:39 -0500 Subject: [PATCH 3/8] Review comments Also a few minor corrections identified during ops review. --- lib/pbench/cli/server/__init__.py | 122 ++++++++ lib/pbench/cli/server/report.py | 275 +++++------------- lib/pbench/cli/server/tree_manage.py | 38 ++- .../server/api/resources/intake_base.py | 6 +- lib/pbench/server/cache_manager.py | 24 +- lib/pbench/server/database/models/datasets.py | 2 +- lib/pbench/server/indexing_tarballs.py | 24 +- 7 files changed, 266 insertions(+), 225 deletions(-) diff --git a/lib/pbench/cli/server/__init__.py b/lib/pbench/cli/server/__init__.py index 587b5edc28..fef103774c 100644 --- a/lib/pbench/cli/server/__init__.py +++ b/lib/pbench/cli/server/__init__.py @@ -1,7 +1,129 @@ +import datetime +from threading import Thread +import time + +import click + from pbench.server import PbenchServerConfig from pbench.server.database import init_db +class Detail: + """Encapsulate generation of additional diagnostics""" + + def __init__(self, detail: bool = False, errors: bool = False): + """Initialize the object. + + Args: + detail: True if detailed messages should be generated + errors: True if individual file errors should be reported + """ + self.detail = detail + self.errors = errors + + def __bool__(self) -> bool: + """Report whether detailed messages are enabled + + Returns: + True if details are enabled + """ + return self.detail + + def error(self, message: str): + """Write a message if details are enabled. + + Args: + message: Detail string + """ + if self.errors: + click.secho(f"|| {message}", fg="red") + + def message(self, message: str): + """Write a message if details are enabled. + + Args: + message: Detail string + """ + if self.detail: + click.echo(f"|| {message}") + + +class Verify: + """Encapsulate -v status messages.""" + + def __init__(self, verify: bool): + """Initialize the object. + + Args: + verify: True to write status messages. 
+ """ + self.verify = verify + + def __bool__(self) -> bool: + """Report whether verification is enabled. + + Returns: + True if verification is enabled. + """ + return self.verify + + def status(self, message: str): + """Write a message if verification is enabled. + + Args: + message: status string + """ + if self.verify: + ts = datetime.datetime.now().astimezone() + click.secho(f"({ts:%H:%M:%S}) {message}", fg="green") + + +class Watch: + """Encapsulate a periodic status update. + + The active message can be updated at will; a background thread will + periodically print the most recent status. + """ + + def __init__(self, interval: float): + """Initialize the object. + + Args: + interval: interval in seconds for status updates + """ + self.start = time.time() + self.interval = interval + self.status = "starting" + if interval: + self.thread = Thread(target=self.watcher) + self.thread.setDaemon(True) + self.thread.start() + + def update(self, status: str): + """Update status if appropriate. + + Update the message to be printed at the next interval, if progress + reporting is enabled. + + Args: + status: status string + """ + self.status = status + + def watcher(self): + """A worker thread to periodically write status messages.""" + + while True: + time.sleep(self.interval) + now = time.time() + delta = int(now - self.start) + hours, remainder = divmod(delta, 3600) + minutes, seconds = divmod(remainder, 60) + click.secho( + f"[{hours:02d}:{minutes:02d}:{seconds:02d}] {self.status}", fg="cyan" + ) + + def config_setup(context: object) -> PbenchServerConfig: config = PbenchServerConfig.create(context.config) # We're going to need the DB to track dataset state, so setup DB access. diff --git a/lib/pbench/cli/server/report.py b/lib/pbench/cli/server/report.py index 45132532ba..1654210e78 100644 --- a/lib/pbench/cli/server/report.py +++ b/lib/pbench/cli/server/report.py @@ -1,16 +1,16 @@ from collections import defaultdict import datetime import re -from threading import Thread +import shutil import time -from typing import Optional +from typing import Optional, Union import click import humanize from sqlalchemy import inspect, select, text from pbench.cli import pass_cli_context -from pbench.cli.server import config_setup +from pbench.cli.server import config_setup, Detail, Verify, Watch from pbench.cli.server.options import common_options from pbench.client.types import Dataset from pbench.common.logger import get_pbench_logger @@ -29,128 +29,35 @@ # Number of bytes in a megabyte, coincidentally also a really big number MEGABYTE_FP = 1000000.0 - -class Detail: - """Encapsulate generation of additional diagnostics""" - - def __init__(self, detail: bool = False, errors: bool = False): - """Initialize the object. - - Args: - detail: True if detailed messages should be generated - errors: True if individual file errors should be reported - """ - self.detail = detail - self.errors = errors - - def __bool__(self) -> bool: - """Report whether detailed messages are enabled - - Returns: - True if details are enabled - """ - return self.detail - - def error(self, message: str): - """Write a message if details are enabled. - - Args: - message: Detail string - """ - if self.errors: - click.secho(f"|| {message}", fg="red") - - def message(self, message: str): - """Write a message if details are enabled. 
- - Args: - message: Detail string - """ - if self.detail: - click.echo(f"|| {message}") - - -class Verify: - """Encapsulate -v status messages.""" - - def __init__(self, verify: bool): - """Initialize the object. - - Args: - verify: True to write status messages. - """ - self.verify = verify - - def __bool__(self) -> bool: - """Report whether verification is enabled. - - Returns: - True if verification is enabled. - """ - return self.verify - - def status(self, message: str): - """Write a message if verification is enabled. - - Args: - message: status string - """ - if self.verify: - ts = datetime.datetime.now().astimezone() - click.secho(f"({ts:%H:%M:%S}) {message}", fg="green") - - -class Watch: - """Encapsulate a periodic status update. - - The active message can be updated at will; a background thread will - periodically print the most recent status. - """ - - def __init__(self, interval: float): - """Initialize the object. - - Args: - interval: interval in seconds for status updates - """ - self.start = time.time() - self.interval = interval - self.status = "starting" - if interval: - self.thread = Thread(target=self.watcher) - self.thread.setDaemon(True) - self.thread.start() - - def update(self, status: str): - """Update status if appropriate. - - Update the message to be printed at the next interval, if progress - reporting is enabled. - - Args: - status: status string - """ - self.status = status - - def watcher(self): - """A worker thread to periodically write status messages.""" - - while True: - time.sleep(self.interval) - now = time.time() - delta = int(now - self.start) - hours, remainder = divmod(delta, 3600) - minutes, seconds = divmod(remainder, 60) - click.secho( - f"[{hours:02d}:{minutes:02d}:{seconds:02d}] {self.status}", fg="cyan" - ) - - detailer: Optional[Detail] = None watcher: Optional[Watch] = None verifier: Optional[Verify] = None +class Comparator: + def __init__(self, name: str, really_big: Union[int, float] = GINORMOUS): + self.name = name + self.min = really_big + self.min_name = None + self.max = 0 + self.max_name = None + + def add( + self, + value: Union[int, float], + name: str, + max: Optional[Union[int, float]] = None, + ): + minv = value + maxv = max if max else value + if minv < self.min: + self.min = minv + self.min_name = name + elif maxv > self.max: + self.max = maxv + self.max_name = name + + def report_archive(tree: CacheManager): """Report archive statistics. 
@@ -161,32 +68,29 @@ def report_archive(tree: CacheManager): watcher.update("inspecting archive") tarball_count = len(tree.datasets) tarball_size = 0 - smallest_tarball = GINORMOUS - smallest_tarball_name = None - biggest_tarball = 0 - biggest_tarball_name = None + tcomp = Comparator("tarball") + usage = shutil.disk_usage(tree.archive_root) for tarball in tree.datasets.values(): watcher.update(f"({tarball_count}) archive {tarball.name}") size = tarball.tarball_path.stat().st_size tarball_size += size - if size < smallest_tarball: - smallest_tarball = size - smallest_tarball_name = tarball.name - if size > biggest_tarball: - biggest_tarball = size - biggest_tarball_name = tarball.name + tcomp.add(size, tarball.name) click.echo("Archive report:") + click.echo( + f" ARCHIVE ({tree.archive_root}): {humanize.naturalsize(usage.total)}: {humanize.naturalsize(usage.used)} " + f"used, {humanize.naturalsize(usage.free)}" + ) click.echo( f" {tarball_count:,d} tarballs consuming {humanize.naturalsize(tarball_size)}" ) click.echo( - f" The smallest tarball is {humanize.naturalsize(smallest_tarball)}, " - f"{smallest_tarball_name}" + f" The smallest tarball is {humanize.naturalsize(tcomp.min)}, " + f"{tcomp.min_name}" ) click.echo( - f" The biggest tarball is {humanize.naturalsize(biggest_tarball)}, " - f"{biggest_tarball_name}" + f" The biggest tarball is {humanize.naturalsize(tcomp.max)}, " + f"{tcomp.max_name}" ) @@ -200,12 +104,17 @@ def report_backup(tree: CacheManager): watcher.update("inspecting backups") backup_count = 0 backup_size = 0 + usage = shutil.disk_usage(tree.backup_root) for tarball in tree.backup_root.glob("**/*.tar.xz"): watcher.update(f"({backup_count}) backup {Dataset.stem(tarball)}") backup_count += 1 backup_size += tarball.stat().st_size click.echo("Backup report:") + click.echo( + f" BACKUP ({tree.backup_root}): {humanize.naturalsize(usage.total)}: {humanize.naturalsize(usage.used)} " + f"used, {humanize.naturalsize(usage.free)}" + ) click.echo( f" {backup_count:,d} tarballs consuming {humanize.naturalsize(backup_size)}" ) @@ -227,28 +136,13 @@ def report_cache(tree: CacheManager): bad_metrics = 0 unpacked_count = 0 unpacked_times = 0 - oldest_cache = time.time() * 2.0 # moderately distant future - oldest_cache_name = None - newest_cache = 0.0 # wayback machine - newest_cache_name = None - smallest_cache = GINORMOUS - smallest_cache_name = None - biggest_cache = 0 - biggest_cache_name = None - smallest_compression = GINORMOUS - smallest_compression_name = None - biggest_compression = 0.0 - biggest_compression_name = None - fastest_unpack = GINORMOUS_FP - fastest_unpack_name = None - slowest_unpack = 0.0 - slowest_unpack_name = None stream_unpack_skipped = 0 - fastest_stream_unpack = 0.0 - fastest_stream_unpack_name = None - slowest_stream_unpack = GINORMOUS_FP - slowest_stream_unpack_name = None last_ref_errors = 0 + agecomp = Comparator("age", really_big=time.time() * 2.0) + sizecomp = Comparator("size") + compcomp = Comparator("compression") + speedcomp = Comparator("speed", really_big=GINORMOUS_FP) + streamcomp = Comparator("streaming", really_big=GINORMOUS_FP) for tarball in tree.datasets.values(): watcher.update(f"({cached_count}) cache {tarball.name}") @@ -259,12 +153,7 @@ def report_cache(tree: CacheManager): detailer.error(f"{tarball.name} last ref access: {str(e)!r}") last_ref_errors += 1 else: - if referenced < oldest_cache: - oldest_cache = referenced - oldest_cache_name = tarball.name - if referenced > newest_cache: - newest_cache = referenced - newest_cache_name = 
tarball.name + agecomp.add(referenced, tarball.name) if tarball.unpacked: cached_count += 1 size = Metadata.getvalue(tarball.dataset, Metadata.SERVER_UNPACKED) @@ -278,23 +167,13 @@ def report_cache(tree: CacheManager): ) bad_size += 1 else: - if size < smallest_cache: - smallest_cache = size - smallest_cache_name = tarball.name - if size > biggest_cache: - biggest_cache = size - biggest_cache_name = tarball.name + sizecomp.add(size, tarball.name) cached_size += size # Check compression ratios tar_size = tarball.tarball_path.stat().st_size ratio = float(size - tar_size) / float(size) - if ratio < smallest_compression: - smallest_compression = ratio - smallest_compression_name = tarball.name - if ratio > biggest_compression: - biggest_compression = ratio - biggest_compression_name = tarball.name + compcomp.add(ratio, tarball.name) metrics = Metadata.getvalue(tarball.dataset, Metadata.SERVER_UNPACK_PERF) if not metrics: detailer.message(f"{tarball.name} has no unpack metrics") @@ -310,25 +189,15 @@ def report_cache(tree: CacheManager): else: unpacked_count += 1 unpacked_times += metrics["count"] - if metrics["min"] < fastest_unpack: - fastest_unpack = metrics["min"] - fastest_unpack_name = tarball.name - if metrics["max"] > slowest_unpack: - slowest_unpack = metrics["max"] - slowest_unpack_name = tarball.name + speedcomp.add(metrics["min"], tarball.name, metrics["max"]) if size and metrics: stream_fast = size / metrics["min"] / MEGABYTE_FP stream_slow = size / metrics["max"] / MEGABYTE_FP - if stream_fast > fastest_stream_unpack: - fastest_stream_unpack = stream_fast - fastest_stream_unpack_name = tarball.name - if stream_slow < slowest_stream_unpack: - slowest_stream_unpack = stream_slow - slowest_stream_unpack_name = tarball.name + streamcomp.add(stream_slow, tarball.name, stream_fast) else: stream_unpack_skipped += 1 - oldest = datetime.datetime.fromtimestamp(oldest_cache, datetime.timezone.utc) - newest = datetime.datetime.fromtimestamp(newest_cache, datetime.timezone.utc) + oldest = datetime.datetime.fromtimestamp(agecomp.min, datetime.timezone.utc) + newest = datetime.datetime.fromtimestamp(agecomp.max, datetime.timezone.utc) click.echo("Cache report:") click.echo( f" {cached_count:,d} datasets currently unpacked, consuming " @@ -340,43 +209,41 @@ def report_cache(tree: CacheManager): ) click.echo( " The least recently used cache was referenced " - f"{humanize.naturaldate(oldest)}, {oldest_cache_name}" + f"{humanize.naturaldate(oldest)}, {agecomp.min_name}" ) click.echo( " The most recently used cache was referenced " - f"{humanize.naturaldate(newest)}, {newest_cache_name}" + f"{humanize.naturaldate(newest)}, {agecomp.max_name}" ) click.echo( - f" The smallest cache is {humanize.naturalsize(smallest_cache)}, " - f"{smallest_cache_name}" + f" The smallest cache is {humanize.naturalsize(sizecomp.min)}, " + f"{sizecomp.min_name}" ) click.echo( - f" The biggest cache is {humanize.naturalsize(biggest_cache)}, " - f"{biggest_cache_name}" + f" The biggest cache is {humanize.naturalsize(sizecomp.max)}, " + f"{sizecomp.max_name}" ) click.echo( - f" The worst compression ratio is {smallest_compression:.3%}, " - f"{smallest_compression_name}" + f" The worst compression ratio is {compcomp.min:.3%}, " f"{compcomp.min_name}" ) click.echo( - f" The best compression ratio is {biggest_compression:.3%}, " - f"{biggest_compression_name}" + f" The best compression ratio is {compcomp.max:.3%}, " f"{compcomp.max_name}" ) click.echo( - f" The fastest cache unpack is {fastest_unpack:.3f} seconds, " - 
f"{fastest_unpack_name}" + f" The fastest cache unpack is {speedcomp.min:.3f} seconds, " + f"{speedcomp.min_name}" ) click.echo( - f" The slowest cache unpack is {slowest_unpack:.3f} seconds, " - f"{slowest_unpack_name}" + f" The slowest cache unpack is {speedcomp.max:.3f} seconds, " + f"{speedcomp.max_name}" ) click.echo( - f" The fastest cache unpack streaming rate is {fastest_stream_unpack:.3f} Mb/second, " - f"{fastest_stream_unpack_name}" + f" The fastest cache unpack streaming rate is {streamcomp.max:.3f} Mb/second, " + f"{streamcomp.max_name}" ) click.echo( - f" The slowest cache unpack streaming rate is {slowest_stream_unpack:.3f} Mb/second, " - f"{slowest_stream_unpack_name}" + f" The slowest cache unpack streaming rate is {streamcomp.min:.3f} Mb/second, " + f"{streamcomp.min_name}" ) if lacks_size or last_ref_errors or bad_size or verifier.verify: click.echo( diff --git a/lib/pbench/cli/server/tree_manage.py b/lib/pbench/cli/server/tree_manage.py index bbedaa3635..e419018f86 100644 --- a/lib/pbench/cli/server/tree_manage.py +++ b/lib/pbench/cli/server/tree_manage.py @@ -1,15 +1,20 @@ import datetime +from typing import Optional import click import humanfriendly from pbench.cli import pass_cli_context -from pbench.cli.server import config_setup +from pbench.cli.server import config_setup, Detail, Verify, Watch from pbench.cli.server.options import common_options from pbench.common.logger import get_pbench_logger from pbench.server import BadConfig from pbench.server.cache_manager import CacheManager +detailer: Optional[Detail] = None +watcher: Optional[Watch] = None +verifier: Optional[Verify] = None + def print_tree(tree: CacheManager): """Print basic information about the cache @@ -41,9 +46,19 @@ def print_tree(tree: CacheManager): @click.command(name="pbench-tree-manager") @pass_cli_context +@click.option( + "--detail", + "-d", + default=False, + is_flag=True, + help="Provide extra diagnostic information", +) @click.option( "--display", default=False, is_flag=True, help="Display the full tree on completion" ) +@click.option( + "--progress", "-p", type=float, default=0.0, help="Show periodic progress messages" +) @click.option( "--reclaim-percent", show_default=True, @@ -62,13 +77,19 @@ def print_tree(tree: CacheManager): is_flag=True, help="Do an exhaustive search of ARCHIVE tree rather than using SQL", ) +@click.option( + "--verify", "-v", default=False, is_flag=True, help="Display intermediate messages" +) @common_options def tree_manage( context: object, + detail: bool, display: bool, + progress: float, reclaim_percent: float, reclaim_size: str, search: bool, + verify: bool, ): """ Discover, display, and manipulate the on-disk representation of controllers @@ -88,18 +109,33 @@ def tree_manage( search: Discover cache with a full disk search """ logger = None + + global detailer, verifier, watcher + detailer = Detail(detail, False) + verifier = Verify(verify) + watcher = Watch(progress) + try: config = config_setup(context) logger = get_pbench_logger("pbench-tree-manager", config) cache_m = CacheManager(config, logger) if display: + verifier.status("starting discovery") + watcher.update("discovering cache") cache_m.full_discovery(search=search) + verifier.status("finished discovery") + watcher.update("building report") print_tree(cache_m) rv = 0 if reclaim_percent or reclaim_size: + verifier.status("starting reclaim") + watcher.update("reclaiming") target_size = humanfriendly.parse_size(reclaim_size) if reclaim_size else 0 target_pct = reclaim_percent if reclaim_percent else 
20.0 outcome = cache_m.reclaim_cache(goal_pct=target_pct, goal_bytes=target_size) + verifier.status( + f"finished reclaiming: goal {''if outcome else 'not '}achieved" + ) rv = 0 if outcome else 1 except Exception as exc: if logger: diff --git a/lib/pbench/server/api/resources/intake_base.py b/lib/pbench/server/api/resources/intake_base.py index bd53b846cc..e782023735 100644 --- a/lib/pbench/server/api/resources/intake_base.py +++ b/lib/pbench/server/api/resources/intake_base.py @@ -396,7 +396,7 @@ def _intake( except APIAbort: raise # Propagate an APIAbort exception to the outer block except Exception as e: - raise APIInternalError("Unable to create dataset") from e + raise APIInternalError(f"Unable to create dataset: {str(e)!r}") from e recovery.add(dataset.delete) @@ -452,11 +452,11 @@ def _intake( ) raise APIAbort(HTTPStatus.INSUFFICIENT_STORAGE, "Out of space") raise APIInternalError( - f"Unexpected error encountered during file upload: {str(exc)!r} " + f"Unexpected error encountered during file upload: {str(exc)!r}" ) from exc except Exception as e: raise APIInternalError( - "Unexpected error encountered during file upload: {str(e)!r}" + f"Unexpected error encountered during file upload: {str(e)!r}" ) from e if bytes_received != stream.length: diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index 7d292692b3..e02c67c10b 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -1164,9 +1164,9 @@ def get_results(self, lock: LockManager) -> Path: if not isinstance(metric, dict): metric = {"min": unpack_time, "max": unpack_time, "count": 1} else: - # We max out at about 12 hours here, which seems safely excessive! - metric["min"] = min(metric.get("min", 1000000.0), uend - ustart) - metric["max"] = max(metric.get("max", 0), uend - ustart) + # We max out at about 12 days here, which seems safely excessive! + metric["min"] = min(metric.get("min", 1000000.0), unpack_time) + metric["max"] = max(metric.get("max", 0), unpack_time) metric["count"] = metric.get("count", 0) + 1 Metadata.setvalue(self.dataset, Metadata.SERVER_UNPACK_PERF, metric) self.logger.info( @@ -1202,7 +1202,18 @@ def delete(self): We'll log errors in deletion, but "succeed" and clear the links to both files. There's nothing more we can do. """ - self.cache_delete() + + # NOTE: this is similar to cache_delete above, but doesn't re-raise and + # operates on the root cache directory rather than unpack. + self.cachemap = None + if self.cache: + try: + shutil.rmtree(self.cache) + except Exception as e: + self.logger.error("cache delete for {} failed with {}", self.name, e) + + # Remove the isolator directory with the tarball and MD5 files; or if + # this is a pre-isolator tarball, unlink the MD5 and tarball. if self.isolator and self.isolator.exists(): try: shutil.rmtree(self.isolator) @@ -1893,6 +1904,7 @@ def reached_goal() -> GoalCheck: # our goals. 
candidates = sorted(candidates, key=lambda c: c.last_ref) has_cache = len(candidates) + goal_check = reached_goal() for candidate in candidates: name = candidate.cache.name cache_d = candidate.cache.parent @@ -1923,7 +1935,8 @@ def reached_goal() -> GoalCheck: error = e else: reclaimed += 1 - if reached_goal().reached: + goal_check = reached_goal() + if goal_check.reached: break except OSError as e: if e.errno in (errno.EAGAIN, errno.EACCES): @@ -1939,7 +1952,6 @@ def reached_goal() -> GoalCheck: error = e if error: self.logger.error("RECLAIM: {} failed with '{}'", name, error) - goal_check = reached_goal() free_pct = goal_check.usage.free * 100.0 / goal_check.usage.total self.logger.info( "RECLAIM {} (goal {}%, {}): {} datasets, " diff --git a/lib/pbench/server/database/models/datasets.py b/lib/pbench/server/database/models/datasets.py index c999c2be97..66ce3d5633 100644 --- a/lib/pbench/server/database/models/datasets.py +++ b/lib/pbench/server/database/models/datasets.py @@ -674,7 +674,7 @@ class Metadata(Database.Base): # UNPACKED records the size of the unpacked directory tree. SERVER_UNPACKED = "server.unpacked-size" - # UNPACK performance: {"best": , "worst": , "count": } + # UNPACK performance: {"min": , "max": , "count": } SERVER_UNPACK_PERF = "server.unpack-perf" # TARBALL_PATH access path of the dataset tarball. (E.g., we could use this diff --git a/lib/pbench/server/indexing_tarballs.py b/lib/pbench/server/indexing_tarballs.py index 0123daf3ab..85d3172b79 100644 --- a/lib/pbench/server/indexing_tarballs.py +++ b/lib/pbench/server/indexing_tarballs.py @@ -4,10 +4,13 @@ from collections import deque import os from pathlib import Path +import shutil import signal import tempfile from typing import Callable, List, NamedTuple, Optional, Tuple +import humanize + from pbench.common.exceptions import ( BadDate, BadMDLogFormat, @@ -314,13 +317,21 @@ def process_tb(self, tarballs: List[TarballData]) -> int: idxctx.logger.info("Load templates {!r}", res) return res.value - idxctx.logger.debug("Preparing to index {:d} tar balls", len(tb_deque)) - with tempfile.TemporaryDirectory( prefix=f"{self.name}.", dir=idxctx.config.TMP ) as tmpdir: - idxctx.logger.debug("start processing list of tar balls") + usage = shutil.disk_usage(tmpdir) + idxctx.logger.info( + "start processing {:d} tar balls using TMP {} ({} free)", + len(tb_deque), + tmpdir, + humanize.naturalsize(usage.free), + ) tb_list = Path(tmpdir, f"{self.name}.{idxctx.TS}.list") + indexed = Path(tmpdir, f"{self.name}.{idxctx.TS}.indexed") + erred = Path(tmpdir, f"{self.name}.{idxctx.TS}.erred") + skipped = Path(tmpdir, f"{self.name}.{idxctx.TS}.skipped") + ie_filepath = Path(tmpdir, f"{self.name}.{idxctx.TS}.indexing-errors.json") try: with tb_list.open(mode="w") as lfp: # Write out all the tar balls we are processing so external @@ -328,13 +339,6 @@ def process_tb(self, tarballs: List[TarballData]) -> int: for size, dataset, tb in tarballs: print(f"{size:20d} {dataset.name} {tb}", file=lfp) - indexed = Path(tmpdir, f"{self.name}.{idxctx.TS}.indexed") - erred = Path(tmpdir, f"{self.name}.{idxctx.TS}.erred") - skipped = Path(tmpdir, f"{self.name}.{idxctx.TS}.skipped") - ie_filepath = Path( - tmpdir, f"{self.name}.{idxctx.TS}.indexing-errors.json" - ) - # We use a list object here so that when we close over this # variable in the handler, the list object will be closed over, # but not its contents. 
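An aside on the comment at the end of that indexing hunk ("the list object will be closed over, but not its contents"): it describes the standard Python idiom of closing over a mutable container so a handler sees later mutations rather than a snapshot. A tiny illustration with hypothetical names (not pbench code):

```
def make_handler():
    pending = ["a.tar.xz", "b.tar.xz"]  # the list *object* is captured by the closure

    def handler(signum=None, frame=None):
        # The handler reads the current contents of the shared list,
        # not a copy taken when the closure was created.
        print(f"interrupted with {len(pending)} tarballs still pending")

    return pending, handler


pending, handler = make_handler()
pending.pop()  # mutate the shared contents
handler()      # reports 1 pending, reflecting the mutation
```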
From 9ad4a16a6e2d90978eba026ecf727842f9ca6b3c Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Fri, 1 Mar 2024 10:02:14 -0500 Subject: [PATCH 4/8] Review comments, cleanup --- lib/pbench/cli/server/options.py | 6 ++++-- lib/pbench/cli/server/report.py | 8 ++++---- lib/pbench/cli/server/tree_manage.py | 18 ++++++++++-------- lib/pbench/server/cache_manager.py | 1 - 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/lib/pbench/cli/server/options.py b/lib/pbench/cli/server/options.py index 2d40cda904..258363f035 100644 --- a/lib/pbench/cli/server/options.py +++ b/lib/pbench/cli/server/options.py @@ -22,13 +22,15 @@ def _pbench_server_config(f: Callable) -> Callable: def callback(ctx, param, value): clictx = ctx.ensure_object(CliContext) - clictx.config = value + clictx.config = ( + value if value else "/opt/pbench-server/lib/config/pbench-server.cfg" + ) return value return click.option( "-C", "--config", - required=True, + required=False, envvar="_PBENCH_SERVER_CONFIG", type=click.Path(exists=True, readable=True), callback=callback, diff --git a/lib/pbench/cli/server/report.py b/lib/pbench/cli/server/report.py index 1654210e78..b8a3fbcf1e 100644 --- a/lib/pbench/cli/server/report.py +++ b/lib/pbench/cli/server/report.py @@ -39,7 +39,7 @@ def __init__(self, name: str, really_big: Union[int, float] = GINORMOUS): self.name = name self.min = really_big self.min_name = None - self.max = 0 + self.max = -really_big self.max_name = None def add( @@ -49,11 +49,11 @@ def add( max: Optional[Union[int, float]] = None, ): minv = value - maxv = max if max else value + maxv = max if max is not None else value if minv < self.min: self.min = minv self.min_name = name - elif maxv > self.max: + if maxv > self.max: self.max = maxv self.max_name = name @@ -258,7 +258,7 @@ def report_cache(tree: CacheManager): ) if stream_unpack_skipped or verifier.verify: click.echo( - f" Missing unpack performance data for {stream_unpack_skipped:,d} datasets" + f" {stream_unpack_skipped:,d} datasets are missing unpack performance data" ) diff --git a/lib/pbench/cli/server/tree_manage.py b/lib/pbench/cli/server/tree_manage.py index e419018f86..b2cb57bbd0 100644 --- a/lib/pbench/cli/server/tree_manage.py +++ b/lib/pbench/cli/server/tree_manage.py @@ -3,6 +3,7 @@ import click import humanfriendly +import humanize from pbench.cli import pass_cli_context from pbench.cli.server import config_setup, Detail, Verify, Watch @@ -61,16 +62,15 @@ def print_tree(tree: CacheManager): ) @click.option( "--reclaim-percent", - show_default=True, is_flag=False, flag_value=20.0, type=click.FLOAT, - help="Reclaim cached data to maintain a target % free space", + help="Reclaim cached data to maintain a target % free space [default 20]", ) @click.option( "--reclaim-size", is_flag=False, - help="Reclaim cached data to maintain specified free space", + help="Reclaim cached data to maintain specified free space (e.g., '1Gb')", ) @click.option( "--search", @@ -111,11 +111,12 @@ def tree_manage( logger = None global detailer, verifier, watcher - detailer = Detail(detail, False) + detailer = Detail(detail, errors=False) verifier = Verify(verify) watcher = Watch(progress) try: + rv = 0 config = config_setup(context) logger = get_pbench_logger("pbench-tree-manager", config) cache_m = CacheManager(config, logger) @@ -126,12 +127,13 @@ def tree_manage( verifier.status("finished discovery") watcher.update("building report") print_tree(cache_m) - rv = 0 if reclaim_percent or reclaim_size: - verifier.status("starting reclaim") - 
watcher.update("reclaiming") + target_pct = reclaim_percent if reclaim_percent else 0.0 target_size = humanfriendly.parse_size(reclaim_size) if reclaim_size else 0 - target_pct = reclaim_percent if reclaim_percent else 20.0 + verifier.status( + f"starting to reclaim {target_pct}% or {humanize.naturalsize(target_size)}" + ) + watcher.update("reclaiming") outcome = cache_m.reclaim_cache(goal_pct=target_pct, goal_bytes=target_size) verifier.status( f"finished reclaiming: goal {''if outcome else 'not '}achieved" diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index e02c67c10b..0da346a3f1 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -1573,7 +1573,6 @@ def _discover_datasets(self): and_(Dataset.id == Metadata.dataset_ref, Metadata.key == "server"), ) .yield_per(1000) - .all() ) for name, resource_id, path in rows: From e99a072fa7f76bf7ebbae53d47b79a2e5629cf36 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Sat, 2 Mar 2024 09:11:08 -0500 Subject: [PATCH 5/8] Add audit report I thought this morning about adding a CLI audit tool to query the audit log. While I wasn't quite motivated enough to write it, it occurred to me to at least cobble up a simple set of audit log statistics while eating breakfast. So here 'tis. --- lib/pbench/cli/server/report.py | 70 ++++++++++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/lib/pbench/cli/server/report.py b/lib/pbench/cli/server/report.py index b8a3fbcf1e..5b2caccd70 100644 --- a/lib/pbench/cli/server/report.py +++ b/lib/pbench/cli/server/report.py @@ -17,6 +17,7 @@ from pbench.server import BadConfig from pbench.server.cache_manager import CacheManager from pbench.server.database.database import Database +from pbench.server.database.models.audit import Audit, AuditStatus from pbench.server.database.models.datasets import Metadata from pbench.server.database.models.index_map import IndexMap @@ -262,6 +263,63 @@ def report_cache(tree: CacheManager): ) +def report_audit(): + """Report audit log statistics.""" + + counter = 0 + events = 0 + unmatched_roots = set() + unmatched_terminal = set() + status = defaultdict(int) + operations = defaultdict(int) + objects = defaultdict(int) + users = defaultdict(int) + watcher.update("inspecting audit log") + audit_logs = ( + Database.db_session.query(Audit) + .execution_options(stream_results=True) + .order_by(Audit.timestamp) + .yield_per(1000) + ) + for audit in audit_logs: + counter += 1 + watcher.update(f"[{counter}] inspecting {audit.id} -> {audit.timestamp}") + if audit.status is AuditStatus.BEGIN: + events += 1 + unmatched_roots.add(audit.id) + operations[audit.name] += 1 + n = audit.user_name if audit.user_name else "" + users[n] += 1 + t = audit.object_type if audit.object_type else "" + objects[t] += 1 + else: + status[audit.status] += 1 + try: + unmatched_roots.remove(audit.root_id) + except KeyError: + detailer.error(f"audit {audit} has no matching `BEGIN`") + unmatched_terminal.add(audit.id) + + click.echo("Audit logs:") + click.echo(f" {counter:,d} audit log rows for {events:,d} events") + click.echo( + f" {len(unmatched_roots)} unterminated root rows, " + f"{len(unmatched_terminal)} unmatched terminators" + ) + click.echo(" Status summary:") + for name, count in status.items(): + click.echo(f" {name:>20s} {count:>10,d}") + click.echo(" Operation summary:") + for name, count in operations.items(): + click.echo(f" {name:>20s} {count:>10,d}") + click.echo(" Object type summary:") + for 
name, count in objects.items(): + click.echo(f" {name:>20s} {count:>10,d}") + click.echo(" Users summary:") + for name, count in users.items(): + click.echo(f" {name:>20s} {count:>10,d}") + + def report_sql(): """Report the SQL table storage statistics""" @@ -343,8 +401,9 @@ def report_states(): statement=text( "SELECT d.name, o.name, o.state, o.message FROM datasets AS d LEFT OUTER JOIN " "dataset_operations AS o ON o.dataset_ref = d.id" - ) - ) + ), + execution_options={"stream_results": True}, + ).yield_per(1000) for dataset, operation, state, message in rows: watcher.update(f"inspecting {dataset}:{operation}") if operation is None: @@ -387,6 +446,9 @@ def report_states(): @click.option( "--archive", "-A", default=False, is_flag=True, help="Display archive statistics" ) +@click.option( + "--audit", "-L", default=False, is_flag=True, help="Display audit log statistics" +) @click.option( "--backup", "-b", default=False, is_flag=True, help="Display backup statistics" ) @@ -422,6 +484,7 @@ def report( context: object, all: bool, archive: bool, + audit: bool, backup: bool, cache: bool, detail: bool, @@ -440,6 +503,7 @@ def report( context: click context all: report all statistics archive: report archive statistics + audit: report audit log statistics backup: report backup statistics cache: report cache statistics detail: provide additional per-file diagnostics @@ -471,6 +535,8 @@ def report( report_backup(cache_m) if all or cache: report_cache(cache_m) + if all or audit: + report_audit() if all or sql: report_sql() if all or states: From 45b8cff9184254df6937c850834ea476779f7ca8 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Sat, 2 Mar 2024 09:52:26 -0500 Subject: [PATCH 6/8] A few tweaks --- lib/pbench/cli/server/report.py | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/lib/pbench/cli/server/report.py b/lib/pbench/cli/server/report.py index 5b2caccd70..2c5314005a 100644 --- a/lib/pbench/cli/server/report.py +++ b/lib/pbench/cli/server/report.py @@ -37,6 +37,12 @@ class Comparator: def __init__(self, name: str, really_big: Union[int, float] = GINORMOUS): + """Initialize a comparator + + Args: + name: A name for the comparator + really_big: An optional maximum value + """ self.name = name self.min = really_big self.min_name = None @@ -45,10 +51,17 @@ def __init__(self, name: str, really_big: Union[int, float] = GINORMOUS): def add( self, - value: Union[int, float], name: str, + value: Union[int, float], max: Optional[Union[int, float]] = None, ): + """Add a data point to the comparator + + Args: + name: The name of the associated dataset + value: The value of the datapoint + max: [Optional] A second "maximum" value if adding a min/max pair + """ minv = value maxv = max if max is not None else value if minv < self.min: @@ -76,7 +89,7 @@ def report_archive(tree: CacheManager): watcher.update(f"({tarball_count}) archive {tarball.name}") size = tarball.tarball_path.stat().st_size tarball_size += size - tcomp.add(size, tarball.name) + tcomp.add(tarball.name, size) click.echo("Archive report:") click.echo( f" ARCHIVE ({tree.archive_root}): {humanize.naturalsize(usage.total)}: {humanize.naturalsize(usage.used)} " @@ -154,7 +167,7 @@ def report_cache(tree: CacheManager): detailer.error(f"{tarball.name} last ref access: {str(e)!r}") last_ref_errors += 1 else: - agecomp.add(referenced, tarball.name) + agecomp.add(tarball.name, referenced) if tarball.unpacked: cached_count += 1 size = Metadata.getvalue(tarball.dataset, Metadata.SERVER_UNPACKED) @@ 
-168,13 +181,13 @@ def report_cache(tree: CacheManager): ) bad_size += 1 else: - sizecomp.add(size, tarball.name) + sizecomp.add(tarball.name, size) cached_size += size # Check compression ratios tar_size = tarball.tarball_path.stat().st_size ratio = float(size - tar_size) / float(size) - compcomp.add(ratio, tarball.name) + compcomp.add(tarball.name, ratio) metrics = Metadata.getvalue(tarball.dataset, Metadata.SERVER_UNPACK_PERF) if not metrics: detailer.message(f"{tarball.name} has no unpack metrics") @@ -190,11 +203,11 @@ def report_cache(tree: CacheManager): else: unpacked_count += 1 unpacked_times += metrics["count"] - speedcomp.add(metrics["min"], tarball.name, metrics["max"]) + speedcomp.add(tarball.name, metrics["min"], metrics["max"]) if size and metrics: stream_fast = size / metrics["min"] / MEGABYTE_FP stream_slow = size / metrics["max"] / MEGABYTE_FP - streamcomp.add(stream_slow, tarball.name, stream_fast) + streamcomp.add(tarball.name, stream_slow, stream_fast) else: stream_unpack_skipped += 1 oldest = datetime.datetime.fromtimestamp(agecomp.min, datetime.timezone.utc) @@ -284,16 +297,16 @@ def report_audit(): for audit in audit_logs: counter += 1 watcher.update(f"[{counter}] inspecting {audit.id} -> {audit.timestamp}") + status[audit.status.name] += 1 if audit.status is AuditStatus.BEGIN: events += 1 unmatched_roots.add(audit.id) operations[audit.name] += 1 n = audit.user_name if audit.user_name else "" users[n] += 1 - t = audit.object_type if audit.object_type else "" + t = audit.object_type.name if audit.object_type else "" objects[t] += 1 else: - status[audit.status] += 1 try: unmatched_roots.remove(audit.root_id) except KeyError: From cdbc009ba115e632d073f4d46f689422f7dfe42c Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Mon, 4 Mar 2024 15:29:43 -0500 Subject: [PATCH 7/8] Optimize cache discovery to pure SQL Some cleanup and review comments. 
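Even with pure-SQL discovery, the report still needs one filesystem probe per dataset to decide whether a cache entry is currently unpacked and when it was last referenced: the unpacked directory is named by the tarball stem (`Dataset.stem`) under the dataset's resource-id directory in the cache tree, alongside a `last_ref` file. A condensed sketch of that probe, using the same layout as the diff below (the helper name is illustrative):

```
from pathlib import Path
from typing import Optional, Tuple


def cache_entry_status(
    cache_root: Path, resource_id: str, tarball_stem: str
) -> Tuple[bool, Optional[float]]:
    """Report whether a dataset's cache entry is unpacked and when it was last used.

    Returns (is_unpacked, last_ref_mtime); the mtime is None when the
    `last_ref` file is missing or unreadable.
    """
    entry = cache_root / resource_id
    unpacked = (entry / tarball_stem).exists()
    try:
        last_ref: Optional[float] = (entry / "last_ref").stat().st_mtime
    except OSError:
        last_ref = None
    return unpacked, last_ref
```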
--- lib/pbench/cli/server/report.py | 132 ++++++++++++++++++++------------ 1 file changed, 82 insertions(+), 50 deletions(-) diff --git a/lib/pbench/cli/server/report.py b/lib/pbench/cli/server/report.py index 2c5314005a..18da553faf 100644 --- a/lib/pbench/cli/server/report.py +++ b/lib/pbench/cli/server/report.py @@ -1,5 +1,7 @@ from collections import defaultdict import datetime +from operator import and_ +from pathlib import Path import re import shutil import time @@ -12,13 +14,12 @@ from pbench.cli import pass_cli_context from pbench.cli.server import config_setup, Detail, Verify, Watch from pbench.cli.server.options import common_options -from pbench.client.types import Dataset from pbench.common.logger import get_pbench_logger from pbench.server import BadConfig from pbench.server.cache_manager import CacheManager from pbench.server.database.database import Database from pbench.server.database.models.audit import Audit, AuditStatus -from pbench.server.database.models.datasets import Metadata +from pbench.server.database.models.datasets import Dataset, Metadata from pbench.server.database.models.index_map import IndexMap # An arbitrary really big number @@ -30,6 +31,9 @@ # Number of bytes in a megabyte, coincidentally also a really big number MEGABYTE_FP = 1000000.0 +# SQL "chunk size" +SQL_CHUNK = 2000 + detailer: Optional[Detail] = None watcher: Optional[Watch] = None verifier: Optional[Verify] = None @@ -93,7 +97,7 @@ def report_archive(tree: CacheManager): click.echo("Archive report:") click.echo( f" ARCHIVE ({tree.archive_root}): {humanize.naturalsize(usage.total)}: {humanize.naturalsize(usage.used)} " - f"used, {humanize.naturalsize(usage.free)}" + f"used, {humanize.naturalsize(usage.free)} free" ) click.echo( f" {tarball_count:,d} tarballs consuming {humanize.naturalsize(tarball_size)}" @@ -127,7 +131,7 @@ def report_backup(tree: CacheManager): click.echo("Backup report:") click.echo( f" BACKUP ({tree.backup_root}): {humanize.naturalsize(usage.total)}: {humanize.naturalsize(usage.used)} " - f"used, {humanize.naturalsize(usage.free)}" + f"used, {humanize.naturalsize(usage.free)} free" ) click.echo( f" {backup_count:,d} tarballs consuming {humanize.naturalsize(backup_size)}" @@ -145,7 +149,9 @@ def report_cache(tree: CacheManager): cached_count = 0 cached_size = 0 lacks_size = 0 + lacks_tarpath = 0 bad_size = 0 + found_metrics = False lacks_metrics = 0 bad_metrics = 0 unpacked_count = 0 @@ -158,56 +164,81 @@ def report_cache(tree: CacheManager): speedcomp = Comparator("speed", really_big=GINORMOUS_FP) streamcomp = Comparator("streaming", really_big=GINORMOUS_FP) - for tarball in tree.datasets.values(): - watcher.update(f"({cached_count}) cache {tarball.name}") - if tarball.cache: + rows = ( + Database.db_session.query( + Dataset.name, + Dataset.resource_id, + Metadata.value["tarball-path"].as_string(), + Metadata.value["unpacked-size"], + Metadata.value["unpack-perf"], + ) + .execution_options(stream_results=True) + .outerjoin( + Metadata, + and_(Dataset.id == Metadata.dataset_ref, Metadata.key == "server"), + ) + .yield_per(SQL_CHUNK) + ) + + for dsname, rid, tar, size, metrics in rows: + watcher.update(f"({cached_count}) cache {dsname}") + if tar: + tarball = Path(tar) + tarname = Dataset.stem(tarball) + else: + detailer.error(f"{dsname} doesn't have a 'server.tarball-path' metadata") + lacks_tarpath += 1 + tarball = None + tarname = dsname + cache = Path(tree.cache_root / rid) + if (cache / tarname).exists(): + cached_count += 1 try: - referenced = 
tarball.last_ref.stat().st_mtime + referenced = (cache / "last_ref").stat().st_mtime except Exception as e: - detailer.error(f"{tarball.name} last ref access: {str(e)!r}") + detailer.error(f"{dsname} last ref access: {str(e)!r}") last_ref_errors += 1 else: - agecomp.add(tarball.name, referenced) - if tarball.unpacked: - cached_count += 1 - size = Metadata.getvalue(tarball.dataset, Metadata.SERVER_UNPACKED) + agecomp.add(dsname, referenced) if not size: - detailer.error(f"{tarball.name} has no unpacked size") + detailer.error(f"{dsname} has no unpacked size") lacks_size += 1 elif not isinstance(size, int): detailer.error( - f"{tarball.name} has non-integer unpacked size " + f"{dsname} has non-integer unpacked size " f"{size!r} ({type(size).__name__})" ) bad_size += 1 else: - sizecomp.add(tarball.name, size) + sizecomp.add(dsname, size) cached_size += size # Check compression ratios - tar_size = tarball.tarball_path.stat().st_size - ratio = float(size - tar_size) / float(size) - compcomp.add(tarball.name, ratio) - metrics = Metadata.getvalue(tarball.dataset, Metadata.SERVER_UNPACK_PERF) + if tarball: + tar_size = tarball.stat().st_size + ratio = float(size - tar_size) / float(size) + compcomp.add(dsname, ratio) if not metrics: - detailer.message(f"{tarball.name} has no unpack metrics") + # NOTE: message not error since nobody has this yet (noise) + detailer.message(f"{dsname} has no unpack metrics") lacks_metrics += 1 elif not isinstance(metrics, dict) or {"min", "max", "count"} - set( metrics.keys() ): detailer.error( - f"{tarball.name} has bad unpack metrics " + f"{dsname} has bad unpack metrics " f"{metrics!r} ({type(metrics).__name__})" ) bad_metrics += 1 else: + found_metrics = True unpacked_count += 1 unpacked_times += metrics["count"] - speedcomp.add(tarball.name, metrics["min"], metrics["max"]) + speedcomp.add(dsname, metrics["min"], metrics["max"]) if size and metrics: stream_fast = size / metrics["min"] / MEGABYTE_FP stream_slow = size / metrics["max"] / MEGABYTE_FP - streamcomp.add(tarball.name, stream_slow, stream_fast) + streamcomp.add(dsname, stream_slow, stream_fast) else: stream_unpack_skipped += 1 oldest = datetime.datetime.fromtimestamp(agecomp.min, datetime.timezone.utc) @@ -243,22 +274,23 @@ def report_cache(tree: CacheManager): click.echo( f" The best compression ratio is {compcomp.max:.3%}, " f"{compcomp.max_name}" ) - click.echo( - f" The fastest cache unpack is {speedcomp.min:.3f} seconds, " - f"{speedcomp.min_name}" - ) - click.echo( - f" The slowest cache unpack is {speedcomp.max:.3f} seconds, " - f"{speedcomp.max_name}" - ) - click.echo( - f" The fastest cache unpack streaming rate is {streamcomp.max:.3f} Mb/second, " - f"{streamcomp.max_name}" - ) - click.echo( - f" The slowest cache unpack streaming rate is {streamcomp.min:.3f} Mb/second, " - f"{streamcomp.min_name}" - ) + if found_metrics: + click.echo( + f" The fastest cache unpack is {speedcomp.min:.3f} seconds, " + f"{speedcomp.min_name}" + ) + click.echo( + f" The slowest cache unpack is {speedcomp.max:.3f} seconds, " + f"{speedcomp.max_name}" + ) + click.echo( + f" The fastest cache unpack streaming rate is {streamcomp.max:.3f} Mb/second, " + f"{streamcomp.max_name}" + ) + click.echo( + f" The slowest cache unpack streaming rate is {streamcomp.min:.3f} Mb/second, " + f"{streamcomp.min_name}" + ) if lacks_size or last_ref_errors or bad_size or verifier.verify: click.echo( f" {lacks_size:,d} datasets have no unpacked size, " @@ -270,9 +302,9 @@ def report_cache(tree: CacheManager): f" {lacks_metrics:,d} 
datasets are missing unpack metric data, " f"{bad_metrics} have bad unpack metric data" ) - if stream_unpack_skipped or verifier.verify: + if lacks_tarpath: click.echo( - f" {stream_unpack_skipped:,d} datasets are missing unpack performance data" + f" {lacks_tarpath} datasets are missing server.tarball-path metadata" ) @@ -292,7 +324,7 @@ def report_audit(): Database.db_session.query(Audit) .execution_options(stream_results=True) .order_by(Audit.timestamp) - .yield_per(1000) + .yield_per(SQL_CHUNK) ) for audit in audit_logs: counter += 1 @@ -377,7 +409,7 @@ def report_sql(): query = select(IndexMap.root, IndexMap.index) for root, index in Database.db_session.execute( query, execution_options={"stream_results": True} - ).yield_per(500): + ).yield_per(SQL_CHUNK): record_count += 1 watcher.update(f"({record_count}: {root} -> {index}") roots.add(root) @@ -416,7 +448,7 @@ def report_states(): "dataset_operations AS o ON o.dataset_ref = d.id" ), execution_options={"stream_results": True}, - ).yield_per(1000) + ).yield_per(SQL_CHUNK) for dataset, operation, state, message in rows: watcher.update(f"inspecting {dataset}:{operation}") if operation is None: @@ -535,10 +567,10 @@ def report( try: config = config_setup(context) logger = get_pbench_logger("pbench-report-generator", config) - if any((all, archive, backup, cache)): - cache_m = CacheManager(config, logger) + cache_m = CacheManager(config, logger) + if any((all, archive, backup)): verifier.status("starting discovery") - watcher.update("discovering cache") + watcher.update("discovering archive tree") cache_m.full_discovery(search=False) watcher.update("processing reports") verifier.status("finished discovery") @@ -546,8 +578,8 @@ def report( report_archive(cache_m) if all or backup: report_backup(cache_m) - if all or cache: - report_cache(cache_m) + if all or cache: + report_cache(cache_m) if all or audit: report_audit() if all or sql: From 8b32bc6be7f3c3c52e4871e0b287008e84f9171c Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Tue, 5 Mar 2024 19:08:09 -0500 Subject: [PATCH 8/8] Minor nesting --- lib/pbench/cli/server/report.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/pbench/cli/server/report.py b/lib/pbench/cli/server/report.py index 18da553faf..028f32d4e8 100644 --- a/lib/pbench/cli/server/report.py +++ b/lib/pbench/cli/server/report.py @@ -235,12 +235,12 @@ def report_cache(tree: CacheManager): unpacked_count += 1 unpacked_times += metrics["count"] speedcomp.add(dsname, metrics["min"], metrics["max"]) - if size and metrics: - stream_fast = size / metrics["min"] / MEGABYTE_FP - stream_slow = size / metrics["max"] / MEGABYTE_FP - streamcomp.add(dsname, stream_slow, stream_fast) - else: - stream_unpack_skipped += 1 + if size: + stream_fast = size / metrics["min"] / MEGABYTE_FP + stream_slow = size / metrics["max"] / MEGABYTE_FP + streamcomp.add(dsname, stream_slow, stream_fast) + else: + stream_unpack_skipped += 1 oldest = datetime.datetime.fromtimestamp(agecomp.min, datetime.timezone.utc) newest = datetime.datetime.fromtimestamp(agecomp.max, datetime.timezone.utc) click.echo("Cache report:")
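The nesting change above means a streaming rate is computed only when both an unpacked size and an unpack-perf metric are present; otherwise the dataset is counted as skipped. Condensed into a standalone helper as a sketch (the function name is mine; the arithmetic, the `{"min": ..., "max": ..., "count": ...}` metric shape, and the `MEGABYTE_FP` constant come from the patches):

```
from typing import Optional, Tuple

MEGABYTE_FP = 1000000.0  # bytes per "Mb" as the report defines it


def unpack_stream_rates(
    unpacked_size: int, metric: Optional[dict]
) -> Optional[Tuple[float, float]]:
    """Return (fastest, slowest) unpack streaming rates in Mb/second.

    The fastest rate pairs the unpacked size with the shortest recorded
    unpack time ("min"); the slowest pairs it with the longest ("max").
    Returns None when either input is missing, mirroring the report's
    skip counter for datasets without usable data.
    """
    if not unpacked_size or not metric:
        return None
    fastest = unpacked_size / metric["min"] / MEGABYTE_FP
    slowest = unpacked_size / metric["max"] / MEGABYTE_FP
    return fastest, slowest
```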