Optimize cache discovery to pure SQL
Some cleanup and review comments.
dbutenhof committed Mar 4, 2024
1 parent 45b8cff commit cdbc009
Showing 1 changed file with 82 additions and 50 deletions.
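At its core, the commit drops the per-tarball walk over tree.datasets in favor of a single SQLAlchemy query streamed in SQL_CHUNK-row batches. The following is a minimal sketch of that streaming pattern only, not code from this commit: it assumes an in-memory SQLite database and an invented Record model in place of pbench's.

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Record(Base):
    # Invented stand-in table; pbench's real models live in
    # pbench.server.database.models.
    __tablename__ = "records"
    id = Column(Integer, primary_key=True)
    name = Column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

SQL_CHUNK = 2000  # batch size, mirroring the constant this commit adds

with Session(engine) as session:
    session.add_all([Record(name=f"r{i}") for i in range(10)])
    session.commit()
    # stream_results asks the driver not to buffer the full result set;
    # yield_per makes the ORM hand rows back in SQL_CHUNK-sized batches.
    rows = (
        session.query(Record.id, Record.name)
        .execution_options(stream_results=True)
        .yield_per(SQL_CHUNK)
    )
    for rid, name in rows:
        print(rid, name)

The point of the pattern is bounded memory: the report can iterate over an arbitrarily large dataset table without materializing it all at once.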
lib/pbench/cli/server/report.py: 132 changes (82 additions & 50 deletions)
@@ -1,5 +1,7 @@
from collections import defaultdict
import datetime
from operator import and_
from pathlib import Path
import re
import shutil
import time
@@ -12,13 +14,12 @@
from pbench.cli import pass_cli_context
from pbench.cli.server import config_setup, Detail, Verify, Watch
from pbench.cli.server.options import common_options
from pbench.client.types import Dataset
from pbench.common.logger import get_pbench_logger
from pbench.server import BadConfig
from pbench.server.cache_manager import CacheManager
from pbench.server.database.database import Database
from pbench.server.database.models.audit import Audit, AuditStatus
from pbench.server.database.models.datasets import Metadata
from pbench.server.database.models.datasets import Dataset, Metadata
from pbench.server.database.models.index_map import IndexMap

# An arbitrary really big number
@@ -30,6 +31,9 @@
# Number of bytes in a megabyte, coincidentally also a really big number
MEGABYTE_FP = 1000000.0

# SQL "chunk size"
SQL_CHUNK = 2000

detailer: Optional[Detail] = None
watcher: Optional[Watch] = None
verifier: Optional[Verify] = None
@@ -93,7 +97,7 @@ def report_archive(tree: CacheManager):
click.echo("Archive report:")
click.echo(
f" ARCHIVE ({tree.archive_root}): {humanize.naturalsize(usage.total)}: {humanize.naturalsize(usage.used)} "
f"used, {humanize.naturalsize(usage.free)}"
f"used, {humanize.naturalsize(usage.free)} free"
)
click.echo(
f" {tarball_count:,d} tarballs consuming {humanize.naturalsize(tarball_size)}"
@@ -127,7 +131,7 @@ def report_backup(tree: CacheManager):
click.echo("Backup report:")
click.echo(
f" BACKUP ({tree.backup_root}): {humanize.naturalsize(usage.total)}: {humanize.naturalsize(usage.used)} "
f"used, {humanize.naturalsize(usage.free)}"
f"used, {humanize.naturalsize(usage.free)} free"
)
click.echo(
f" {backup_count:,d} tarballs consuming {humanize.naturalsize(backup_size)}"
@@ -145,7 +149,9 @@ def report_cache(tree: CacheManager):
cached_count = 0
cached_size = 0
lacks_size = 0
lacks_tarpath = 0
bad_size = 0
found_metrics = False
lacks_metrics = 0
bad_metrics = 0
unpacked_count = 0
@@ -158,56 +164,81 @@
speedcomp = Comparator("speed", really_big=GINORMOUS_FP)
streamcomp = Comparator("streaming", really_big=GINORMOUS_FP)

for tarball in tree.datasets.values():
watcher.update(f"({cached_count}) cache {tarball.name}")
if tarball.cache:
rows = (
Database.db_session.query(
Dataset.name,
Dataset.resource_id,
Metadata.value["tarball-path"].as_string(),
Metadata.value["unpacked-size"],
Metadata.value["unpack-perf"],
)
.execution_options(stream_results=True)
.outerjoin(
Metadata,
and_(Dataset.id == Metadata.dataset_ref, Metadata.key == "server"),
)
.yield_per(SQL_CHUNK)
)

for dsname, rid, tar, size, metrics in rows:
watcher.update(f"({cached_count}) cache {dsname}")
if tar:
tarball = Path(tar)
tarname = Dataset.stem(tarball)
else:
detailer.error(f"{dsname} doesn't have a 'server.tarball-path' metadata")
lacks_tarpath += 1
tarball = None
tarname = dsname
cache = Path(tree.cache_root / rid)
if (cache / tarname).exists():
cached_count += 1
try:
referenced = tarball.last_ref.stat().st_mtime
referenced = (cache / "last_ref").stat().st_mtime
except Exception as e:
detailer.error(f"{tarball.name} last ref access: {str(e)!r}")
detailer.error(f"{dsname} last ref access: {str(e)!r}")
last_ref_errors += 1
else:
agecomp.add(tarball.name, referenced)
if tarball.unpacked:
cached_count += 1
size = Metadata.getvalue(tarball.dataset, Metadata.SERVER_UNPACKED)
agecomp.add(dsname, referenced)
if not size:
detailer.error(f"{tarball.name} has no unpacked size")
detailer.error(f"{dsname} has no unpacked size")
lacks_size += 1
elif not isinstance(size, int):
detailer.error(
f"{tarball.name} has non-integer unpacked size "
f"{dsname} has non-integer unpacked size "
f"{size!r} ({type(size).__name__})"
)
bad_size += 1
else:
sizecomp.add(tarball.name, size)
sizecomp.add(dsname, size)
cached_size += size

# Check compression ratios
tar_size = tarball.tarball_path.stat().st_size
ratio = float(size - tar_size) / float(size)
compcomp.add(tarball.name, ratio)
metrics = Metadata.getvalue(tarball.dataset, Metadata.SERVER_UNPACK_PERF)
if tarball:
tar_size = tarball.stat().st_size
ratio = float(size - tar_size) / float(size)
compcomp.add(dsname, ratio)
if not metrics:
detailer.message(f"{tarball.name} has no unpack metrics")
# NOTE: message not error since nobody has this yet (noise)
detailer.message(f"{dsname} has no unpack metrics")
lacks_metrics += 1
elif not isinstance(metrics, dict) or {"min", "max", "count"} - set(
metrics.keys()
):
detailer.error(
f"{tarball.name} has bad unpack metrics "
f"{dsname} has bad unpack metrics "
f"{metrics!r} ({type(metrics).__name__})"
)
bad_metrics += 1
else:
found_metrics = True
unpacked_count += 1
unpacked_times += metrics["count"]
speedcomp.add(tarball.name, metrics["min"], metrics["max"])
speedcomp.add(dsname, metrics["min"], metrics["max"])
if size and metrics:
stream_fast = size / metrics["min"] / MEGABYTE_FP
stream_slow = size / metrics["max"] / MEGABYTE_FP
streamcomp.add(tarball.name, stream_slow, stream_fast)
streamcomp.add(dsname, stream_slow, stream_fast)
else:
stream_unpack_skipped += 1
oldest = datetime.datetime.fromtimestamp(agecomp.min, datetime.timezone.utc)
@@ -243,22 +274,23 @@ def report_cache(tree: CacheManager):
click.echo(
f" The best compression ratio is {compcomp.max:.3%}, " f"{compcomp.max_name}"
)
click.echo(
f" The fastest cache unpack is {speedcomp.min:.3f} seconds, "
f"{speedcomp.min_name}"
)
click.echo(
f" The slowest cache unpack is {speedcomp.max:.3f} seconds, "
f"{speedcomp.max_name}"
)
click.echo(
f" The fastest cache unpack streaming rate is {streamcomp.max:.3f} Mb/second, "
f"{streamcomp.max_name}"
)
click.echo(
f" The slowest cache unpack streaming rate is {streamcomp.min:.3f} Mb/second, "
f"{streamcomp.min_name}"
)
if found_metrics:
click.echo(
f" The fastest cache unpack is {speedcomp.min:.3f} seconds, "
f"{speedcomp.min_name}"
)
click.echo(
f" The slowest cache unpack is {speedcomp.max:.3f} seconds, "
f"{speedcomp.max_name}"
)
click.echo(
f" The fastest cache unpack streaming rate is {streamcomp.max:.3f} Mb/second, "
f"{streamcomp.max_name}"
)
click.echo(
f" The slowest cache unpack streaming rate is {streamcomp.min:.3f} Mb/second, "
f"{streamcomp.min_name}"
)
if lacks_size or last_ref_errors or bad_size or verifier.verify:
click.echo(
f" {lacks_size:,d} datasets have no unpacked size, "
Expand All @@ -270,9 +302,9 @@ def report_cache(tree: CacheManager):
f" {lacks_metrics:,d} datasets are missing unpack metric data, "
f"{bad_metrics} have bad unpack metric data"
)
if stream_unpack_skipped or verifier.verify:
if lacks_tarpath:
click.echo(
f" {stream_unpack_skipped:,d} datasets are missing unpack performance data"
f" {lacks_tarpath} datasets are missing server.tarball-path metadata"
)


@@ -292,7 +324,7 @@ def report_audit():
Database.db_session.query(Audit)
.execution_options(stream_results=True)
.order_by(Audit.timestamp)
.yield_per(1000)
.yield_per(SQL_CHUNK)
)
for audit in audit_logs:
counter += 1
@@ -377,7 +409,7 @@ def report_sql():
query = select(IndexMap.root, IndexMap.index)
for root, index in Database.db_session.execute(
query, execution_options={"stream_results": True}
).yield_per(500):
).yield_per(SQL_CHUNK):
record_count += 1
watcher.update(f"({record_count}: {root} -> {index}")
roots.add(root)
@@ -416,7 +448,7 @@ def report_states():
"dataset_operations AS o ON o.dataset_ref = d.id"
),
execution_options={"stream_results": True},
).yield_per(1000)
).yield_per(SQL_CHUNK)
for dataset, operation, state, message in rows:
watcher.update(f"inspecting {dataset}:{operation}")
if operation is None:
@@ -535,19 +567,19 @@ def report(
try:
config = config_setup(context)
logger = get_pbench_logger("pbench-report-generator", config)
if any((all, archive, backup, cache)):
cache_m = CacheManager(config, logger)
cache_m = CacheManager(config, logger)
if any((all, archive, backup)):
verifier.status("starting discovery")
watcher.update("discovering cache")
watcher.update("discovering archive tree")
cache_m.full_discovery(search=False)
watcher.update("processing reports")
verifier.status("finished discovery")
if all or archive:
report_archive(cache_m)
if all or backup:
report_backup(cache_m)
if all or cache:
report_cache(cache_m)
if all or cache:
report_cache(cache_m)
if all or audit:
report_audit()
if all or sql:
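Beyond streaming, the rewritten report_cache() query leans on two techniques visible in its hunk above: an outer join constrained with and_(), so datasets with no "server" metadata row still come back (their joined columns are NULL), and JSON-path extraction of "tarball-path" via as_string(). The sketch below demonstrates both under the same assumptions as before: invented stand-in models that only mimic pbench's Dataset and Metadata, and a SQLite build with JSON1 support.

from sqlalchemy import JSON, Column, ForeignKey, Integer, String, and_, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Dataset(Base):
    # Stand-in for pbench's Dataset model.
    __tablename__ = "datasets"
    id = Column(Integer, primary_key=True)
    name = Column(String)

class Metadata(Base):
    # Stand-in for pbench's Metadata model: one JSON document per key.
    __tablename__ = "metadata"
    id = Column(Integer, primary_key=True)
    dataset_ref = Column(Integer, ForeignKey("datasets.id"))
    key = Column(String)
    value = Column(JSON)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Dataset(id=1, name="ds1"))
    session.add(Metadata(dataset_ref=1, key="server",
                         value={"tarball-path": "/srv/t.tar.xz"}))
    session.add(Dataset(id=2, name="ds2"))  # no metadata row at all
    session.commit()
    rows = (
        # JSON-path access plus as_string() extracts the value as text.
        session.query(Dataset.name, Metadata.value["tarball-path"].as_string())
        # The and_() onclause restricts the join to the "server" key while
        # the OUTER join keeps datasets that have no matching row.
        .outerjoin(
            Metadata,
            and_(Dataset.id == Metadata.dataset_ref, Metadata.key == "server"),
        )
    )
    for name, tar in rows:
        print(name, tar)  # ds1 /srv/t.tar.xz, then ds2 None

The NULL-preserving join is what lets the report count datasets that lack server.tarball-path metadata (the lacks_tarpath branch) instead of silently skipping them.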
