Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xarray.open_mfdataset() and .load() slow or hangs with 4D dataset (76 GB) after upgrading to dask>=2024.12.0 #9926

Closed
4 of 5 tasks
tomvothecoder opened this issue Jan 6, 2025 · 7 comments

Comments

@tomvothecoder
Copy link
Contributor

tomvothecoder commented Jan 6, 2025

What happened?

Overview

Related to E3SM-Project/e3sm_diags#880 (comment)

I noticed a significant slow down with the following operation after upgrading from dask=2024.11.2 to dask>=2024.12.0. I decided to post the issue here first, then see if we should escalate it over to the dask repo. I'm not sure what change might have contributed to this slow down.

Operation steps:

  1. Open a 76 GB dataset with open_mfdataset()
  2. Subset the 76 GB dataset on a time slice, reducing the dataset to ~2 GB
  3. Load the dataset into memory with .load()

Results:

  1. xarray=2024.11.0, dask=2024.11.2

    • mamba create -y -n xr_2024110_dask_2024112 -c conda-forge xarray=2024.11.0 dask=2024.11.2 netcdf4 ipykernel
    • Result: ~3-4 secs to load
  2. xarray=2024.11.0, dask=2024.12.0

    • mamba create -y -n xr_2024110_dask_2024120 -c conda-forge xarray=2024.11.0 dask=2024.12.0 netcdf4 ipykernel
    • Result: ~85 secs to load

What did you expect to happen?

The dataset should load into memory at the same speed between dask versions.

Minimal Complete Verifiable Example

"""
This script benchmarks the time taken to load a subset of a large xarray dataset
into memory.

Test Environments:
1. xarray=2024.11.0, dask=2024.11.2
    - Command: mamba create -y -n xr_2024110_dask_2024112 -c conda-forge xarray=2024.11.0 dask=2024.11.2 netcdf4 ipykernel
    - Result: ~3-4 secs to load

2. xarray=2024.11.0, dask=2024.12.0
    - Command: mamba create -y -n xr_2024110_dask_2024120 -c conda-forge xarray=2024.11.0 dask=2024.12.0 netcdf4 ipykernel
    - Result: ~85 secs to load

Steps:
1. Open the "ua" dataset (~76 GB) from a specified file path.
2. Subset the "ua" dataset to a smaller size (~2 GB) based on a time range.
3. Load the subsetted dataset into memory and measure the time taken for this operation.
"""

# %%
import timeit

import xarray as xr

# 1. Open the "ua" dataset (~76 GB)
# Dataset can be downloaded here: web.lcrc.anl.gov/public/e3sm/diagnostics/observations/Atm/time-series/ERA5/ua_197901_201912.nc
filepaths = [
    "/lcrc/group/e3sm/diagnostics/observations/Atm/time-series/ERA5/ua_197901_201912.nc"
]

ds = xr.open_mfdataset(filepaths)

# 2. Subset the "ua" dataset (~2 GB)
ds_sub = ds.sel(time=slice("1996-01-15", "1997-01-15", None))

# %%
# 3. Load into memory
start_time = timeit.default_timer()
ds_sub.load()
elapsed = timeit.default_timer() - start_time

print(f"Time taken to load ds_xc_sub: {elapsed} seconds")

MVCE confirmation

  • Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • Complete example — the example is self-contained, including all data and the text of any traceback.
  • Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • New issue — a search of GitHub Issues suggests this is not a duplicate.
  • Recent environment — the issue occurs with the latest version of xarray and its dependencies.

Relevant log output

No response

Anything else we need to know?

This is the self-contained MVCE that does not reproduce the performance slow down for some reason, even though it replicates the dataset I've linked for download.

"""
This script benchmarks the time taken to load a subset of a large xarray dataset
into memory.

Test Environments:
1. xarray=2024.11.0, dask=2024.11.2
    - Command: mamba create -y -n xr_2024110_dask_2024112 -c conda-forge xarray=2024.11.0 dask=2024.11.2 netcdf4 ipykernel

2. xarray=2024.11.0, dask=2024.12.0
    - Command: mamba create -y -n xr_2024110_dask_2024120 -c conda-forge xarray=2024.11.0 dask=2024.12.0 netcdf4 ipykernel

"""

# %%
import numpy as np
import pandas as pd
import xarray as xr
import timeit

import dask.array as da

# 1. Create the coordinates.
times = pd.date_range("1979-01-01", "2019-12-31", freq="MS")
plevs = np.array(
    [
        100000.0,
        97500.0,
        95000.0,
        92500.0,
        90000.0,
        87500.0,
        85000.0,
        82500.0,
        80000.0,
        77500.0,
        75000.0,
        70000.0,
        65000.0,
        60000.0,
        55000.0,
        50000.0,
        45000.0,
        40000.0,
        35000.0,
        30000.0,
        25000.0,
        22500.0,
        20000.0,
        17500.0,
        15000.0,
        12500.0,
        10000.0,
        7000.0,
        5000.0,
        3000.0,
        2000.0,
        1000.0,
        700.0,
        500.0,
        300.0,
        200.0,
        100.0,
    ]
)
lats = np.linspace(-90, 90, 721)
lons = np.linspace(0, 360, 1440, endpoint=False)

