From 2226b6edc21f953e9e81cd319195ada3bbe76507 Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Wed, 8 Jan 2025 16:13:34 -0700
Subject: [PATCH] Benchmark with pytest-benchmark

---
 icechunk-python/benchmarks/__init__.py        |   0
 icechunk-python/benchmarks/lib.py             | 101 ++++++++++++++++++
 icechunk-python/benchmarks/run_refs.py        |  90 ++++++++++++++++
 .../benchmarks/setup_benchmarks.py            |  72 +++++++++++++
 .../benchmarks/test_benchmark_reads.py        |  94 ++++++++++++++++
 icechunk-python/pyproject.toml                |   7 ++
 6 files changed, 364 insertions(+)
 create mode 100644 icechunk-python/benchmarks/__init__.py
 create mode 100644 icechunk-python/benchmarks/lib.py
 create mode 100644 icechunk-python/benchmarks/run_refs.py
 create mode 100644 icechunk-python/benchmarks/setup_benchmarks.py
 create mode 100644 icechunk-python/benchmarks/test_benchmark_reads.py

diff --git a/icechunk-python/benchmarks/__init__.py b/icechunk-python/benchmarks/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/icechunk-python/benchmarks/lib.py b/icechunk-python/benchmarks/lib.py
new file mode 100644
index 00000000..7427eecf
--- /dev/null
+++ b/icechunk-python/benchmarks/lib.py
@@ -0,0 +1,101 @@
+from dataclasses import dataclass
+from typing import Any
+
+import fsspec
+
+import icechunk as ic
+
+
+@dataclass
+class Dataset:
+    bucket: str
+    prefix: str
+    storage: ic.Storage
+    # data variable to load in `time_xarray_read_chunks`
+    load_variables: list[str]
+    # Passed to .isel for `time_xarray_read_chunks`
+    chunk_selector: dict[str, Any]
+    # name of (coordinate) variable used for testing "time to first byte"
+    first_byte_variable: str | None
+    # core useful group
+    group: str | None = None
+
+    def create(self) -> ic.Repository:
+        fs = fsspec.filesystem("s3")
+        try:
+            fs.rm(f"{self.bucket}/{self.prefix}", recursive=True)
+        except FileNotFoundError:
+            pass
+        return ic.Repository.create(self.storage)
+
+    @property
+    def store(self) -> ic.IcechunkStore:
+        repo = ic.Repository.open(self.storage)
+        return repo.readonly_session(branch="main").store
+
+
+ERA5 = Dataset(
+    # FIXME: make these accessible from storage
+    bucket="icechunk-test",
+    prefix="era5-demo-repository-a10",
+    storage=ic.Storage.new_s3(
+        bucket="icechunk-test",
+        prefix="era5-demo-repository-a10",
+        config=ic.S3Options(),
+    ),
+    load_variables=["2m_temperature"],
+    chunk_selector={"time": 1},
+    first_byte_variable="latitude",
+)
+ERA5_SINGLE = Dataset(
+    bucket="icechunk-test",
+    prefix="perf-era5-single",
+    storage=ic.Storage.new_s3(
+        bucket="icechunk-test",
+        prefix="perf-era5-single",
+        config=ic.S3Options(),
+    ),
+    load_variables=["PV"],
+    chunk_selector={"time": 1},
+    first_byte_variable="latitude",
+)
+GB_128MB_CHUNKS = Dataset(
+    bucket="icechunk-test",
+    prefix="gb-128mb-chunks",
+    storage=ic.Storage.new_s3(
+        bucket="icechunk-test",
+        prefix="gb-128mb-chunks",
+        config=ic.S3Options(),
+    ),
+    load_variables=["array"],
+    chunk_selector={},
+    first_byte_variable=None,
+)
+
+GB_8MB_CHUNKS = Dataset(
+    bucket="icechunk-test",
+    prefix="gb-8mb-chunks",
+    storage=ic.Storage.new_s3(
+        bucket="icechunk-test",
+        prefix="gb-8mb-chunks",
+        config=ic.S3Options(),
+    ),
+    load_variables=["array"],
+    chunk_selector={},
+    first_byte_variable=None,
+)
+
+GPM_IMERG_VIRTUAL = Dataset(
+    bucket="earthmover-icechunk-us-west-2",
+    prefix="nasa-impact/GPM_3IMERGHH.07-virtual-1998",
+    storage=ic.s3_storage(
+        bucket="earthmover-icechunk-us-west-2",
+        prefix="nasa-impact/GPM_3IMERGHH.07-virtual-1998",
+        # access_key_id=access_key_id,
+        # secret_access_key=secret,
+        # session_token=session_token,
+    ),
+    load_variables=["foo"],
+    chunk_selector={"time": 1},
+    first_byte_variable="lat",
+)
diff --git a/icechunk-python/benchmarks/run_refs.py b/icechunk-python/benchmarks/run_refs.py
new file mode 100644
index 00000000..f9b48c73
--- /dev/null
+++ b/icechunk-python/benchmarks/run_refs.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python3
+# helper script to run and save benchmarks against named refs.
+# AKA a quick-and-dirty substitute for asv's environment management
+
+import os
+import subprocess
+import tempfile
+
+import tqdm.contrib.concurrent
+
+TMP = tempfile.gettempdir()
+CURRENTDIR = os.getcwd()
+if not CURRENTDIR.endswith("icechunk-python"):
+    raise ValueError(
+        "Running in the wrong directory. Please run from the `icechunk-python` directory."
+    )
+
+
+def setup(ref):
+    base = f"{TMP}/icechunk-bench-{ref}"
+    cwd = f"{TMP}/icechunk-bench-{ref}/icechunk"
+    pycwd = f"{TMP}/icechunk-bench-{ref}/icechunk/icechunk-python"
+    activate = "source .venv/bin/activate"
+
+    kwargs = dict(cwd=cwd, check=True)
+    pykwargs = dict(cwd=pycwd, check=True)
+
+    print(f"checking out {ref} to {base}")
+    subprocess.run(["mkdir", base], check=False)
+    # TODO: copy the local one instead to save time?
+    subprocess.run(
+        ["git", "clone", "-q", "git@github.com:earth-mover/icechunk"],
+        cwd=base,
+        check=True,
+    )
+    subprocess.run(["git", "checkout", ref], **kwargs)
+    subprocess.run(["cp", "-r", "benchmarks", pycwd], check=True)
+    subprocess.run(["python3", "-m", "venv", ".venv"], cwd=pycwd, check=True)
+    subprocess.run(
+        ["maturin", "build", "--release", "--out", "dist", "--find-interpreter"],
+        **pykwargs,
+    )
+    subprocess.run(
+        f"{activate}"
+        "&& pip install -q icechunk['test'] --find-links dist"
+        # TODO: figure this out from the current pyproject.toml [benchmark] section
+        "&& pip install pytest-benchmark s3fs h5netcdf pooch tqdm ",
+        shell=True,
+        **pykwargs,
+    )
+
+    # TODO: this is only needed for format changes
+    print(f"setup_benchmarks for {ref}")
+    subprocess.run(
+        f"{activate} && python benchmarks/setup_benchmarks.py --prefix={ref}",
+        **pykwargs,
+        shell=True,
+    )
+
+
+def run(ref):
+    pycwd = f"{TMP}/icechunk-bench-{ref}/icechunk/icechunk-python"
+    activate = "source .venv/bin/activate"
+
+    print(f"running for {ref}")
+    subprocess.run(
+        f"{activate} "
+        # .benchmarks is the default location for pytest-benchmark
+        f"&& pytest --benchmark-storage={CURRENTDIR}/.benchmarks --benchmark-save={ref}"
+        " benchmarks/test_benchmark_reads.py",
+        shell=True,
+        cwd=pycwd,
+        check=False,  # don't stop if benchmarks fail
+    )
+
+
+if __name__ == "__main__":
+    refs = [
+        # "icechunk-v0.1.0-alpha.8",
+        "icechunk-v0.1.0-alpha.10",
+        # "main",
+    ]
+    # TODO: parallelize the setup either here or externally
+    # will need to provide an extra prefix to setup_benchmarks
+    # somehow the Dataset class will have to take this extra prefix into account.
+    # A context manager may be a good idea?
+    print("Setting up benchmarks")
+    [setup(ref) for ref in tqdm.tqdm(refs)]
+    print("Running benchmarks")
+    [run(ref) for ref in tqdm.tqdm(refs)]
diff --git a/icechunk-python/benchmarks/setup_benchmarks.py b/icechunk-python/benchmarks/setup_benchmarks.py
new file mode 100644
index 00000000..9e425647
--- /dev/null
+++ b/icechunk-python/benchmarks/setup_benchmarks.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python3
+import datetime
+import time
+
+import numpy as np
+import tqdm
+from lib import ERA5_SINGLE, GB_8MB_CHUNKS, GB_128MB_CHUNKS
+
+import xarray as xr
+import zarr
+
+rng = np.random.default_rng(seed=123)
+
+
+def setup_synthetic_gb_dataset():
+    shape = (512, 512, 512)
+    for dataset, chunksize in tqdm.tqdm(
+        (
+            (GB_128MB_CHUNKS, (64, 512, 512)),
+            (GB_8MB_CHUNKS, (4, 512, 512)),
+        )
+    ):
+        repo = dataset.create()
+        session = repo.writable_session("main")
+        store = session.store
+        group = zarr.group(store)
+        array = group.create_array(
+            name="array",
+            shape=shape,
+            chunks=chunksize,
+            dtype=np.int64,
+            dimension_names=("t", "y", "x"),
+        )
+        array[:] = rng.integers(-1000, high=1000, size=shape, dtype=np.int64)
+        session.commit("initialized")
+
+
+def setup_era5_single():
+    import pooch
+
+    # FIXME: move to earthmover-sample-data
+    url = "https://nsf-ncar-era5.s3.amazonaws.com/e5.oper.an.pl/194106/e5.oper.an.pl.128_060_pv.ll025sc.1941060100_1941060123.nc"
+    print(f"Reading {url}")
+    tic = time.time()
+    ds = xr.open_dataset(
+        pooch.retrieve(
+            url,
+            known_hash="2322a4baaabd105cdc68356bdf2447b59fbbec559ee1f9a9a91d3c94f242701a",
+        ),
+        engine="h5netcdf",
+    )
+    ds = ds.drop_encoding().load()
+    print(f"Loaded data in {time.time() - tic} seconds")
+
+    repo = ERA5_SINGLE.create()
+    session = repo.writable_session("main")
+    tic = time.time()
+    encoding = {
+        "PV": {"compressors": [zarr.codecs.ZstdCodec()], "chunks": (1, 1, 721, 1440)}
+    }
+    print("Writing data...")
+    ds.to_zarr(
+        session.store, mode="w", zarr_format=3, consolidated=False, encoding=encoding
+    )
+    print(f"Wrote data in {time.time() - tic} seconds")
+    session.commit(f"wrote data at {datetime.datetime.now(datetime.UTC)}")
+
+
+# FIXME: Take a prefix
+if __name__ == "__main__":
+    setup_era5_single()
+    setup_synthetic_gb_dataset()
diff --git a/icechunk-python/benchmarks/test_benchmark_reads.py b/icechunk-python/benchmarks/test_benchmark_reads.py
new file mode 100644
index 00000000..423f9720
--- /dev/null
+++ b/icechunk-python/benchmarks/test_benchmark_reads.py
@@ -0,0 +1,94 @@
+import operator
+
+import pytest
+
+import xarray as xr
+import zarr
+from benchmarks.lib import ERA5_SINGLE, GB_8MB_CHUNKS, GB_128MB_CHUNKS, Dataset
+from zarr.abc.store import Store
+
+# TODO: configurable?
+zarr.config.set({"async.concurrency": 64})
+
+
+@pytest.fixture(
+    params=[
+        pytest.param(ERA5_SINGLE, id="era5-single"),
+        pytest.param(GB_128MB_CHUNKS, id="gb-128mb"),
+        pytest.param(GB_8MB_CHUNKS, id="gb-8mb"),
+    ]
+)
+def datasets(request) -> Store:
+    """These are "real-world" datasets stored in the cloud."""
+    return request.param
+
+
+# @pytest.fixture(
+#     params=[
+#         "era5",  # "icebreakhrrr"
+#     ]
+# )
+# def synthetic_stores_bench(request) -> Store:
+#     """These are synthetic datasets stored in minio.
+#     FIXME: run `....py` to initialize `minio`.
+#     """
+#     if request.param == "era5":
+#         s3_storage = ic.Storage.new_s3(
+#             bucket="icechunk-test",
+#             prefix="era5-demo-repository-a9",
+#             config=ic.S3Options(),
+#         )
+#         repo = ic.Repository.open_or_create(storage=s3_storage)
+#         return repo.readonly_session("main").store
+
+
+def test_time_create_store(datasets: Dataset, benchmark) -> None:
+    """time to create the icechunk store object"""
+    benchmark(operator.attrgetter("store"), datasets)
+
+
+def test_time_zarr_open(datasets: Dataset, benchmark) -> None:
+    benchmark(zarr.open_group, datasets.store, path=datasets.group, mode="r")
+
+
+def test_time_zarr_members(datasets: Dataset, benchmark) -> None:
+    group = zarr.open_group(datasets.store, path=datasets.group, mode="r")
+    benchmark(operator.methodcaller("members"), group)
+
+
+@pytest.mark.benchmark(min_rounds=10)
+def test_time_xarray_open(datasets: Dataset, benchmark) -> None:
+    benchmark(
+        xr.open_zarr,
+        datasets.store,
+        group=datasets.group,
+        chunks=None,
+        consolidated=False,
+    )
+
+
+# TODO: mark as slow?
+@pytest.mark.benchmark(min_rounds=2)
+def test_time_xarray_read_chunks(datasets: Dataset, benchmark) -> None:
+    ds = xr.open_zarr(
+        datasets.store, group=datasets.group, chunks=None, consolidated=False
+    )
+    subset = ds.isel(datasets.chunk_selector)
+    # important this cannot be `load`
+    benchmark(operator.methodcaller("compute"), subset[datasets.load_variables])
+
+
+def test_time_first_bytes(datasets: Dataset, benchmark) -> None:
+    def open_and_read():
+        # by opening the group repeatedly we force re-download of manifest
+        # so that we actually measure what we want.
+        group = zarr.open_group(datasets.store, path=datasets.group, mode="r")
+        group[datasets.first_byte_variable][:]
+
+    if datasets.first_byte_variable is None:
+        pytest.skip("first_byte_variable not set!")
+    benchmark(open_and_read)
+
+
+# TODO: write a large number of virtual chunk refs
+# TODO: synthetic dataset with very deep and large hierarchies for e.g. tree & members
diff --git a/icechunk-python/pyproject.toml b/icechunk-python/pyproject.toml
index b9371586..438f09d6 100644
--- a/icechunk-python/pyproject.toml
+++ b/icechunk-python/pyproject.toml
@@ -44,6 +44,13 @@ test = [
     "pandas-stubs",
     "boto3-stubs[s3]",
 ]
+benchmark = [
+    "pytest-benchmark[histogram]",
+    "s3fs",
+    "h5netcdf",
+    "pooch",
+    "tqdm",
+]
 
 [tool.maturin]
 features = ["pyo3/extension-module"]