diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..5af6191 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,8 @@ +{ + "python.testing.pytestArgs": [ + "tests" + ], + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true, + "python.analysis.typeCheckingMode": "basic" +} \ No newline at end of file diff --git a/environment.yml b/environment.yml index cb7cb6e..8eb2e98 100644 --- a/environment.yml +++ b/environment.yml @@ -8,6 +8,8 @@ dependencies: - matplotlib - python-gmsh - meshio + - jsonschema + - scipy # tests - pytest - coverage diff --git a/src/fenicsxconcrete/finite_element_problem/base_material.py b/src/fenicsxconcrete/finite_element_problem/base_material.py index 8e270a1..15bc576 100644 --- a/src/fenicsxconcrete/finite_element_problem/base_material.py +++ b/src/fenicsxconcrete/finite_element_problem/base_material.py @@ -1,11 +1,16 @@ +import importlib +import json from abc import ABC, abstractmethod from pathlib import Path, PosixPath +import dolfinx as df +import jsonschema import pint from fenicsxconcrete.experimental_setup.base_experiment import Experiment from fenicsxconcrete.helper import LogMixin, Parameters from fenicsxconcrete.sensor_definition.base_sensor import BaseSensor +from fenicsxconcrete.sensor_definition.sensor_schema import generate_sensor_schema from fenicsxconcrete.unit_registry import ureg @@ -61,6 +66,10 @@ def __init__( self.residual = None # initialize residual + # set up xdmf file with mesh info + with df.io.XDMFFile(self.mesh.comm, self.pv_output_file, "w") as f: + f.write_mesh(self.mesh) + # setup the material object to access the function self.setup() @@ -99,6 +108,60 @@ def delete_sensor(self) -> None: del self.sensors self.sensors = self.SensorDict() + def export_sensors_metadata(self, path: Path) -> None: + """Exports sensor metadata to JSON file according to the appropriate schema. 
+ + Args: + path : Path + Path where the metadata should be stored + + """ + + sensors_metadata_dict = {"sensors": []} + + for key, value in self.sensors.items(): + sensors_metadata_dict["sensors"].append(value.report_metadata()) + # sensors_metadata_dict[key]["name"] = key + + with open(path, "w") as f: + json.dump(sensors_metadata_dict, f) + + def import_sensors_from_metadata(self, path: Path) -> None: + """Import sensor metadata from JSON file and validate with the appropriate schema. + + Args: + path : Path + Path where the metadata file is + + """ + + # Load and validate + sensors_metadata_dict = {} + with open(path, "r") as f: + sensors_metadata_dict = json.load(f) + schema = generate_sensor_schema() + jsonschema.validate(instance=sensors_metadata_dict, schema=schema) + + for sensor in sensors_metadata_dict["sensors"]: + # Dynamically import the module containing the class + module_name = "fenicsxconcrete.sensor_definition." + sensor["sensor_file"].lower() + module = importlib.import_module(module_name) + + # Create a dictionary of keyword arguments from the remaining properties in the dictionary + kwargs = { + k: v for k, v in sensor.items() if k not in ["id", "type", "sensor_file", "units", "dimensionality"] + } + + # Dynamically retrieve the class by its name + class_name = sensor["type"] + MySensorClass = getattr(module, class_name) + + # Instantiate an object of the class with the given properties + sensor_i = MySensorClass(name=sensor["id"], **kwargs) + sensor_i.set_units(units=sensor["units"]) + + self.add_sensor(sensor_i) + class SensorDict(dict): """ Dict that also allows to access the parameter p["parameter"] via the matching attribute p.parameter diff --git a/src/fenicsxconcrete/finite_element_problem/linear_elasticity_grf.py b/src/fenicsxconcrete/finite_element_problem/linear_elasticity_grf.py new file mode 100644 index 0000000..c01f25a --- /dev/null +++ b/src/fenicsxconcrete/finite_element_problem/linear_elasticity_grf.py @@ -0,0 +1,160 @@ +from 
pathlib import Path, PosixPath + +import dolfinx as df +import numpy as np +import pint +import ufl +from petsc4py.PETSc import ScalarType + +from fenicsxconcrete.experimental_setup.base_experiment import Experiment +from fenicsxconcrete.finite_element_problem.linear_elasticity import LinearElasticity +from fenicsxconcrete.gaussian_random_field import Randomfield + + +class LinearElasticityGRF(LinearElasticity): + """Material definition for linear elasticity""" + + def __init__( + self, + experiment: Experiment, + parameters: dict[str, pint.Quantity], + pv_name: str = "pv_output_full", + pv_path: str = None, + ) -> None: + super().__init__(experiment, parameters, pv_name, pv_path) + # TODO There should be more elegant ways of doing this + self.pv_file = Path(pv_path) / (pv_name + ".xdmf") + + def setup(self) -> None: + self.field_function_space = df.fem.FunctionSpace(self.experiment.mesh, ("CG", 1)) + self.lambda_ = df.fem.Function(self.field_function_space) + self.mu = df.fem.Function(self.field_function_space) + + lame1, lame2 = self.get_lames_constants() + self.lambda_.vector[:] = lame1 + self.mu.vector[ + : + ] = lame2 # make this vector as a fenics constant array. Update the lame1 and lame2 in each iteration. + + # define function space ets. + self.V = df.fem.VectorFunctionSpace(self.mesh, ("Lagrange", self.p["degree"])) # 2 for quadratic elements + self.V_scalar = df.fem.FunctionSpace(self.mesh, ("Lagrange", self.p["degree"])) + + # Define variational problem + self.u_trial = ufl.TrialFunction(self.V) + self.v = ufl.TestFunction(self.V) + + # initialize L field, not sure if this is the best way... 
+ zero_field = df.fem.Constant(self.mesh, ScalarType(np.zeros(self.p["dim"]))) + self.L = ufl.dot(zero_field, self.v) * ufl.dx + + # apply external loads + external_force = self.experiment.create_force_boundary(self.v) + if external_force: + self.L = self.L + external_force + + body_force = self.experiment.create_body_force(self.v) + if body_force: + self.L = self.L + body_force + + # boundary conditions only after function space + bcs = self.experiment.create_displacement_boundary(self.V) + + self.a = ufl.inner(self.sigma(self.u_trial), self.epsilon(self.v)) * ufl.dx + self.weak_form_problem = df.fem.petsc.LinearProblem( + self.a, + self.L, + bcs=bcs, + petsc_options={"ksp_type": "preonly", "pc_type": "lu"}, + ) + + # Random E and nu fields. + def random_field_generator( + self, + field_function_space, + cov_name, + mean, + correlation_length1, + correlation_length2, + variance, + no_eigen_values, + ktol, + ): + random_field = Randomfield( + field_function_space, + cov_name, + mean, + correlation_length1, + correlation_length2, + variance, + no_eigen_values, + ktol, + ) + # random_field.solve_covariance_EVP() + return random_field + + def parameters_conversion(self, lognormal_mean, lognormal_sigma): + from math import sqrt + + normal_mean = np.log(lognormal_mean / sqrt(1 + (lognormal_sigma / lognormal_mean) ** 2)) + normal_sigma = np.log(1 + (lognormal_sigma / lognormal_mean) ** 2) + return normal_mean, normal_sigma + + def get_lames_constants( + self, + ): + # Random E and nu fields. 
+ E_mean, E_variance = self.parameters_conversion(self.p["E"], self.p["E_std"]) # 3 + Nu_mean, Nu_variance = self.parameters_conversion(self.p["nu"], self.p["nu_std"]) # 0.03 + + self.E_randomfield = self.random_field_generator( + self.field_function_space, + "squared_exp", + E_mean, + self.p["correlation_length_1"], + self.p["correlation_length_2"], + E_variance, + 3, + 0.01, + ) + self.E_randomfield.create_random_field(_type="random", _dist="LN") + + self.nu_randomfield = self.random_field_generator( + self.field_function_space, + "squared_exp", + Nu_mean, + self.p["correlation_length_1"], + self.p["correlation_length_2"], + Nu_variance, + 3, + 0.01, + ) + self.nu_randomfield.create_random_field(_type="random", _dist="LN") + + lame1 = (self.E_randomfield.field.vector[:] * self.nu_randomfield.field.vector[:]) / ( + (1 + self.nu_randomfield.field.vector[:]) * (1 - 2 * self.nu_randomfield.field.vector[:]) + ) + lame2 = self.E_randomfield.field.vector[:] / (2 * (1 + self.nu_randomfield.field.vector[:])) + return lame1, lame2 + + # TODO move this to sensor definition!?!?! 
+ def pv_plot(self, t: pint.Quantity | float = 1) -> None: + """creates paraview output at given time step + + Args: + t: time point of output (default = 1) + """ + print("create pv plot for t", t) + try: + _t = t.magnitude + except: + _t = t + + self.displacement.name = "Displacement" + self.E_randomfield.field.name = "E_field" + self.nu_randomfield.field.name = "nu_field" + + with df.io.XDMFFile(self.mesh.comm, self.pv_output_file, "a") as f: + f.write_function(self.displacement, _t) + f.write_function(self.E_randomfield.field, _t) + f.write_function(self.nu_randomfield.field, _t) diff --git a/src/fenicsxconcrete/gaussian_random_field.py b/src/fenicsxconcrete/gaussian_random_field.py new file mode 100644 index 0000000..39fd563 --- /dev/null +++ b/src/fenicsxconcrete/gaussian_random_field.py @@ -0,0 +1,401 @@ +import logging + +import dolfinx +import numpy as np +import scipy as sc +import ufl + + +class Randomfield(object): + def __init__( + self, fct_space, cov_name="squared_exp", mean=0, rho1=1, rho2=1, sigma=1, k=None, ktol=None, _type="" + ): + """ + Class for random field + generates a random field using Karhunen Loeve decomposition + field of form: mean + sum_i sqr(lambda_i) EV_i xi_i + lambda_i, EV_i: eigenvalues and -vectors of covariance matrix C=c(r) + xi_i: new variables of random field (in general gaussian distributed N(0,1**2) + c(r=|x-y|): covariance function for distance between to space nodes x and y + + + Attributes: + logger: logger object + cov_name: name of covariance function to use + mean: mean of random field + rho1: correlation length x + rho2: correlation length z + sigma2: sigma**2 in covariance function + V: FE fct space for problem + k: number of eigenvectors to compute + ktol: tolerance to chose k (norm eigenvalue_k < ktol) + C: correlation matrix + M: mass matrix for eigenvalue problem + lambdas: eigenvalues + EV: eigenvectors + field: a representation of the field + values: values of the variables (set by user or choosen randomly) + 
values_means: values of the variables for asymptotic expansion to be set by user + _type: special case for correlation matrix + + Args: + fct_space: FE function space + cov_name: string with name of covariance function (e.g. exp, squared_exp ..) + mean: mean value of random field + rho1: correlation length x + rho2: correlation length z + sigma: covariance standard deviation + k: number of eigenvectors to compute + ktol: tolerance to choose k (norm eigenvalue_k < ktol) + _type: special cases + """ + + self.logger = logging.getLogger(__name__) + + self.cov_name = cov_name # name of covariance function to use + self.cov = getattr(self, "cov_" + cov_name) # select the right function + self.mean = mean # mean of random field + # self.rho = rho # correlation length + self.rho1 = rho1 # correlation length x + self.rho2 = rho2 # correlation length z + self.sigma2 = sigma**2 # sigma**2 in covariance function + self.V = fct_space # FE fct space for problem + if k: + self.k = k + self.ktol = ktol + + self.C = None # correlation matrix + self.M = None # mass matrix for eigenvalue problem + self.lambdas = [] # eigenvalues + self.EV = [] # eigenvectors + self.k = k # number of eigenvalues + self._type = _type # special case for correlation matrix + + self.field = None # a representation of the field + self.values = np.zeros(self.k) # values of the variables (set by user or chosen randomly) + + self.values_means = np.zeros(self.k) # values of the variables for asymptotic expansion to be set by user!! + + def __str__(self): + """ + Overloaded description of the field. + """ + name = self.__class__.__name__ + name += "random field with cov fct %s, mean %s, rho1 %s, rho2 %s, k %s, sig2 %s" + return name % (self.cov_name, self.mean, self.rho1, self.rho2, self.k, self.sigma2) + + def __repr__(self): + """ + Overloaded description of the field. 
+ """ + + return str(self) + + def cov_exp(self, r): + """ + Exponential covariance function: sig^2 exp(-r/rho) + + Args: + r: correlation lenghth + + Returns: + The covarinace sig^2 exp(-r/rho) + """ + # TODO: This does not work if rho + return self.sigma2 * np.exp(-1 / self.rho * r) + + def cov_squared_exp(self, r1, r2): + """ + 2D-Exponential covariance function + + Args: + r1: correlation length in x + r2: correlation length in y + + Returns: + The covariance Squared Exponential covariance function + """ + return self.sigma2 * np.exp((-1 / (2 * self.rho1**2) * r1**2) - (1 / (2 * self.rho2**2) * r2**2)) + + def generate_C(self): + """ + Generate the covariance matrix for the random field representation + based on tutorial at http://www.wias-berlin.de/people/marschall/lesson3.html + + Returns: + self + """ + + # directly use dof coordinates (dof_to_vertex map does not work in higher order spaces) + coords = self.V.tabulate_dof_coordinates() + + self.logger.debug("shape coordinates %s", coords.shape) + + # evaluate covariance matrix + L = coords.shape[0] + c0 = np.repeat(coords, L, axis=0) + c1 = np.tile(coords, [L, 1]) + # TODO: This does not work if "x" + if self._type == "x": + r = np.absolute(c0[:, 0] - c1[:, 0]) + else: + r1 = c0[:, 0] - c1[:, 0] + r2 = c0[:, 1] - c1[:, 1] + C = self.cov(r1, r2) + + C.shape = [L, L] + + self.C = np.copy(C) + + return self + + def solve_covariance_EVP(self): + """ + Solve generalized eigenvalue problem to generate decomposition of C + based on tutorial at http://www.wias-berlin.de/people/marschall/lesson3.html + + Returns: + self + """ + + def get_A(A, B): + return np.dot(A, np.dot(B, A)) + + self.generate_C() + + # mass matrix + u = ufl.TrialFunction(self.V) + v = ufl.TestFunction(self.V) + + # assemble mass matrix and convert to scipy + aa = dolfinx.fem.form(ufl.dot(u, v) * ufl.dx) + M = dolfinx.fem.assemble_matrix(aa) # dolfin.assemble(u * v * ufl.dx) + self.M = M + self.M = M.to_dense() + + self.logger.debug("shape of M 
%s", self.M.shape) + + # solve generalized eigenvalue problem + A = get_A(self.M, self.C) + + self.logger.debug("shape of A %s", A.shape) + w, v = sc.linalg.eigh(A, self.M) + + self.logger.info("EVP size: %s, %s, %s", A.shape, w.shape, v.shape) + + # start with largest eigenvalue + w_reverse = w[::-1] + v_reverse = np.flip(v, 1) + + # compute self.k if self.ktol is given + if self.ktol != None: + self.logger.debug("self.ktol %s", self.ktol) + normed_lambdas = w_reverse / w_reverse[0] + self.logger.debug("normed_lambdas %s", normed_lambdas) + self.k = np.argmax(normed_lambdas <= self.ktol) + 1 # EV with smaller ktol will not be used + self.logger.info("required number of modes is %s (according tolerance %s)", self.k, self.ktol) + if self.k == 0: + raise ValueError('cannot select enough modes - tolerance "ktol" to small') + + # selected vectors / all values for plotting afterwards + self.lambdas = w_reverse + self.EV = v_reverse[:, 0 : self.k] + + self.logger.debug("eigenvalues %s", self.lambdas) + + return self + + def create_random_field(self, _type="random", _dist="N", plot=False, evp="generalized"): + """ + Create a fixed random field using random or fixed values for the new variables + + Args: + _type: how to choose the values for the random field variables (random or given) + _dist: which type of random field if 'N' standard gaussian if 'LN' lognormal field computed as exp(gauss field) + plot: if True plot eigenvalue decay + + Returns + Self + """ + + # generate decomposition: + + if len(self.lambdas) <= self.k: + if evp == "generalized": + self.solve_covariance_EVP() # generalized eigenvalue problem + elif evp == "standard": + self.solve_covariance_EVP_02() # standard eigenvalue problem + else: + raise ValueError(f"Eigenvalue problems can only be in standard or generalized form. 
Introduced {evp}.") + # else already computed + + if plot: + import matplotlib.pyplot as plt + + plt.figure() + plt.title("Eigenvalue decay of covariance matrix") + plt.semilogy(np.arange(len(self.lambdas)) + 1, self.lambdas) + plt.axvline(x=self.k - 1) + plt.show() + + # generate representations of variables + # create the output field gaussian random field + self.field = dolfinx.fem.Function(self.V) + new_data = np.zeros(len(self.field.vector[:])) + new_data += np.copy(self.mean) + + if _type == "random": + # random selection of gaussian variables + self.values = np.random.normal(0, 1, self.k) + self.logger.info("choose random values for xi %s", self.values) + elif _type == "given": + self.logger.info("choose given values for xi %s", self.values) + + # compute field representation with number of modes given in self.k or by self.ktol + self.lambdas = self.lambdas[0 : self.k] + self.EV = self.EV[:, 0 : self.k] + new = np.dot(self.EV, np.diag(np.sqrt(self.lambdas))) + new_data += np.dot(new, self.values) + + if _dist == "N": + self.logger.info("N given -> computes standard gauss field") + self.field.vector[:] = new_data[:] + elif _dist == "LN": + self.logger.info("LN given -> computes exp(gauss field)") + self.field.vector[:] = np.exp(new_data[:]) + else: + self.logger.error("distribution type <%s> not implemented", _dist) + raise ValueError("distribution type not implemented") + + return self + + def modes(self, num, plot=False): + """ + Create fenics functions for the first 'num' modes of the random field (= sqrt(\lambda)*eigenvector) + + Args: + num: specify number of functions needed + plot: plots the eigenvalue decay and modes if true + Returns: + out: Output field + """ + + # output field + out = list() + # check if eigenvectors already computed + if len(self.lambdas) < num: + self.solve_covariance_EVP() # generalized eigenvalue problem + + # transform discrete EV to fenics functions + for i in range(num): + fct = dolfinx.fem.Function(self.V) + fct.vector[:] = 
self.EV[:, i] * np.sqrt(self.lambdas[i]) + out.append(fct) + + self.logger.info( + f"eigenvalue[{self.k}-1]/eigenvalue[0]: {self.lambdas[self.k-1]/self.lambdas[0]}; eigenvalue[{self.k}-1] = {self.lambdas[self.k-1]} " + ) + + if plot: + import matplotlib.pyplot as plt + + plt.figure(1) + plt.title("Eigenvalue decay of covariance matrix") + plt.semilogy(np.arange(len(self.lambdas)) + 1, 1 / self.lambdas[0] * self.lambdas, "-*r", label="normed") + plt.axvline(x=num) # start with 1!! + # plt.show() + if self.V.mesh().topology().dim() == 1: + plt.figure(2) + plt.title("Eigenvectors \Phi_i \sqrt(\lambda_i)") + for i in range(num): + plt.plot( + out[i].function_space().tabulate_dof_coordinates()[:], + out[i].vector()[:], + "*", + label="EV %s" % (i), + ) # plotting over dof coordinates!! only as points because order not ongoing + plt.legend() + else: + for i in range(num): + plt.figure("10" + str(i)) + dolfinx.plot(self.V.mesh()) + plt_mode = dolfinx.plot(out[i]) + plt.colorbar(plt_mode) + plt.title("Eigen mode scaled with \sqrt(\lambda_i) %s" % (i)) + + plt.show() + + self.mode_data = out + + return out + + def save_modes_txt(self, file): + """ + Save modes in txt file for 1D problems + + Args: + file: filedirectory name + + Returns: + True if success + """ + self.logger.info("saving modes in txt file %s", file) + try: + a = len(self.mode_data) + except: + # generate modes + out = self.modes(self.k) + a = len(self.mode_data) + + if self.V.mesh().topology().dim() == 1: + x = self.V.tabulate_dof_coordinates()[:] + data_out = np.zeros((len(x), a)) + for i in range(a): + data_out[:, i] = self.mode_data[i].vector()[:] + + np.savetxt(file + ".txt", np.c_[x, data_out]) + + if self.V.mesh().topology().dim() > 1: + # save as pxdmf file + ##file_pvd=dolfin.File(file+'.pvd') + # file_xdmf = dolfin.XDMFFile(file+'.xdmf') + for i in range(a): + self.mode_data[i].rename("E_i", "E_i") + ##file_pvd << self.mode_data[i],i + # file_xdmf.write(self.mode_data[i],i) + + return True + + def 
solve_covariance_EVP_02(self): + """ + Solve eigenvalue problem assuming massmatrix == I --> standard eigenvalue problem + to generate decomposition of C + Returns: + self + """ + + self.generate_C() + + # solve generalized eigenvalue problem + A = self.C + w, v = np.linalg.eigh(A) # solve standard eigenvalue problem (faster) Eigenvalues in increasing order!! + + self.logger.info("EVP size: %s, %s, %s", A.shape, w.shape, v.shape) + + # start with largest eigenvalue + w_reverse = w[::-1] + v_reverse = np.flip(v, 1) + + # compute self.k if self.ktol is given + if self.ktol != None: + normed_lambdas = w_reverse / w_reverse[0] + self.logger.debug("normed_lambdas %s", normed_lambdas) + self.k = np.argmax(normed_lambdas <= self.ktol) + 1 # index starts with 0 + self.logger.info("required number of modes is %s (according tolerance %s)", self.k, self.ktol) + + # selected vectors / all values for plotting afterwards + self.lambdas = w_reverse + self.EV = v_reverse[:, 0 : self.k] + + self.logger.info("eigenvalues %s", self.lambdas) + return self diff --git a/src/fenicsxconcrete/sensor_definition/base_sensor.py b/src/fenicsxconcrete/sensor_definition/base_sensor.py index 8af3897..7066be8 100644 --- a/src/fenicsxconcrete/sensor_definition/base_sensor.py +++ b/src/fenicsxconcrete/sensor_definition/base_sensor.py @@ -1,5 +1,6 @@ from __future__ import annotations +import os from abc import ABC, abstractmethod import pint @@ -45,6 +46,16 @@ def measure(self): def base_unit(): """Defines the base unit of this sensor""" + def report_metadata(self) -> dict: + """Generates dictionary with the metadata of this sensor""" + metadata = {} + metadata["id"] = self.name + metadata["type"] = self.__class__.__name__ + metadata["sensor_file"] = os.path.splitext(os.path.basename(__file__))[0] + metadata["units"] = f"{self.units._units}" + metadata["dimensionality"] = f"{self.units.dimensionality}" + return metadata + def get_data_list(self) -> pint.Quantity[list]: """Returns the measured data 
with respective unit @@ -142,3 +153,10 @@ def measure(self): @abstractmethod def base_unit(): """Defines the base unit of this sensor, must be specified by child""" + + def report_metadata(self) -> dict: + """Generates dictionary with the metadata of this sensor""" + metadata = super().report_metadata() + metadata["sensor_file"] = os.path.splitext(os.path.basename(__file__))[0] + metadata["where"] = self.where + return metadata diff --git a/src/fenicsxconcrete/sensor_definition/displacement_sensor.py b/src/fenicsxconcrete/sensor_definition/displacement_sensor.py index e769552..f6d7562 100644 --- a/src/fenicsxconcrete/sensor_definition/displacement_sensor.py +++ b/src/fenicsxconcrete/sensor_definition/displacement_sensor.py @@ -1,3 +1,5 @@ +import os + import dolfinx as df from fenicsxconcrete.finite_element_problem.base_material import MaterialProblem @@ -45,6 +47,12 @@ def measure(self, problem: MaterialProblem, t: float = 1.0) -> None: self.data.append(displacement_data) self.time.append(t) + def report_metadata(self) -> dict: + """Generates dictionary with the metadata of this sensor""" + metadata = super().report_metadata() + metadata["sensor_file"] = os.path.splitext(os.path.basename(__file__))[0] + return metadata + @staticmethod def base_unit() -> ureg: """Defines the base unit of this sensor diff --git a/src/fenicsxconcrete/sensor_definition/reaction_force_sensor.py b/src/fenicsxconcrete/sensor_definition/reaction_force_sensor.py index f32b383..88b3bd3 100644 --- a/src/fenicsxconcrete/sensor_definition/reaction_force_sensor.py +++ b/src/fenicsxconcrete/sensor_definition/reaction_force_sensor.py @@ -1,3 +1,4 @@ +import os from collections.abc import Callable import dolfinx as df @@ -88,6 +89,13 @@ def measure(self, problem: MaterialProblem, t: float = 1.0) -> None: self.data.append(reaction_force_vector) self.time.append(t) + def report_metadata(self) -> dict: + """Generates dictionary with the metadata of this sensor""" + metadata = 
super().report_metadata() + metadata["surface"] = self.surface.__name__ + metadata["sensor_file"] = os.path.splitext(os.path.basename(__file__))[0] + return metadata + @staticmethod def base_unit() -> ureg: """Defines the base unit of this sensor diff --git a/src/fenicsxconcrete/sensor_definition/sensor_schema.py b/src/fenicsxconcrete/sensor_definition/sensor_schema.py new file mode 100644 index 0000000..65fe49c --- /dev/null +++ b/src/fenicsxconcrete/sensor_definition/sensor_schema.py @@ -0,0 +1,123 @@ +def generate_sensor_schema() -> dict: + """Function that returns the sensor schema. Necessary to include the schema in the package accessible. + + Returns: + Schema for sensor's list metadata + """ + + schema = { + "$schema": "http://json-schema.org/2020-12/schema#", + "title": "SensorsList", + "type": "object", + "properties": { + "sensors": { + "type": "array", + "items": { + "oneOf": [ + {"$ref": "#/definitions/BaseSensor"}, + {"$ref": "#/definitions/PointSensor"}, + {"$ref": "#/definitions/DisplacementSensor"}, + {"$ref": "#/definitions/ReactionForceSensor"}, + {"$ref": "#/definitions/StrainSensor"}, + {"$ref": "#/definitions/StressSensor"}, + ] + }, + } + }, + "definitions": { + "baseSensorProperties": { + "type": "object", + "properties": { + "id": {"type": "string", "description": "A unique identifier for the sensor"}, + "type": {"type": "string", "description": "The python class for the sensor"}, + "units": {"type": "string", "description": "The unit of measurement for the sensor"}, + "dimensionality": { + "type": "string", + "description": "The dimensionality of measurement for the sensor between brackets []", + }, + "sensor_file": { + "type": "string", + "description": "Python file where the sensor is defined whithout extension", + }, + }, + "required": ["id", "type", "units", "dimensionality"], + }, + "pointSensorProperties": { + "allOf": [ + {"$ref": "#/definitions/baseSensorProperties"}, + { + "type": "object", + "properties": {"where": {"type": 
"array", "description": "Location of the sensor"}}, + "required": ["where"], + }, + ] + }, + "BaseSensor": { + "allOf": [ + {"$ref": "#/definitions/baseSensorProperties"}, + { + "type": "object", + "properties": {"type": {"const": "BaseSensor", "description": "The type of sensor"}}, + "required": ["type"], + }, + ] + }, + "PointSensor": { + "allOf": [ + {"$ref": "#/definitions/pointSensorProperties"}, + { + "type": "object", + "properties": {"type": {"const": "PointSensor", "description": "The type of sensor"}}, + "required": ["type"], + }, + ] + }, + "DisplacementSensor": { + "allOf": [ + {"$ref": "#/definitions/pointSensorProperties"}, + { + "type": "object", + "properties": {"type": {"const": "DisplacementSensor", "description": "The type of sensor"}}, + "required": ["type"], + }, + ] + }, + "ReactionForceSensor": { + "allOf": [ + {"$ref": "#/definitions/baseSensorProperties"}, + { + "type": "object", + "properties": { + "type": {"const": "ReactionForceSensor", "description": "The type of sensor"}, + "surface": { + "type": "string", + "description": "Surface where the reactionforce is measured", + }, + }, + "required": ["type", "surface"], + }, + ] + }, + "StrainSensor": { + "allOf": [ + {"$ref": "#/definitions/pointSensorProperties"}, + { + "type": "object", + "properties": {"type": {"const": "StrainSensor", "description": "The type of sensor"}}, + "required": ["type"], + }, + ] + }, + "StressSensor": { + "allOf": [ + {"$ref": "#/definitions/pointSensorProperties"}, + { + "type": "object", + "properties": {"type": {"const": "StressSensor", "description": "The type of sensor"}}, + "required": ["type"], + }, + ] + }, + }, + } + return schema diff --git a/src/fenicsxconcrete/sensor_definition/strain_sensor.py b/src/fenicsxconcrete/sensor_definition/strain_sensor.py index 72293ad..5ef406c 100644 --- a/src/fenicsxconcrete/sensor_definition/strain_sensor.py +++ b/src/fenicsxconcrete/sensor_definition/strain_sensor.py @@ -1,3 +1,5 @@ +import os + import dolfinx as df 
import ufl @@ -59,6 +61,12 @@ def measure(self, problem: MaterialProblem, t: float = 1.0) -> None: self.data.append(strain_data) self.time.append(t) + def report_metadata(self) -> dict: + """Generates dictionary with the metadata of this sensor""" + metadata = super().report_metadata() + metadata["sensor_file"] = os.path.splitext(os.path.basename(__file__))[0] + return metadata + @staticmethod def base_unit() -> ureg: """Defines the base unit of this sensor diff --git a/src/fenicsxconcrete/sensor_definition/stress_sensor.py b/src/fenicsxconcrete/sensor_definition/stress_sensor.py index ef69a85..519b91c 100644 --- a/src/fenicsxconcrete/sensor_definition/stress_sensor.py +++ b/src/fenicsxconcrete/sensor_definition/stress_sensor.py @@ -1,3 +1,5 @@ +import os + import dolfinx as df import ufl @@ -58,6 +60,12 @@ def measure(self, problem: MaterialProblem, t: float = 1.0) -> None: self.data.append(stress_data) self.time.append(t) + def report_metadata(self) -> dict: + """Generates dictionary with the metadata of this sensor""" + metadata = super().report_metadata() + metadata["sensor_file"] = os.path.splitext(os.path.basename(__file__))[0] + return metadata + @staticmethod def base_unit() -> ureg: """Defines the base unit of this sensor diff --git a/tests/finite_element_problem/test_base_material.py b/tests/finite_element_problem/test_base_material.py index 6084fd4..0083c34 100644 --- a/tests/finite_element_problem/test_base_material.py +++ b/tests/finite_element_problem/test_base_material.py @@ -1,3 +1,8 @@ +import json +import os +from copy import deepcopy +from pathlib import Path + import pytest from fenicsxconcrete.experimental_setup.cantilever_beam import CantileverBeam @@ -63,7 +68,26 @@ def test_sensor_options() -> None: problem.solve() # check that some data is in sensor - assert problem.sensors[sensor.name].data != [] + measure = deepcopy(problem.sensors[sensor.name].data) + assert measure != [] + + # check export sensor data + 
problem.export_sensors_metadata(Path("sensors_metadata.json")) + expected_metadata = { + "sensors": [ + { + "id": "DisplacementSensor", + "type": "DisplacementSensor", + "sensor_file": "displacement_sensor", + "units": "meter", + "dimensionality": "[length]", + "where": [1, 0.0, 0.0], + } + ] + } + with open("sensors_metadata.json", "r") as f: + sensor_metadata = json.load(f) + assert sensor_metadata == expected_metadata # check cleaning of sensor data problem.clean_sensor_data() @@ -72,3 +96,14 @@ def test_sensor_options() -> None: # delete sensor problem.delete_sensor() assert problem.sensors == {} + + # check import sensor data + problem.import_sensors_from_metadata(Path("sensors_metadata.json")) + + os.remove("sensors_metadata.json") + + # repeat solving and plotting + problem.solve() + + # repeat check that some data is in imported sensor + assert problem.sensors[sensor.name].data[0] == pytest.approx(measure[0]) diff --git a/tests/sensor_definition/test_point_sensors.py b/tests/sensor_definition/test_point_sensors.py index 12bb452..9ea4a2f 100644 --- a/tests/sensor_definition/test_point_sensors.py +++ b/tests/sensor_definition/test_point_sensors.py @@ -27,3 +27,8 @@ def test_point_sensor(point_sensor) -> None: # check that something is stored data = fem_problem.sensors[sensor.name].get_last_entry() assert data is not None + + # check that location metadata is reported correctly + # other metadata tested in test_sensors.py + metadata = sensor.report_metadata() + assert metadata["where"] == sensor_location diff --git a/tests/sensor_definition/test_reaction_force_sensor.py b/tests/sensor_definition/test_reaction_force_sensor.py index 2dd386b..8fc10f7 100644 --- a/tests/sensor_definition/test_reaction_force_sensor.py +++ b/tests/sensor_definition/test_reaction_force_sensor.py @@ -77,6 +77,12 @@ def test_full_boundary_reaction(dim: int, degree: int) -> None: assert force_top == pytest.approx(-1 * force_bottom) # checking equal forces on sides assert force_left == 
pytest.approx(force_bottom) + # checking report metadata + # TODO Figure out how to identify which boundary is applied + assert fem_problem.sensors.ReactionForceSensor.report_metadata()["surface"] == "boundary" + assert fem_problem.sensors.ReactionForceSensor2.report_metadata()["surface"] == "boundary" + assert fem_problem.sensors.ReactionForceSensor3.report_metadata()["surface"] == "boundary" + assert fem_problem.sensors.ReactionForceSensor4.report_metadata()["surface"] == "boundary" if dim == 3: force_front = fem_problem.sensors.ReactionForceSensor5.get_last_entry().magnitude[1] @@ -86,6 +92,9 @@ def test_full_boundary_reaction(dim: int, degree: int) -> None: assert force_front == pytest.approx(-1 * force_back) # checking equal forces left-front assert force_left == pytest.approx(force_front) + # checking report metadata + assert fem_problem.sensors.ReactionForceSensor5.report_metadata()["surface"] == "boundary" + assert fem_problem.sensors.ReactionForceSensor6.report_metadata()["surface"] == "boundary" @pytest.mark.parametrize("dim", [2, 3]) diff --git a/tests/sensor_definition/test_sensors.py b/tests/sensor_definition/test_sensors.py index 65be7b9..bf72f2b 100644 --- a/tests/sensor_definition/test_sensors.py +++ b/tests/sensor_definition/test_sensors.py @@ -41,6 +41,16 @@ def test_base_sensor() -> None: assert u_sensor.get_last_entry().units == ureg.millimeter # check magnitude assert m_data.magnitude == pytest.approx(mm_data.magnitude / 1000) + # testing metadata report + metadata = u_sensor.report_metadata() + true_metadata = { + "id": "DisplacementSensor", + "type": "DisplacementSensor", + "units": "millimeter", + "dimensionality": "[length]", + } + for key in true_metadata: + assert key in metadata and true_metadata[key] == metadata[key] @pytest.mark.parametrize("sensor", [DisplacementSensor, ReactionForceSensor, StressSensor, StrainSensor])