From 86a6a04abedc4bbc97a544f23658fd05b00386c2 Mon Sep 17 00:00:00 2001 From: Joshua Eckels Date: Thu, 2 Jan 2025 16:17:49 -0700 Subject: [PATCH] feat: clean sample inputs and plot slice api --- src/amisc/system.py | 127 ++++++++++++++++++++++-------------------- src/amisc/variable.py | 7 ++- tests/test_system.py | 26 ++++++++- 3 files changed, 99 insertions(+), 61 deletions(-) diff --git a/src/amisc/system.py b/src/amisc/system.py index db52efb..05a228f 100644 --- a/src/amisc/system.py +++ b/src/amisc/system.py @@ -467,66 +467,75 @@ def set_logger(self, log_file: str | Path | bool = None, stdout: bool = None, lo for comp in self.components: comp.set_logger(log_file=log_file, stdout=stdout, logger=logger, level=level) - def sample_inputs(self, size: tuple | int, comp: str = 'System', use_pdf: bool = False, - nominal: dict[str: float] = None, constants: set[str] = None) -> Dataset: + def sample_inputs(self, size: tuple | int, + component: str = 'System', + normalize: bool = True, + use_pdf: bool | str | list[str] = False, + include: str | list[str] = None, + exclude: str | list[str] = None, + nominal: dict[str, float] = None) -> Dataset: """Return samples of the inputs according to provided options. Will return samples in the - normalized/compressed space of the surrogate. See [`to_model_dataset`][amisc.utils.to_model_dataset] to - convert the samples to be usable by the true model directly. + normalized/compressed space of the surrogate by default. See [`to_model_dataset`][amisc.utils.to_model_dataset] + to convert the samples to be usable by the true model directly. :param size: tuple or integer specifying shape or number of samples to obtain - :param comp: which component to sample inputs for (defaults to full system exogenous inputs) - :param use_pdf: whether to sample from each variable's pdf, defaults to random samples over input domain instead - :param nominal: `dict(var_id=value)` of nominal values for params with relative uncertainty, also can use - to specify constant values for a variable listed in `constants`. Specify nominal values - as the true model would expect them (i.e. not normalized or compressed) -- they will be - returned in normalized form. - :param constants: set of param types to hold constant while sampling (i.e. calibration, design, etc.), - can also put a `var.name` string in here to specify a single variable to hold constant - :returns: `dict` of `(*size,)` samples for each input variable + :param component: which component to sample inputs for (defaults to full system exogenous inputs) + :param normalize: whether to normalize the samples (defaults to True) + :param use_pdf: whether to sample from variable pdfs (defaults to False, which will instead sample from the + variable domain bounds). If a string or list of strings is provided, then only those variables + or variable categories will be sampled using their pdfs. + :param include: a list of variable or variable categories to include in the sampling. Defaults to using all + input variables. + :param exclude: a list of variable or variable categories to exclude from the sampling. Empty by default. + :param nominal: `dict(var_id=value)` of nominal values for params with relative uncertainty. Specify nominal + values as unnormalized (will be normalized if `normalize=True`) + :returns: `dict` of `(*size,)` samples for each selected input variable """ size = (size, ) if isinstance(size, int) else size nominal = nominal or dict() - constants = constants or set() - inputs = self.inputs() if comp == 'System' else self[comp].inputs - samples = {} + inputs = self.inputs() if component == 'System' else self[component].inputs + if include is None: + include = [] + if not isinstance(include, list): + include = [include] + if exclude is None: + exclude = [] + if not isinstance(exclude, list): + exclude = [exclude] + if isinstance(use_pdf, str): + use_pdf = [use_pdf] + + selected_inputs = [] for var in inputs: + if len(include) == 0 or var.name in include or var.category in include: + if var.name not in exclude and var.category not in exclude: + selected_inputs.append(var) + + samples = {} + for var in selected_inputs: # Sample from latent variable domains for field quantities if var.compression is not None: - if var.category in constants or var in constants: - nom = nominal.get(var, None) - if nom is not None: - nom = np.broadcast_to(np.atleast_1d(nom), size + (var.compression.latent_size(),)).copy() - for i, d in enumerate(var.get_domain()): - samples[f'{var.name}{LATENT_STR_ID}{i}'] = nom[..., i] if nom is not None else np.mean(d) - else: - latent = var.sample_domain(size) - for i in range(latent.shape[-1]): - samples[f'{var.name}{LATENT_STR_ID}{i}'] = latent[..., i] + latent = var.sample_domain(size) + for i in range(latent.shape[-1]): + samples[f'{var.name}{LATENT_STR_ID}{i}'] = latent[..., i] # Sample scalars normally else: - # Set a constant value for this variable - if var.category in constants or var in constants: - if nom := nominal.get(var): - samples[var.name] = var.normalize(nom) - else: - samples[var.name] = var.normalize(var.get_nominal()) + lb, ub = var.get_domain() + pdf = (var.name in use_pdf or var.category in use_pdf) if isinstance(use_pdf, list) else use_pdf + nom = nominal.get(var.name, None) - # Sample from this variable's pdf or randomly within its domain bounds (reject if outside bounds) - else: - lb, ub = var.get_domain() - x_sample = var.sample(size, nominal=nominal.get(var, None)) if use_pdf else var.sample_domain(size) + x_sample = var.sample(size, nominal=nom) if pdf else var.sample_domain(size) + good_idx = (x_sample < ub) & (x_sample > lb) + num_reject = np.sum(~good_idx) + + while num_reject > 0: + new_sample = var.sample((num_reject,), nominal=nom) if pdf else var.sample_domain((num_reject,)) + x_sample[~good_idx] = new_sample good_idx = (x_sample < ub) & (x_sample > lb) num_reject = np.sum(~good_idx) - while num_reject > 0: - new_sample = var.sample((num_reject,), nominal=nominal.get(var, None)) \ - if use_pdf else var.sample_domain((num_reject,)) - x_sample[~good_idx] = new_sample - good_idx = (x_sample < ub) & (x_sample > lb) - num_reject = np.sum(~good_idx) - - samples[var.name] = var.normalize(x_sample) + samples[var.name] = var.normalize(x_sample) if normalize else x_sample return samples @@ -1402,8 +1411,8 @@ def plot_slice(self, inputs: list[str] = None, outputs: list[str] = None, num_steps: int = 20, show_surr: bool = True, - show_model: list = None, - model_dir: str | Path = None, + show_model: str | tuple | list = None, + save_dir: str | Path = None, executor: Executor = None, nominal: dict[str: float] = None, random_walk: bool = False, @@ -1420,8 +1429,8 @@ def plot_slice(self, inputs: list[str] = None, :param num_steps: the number of points to take in the 1d slice for each input variable; this amounts to a total of `num_steps*len(inputs)` model/surrogate evaluations :param show_surr: whether to show the surrogate prediction - :param show_model: also plot model predictions, `list` of ['best', 'worst', tuple(alpha), etc.] - :param model_dir: base directory to save model outputs (if specified) + :param show_model: also compute and plot model predictions, `list` of ['best', 'worst', tuple(alpha), etc.] + :param save_dir: base directory to save model outputs and plots (if specified) :param executor: a `concurrent.futures.Executor` object to parallelize model or surrogate evaluations :param nominal: `dict` of `var->nominal` to use as constant values for all non-sliced variables (use unnormalized values only; use `var_LATENT0` to specify nominal latent values) @@ -1440,14 +1449,14 @@ def plot_slice(self, inputs: list[str] = None, show_model = slice_data['show_model'] # Must use same model data as save file outputs = slice_data.get('outputs') if outputs is None else outputs input_slices = slice_data['input_slices'] - model_dir = None # Don't run or save any models if loading from file + save_dir = None # Don't run or save any models if loading from file # Set default values (take up to the first 3 inputs by default) all_inputs = self.inputs() all_outputs = self.outputs() rand_id = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4)) - if model_dir is not None: - os.mkdir(Path(model_dir) / f'slice_{rand_id}') + if save_dir is not None: + os.mkdir(Path(save_dir) / f'slice_{rand_id}') if nominal is None: nominal = dict() inputs = all_inputs[:3] if inputs is None else inputs @@ -1465,12 +1474,12 @@ def plot_slice(self, inputs: list[str] = None, outputs[i] = f'{var}{LATENT_STR_ID}0' bds = all_inputs.get_domains() - xlabels = [all_inputs[var].get_tex(units=True) if LATENT_STR_ID not in str(var) else - all_inputs[str(var).split(LATENT_STR_ID)[0]].get_tex(units=True) + + xlabels = [all_inputs[var].get_tex(units=False) if LATENT_STR_ID not in str(var) else + all_inputs[str(var).split(LATENT_STR_ID)[0]].get_tex(units=False) + f' (latent {str(var).split(LATENT_STR_ID)[1]})' for var in inputs] - ylabels = [all_outputs[var].get_tex(units=True) if LATENT_STR_ID not in str(var) else - all_outputs[str(var).split(LATENT_STR_ID)[0]].get_tex(units=True) + + ylabels = [all_outputs[var].get_tex(units=False) if LATENT_STR_ID not in str(var) else + all_outputs[str(var).split(LATENT_STR_ID)[0]].get_tex(units=False) + f' (latent {str(var).split(LATENT_STR_ID)[1]})' for var in outputs] # Construct slices of model inputs (if not provided) @@ -1510,8 +1519,8 @@ def plot_slice(self, inputs: list[str] = None, output_slices_model = list() for model in show_model: output_dir = None - if model_dir is not None: - output_dir = (Path(model_dir) / f'slice_{rand_id}' / + if save_dir is not None: + output_dir = (Path(save_dir) / f'slice_{rand_id}' / str(model).replace('{', '').replace('}', '').replace(':', '=').replace("'", '')) os.mkdir(output_dir) output_slices_model.append(self.predict(input_slices, use_model=model, model_dir=output_dir, @@ -1551,10 +1560,10 @@ def plot_slice(self, inputs: list[str] = None, fig.tight_layout() # Save results (unless we were already loading from a save file) - if from_file is None and self.root_dir is not None: + if from_file is None and save_dir is not None: fname = f'in={",".join([str(v) for v in inputs])}_out={",".join([str(v) for v in outputs])}' fname = f'slice_rand{rand_id}_' + fname if random_walk else f'slice_nom{rand_id}_' + fname - fdir = Path(self.root_dir) if model_dir is None else Path(model_dir) / f'slice_{rand_id}' + fdir = Path(save_dir) / f'slice_{rand_id}' fig.savefig(fdir / f'{fname}.pdf', bbox_inches='tight', format='pdf') save_dict = {'inputs': inputs, 'outputs': outputs, 'show_model': show_model, 'show_surr': show_surr, 'nominal': nominal, 'random_walk': random_walk, 'input_slices': input_slices, diff --git a/src/amisc/variable.py b/src/amisc/variable.py index 56da021..5623b95 100644 --- a/src/amisc/variable.py +++ b/src/amisc/variable.py @@ -279,6 +279,9 @@ def sample_domain(self, shape: tuple | int) -> np.ndarray: """Return an array of the given `shape` for uniform samples over the domain of this variable. Returns samples for each latent dimension if this is a field quantity with compression. + Will always sample uniformly over the normalized surrogate domain if `norm` is specified, and will return + samples in the original unnormalized domain. + !!! Note The last dim of the returned samples will be the latent space size for field quantities. @@ -293,7 +296,9 @@ def sample_domain(self, shape: tuple | int) -> np.ndarray: ub = np.atleast_1d([d[1] for d in domain]) return np.random.rand(*shape, 1) * (ub - lb) + lb else: - return np.random.rand(*shape) * (domain[1] - domain[0]) + domain[0] + lb, ub = self.normalize(domain) + norm_samples = np.random.rand(*shape) * (ub - lb) + lb + return self.denormalize(norm_samples) else: raise RuntimeError(f'Variable "{self.name}" does not have a domain specified.') diff --git a/tests/test_system.py b/tests/test_system.py index ac09175..dd93179 100644 --- a/tests/test_system.py +++ b/tests/test_system.py @@ -15,6 +15,30 @@ from amisc.utils import relative_error +def test_sample_inputs(): + def my_func(inputs): + return inputs + + inputs = [Variable('x1', distribution='N(0, 1)', category='calibration'), + Variable('x2', domain=(0.1, 100), norm='log10', category='calibration'), + Variable('x3', domain=(1e-6, 10e-6), norm='linear(1e6)', category='other')] + surr = System(Component(my_func, inputs, ['y1', 'y2', 'y3'], name='c1')) + + s1 = surr.sample_inputs(100) + assert all([v in s1 for v in ['x1', 'x2', 'x3']]) + assert np.all(s1['x1'] < 3) and np.all(s1['x1'] > -3) + assert np.all(s1['x2'] < 2) and np.all(s1['x2'] > -1) + assert np.all(s1['x3'] < 10) and np.all(s1['x3'] > 1) + + s1 = surr.sample_inputs(100, use_pdf='x1', include='calibration', normalize=False) + assert 'x3' not in s1 + assert np.all(s1['x2'] < 100) and np.all(s1['x2'] > 0.1) + + s1 = surr.sample_inputs(100, exclude=['x1', 'x2'], normalize=False) + assert 'x1' not in s1 and 'x2' not in s1 + assert np.all(s1['x3'] < 10e-6) and np.all(s1['x3'] > 1e-6) + + def test_feedforward_simple(plots=False): """Test the MD system in Figure 5 in Jakeman 2022.""" from amisc.examples.models import f1, f2 @@ -348,7 +372,7 @@ def test_fire_sat(tmp_path, plots=False): # Plot 1d slices slice_idx = ['H', 'Po', 'Cd'] try: - surr.plot_slice(slice_idx, targs, show_model=['best', 'worst'], model_dir=surr.root_dir, + surr.plot_slice(slice_idx, targs, show_model=['best', 'worst'], save_dir=surr.root_dir, random_walk=True, num_steps=15) except np.linalg.LinAlgError: print('Its alright. Sometimes the random walks are wacky and FPI wont converge.')