-
Notifications
You must be signed in to change notification settings - Fork 20
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
364 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
from dataclasses import dataclass | ||
from typing import Any | ||
|
||
import fsspec | ||
|
||
import icechunk as ic | ||
|
||
|
||
@dataclass
class Dataset:
    """Description of a benchmark dataset living in object storage."""

    bucket: str
    prefix: str
    storage: ic.Storage
    # variables loaded by the `time_xarray_read_chunks` benchmark
    load_variables: list[str]
    # indexers handed to `.isel` in `time_xarray_read_chunks`
    chunk_selector: dict[str, Any]
    # coordinate variable read whole for the "time to first byte" benchmark
    first_byte_variable: str | None
    # zarr group of interest within the repo
    group: str | None = None

    def create(self) -> ic.Repository:
        """Delete any existing data under this prefix and create a fresh repo."""
        path = f"{self.bucket}/{self.prefix}"
        try:
            fsspec.filesystem("s3").rm(path, recursive=True)
        except FileNotFoundError:
            pass
        return ic.Repository.create(self.storage)

    @property
    def store(self) -> ic.IcechunkStore:
        """A read-only store for the `main` branch of this repo."""
        session = ic.Repository.open(self.storage).readonly_session(branch="main")
        return session.store
|
||
|
||
# Full ERA5 demo repository (written by an external process — see FIXME).
ERA5 = Dataset(
    # FIXME: make these accessible from storage
    bucket="icechunk-test",
    prefix="era5-demo-repository-a10",
    storage=ic.Storage.new_s3(
        bucket="icechunk-test",
        prefix="era5-demo-repository-a10",
        config=ic.S3Options(),
    ),
    load_variables=["2m_temperature"],
    chunk_selector={"time": 1},
    first_byte_variable="latitude",
)
# A single ERA5 file; created by `setup_benchmarks.setup_era5_single`.
ERA5_SINGLE = Dataset(
    bucket="icechunk-test",
    prefix="perf-era5-single",
    storage=ic.Storage.new_s3(
        bucket="icechunk-test",
        prefix="perf-era5-single",
        config=ic.S3Options(),
    ),
    load_variables=["PV"],
    chunk_selector={"time": 1},
    first_byte_variable="latitude",
)
# Synthetic ~1GiB array with 128MB chunks; created by
# `setup_benchmarks.setup_synthetic_gb_dataset`.
GB_128MB_CHUNKS = Dataset(
    bucket="icechunk-test",
    prefix="gb-128mb-chunks",
    storage=ic.Storage.new_s3(
        bucket="icechunk-test",
        prefix="gb-128mb-chunks",
        config=ic.S3Options(),
    ),
    load_variables=["array"],
    chunk_selector={},
    first_byte_variable=None,
)

# Same synthetic array as above but with 8MB chunks.
GB_8MB_CHUNKS = Dataset(
    bucket="icechunk-test",
    prefix="gb-8mb-chunks",
    storage=ic.Storage.new_s3(
        bucket="icechunk-test",
        prefix="gb-8mb-chunks",
        config=ic.S3Options(),
    ),
    load_variables=["array"],
    chunk_selector={},
    first_byte_variable=None,
)

