diff --git a/lib/pbench/cli/server/__init__.py b/lib/pbench/cli/server/__init__.py index 587b5edc28..fef103774c 100644 --- a/lib/pbench/cli/server/__init__.py +++ b/lib/pbench/cli/server/__init__.py @@ -1,7 +1,129 @@ +import datetime +from threading import Thread +import time + +import click + from pbench.server import PbenchServerConfig from pbench.server.database import init_db +class Detail: + """Encapsulate generation of additional diagnostics""" + + def __init__(self, detail: bool = False, errors: bool = False): + """Initialize the object. + + Args: + detail: True if detailed messages should be generated + errors: True if individual file errors should be reported + """ + self.detail = detail + self.errors = errors + + def __bool__(self) -> bool: + """Report whether detailed messages are enabled + + Returns: + True if details are enabled + """ + return self.detail + + def error(self, message: str): + """Write a message if details are enabled. + + Args: + message: Detail string + """ + if self.errors: + click.secho(f"|| {message}", fg="red") + + def message(self, message: str): + """Write a message if details are enabled. + + Args: + message: Detail string + """ + if self.detail: + click.echo(f"|| {message}") + + +class Verify: + """Encapsulate -v status messages.""" + + def __init__(self, verify: bool): + """Initialize the object. + + Args: + verify: True to write status messages. + """ + self.verify = verify + + def __bool__(self) -> bool: + """Report whether verification is enabled. + + Returns: + True if verification is enabled. + """ + return self.verify + + def status(self, message: str): + """Write a message if verification is enabled. + + Args: + message: status string + """ + if self.verify: + ts = datetime.datetime.now().astimezone() + click.secho(f"({ts:%H:%M:%S}) {message}", fg="green") + + +class Watch: + """Encapsulate a periodic status update. + + The active message can be updated at will; a background thread will + periodically print the most recent status. + """ + + def __init__(self, interval: float): + """Initialize the object. + + Args: + interval: interval in seconds for status updates + """ + self.start = time.time() + self.interval = interval + self.status = "starting" + if interval: + self.thread = Thread(target=self.watcher) + self.thread.setDaemon(True) + self.thread.start() + + def update(self, status: str): + """Update status if appropriate. + + Update the message to be printed at the next interval, if progress + reporting is enabled. + + Args: + status: status string + """ + self.status = status + + def watcher(self): + """A worker thread to periodically write status messages.""" + + while True: + time.sleep(self.interval) + now = time.time() + delta = int(now - self.start) + hours, remainder = divmod(delta, 3600) + minutes, seconds = divmod(remainder, 60) + click.secho( + f"[{hours:02d}:{minutes:02d}:{seconds:02d}] {self.status}", fg="cyan" + ) + + def config_setup(context: object) -> PbenchServerConfig: config = PbenchServerConfig.create(context.config) # We're going to need the DB to track dataset state, so setup DB access. 
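Note: the Detail, Verify, and Watch helpers added above are shared by the report and tree-manage commands changed below. As a rough, self-contained sketch (the command name and flag wiring here are illustrative, not part of the patch), a click command constructs them from its own options and publishes them as module globals, following the pattern used in tree_manage.py:

from typing import Optional

import click

from pbench.cli.server import Detail, Verify, Watch

detailer: Optional[Detail] = None
verifier: Optional[Verify] = None
watcher: Optional[Watch] = None


@click.command(name="example")  # hypothetical command, for illustration only
@click.option("--detail", "-d", default=False, is_flag=True, help="Provide extra diagnostics")
@click.option("--progress", "-p", type=float, default=0.0, help="Show periodic progress messages")
@click.option("--verify", "-v", default=False, is_flag=True, help="Display intermediate messages")
def example(detail: bool, progress: float, verify: bool):
    global detailer, verifier, watcher
    detailer = Detail(detail, errors=detail)  # errors=detail is an arbitrary choice for this sketch
    verifier = Verify(verify)
    watcher = Watch(progress)  # an interval of 0.0 disables the background status thread

    verifier.status("starting work")       # timestamped, printed only when -v is given
    watcher.update("processing datasets")  # echoed by the Watch thread at each interval
    detailer.message("per-item detail")    # printed only when -d is given

A progress interval of a few seconds is enough to show liveness during long discovery passes without flooding the terminal.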
diff --git a/lib/pbench/cli/server/options.py b/lib/pbench/cli/server/options.py index 2d40cda904..258363f035 100644 --- a/lib/pbench/cli/server/options.py +++ b/lib/pbench/cli/server/options.py @@ -22,13 +22,15 @@ def _pbench_server_config(f: Callable) -> Callable: def callback(ctx, param, value): clictx = ctx.ensure_object(CliContext) - clictx.config = value + clictx.config = ( + value if value else "/opt/pbench-server/lib/config/pbench-server.cfg" + ) return value return click.option( "-C", "--config", - required=True, + required=False, envvar="_PBENCH_SERVER_CONFIG", type=click.Path(exists=True, readable=True), callback=callback, diff --git a/lib/pbench/cli/server/report.py b/lib/pbench/cli/server/report.py index 8702dcfc0d..028f32d4e8 100644 --- a/lib/pbench/cli/server/report.py +++ b/lib/pbench/cli/server/report.py @@ -1,148 +1,79 @@ from collections import defaultdict import datetime +from operator import and_ +from pathlib import Path import re -from threading import Thread +import shutil import time -from typing import Optional +from typing import Optional, Union import click import humanize from sqlalchemy import inspect, select, text from pbench.cli import pass_cli_context -from pbench.cli.server import config_setup +from pbench.cli.server import config_setup, Detail, Verify, Watch from pbench.cli.server.options import common_options -from pbench.client.types import Dataset from pbench.common.logger import get_pbench_logger from pbench.server import BadConfig from pbench.server.cache_manager import CacheManager from pbench.server.database.database import Database -from pbench.server.database.models.datasets import Metadata +from pbench.server.database.models.audit import Audit, AuditStatus +from pbench.server.database.models.datasets import Dataset, Metadata from pbench.server.database.models.index_map import IndexMap # An arbitrary really big number GINORMOUS = 2**64 +# A similarly arbitrary big floating point number +GINORMOUS_FP = 1000000.0 -class Detail: - """Encapsulate generation of additional diagnostics""" +# Number of bytes in a megabyte, coincidentally also a really big number +MEGABYTE_FP = 1000000.0 - def __init__(self, detail: bool = False, errors: bool = False): - """Initialize the object. +# SQL "chunk size" +SQL_CHUNK = 2000 - Args: - detail: True if detailed messages should be generated - errors: True if individual file errors should be reported - """ - self.detail = detail - self.errors = errors - - def __bool__(self) -> bool: - """Report whether detailed messages are enabled - - Returns: - True if details are enabled - """ - return self.detail - - def error(self, message: str): - """Write a message if details are enabled. - - Args: - message: Detail string - """ - if self.errors: - click.secho(f"|| {message}", fg="red") - - def message(self, message: str): - """Write a message if details are enabled. - - Args: - message: Detail string - """ - if self.detail: - click.echo(f"|| {message}") - - -class Verify: - """Encapsulate -v status messages.""" - - def __init__(self, verify: bool): - """Initialize the object. - - Args: - verify: True to write status messages. - """ - self.verify = verify - - def __bool__(self) -> bool: - """Report whether verification is enabled. - - Returns: - True if verification is enabled. - """ - return self.verify - - def status(self, message: str): - """Write a message if verification is enabled. 
- - Args: - message: status string - """ - if self.verify: - ts = datetime.datetime.now().astimezone() - click.secho(f"({ts:%H:%M:%S}) {message}", fg="green") - - -class Watch: - """Encapsulate a periodic status update. +detailer: Optional[Detail] = None +watcher: Optional[Watch] = None +verifier: Optional[Verify] = None - The active message can be updated at will; a background thread will - periodically print the most recent status. - """ - def __init__(self, interval: float): - """Initialize the object. +class Comparator: + def __init__(self, name: str, really_big: Union[int, float] = GINORMOUS): + """Initialize a comparator Args: - interval: interval in seconds for status updates + name: A name for the comparator + really_big: An optional maximum value """ - self.start = time.time() - self.interval = interval - self.status = "starting" - if interval: - self.thread = Thread(target=self.watcher) - self.thread.setDaemon(True) - self.thread.start() - - def update(self, status: str): - """Update status if appropriate. - - Update the message to be printed at the next interval, if progress - reporting is enabled. + self.name = name + self.min = really_big + self.min_name = None + self.max = -really_big + self.max_name = None + + def add( + self, + name: str, + value: Union[int, float], + max: Optional[Union[int, float]] = None, + ): + """Add a data point to the comparator Args: - status: status string + name: The name of the associated dataset + value: The value of the datapoint + max: [Optional] A second "maximum" value if adding a min/max pair """ - self.status = status - - def watcher(self): - """A worker thread to periodically write status messages.""" - - while True: - time.sleep(self.interval) - now = time.time() - delta = int(now - self.start) - hours, remainder = divmod(delta, 3600) - minutes, seconds = divmod(remainder, 60) - click.secho( - f"[{hours:02d}:{minutes:02d}:{seconds:02d}] {self.status}", fg="cyan" - ) - - -detailer: Optional[Detail] = None -watcher: Optional[Watch] = None -verifier: Optional[Verify] = None + minv = value + maxv = max if max is not None else value + if minv < self.min: + self.min = minv + self.min_name = name + if maxv > self.max: + self.max = maxv + self.max_name = name def report_archive(tree: CacheManager): @@ -155,32 +86,29 @@ def report_archive(tree: CacheManager): watcher.update("inspecting archive") tarball_count = len(tree.datasets) tarball_size = 0 - smallest_tarball = GINORMOUS - smallest_tarball_name = None - biggest_tarball = 0 - biggest_tarball_name = None + tcomp = Comparator("tarball") + usage = shutil.disk_usage(tree.archive_root) for tarball in tree.datasets.values(): watcher.update(f"({tarball_count}) archive {tarball.name}") size = tarball.tarball_path.stat().st_size tarball_size += size - if size < smallest_tarball: - smallest_tarball = size - smallest_tarball_name = tarball.name - if size > biggest_tarball: - biggest_tarball = size - biggest_tarball_name = tarball.name + tcomp.add(tarball.name, size) click.echo("Archive report:") + click.echo( + f" ARCHIVE ({tree.archive_root}): {humanize.naturalsize(usage.total)}: {humanize.naturalsize(usage.used)} " + f"used, {humanize.naturalsize(usage.free)} free" + ) click.echo( f" {tarball_count:,d} tarballs consuming {humanize.naturalsize(tarball_size)}" ) click.echo( - f" The smallest tarball is {humanize.naturalsize(smallest_tarball)}, " - f"{smallest_tarball_name}" + f" The smallest tarball is {humanize.naturalsize(tcomp.min)}, " + f"{tcomp.min_name}" ) click.echo( - f" The biggest tarball is 
{humanize.naturalsize(biggest_tarball)}, " - f"{biggest_tarball_name}" + f" The biggest tarball is {humanize.naturalsize(tcomp.max)}, " + f"{tcomp.max_name}" ) @@ -194,12 +122,17 @@ def report_backup(tree: CacheManager): watcher.update("inspecting backups") backup_count = 0 backup_size = 0 + usage = shutil.disk_usage(tree.backup_root) for tarball in tree.backup_root.glob("**/*.tar.xz"): watcher.update(f"({backup_count}) backup {Dataset.stem(tarball)}") backup_count += 1 backup_size += tarball.stat().st_size click.echo("Backup report:") + click.echo( + f" BACKUP ({tree.backup_root}): {humanize.naturalsize(usage.total)}: {humanize.naturalsize(usage.used)} " + f"used, {humanize.naturalsize(usage.free)} free" + ) click.echo( f" {backup_count:,d} tarballs consuming {humanize.naturalsize(backup_size)}" ) @@ -216,79 +149,220 @@ def report_cache(tree: CacheManager): cached_count = 0 cached_size = 0 lacks_size = 0 + lacks_tarpath = 0 bad_size = 0 - oldest_cache = time.time() * 2.0 # moderately distant future - oldest_cache_name = None - newest_cache = 0.0 # wayback machine - newest_cache_name = None - smallest_cache = GINORMOUS - smallest_cache_name = None - biggest_cache = 0 - biggest_cache_name = None + found_metrics = False + lacks_metrics = 0 + bad_metrics = 0 + unpacked_count = 0 + unpacked_times = 0 + stream_unpack_skipped = 0 last_ref_errors = 0 + agecomp = Comparator("age", really_big=time.time() * 2.0) + sizecomp = Comparator("size") + compcomp = Comparator("compression") + speedcomp = Comparator("speed", really_big=GINORMOUS_FP) + streamcomp = Comparator("streaming", really_big=GINORMOUS_FP) + + rows = ( + Database.db_session.query( + Dataset.name, + Dataset.resource_id, + Metadata.value["tarball-path"].as_string(), + Metadata.value["unpacked-size"], + Metadata.value["unpack-perf"], + ) + .execution_options(stream_results=True) + .outerjoin( + Metadata, + and_(Dataset.id == Metadata.dataset_ref, Metadata.key == "server"), + ) + .yield_per(SQL_CHUNK) + ) - for tarball in tree.datasets.values(): - watcher.update(f"({cached_count}) cache {tarball.name}") - if tarball.unpacked: + for dsname, rid, tar, size, metrics in rows: + watcher.update(f"({cached_count}) cache {dsname}") + if tar: + tarball = Path(tar) + tarname = Dataset.stem(tarball) + else: + detailer.error(f"{dsname} doesn't have a 'server.tarball-path' metadata") + lacks_tarpath += 1 + tarball = None + tarname = dsname + cache = Path(tree.cache_root / rid) + if (cache / tarname).exists(): + cached_count += 1 try: - referenced = tarball.last_ref.stat().st_mtime + referenced = (cache / "last_ref").stat().st_mtime except Exception as e: - detailer.error(f"{tarball.name} last ref access: {str(e)!r}") + detailer.error(f"{dsname} last ref access: {str(e)!r}") last_ref_errors += 1 else: - if referenced < oldest_cache: - oldest_cache = referenced - oldest_cache_name = tarball.name - if referenced > newest_cache: - newest_cache = referenced - newest_cache_name = tarball.name - cached_count += 1 - size = Metadata.getvalue(tarball.dataset, Metadata.SERVER_UNPACKED) - if not size: - detailer.error(f"{tarball.name} has no unpacked size") - lacks_size += 1 - elif not isinstance(size, int): - detailer.error( - f"{tarball.name} has non-integer unpacked size " - f"{size!r} ({type(size)})" - ) - bad_size += 1 + agecomp.add(dsname, referenced) + if not size: + detailer.error(f"{dsname} has no unpacked size") + lacks_size += 1 + elif not isinstance(size, int): + detailer.error( + f"{dsname} has non-integer unpacked size " + f"{size!r} 
({type(size).__name__})" + ) + bad_size += 1 + else: + sizecomp.add(dsname, size) + cached_size += size + + # Check compression ratios + if tarball: + tar_size = tarball.stat().st_size + ratio = float(size - tar_size) / float(size) + compcomp.add(dsname, ratio) + if not metrics: + # NOTE: message not error since nobody has this yet (noise) + detailer.message(f"{dsname} has no unpack metrics") + lacks_metrics += 1 + elif not isinstance(metrics, dict) or {"min", "max", "count"} - set( + metrics.keys() + ): + detailer.error( + f"{dsname} has bad unpack metrics " + f"{metrics!r} ({type(metrics).__name__})" + ) + bad_metrics += 1 + else: + found_metrics = True + unpacked_count += 1 + unpacked_times += metrics["count"] + speedcomp.add(dsname, metrics["min"], metrics["max"]) + if size: + stream_fast = size / metrics["min"] / MEGABYTE_FP + stream_slow = size / metrics["max"] / MEGABYTE_FP + streamcomp.add(dsname, stream_slow, stream_fast) else: - if size < smallest_cache: - smallest_cache = size - smallest_cache_name = tarball.name - if size > biggest_cache: - biggest_cache = size - biggest_cache_name = tarball.name - cached_size += size - oldest = datetime.datetime.fromtimestamp(oldest_cache, datetime.timezone.utc) - newest = datetime.datetime.fromtimestamp(newest_cache, datetime.timezone.utc) + stream_unpack_skipped += 1 + oldest = datetime.datetime.fromtimestamp(agecomp.min, datetime.timezone.utc) + newest = datetime.datetime.fromtimestamp(agecomp.max, datetime.timezone.utc) click.echo("Cache report:") click.echo( - f" {cached_count:,d} datasets consuming " + f" {cached_count:,d} datasets currently unpacked, consuming " f"{humanize.naturalsize(cached_size)}" ) click.echo( - f" {lacks_size:,d} datasets have never been unpacked, " - f"{last_ref_errors:,d} are missing reference timestamps, " - f"{bad_size:,d} have bad size metadata" + f" {unpacked_count:,d} datasets have been unpacked a total of " + f"{unpacked_times:,d} times" + ) + click.echo( + " The least recently used cache was referenced " + f"{humanize.naturaldate(oldest)}, {agecomp.min_name}" ) click.echo( - f" The smallest cache is {humanize.naturalsize(smallest_cache)}, " - f"{smallest_cache_name}" + " The most recently used cache was referenced " + f"{humanize.naturaldate(newest)}, {agecomp.max_name}" ) click.echo( - f" The biggest cache is {humanize.naturalsize(biggest_cache)}, " - f"{biggest_cache_name}" + f" The smallest cache is {humanize.naturalsize(sizecomp.min)}, " + f"{sizecomp.min_name}" ) click.echo( - " The least recently used cache was referenced " - f"{humanize.naturaldate(oldest)}, {oldest_cache_name}" + f" The biggest cache is {humanize.naturalsize(sizecomp.max)}, " + f"{sizecomp.max_name}" ) click.echo( - " The most recently used cache was referenced " - f"{humanize.naturaldate(newest)}, {newest_cache_name}" + f" The worst compression ratio is {compcomp.min:.3%}, " f"{compcomp.min_name}" ) + click.echo( + f" The best compression ratio is {compcomp.max:.3%}, " f"{compcomp.max_name}" + ) + if found_metrics: + click.echo( + f" The fastest cache unpack is {speedcomp.min:.3f} seconds, " + f"{speedcomp.min_name}" + ) + click.echo( + f" The slowest cache unpack is {speedcomp.max:.3f} seconds, " + f"{speedcomp.max_name}" + ) + click.echo( + f" The fastest cache unpack streaming rate is {streamcomp.max:.3f} Mb/second, " + f"{streamcomp.max_name}" + ) + click.echo( + f" The slowest cache unpack streaming rate is {streamcomp.min:.3f} Mb/second, " + f"{streamcomp.min_name}" + ) + if lacks_size or last_ref_errors or bad_size or 
verifier.verify: + click.echo( + f" {lacks_size:,d} datasets have no unpacked size, " + f"{last_ref_errors:,d} are missing reference timestamps, " + f"{bad_size:,d} have bad size metadata" + ) + if lacks_metrics or bad_metrics or verifier.verify: + click.echo( + f" {lacks_metrics:,d} datasets are missing unpack metric data, " + f"{bad_metrics} have bad unpack metric data" + ) + if lacks_tarpath: + click.echo( + f" {lacks_tarpath} datasets are missing server.tarball-path metadata" + ) + + +def report_audit(): + """Report audit log statistics.""" + + counter = 0 + events = 0 + unmatched_roots = set() + unmatched_terminal = set() + status = defaultdict(int) + operations = defaultdict(int) + objects = defaultdict(int) + users = defaultdict(int) + watcher.update("inspecting audit log") + audit_logs = ( + Database.db_session.query(Audit) + .execution_options(stream_results=True) + .order_by(Audit.timestamp) + .yield_per(SQL_CHUNK) + ) + for audit in audit_logs: + counter += 1 + watcher.update(f"[{counter}] inspecting {audit.id} -> {audit.timestamp}") + status[audit.status.name] += 1 + if audit.status is AuditStatus.BEGIN: + events += 1 + unmatched_roots.add(audit.id) + operations[audit.name] += 1 + n = audit.user_name if audit.user_name else "" + users[n] += 1 + t = audit.object_type.name if audit.object_type else "" + objects[t] += 1 + else: + try: + unmatched_roots.remove(audit.root_id) + except KeyError: + detailer.error(f"audit {audit} has no matching `BEGIN`") + unmatched_terminal.add(audit.id) + + click.echo("Audit logs:") + click.echo(f" {counter:,d} audit log rows for {events:,d} events") + click.echo( + f" {len(unmatched_roots)} unterminated root rows, " + f"{len(unmatched_terminal)} unmatched terminators" + ) + click.echo(" Status summary:") + for name, count in status.items(): + click.echo(f" {name:>20s} {count:>10,d}") + click.echo(" Operation summary:") + for name, count in operations.items(): + click.echo(f" {name:>20s} {count:>10,d}") + click.echo(" Object type summary:") + for name, count in objects.items(): + click.echo(f" {name:>20s} {count:>10,d}") + click.echo(" Users summary:") + for name, count in users.items(): + click.echo(f" {name:>20s} {count:>10,d}") def report_sql(): @@ -335,7 +409,7 @@ def report_sql(): query = select(IndexMap.root, IndexMap.index) for root, index in Database.db_session.execute( query, execution_options={"stream_results": True} - ).yield_per(500): + ).yield_per(SQL_CHUNK): record_count += 1 watcher.update(f"({record_count}: {root} -> {index}") roots.add(root) @@ -372,8 +446,9 @@ def report_states(): statement=text( "SELECT d.name, o.name, o.state, o.message FROM datasets AS d LEFT OUTER JOIN " "dataset_operations AS o ON o.dataset_ref = d.id" - ) - ) + ), + execution_options={"stream_results": True}, + ).yield_per(SQL_CHUNK) for dataset, operation, state, message in rows: watcher.update(f"inspecting {dataset}:{operation}") if operation is None: @@ -416,6 +491,9 @@ def report_states(): @click.option( "--archive", "-A", default=False, is_flag=True, help="Display archive statistics" ) +@click.option( + "--audit", "-L", default=False, is_flag=True, help="Display audit log statistics" +) @click.option( "--backup", "-b", default=False, is_flag=True, help="Display backup statistics" ) @@ -451,6 +529,7 @@ def report( context: object, all: bool, archive: bool, + audit: bool, backup: bool, cache: bool, detail: bool, @@ -469,6 +548,7 @@ def report( context: click context all: report all statistics archive: report archive statistics + audit: report audit log 
statistics backup: report backup statistics cache: report cache statistics detail: provide additional per-file diagnostics @@ -487,10 +567,10 @@ def report( try: config = config_setup(context) logger = get_pbench_logger("pbench-report-generator", config) - if any((all, archive, backup, cache)): - cache_m = CacheManager(config, logger) + cache_m = CacheManager(config, logger) + if any((all, archive, backup)): verifier.status("starting discovery") - watcher.update("discovering cache") + watcher.update("discovering archive tree") cache_m.full_discovery(search=False) watcher.update("processing reports") verifier.status("finished discovery") @@ -498,8 +578,10 @@ def report( report_archive(cache_m) if all or backup: report_backup(cache_m) - if all or cache: - report_cache(cache_m) + if all or cache: + report_cache(cache_m) + if all or audit: + report_audit() if all or sql: report_sql() if all or states: diff --git a/lib/pbench/cli/server/tree_manage.py b/lib/pbench/cli/server/tree_manage.py index 4c29ae905d..b2cb57bbd0 100644 --- a/lib/pbench/cli/server/tree_manage.py +++ b/lib/pbench/cli/server/tree_manage.py @@ -1,15 +1,21 @@ import datetime +from typing import Optional import click import humanfriendly +import humanize from pbench.cli import pass_cli_context -from pbench.cli.server import config_setup +from pbench.cli.server import config_setup, Detail, Verify, Watch from pbench.cli.server.options import common_options from pbench.common.logger import get_pbench_logger from pbench.server import BadConfig from pbench.server.cache_manager import CacheManager +detailer: Optional[Detail] = None +watcher: Optional[Watch] = None +verifier: Optional[Verify] = None + def print_tree(tree: CacheManager): """Print basic information about the cache @@ -41,25 +47,49 @@ def print_tree(tree: CacheManager): @click.command(name="pbench-tree-manager") @pass_cli_context +@click.option( + "--detail", + "-d", + default=False, + is_flag=True, + help="Provide extra diagnostic information", +) @click.option( "--display", default=False, is_flag=True, help="Display the full tree on completion" ) +@click.option( + "--progress", "-p", type=float, default=0.0, help="Show periodic progress messages" +) @click.option( "--reclaim-percent", - show_default=True, is_flag=False, flag_value=20.0, type=click.FLOAT, - help="Reclaim cached data to maintain a target % free space", + help="Reclaim cached data to maintain a target % free space [default 20]", ) @click.option( "--reclaim-size", is_flag=False, - help="Reclaim cached data to maintain specified free space", + help="Reclaim cached data to maintain specified free space (e.g., '1Gb')", +) +@click.option( + "--search", + is_flag=True, + help="Do an exhaustive search of ARCHIVE tree rather than using SQL", +) +@click.option( + "--verify", "-v", default=False, is_flag=True, help="Display intermediate messages" ) @common_options def tree_manage( - context: object, display: bool, reclaim_percent: float, reclaim_size: str + context: object, + detail: bool, + display: bool, + progress: float, + reclaim_percent: float, + reclaim_size: str, + search: bool, + verify: bool, ): """ Discover, display, and manipulate the on-disk representation of controllers @@ -76,20 +106,38 @@ def tree_manage( lifetime: Number of hours to retain unused cache before reclaim reclaim-percent: Reclaim cached data to free specified % on drive reclaim-size: Reclame cached data to free specified size on drive + search: Discover cache with a full disk search """ logger = None + + global detailer, verifier, 
watcher + detailer = Detail(detail, errors=False) + verifier = Verify(verify) + watcher = Watch(progress) + try: + rv = 0 config = config_setup(context) logger = get_pbench_logger("pbench-tree-manager", config) cache_m = CacheManager(config, logger) - cache_m.full_discovery() if display: + verifier.status("starting discovery") + watcher.update("discovering cache") + cache_m.full_discovery(search=search) + verifier.status("finished discovery") + watcher.update("building report") print_tree(cache_m) - rv = 0 if reclaim_percent or reclaim_size: + target_pct = reclaim_percent if reclaim_percent else 0.0 target_size = humanfriendly.parse_size(reclaim_size) if reclaim_size else 0 - target_pct = reclaim_percent if reclaim_percent else 20.0 + verifier.status( + f"starting to reclaim {target_pct}% or {humanize.naturalsize(target_size)}" + ) + watcher.update("reclaiming") outcome = cache_m.reclaim_cache(goal_pct=target_pct, goal_bytes=target_size) + verifier.status( + f"finished reclaiming: goal {''if outcome else 'not '}achieved" + ) rv = 0 if outcome else 1 except Exception as exc: if logger: diff --git a/lib/pbench/server/api/resources/intake_base.py b/lib/pbench/server/api/resources/intake_base.py index bd53b846cc..e782023735 100644 --- a/lib/pbench/server/api/resources/intake_base.py +++ b/lib/pbench/server/api/resources/intake_base.py @@ -396,7 +396,7 @@ def _intake( except APIAbort: raise # Propagate an APIAbort exception to the outer block except Exception as e: - raise APIInternalError("Unable to create dataset") from e + raise APIInternalError(f"Unable to create dataset: {str(e)!r}") from e recovery.add(dataset.delete) @@ -452,11 +452,11 @@ def _intake( ) raise APIAbort(HTTPStatus.INSUFFICIENT_STORAGE, "Out of space") raise APIInternalError( - f"Unexpected error encountered during file upload: {str(exc)!r} " + f"Unexpected error encountered during file upload: {str(exc)!r}" ) from exc except Exception as e: raise APIInternalError( - "Unexpected error encountered during file upload: {str(e)!r}" + f"Unexpected error encountered during file upload: {str(e)!r}" ) from e if bytes_received != stream.length: diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index 2c47401cc8..0da346a3f1 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -1034,7 +1034,8 @@ def get_unpacked_size(self) -> int: Once we've unpacked it once, the size is tracked in the metadata key 'server.unpacked'. If we haven't yet unpacked it, and the dataset's - metadata.log provides a 'raw_size", use that as an estimate + metadata.log provides "run.raw_size", use that as an estimate (the + accuracy depends on relative block size compared to the server). Returns: unpacked size of the tarball, or 0 if unknown @@ -1133,27 +1134,46 @@ def get_results(self, lock: LockManager) -> Path: uend - ustart, ) - self.last_ref.touch(exist_ok=True) - - # If we have a Dataset, and haven't already done this, compute the - # unpacked size and record it in metadata so we can use it later. - ssize = time.time() - if self.dataset and not Metadata.getvalue( - self.dataset, Metadata.SERVER_UNPACKED - ): - try: - process = subprocess.run( - ["du", "-s", "-B1", str(self.unpacked)], - capture_output=True, - text=True, + # If we have a Dataset, and haven't already done this, compute the + # unpacked size and record it in metadata so we can use it later. 
+ ssize = time.time() + if self.dataset: + if not Metadata.getvalue(self.dataset, Metadata.SERVER_UNPACKED): + try: + process = subprocess.run( + ["du", "-s", "-B1", str(self.unpacked)], + capture_output=True, + text=True, + ) + if process.returncode == 0: + size = int(process.stdout.split("\t", maxsplit=1)[0]) + self.unpacked_size = size + Metadata.setvalue( + self.dataset, Metadata.SERVER_UNPACKED, size + ) + except Exception as e: + self.logger.warning("usage check failed: {}", e) + + # Update the time-to-unpack statistic. If the metadata doesn't exist, + # or somehow isn't a dict (JSONOBJECT), create a new metric: otherwise + # update the existing metric. + unpack_time = uend - ustart + metric: JSONOBJECT = Metadata.getvalue( + self.dataset, Metadata.SERVER_UNPACK_PERF ) - if process.returncode == 0: - size = int(process.stdout.split("\t", maxsplit=1)[0]) - self.unpacked_size = size - Metadata.setvalue(self.dataset, Metadata.SERVER_UNPACKED, size) - except Exception as e: - self.logger.warning("usage check failed: {}", e) - self.logger.info("{}: size update {:.3f}", self.name, time.time() - ssize) + if not isinstance(metric, dict): + metric = {"min": unpack_time, "max": unpack_time, "count": 1} + else: + # We max out at about 12 days here, which seems safely excessive! + metric["min"] = min(metric.get("min", 1000000.0), unpack_time) + metric["max"] = max(metric.get("max", 0), unpack_time) + metric["count"] = metric.get("count", 0) + 1 + Metadata.setvalue(self.dataset, Metadata.SERVER_UNPACK_PERF, metric) + self.logger.info( + "{}: size update {:.3f}", self.name, time.time() - ssize + ) + + self.last_ref.touch(exist_ok=True) return self.unpacked def cache_delete(self): @@ -1182,7 +1202,18 @@ def delete(self): We'll log errors in deletion, but "succeed" and clear the links to both files. There's nothing more we can do. """ - self.cache_delete() + + # NOTE: this is similar to cache_delete above, but doesn't re-raise and + # operates on the root cache directory rather than unpack. + self.cachemap = None + if self.cache: + try: + shutil.rmtree(self.cache) + except Exception as e: + self.logger.error("cache delete for {} failed with {}", self.name, e) + + # Remove the isolator directory with the tarball and MD5 files; or if + # this is a pre-isolator tarball, unlink the MD5 and tarball. if self.isolator and self.isolator.exists(): try: shutil.rmtree(self.isolator) @@ -1542,7 +1573,6 @@ def _discover_datasets(self): and_(Dataset.id == Metadata.dataset_ref, Metadata.key == "server"), ) .yield_per(1000) - .all() ) for name, resource_id, path in rows: @@ -1873,6 +1903,7 @@ def reached_goal() -> GoalCheck: # our goals. 
candidates = sorted(candidates, key=lambda c: c.last_ref) has_cache = len(candidates) + goal_check = reached_goal() for candidate in candidates: name = candidate.cache.name cache_d = candidate.cache.parent @@ -1903,7 +1934,8 @@ def reached_goal() -> GoalCheck: error = e else: reclaimed += 1 - if reached_goal().reached: + goal_check = reached_goal() + if goal_check.reached: break except OSError as e: if e.errno in (errno.EAGAIN, errno.EACCES): @@ -1919,7 +1951,6 @@ def reached_goal() -> GoalCheck: error = e if error: self.logger.error("RECLAIM: {} failed with '{}'", name, error) - goal_check = reached_goal() free_pct = goal_check.usage.free * 100.0 / goal_check.usage.total self.logger.info( "RECLAIM {} (goal {}%, {}): {} datasets, " diff --git a/lib/pbench/server/database/models/datasets.py b/lib/pbench/server/database/models/datasets.py index d3d18785b5..66ce3d5633 100644 --- a/lib/pbench/server/database/models/datasets.py +++ b/lib/pbench/server/database/models/datasets.py @@ -674,6 +674,9 @@ class Metadata(Database.Base): # UNPACKED records the size of the unpacked directory tree. SERVER_UNPACKED = "server.unpacked-size" + # UNPACK performance: {"min": , "max": , "count": } + SERVER_UNPACK_PERF = "server.unpack-perf" + # TARBALL_PATH access path of the dataset tarball. (E.g., we could use this # to record an S3 object store key.) NOT YET USED. # diff --git a/lib/pbench/server/indexing_tarballs.py b/lib/pbench/server/indexing_tarballs.py index 0123daf3ab..85d3172b79 100644 --- a/lib/pbench/server/indexing_tarballs.py +++ b/lib/pbench/server/indexing_tarballs.py @@ -4,10 +4,13 @@ from collections import deque import os from pathlib import Path +import shutil import signal import tempfile from typing import Callable, List, NamedTuple, Optional, Tuple +import humanize + from pbench.common.exceptions import ( BadDate, BadMDLogFormat, @@ -314,13 +317,21 @@ def process_tb(self, tarballs: List[TarballData]) -> int: idxctx.logger.info("Load templates {!r}", res) return res.value - idxctx.logger.debug("Preparing to index {:d} tar balls", len(tb_deque)) - with tempfile.TemporaryDirectory( prefix=f"{self.name}.", dir=idxctx.config.TMP ) as tmpdir: - idxctx.logger.debug("start processing list of tar balls") + usage = shutil.disk_usage(tmpdir) + idxctx.logger.info( + "start processing {:d} tar balls using TMP {} ({} free)", + len(tb_deque), + tmpdir, + humanize.naturalsize(usage.free), + ) tb_list = Path(tmpdir, f"{self.name}.{idxctx.TS}.list") + indexed = Path(tmpdir, f"{self.name}.{idxctx.TS}.indexed") + erred = Path(tmpdir, f"{self.name}.{idxctx.TS}.erred") + skipped = Path(tmpdir, f"{self.name}.{idxctx.TS}.skipped") + ie_filepath = Path(tmpdir, f"{self.name}.{idxctx.TS}.indexing-errors.json") try: with tb_list.open(mode="w") as lfp: # Write out all the tar balls we are processing so external @@ -328,13 +339,6 @@ def process_tb(self, tarballs: List[TarballData]) -> int: for size, dataset, tb in tarballs: print(f"{size:20d} {dataset.name} {tb}", file=lfp) - indexed = Path(tmpdir, f"{self.name}.{idxctx.TS}.indexed") - erred = Path(tmpdir, f"{self.name}.{idxctx.TS}.erred") - skipped = Path(tmpdir, f"{self.name}.{idxctx.TS}.skipped") - ie_filepath = Path( - tmpdir, f"{self.name}.{idxctx.TS}.indexing-errors.json" - ) - # We use a list object here so that when we close over this # variable in the handler, the list object will be closed over, # but not its contents.
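The new server.unpack-perf metadata and the cache report's streaming-rate lines are two halves of the same calculation: get_results() folds each unpack duration into a {"min", "max", "count"} object, and report_cache() divides the recorded unpacked size by those durations to get megabytes per second. The small standalone sketch below (helper names and sample numbers are mine, not part of the patch) restates that arithmetic:

from typing import Optional

MEGABYTE_FP = 1000000.0  # bytes per megabyte, matching the constant in report.py


def update_unpack_perf(metric: Optional[dict], unpack_time: float) -> dict:
    """Fold one unpack duration into the min/max/count metric, as get_results() does."""
    if not isinstance(metric, dict):
        return {"min": unpack_time, "max": unpack_time, "count": 1}
    metric["min"] = min(metric.get("min", 1000000.0), unpack_time)
    metric["max"] = max(metric.get("max", 0), unpack_time)
    metric["count"] = metric.get("count", 0) + 1
    return metric


def streaming_rates(unpacked_size: int, metric: dict) -> tuple:
    """Return (slowest, fastest) unpack rates in MB/second, as report_cache() computes them."""
    fastest = unpacked_size / metric["min"] / MEGABYTE_FP
    slowest = unpacked_size / metric["max"] / MEGABYTE_FP
    return slowest, fastest


# Hypothetical dataset unpacked twice, taking 40 and 55 seconds for 2.5 GB:
m = update_unpack_perf(None, 40.0)
m = update_unpack_perf(m, 55.0)
print(m)                                   # {'min': 40.0, 'max': 55.0, 'count': 2}
print(streaming_rates(2_500_000_000, m))   # roughly (45.5, 62.5) MB/second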