feat: adding ppool option for surrogate evaluation
eckelsjd committed Feb 20, 2024
1 parent 2f60056 commit 7422206
Showing 7 changed files with 49 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .idea/amisc.iml


2 changes: 1 addition & 1 deletion pdm.lock


2 changes: 2 additions & 0 deletions pdm.toml
@@ -0,0 +1,2 @@
[install]
cache = true
1 change: 1 addition & 0 deletions pyproject.toml
@@ -13,6 +13,7 @@ dependencies = [
"scipy>=1.11.4",
"matplotlib>=3.8.2",
"uqtils>=0.1.0",
"joblib>=1.3.2",
]
requires-python = ">=3.11"
readme = "docs/README.md"
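The `joblib` requirement added above is the runtime dependency behind the new `ppool` option. For context, a minimal sketch of joblib's `Parallel`/`delayed` API (standalone; not part of this commit):

```python
from math import sqrt

from joblib import Parallel, delayed

# Fan four independent calls out to two worker processes and
# collect the results in submission order.
results = Parallel(n_jobs=2)(delayed(sqrt)(i) for i in range(4))
print(results)  # [0.0, 1.0, 1.4142135623730951, 1.7320508075688772]
```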
2 changes: 1 addition & 1 deletion src/amisc/__init__.py
@@ -8,7 +8,7 @@
from amisc.interpolator import BaseInterpolator
from amisc.rv import BaseRV

__version__ = "0.2.1"
__version__ = "0.3.0"

# Custom types that are used frequently
IndexSet = list[tuple[tuple, tuple]]
42 changes: 36 additions & 6 deletions src/amisc/component.py
@@ -17,11 +17,14 @@
from pathlib import Path
from abc import ABC, abstractmethod
from concurrent.futures import Executor, ALL_COMPLETED, wait
import tempfile
import os

import numpy as np
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MaxAbsScaler
from joblib import delayed

from amisc.utils import get_logger
from amisc.rv import BaseRV
@@ -237,7 +240,7 @@ def iterate_candidates(self):
del self.index_set[-1]

def predict(self, x: np.ndarray | float, use_model: str | tuple = None, model_dir: str | Path = None,
training: bool = False, index_set: IndexSet = None) -> np.ndarray:
training: bool = False, index_set: IndexSet = None, ppool=None) -> np.ndarray:
"""Evaluate the MISC approximation at new points `x`.
!!! Note
@@ -249,6 +252,7 @@ def predict(self, x: np.ndarray | float, use_model: str | tuple = None, model_di
:param model_dir: directory to save output files if `use_model` is specified, ignored otherwise
:param training: if `True`, then only compute with the active index set, otherwise use all candidates as well
:param index_set: a list of concatenated $(\\alpha, \\beta)$ to override `self.index_set` if given, else ignore
:param ppool: a joblib `Parallel` pool to loop over multi-indices in parallel
:returns y: `(..., y_dim)` the surrogate approximation of the function (or the function itself if `use_model`)
"""
x = np.atleast_1d(x)
Expand All @@ -257,13 +261,26 @@ def predict(self, x: np.ndarray | float, use_model: str | tuple = None, model_di

index_set, misc_coeff = self._combination(index_set, training) # Choose the correct index set and misc_coeff

y = np.zeros(x.shape[:-1] + (self.ydim,), dtype=x.dtype)
for alpha, beta in index_set:
def run_batch(alpha, beta, y):
comb_coeff = misc_coeff[str(alpha)][str(beta)]
if np.abs(comb_coeff) > 0:
func = self.surrogates[str(alpha)][str(beta)]
y += int(comb_coeff) * func(x)

if ppool is not None:
with tempfile.NamedTemporaryFile(suffix='.dat', mode='w+b', delete=False) as y_fd:
pass
y_ret = np.memmap(y_fd.name, dtype=x.dtype, mode='r+', shape=x.shape[:-1] + (self.ydim,))
res = ppool(delayed(run_batch)(alpha, beta, y_ret) for alpha, beta in index_set)
y = np.empty(y_ret.shape, dtype=x.dtype)
y[:] = y_ret[:]
del y_ret
os.unlink(y_fd.name)
else:
y = np.zeros(x.shape[:-1] + (self.ydim,), dtype=x.dtype)
for alpha, beta in index_set:
run_batch(alpha, beta, y)

return y

def grad(self, x: np.ndarray | float | list, training: bool = False, index_set: IndexSet = None) -> np.ndarray:
@@ -597,16 +614,15 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

# Override
def predict(self, x, use_model=None, model_dir=None, training=False, index_set=None):
def predict(self, x, use_model=None, model_dir=None, training=False, index_set=None, ppool=None):
"""Need to override `super()` to allow passing in interpolation grids `xi` and `yi`."""
x = np.atleast_1d(x)
if use_model is not None:
return self._bypass_surrogate(x, use_model, model_dir)

index_set, misc_coeff = self._combination(index_set, training)

y = np.zeros(x.shape[:-1] + (self.ydim,), dtype=x.dtype)
for alpha, beta in index_set:
def run_batch(alpha, beta, y):
comb_coeff = misc_coeff[str(alpha)][str(beta)]
if np.abs(comb_coeff) > 0:
# Gather the xi/yi interpolation points/qoi_ind for this sub tensor-product grid
@@ -616,6 +632,20 @@ def predict(self, x, use_model=None, model_dir=None, training=False, index_set=N
# Add this sub tensor-product grid to the MISC approximation
y += int(comb_coeff) * interp(x, xi=xi, yi=yi)

if ppool is not None:
with tempfile.NamedTemporaryFile(suffix='.dat', mode='w+b', delete=False) as y_fd:
pass
y_ret = np.memmap(y_fd.name, dtype=x.dtype, mode='r+', shape=x.shape[:-1] + (self.ydim,))
res = ppool(delayed(run_batch)(alpha, beta, y_ret) for alpha, beta in index_set)
y = np.empty(y_ret.shape, dtype=x.dtype)
y[:] = y_ret[:]
del y_ret
os.unlink(y_fd.name)
else:
y = np.zeros(x.shape[:-1] + (self.ydim,), dtype=x.dtype)
for alpha, beta in index_set:
run_batch(alpha, beta, y)

return y

# Override
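Both `predict` overrides above use the same pattern: the per-index work is factored into a local `run_batch`, and when a `ppool` is passed, the output array is backed by a temporary-file `np.memmap` so that joblib worker processes can all write into one shared on-disk buffer. A standalone sketch of that pattern with hypothetical names; here each task writes a disjoint row, which avoids concurrent read-modify-write on the shared buffer:

```python
import os
import tempfile

import numpy as np
from joblib import Parallel, delayed


def fill_row(out, i):
    # Workers receive the memmap by filename and write their own row.
    out[i, :] = i * np.ones(out.shape[1])


# Create a disk-backed array that joblib can share across processes.
with tempfile.NamedTemporaryFile(suffix='.dat', delete=False) as fd:
    fname = fd.name
out = np.memmap(fname, dtype='float64', mode='w+', shape=(4, 3))

Parallel(n_jobs=2)(delayed(fill_row)(out, i) for i in range(4))

result = np.array(out)  # copy back into ordinary memory
del out                 # release the memmap before removing its file
os.unlink(fname)
```

An alternative would be joblib's `require='sharedmem'` flag, which forces the threading backend; the temp-file memmap keeps process-based backends available.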
10 changes: 7 additions & 3 deletions src/amisc/system.py
@@ -764,7 +764,8 @@ def compute_error(alpha, beta):

def predict(self, x: np.ndarray | float, max_fpi_iter: int = 100, anderson_mem: int = 10, fpi_tol: float = 1e-10,
use_model: str | tuple | dict = None, model_dir: str | Path = None, verbose: bool = False,
training: bool = False, index_set: dict[str: IndexSet] = None, qoi_ind: IndicesRV = None) -> np.ndarray:
training: bool = False, index_set: dict[str: IndexSet] = None, qoi_ind: IndicesRV = None,
ppool=None) -> np.ndarray:
"""Evaluate the system surrogate at exogenous inputs `x`.
!!! Warning
@@ -785,6 +786,7 @@ def predict(self, x: np.ndarray | float, max_fpi_iter: int = 100, anderson_mem:
:param training: whether to call the system surrogate in training or evaluation mode, ignored if `use_model`
:param index_set: `dict(node=[indices])` to override default index set for a node (only useful for parallel)
:param qoi_ind: list of qoi indices to return, defaults to returning all system `coupling_vars`
:param ppool: a joblib `Parallel` instance to pass to each component to loop over multi-indices in parallel
:returns y: `(..., y_dim)` the surrogate approximation of the system QoIs
"""
# Allocate space for all system outputs (just save all coupling vars)
@@ -839,7 +841,8 @@ def predict(self, x: np.ndarray | float, max_fpi_iter: int = 100, anderson_mem:
output_dir = Path(model_dir) / scc[0]
os.mkdir(output_dir)
comp_output = node_obj['surrogate'](comp_input[valid_idx, :], use_model=use_model.get(scc[0]),
model_dir=output_dir, training=training, index_set=indices)
model_dir=output_dir, training=training, index_set=indices,
ppool=ppool)
for local_i, global_i in enumerate(node_obj['global_out']):
y[valid_idx, global_i] = comp_output[..., local_i]
is_computed[global_i] = True
@@ -890,7 +893,8 @@ def predict(self, x: np.ndarray | float, max_fpi_iter: int = 100, anderson_mem:
# Compute component outputs (just don't do this FPI with the real models, please..)
indices = index_set.get(node, None) if index_set is not None else None
comp_output = node_obj['surrogate'](comp_input[valid_idx, :], use_model=use_model.get(node),
model_dir=None, training=training, index_set=indices)
model_dir=None, training=training, index_set=indices,
ppool=ppool)
global_idx = node_obj['global_out']
for local_i, global_i in enumerate(global_idx):
x_couple_next[valid_idx, global_i] = comp_output[..., local_i]
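`SystemSurrogate.predict` simply forwards `ppool` to each component's `predict`, so one pool can be reused across the whole system evaluation. A hedged usage sketch (the trained `system` instance and the input dimension are hypothetical):

```python
import numpy as np
from joblib import Parallel

# `system` is assumed to be an already-trained amisc SystemSurrogate
# whose exogenous input has x_dim dimensions.
x_dim = 3
x = np.random.rand(1000, x_dim)

# Reuse one worker pool for the full prediction; each component loops
# over its multi-indices in parallel.
with Parallel(n_jobs=-1) as ppool:
    y = system.predict(x, ppool=ppool)
```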