# Virtual-chunk dataset (note: uses `ic.s3_storage`, unlike the others).
# NOTE(review): load_variables=["foo"] looks like a placeholder — confirm
# the actual variable name before benchmarking against this dataset.
GPM_IMERG_VIRTUAL = Dataset(
    bucket="earthmover-icechunk-us-west-2",
    prefix="nasa-impact/GPM_3IMERGHH.07-virtual-1998",
    storage=ic.s3_storage(
        bucket="earthmover-icechunk-us-west-2",
        prefix="nasa-impact/GPM_3IMERGHH.07-virtual-1998",
        # access_key_id=access_key_id,
        # secret_access_key=secret,
        # session_token=session_token,
    ),
    load_variables=["foo"],
    chunk_selector={"time": 1},
    first_byte_variable="lat",
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
#!/usr/bin/env python3 | ||
# helper script to run and save benchmarks against named refs. | ||
# AKA a shitty version of asv's env management | ||
|
||
import os | ||
import subprocess | ||
import tempfile | ||
|
||
import tqdm.contrib.concurrent | ||
|
||
TMP = tempfile.gettempdir()
CURRENTDIR = os.getcwd()
# `run` saves benchmark results under CURRENTDIR/.benchmarks and `setup`
# copies the local `benchmarks/` directory, so the script must be launched
# from the `icechunk-python` directory.
if not CURRENTDIR.endswith("icechunk-python"):
    raise ValueError(
        "Running in the wrong directory. Please run from the `icechunk-python` directory."
    )
|
||
|
||
def setup(ref):
    """Clone icechunk at git ref `ref` into a temp dir, build it into a fresh
    venv, install the benchmark dependencies, and run `setup_benchmarks.py`
    to (re)create the benchmark datasets for that ref.
    """
    base = f"{TMP}/icechunk-bench-{ref}"
    cwd = f"{TMP}/icechunk-bench-{ref}/icechunk"
    pycwd = f"{TMP}/icechunk-bench-{ref}/icechunk/icechunk-python"
    # Use the POSIX `.` builtin instead of `source`: `shell=True` runs
    # /bin/sh, and `source` is a bashism that fails on dash (Debian/Ubuntu).
    activate = ". .venv/bin/activate"

    kwargs = dict(cwd=cwd, check=True)
    pykwargs = dict(cwd=pycwd, check=True)

    print(f"checking out {ref} to {base}")
    # replaces the `mkdir` subprocess: portable and idempotent on reruns
    os.makedirs(base, exist_ok=True)
    # TODO: copy the local one instead to save time?
    subprocess.run(
        ["git", "clone", "-q", "git@github.com:earth-mover/icechunk"],
        cwd=base,
        check=True,
    )
    subprocess.run(["git", "checkout", ref], **kwargs)
    # copy the *local* benchmarks so all refs run the same benchmark code
    subprocess.run(["cp", "-r", "benchmarks", pycwd], check=True)
    subprocess.run(["python3", "-m", "venv", ".venv"], cwd=pycwd, check=True)
    subprocess.run(
        ["maturin", "build", "--release", "--out", "dist", "--find-interpreter"],
        **pykwargs,
    )
    subprocess.run(
        f"{activate}"
        "&& pip install -q icechunk['test'] --find-links dist"
        # TODO: figure this out from the current pyproject.toml [benchmark] section
        "&& pip install pytest-benchmark s3fs h5netcdf pooch tqdm ",
        shell=True,
        **pykwargs,
    )

    # TODO: this is only needed for format changes
    print(f"setup_benchmarks for {ref}")
    subprocess.run(
        f"{activate} && python benchmarks/setup_benchmarks.py --prefix={ref}",
        **pykwargs,
        shell=True,
    )
|
||
|
||
def run(ref):
    """Run the read benchmarks for `ref` in its prepared venv, saving
    results into CURRENTDIR/.benchmarks under the name `ref`.
    """
    pycwd = f"{TMP}/icechunk-bench-{ref}/icechunk/icechunk-python"
    # Use the POSIX `.` builtin instead of `source`: `shell=True` runs
    # /bin/sh, and `source` is a bashism that fails on dash (Debian/Ubuntu).
    activate = ". .venv/bin/activate"

    print(f"running for {ref}")
    subprocess.run(
        f"{activate} "
        # .benchmarks is the default location for pytest-benchmark
        f"&& pytest --benchmark-storage={CURRENTDIR}/.benchmarks --benchmark-save={ref}"
        " benchmarks/test_benchmark_reads.py",
        shell=True,
        cwd=pycwd,
        check=False,  # don't stop if benchmarks fail
    )
|
||
|
||
if __name__ == "__main__":
    refs = [
        # "icechunk-v0.1.0-alpha.8",
        "icechunk-v0.1.0-alpha.10",
        # "main",
    ]
    # TODO: parallelize the setup either here or externally
    # will need to provide an extra prefix to setup_benchmarks
    # somehow the Dataset class will have to take this extra prefix in to account.
    # A context manager may be a good idea?
    print("Setting up benchmarks")
    # plain loops, not comprehensions: these calls are run only for side effects
    for ref in tqdm.tqdm(refs):
        setup(ref)
    print("Running benchmarks")
    for ref in tqdm.tqdm(refs):
        run(ref)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
#!/usr/bin/env python3 | ||
import datetime | ||
import time | ||
|
||
import numpy as np | ||
import tqdm | ||
from lib import ERA5_SINGLE, GB_8MB_CHUNKS, GB_128MB_CHUNKS | ||
|
||
import xarray as xr | ||
import zarr | ||
|
||
rng = np.random.default_rng(seed=123) | ||
|
||
|
||
def setup_synthetic_gb_dataset():
    """(Re)create the two synthetic datasets (GB_128MB_CHUNKS, GB_8MB_CHUNKS).

    Each is a single int64 array of shape (512, 512, 512) — 1 GiB of data —
    filled with random integers; only the chunk size differs:
    (64, 512, 512) -> 128 MiB chunks, (4, 512, 512) -> 8 MiB chunks.
    """
    shape = (512, 512, 512)
    for dataset, chunksize in tqdm.tqdm(
        (
            (GB_128MB_CHUNKS, (64, 512, 512)),
            (GB_8MB_CHUNKS, (4, 512, 512)),
        )
    ):
        # Dataset.create wipes any existing data under the dataset's prefix
        repo = dataset.create()
        session = repo.writable_session("main")
        store = session.store
        group = zarr.group(store)
        array = group.create_array(
            name="array",
            shape=shape,
            chunks=chunksize,
            dtype=np.int64,
            dimension_names=("t", "y", "x"),
        )
        array[:] = rng.integers(-1000, high=1000, size=shape, dtype=np.int64)
        session.commit("initialized")
|
||
|
||
def setup_era5_single():
    """Download one ERA5 netCDF file and write it to the ERA5_SINGLE repo."""
    # imported lazily so the module imports without pooch installed
    import pooch

    # FIXME: move to earthmover-sample-data
    url = "https://nsf-ncar-era5.s3.amazonaws.com/e5.oper.an.pl/194106/e5.oper.an.pl.128_060_pv.ll025sc.1941060100_1941060123.nc"
    print(f"Reading {url}")
    tic = time.time()
    ds = xr.open_dataset(
        pooch.retrieve(
            url,
            known_hash="2322a4baaabd105cdc68356bdf2447b59fbbec559ee1f9a9a91d3c94f242701a",
        ),
        engine="h5netcdf",
    )
    # drop the source-file encoding so the chunking/compression below is used
    ds = ds.drop_encoding().load()
    print(f"Loaded data in {time.time() - tic} seconds")

    # Dataset.create wipes any existing data under the prefix
    repo = ERA5_SINGLE.create()
    session = repo.writable_session("main")
    tic = time.time()
    # chunks of (1, 1, 721, 1440): presumably one (time, level) slice over the
    # full 0.25-degree lat/lon grid — TODO confirm dimension order
    encoding = {
        "PV": {"compressors": [zarr.codecs.ZstdCodec()], "chunks": (1, 1, 721, 1440)}
    }
    print("Writing data...")
    ds.to_zarr(
        session.store, mode="w", zarr_format=3, consolidated=False, encoding=encoding
    )
    print(f"Wrote data in {time.time() - tic} seconds")
    session.commit(f"wrote data at {datetime.datetime.now(datetime.UTC)}")
|
||
|
||
# FIXME: Take a prefix
# (Re)creates all benchmark datasets; presumably requires S3 write
# credentials for the target buckets — confirm before running.
if __name__ == "__main__":
    setup_era5_single()
    setup_synthetic_gb_dataset()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
import operator | ||
|
||
import pytest | ||
|
||
import xarray as xr | ||
import zarr | ||
from benchmarks.lib import ERA5_SINGLE, GB_8MB_CHUNKS, GB_128MB_CHUNKS, Dataset | ||
from zarr.abc.store import Store | ||
|
||
# TODO: configurable?
# raise zarr's async concurrency so many chunk fetches can be in flight at once
zarr.config.set({"async.concurrency": 64})
|
||
|
||
@pytest.fixture(
    params=[
        pytest.param(ERA5_SINGLE, id="era5-single"),
        pytest.param(GB_128MB_CHUNKS, id="gb-128mb"),
        pytest.param(GB_8MB_CHUNKS, id="gb-8mb"),
    ]
)
def datasets(request) -> Dataset:
    """These are "real-world" datasets stored in the cloud."""
    # the fixture yields Dataset descriptors, not stores: the original
    # `-> Store` annotation was wrong
    return request.param
|
||
|
||
# @pytest.fixture( | ||
# params=[ | ||
# "era5", # "icebreakhrrr" | ||
# ] | ||
# ) | ||
# def synthetic_stores_bench(request) -> Store: | ||
# """These are synthetic datasets stored in minio. | ||
# FIXME: run `....py` to initialize `minio`. | ||
# """ | ||
# if request.param == "era5": | ||
# s3_storage = ic.Storage.new_s3( | ||
# bucket="icechunk-test", | ||
# prefix="era5-demo-repository-a9", | ||
# config=ic.S3Options(), | ||
# ) | ||
# repo = ic.Repository.open_or_create(storage=s3_storage) | ||
# return repo.readonly_session("main").store | ||
|
||
|
||
def test_time_create_store(datasets: Dataset, benchmark) -> None:
    """Time constructing the icechunk store object (repo open + session)."""
    benchmark(lambda: datasets.store)
|
||
|
||
def test_time_zarr_open(datasets: Dataset, benchmark) -> None:
    """Time `zarr.open_group` against the dataset's read-only store."""
    store = datasets.store
    benchmark(zarr.open_group, store, path=datasets.group, mode="r")
|
||
|
||
def test_time_zarr_members(datasets: Dataset, benchmark) -> None:
    """Time listing the members of an already-open group."""
    group = zarr.open_group(datasets.store, path=datasets.group, mode="r")
    benchmark(group.members)
|
||
|
||
@pytest.mark.benchmark(min_rounds=10)
def test_time_xarray_open(datasets: Dataset, benchmark) -> None:
    """Time `xr.open_zarr` (metadata reads only; no data is loaded)."""
    store = datasets.store
    open_kwargs = dict(group=datasets.group, chunks=None, consolidated=False)
    benchmark(xr.open_zarr, store, **open_kwargs)
|
||
|
||
# TODO: mark as slow?
@pytest.mark.benchmark(min_rounds=2)
def test_time_xarray_read_chunks(datasets: Dataset, benchmark) -> None:
    """Time loading `load_variables` for one chunk-sized `.isel` selection."""
    ds = xr.open_zarr(
        datasets.store, group=datasets.group, chunks=None, consolidated=False
    )
    to_load = ds.isel(datasets.chunk_selector)[datasets.load_variables]
    # must be `compute`, not `load`: `load` caches in place, so later
    # benchmark rounds would read cached data instead of the store
    benchmark(to_load.compute)
|
||
|
||
def test_time_first_bytes(datasets: Dataset, benchmark) -> None:
    """Time opening the group and reading `first_byte_variable` in full."""
    if datasets.first_byte_variable is None:
        pytest.skip("first_byte_variable not set!")

    def open_and_read() -> None:
        # re-open the group on every round so the manifest is re-downloaded,
        # keeping the measurement honest
        group = zarr.open_group(datasets.store, path=datasets.group, mode="r")
        group[datasets.first_byte_variable][:]

    benchmark(open_and_read)
|
||
|
||
# TODO: write a large number of virtual chunk refs | ||
# TODO: synthetic dataset with very deep and large hierarchies for e.g. tree & members |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters