diff --git a/.github/workflows/push.yaml b/.github/workflows/push.yaml
index 9bba1720..7af3e42e 100644
--- a/.github/workflows/push.yaml
+++ b/.github/workflows/push.yaml
@@ -34,7 +34,7 @@ jobs:
     needs: [ lint_and_type_check ]
     strategy:
       matrix:
-        python-version: [ 3.6, 3.7, 3.8, 3.9 ]
+        python-version: [ 3.7, 3.8, 3.9 ]

    steps:
      - uses: actions/checkout@v2
diff --git a/.gitignore b/.gitignore
index 5dcf4947..42a540e4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -140,3 +140,5 @@ catalog-*.xml
 *.ttl
 *.sqlite3
 *.sqlite3-journal
+*.dat
+*.owl
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 21c96cde..df79e386 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -6,11 +6,11 @@ repos:
     args: ['--maxkb=100']
   - id: check-yaml

-- repo: https://github.com/pre-commit/mirrors-mypy
-  rev: v0.910
-  hooks:
-  - id: mypy
-    args: ['--install-types', '--non-interactive', '--ignore-missing-imports']
+#- repo: https://github.com/pre-commit/mirrors-mypy
+#  rev: v0.910
+#  hooks:
+#  - id: mypy
+#    args: ['--install-types', '--non-interactive', '--ignore-missing-imports']

 - repo: https://github.com/psf/black
   rev: 22.3.0
diff --git a/examples/example_surrogate_hartman6D.py b/examples/example_surrogate_hartman6D.py
new file mode 100644
index 00000000..4f0d27d9
--- /dev/null
+++ b/examples/example_surrogate_hartman6D.py
@@ -0,0 +1,342 @@
+"""
+Example of a Bayesian parameter estimation problem using surrogate
+modeling with probeye.
+
+The Hartmann test function f: [0, 1]^6 -> R is used to simulate a
+physical model. The last two dimensions are considered as space and
+time coordinates, while the first four dimensions are taken as
+latent variables to be inferred. Measurements are generated by
+adding i.i.d. Gaussian noise to samples from this function.
+
+Notes:
+    * A specific version of `harlow` is required to run this example, which can
+      be found here:
+      https://github.com/TNO/harlow/tree/implement-data-container
+"""
+
+# =========================================================================
+# Imports
+# =========================================================================
+
+# third party imports
+import numpy as np
+
+# local imports (problem definition)
+from probeye.definition.inverse_problem import InverseProblem
+from probeye.definition.forward_model import ForwardModelBase
+from probeye.definition.sensor import Sensor
+from probeye.definition.likelihood_model import GaussianLikelihoodModel
+from probeye.inference.emcee.solver import EmceeSolver
+from probeye.definition.distribution import Uniform
+from probeye.metamodeling.sampling import LatinHypercubeSampler, HarlowSampler
+from probeye.metamodeling.surrogating import HarlowSurrogate
+
+# local imports (inference data post-processing)
+from probeye.postprocessing.sampling_plots import create_pair_plot
+from probeye.postprocessing.sampling_plots import create_posterior_plot
+
+# Surrogate model imports
+from harlow.sampling import FuzzyLolaVoronoi, ProbabilisticSampler, LatinHypercube
+from harlow.surrogating import (
+    VanillaGaussianProcess,
+)
+from harlow.utils.transforms import (
+    Identity,
+)
+
+import torch
+from botorch.test_functions.synthetic import Hartmann
+from matplotlib import pyplot as plt
+
+# =========================================================================
+# General settings
+# =========================================================================
+
+plot = True
+
+# Emcee settings
+n_steps = 1_000
+n_init_steps = 200
+n_walkers = 20
+
+# Sampler settings
+n_init = 100
+n_iter = 5
+n_point_per_iter = 20
+stopping_criterium = -np.inf
+
+# Surrogate settings
+N_train_iter = 50
+show_progress = True
+
+# =========================================================================
+# Define parameters
+# =========================================================================
+
+# Ground truth
+X_true = np.array([0.5, 0.5, 0.5, 0.5])
+
+# Bounds for function defined on unit hypercube
+X_low = 0.0
+X_high = 1.0
+
+# Ground truth and prior for measurement uncertainty std. dev.
+std_true = 0.05
+std_low = 0.0
+std_high = 1.0
+
+# =========================================================================
+# Define physical model
+# =========================================================================
+
+# Number of sensors and number of points in timeseries
+Nx = 3
+Nt = 5
+
+# Sensor names and positions
+sensor_names = ["S" + str(i + 1) for i in range(Nx)]
+x_vec = np.linspace(0, 1, Nx)
+t_vec = np.linspace(0, 1, Nt)
+
+isensor = Sensor("t")
+osensor_list = [
+    Sensor(sensor_names[i], x=float(x_vec[i]), std_model="sigma") for i in range(Nx)
+]
+
+# =========================================================================
+# Define forward model
+# =========================================================================
+
+# Initialize model
+expensive_model = Hartmann(noise_std=0.00001)
+
+
+class SyntheticModel(ForwardModelBase):
+    def interface(self):
+        self.parameters = ["X" + str(i + 1) for i in range(4)]
+        self.input_sensors = isensor
+        self.output_sensors = osensor_list
+
+    def response(self, inp: dict) -> dict:
+
+        # Arrange the input vector
+        params = np.tile([inp["X" + str(i + 1)] for i in range(4)], (Nx * Nt, 1))
+        xt = np.array(np.meshgrid(x_vec, t_vec)).T.reshape(-1, 2)
+        X = torch.tensor(np.hstack((params, xt)))
+
+        # Evaluate the function and arrange the output on a grid
+        f = np.array(expensive_model(X)).reshape(Nx, Nt)
+
+        # Store the prediction as a dict
+        response = dict()
+        for idx_x, os in enumerate(self.output_sensors):
+            response[os.name] = f[idx_x, :]
+        return response
+
+
+# =========================================================================
+# Define inference problem
+# =========================================================================
+problem = InverseProblem("Parameter estimation using surrogate model")
+
+# Parameters of the Hartmann function
+for i in range(4):
+    problem.add_parameter(
+        "X" + str(i + 1),
+        "model",
+        prior=Uniform(low=X_low, high=X_high),
+        info="Parameter of the 6D Hartmann function",
+        tex=r"$X_{{{}}}$".format(i + 1),
+    )
+
+# Noise std. dev.
+problem.add_parameter(
+    "sigma",
+    "likelihood",
+    prior=Uniform(low=std_low, high=std_high),
+    info="Std. dev. of zero-mean noise model",
+    tex=r"$\sigma$",
+)
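+# Note: probeye registers the Uniform hyperparameters above as named constants
+# ("low_X1"/"high_X1", etc.); HarlowSampler._get_bounds further below relies on
+# exactly this naming convention. An illustrative (not executed) check:
+# for prm in ["X1", "X2", "X3", "X4"]:
+#     assert problem.constant_prms_dict["low_" + prm] == X_low
+#     assert problem.constant_prms_dict["high_" + prm] == X_high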
+
+# instantiate the forward model (added to the problem after data generation)
+forward_model = SyntheticModel("ExpensiveModel")
+
+# =========================================================================
+# Add test data to the inference problem
+# =========================================================================
+def generate_data():
+    inp = {"X" + str(idx + 1): X_i for idx, X_i in enumerate(X_true)}
+    sensors = forward_model(inp)
+    for sname, svals in sensors.items():
+        sensors[sname] = list(
+            np.array(svals) + np.random.normal(loc=0.0, scale=std_true, size=Nt)
+        )
+
+    # To avoid errors when `Nt = 1`
+    if len(t_vec) == 1:
+        sensors[isensor.name] = float(t_vec[0])
+    else:
+        sensors[isensor.name] = t_vec
+
+    # Add experiments
+    problem.add_experiment("TestSeriesFull", sensor_data=sensors)
+    problem.add_experiment("TestSeriesSurrogate", sensor_data=sensors)
+
+
+# Generate data and add the expensive forward model
+generate_data()
+problem.add_forward_model(forward_model, experiments="TestSeriesFull")
+
+# =========================================================================
+# Create surrogate model
+# =========================================================================
+n_params = 4
+list_params = [[i for i in range(n_params)]] * len(sensor_names) * len(t_vec)
+
+# Kwargs to be passed to the surrogate model
+surrogate_kwargs = {
+    "training_max_iter": N_train_iter,
+    "list_params": list_params,
+    "show_progress": True,
+    "silence_warnings": True,
+    "fast_pred_var": True,
+    "input_transform": Identity,
+    "output_transform": Identity,
+}
+
+# Define the surrogate model
+surrogate_model = VanillaGaussianProcess(**surrogate_kwargs)
+
+# Probeye's latin hypercube sampler
+lhs_sampler = LatinHypercubeSampler(problem)
+
+# An iterative sampler. Here we pass the expensive forward model and the surrogate
+# model directly to the sampler. However, it is also possible to pass a surrogate
+# model that will be included in a forward model after fitting.
+harlow_sampler = HarlowSampler(problem, forward_model, LatinHypercube, surrogate_model)
+
+# Sample and fit
+harlow_sampler.sample(
+    n_initial_point=n_init,
+    n_new_point_per_iteration=n_point_per_iter,
+    n_iter=n_iter,
+    stopping_criterium=stopping_criterium,
+)
+harlow_sampler.fit()
+
+# Define the surrogate forward model
+forward_surrogate_model = HarlowSurrogate(
+    "SurrogateModel", surrogate_model, forward_model
+)
+
+# =========================================================================
+# Add forward models
+# =========================================================================
+
+# Add surrogate model to forward models
+problem.add_forward_model(forward_surrogate_model, experiments="TestSeriesSurrogate")
+
+# add the likelihood models to the problem
+for osensor in osensor_list:
+    problem.add_likelihood_model(
+        GaussianLikelihoodModel(
+            experiment_name="TestSeriesSurrogate",
+            model_error="additive",
+        )
+    )
+
+# ====================================================================
+# Plot surrogate vs. FE model prediction
+# ====================================================================
+
+# Physical model prediction
+arr_x = np.linspace(X_low, X_high, 100)
+inp = {"X" + str(idx + 1): X_i for idx, X_i in enumerate(X_true)}
+
+# Initialize zero arrays to store results
+y_true = np.zeros((len(arr_x), len(sensor_names) * len(t_vec)))
+y_pred = np.zeros((len(arr_x), len(sensor_names) * len(t_vec)))
+
+# Evaluate the physical model for each input vector
+for idx_xi, xi in enumerate(arr_x):
+    inp["X1"] = xi
+    res_true = forward_model.response(inp)
+    res_pred = forward_surrogate_model.response(inp)
+    sensor_out_true = []
+    sensor_out_pred = []
+    for idx_os, os in enumerate(osensor_list):
+        sensor_out_true.append(res_true[os.name])
+        sensor_out_pred.append(res_pred[os.name])
+    y_true[idx_xi, :] = np.ravel(sensor_out_true)
+    y_pred[idx_xi, :] = np.ravel(sensor_out_pred)
+
+# Plot
+nrows = 3
+ncols = int(np.ceil(len(sensor_names) * len(t_vec) / 3))
+f, axes = plt.subplots(nrows, ncols, sharex=True, figsize=(3 * ncols, 3 * nrows))
+for j in range(len(sensor_names) * len(t_vec)):
+    ax_i = axes.ravel()[j]
+    grid_idx = np.unravel_index(j, (nrows, ncols))
+    ax_i.plot(arr_x, y_true[:, j], color="blue", label="True")
+    ax_i.plot(arr_x, y_pred[:, j], color="red", linestyle="dashed", label="Surrogate")
+    ax_i.set_title(
+        f"Sensor: {str(sensor_names[grid_idx[0]]) + '_' + str(t_vec[grid_idx[1]])}"
+    )
+
+axes = np.atleast_2d(axes)
+[ax_i.set_xlabel(r"$X_1$") for ax_i in axes[-1, :]]
+axes[0, 0].legend()
+plt.show()
+
+
+# =========================================================================
+# Add noise models
+# =========================================================================
+# Problem overview
+problem.info()
+
+true_values = {
+    "X1": X_true[0],
+    "X2": X_true[1],
+    "X3": X_true[2],
+    "X4": X_true[3],
+    "sigma": std_true,
+}
+
+# =========================================================================
+# Initialize and run solver
+# =========================================================================
+
+solver = EmceeSolver(
+    problem,
+    show_progress=True,
+)
+
+
+inference_data = solver.run(
+    n_walkers=n_walkers, n_steps=n_steps, n_initial_steps=n_init_steps, vectorize=False
+)
+
+# =========================================================================
+# Plotting
+# =========================================================================
+create_pair_plot(
+    inference_data,
+    solver.problem,
+    show=False,
+    true_values=true_values,
+    title="Joint posterior",
+)
+
+create_posterior_plot(
+    inference_data,
+    solver.problem,
+    show=False,
+    true_values=true_values,
+    title="Marginal posteriors",
+)
+
+
+if plot:
+    plt.show()  # shows all plots at once due to 'show=False' above
+else:
+    plt.close("all")
diff --git a/probeye/definition/inverse_problem.py b/probeye/definition/inverse_problem.py
index 9f47b042..8f85c65a 100644
--- a/probeye/definition/inverse_problem.py
+++ b/probeye/definition/inverse_problem.py
@@ -408,6 +408,37 @@ def change_constant(self, prm_name: str, new_value: Union[int, float]):
             value=new_value
         )

+    def get_latent_prior_hyperparameters(self, prm_name: str) -> list:
+        """
+        Returns a list of the latent hyperparameters of a parameter's prior. In most
+        cases there will be none, so an empty list will be returned.
+
+        Parameters
+        ----------
+        prm_name
+            The name of the parameter whose prior should be checked for latent
+            hyperparameters.
+
+        Returns
+        -------
+        latent_hyperparameters
+            Contains the global names of latent hyperparameters in the prior of the
+            parameter 'prm_name'.
+        """
+
+        # first, make sure that the given parameter exists
+        self._parameters.confirm_that_parameter_exists(prm_name)
+
+        # now look for possible latent hyperparameters
+        latent_hyperparameters = []
+        if self.parameters[prm_name].is_latent:
+            hyperparameters = self.parameters[prm_name].prior.hyperparameters
+            for hyperparameter in hyperparameters:
+                if self.parameters[hyperparameter].is_latent:
+                    latent_hyperparameters.append(hyperparameter)
+
+        return latent_hyperparameters
+
     def get_parameters(self, theta: np.ndarray, prm_def: dict) -> dict:
         """
         Extracts the numeric values for given parameters that have been defined within
@@ -451,6 +482,35 @@ def get_parameters(self, theta: np.ndarray, prm_def: dict) -> dict:
             prms[local_name] = theta[idx:idx_end]
         return prms

+    def get_constants(self, prm_def: dict) -> dict:
+        """
+        Similar to 'get_parameters' with the difference that this method only extracts
+        the numeric values of constants that have been defined within the problem scope.
+        For that reason it does not need the 'theta' argument as in 'get_parameters'. If
+        'prm_def' also contains latent parameters, they will simply be ignored.
+
+        Parameters
+        ----------
+        prm_def
+            Defines which constants to extract. The keys of this dictionary are the
+            global parameter names, while the values are the local parameter names. In
+            most cases global and local names will be identical, but sometimes it is
+            convenient to define a local parameter name, e.g. in the forward model.
+
+        Returns
+        -------
+        prms
+            Contains <local parameter name> : <(global) parameter value> pairs. If a
+            parameter is scalar, its value will be returned as a float. In case of a
+            vector-valued parameter, its value will be returned as a np.ndarray.
+        """
+        prms = {}
+        for global_name, local_name in prm_def.items():
+            idx = self._parameters[global_name].index
+            if idx is None:
+                prms[local_name] = self._parameters[global_name].value
+        return prms
+
     def check_parameter_domains(self, theta: np.ndarray) -> bool:
         """
         Checks whether the given values of the latent parameters are within their
diff --git a/probeye/metamodeling/__init__.py b/probeye/metamodeling/__init__.py
new file mode 100644
index 00000000..e69de29b
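The two `InverseProblem` helpers added above are the building blocks of the Latin
hypercube sampler defined next: `get_constants` resolves a prior's constant
hyperparameters, while `get_latent_prior_hyperparameters` flags the ones that are
themselves latent and must be sampled first. A minimal sketch of the intended
behavior (the exact keys of `prior.hyperparameters` follow probeye's internal
naming, so treat the shown values as illustrative):

    problem = InverseProblem("Demo")
    problem.add_parameter("a", prior=("normal", {"mean": 2.0, "std": 1.0}))
    # common case: no latent hyperparameters in the prior of "a"
    assert problem.get_latent_prior_hyperparameters("a") == []
    # the prior's hyperparameters are constants, so get_constants returns their
    # numeric values without needing a theta vector
    prior = problem.parameters["a"].prior
    consts = problem.get_constants(prior.hyperparameters)  # e.g. {"mean_a": 2.0, "std_a": 1.0}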
+ """ + + def __init__(self, problem: InverseProblem): + + # the considered inverse problem + self.problem = problem + + # the sampling happens before the problem is given to a solver; this means that + # the priors of the problem are merely descriptive and they have to be + # translated to have their respective computing routines + self.priors = copy.deepcopy(self.problem.priors) + for prior_template in self.problem.priors.values(): + prm_name = prior_template.ref_prm + self.priors[prm_name] = translate_prior(prior_template) + + def generate_samples(self, n_samples: int, seed: int = 1) -> np.ndarray: + """ + Samples the problems latent parameter vector from the parameter's priors in + combination with latin hypercube sampling. + + Parameters + ---------- + n_samples + The number of requested samples. + seed + Seed for random number generator. + + Returns + ------- + sample_array + The sampled latent parameter vectors. Each row corresponds to a single + latent parameter vector. + """ + + # make sure that all parameters are one-dimensional; it is not straight forward + # how to do LHS for general multivariate parameters + for prm_name in self.problem.latent_prms: + if self.problem.parameters[prm_name].dim > 1: + raise RuntimeError( + f"The given problem has a multivariate parameter ('{prm_name}') " + f"for which LHS is not supported." + ) + + # the dimension of the required hypercube is the number of parameters; note + # that this only holds since all parameters are 1D (see the check above) + dim = self.problem.n_latent_prms + + # create the requested number of latin hypercube samples with the requested dim. + lhd = qmc.LatinHypercube(d=dim, seed=seed).random(n=n_samples) + + # this is going to be the array of the parameter samples; each row will + # correspond to a theta-vector + sample_array = np.zeros(lhd.shape) + + # fill the prepared array for the parameter samples using the LHS samples + for prm_name, parameter in self.problem.parameters.items(): + if parameter.is_latent: + idx = parameter.index + prior = self.priors[prm_name] + prms = self.problem.get_constants(prior.hyperparameters) + for lt_prm in self.problem.get_latent_prior_hyperparameters(prm_name): + idx_lt = self.problem.parameters[lt_prm].index + prms[lt_prm] = sample_array[:, idx_lt] + q = lhd[:, idx] + sample_array[:, idx] = prior(prms, "ppf", q, use_ref_prm=False) + + return sample_array + + def generate_training_data( + self, + forward_model: ForwardModelBase, + n_samples: int, + seed: int = 1, + ) -> Tuple[pd.DataFrame, dict]: + """ + Generates a given number of training data for fitting a surrogate model. The + training data contains a number of parameter vectors (sampled using LHS) and + the corresponding model responses. + + Parameters + ---------- + forward_model + The forward model that should be evaluated. + n_samples + The number of parameter vectors the forward model should be evaluated for. + seed + Seed for random number generator. + + Returns + ------- + prm_samples_pd + The parameter samples the forward model was evaluated at. + responses_over_experiments + The keys are the names of the experiment the forward model is associated + with, while the values are 3D-arrays containing the forward model's + response. responses_over_experiments[i][j] will contain the forward model's + response with the ith parameter-vector for the jth output sensor. 
+ """ + + # get the forward model object with the given name and prepare the corresponding + # experimental in- and output dictionaries + forward_model.prepare_experimental_inputs_and_outputs() + + # generate the latent parameter samples and convert it to a data frame to have + # the association between columns and parameter names + prm_samples = self.generate_samples(n_samples, seed=seed) + prm_samples_pd = pd.DataFrame( + prm_samples, columns=self.problem.get_theta_names() + ) + + # this dictionary will contain the forward model responses for each of the + # experiments associated with the forward model; so the keys will be experiment + # names while the values will be 3D-arrays with the forward model's responses; + # responses_over_experiments[i] will correspond to the response of the ith + # parameter vector; responses_over_experiments[i][j] will contain the forward + # model's response with the ith parameter vector for the jth output sensor + responses_over_experiments = {} + + # here, the length of the vectors of the forward model's output sensors is + # determined; to that end, the forward model is evaluated once + first_exp_name = forward_model.experiment_names[0] + exp_inp = forward_model.input_from_experiments[first_exp_name] + first_theta = prm_samples[0] + prms_model = self.problem.get_parameters(first_theta, forward_model.prms_def) + inp = {**exp_inp, **prms_model} + response_dict = forward_model.response(inp) + # make sure that the vectors returned by each of the forward model's output + # sensors has the same length; otherwise an AssertionError is raised + length_set = set() + for value in response_dict.values(): + length_set.add(len_or_one(value)) + assert len(length_set) == 1 + n_out_values = list(length_set)[0] + + # evaluate the forward model for each experiment/parameter vector + for exp_name in forward_model.experiment_names: + exp_inp = forward_model.input_from_experiments[exp_name] + response_array = np.zeros( + (n_samples, forward_model.n_output_sensors, n_out_values) + ) + for i, theta in enumerate(prm_samples): + prms_model = self.problem.get_parameters(theta, forward_model.prms_def) + inp = {**exp_inp, **prms_model} # adds the two dictionaries + response_dict = forward_model.response(inp) + for j, response_vector in enumerate(response_dict.values()): + response_array[i, j, :] = response_vector + responses_over_experiments[exp_name] = response_array + + return prm_samples_pd, responses_over_experiments diff --git a/probeye/metamodeling/sampling.py b/probeye/metamodeling/sampling.py new file mode 100644 index 00000000..7924a80d --- /dev/null +++ b/probeye/metamodeling/sampling.py @@ -0,0 +1,485 @@ +# standard library +from typing import Tuple, Callable +import copy + +# third party imports +import numpy as np +import pandas as pd +from scipy.stats import qmc + +# local imports +from probeye.definition.inverse_problem import InverseProblem +from probeye.definition.forward_model import ForwardModelBase +from probeye.inference.scipy.priors import translate_prior +from probeye.subroutines import len_or_one + +# external imports +from harlow.sampling import Sampler as HarlowSamplerBase +from harlow.surrogating import Surrogate as HarlowSurrogateBase +from harlow.utils.helper_functions import latin_hypercube_sampling + + +class SamplerBase: + """ + Base class for probeye samplers + """ + + def __init__(self, problem: InverseProblem): + # the considered inverse problem + self.problem = problem + + # the sampling happens before the problem is given to a solver; this means 
that + # the priors of the problem are merely descriptive and they have to be + # translated to have their respective computing routines + self.priors = copy.deepcopy(self.problem.priors) + for prior_template in self.problem.priors.values(): + prm_name = prior_template.ref_prm + self.priors[prm_name] = translate_prior(prior_template) + + def sample( + self, forward_model: ForwardModelBase, **kwargs + ) -> Tuple[pd.DataFrame, dict]: + raise NotImplementedError + + +class LatinHypercubeSampler(SamplerBase): + """ + Contains functionalities to provide samples of an inverse problem's parameters by + utilizing latin hypercube sampling. The generate samples are intended for training + of a surrogate model. + + Parameters + ---------- + problem + The considered inverse problem. + """ + + def __init__(self, problem): + super().__init__(problem) + + def generate_samples(self, n_samples: int, seed: int = 1) -> np.ndarray: + """ + Samples the problems latent parameter vector from the parameter's priors in + combination with latin hypercube sampling. + + Parameters + ---------- + n_samples + The number of requested samples. + seed + Seed for random number generator. + + Returns + ------- + sample_array + The sampled latent parameter vectors. Each row corresponds to a single + latent parameter vector. + """ + + # make sure that all parameters are one-dimensional; it is not straight forward + # how to do LHS for general multivariate parameters + for prm_name in self.problem.latent_prms: + if self.problem.parameters[prm_name].dim > 1: + raise RuntimeError( + f"The given problem has a multivariate parameter ('{prm_name}') " + f"for which LHS is not supported." + ) + + # the dimension of the required hypercube is the number of parameters; note + # that this only holds since all parameters are 1D (see the check above) + dim = self.problem.n_latent_prms + + # create the requested number of latin hypercube samples with the requested dim. + lhd = qmc.LatinHypercube(d=dim, seed=seed).random(n=n_samples) + + # this is going to be the array of the parameter samples; each row will + # correspond to a theta-vector + sample_array = np.zeros(lhd.shape) + + # fill the prepared array for the parameter samples using the LHS samples + for prm_name, parameter in self.problem.parameters.items(): + if parameter.is_latent: + idx = parameter.index + prior = self.priors[prm_name] + prms = self.problem.get_constants(prior.hyperparameters) + for lt_prm in self.problem.get_latent_prior_hyperparameters(prm_name): + idx_lt = self.problem.parameters[lt_prm].index + prms[lt_prm] = sample_array[:, idx_lt] + q = lhd[:, idx] + sample_array[:, idx] = prior(prms, "ppf", q, use_ref_prm=False) + + return sample_array + + def sample( + self, + forward_model: ForwardModelBase, + n_samples: int, + seed: int = 1, + ) -> Tuple[pd.DataFrame, dict]: + """ + Generates a given number of training data for fitting a surrogate model. The + training data contains a number of parameter vectors (sampled using LHS) and + the corresponding model responses. + + Parameters + ---------- + forward_model + The forward model that should be evaluated. + n_samples + The number of parameter vectors the forward model should be evaluated for. + seed + Seed for random number generator. + + Returns + ------- + prm_samples_pd + The parameter samples the forward model was evaluated at. + responses_over_experiments + The keys are the names of the experiment the forward model is associated + with, while the values are 3D-arrays containing the forward model's + response. 
responses_over_experiments[i][j] will contain the forward model's + response with the ith parameter-vector for the jth output sensor. + """ + + # get the forward model object with the given name and prepare the corresponding + # experimental in- and output dictionaries + forward_model.prepare_experimental_inputs_and_outputs() + + # generate the latent parameter samples and convert it to a data frame to have + # the association between columns and parameter names + prm_samples = self.generate_samples(n_samples, seed=seed) + prm_samples_pd = pd.DataFrame( + prm_samples, columns=self.problem.get_theta_names() + ) + + # this dictionary will contain the forward model responses for each of the + # experiments associated with the forward model; so the keys will be experiment + # names while the values will be 3D-arrays with the forward model's responses; + # responses_over_experiments[i] will correspond to the response of the ith + # parameter vector; responses_over_experiments[i][j] will contain the forward + # model's response with the ith parameter vector for the jth output sensor + responses_over_experiments = {} + + # here, the length of the vectors of the forward model's output sensors is + # determined; to that end, the forward model is evaluated once + first_exp_name = forward_model.experiment_names[0] + exp_inp = forward_model.input_from_experiments[first_exp_name] + first_theta = prm_samples[0] + prms_model = self.problem.get_parameters(first_theta, forward_model.prms_def) + inp = {**exp_inp, **prms_model} + response_dict = forward_model.response(inp) + # make sure that the vectors returned by each of the forward model's output + # sensors has the same length; otherwise an AssertionError is raised + length_set = set() + for value in response_dict.values(): + length_set.add(len_or_one(value)) + assert len(length_set) == 1 + n_out_values = list(length_set)[0] + + # evaluate the forward model for each experiment/parameter vector + for exp_name in forward_model.experiment_names: + exp_inp = forward_model.input_from_experiments[exp_name] + response_array = np.zeros( + (n_samples, forward_model.n_output_sensors, n_out_values) + ) + for i, theta in enumerate(prm_samples): + prms_model = self.problem.get_parameters(theta, forward_model.prms_def) + inp = {**exp_inp, **prms_model} # adds the two dictionaries + response_dict = forward_model.response(inp) + for j, response_vector in enumerate(response_dict.values()): + response_array[i, j, :] = response_vector + responses_over_experiments[exp_name] = response_array + + return prm_samples_pd, responses_over_experiments + + +class HarlowSampler(SamplerBase): + """ + Model factory that returns a `ForwardModel` object, where the response + is obtained from a fitted surrogate model. + + Usage: + TODO + + Todo: + * Currently, each point is added as a separate surrogate model + + Notes: + * This is a utility class meant to facilitate the sampling and surrogating + of expensive forward models by bridging `probeye` with `harlow`. The + main objective is to reduce the manual effort required for surrogating + by eliminating the need to re-specify the problem when building the + surrogate. 
+ + """ + + def __init__( + self, + problem: InverseProblem, + forward_model: ForwardModelBase, + sampler: HarlowSamplerBase, + surrogate_model: HarlowSurrogateBase, + fit_points_x: np.ndarray = None, + fit_points_y: np.ndarray = None, + test_points_x: np.ndarray = None, + test_points_y: np.ndarray = None, + evaluation_metric: Callable = None, + verbose: bool = False, + run_name: str = "test", + **surrogate_kwargs, + ): + + super().__init__(problem) + + # Initialize + self.surrogate_kwargs = surrogate_kwargs + self.problem = problem + self.forward_model = forward_model + self.input_sensors = forward_model.input_sensors + self.output_sensors = forward_model.output_sensors + self.parameters = forward_model.parameters + self.fit_points_x = fit_points_x + self.fit_points_y = fit_points_y + self.test_points_x = test_points_x + self.test_points_y = test_points_y + self.evaluation_metric = evaluation_metric + self.verbose = verbose + self.run_name = run_name + + # Find the experiment associated with the expensive forward model + # and get sensor information + # TODO: Implement check that the expensive forward model + # has been added to the problem + self.input_sensor_data = dict() + for exp_name, exp in self.problem.experiments.items(): + if exp.forward_model == self.forward_model.name: + for inp_sensor in self.input_sensors: + self.input_sensor_data[inp_sensor.name] = list( + exp.sensor_data[inp_sensor.name] + ) + + # Check that the supplied forward model only has a single sensor + # TODO: Can a forward model have multiple input sensors? + if len(list(self.input_sensor_data.keys())) != 1: + raise ValueError( + f"The specified forward model must have " + f"exactly one input sensor but has " + f"{len(list(self.input_sensor_data.keys()))}" + ) + + # Extract sensor names and input/output shape information + # from forward model. + # TODO: This should be adjusted depending on the problem definition + # or specified by the user e.g. 
with keyword arguments that specify + # the structure of the surrogate model, e.g.: + # * `shape = None` to create an independent surrogate for each observation + # * `shape = Sensor` to create a surrogate model per sensor + self.vec_t = list(self.input_sensor_data.values())[0] + self.model_names = [] + for idx_x, os in enumerate(forward_model.output_sensors): + for idx_t, t in enumerate(self.vec_t): + self.model_names.append(os.name + "_" + str(t)) + self.surrogate_kwargs["model_names"] = self.model_names + self.num_features = len(forward_model.output_sensors) + self.surrogate_kwargs["num_features"] = self.num_features + + # Get bounds + self._get_bounds() + + # Surrogate model + self.surrogate = surrogate_model + self.func_pred = self.surrogate.predict + + # Initialize sampler + self.sampler = sampler( + target_function=self.target, + surrogate_model=self.surrogate, + domain_lower_bound=self.lower_bounds, + domain_upper_bound=self.upper_bounds, + fit_points_x=self.fit_points_x, + fit_points_y=self.fit_points_y, + test_points_x=self.test_points_x, + test_points_y=self.test_points_y, + evaluation_metric=self.evaluation_metric, + verbose=self.verbose, + run_name=self.run_name, + ) + + def fit(self, **kwargs): + """ + Fit the surrogate model using the sampling points and observations + stored in `self.sampler` + + Parameters + ---------- + kwargs + + Returns + ------- + + """ + + self.surrogate.fit( + self.sampler.fit_points_x, self.sampler.fit_points_y, **kwargs + ) + + def sample(self, **kwargs): + """ + Draw samples from the forward model and fit the surrogate model + using the specified sampler. + + Parameters + ---------- + kwargs + + Returns + ------- + + """ + + self.fit_points_x, self.fit_points_y = self.sampler.sample(**kwargs) + + # # Get prediction function + # self.func_pred = self.surrogate.predict + + def get_training_data(self, n_samples=1): + """ + Obtain training points `X` with latin hypercube sampling and + corresponding observations `y` by evaluating the target function. + + Returns + ------- + A tuple of training points and observations `(train_X, train_y)` + + """ + train_X = latin_hypercube_sampling( + n_sample=n_samples, + domain_lower_bound=np.array(self.lower_bounds), + domain_upper_bound=np.array(self.upper_bounds), + ) + train_y = self.target(train_X) + + return train_X, train_y + + def get_harlow_model( + self, + name: str, + ) -> ForwardModelBase: + + """ + Model factory for the `HarlowSurrogate` class. + + TODO: + * Refactor this function so that it returns a `ForwardModel` object + with a `.predict()` method that queries an instance of `Surrogate`. + * It is important that the generated `ForwardModel` instance does not + contain the surrogate model itself (due to incompatibility of tensors + with `copy.deepcopy`), or the problem definition. + + Notes + --------- + * This function was originally created as a workaround for passing arguments to + classes derived from `ForwardModelBase`. + + Parameters + ---------- + problem + forward_model + surrogate + n_output_dims + kwargs + + Returns + ------- + + """ + + class HarlowSurrogate(ForwardModelBase): + + # Get interface from existing forward model + def interface(self): + self.input_sensors = self.kwargs["input_sensors"] + self.output_sensors = self.kwargs["output_sensors"] + self.parameters = self.kwargs["parameters"] + self.input_coordinates = self.kwargs["input_coordinates"] + + def response(self, inp: dict) -> dict: + # TODO: Currently this method assumes that `Nx` and `Nt` are available. 
+ # Instead they should be obtained from the problem definition + params = np.atleast_2d([inp["X" + str(i + 1)] for i in range(4)]) + response = dict() + + # TODO: Adjust `X` to account for the case of sensors with vector + # input (e.g. timeseries). + vec_t = self.input_coordinates[self.input_sensors[0].name] + + # Evaluate function and arange output on grid + f = self.kwargs["func_pred"](params, return_std=False) + f = np.reshape(f, (len(self.output_sensors), len(vec_t))) + + for idx_x, os in enumerate(self.output_sensors): + response[os.name] = f[idx_x, :] + + return response + + return HarlowSurrogate( + name, + func_pred=self.func_pred, + input_sensors=self.input_sensors, + output_sensors=self.output_sensors, + parameters=self.parameters, + input_coordinates=self.input_sensor_data, + ) + + # TODO: This will likely fail for multidimensional parameters. FIX! + def _get_bounds(self): + + self.lower_bounds = [] + self.upper_bounds = [] + self.bounds = {} + + # Find the priors corresponding to model parameters + for key, value in self.problem.priors.items(): + + # Error if any of the model priors are not uniform + if value.prior_type != "uniform": + raise ValueError( + f"Non-uniform distribution of type {value.prior_type}" + f" found for prior {key}. The `HarlowSurrogate` currently" + f" only supports uniform priors." + ) + + # Dictionary of upper and lower bounds + for param in self.parameters: + lb = self.problem.constant_prms_dict["low_" + param] + ub = self.problem.constant_prms_dict["high_" + param] + + # List of upper and lower bounds + self.lower_bounds.append(lb) + self.upper_bounds.append(ub) + + # Append to dict + self.bounds[param] = {"low": lb, "high": ub} + + def _cast_to_numpy(self, inp: dict) -> np.ndarray: + raise NotImplementedError + + def _cast_to_dict(self, X: np.ndarray) -> dict: + return dict(zip(self.parameters, X)) + + def target(self, X: np.ndarray) -> np.ndarray: + # TODO: Adjust `y` to account for the case of sensors with vector + # input (e.g. timeseries). + + # Initialize output array + y = np.zeros((X.shape[0], len(self.output_sensors) * len(self.vec_t))) + + # Evaluate physical model for each input vector + for idx_Xi, Xi in enumerate(X): + response = self.forward_model.response(self._cast_to_dict(Xi)) + sensor_out = [] + for idx_os, os in enumerate(self.output_sensors): + sensor_out.append(response[os.name]) + y[idx_Xi, :] = np.ravel(sensor_out) + + return np.array(y) diff --git a/probeye/metamodeling/surrogating.py b/probeye/metamodeling/surrogating.py new file mode 100644 index 00000000..01891396 --- /dev/null +++ b/probeye/metamodeling/surrogating.py @@ -0,0 +1,430 @@ +""" +Surrogating base class and specific implementations + +Notes: + * There is likely no need for the surrogate to have access to any of the problem + internals, since the sampling will be handled by the sampler class. + * This allows us to have a very simple surrogate base class by only implementing + `fit`, `predict` (optional?) and `response`. + * An existing forward model must be provided as an input argument. The surrogate + will copy the interface method from that forward model. + * The base class is used similar to `ForwardModelBase`. The user is expected to derive + their own class from it. 
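Since the `HarlowSampler` docstring above leaves its usage section as a TODO, here
is the flow the class is built for, condensed from
`examples/example_surrogate_hartman6D.py` earlier in this diff (a sketch of the
intended usage, not a fixed API):

    surrogate = VanillaGaussianProcess(**surrogate_kwargs)
    sampler = HarlowSampler(problem, forward_model, LatinHypercube, surrogate)
    sampler.sample(
        n_initial_point=n_init,
        n_new_point_per_iteration=n_point_per_iter,
        n_iter=n_iter,
        stopping_criterium=stopping_criterium,
    )
    sampler.fit()
    fast_model = HarlowSurrogate("SurrogateModel", surrogate, forward_model)
    problem.add_forward_model(fast_model, experiments="TestSeriesSurrogate")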
+""" + +# standard library +from copy import deepcopy + +# external +import numpy as np + +# local imports +from probeye.definition.forward_model import ForwardModelBase + +# harlow imports +from harlow.surrogating import Surrogate as HarlowSurrogateBase + + +class SurrogateModelBase(ForwardModelBase): + """ + Base class for a surrogate model, i.e., a forward model that approximates another + (typically computationally more expensive) forward model. The surrogate does not + need to have access to the problem internals. Therefore, the surrogate base class + simply extends `ForwardModelBase` with a `fit` method. + + Notes: + * The `forward_model` input is needed to obtain the interface. Surrogate models + will copy the interface of the associated forward model and override the + `response` method. + + TODO: + * Check out probeye's existing machinery for copying elements of the forward + model definition. Use that instead of `deepcopy`. + + Parameters + ---------- + + """ + + def __init__( + self, name: str, surrogate: HarlowSurrogateBase, forward_model: ForwardModelBase + ): + super().__init__(name, surrogate=surrogate, forward_model=forward_model) + + def interface(self): + self.parameters = deepcopy(self.kwargs["forward_model"].parameters) + self.input_sensors = deepcopy(self.kwargs["forward_model"].input_sensors) + self.output_sensors = deepcopy(self.kwargs["forward_model"].output_sensors) + + def fit(self, train_X: np.ndarray, train_y: np.ndarray): + """ + Prepares the surrogate model by approximating the forward model in some way. + """ + raise NotImplementedError + + def predict(self, X: np.ndarray): + """ + Used by samplers that require evaluation of the surrogate model. + """ + raise NotImplementedError + + def response(self, inp: dict) -> dict: + """ + Overrides the `response` method of the forward model. + """ + raise NotImplementedError + + +class HarlowSurrogate(SurrogateModelBase): + """ """ + + def __init__( + self, + name: str, + surrogate: HarlowSurrogateBase, + forward_model: ForwardModelBase, + **kwargs + ): + + # Initialize + self.forward_model = forward_model + self.surrogate = surrogate + self.func_pred = self.surrogate.predict + self.kwargs = kwargs + + super().__init__(name, surrogate=surrogate, forward_model=forward_model) + + def fit(self, train_X: np.ndarray, train_y: np.ndarray, **kwargs): + """ + Fit the surrogate model using the sampling points and observations + stored in `self.sampler` + """ + + self.surrogate.fit(self, train_X, train_y, **kwargs) + + def predict(self, X: np.ndarray, **kwargs) -> np.ndarray: + return self.surrogate.predict(X, **kwargs) + + def response(self, inp: dict) -> dict: + + # # TODO: + # # * Adjust `X` to account for the case of sensors with vector input (e.g. timeseries). + # # * Obtain input coordinates from the interface + # input_sensor_name = self.input_sensors[0].name + # N_x = len(self.output_sensors) + # N_t = len(inp[input_sensor_name]) + + # Evaluate function and arange output on grid + f = self.surrogate.predict(self.cast_to_array(inp)) + f = np.reshape(f, (len(self.output_sensors), -1)) + + response = dict() + for idx_x, os in enumerate(self.output_sensors): + response[os.name] = f[idx_x, :] + + return response + + def cast_to_array(self, inp: dict) -> np.ndarray: + """ + Convert dictionary of parameter: value pairs to a 1D numpy array in the order that + they are provided in `inp`. 
+ """ + return np.atleast_2d([inp[param] for param in self.parameters]) + + def cast_to_dict(self, X: np.ndarray) -> dict: + """ + Convert 1D numpy array of inputs to the dictionary input expected by `response`. The order + of elements in `X` is assumed to be the order of parameters in `self.parameters`. + """ + return dict(zip(self.parameters, X)) + + +# # TODO: Old code to be removed +# class SurrogateModelFactory: +# """ +# Model factory that returns a `ForwardModel` object, where the response +# is obtained from a fitted surrogate model. +# +# Usage: +# TODO +# +# Todo: +# * Currently, each point is added as a separate surrogate model +# +# Notes: +# * This is a utility class meant to facilitate the sampling and surrogating +# of expensive forward models by bridging `probeye` with `harlow`. The +# main objective is to reduce the manual effort required for surrogating +# by eliminating the need to re-specify the problem when building the +# surrogate. +# +# """ +# +# def __init__( +# self, +# problem: InverseProblem, +# forward_model: ForwardModelBase, +# sampler: Sampler, +# surrogate_model: Surrogate, +# fit_points_x: np.ndarray = None, +# fit_points_y: np.ndarray = None, +# test_points_x: np.ndarray = None, +# test_points_y: np.ndarray = None, +# evaluation_metric: Callable = None, +# verbose: bool = False, +# run_name: str = "test", +# **surrogate_kwargs, +# ): +# +# # Initialize +# self.surrogate_kwargs = surrogate_kwargs +# self.problem = problem +# self.forward_model = forward_model +# self.input_sensors = forward_model.input_sensors +# self.output_sensors = forward_model.output_sensors +# self.parameters = forward_model.parameters +# self.fit_points_x = fit_points_x +# self.fit_points_y = fit_points_y +# self.test_points_x = test_points_x +# self.test_points_y = test_points_y +# self.evaluation_metric = evaluation_metric +# self.verbose = verbose +# self.run_name = run_name +# +# # Find the experiment associated with the expensive forward model +# # and get sensor information +# # TODO: Implement check that the expensive forward model +# # has been added to the problem +# self.input_sensor_data = dict() +# for exp_name, exp in self.problem.experiments.items(): +# if exp.forward_model == self.forward_model.name: +# for inp_sensor in self.input_sensors: +# self.input_sensor_data[inp_sensor.name] = list( +# exp.sensor_data[inp_sensor.name] +# ) +# +# # Check that the supplied forward model only has a single sensor +# # TODO: Can a forward model have multiple input sensors? +# if len(list(self.input_sensor_data.keys())) != 1: +# raise ValueError( +# f"The specified forward model must have " +# f"exactly one input sensor but has " +# f"{len(list(self.input_sensor_data.keys()))}" +# ) +# +# # Extract sensor names and input/output shape information +# # from forward model. +# # TODO: This should be adjusted depending on the problem definition +# # or specified by the user e.g. 
with keyword arguments that specify +# # the structure of the surrogate model, e.g.: +# # * `shape = None` to create an independent surrogate for each observation +# # * `shape = Sensor` to create a surrogate model per sensor +# self.vec_t = list(self.input_sensor_data.values())[0] +# self.model_names = [] +# for idx_x, os in enumerate(forward_model.output_sensors): +# for idx_t, t in enumerate(self.vec_t): +# self.model_names.append(os.name + "_" + str(t)) +# self.surrogate_kwargs["model_names"] = self.model_names +# self.num_features = len(forward_model.output_sensors) +# self.surrogate_kwargs["num_features"] = self.num_features +# +# # Get bounds +# self._get_bounds() +# +# # Initialize surrogate +# self.surrogate = surrogate_model(**surrogate_kwargs) +# self.func_pred = self.surrogate.predict +# +# # Initialize sampler +# self.sampler = sampler( +# target_function=self.target, +# surrogate_model=self.surrogate, +# domain_lower_bound=self.lower_bounds, +# domain_upper_bound=self.upper_bounds, +# fit_points_x=self.fit_points_x, +# fit_points_y=self.fit_points_y, +# test_points_x=self.test_points_x, +# test_points_y=self.test_points_y, +# evaluation_metric=self.evaluation_metric, +# verbose=self.verbose, +# run_name=self.run_name, +# ) +# +# def fit(self, **kwargs): +# """ +# Fit the surrogate model using the sampling points and observations +# stored in `self.sampler` +# +# Parameters +# ---------- +# kwargs +# +# Returns +# ------- +# +# """ +# +# self.surrogate.fit( +# self.sampler.fit_points_x, self.sampler.fit_points_y, **kwargs +# ) +# +# def sample(self, **kwargs): +# """ +# Draw samples from the forward model and fit the surrogate model +# using the specified sampler. +# +# Parameters +# ---------- +# kwargs +# +# Returns +# ------- +# +# """ +# +# self.fit_points_x, self.fit_points_y = self.sampler.sample(**kwargs) +# +# # # Get prediction function +# # self.func_pred = self.surrogate.predict +# +# def get_training_data(self, n_samples=1): +# """ +# Obtain training points `X` with latin hypercube sampling and +# corresponding observations `y` by evaluating the target function. +# +# Returns +# ------- +# A tuple of training points and observations `(train_X, train_y)` +# +# """ +# train_X = latin_hypercube_sampling( +# n_sample=n_samples, +# domain_lower_bound=np.array(self.lower_bounds), +# domain_upper_bound=np.array(self.upper_bounds), +# ) +# train_y = self.target(train_X) +# +# return train_X, train_y +# +# def get_harlow_model( +# self, +# name: str, +# ) -> ForwardModelBase: +# +# """ +# Model factory for the `HarlowSurrogate` class. +# +# TODO: +# * Refactor this function so that it returns a `ForwardModel` object +# with a `.predict()` method that queries an instance of `Surrogate`. +# * It is important that the generated `ForwardModel` instance does not +# contain the surrogate model itself (due to incompatibility of tensors +# with `copy.deepcopy`), or the problem definition. +# +# Notes +# --------- +# * This function was originally created as a workaround for passing arguments to +# classes derived from `ForwardModelBase`. 
+# +# Parameters +# ---------- +# problem +# forward_model +# surrogate +# n_output_dims +# kwargs +# +# Returns +# ------- +# +# """ +# +# class HarlowSurrogate(ForwardModelBase): +# +# # Get interface from existing forward model +# def interface(self): +# self.input_sensors = self.kwargs["input_sensors"] +# self.output_sensors = self.kwargs["output_sensors"] +# self.parameters = self.kwargs["parameters"] +# self.input_coordinates = self.kwargs["input_coordinates"] +# +# def response(self, inp: dict) -> dict: +# # TODO: Currently this method assumes that `Nx` and `Nt` are available. +# # Instead they should be obtained from the problem definition +# params = np.atleast_2d([inp["X" + str(i + 1)] for i in range(4)]) +# response = dict() +# +# # TODO: Adjust `X` to account for the case of sensors with vector +# # input (e.g. timeseries). +# vec_t = self.input_coordinates[self.input_sensors[0].name] +# +# # Evaluate function and arange output on grid +# f = self.kwargs["func_pred"](params, return_std=False) +# f = np.reshape(f, (len(self.output_sensors), len(vec_t))) +# +# for idx_x, os in enumerate(self.output_sensors): +# response[os.name] = f[idx_x, :] +# +# return response +# +# return HarlowSurrogate( +# name, +# func_pred=self.func_pred, +# input_sensors=self.input_sensors, +# output_sensors=self.output_sensors, +# parameters=self.parameters, +# input_coordinates=self.input_sensor_data, +# ) +# +# # TODO: This will likely fail for multidimensional parameters. FIX! +# def _get_bounds(self): +# +# self.lower_bounds = [] +# self.upper_bounds = [] +# self.bounds = {} +# +# # Find the priors corresponding to model parameters +# for key, value in self.problem.priors.items(): +# +# # Error if any of the model priors are not uniform +# if value.prior_type != "uniform": +# raise ValueError( +# f"Non-uniform distribution of type {value.prior_type}" +# f" found for prior {key}. The `HarlowSurrogate` currently" +# f" only supports uniform priors." +# ) +# +# # Dictionary of upper and lower bounds +# for param in self.parameters: +# lb = self.problem.constant_prms_dict["low_" + param] +# ub = self.problem.constant_prms_dict["high_" + param] +# +# # List of upper and lower bounds +# self.lower_bounds.append(lb) +# self.upper_bounds.append(ub) +# +# # Append to dict +# self.bounds[param] = {"low": lb, "high": ub} +# +# def _cast_to_numpy(self, inp: dict) -> np.ndarray: +# raise NotImplementedError +# +# def _cast_to_dict(self, X: np.ndarray) -> dict: +# return dict(zip(self.parameters, X)) +# +# def target(self, X: np.ndarray) -> np.ndarray: +# # TODO: Adjust `y` to account for the case of sensors with vector +# # input (e.g. timeseries). +# +# # Initialize output array +# y = np.zeros((X.shape[0], len(self.output_sensors) * len(self.vec_t))) +# +# # Evaluate physical model for each input vector +# for idx_Xi, Xi in enumerate(X): +# response = self.forward_model.response(self._cast_to_dict(Xi)) +# sensor_out = [] +# for idx_os, os in enumerate(self.output_sensors): +# sensor_out.append(response[os.name]) +# y[idx_Xi, :] = np.ravel(sensor_out) +# +# return np.array(y) diff --git a/setup.cfg b/setup.cfg index 9c42a4bf..acb79190 100644 --- a/setup.cfg +++ b/setup.cfg @@ -7,7 +7,6 @@ description = A general framework for setting up parameter estimation problems. 
 long_description = file: README.md
 long_description_content_type = text/markdown
 classifiers =
-    Programming Language :: Python :: 3.6
     Programming Language :: Python :: 3.7
     Programming Language :: Python :: 3.8
     Programming Language :: Python :: 3.9
@@ -18,7 +17,7 @@ classifiers =
 license_files = LICENSE

 [options]
-python_requires = >= 3.6
+python_requires = >= 3.7
 packages = find:
 include_package_data = True
 install_requires =
@@ -33,6 +32,7 @@ install_requires =
     owlready2<1
     dynesty
     tri-py<1
+    pandas<2

 [options.package_data]
 probeye =
diff --git a/tests/integration_tests/tests_without_correlation/test_surrogate_model.py b/tests/integration_tests/tests_without_correlation/test_surrogate_model.py
new file mode 100644
index 00000000..0f311168
--- /dev/null
+++ b/tests/integration_tests/tests_without_correlation/test_surrogate_model.py
@@ -0,0 +1,175 @@
+# standard library imports
+import unittest
+import time
+
+# third party imports
+import numpy as np
+import matplotlib.pyplot as plt
+
+# local imports (problem definition)
+from probeye.definition.inverse_problem import InverseProblem
+from probeye.definition.forward_model import ForwardModelBase
+from probeye.metamodeling.surrogating import SurrogateModelBase
+from probeye.definition.sensor import Sensor
+from probeye.definition.likelihood_model import GaussianLikelihoodModel
+
+
+class TestProblem(unittest.TestCase):
+    def test_surrogate_model(self, plot: bool = False):
+        """
+        Demonstrates how to use a surrogate model. Still under discussion.
+
+        Parameters
+        ----------
+        plot
+            Triggers a plot of the data used for calibration.
+        """
+
+        # ============================================================================ #
+        #                              Set numeric values                              #
+        # ============================================================================ #
+
+        # 'true' value of m, and its normal prior parameters
+        m_true = 2.5
+        mean_m = 2.0
+        std_m = 1.0
+
+        # 'true' value of b, and its normal prior parameters
+        b_true = 1.7
+        mean_b = 1.0
+        std_b = 1.0
+
+        # 'true' value of additive error sd, and its uniform prior parameters
+        sigma = 0.5
+        low_sigma = 0.0
+        high_sigma = 0.8
+
+        # the number of generated experiments and the seed for random numbers
+        n_tests = 50
+        seed = 1
+
+        # ============================================================================ #
+        #                           Define the Forward Model                           #
+        # ============================================================================ #
+
+        class ExpensiveModel(ForwardModelBase):
+            def interface(self):
+                self.parameters = ["m", "b"]
+                self.input_sensors = Sensor("x")
+                self.output_sensors = Sensor("y", std_model="sigma")
+
+            def response(self, inp: dict) -> dict:
+                x = inp["x"]
+                m = inp["m"]
+                b = inp["b"]
+                time.sleep(1.0)
+                return {"y": m * x + b}
+
+        # ============================================================================ #
+        #                          Define the Surrogate Model                          #
+        # ============================================================================ #
+
+        class SurrogateModel(ExpensiveModel, SurrogateModelBase):
+            """
+            The inheritance from ExpensiveModel 'copies' the interface-method from
+            ExpensiveModel (the surrogate model should have the same interface as the
+            forward model). The inheritance from SurrogateModelBase is required to
+            assign a forward model to the surrogate model, see surrogating.py.
+            """
+
+            def response(self, inp: dict) -> dict:
+                x = inp["x"]
+                m = inp["m"]
+                b = inp["b"]
+                return {"y": m * x + b}
+
+        # ============================================================================ #
+        #                         Define the Inference Problem                         #
+        # ============================================================================ #
+
+        # initialize the inverse problem with a useful name
+        problem = InverseProblem("Using a surrogate model")
+
+        # add all parameters to the problem
+        problem.add_parameter(
+            "m",
+            "model",
+            tex="$m$",
+            info="Slope of the graph",
+            prior=("normal", {"mean": mean_m, "std": std_m}),
+        )
+        problem.add_parameter(
+            "b",
+            "model",
+            info="Intersection of graph with y-axis",
+            tex="$b$",
+            prior=("normal", {"mean": mean_b, "std": std_b}),
+        )
+        problem.add_parameter(
+            "sigma",
+            "likelihood",
+            domain="(0, +oo)",
+            tex=r"$\sigma$",
+            info="Standard deviation of the zero-mean additive model error",
+            prior=("uniform", {"low": low_sigma, "high": high_sigma}),
+        )
+
+        # add the surrogate model (which is essentially a forward model) to the
+        # problem; whether or not the surrogate model gets the original forward model
+        # is not finally decided on, but it might not be necessary
+        forward_model = ExpensiveModel("ExpensiveModel")
+        surrogate_model = SurrogateModel("FastModel", forward_model=forward_model)
+        problem.add_forward_model(surrogate_model)
+
+        # ============================================================================ #
+        #                    Add test data to the Inference Problem                    #
+        # ============================================================================ #
+
+        # data-generation; normal likelihood with constant variance around each point
+        np.random.seed(seed)
+        x_test = np.linspace(0.0, 1.0, n_tests)
+        y_true = surrogate_model.response(
+            {surrogate_model.input_sensor.name: x_test, "m": m_true, "b": b_true}
+        )[surrogate_model.output_sensor.name]
+        y_test = np.random.normal(loc=y_true, scale=sigma)
+
+        # add the experimental data
+        problem.add_experiment(
+            "TestSeries_1",
+            fwd_model_name="FastModel",
+            sensor_values={
+                forward_model.input_sensor.name: x_test,
+                forward_model.output_sensor.name: y_test,
+            },
+        )
+
+        # plot the true and noisy data
+        if plot:
+            plt.scatter(x_test, y_test, label="measured data", s=10, c="red", zorder=10)
+            plt.plot(x_test, y_true, label="true", c="black")
+            plt.xlabel(forward_model.input_sensor.name)
+            plt.ylabel(forward_model.output_sensor.name)
+            plt.legend()
+            plt.tight_layout()
+            plt.draw()  # does not stop execution
+
+        # ============================================================================ #
+        #                           Add likelihood model(s)                            #
+        # ============================================================================ #
+
+        # add the likelihood model to the problem
+        problem.add_likelihood_model(
+            GaussianLikelihoodModel(
+                prms_def="sigma",
+                experiment_name="TestSeries_1",
+                model_error="additive",
+                name="SimpleLikelihoodModel",
+            )
+        )
+
+        # give problem overview
+        problem.info()
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/unit_tests/surrogate/test_initial_sampling.py b/tests/unit_tests/surrogate/test_initial_sampling.py
new file mode 100644
index 00000000..b7e5a64a
--- /dev/null
+++ b/tests/unit_tests/surrogate/test_initial_sampling.py
@@ -0,0 +1,105 @@
+# standard library
+import unittest
+
+# third party imports
+import numpy as np
+
+# local imports
+from probeye.definition.inverse_problem import InverseProblem
+from probeye.definition.forward_model import ForwardModelBase
+from probeye.definition.sensor import Sensor
+from probeye.metamodeling.initial_sampling import LatinHypercubeSampler
+
+
+class LinearModel(ForwardModelBase):
+    def interface(self):
+        self.parameters = ["a", "b"]
+        self.input_sensors = Sensor("x")
+        self.output_sensors = Sensor("y", std_model="sigma")
+
+    def response(self, inp: dict) -> dict:
+        x = inp["x"]
+        m = inp["a"]
+        b = inp["b"]
+        return {"y": m * x + b}
+
+
+class TestProblem(unittest.TestCase):
+    def test_LatinHypercubeSampler_generate_samples_normal_case(self):
+
+        # prepare a dummy problem as far as necessary for this test
+        problem = InverseProblem("Test")
+        problem.add_parameter("a", prior=("normal", {"mean": 2.0, "std": 1.0}))
+        problem.add_parameter("b", prior=("normal", {"mean": 1.0, "std": 0.5}))
+
+        # create the samples from the priors
+        lhs = LatinHypercubeSampler(problem)
+        sample_array = lhs.generate_samples(10, seed=1)
+
+        # check if everything is as expected
+        self.assertEqual(sample_array.shape, (10, 2))
+        self.assertAlmostEqual(float(np.mean(sample_array[:, 0])), 2.0, delta=0.3)
+        self.assertAlmostEqual(float(np.std(sample_array[:, 0])), 1.0, delta=0.3)
+        self.assertAlmostEqual(float(np.mean(sample_array[:, 1])), 1.0, delta=0.3)
+        self.assertAlmostEqual(float(np.std(sample_array[:, 1])), 0.5, delta=0.2)
+
+    def test_LatinHypercubeSampler_generate_samples_mv_prior(self):
+
+        # prepare a dummy problem as far as necessary for this test
+        problem = InverseProblem("Test")
+        problem.add_parameter(
+            "ab",
+            dim=2,
+            prior=(
+                "multivariate-normal",
+                {
+                    "mean": np.array([2.0, 1.0]),
+                    "cov": np.array([[1.0, 0], [0, 0.5**2]]),
+                },
+            ),
+        )
+
+        # try creating the samples from the priors; this should not work since the
+        # problem contains a multivariate prior
+        lhs = LatinHypercubeSampler(problem)
+        with self.assertRaises(RuntimeError):
+            _ = lhs.generate_samples(10, seed=1)
+
+    def test_LatinHypercubeSampler_generate_training_data(self):
+
+        # prepare a dummy problem as far as necessary for this test
+        problem = InverseProblem("Test")
+        problem.add_parameter("a", prior=("normal", {"mean": 2.0, "std": 1.0}))
+        problem.add_parameter("b", prior=("normal", {"mean": 1.0, "std": 0.5}))
+        forward_model = LinearModel("LinearModel")
+        problem.add_forward_model(forward_model)
+        problem.add_experiment(
+            exp_name="Exp1",
+            fwd_model_name="LinearModel",
+            sensor_values={
+                "x": np.array([0.0, 1.0, 2.0]),
+                "y": np.array([1.1, 2.9, 5.3]),
+            },
+        )
+
+        # create the training data
+        lhs = LatinHypercubeSampler(problem)
+        prm_samples_pd, responses_over_experiments = lhs.generate_training_data(
+            forward_model, 10, seed=1
+        )
+
+        # check if everything is as expected (with prm_samples_pd)
+        sample_array = prm_samples_pd.values
+        self.assertEqual(sample_array.shape, (10, 2))
+        self.assertTrue((prm_samples_pd.columns == ["a", "b"]).all())
+        self.assertAlmostEqual(float(np.mean(sample_array[:, 0])), 2.0, delta=0.3)
+        self.assertAlmostEqual(float(np.std(sample_array[:, 0])), 1.0, delta=0.3)
+        self.assertAlmostEqual(float(np.mean(sample_array[:, 1])), 1.0, delta=0.3)
+        self.assertAlmostEqual(float(np.std(sample_array[:, 1])), 0.5, delta=0.2)
+
+        # check if everything is as expected (with responses_over_experiments)
+        self.assertEqual(responses_over_experiments["Exp1"].shape, (10, 1, 3))
+
+
+if __name__ == "__main__":
+    unittest.main()
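For orientation, the training-data generation exercised by the last test maps
directly onto surrogate fitting. A condensed sketch, reusing the problem and
`LinearModel` from the test above (the `surrogate` object is hypothetical and only
assumed to expose a `fit(X, y)` method):

    lhs = LatinHypercubeSampler(problem)
    prm_samples_pd, responses = lhs.generate_training_data(forward_model, 10, seed=1)
    train_X = prm_samples_pd.values              # shape (10, 2): one row per theta-vector
    train_y = responses["Exp1"].reshape(10, -1)  # flatten (sensor, output) per sample
    surrogate.fit(train_X, train_y)              # any surrogate with a fit(X, y) API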