Fix #37 - performance issues with system predict #38

Merged: 3 commits, Jan 16, 2025
40 changes: 36 additions & 4 deletions src/amisc/component.py
@@ -415,6 +415,7 @@ class Component(BaseModel, Serializable):
_logger: Optional[logging.Logger] = None
_model_start_time: float = -1.0 # Temporarily store the most recent model start timestamp from call_model
_model_end_time: float = -1.0 # Temporarily store the most recent model end timestamp from call_model
_cache: dict = dict() # Temporary cache for faster access to training data and similar

def __init__(self, /, model, *args, inputs=None, outputs=None, name=None, **kwargs):
if name is None:
@@ -738,14 +739,40 @@ def _match_index_set(self, index_set, misc_coeff):

return index_set, misc_coeff

def cache(self, kind: list | Literal["training"] = "training"):
"""Cache data for quicker access. Only `"training"` is supported.

:param kind: the type(s) of data to cache (only "training" is supported). This will cache the
surrogate training data with nans removed.
"""
if not isinstance(kind, list):
kind = [kind]

if "training" in kind:
self._cache.setdefault("training", {})
y_vars = self._surrogate_outputs()
for alpha, beta in self.active_set.union(self.candidate_set):
self._cache["training"].setdefault(alpha, {})

if beta not in self._cache["training"][alpha]:
self._cache["training"][alpha][beta] = self.training_data.get(alpha, beta[:len(self.data_fidelity)],
y_vars=y_vars, skip_nan=True)

def clear_cache(self):
"""Clear cached data."""
self._cache.clear()

def get_training_data(self, alpha: Literal['best', 'worst'] | MultiIndex = 'best',
beta: Literal['best', 'worst'] | MultiIndex = 'best',
- y_vars: list = None) -> tuple[Dataset, Dataset]:
+ y_vars: list = None,
+ cached: bool = False) -> tuple[Dataset, Dataset]:
"""Get all training data for a given multi-index pair `(alpha, beta)`.

:param alpha: the model fidelity index (defaults to the maximum available model fidelity)
:param beta: the surrogate fidelity index (defaults to the maximum available surrogate fidelity)
:param y_vars: the training data to return (defaults to all stored data)
:param cached: if True, will get cached training data if available (this will ignore `y_vars` and
only grab whatever is in the cache, which is surrogate outputs only and no nans)
:returns: `(xtrain, ytrain)` - the training data for the given multi-indices
"""
# Find the best alpha
@@ -769,7 +796,10 @@ def get_training_data(self, alpha: Literal['best', 'worst'] | MultiIndex = 'best
beta = (0,) * len(self.max_beta)

try:
- return self.training_data.get(alpha, beta, y_vars=y_vars, skip_nan=True)
+ if cached and (data := self._cache.get("training", {}).get(alpha, {}).get(beta)) is not None:
+     return data
+ else:
+     return self.training_data.get(alpha, beta[:len(self.data_fidelity)], y_vars=y_vars, skip_nan=True)
except Exception as e:
self.logger.error(f"Error getting training data for alpha={alpha}, beta={beta}.")
raise e
@@ -1087,7 +1117,7 @@ def predict(self, inputs: dict | Dataset,
if np.abs(comb_coeff) > 0:
coeffs.append(comb_coeff)
args = (self.misc_states.get((alpha, beta)),
- self.training_data.get(alpha, beta[:len(self.data_fidelity)], skip_nan=True, y_vars=y_vars))
+ self.get_training_data(alpha, beta, y_vars=y_vars, cached=True))

results.append(self.interpolator.predict(inputs, *args) if executor is None else
executor.submit(self.interpolator.predict, inputs, *args))
@@ -1210,6 +1240,7 @@ def activate_index(self, alpha: MultiIndex, beta: MultiIndex, model_dir: str | P
f"'{self.name}' new candidate indices: {indices}...")
model_outputs = self.call_model(model_inputs, model_fidelity=alpha_list, output_path=model_dir,
executor=executor, track_costs=True, **field_coords)
self.logger.info(f"Model evaluations complete for component '{self.name}'.")
errors = model_outputs.pop('errors', {})
else:
self._model_start_time = -1.0
@@ -1305,7 +1336,7 @@ def gradient(self, inputs: dict | Dataset,
coeffs.append(comb_coeff)
func = self.interpolator.gradient if derivative == 'first' else self.interpolator.hessian
args = (self.misc_states.get((alpha, beta)),
- self.training_data.get(alpha, beta[:len(self.data_fidelity)], skip_nan=True, y_vars=y_vars))
+ self.get_training_data(alpha, beta, y_vars=y_vars, cached=True))

results.append(func(inputs, *args) if executor is None else executor.submit(func, inputs, *args))

@@ -1431,6 +1462,7 @@ def clear(self):
self.training_data.clear()
self._model_start_time = -1.0
self._model_end_time = -1.0
self.clear_cache()

def serialize(self, keep_yaml_objects: bool = False, serialize_args: dict[str, tuple] = None,
serialize_kwargs: dict[str: dict] = None) -> dict:
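The component-level changes above add an explicit training-data cache and route prediction through it. A minimal usage sketch, assuming an already-trained `Component` instance named `comp` (the random input sampling and variable-name access are illustrative, not part of the PR):

import numpy as np

# comp: a trained amisc Component with active/candidate multi-indices
comp.cache(kind="training")          # pre-fetch NaN-filtered training data for all (alpha, beta)

# predict() and gradient() now call get_training_data(..., cached=True), so repeated
# surrogate evaluations reuse the cached arrays instead of hitting training_data.get()
xtest = {v.name: np.random.rand(1000) for v in comp.inputs}   # illustrative random inputs
ypred = comp.predict(xtest)

comp.clear_cache()                   # invalidate after training data changes (clear() also does this)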
4 changes: 3 additions & 1 deletion src/amisc/compression.py
@@ -211,10 +211,12 @@ def _iterate_coords_and_fields():
else:
shape = (1,)

always_skip_interp = not coords_obj_array and np.array_equal(field_coords, grid_coords) # must be exact match

all_states = np.empty(shape, dtype=object) # are you in good hands?

for j, f_coords, f_values in _iterate_coords_and_fields():
- skip_interp = (f_coords.shape == grid_coords.shape and np.allclose(f_coords, grid_coords))
+ skip_interp = always_skip_interp or np.array_equal(f_coords, grid_coords) # exact even for floats

coords_shape = f_coords.shape[:-1]
loop_shape = next(iter(f_values.values())).shape[:-len(coords_shape)]
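The compression change replaces the per-field `np.allclose` tolerance check with an exact `np.array_equal` comparison and hoists the common grid-match test out of the field loop. A small standalone sketch (not from the PR) of the difference between the two checks:

import numpy as np

grid_coords = np.linspace(0.0, 1.0, 5)
field_coords = grid_coords.copy()        # identical coordinates -> interpolation can be skipped
perturbed = grid_coords + 1e-9           # nearly identical, but not the same grid

print(np.allclose(perturbed, grid_coords))        # True  (old check: tolerance-based)
print(np.array_equal(field_coords, grid_coords))  # True  (new check: exact match)
print(np.array_equal(perturbed, grid_coords))     # False (new check forces interpolation)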
17 changes: 12 additions & 5 deletions src/amisc/system.py
@@ -613,12 +613,13 @@ def fit(self, targets: list = None,
max_iter: int = 20,
max_tol: float = 1e-3,
runtime_hr: float = 1.,
- save_interval: int = 0,
estimate_bounds: bool = False,
update_bounds: bool = True,
test_set: tuple | str | Path = None,
start_test_check: int = None,
+ save_interval: int = 0,
plot_interval: int = 1,
+ cache_interval: int = 0,
executor: Executor = None,
weight_fcns: dict[str, callable] | Literal['pdf'] | None = 'pdf'):
"""Train the system surrogate adaptively by iterative refinement until an end condition is met.
@@ -629,8 +630,6 @@ def fit(self, targets: list = None,
:param max_tol: the max allowable value in relative L2 error to achieve
:param runtime_hr: the threshold wall clock time (hr) at which to stop further refinement (will go
until all models finish the current iteration)
- :param save_interval: number of refinement steps between each progress save, none if 0; `System.root_dir`
- must be specified to save to file
:param estimate_bounds: whether to estimate bounds for the coupling variables; will only try to estimate from
the `test_set` if provided (defaults to `True`). Otherwise, you should manually
provide domains for all coupling variables.
@@ -642,8 +641,12 @@ def fit(self, targets: list = None,
:param start_test_check: the iteration to start checking the test set error (defaults to the number
of components); surrogate evaluation isn't useful during initialization so you
should at least allow one iteration per component before checking test set error
+ :param save_interval: number of refinement steps between each progress save, none if 0; `System.root_dir`
+ must be specified to save to file
:param plot_interval: how often to plot the error indicator and test set error (defaults to every iteration);
will only plot and save to file if a root directory is set
+ :param cache_interval: how often to cache component data in order to speed up future training iterations (at
+ the cost of additional memory usage); defaults to 0 (no caching)
:param executor: a `concurrent.futures.Executor` object to parallelize model evaluations (optional, but
recommended for expensive models)
:param weight_fcns: a `dict` of weight functions to apply to each input variable for training data selection;
@@ -756,6 +759,10 @@ def fit(self, targets: list = None,
os.mkdir(pth)
self.save_to_file(f'{iter_name}.yml', save_dir=pth) # Save to an iteration-specific directory

if cache_interval > 0 and self.refine_level % cache_interval == 0:
for comp in self.components:
comp.cache()

# Check all end conditions
if self.refine_level >= max_iter:
self._print_title_str(f'Termination criteria reached: Max iteration {self.refine_level}/{max_iter}')
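The hunk above wires the new `cache_interval` option into the training loop, refreshing every component's training-data cache on a fixed cadence. A minimal call sketch, assuming an already-constructed `System` named `system` (all argument values below are illustrative):

system.fit(
    max_iter=50,
    runtime_hr=2.0,
    save_interval=5,      # save progress every 5 refinement steps (requires System.root_dir)
    cache_interval=5,     # new: call comp.cache() on each component every 5 refinement steps
    plot_interval=1,
)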
@@ -787,7 +794,7 @@ def fit(self, targets: list = None,

self.logger.info(f'Final system surrogate: \n {self}')

- def test_set_performance(self, xtest: Dataset, ytest: Dataset, index_set='train') -> Dataset:
+ def test_set_performance(self, xtest: Dataset, ytest: Dataset, index_set='test') -> Dataset:
"""Compute the relative L2 error on a test set for the given target output variables.

:param xtest: `dict` of test set input samples (unnormalized)
@@ -897,7 +904,7 @@ def refine(self, targets: list = None, num_refine: int = 100, update_bounds: boo

delta_error = np.nanmax([np.nanmax(error[var]) for var in error]) # Max error over all target QoIs
num_evals = comp.get_cost(alpha, beta)
- delta_work = comp.model_costs.get(alpha_star, 1.) * num_evals # Cpu time (s)
+ delta_work = comp.model_costs.get(alpha, 1.) * num_evals # Cpu time (s)
error_indicator = delta_error / delta_work

self.logger.info(f"Candidate multi-index: {(alpha, beta)}. Relative error: {delta_error}. "
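The last hunk keys the work estimate on the candidate's own model fidelity `alpha` rather than `alpha_star`. A toy calculation (numbers invented for illustration) of the resulting error indicator used to rank candidate multi-indices:

# Error indicator for one candidate (alpha, beta): error reduction per unit of CPU work
delta_error = 0.12          # max relative error over the target outputs
cost_per_eval = 3.5         # model_costs[alpha], CPU seconds per evaluation at fidelity alpha
num_evals = 8               # new training points required at (alpha, beta)

delta_work = cost_per_eval * num_evals        # total CPU seconds (28.0)
error_indicator = delta_error / delta_work    # ~0.0043; larger is better when ranking candidates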