# 2. Define the dimensions
time = len(times)
plev = len(plevs)
lat = len(lats)
lon = len(lons)

# 3. Create the dataset and subset it on time.
ds = xr.DataArray(
    name="ua",
    data=da.random.random(
        size=(time, plev, lat, lon), chunks=(497, 37, 721, 1440)
    ).astype(np.float32),
    dims=["time", "plev", "lat", "lon"],
    coords={"time": times, "plev": plevs, "lat": lats, "lon": lons},
).to_dataset()


ds_sub = ds.sel(time=slice("1996-01-15", "1997-01-15"))

# %%
# 4. Load the sub-setted dataset into memory.
start_time = timeit.default_timer()
ds_sub.load()
end_time = timeit.default_timer()

print(f"Time taken to load the dataset: {end_time - start_time} seconds")

Environment

INSTALLED VERSIONS

commit: None
python: 3.13.1 | packaged by conda-forge | (main, Dec 5 2024, 21:23:54) [GCC 13.3.0]
python-bits: 64
OS: Linux
OS-release: 5.14.21-150400.24.111_12.0.91-cray_shasta_c
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.14.4
libnetcdf: 4.9.2

xarray: 2024.11.0
pandas: 2.2.3
numpy: 2.2.1
scipy: None
netCDF4: 1.7.2
pydap: None
h5netcdf: None
h5py: None
zarr: None
cftime: 1.6.4
nc_time_axis: None
iris: None
bottleneck: None
dask: 2024.12.0
distributed: 2024.12.0
matplotlib: None
cartopy: None
seaborn: None
numbagg: None
fsspec: 2024.12.0
cupy: None
pint: None
sparse: None
flox: None
numpy_groupies: None
setuptools: None
pip: 24.3.1
conda: None
pytest: None
mypy: None
IPython: 8.31.0
sphinx: None

@tomvothecoder tomvothecoder added bug needs triage Issue that has not been reviewed by xarray team member labels Jan 6, 2025
@tomvothecoder tomvothecoder changed the title xarray.open_mfdataset() and .load() slow or hangs using dask>=2024.12.0 and 4D dataset (76 GB) xarray.open_mfdataset() and .load() slow or hangs after upgrading to dask>=2024.12.0 and 4D dataset (76 GB) Jan 6, 2025
@tomvothecoder tomvothecoder changed the title xarray.open_mfdataset() and .load() slow or hangs after upgrading to dask>=2024.12.0 and 4D dataset (76 GB) xarray.open_mfdataset() and .load() slow or hangs with 4D dataset (76 GB) after upgrading to dask>=2024.12.0 Jan 6, 2025
@dcherian dcherian added topic-dask and removed needs triage Issue that has not been reviewed by xarray team member labels Jan 6, 2025
@dcherian
Copy link
Contributor

dcherian commented Jan 6, 2025

cc @phofl

@phofl
Copy link
Contributor

phofl commented Jan 6, 2025

Thanks. I’ll take a look tomorrow..

@tomvothecoder any chance you could try this with an explicit cluster instantiated? I.e.

from distributed import Client

client = Client()

And then your code.

@tomvothecoder
Copy link
Contributor Author

Thanks. I’ll take a look tomorrow..

@tomvothecoder any chance you could try this with an explicit cluster instantiated? I.e.

from distributed import Client

client = Client()

And then your code.

@phofl Thanks for looking into it. Sure I'll give it a shot.

@tomvothecoder
Copy link
Contributor Author

tomvothecoder commented Jan 6, 2025

Using the explicitly instantiated cluster, the code crashes with dask=2024.12.0. I've captured the console output, which indicates a memory leak. The code works fine with dask=2024.11.2 (~4 sec runtime).

New script

import timeit

from dask.distributed import Client
import xarray as xr

if __name__ == "__main__":
    client = Client()

    # 1. Open the "ua" dataset (~76 GB)
    filepaths = [
        "/lcrc/group/e3sm/diagnostics/observations/Atm/time-series/ERA5/ua_197901_201912.nc"
    ]

    ds = xr.open_mfdataset(filepaths)

    # 2. Subset the "ua" dataset (~2 GB)
    ds_sub = ds.sel(time=slice("1996-01-15", "1997-01-15", None))

    # 3. Load into memory
    start_time = timeit.default_timer()
    ds_sub.load()
    elapsed = timeit.default_timer() - start_time

    print(f"Time taken to load ds_xc_sub: {elapsed} seconds")

Output

