diff --git a/.idea/amisc.iml b/.idea/amisc.iml index c65d086..a70e6cc 100644 --- a/.idea/amisc.iml +++ b/.idea/amisc.iml @@ -9,7 +9,7 @@ - + \ No newline at end of file diff --git a/pdm.lock b/pdm.lock index c5d2390..23f4787 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev"] strategy = ["cross_platform"] lock_version = "4.4.1" -content_hash = "sha256:c985783d4849d85a7bc339394b6e7655e676f88f862905e072111fae03103ef1" +content_hash = "sha256:91c38e6797b32439e269ef891a9c650f1d36bf54f44cba2e683ec1df087da3a3" [[package]] name = "astroid" diff --git a/pdm.toml b/pdm.toml new file mode 100644 index 0000000..e2874a6 --- /dev/null +++ b/pdm.toml @@ -0,0 +1,2 @@ +[install] +cache = true diff --git a/pyproject.toml b/pyproject.toml index 52517e9..e0e3e78 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ dependencies = [ "scipy>=1.11.4", "matplotlib>=3.8.2", "uqtils>=0.1.0", + "joblib>=1.3.2", ] requires-python = ">=3.11" readme = "docs/README.md" diff --git a/src/amisc/__init__.py b/src/amisc/__init__.py index e35bf27..f1129d5 100644 --- a/src/amisc/__init__.py +++ b/src/amisc/__init__.py @@ -8,7 +8,7 @@ from amisc.interpolator import BaseInterpolator from amisc.rv import BaseRV -__version__ = "0.2.1" +__version__ = "0.3.0" # Custom types that are used frequently IndexSet = list[tuple[tuple, tuple]] diff --git a/src/amisc/component.py b/src/amisc/component.py index 4a6b3a4..e4c73ac 100644 --- a/src/amisc/component.py +++ b/src/amisc/component.py @@ -17,11 +17,14 @@ from pathlib import Path from abc import ABC, abstractmethod from concurrent.futures import Executor, ALL_COMPLETED, wait +import tempfile +import os import numpy as np from sklearn.linear_model import Ridge from sklearn.pipeline import Pipeline from sklearn.preprocessing import MaxAbsScaler +from joblib import delayed from amisc.utils import get_logger from amisc.rv import BaseRV @@ -237,7 +240,7 @@ def iterate_candidates(self): del self.index_set[-1] def predict(self, x: np.ndarray | float, use_model: str | tuple = None, model_dir: str | Path = None, - training: bool = False, index_set: IndexSet = None) -> np.ndarray: + training: bool = False, index_set: IndexSet = None, ppool=None) -> np.ndarray: """Evaluate the MISC approximation at new points `x`. !!! Note @@ -249,6 +252,7 @@ def predict(self, x: np.ndarray | float, use_model: str | tuple = None, model_di :param model_dir: directory to save output files if `use_model` is specified, ignored otherwise :param training: if `True`, then only compute with the active index set, otherwise use all candidates as well :param index_set: a list of concatenated $(\\alpha, \\beta)$ to override `self.index_set` if given, else ignore + :param ppool: a joblib `Parallel` pool to loop over multi-indices in parallel :returns y: `(..., y_dim)` the surrogate approximation of the function (or the function itself if `use_model`) """ x = np.atleast_1d(x) @@ -257,13 +261,26 @@ def predict(self, x: np.ndarray | float, use_model: str | tuple = None, model_di index_set, misc_coeff = self._combination(index_set, training) # Choose the correct index set and misc_coeff - y = np.zeros(x.shape[:-1] + (self.ydim,), dtype=x.dtype) - for alpha, beta in index_set: + def run_batch(alpha, beta, y): comb_coeff = misc_coeff[str(alpha)][str(beta)] if np.abs(comb_coeff) > 0: func = self.surrogates[str(alpha)][str(beta)] y += int(comb_coeff) * func(x) + if ppool is not None: + with tempfile.NamedTemporaryFile(suffix='.dat', mode='w+b', delete=False) as y_fd: + pass + y_ret = np.memmap(y_fd.name, dtype=x.dtype, mode='r+', shape=x.shape[:-1] + (self.ydim,)) + res = ppool(delayed(run_batch)(alpha, beta, y_ret) for alpha, beta in index_set) + y = np.empty(y_ret.shape, dtype=x.dtype) + y[:] = y_ret[:] + del y_ret + os.unlink(y_fd.name) + else: + y = np.zeros(x.shape[:-1] + (self.ydim,), dtype=x.dtype) + for alpha, beta in index_set: + run_batch(alpha, beta, y) + return y def grad(self, x: np.ndarray | float | list, training: bool = False, index_set: IndexSet = None) -> np.ndarray: @@ -597,7 +614,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Override - def predict(self, x, use_model=None, model_dir=None, training=False, index_set=None): + def predict(self, x, use_model=None, model_dir=None, training=False, index_set=None, ppool=None): """Need to override `super()` to allow passing in interpolation grids `xi` and `yi`.""" x = np.atleast_1d(x) if use_model is not None: @@ -605,8 +622,7 @@ def predict(self, x, use_model=None, model_dir=None, training=False, index_set=N index_set, misc_coeff = self._combination(index_set, training) - y = np.zeros(x.shape[:-1] + (self.ydim,), dtype=x.dtype) - for alpha, beta in index_set: + def run_batch(alpha, beta, y): comb_coeff = misc_coeff[str(alpha)][str(beta)] if np.abs(comb_coeff) > 0: # Gather the xi/yi interpolation points/qoi_ind for this sub tensor-product grid @@ -616,6 +632,20 @@ def predict(self, x, use_model=None, model_dir=None, training=False, index_set=N # Add this sub tensor-product grid to the MISC approximation y += int(comb_coeff) * interp(x, xi=xi, yi=yi) + if ppool is not None: + with tempfile.NamedTemporaryFile(suffix='.dat', mode='w+b', delete=False) as y_fd: + pass + y_ret = np.memmap(y_fd.name, dtype=x.dtype, mode='r+', shape=x.shape[:-1] + (self.ydim,)) + res = ppool(delayed(run_batch)(alpha, beta, y_ret) for alpha, beta in index_set) + y = np.empty(y_ret.shape, dtype=x.dtype) + y[:] = y_ret[:] + del y_ret + os.unlink(y_fd.name) + else: + y = np.zeros(x.shape[:-1] + (self.ydim,), dtype=x.dtype) + for alpha, beta in index_set: + run_batch(alpha, beta, y) + return y # Override diff --git a/src/amisc/system.py b/src/amisc/system.py index 9af258b..893e92a 100644 --- a/src/amisc/system.py +++ b/src/amisc/system.py @@ -764,7 +764,8 @@ def compute_error(alpha, beta): def predict(self, x: np.ndarray | float, max_fpi_iter: int = 100, anderson_mem: int = 10, fpi_tol: float = 1e-10, use_model: str | tuple | dict = None, model_dir: str | Path = None, verbose: bool = False, - training: bool = False, index_set: dict[str: IndexSet] = None, qoi_ind: IndicesRV = None) -> np.ndarray: + training: bool = False, index_set: dict[str: IndexSet] = None, qoi_ind: IndicesRV = None, + ppool=None) -> np.ndarray: """Evaluate the system surrogate at exogenous inputs `x`. !!! Warning @@ -785,6 +786,7 @@ def predict(self, x: np.ndarray | float, max_fpi_iter: int = 100, anderson_mem: :param training: whether to call the system surrogate in training or evaluation mode, ignored if `use_model` :param index_set: `dict(node=[indices])` to override default index set for a node (only useful for parallel) :param qoi_ind: list of qoi indices to return, defaults to returning all system `coupling_vars` + :param ppool: a joblib `Parallel` instance to pass to each component to loop over multi-indices in parallel :returns y: `(..., y_dim)` the surrogate approximation of the system QoIs """ # Allocate space for all system outputs (just save all coupling vars) @@ -839,7 +841,8 @@ def predict(self, x: np.ndarray | float, max_fpi_iter: int = 100, anderson_mem: output_dir = Path(model_dir) / scc[0] os.mkdir(output_dir) comp_output = node_obj['surrogate'](comp_input[valid_idx, :], use_model=use_model.get(scc[0]), - model_dir=output_dir, training=training, index_set=indices) + model_dir=output_dir, training=training, index_set=indices, + ppool=ppool) for local_i, global_i in enumerate(node_obj['global_out']): y[valid_idx, global_i] = comp_output[..., local_i] is_computed[global_i] = True @@ -890,7 +893,8 @@ def predict(self, x: np.ndarray | float, max_fpi_iter: int = 100, anderson_mem: # Compute component outputs (just don't do this FPI with the real models, please..) indices = index_set.get(node, None) if index_set is not None else None comp_output = node_obj['surrogate'](comp_input[valid_idx, :], use_model=use_model.get(node), - model_dir=None, training=training, index_set=indices) + model_dir=None, training=training, index_set=indices, + ppool=ppool) global_idx = node_obj['global_out'] for local_i, global_i in enumerate(global_idx): x_couple_next[valid_idx, global_i] = comp_output[..., local_i]