Using map groups in MBCn #1951

Open · wants to merge 8 commits into base: main

Changes from all commits
8 changes: 8 additions & 0 deletions CHANGELOG.rst
@@ -2,6 +2,14 @@
Changelog
=========

v0.54.0 (unreleased)
--------------------
Contributors to this version: Éric Dupuis (:user:`coxipi`).

New features and enhancements
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* `xclim.sdba.MBCn` now uses `map_groups` to perform grouping operations, as other adjustment methods do. (:pull:`1951`)

v0.53.0 (2024-10-17)
--------------------
Contributors to this version: Adrien Lamarche (:user:`LamAdr`), Trevor James Smith (:user:`Zeitsperre`), Éric Dupuis (:user:`coxipi`), Pascal Bourgault (:user:`aulemahal`), Sascha Hofmann (:user:`saschahofmann`), David Huard (:user:`huard`).
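For context on the entry above: `map_groups` is the decorator this PR adopts for MBCn. It declares which dimensions a function reduces and which dimensions each of its outputs adds, then handles the per-group looping and reassembly. Below is a minimal sketch of the pattern with a hypothetical `group_mean` function; `map_groups` and `Grouper` are real xclim objects, but the call signature shown is an assumption based on how this PR uses them.

import xarray as xr
from xclim.sdba.base import Grouper, map_groups

@map_groups(reduces=[Grouper.DIM], mean=[Grouper.PROP])
def group_mean(ds: xr.Dataset, *, dim: str) -> xr.Dataset:
    # Called once per group; `dim` is the grouped (possibly stacked) dimension,
    # which is reduced away. The decorator adds the group property dimension
    # (e.g. "month") declared for the `mean` output above.
    return ds.x.mean(dim).rename("mean").to_dataset()

# Assumed usage, mirroring how this PR's train/adjust functions are invoked:
# out = group_mean(xr.Dataset({"x": da}), group=Grouper("time.month"))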
1 change: 1 addition & 0 deletions tests/test_sdba/test_adjustment.py
@@ -595,6 +595,7 @@ def test_add_dims(self, use_dask, open_dataset):
assert scen2.sel(location=["Kugluktuk", "Vancouver"]).isnull().all()


# TODO: Add more tests?
@pytest.mark.slow
class TestMBCn:
@pytest.mark.parametrize("use_dask", [True, False])
Expand Down
223 changes: 99 additions & 124 deletions xclim/sdba/_adjustment.py
@@ -202,20 +202,29 @@ def _npdft_train(ref, hist, rots, quantiles, method, extrap, n_escore, standardi
return af_q, escores


@map_groups(
# this pattern is never used elsewhere, but we should make explicit
# that Grouper.DIM is dropped here, no?
reduces=[Grouper.DIM],
af_q=[Grouper.PROP, "quantiles", "iterations"],
escores=[Grouper.PROP, "quantiles", "iterations"],
)
def mbcn_train(
ds: xr.Dataset,
*,
dim: str,
rot_matrices: xr.DataArray,
pts_dims: Sequence[str],
quantiles: np.ndarray,
gw_idxs: xr.DataArray,
iterations: np.ndarray,
interp: str,
extrapolation: str,
n_escore: int,
) -> xr.Dataset:
"""Npdf transform training.

Adjustment factors are obtained for each rotation in the npdf transform and kept to be applied in
the adjusting step in :py:func:`mcbn_adjust`.
the adjusting step in :py:func:`mbcn_adjust`.

Parameters
----------
Expand All @@ -230,8 +239,8 @@ def mbcn_train(
is the normal case when using :py:func:`xclim.sdba.base.stack_variables`, and "multivar_prime",
quantiles : array-like
The quantiles to compute.
gw_idxs : xr.DataArray
    Indices of the times in each windowed time group.
iterations : array-like
The iterations for the various rotations in `rot_matrices`.
interp : str
The interpolation method to use.
extrapolation : str
@@ -248,53 +257,38 @@
# unpack data
ref = ds.ref
hist = ds.hist
gr_dim = gw_idxs.attrs["group_dim"]

# npdf training core
af_q_l = []
escores_l = []

# loop over time blocks
for ib in range(gw_idxs[gr_dim].size):
# indices in a given time block
indices = gw_idxs[{gr_dim: ib}].fillna(-1).astype(int).values
ind = indices[indices >= 0]

# npdft training : multiple rotations on standardized datasets
# keep track of adjustment factors in each rotation for later use
af_q, escores = xr.apply_ufunc(
_npdft_train,
ref[{"time": ind}],
hist[{"time": ind}],
rot_matrices,
quantiles,
input_core_dims=[
[pts_dims[0], "time"],
[pts_dims[0], "time"],
["iterations", pts_dims[1], pts_dims[0]],
["quantiles"],
],
output_core_dims=[
["iterations", pts_dims[1], "quantiles"],
["iterations"],
],
dask="parallelized",
output_dtypes=[hist.dtype, hist.dtype],
kwargs={
"method": interp,
"extrap": extrapolation,
"n_escore": n_escore,
"standardize": True,
},
vectorize=True,
)
af_q_l.append(af_q.expand_dims({gr_dim: [ib]}))
escores_l.append(escores.expand_dims({gr_dim: [ib]}))
af_q = xr.concat(af_q_l, dim=gr_dim)
escores = xr.concat(escores_l, dim=gr_dim)
out = xr.Dataset({"af_q": af_q, "escores": escores}).assign_coords(
{"quantiles": quantiles, gr_dim: gw_idxs[gr_dim].values}
af_q, escores = xr.apply_ufunc(
_npdft_train,
ref,
hist,
rot_matrices,
quantiles,
input_core_dims=[
[pts_dims[0], "time"],
[pts_dims[0], "time"],
["iterations", pts_dims[1], pts_dims[0]],
["quantiles"],
],
output_core_dims=[
# this should really be pts_dims[1], but keeping the same dim name is useful
["iterations", pts_dims[0], "quantiles"],
["iterations"],
],
dask="parallelized",
output_dtypes=[hist.dtype, hist.dtype],
kwargs={
"method": interp,
"extrap": extrapolation,
"n_escore": n_escore,
"standardize": True,
},
vectorize=True,
)
# escores is broadcast like af_q for now;
# the multivar dimension is the problem
out = xr.Dataset({"af_q": af_q, "escores": escores.broadcast_like(af_q)})
out = out.assign_coords({"quantiles": quantiles, "iterations": iterations})
return out
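A note on the `rot_matrices` argument consumed above: it is a stack of random orthogonal rotations along the multivariate dimension, one per npdft iteration. A sketch of building such a stack with xclim's helper follows, assuming `ref` was stacked with `stack_variables` and carries a "multivar" coordinate; the `rand_rot_matrix` signature and the "matrices" dim name are assumptions from the current codebase.

from xclim.sdba.utils import rand_rot_matrix

# 20 random rotation matrices with dims (iterations, multivar, multivar_prime)
rot_matrices = rand_rot_matrix(
    ref.multivar, num=20, new_dim="multivar_prime"
).rename(matrices="iterations")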


@@ -338,11 +332,12 @@ def _npdft_adjust(sim, af_q, rots, quantiles, method, extrap):
return sim


@map_groups(reduces=[Grouper.PROP, "iterations", "quantiles"], scen=[])
def mbcn_adjust(
ref: xr.Dataset,
hist: xr.Dataset,
sim: xr.Dataset,
ds: xr.Dataset,
*,
dim: str,
rot_matrices: xr.DataArray,
pts_dims: Sequence[str],
interp: str,
extrapolation: str,
Expand All @@ -355,22 +350,19 @@ def mbcn_adjust(

The function :py:func:`mbcn_train` pre-computes the adjustment factors for each rotation
in the npdf portion of the MBCn algorithm. The rest of the adjustment is performed here
in `mbcn_adjust``.
in `mbcn_adjust`.

Parameters
----------
ref : xr.DataArray
training target
hist : xr.DataArray
training data
sim : xr.DataArray
data to adjust (stacked with multivariate dimension)
ds : xr.Dataset
Dataset variables:
rot_matrices : Rotation matrices used in the training step.
ref : training target
hist : training data
sim : data to adjust
af_q : Adjustment factors obtained in the training step for the npdf transform
g_idxs : Indices of the times in each time group
gw_idxs : Indices of the times in each windowed time group
rot_matrices : xr.DataArray
The rotation matrices as a 3D array ('iterations', <pts_dims[0]>, <pts_dims[1]>), with shape (n_iter, <N>, <N>).
pts_dims : [str, str]
The name of the "multivariate" dimension and its primed counterpart. Defaults to "multivar", which
is the normal case when using :py:func:`xclim.sdba.base.stack_variables`, and "multivar_prime"
@@ -397,72 +389,55 @@
The adjusted data.
"""
# unpacking training parameters
rot_matrices = ds.rot_matrices
af_q = ds.af_q
ref = ds.ref
hist = ds.hist
sim = ds.sim
# TODO: investigate the line below
# according to other docstrings, `dim` should be a str,
# but it is a list[str]. Are there cases where len(dim) > 1?
dim = dim if isinstance(dim, str) else dim[0]
# multivar_prime was renamed multivar by map_groups, changing it back
# drop `dim` ("time"): af_q was broadcast along it to be used with map_groups
af_q = ds.af_q.rename({pts_dims[0]: pts_dims[1]})[{dim: 0}].drop(dim)
quantiles = af_q.quantiles
g_idxs = ds.g_idxs
gw_idxs = ds.gw_idxs
gr_dim = gw_idxs.attrs["group_dim"]
win = gw_idxs.attrs["group"][1]

# this way of handling things leaves open the possibility of performing the
# interpolation for multiple periods of the simulation all at once,
# in principle avoiding redundancy. This needs testing on small data
# to confirm it works, and on big data to check performance.
dims = ["time"] if period_dim is None else [period_dim, "time"]

# mbcn core
scen_mbcn = xr.zeros_like(sim)
for ib in range(gw_idxs[gr_dim].size):
# indices in a given time block (with and without the window)
indices_gw = gw_idxs[{gr_dim: ib}].fillna(-1).astype(int).values
ind_gw = indices_gw[indices_gw >= 0]
indices_g = g_idxs[{gr_dim: ib}].fillna(-1).astype(int).values
ind_g = indices_g[indices_g >= 0]

# 1. univariate adjustment of sim -> scen
# the kind may differ depending on the variables
scen_block = xr.zeros_like(sim[{"time": ind_gw}])
for iv, v in enumerate(sim[pts_dims[0]].values):
sl = {"time": ind_gw, pts_dims[0]: iv}
with set_options(sdba_extra_output=False):
ADJ = base.train(
ref[sl], hist[sl], **base_kws_vars[v], skip_input_checks=True
)
scen_block[{pts_dims[0]: iv}] = ADJ.adjust(
sim[sl], **adj_kws, skip_input_checks=True
)

# 2. npdft adjustment of sim
npdft_block = xr.apply_ufunc(
_npdft_adjust,
standardize(sim[{"time": ind_gw}].copy(), dim="time")[0],
af_q[{gr_dim: ib}],
rot_matrices,
quantiles,
input_core_dims=[
[pts_dims[0]] + dims,
["iterations", pts_dims[1], "quantiles"],
["iterations", pts_dims[1], pts_dims[0]],
["quantiles"],
],
output_core_dims=[
[pts_dims[0]] + dims,
],
dask="parallelized",
output_dtypes=[sim.dtype],
kwargs={"method": interp, "extrap": extrapolation},
vectorize=True,
)

# 3. reorder scen according to npdft results
reordered = reordering(ref=npdft_block, sim=scen_block)
if win > 1:
# keep central value of window (intersecting indices in gw_idxs and g_idxs)
scen_mbcn[{"time": ind_g}] = reordered[{"time": np.in1d(ind_gw, ind_g)}]
else:
scen_mbcn[{"time": ind_g}] = reordered
# 1. univariate adjustment of sim -> scen
# the kind may differ depending on the variables
scen_block = xr.zeros_like(sim)
for iv, v in enumerate(sim[pts_dims[0]].values):
sl = {pts_dims[0]: iv}
with set_options(sdba_extra_output=False):
ADJ = base.train(
ref[sl], hist[sl], **base_kws_vars[v], skip_input_checks=True
)
scen_block[{pts_dims[0]: iv}] = ADJ.adjust(
sim[sl], **adj_kws, skip_input_checks=True
)

# 2. npdft adjustment of sim
npdft_block = xr.apply_ufunc(
_npdft_adjust,
standardize(sim, dim=dim)[0],
af_q,
rot_matrices,
quantiles,
input_core_dims=[
[pts_dims[0], dim],
["iterations", pts_dims[1], "quantiles"],
["iterations", pts_dims[1], pts_dims[0]],
["quantiles"],
],
output_core_dims=[
[pts_dims[0], dim],
],
dask="parallelized",
output_dtypes=[sim.dtype],
kwargs={"method": interp, "extrap": extrapolation},
vectorize=True,
)

# 3. reorder scen according to npdft results
scen_mbcn = reordering(ref=npdft_block, sim=scen_block)
return scen_mbcn.to_dataset(name="scen")
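Step 3 above relies on `xclim.sdba.processing.reordering`, which rearranges `sim` so that its rank structure along "time" matches that of `ref` (a Schaake-shuffle-like operation). A minimal sketch of that behaviour in isolation, on toy data; the helper is real, the values are illustrative.

import xarray as xr
from xclim.sdba.processing import reordering

time = xr.cftime_range("2000-01-01", periods=5, freq="D")
sim = xr.DataArray([1.0, 5.0, 3.0, 2.0, 4.0], dims="time", coords={"time": time})
ref = xr.DataArray([10.0, 30.0, 50.0, 20.0, 40.0], dims="time", coords={"time": time})

# sim's values rearranged to follow ref's ranks -> [1., 3., 5., 2., 4.]
out = reordering(ref=ref, sim=sim)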