2025-01-06 15:14:00,987 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 44.08 GiB -- Worker memory limit: 62.95 GiB
2025-01-06 15:14:05,523 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 50.42 GiB -- Worker memory limit: 62.95 GiB
2025-01-06 15:14:12,128 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:37423 (pid=2260359) exceeded 95% memory budget. Restarting...
2025-01-06 15:14:12,237 - distributed.nanny - WARNING - Restarting worker
2025-01-06 15:14:42,891 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 44.11 GiB -- Worker memory limit: 62.95 GiB
2025-01-06 15:14:47,640 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 50.37 GiB -- Worker memory limit: 62.95 GiB
2025-01-06 15:14:55,728 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:40597 (pid=2260367) exceeded 95% memory budget. Restarting...
2025-01-06 15:14:55,844 - distributed.nanny - WARNING - Restarting worker
2025-01-06 15:15:29,190 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 44.12 GiB -- Worker memory limit: 62.95 GiB
2025-01-06 15:15:33,633 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 50.42 GiB -- Worker memory limit: 62.95 GiB
2025-01-06 15:15:40,227 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:43119 (pid=2260355) exceeded 95% memory budget. Restarting...
2025-01-06 15:15:40,343 - distributed.nanny - WARNING - Restarting worker
2025-01-06 15:16:10,188 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 44.18 GiB -- Worker memory limit: 62.95 GiB
2025-01-06 15:16:14,729 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 50.44 GiB -- Worker memory limit: 62.95 GiB
2025-01-06 15:16:21,228 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:43531 (pid=2260375) exceeded 95% memory budget. Restarting...
2025-01-06 15:16:21,338 - distributed.scheduler - ERROR - Task ('open_dataset-ua-original-getitem-668b8aa617f97e374f7aa417de9048ba', 0, 0, 0, 0) marked as failed because 4 workers died while trying to run it
2025-01-06 15:16:21,344 - distributed.nanny - WARNING - Restarting worker

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
Cell In[1], [line 23](vscode-notebook-cell:?execution_count=1&line=23)
     [21](vscode-notebook-cell:?execution_count=1&line=21) # 3. Load into memory
     [22](vscode-notebook-cell:?execution_count=1&line=22) start_time = timeit.default_timer()
---> [23](vscode-notebook-cell:?execution_count=1&line=23) ds_sub.load()
     [24](vscode-notebook-cell:?execution_count=1&line=24) elapsed = timeit.default_timer() - start_time
     [26](vscode-notebook-cell:?execution_count=1&line=26) print(f"Time taken to load ds_xc_sub: {elapsed} seconds")

File /gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/xarray/core/dataset.py:899, in Dataset.load(self, **kwargs)
    [896](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/xarray/core/dataset.py:896) chunkmanager = get_chunked_array_type(*lazy_data.values())
    [898](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/xarray/core/dataset.py:898) # evaluate all the chunked arrays simultaneously
--> [899](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/xarray/core/dataset.py:899) evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
    [900](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/xarray/core/dataset.py:900)     *lazy_data.values(), **kwargs
    [901](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/xarray/core/dataset.py:901) )
    [903](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/xarray/core/dataset.py:903) for k, data in zip(lazy_data, evaluated_data, strict=False):
    [904](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/xarray/core/dataset.py:904)     self.variables[k].data = data

File /gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/xarray/namedarray/daskmanager.py:85, in DaskManager.compute(self, *data, **kwargs)
     [80](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/xarray/namedarray/daskmanager.py:80) def compute(
     [81](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/xarray/namedarray/daskmanager.py:81)     self, *data: Any, **kwargs: Any
     [82](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/xarray/namedarray/daskmanager.py:82) ) -> tuple[np.ndarray[Any, _DType_co], ...]:
     [83](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/xarray/namedarray/daskmanager.py:83)     from dask.array import compute
---> [85](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/xarray/namedarray/daskmanager.py:85)     return compute(*data, **kwargs)

File /gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/dask/base.py:660, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    [657](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/dask/base.py:657)     postcomputes.append(x.__dask_postcompute__())
    [659](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/dask/base.py:659) with shorten_traceback():
--> [660](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/dask/base.py:660)     results = schedule(dsk, keys, **kwargs)
    [662](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/dask/base.py:662) return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/distributed/client.py:2427, in Client._gather(self, futures, errors, direct, local_worker)
   [2425](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/distributed/client.py:2425)     exception = st.exception
   [2426](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/distributed/client.py:2426)     traceback = st.traceback
-> [2427](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/distributed/client.py:2427)     raise exception.with_traceback(traceback)
   [2428](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/distributed/client.py:2428) if errors == "skip":
   [2429](https://vscode-remote+ssh-002dremote-002bchrysalis.vscode-resource.vscode-cdn.net/gpfs/fs1/home/ac.tvo/mambaforge/envs/xr_2024110_dask_2024120/lib/python3.13/site-packages/distributed/client.py:2429)     bad_keys.add(key)

KilledWorker: Attempted to run task ('open_dataset-ua-original-getitem-668b8aa617f97e374f7aa417de9048ba', 0, 0, 0, 0) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:43531. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

@phofl
Copy link
Contributor

phofl commented Jan 6, 2025

Thanks! I’ll take a look tomorrow and get back to you

@phofl
Copy link
Contributor

phofl commented Jan 10, 2025

The new dask release will fix this

@dcherian dcherian closed this as not planned Won't fix, can't repro, duplicate, stale Jan 10, 2025
@tomvothecoder
Copy link
Contributor Author

The new dask release will fix this

Thank you, @phofl!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants