diff --git a/src/amisc/component.py b/src/amisc/component.py
index 24afcb2..54aad7e 100644
--- a/src/amisc/component.py
+++ b/src/amisc/component.py
@@ -415,6 +415,7 @@ class Component(BaseModel, Serializable):
     _logger: Optional[logging.Logger] = None
     _model_start_time: float = -1.0  # Temporarily store the most recent model start timestamp from call_model
     _model_end_time: float = -1.0  # Temporarily store the most recent model end timestamp from call_model
+    _cache: dict = dict()  # Temporary cache for faster access to frequently used data (currently just training data)

     def __init__(self, /, model, *args, inputs=None, outputs=None, name=None, **kwargs):
         if name is None:
@@ -738,14 +739,40 @@ def _match_index_set(self, index_set, misc_coeff):

         return index_set, misc_coeff

+    def cache(self, kind: list | Literal["training"] = "training"):
+        """Cache data for quicker access. Only `"training"` is currently supported.
+
+        :param kind: the type(s) of data to cache. For `"training"`, this caches the surrogate
+            training data with nans removed.
+        """
+        if not isinstance(kind, list):
+            kind = [kind]
+
+        if "training" in kind:
+            self._cache.setdefault("training", {})
+            y_vars = self._surrogate_outputs()
+            for alpha, beta in self.active_set.union(self.candidate_set):
+                self._cache["training"].setdefault(alpha, {})
+
+                if beta not in self._cache["training"][alpha]:
+                    self._cache["training"][alpha][beta] = self.training_data.get(alpha, beta[:len(self.data_fidelity)],
+                                                                                  y_vars=y_vars, skip_nan=True)
+
+    def clear_cache(self):
+        """Clear cached data."""
+        self._cache.clear()
+
     def get_training_data(self, alpha: Literal['best', 'worst'] | MultiIndex = 'best',
                           beta: Literal['best', 'worst'] | MultiIndex = 'best',
-                          y_vars: list = None) -> tuple[Dataset, Dataset]:
+                          y_vars: list = None,
+                          cached: bool = False) -> tuple[Dataset, Dataset]:
         """Get all training data for a given multi-index pair `(alpha, beta)`.
        :param alpha: the model fidelity index (defaults to the maximum available model fidelity)
         :param beta: the surrogate fidelity index (defaults to the maximum available surrogate fidelity)
         :param y_vars: the training data to return (defaults to all stored data)
+        :param cached: if True, return cached training data when available; note that this ignores `y_vars` and
+            returns only what is in the cache (surrogate outputs only, with nans removed)
         :returns: `(xtrain, ytrain)` - the training data for the given multi-indices
         """
         # Find the best alpha
@@ -769,7 +796,10 @@ def get_training_data(self, alpha: Literal['best', 'worst'] | MultiIndex = 'best
             beta = (0,) * len(self.max_beta)

         try:
-            return self.training_data.get(alpha, beta, y_vars=y_vars, skip_nan=True)
+            if cached and (data := self._cache.get("training", {}).get(alpha, {}).get(beta)) is not None:
+                return data
+            else:
+                return self.training_data.get(alpha, beta[:len(self.data_fidelity)], y_vars=y_vars, skip_nan=True)
         except Exception as e:
             self.logger.error(f"Error getting training data for alpha={alpha}, beta={beta}.")
             raise e
@@ -1087,7 +1117,7 @@ def predict(self, inputs: dict | Dataset,
                 if np.abs(comb_coeff) > 0:
                     coeffs.append(comb_coeff)
                     args = (self.misc_states.get((alpha, beta)),
-                            self.training_data.get(alpha, beta[:len(self.data_fidelity)], skip_nan=True, y_vars=y_vars))
+                            self.get_training_data(alpha, beta, y_vars=y_vars, cached=True))

                     results.append(self.interpolator.predict(inputs, *args) if executor is None else
                                    executor.submit(self.interpolator.predict, inputs, *args))
@@ -1210,6 +1240,7 @@ def activate_index(self, alpha: MultiIndex, beta: MultiIndex, model_dir: str | P
                              f"'{self.name}' new candidate indices: {indices}...")
             model_outputs = self.call_model(model_inputs, model_fidelity=alpha_list, output_path=model_dir,
                                             executor=executor, track_costs=True, **field_coords)
+            self.logger.info(f"Model evaluations complete for component '{self.name}'.")
             errors = model_outputs.pop('errors', {})
         else:
             self._model_start_time = -1.0
@@ -1305,7 +1336,7 @@ def gradient(self, inputs: dict | Dataset,
                     coeffs.append(comb_coeff)
                     func = self.interpolator.gradient if derivative == 'first' else self.interpolator.hessian
                     args = (self.misc_states.get((alpha, beta)),
-                            self.training_data.get(alpha, beta[:len(self.data_fidelity)], skip_nan=True, y_vars=y_vars))
+                            self.get_training_data(alpha, beta, y_vars=y_vars, cached=True))

                     results.append(func(inputs, *args) if executor is None else
                                    executor.submit(func, inputs, *args))
@@ -1431,6 +1462,7 @@ def clear(self):
         self.training_data.clear()
         self._model_start_time = -1.0
         self._model_end_time = -1.0
+        self.clear_cache()

     def serialize(self, keep_yaml_objects: bool = False, serialize_args: dict[str, tuple] = None,
                   serialize_kwargs: dict[str: dict] = None) -> dict:
diff --git a/src/amisc/compression.py b/src/amisc/compression.py
index a2834e8..8b1ce0f 100644
--- a/src/amisc/compression.py
+++ b/src/amisc/compression.py
@@ -211,10 +211,12 @@ def _iterate_coords_and_fields():
         else:
             shape = (1,)

+        always_skip_interp = not coords_obj_array and np.array_equal(field_coords, grid_coords)  # must be exact match
+        all_states = np.empty(shape, dtype=object)  # collect one state per loop index
        for j, f_coords, f_values in _iterate_coords_and_fields():
-            skip_interp = (f_coords.shape == grid_coords.shape and np.allclose(f_coords, grid_coords))
+            skip_interp = always_skip_interp or np.array_equal(f_coords, grid_coords)  # exact match, even for floats

             coords_shape = f_coords.shape[:-1]
             loop_shape = next(iter(f_values.values())).shape[:-len(coords_shape)]
diff --git a/src/amisc/system.py b/src/amisc/system.py
index 05a228f..1cac5a1 100644
--- a/src/amisc/system.py
+++ b/src/amisc/system.py
@@ -613,12 +613,13 @@ def fit(self, targets: list = None,
             max_iter: int = 20,
             max_tol: float = 1e-3,
             runtime_hr: float = 1.,
-            save_interval: int = 0,
             estimate_bounds: bool = False,
             update_bounds: bool = True,
             test_set: tuple | str | Path = None,
             start_test_check: int = None,
+            save_interval: int = 0,
             plot_interval: int = 1,
+            cache_interval: int = 0,
             executor: Executor = None,
             weight_fcns: dict[str, callable] | Literal['pdf'] | None = 'pdf'):
         """Train the system surrogate adaptively by iterative refinement until an end condition is met.
@@ -629,8 +630,6 @@
         :param max_tol: the max allowable value in relative L2 error to achieve
         :param runtime_hr: the threshold wall clock time (hr) at which to stop further refinement (will go until
             all models finish the current iteration)
-        :param save_interval: number of refinement steps between each progress save, none if 0; `System.root_dir`
-            must be specified to save to file
         :param estimate_bounds: whether to estimate bounds for the coupling variables; will only try to estimate
             from the `test_set` if provided (defaults to `True`). Otherwise, you should manually provide domains
             for all coupling variables.
@@ -642,8 +641,12 @@
         :param start_test_check: the iteration to start checking the test set error (defaults to the number of
             components); surrogate evaluation isn't useful during initialization so you should at least allow one
             iteration per component before checking test set error
+        :param save_interval: number of refinement steps between each progress save, none if 0; `System.root_dir`
+            must be specified to save to file
         :param plot_interval: how often to plot the error indicator and test set error (defaults to every iteration);
             will only plot and save to file if a root directory is set
+        :param cache_interval: number of refinement steps between caching component training data to speed up later
+            training iterations (at the cost of additional memory usage); defaults to 0 (no caching)
         :param executor: a `concurrent.futures.Executor` object to parallelize model evaluations (optional, but
             recommended for expensive models)
         :param weight_fcns: a `dict` of weight functions to apply to each input variable for training data selection;
@@ -756,6 +759,10 @@
                         os.mkdir(pth)
                     self.save_to_file(f'{iter_name}.yml', save_dir=pth)  # Save to an iteration-specific directory

+            if cache_interval > 0 and self.refine_level % cache_interval == 0:
+                for comp in self.components:
+                    comp.cache()
+
             # Check all end conditions
             if self.refine_level >= max_iter:
                 self._print_title_str(f'Termination criteria reached: Max iteration {self.refine_level}/{max_iter}')
@@ -787,7 +794,7 @@

         self.logger.info(f'Final system surrogate: \n {self}')

-    def test_set_performance(self, xtest: Dataset, ytest: Dataset, index_set='train') -> Dataset:
+    def test_set_performance(self, xtest: Dataset, ytest: Dataset, index_set='test') -> Dataset:
         """Compute the relative L2 error on a test set for the given target output variables.

        :param xtest: `dict` of test set input samples (unnormalized)
@@ -897,7 +904,7 @@ def refine(self, targets: list = None, num_refine: int = 100, update_bounds: boo
                     delta_error = np.nanmax([np.nanmax(error[var]) for var in error])  # Max error over all target QoIs
                     num_evals = comp.get_cost(alpha, beta)
-                    delta_work = comp.model_costs.get(alpha_star, 1.) * num_evals  # Cpu time (s)
+                    delta_work = comp.model_costs.get(alpha, 1.) * num_evals  # Cpu time (s)

                     error_indicator = delta_error / delta_work

                     self.logger.info(f"Candidate multi-index: {(alpha, beta)}. Relative error: {delta_error}. "
diff --git a/src/amisc/training.py b/src/amisc/training.py
index 570625f..ddb1fc5 100644
--- a/src/amisc/training.py
+++ b/src/amisc/training.py
@@ -185,53 +185,51 @@ def get_by_coord(self, alpha: MultiIndex, coords: list, y_vars: list = None, ski
         :param alpha: the model fidelity indices
         :param coords: a list of grid coordinates to locate the `yi` values in the sparse grid data structure
         :param y_vars: the keys of the outputs to return (if `None`, return all outputs)
-        :param skip_nan: skip any data points with remaining `nan` values if `skip_nan=True`
+        :param skip_nan: if `True`, skip any data points with remaining `nan` values (only checked for numeric outputs)
         :returns: `dicts` of model inputs `xi_dict` and outputs `yi_dict`
         """
-        xi_dict = {}
+        N = len(coords)
+        is_numeric = {}
+        is_singleton = {}
+        xi_dict = self._extract_grid_points(coords)
         yi_dict = {}
-        y_squeeze = {}  # Keep track of whether to squeeze extra output dimensions (i.e. non field quantities)
-        for coord in coords:
+
+        first_yi = next(iter(self.yi_map[alpha].values()))
+        if y_vars is None:
+            y_vars = first_yi.keys()
+
+        for var in y_vars:
+            yi = np.atleast_1d(first_yi[var])
+            is_numeric[var] = self._is_numeric(yi)
+            is_singleton[var] = self._is_singleton(yi)
+            yi_dict[var] = np.empty(N, dtype=np.float64 if is_numeric[var] and is_singleton[var] else object)
+
+        for i, coord in enumerate(coords):
             try:
-                skip_coord = False
-                yi_curr = copy.deepcopy(self.yi_map[alpha][coord])
-                output_vars = self._numeric_outputs(yi_curr)
-                for var in output_vars:
-                    if np.any(np.isnan(yi_curr[var])):
-                        yi_curr[var] = self.yi_nan_map[alpha].get(coord, yi_curr)[var]
-                    if skip_nan and np.any(np.isnan(yi_curr[var])):
-                        skip_coord = True
-                        break
-
-                if not skip_coord:
-                    self._append_grid_points(coord, xi_dict)
-                    yi_vars = y_vars if y_vars is not None else yi_curr.keys()
-                    for var in yi_vars:
-                        yi = np.atleast_1d(yi_curr[var])
-                        y_squeeze[var] = self._is_singleton(yi)  # squeeze for scalar quantities
-                        if y_squeeze[var]:
-                            yi = np.expand_dims(yi, axis=0)
-                            yi_dict[var] = yi if yi_dict.get(var) is None else np.concatenate((yi_dict[var], yi),
-                                                                                              axis=0)
-                        else:
-                            # Object arrays for field qtys
-                            yi_dict[var] = np.array([None], dtype=object) if yi_dict.get(var) is None else \
-                                np.concatenate((yi_dict[var], [None]), axis=0)
-                            yi_dict[var][-1] = yi
+                yi_curr = self.yi_map[alpha][coord]
+                for var in y_vars:
+                    yi = arr if (arr := self.yi_nan_map[alpha].get(coord, {}).get(var)) is not None else yi_curr[var]
+                    yi_dict[var][i] = yi if is_singleton[var] else np.atleast_1d(yi)
+
            except KeyError as e:
                 raise KeyError(f"Can't access sparse grid data for alpha={alpha}, coord={coord}. "
" f"Make sure the data has been set first.") from e - # Squeeze out extra dimension for scalar quantities - for var in yi_dict.keys(): - if y_squeeze[var]: - yi_dict[var] = np.squeeze(yi_dict[var], axis=-1) + # Delete nans if requested (only for numeric singleton outputs) + if skip_nan: + nan_idx = np.full(N, False) + for var in y_vars: + if is_numeric[var] and is_singleton[var]: + nan_idx |= np.isnan(yi_dict[var]) + + xi_dict = {k: v[~nan_idx] for k, v in xi_dict.items()} + yi_dict = {k: v[~nan_idx] for k, v in yi_dict.items()} return xi_dict, yi_dict # Both with elements of shape (N, ...) for N grid points def get(self, alpha: MultiIndex, beta: MultiIndex, y_vars: list[str] = None, skip_nan: bool = False): """Get the training data from the sparse grid for a given `alpha` and `beta` pair.""" - return self.get_by_coord(alpha, self._expand_grid_coords(beta), y_vars=y_vars, skip_nan=skip_nan) + return self.get_by_coord(alpha, list(self._expand_grid_coords(beta)), y_vars=y_vars, skip_nan=skip_nan) def set_errors(self, alpha: MultiIndex, beta: MultiIndex, coords: list, errors: list[dict]): """Store error information in the sparse-grid for a given multi-index pair.""" @@ -256,16 +254,18 @@ def set(self, alpha: MultiIndex, beta: MultiIndex, coords: list, yi_dict: dict[s def impute_missing_data(self, alpha: MultiIndex, beta: MultiIndex): """Impute missing values in the sparse grid for a given multi-index pair by linear regression imputation.""" imputer, xi_all, yi_all = None, None, None - for coord, yi_dict in self.yi_map[alpha].items(): - # only impute (small-length) numeric quantities - output_vars = [var for var in self._numeric_outputs(yi_dict) - if len(np.ravel(yi_dict[var])) <= self.MAX_IMPUTE_SIZE] + # only impute (small-length) numeric quantities + yi_dict = next(iter(self.yi_map[alpha].values())) + output_vars = [var for var in self._numeric_outputs(yi_dict) + if len(np.ravel(yi_dict[var])) <= self.MAX_IMPUTE_SIZE] + + for coord, yi_dict in self.yi_map[alpha].items(): if any([np.any(np.isnan(yi_dict[var])) for var in output_vars]): if imputer is None: # Grab all 'good' interpolation points and train a simple linear regression fit xi_all, yi_all = self.get(alpha, beta, y_vars=output_vars, skip_nan=True) - if len(xi_all) == 0: + if len(xi_all) == 0 or len(next(iter(xi_all.values()))) == 0: continue # possible if no good data has been set yet N = next(iter(xi_all.values())).shape[0] # number of grid points @@ -278,7 +278,7 @@ def impute_missing_data(self, alpha: MultiIndex, beta: MultiIndex): imputer.fit(xi_mat, yi_mat) # Run the imputer for this coordinate - x_interp = self._append_grid_points(coord) + x_interp = self._extract_grid_points(coord) x_interp = np.concatenate([x_interp[var][:, np.newaxis] if len(x_interp[var].shape) == 1 else x_interp[var] for var in x_interp.keys()], axis=-1) y_interp = imputer.predict(x_interp) @@ -339,7 +339,7 @@ def refine(self, alpha: MultiIndex, beta: MultiIndex, input_domains: dict, weigh self.yi_nan_map.setdefault(alpha, dict()) self.error_map.setdefault(alpha, dict()) new_coords = list(self._expand_grid_coords(beta)) - return new_coords, self._append_grid_points(new_coords[0]) + return new_coords, self._extract_grid_points(new_coords) # Otherwise, refine the sparse grid for beta_old in self.betas: @@ -368,31 +368,32 @@ def refine(self, alpha: MultiIndex, beta: MultiIndex, input_domains: dict, weigh break new_coords = [] - new_pts = {} for coord in self._expand_grid_coords(beta): if coord not in self.yi_map[alpha]: # If we have not computed this grid 
-                self._append_grid_points(coord, new_pts)
                 new_coords.append(coord)

+        new_pts = self._extract_grid_points(new_coords)
+
         self.betas.add(beta)

         return new_coords, new_pts

-    def _append_grid_points(self, coord: tuple, pts: dict = None):
-        """Extract the `x` grid point located at `coord` from `x_grids` and append to the `pts` dictionary."""
-        if pts is None:
-            pts = {}
-        grids = iter(self.x_grids.items())
-        for idx in coord:
-            if isinstance(idx, int):  # scalar grid point
-                var, grid = next(grids)
-                new_pt = np.atleast_1d(grid[idx])
-                pts[var] = new_pt if pts.get(var) is None else np.concatenate((pts[var], new_pt), axis=0)
-            else:  # latent coefficients
-                for i in idx:
+    def _extract_grid_points(self, coords: list[tuple] | tuple):
+        """Extract the `x` grid points located at `coords` from `x_grids` and return them as the `pts` dictionary."""
+        if not isinstance(coords, list):
+            coords = [coords]
+        pts = {var: np.empty(len(coords)) for var in self.x_grids}
+
+        for k, coord in enumerate(coords):
+            grids = iter(self.x_grids.items())
+            for idx in coord:
+                if isinstance(idx, int):  # scalar grid point
                     var, grid = next(grids)
-                    new_pt = np.atleast_1d(grid[i])
-                    pts[var] = new_pt if pts.get(var) is None else np.concatenate((pts[var], new_pt), axis=0)
+                    pts[var][k] = grid[idx]
+                else:  # latent coefficients
+                    for i in idx:
+                        var, grid = next(grids)
+                        pts[var][k] = grid[i]

         return pts
diff --git a/tests/test_system.py b/tests/test_system.py
index dd93179..ad32fc8 100644
--- a/tests/test_system.py
+++ b/tests/test_system.py
@@ -314,8 +314,8 @@ def test_simulate_fit():
     train_error = {var: relative_error(y_surr_train[var], ytest[var]) for var in ytest}
     test_error = {var: relative_error(y_surr_test[var], ytest[var]) for var in ytest}

-    # Training error should be the same as was originally computed during fit
-    assert all([np.allclose(error[var], train_error[var]) for var in train_error])
+    # Test set error should match what was originally computed during fit
+    assert all([np.allclose(error[var], test_error[var]) for var in test_error])

     # Error using the candidate set should be less than the training error
     assert all([np.all(test_error[var] <= train_error[var]) for var in test_error])
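Usage note (not part of the patch): a minimal sketch of how the new caching pieces fit together. `cache_interval` in `System.fit`, `Component.cache()`/`clear_cache()`, and `get_training_data(..., cached=True)` come from this diff; the model and component setup below is a hypothetical stand-in, and a real setup likely needs variable domains and other configuration.

```python
import numpy as np
from amisc import Component, System

def my_model(inputs):
    # Hypothetical model following the dict-in/dict-out convention used by call_model
    return {'y': inputs['x'] * np.sin(np.pi * inputs['x'])}

comp = Component(my_model, inputs=['x'], outputs=['y'], name='my_comp')
system = System(comp)

# Cache each component's (nan-filtered) training data every 2 refinement steps;
# predict() and gradient() then reuse it via get_training_data(..., cached=True)
# instead of re-filtering nans from the sparse grid on every call.
system.fit(max_iter=5, cache_interval=2)

xtrain, ytrain = comp.get_training_data(cached=True)  # surrogate outputs only, nans removed
comp.clear_cache()  # release the cached arrays when finished
```

Note that `cache()` only fills in missing `(alpha, beta)` entries on repeated calls, and `Component.clear()` now also clears the cache.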