Skip to content

Commit

Permalink
feat: clean sample inputs and plot slice api
Browse files Browse the repository at this point in the history
  • Loading branch information
eckelsjd committed Jan 9, 2025
1 parent 02fa6b6 commit 86a6a04
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 61 deletions.
127 changes: 68 additions & 59 deletions src/amisc/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,66 +467,75 @@ def set_logger(self, log_file: str | Path | bool = None, stdout: bool = None, lo
for comp in self.components:
comp.set_logger(log_file=log_file, stdout=stdout, logger=logger, level=level)

def sample_inputs(self, size: tuple | int, comp: str = 'System', use_pdf: bool = False,
nominal: dict[str: float] = None, constants: set[str] = None) -> Dataset:
def sample_inputs(self, size: tuple | int,
component: str = 'System',
normalize: bool = True,
use_pdf: bool | str | list[str] = False,
include: str | list[str] = None,
exclude: str | list[str] = None,
nominal: dict[str, float] = None) -> Dataset:
"""Return samples of the inputs according to provided options. Will return samples in the
normalized/compressed space of the surrogate. See [`to_model_dataset`][amisc.utils.to_model_dataset] to
convert the samples to be usable by the true model directly.
normalized/compressed space of the surrogate by default. See [`to_model_dataset`][amisc.utils.to_model_dataset]
to convert the samples to be usable by the true model directly.
:param size: tuple or integer specifying shape or number of samples to obtain
:param comp: which component to sample inputs for (defaults to full system exogenous inputs)
:param use_pdf: whether to sample from each variable's pdf, defaults to random samples over input domain instead
:param nominal: `dict(var_id=value)` of nominal values for params with relative uncertainty, also can use
to specify constant values for a variable listed in `constants`. Specify nominal values
as the true model would expect them (i.e. not normalized or compressed) -- they will be
returned in normalized form.
:param constants: set of param types to hold constant while sampling (i.e. calibration, design, etc.),
can also put a `var.name` string in here to specify a single variable to hold constant
:returns: `dict` of `(*size,)` samples for each input variable
:param component: which component to sample inputs for (defaults to full system exogenous inputs)
:param normalize: whether to normalize the samples (defaults to True)
:param use_pdf: whether to sample from variable pdfs (defaults to False, which will instead sample from the
variable domain bounds). If a string or list of strings is provided, then only those variables
or variable categories will be sampled using their pdfs.
:param include: a list of variable or variable categories to include in the sampling. Defaults to using all
input variables.
:param exclude: a list of variable or variable categories to exclude from the sampling. Empty by default.
:param nominal: `dict(var_id=value)` of nominal values for params with relative uncertainty. Specify nominal
values as unnormalized (will be normalized if `normalize=True`)
:returns: `dict` of `(*size,)` samples for each selected input variable
"""
size = (size, ) if isinstance(size, int) else size
nominal = nominal or dict()
constants = constants or set()
inputs = self.inputs() if comp == 'System' else self[comp].inputs
samples = {}
inputs = self.inputs() if component == 'System' else self[component].inputs
if include is None:
include = []
if not isinstance(include, list):
include = [include]
if exclude is None:
exclude = []
if not isinstance(exclude, list):
exclude = [exclude]
if isinstance(use_pdf, str):
use_pdf = [use_pdf]

selected_inputs = []
for var in inputs:
if len(include) == 0 or var.name in include or var.category in include:
if var.name not in exclude and var.category not in exclude:
selected_inputs.append(var)

samples = {}
for var in selected_inputs:
# Sample from latent variable domains for field quantities
if var.compression is not None:
if var.category in constants or var in constants:
nom = nominal.get(var, None)
if nom is not None:
nom = np.broadcast_to(np.atleast_1d(nom), size + (var.compression.latent_size(),)).copy()
for i, d in enumerate(var.get_domain()):
samples[f'{var.name}{LATENT_STR_ID}{i}'] = nom[..., i] if nom is not None else np.mean(d)
else:
latent = var.sample_domain(size)
for i in range(latent.shape[-1]):
samples[f'{var.name}{LATENT_STR_ID}{i}'] = latent[..., i]
latent = var.sample_domain(size)
for i in range(latent.shape[-1]):
samples[f'{var.name}{LATENT_STR_ID}{i}'] = latent[..., i]

# Sample scalars normally
else:
# Set a constant value for this variable
if var.category in constants or var in constants:
if nom := nominal.get(var):
samples[var.name] = var.normalize(nom)
else:
samples[var.name] = var.normalize(var.get_nominal())
lb, ub = var.get_domain()
pdf = (var.name in use_pdf or var.category in use_pdf) if isinstance(use_pdf, list) else use_pdf
nom = nominal.get(var.name, None)

# Sample from this variable's pdf or randomly within its domain bounds (reject if outside bounds)
else:
lb, ub = var.get_domain()
x_sample = var.sample(size, nominal=nominal.get(var, None)) if use_pdf else var.sample_domain(size)
x_sample = var.sample(size, nominal=nom) if pdf else var.sample_domain(size)
good_idx = (x_sample < ub) & (x_sample > lb)
num_reject = np.sum(~good_idx)

while num_reject > 0:
new_sample = var.sample((num_reject,), nominal=nom) if pdf else var.sample_domain((num_reject,))
x_sample[~good_idx] = new_sample
good_idx = (x_sample < ub) & (x_sample > lb)
num_reject = np.sum(~good_idx)

while num_reject > 0:
new_sample = var.sample((num_reject,), nominal=nominal.get(var, None)) \
if use_pdf else var.sample_domain((num_reject,))
x_sample[~good_idx] = new_sample
good_idx = (x_sample < ub) & (x_sample > lb)
num_reject = np.sum(~good_idx)

samples[var.name] = var.normalize(x_sample)
samples[var.name] = var.normalize(x_sample) if normalize else x_sample

return samples

Expand Down Expand Up @@ -1402,8 +1411,8 @@ def plot_slice(self, inputs: list[str] = None,
outputs: list[str] = None,
num_steps: int = 20,
show_surr: bool = True,
show_model: list = None,
model_dir: str | Path = None,
show_model: str | tuple | list = None,
save_dir: str | Path = None,
executor: Executor = None,
nominal: dict[str: float] = None,
random_walk: bool = False,
Expand All @@ -1420,8 +1429,8 @@ def plot_slice(self, inputs: list[str] = None,
:param num_steps: the number of points to take in the 1d slice for each input variable; this amounts to a total
of `num_steps*len(inputs)` model/surrogate evaluations
:param show_surr: whether to show the surrogate prediction
:param show_model: also plot model predictions, `list` of ['best', 'worst', tuple(alpha), etc.]
:param model_dir: base directory to save model outputs (if specified)
:param show_model: also compute and plot model predictions, `list` of ['best', 'worst', tuple(alpha), etc.]
:param save_dir: base directory to save model outputs and plots (if specified)
:param executor: a `concurrent.futures.Executor` object to parallelize model or surrogate evaluations
:param nominal: `dict` of `var->nominal` to use as constant values for all non-sliced variables (use
unnormalized values only; use `var_LATENT0` to specify nominal latent values)
Expand All @@ -1440,14 +1449,14 @@ def plot_slice(self, inputs: list[str] = None,
show_model = slice_data['show_model'] # Must use same model data as save file
outputs = slice_data.get('outputs') if outputs is None else outputs
input_slices = slice_data['input_slices']
model_dir = None # Don't run or save any models if loading from file
save_dir = None # Don't run or save any models if loading from file

# Set default values (take up to the first 3 inputs by default)
all_inputs = self.inputs()
all_outputs = self.outputs()
rand_id = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))
if model_dir is not None:
os.mkdir(Path(model_dir) / f'slice_{rand_id}')
if save_dir is not None:
os.mkdir(Path(save_dir) / f'slice_{rand_id}')
if nominal is None:
nominal = dict()
inputs = all_inputs[:3] if inputs is None else inputs
Expand All @@ -1465,12 +1474,12 @@ def plot_slice(self, inputs: list[str] = None,
outputs[i] = f'{var}{LATENT_STR_ID}0'

bds = all_inputs.get_domains()
xlabels = [all_inputs[var].get_tex(units=True) if LATENT_STR_ID not in str(var) else
all_inputs[str(var).split(LATENT_STR_ID)[0]].get_tex(units=True) +
xlabels = [all_inputs[var].get_tex(units=False) if LATENT_STR_ID not in str(var) else
all_inputs[str(var).split(LATENT_STR_ID)[0]].get_tex(units=False) +
f' (latent {str(var).split(LATENT_STR_ID)[1]})' for var in inputs]

ylabels = [all_outputs[var].get_tex(units=True) if LATENT_STR_ID not in str(var) else
all_outputs[str(var).split(LATENT_STR_ID)[0]].get_tex(units=True) +
ylabels = [all_outputs[var].get_tex(units=False) if LATENT_STR_ID not in str(var) else
all_outputs[str(var).split(LATENT_STR_ID)[0]].get_tex(units=False) +
f' (latent {str(var).split(LATENT_STR_ID)[1]})' for var in outputs]

# Construct slices of model inputs (if not provided)
Expand Down Expand Up @@ -1510,8 +1519,8 @@ def plot_slice(self, inputs: list[str] = None,
output_slices_model = list()
for model in show_model:
output_dir = None
if model_dir is not None:
output_dir = (Path(model_dir) / f'slice_{rand_id}' /
if save_dir is not None:
output_dir = (Path(save_dir) / f'slice_{rand_id}' /
str(model).replace('{', '').replace('}', '').replace(':', '=').replace("'", ''))
os.mkdir(output_dir)
output_slices_model.append(self.predict(input_slices, use_model=model, model_dir=output_dir,
Expand Down Expand Up @@ -1551,10 +1560,10 @@ def plot_slice(self, inputs: list[str] = None,
fig.tight_layout()

# Save results (unless we were already loading from a save file)
if from_file is None and self.root_dir is not None:
if from_file is None and save_dir is not None:
fname = f'in={",".join([str(v) for v in inputs])}_out={",".join([str(v) for v in outputs])}'
fname = f'slice_rand{rand_id}_' + fname if random_walk else f'slice_nom{rand_id}_' + fname
fdir = Path(self.root_dir) if model_dir is None else Path(model_dir) / f'slice_{rand_id}'
fdir = Path(save_dir) / f'slice_{rand_id}'
fig.savefig(fdir / f'{fname}.pdf', bbox_inches='tight', format='pdf')
save_dict = {'inputs': inputs, 'outputs': outputs, 'show_model': show_model, 'show_surr': show_surr,
'nominal': nominal, 'random_walk': random_walk, 'input_slices': input_slices,
Expand Down
7 changes: 6 additions & 1 deletion src/amisc/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ def sample_domain(self, shape: tuple | int) -> np.ndarray:
"""Return an array of the given `shape` for uniform samples over the domain of this variable. Returns
samples for each latent dimension if this is a field quantity with compression.
Will always sample uniformly over the normalized surrogate domain if `norm` is specified, and will return
samples in the original unnormalized domain.
!!! Note
The last dim of the returned samples will be the latent space size for field quantities.
Expand All @@ -293,7 +296,9 @@ def sample_domain(self, shape: tuple | int) -> np.ndarray:
ub = np.atleast_1d([d[1] for d in domain])
return np.random.rand(*shape, 1) * (ub - lb) + lb
else:
return np.random.rand(*shape) * (domain[1] - domain[0]) + domain[0]
lb, ub = self.normalize(domain)
norm_samples = np.random.rand(*shape) * (ub - lb) + lb
return self.denormalize(norm_samples)
else:
raise RuntimeError(f'Variable "{self.name}" does not have a domain specified.')

Expand Down
26 changes: 25 additions & 1 deletion tests/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,30 @@
from amisc.utils import relative_error


def test_sample_inputs():
def my_func(inputs):
return inputs

inputs = [Variable('x1', distribution='N(0, 1)', category='calibration'),
Variable('x2', domain=(0.1, 100), norm='log10', category='calibration'),
Variable('x3', domain=(1e-6, 10e-6), norm='linear(1e6)', category='other')]
surr = System(Component(my_func, inputs, ['y1', 'y2', 'y3'], name='c1'))

s1 = surr.sample_inputs(100)
assert all([v in s1 for v in ['x1', 'x2', 'x3']])
assert np.all(s1['x1'] < 3) and np.all(s1['x1'] > -3)
assert np.all(s1['x2'] < 2) and np.all(s1['x2'] > -1)
assert np.all(s1['x3'] < 10) and np.all(s1['x3'] > 1)

s1 = surr.sample_inputs(100, use_pdf='x1', include='calibration', normalize=False)
assert 'x3' not in s1
assert np.all(s1['x2'] < 100) and np.all(s1['x2'] > 0.1)

s1 = surr.sample_inputs(100, exclude=['x1', 'x2'], normalize=False)
assert 'x1' not in s1 and 'x2' not in s1
assert np.all(s1['x3'] < 10e-6) and np.all(s1['x3'] > 1e-6)


def test_feedforward_simple(plots=False):
"""Test the MD system in Figure 5 in Jakeman 2022."""
from amisc.examples.models import f1, f2
Expand Down Expand Up @@ -348,7 +372,7 @@ def test_fire_sat(tmp_path, plots=False):
# Plot 1d slices
slice_idx = ['H', 'Po', 'Cd']
try:
surr.plot_slice(slice_idx, targs, show_model=['best', 'worst'], model_dir=surr.root_dir,
surr.plot_slice(slice_idx, targs, show_model=['best', 'worst'], save_dir=surr.root_dir,
random_walk=True, num_steps=15)
except np.linalg.LinAlgError:
print('Its alright. Sometimes the random walks are wacky and FPI wont converge.')
Expand Down

0 comments on commit 86a6a04

Please sign in to comment.