From cdbc009ba115e632d073f4d46f689422f7dfe42c Mon Sep 17 00:00:00 2001
From: David Butenhof
Date: Mon, 4 Mar 2024 15:29:43 -0500
Subject: [PATCH] Optimize cache discovery to pure SQL

Rework the cache report to collect its data with a single streamed SQL
query, joining each Dataset to its "server" Metadata row, instead of
walking the cache manager's discovered datasets; full discovery is now
needed only for the archive and backup reports. Some cleanup and review
comments.
---
 lib/pbench/cli/server/report.py | 132 ++++++++++++++++++++------------
 1 file changed, 82 insertions(+), 50 deletions(-)

diff --git a/lib/pbench/cli/server/report.py b/lib/pbench/cli/server/report.py
index 2c5314005a..18da553faf 100644
--- a/lib/pbench/cli/server/report.py
+++ b/lib/pbench/cli/server/report.py
@@ -1,5 +1,7 @@
 from collections import defaultdict
 import datetime
+from operator import and_
+from pathlib import Path
 import re
 import shutil
 import time
@@ -12,13 +14,12 @@
 from pbench.cli import pass_cli_context
 from pbench.cli.server import config_setup, Detail, Verify, Watch
 from pbench.cli.server.options import common_options
-from pbench.client.types import Dataset
 from pbench.common.logger import get_pbench_logger
 from pbench.server import BadConfig
 from pbench.server.cache_manager import CacheManager
 from pbench.server.database.database import Database
 from pbench.server.database.models.audit import Audit, AuditStatus
-from pbench.server.database.models.datasets import Metadata
+from pbench.server.database.models.datasets import Dataset, Metadata
 from pbench.server.database.models.index_map import IndexMap
 
 # An arbitrary really big number
@@ -30,6 +31,9 @@
 # Number of bytes in a megabyte, coincidentally also a really big number
 MEGABYTE_FP = 1000000.0
 
+# SQL "chunk size"
+SQL_CHUNK = 2000
+
 detailer: Optional[Detail] = None
 watcher: Optional[Watch] = None
 verifier: Optional[Verify] = None
@@ -93,7 +97,7 @@ def report_archive(tree: CacheManager):
     click.echo("Archive report:")
     click.echo(
         f" ARCHIVE ({tree.archive_root}): {humanize.naturalsize(usage.total)}: {humanize.naturalsize(usage.used)} "
-        f"used, {humanize.naturalsize(usage.free)}"
+        f"used, {humanize.naturalsize(usage.free)} free"
     )
     click.echo(
         f" {tarball_count:,d} tarballs consuming {humanize.naturalsize(tarball_size)}"
@@ -127,7 +131,7 @@ def report_backup(tree: CacheManager):
     click.echo("Backup report:")
     click.echo(
         f" BACKUP ({tree.backup_root}): {humanize.naturalsize(usage.total)}: {humanize.naturalsize(usage.used)} "
-        f"used, {humanize.naturalsize(usage.free)}"
+        f"used, {humanize.naturalsize(usage.free)} free"
     )
     click.echo(
         f" {backup_count:,d} tarballs consuming {humanize.naturalsize(backup_size)}"
@@ -145,7 +149,9 @@ def report_cache(tree: CacheManager):
     cached_count = 0
     cached_size = 0
     lacks_size = 0
+    lacks_tarpath = 0
     bad_size = 0
+    found_metrics = False
     lacks_metrics = 0
     bad_metrics = 0
     unpacked_count = 0
@@ -158,56 +164,81 @@ def report_cache(tree: CacheManager):
     speedcomp = Comparator("speed", really_big=GINORMOUS_FP)
     streamcomp = Comparator("streaming", really_big=GINORMOUS_FP)
 
-    for tarball in tree.datasets.values():
-        watcher.update(f"({cached_count}) cache {tarball.name}")
-        if tarball.cache:
+    rows = (
+        Database.db_session.query(
+            Dataset.name,
+            Dataset.resource_id,
+            Metadata.value["tarball-path"].as_string(),
+            Metadata.value["unpacked-size"],
+            Metadata.value["unpack-perf"],
+        )
+        .execution_options(stream_results=True)
+        .outerjoin(
+            Metadata,
+            and_(Dataset.id == Metadata.dataset_ref, Metadata.key == "server"),
+        )
+        .yield_per(SQL_CHUNK)
+    )
+
+    for dsname, rid, tar, size, metrics in rows:
+        watcher.update(f"({cached_count}) cache {dsname}")
+        if tar:
+            tarball = Path(tar)
+            tarname = Dataset.stem(tarball)
+        else:
+            detailer.error(f"{dsname} doesn't have a 'server.tarball-path' metadata")
'server.tarball-path' metadata") + lacks_tarpath += 1 + tarball = None + tarname = dsname + cache = Path(tree.cache_root / rid) + if (cache / tarname).exists(): + cached_count += 1 try: - referenced = tarball.last_ref.stat().st_mtime + referenced = (cache / "last_ref").stat().st_mtime except Exception as e: - detailer.error(f"{tarball.name} last ref access: {str(e)!r}") + detailer.error(f"{dsname} last ref access: {str(e)!r}") last_ref_errors += 1 else: - agecomp.add(tarball.name, referenced) - if tarball.unpacked: - cached_count += 1 - size = Metadata.getvalue(tarball.dataset, Metadata.SERVER_UNPACKED) + agecomp.add(dsname, referenced) if not size: - detailer.error(f"{tarball.name} has no unpacked size") + detailer.error(f"{dsname} has no unpacked size") lacks_size += 1 elif not isinstance(size, int): detailer.error( - f"{tarball.name} has non-integer unpacked size " + f"{dsname} has non-integer unpacked size " f"{size!r} ({type(size).__name__})" ) bad_size += 1 else: - sizecomp.add(tarball.name, size) + sizecomp.add(dsname, size) cached_size += size # Check compression ratios - tar_size = tarball.tarball_path.stat().st_size - ratio = float(size - tar_size) / float(size) - compcomp.add(tarball.name, ratio) - metrics = Metadata.getvalue(tarball.dataset, Metadata.SERVER_UNPACK_PERF) + if tarball: + tar_size = tarball.stat().st_size + ratio = float(size - tar_size) / float(size) + compcomp.add(dsname, ratio) if not metrics: - detailer.message(f"{tarball.name} has no unpack metrics") + # NOTE: message not error since nobody has this yet (noise) + detailer.message(f"{dsname} has no unpack metrics") lacks_metrics += 1 elif not isinstance(metrics, dict) or {"min", "max", "count"} - set( metrics.keys() ): detailer.error( - f"{tarball.name} has bad unpack metrics " + f"{dsname} has bad unpack metrics " f"{metrics!r} ({type(metrics).__name__})" ) bad_metrics += 1 else: + found_metrics = True unpacked_count += 1 unpacked_times += metrics["count"] - speedcomp.add(tarball.name, metrics["min"], metrics["max"]) + speedcomp.add(dsname, metrics["min"], metrics["max"]) if size and metrics: stream_fast = size / metrics["min"] / MEGABYTE_FP stream_slow = size / metrics["max"] / MEGABYTE_FP - streamcomp.add(tarball.name, stream_slow, stream_fast) + streamcomp.add(dsname, stream_slow, stream_fast) else: stream_unpack_skipped += 1 oldest = datetime.datetime.fromtimestamp(agecomp.min, datetime.timezone.utc) @@ -243,22 +274,23 @@ def report_cache(tree: CacheManager): click.echo( f" The best compression ratio is {compcomp.max:.3%}, " f"{compcomp.max_name}" ) - click.echo( - f" The fastest cache unpack is {speedcomp.min:.3f} seconds, " - f"{speedcomp.min_name}" - ) - click.echo( - f" The slowest cache unpack is {speedcomp.max:.3f} seconds, " - f"{speedcomp.max_name}" - ) - click.echo( - f" The fastest cache unpack streaming rate is {streamcomp.max:.3f} Mb/second, " - f"{streamcomp.max_name}" - ) - click.echo( - f" The slowest cache unpack streaming rate is {streamcomp.min:.3f} Mb/second, " - f"{streamcomp.min_name}" - ) + if found_metrics: + click.echo( + f" The fastest cache unpack is {speedcomp.min:.3f} seconds, " + f"{speedcomp.min_name}" + ) + click.echo( + f" The slowest cache unpack is {speedcomp.max:.3f} seconds, " + f"{speedcomp.max_name}" + ) + click.echo( + f" The fastest cache unpack streaming rate is {streamcomp.max:.3f} Mb/second, " + f"{streamcomp.max_name}" + ) + click.echo( + f" The slowest cache unpack streaming rate is {streamcomp.min:.3f} Mb/second, " + f"{streamcomp.min_name}" + ) if lacks_size 
         click.echo(
             f" {lacks_size:,d} datasets have no unpacked size, "
             f"{bad_size:,d} have a bad unpacked size"
         )
@@ -270,9 +302,9 @@ def report_cache(tree: CacheManager):
             f" {lacks_metrics:,d} datasets are missing unpack metric data, "
             f"{bad_metrics} have bad unpack metric data"
         )
-    if stream_unpack_skipped or verifier.verify:
+    if lacks_tarpath:
         click.echo(
-            f" {stream_unpack_skipped:,d} datasets are missing unpack performance data"
+            f" {lacks_tarpath} datasets are missing server.tarball-path metadata"
         )
 
 
@@ -292,7 +324,7 @@ def report_audit():
         Database.db_session.query(Audit)
         .execution_options(stream_results=True)
         .order_by(Audit.timestamp)
-        .yield_per(1000)
+        .yield_per(SQL_CHUNK)
     )
     for audit in audit_logs:
         counter += 1
@@ -377,7 +409,7 @@ def report_sql():
     query = select(IndexMap.root, IndexMap.index)
     for root, index in Database.db_session.execute(
         query, execution_options={"stream_results": True}
-    ).yield_per(500):
+    ).yield_per(SQL_CHUNK):
         record_count += 1
         watcher.update(f"({record_count}: {root} -> {index}")
         roots.add(root)
@@ -416,7 +448,7 @@ def report_states():
             "dataset_operations AS o ON o.dataset_ref = d.id"
         ),
         execution_options={"stream_results": True},
-    ).yield_per(1000)
+    ).yield_per(SQL_CHUNK)
     for dataset, operation, state, message in rows:
         watcher.update(f"inspecting {dataset}:{operation}")
         if operation is None:
@@ -535,10 +567,10 @@ def report(
     try:
         config = config_setup(context)
         logger = get_pbench_logger("pbench-report-generator", config)
-        if any((all, archive, backup, cache)):
-            cache_m = CacheManager(config, logger)
+        cache_m = CacheManager(config, logger)
+        if any((all, archive, backup)):
             verifier.status("starting discovery")
-            watcher.update("discovering cache")
+            watcher.update("discovering archive tree")
            cache_m.full_discovery(search=False)
             watcher.update("processing reports")
             verifier.status("finished discovery")
@@ -546,8 +578,8 @@ def report(
                 report_archive(cache_m)
             if all or backup:
                 report_backup(cache_m)
-            if all or cache:
-                report_cache(cache_m)
+        if all or cache:
+            report_cache(cache_m)
         if all or audit:
             report_audit()
         if all or sql:
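
Illustrative note, not part of the patch: the core of the change is replacing per-dataset Metadata.getvalue() lookups with one streamed query that outer-joins Dataset against its "server" Metadata row. A rough, self-contained sketch of that query shape follows; the model names and column expressions mirror the diff above, while the iter_cache_rows() helper and the explicit Session argument are hypothetical stand-ins for the CLI's direct use of Database.db_session.

# Sketch only: streamed dataset/metadata query in the shape used by report_cache().
from operator import and_  # operator.and_(x, y) is x & y, which SQLAlchemy maps to SQL AND

from sqlalchemy.orm import Session

from pbench.server.database.models.datasets import Dataset, Metadata

SQL_CHUNK = 2000  # rows fetched per round trip while streaming


def iter_cache_rows(session: Session):
    """Stream (name, resource_id, tarball-path, unpacked-size, unpack-perf)
    tuples for every dataset without loading the whole result set into memory."""
    return (
        session.query(
            Dataset.name,
            Dataset.resource_id,
            Metadata.value["tarball-path"].as_string(),
            Metadata.value["unpacked-size"],
            Metadata.value["unpack-perf"],
        )
        # Ask for a server-side cursor so rows stream instead of buffering client-side.
        .execution_options(stream_results=True)
        # LEFT OUTER JOIN keeps datasets that have no "server" metadata row at all.
        .outerjoin(
            Metadata,
            and_(Dataset.id == Metadata.dataset_ref, Metadata.key == "server"),
        )
        # Fetch in SQL_CHUNK-sized batches rather than all at once.
        .yield_per(SQL_CHUNK)
    )

In the patch itself the query is issued directly on Database.db_session and consumed by the report_cache() loop shown in the diff.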