diff --git a/emmet-core/emmet/core/neb.py b/emmet-core/emmet/core/neb.py new file mode 100644 index 0000000000..fcd8546096 --- /dev/null +++ b/emmet-core/emmet/core/neb.py @@ -0,0 +1,372 @@ +"""Schemas and utils for NEB calculations.""" + +from datetime import datetime +import numpy as np +from pathlib import Path +from pydantic import BaseModel, Field, model_validator +from scipy.interpolate import CubicSpline +from typing import Optional, Tuple, Union, Sequence, Any +from typing_extensions import Self + +from monty.os.path import zpath +from pymatgen.core import Structure + +from emmet.core.tasks import ( + _VOLUMETRIC_FILES, + _find_vasp_files, + _parse_custodian, + _parse_orig_inputs, + InputDoc, + CustodianDoc, + OrigInputs, +) +from emmet.core.utils import ValueEnum, utcnow +from emmet.core.vasp.calculation import Calculation +from emmet.core.vasp.task_valid import TaskState + + +class NebMethod(ValueEnum): + """Common methods for NEB calculations. + + TODO: convert to StrEnum + """ + + STANDARD = "standard" + CLIMBING_IMAGE = "climbing_image" + APPROX = "approxNEB" + + +class NebTaskDoc(BaseModel, extra="allow"): + """Define schema for VASP NEB tasks.""" + + endpoint_structures: Sequence[Structure] = Field( + None, + description="The initial and final configurations (reactants and products) of the barrier.", + ) + endpoint_energies: Optional[Sequence[float]] = Field( + None, description="Energies of the endpoint structures." + ) + endpoint_calculations: Optional[list[Calculation]] = Field( + None, description="Calculation information for the endpoint structures" + ) + endpoint_objects: Optional[list[dict]] = Field( + None, description="VASP objects for each endpoint calculation." + ) + endpoint_directories: Optional[list[str]] = Field( + None, description="List of the directories for the endpoint calculations." + ) + + image_structures: list[Structure] = Field( + None, description="Final structures for each NEB images." + ) + image_energies: Optional[list[float]] = Field( + None, description="Final energies for each image" + ) + image_calculations: Optional[list[Calculation]] = Field( + None, description="Full calculation output for the NEB images." + ) + dir_name: str = Field(None, description="Top-level NEB calculation directory.") + + image_directories: list[str] = Field( + None, description="List of the directories where the NEB images are located" + ) + image_objects: Optional[dict[int, dict]] = Field( + None, description="VASP objects for each image calculation." + ) + + orig_inputs: Optional[OrigInputs] = Field( + None, + description="The exact set of input parameters used to generate the current task document.", + ) + + inputs: Optional[InputDoc] = Field( + None, description="Inputs used in this calculation." + ) + + neb_method: Optional[NebMethod] = Field( + None, description="The NEB method used for this calculation." + ) + + state: Optional[TaskState] = Field(None, description="State of this calculation") + + custodian: Optional[list[CustodianDoc]] = Field( + None, + description="Detailed custodian data for each VASP calculation contributing to the task document.", + ) + + last_updated: Optional[datetime] = Field( + utcnow(), + description="Timestamp for the most recent calculation for this task document", + ) + + completed_at: Optional[datetime] = Field( + None, description="Timestamp for when this task was completed" + ) + + forward_barrier: Optional[float] = Field( + None, + description=( + "Forward barrier for this reaction, " + "i.e., the transition state energy minus " + "the reactant / initial configuration energy." + ), + ) + + reverse_barrier: Optional[float] = Field( + None, + description=( + "Reverse barrier for this reaction, " + "i.e., the transition state energy minus " + "the product / final configuration energy." + ), + ) + + barrier_analysis: Optional[dict[str, Any]] = Field( + None, description="Analysis of the reaction barrier." + ) + + @model_validator(mode="after") + def set_barriers(self) -> Self: + """Perform analysis on barrier if needed.""" + if not self.forward_barrier or not self.reverse_barrier: + self.barrier_analysis = neb_barrier_spline_fit(self.energies) + for k in ("forward", "reverse"): + setattr(self, f"{k}_barrier", self.barrier_analysis[f"{k}_barrier"]) + return self + + @property + def num_images(self) -> int: + """Return the number of VASP calculations / number of images performed.""" + return len(self.image_directories) + + @property + def energies(self) -> list[float]: + """Return the endpoint (optional) and image energies.""" + if self.endpoint_energies is not None: + return [ + self.endpoint_energies[0], + *self.image_energies, + self.endpoint_energies[1], + ] + return self.image_energies + + @property + def structures(self) -> list[Structure]: + """Return the endpoint and image structures.""" + return [ + self.endpoint_structures[0], + *self.image_structures, + self.endpoint_structures[1], + ] + + @classmethod + def from_directory( + cls, + dir_name: Union[Path, str], + volumetric_files: Tuple[str, ...] = _VOLUMETRIC_FILES, + store_calculations: bool = True, + **neb_task_doc_kwargs, + ) -> Self: + """ + Return an NebTaskDoc from a single NEB calculation directory. + + This method populates only the image energies and calculations fields, + and the endpoint structures. + """ + if isinstance(dir_name, str): + dir_name = Path(dir_name) + + neb_directories = sorted(dir_name.glob("[0-9][0-9]")) + + if (ep_calcs := neb_task_doc_kwargs.pop("endpoint_calculations", None)) is None: + endpoint_directories = [neb_directories[0], neb_directories[-1]] + endpoint_structures = [ + Structure.from_file(zpath(f"{endpoint_dir}/POSCAR")) + for endpoint_dir in endpoint_directories + ] + endpoint_energies = None + else: + endpoint_directories = neb_task_doc_kwargs.pop("endpoint_directories") + endpoint_structures = [ep_calc.output.structure for ep_calc in ep_calcs] + endpoint_energies = [ep_calc.output.energy for ep_calc in ep_calcs] + + image_directories = neb_directories[1:-1] + + image_calculations = [] + image_structures = [] + image_objects = {} + for iimage, image_dir in enumerate(image_directories): + vasp_files = _find_vasp_files(image_dir, volumetric_files=volumetric_files) + + calc, image_objects[iimage] = Calculation.from_vasp_files( + dir_name=image_dir, + task_name=f"NEB image {iimage + 1}", + vasprun_file=vasp_files["standard"]["vasprun_file"], + outcar_file=vasp_files["standard"]["outcar_file"], + contcar_file=vasp_files["standard"]["contcar_file"], + volumetric_files=vasp_files["standard"].get("volumetric_files", []), + oszicar_file=vasp_files["standard"].get("oszicar_file", None), + vasprun_kwargs={ + "parse_potcar_file": False, + }, + ) + image_calculations.append(calc) + image_structures.append(calc.output.structure) + + calcs_to_check = image_calculations + (ep_calcs or []) + + task_state = ( + TaskState.SUCCESS + if all( + calc.has_vasp_completed == TaskState.SUCCESS for calc in calcs_to_check + ) + else TaskState.FAILED + ) + + inputs = {} + for suffix in (None, ".orig"): + vis = { + k.lower(): v + for k, v in _parse_orig_inputs(dir_name, suffix=suffix).items() + } + if (potcar_spec := vis.get("potcar")) is not None: + vis["potcar_spec"] = potcar_spec + vis["potcar"] = [spec.titel for spec in potcar_spec] + + if suffix is None: + inputs["inputs"] = InputDoc( + **vis, magnetic_moments=vis.get("incar", {}).get("MAGMOM") + ) + else: + inputs["orig_inputs"] = OrigInputs(**vis) + + neb_method = ( + NebMethod.CLIMBING_IMAGE + if inputs["inputs"].incar.get("LCLIMB", False) + else NebMethod.STANDARD + ) + + return cls( + endpoint_structures=endpoint_structures, + endpoint_energies=endpoint_energies, + endpoint_directories=[str(ep_dir) for ep_dir in endpoint_directories], + endpoint_calculations=ep_calcs if store_calculations else None, + image_calculations=image_calculations if store_calculations else None, + image_structures=image_structures, + dir_name=str(dir_name), + image_directories=[str(img_dir) for img_dir in image_directories], + orig_inputs=inputs["orig_inputs"], + inputs=inputs["inputs"], + image_objects=image_objects, + neb_method=neb_method, # type: ignore[arg-type] + state=task_state, + image_energies=[calc.output.energy for calc in image_calculations], + custodian=_parse_custodian(dir_name), + completed_at=max(calc.completed_at for calc in calcs_to_check), + **neb_task_doc_kwargs, + ) + + @classmethod + def from_directories( + cls, + endpoint_directories: list[str | Path], + neb_directory: str | Path, + volumetric_files: Tuple[str, ...] = _VOLUMETRIC_FILES, + **neb_task_doc_kwargs, + ) -> Self: + """ + Return an NebTaskDoc from endpoint and NEB calculation directories. + + This method populates the endpoint and image fields completely, + permitting an analysis of the barrier. + """ + endpoint_calculations = [None for _ in range(2)] + endpoint_objects = [None for _ in range(2)] + for idx, endpoint_dir in enumerate(endpoint_directories): + vasp_files = _find_vasp_files( + endpoint_dir, volumetric_files=volumetric_files + ) + ep_key = ( + "standard" + if vasp_files.get("standard") + else "relax" + + str( + max( + int(k.split("relax")[-1]) + for k in vasp_files + if k.startswith("relax") + ) + ) + ) + + ( + endpoint_calculations[idx], + endpoint_objects[idx], + ) = Calculation.from_vasp_files( + dir_name=endpoint_dir, + task_name=f"NEB endpoint {idx + 1}", + vasprun_file=vasp_files[ep_key]["vasprun_file"], + outcar_file=vasp_files[ep_key]["outcar_file"], + contcar_file=vasp_files[ep_key]["contcar_file"], + volumetric_files=vasp_files[ep_key].get("volumetric_files", []), + oszicar_file=vasp_files[ep_key].get("oszicar_file", None), + vasprun_kwargs={ + "parse_potcar_file": False, + }, + ) + + return cls.from_directory( + neb_directory, + volumetric_files=volumetric_files, + endpoint_calculations=endpoint_calculations, + endpoint_objects=endpoint_objects, + endpoint_directories=endpoint_directories, + **neb_task_doc_kwargs, + ) + + +def neb_barrier_spline_fit( + energies: Sequence[float], + spline_kwargs: dict | None = None, + frame_match_tol: float = 1.0e-6, +) -> dict[str, Any]: + """ + Define basic NEB analysis tools. + + Parameters + ---------- + energies : Sequence[float] + The energies sorted by increasing frame index. Must include endpoints. + frame_match_tol : float = 1.e-6 + The tolerance for matching the transition state frame index to the + input frame indices. + """ + analysis: dict[str, Any] = { + "energies": list(energies), + "frame_index": list(frame_idx := np.linspace(0.0, 1.0, len(energies))), + } + energies = np.array(energies) + + spline_kwargs = spline_kwargs or {"bc_type": "clamped"} + spline_fit = CubicSpline(frame_idx, energies, **spline_kwargs) + analysis["cubic_spline_pars"] = list(spline_fit.c) + + crit_points = spline_fit.derivative().roots() + analysis["ts_frame_index"] = -1 + analysis["ts_energy"] = -np.inf + for crit_point in crit_points: + if (energy := spline_fit(crit_point)) > analysis["ts_energy"] and spline_fit( + crit_point, 2 + ) <= 0.0: + analysis["ts_frame_index"] = crit_point + analysis["ts_energy"] = float(energy) + + analysis["ts_in_frames"] = any( + abs(analysis["ts_frame_index"] - frame_idx) + < frame_match_tol * max(frame_idx, frame_match_tol) + for frame_idx in frame_idx + ) + analysis["forward_barrier"] = analysis["ts_energy"] - energies[0] + analysis["reverse_barrier"] = analysis["ts_energy"] - energies[-1] + + return analysis diff --git a/emmet-core/emmet/core/tasks.py b/emmet-core/emmet/core/tasks.py index 2d994dd58a..b8b71b567c 100644 --- a/emmet-core/emmet/core/tasks.py +++ b/emmet-core/emmet/core/tasks.py @@ -926,7 +926,7 @@ def _parse_custodian(dir_name: Path) -> Optional[Dict]: def _parse_orig_inputs( - dir_name: Path, + dir_name: Path, suffix: str | None = ".orig" ) -> Dict[str, Union[Kpoints, Poscar, PotcarSpec, Incar]]: """ Parse original input files. @@ -938,6 +938,8 @@ def _parse_orig_inputs( ---------- dir_name Path to calculation directory. + suffix : str or None = ".orig" + The suffix of the original input files to use. Returns ------- @@ -951,9 +953,13 @@ def _parse_orig_inputs( "POTCAR": VaspPotcar, "POSCAR": Poscar, } - for filename in dir_name.glob("*.orig*"): + suffix = suffix or "" + for filename in dir_name.glob("*".join(f"{suffix}.".split("."))): + if "POTCAR.spec" in str(filename): + # Can't parse POTCAR.spec files + continue for name, vasp_input in input_mapping.items(): - if f"{name}.orig" in str(filename): + if f"{name}{suffix}" in str(filename): if name == "POTCAR": # can't serialize POTCAR orig_inputs[name.lower()] = PotcarSpec.from_potcar( diff --git a/emmet-core/tests/test_neb.py b/emmet-core/tests/test_neb.py new file mode 100644 index 0000000000..72c808b545 --- /dev/null +++ b/emmet-core/tests/test_neb.py @@ -0,0 +1,120 @@ +"""Test NEB document class.""" + +from datetime import datetime +from pathlib import Path +import pytest +import shutil +from tempfile import TemporaryDirectory + +from monty.serialization import loadfn +from pymatgen.core import Structure + +from emmet.core.neb import NebTaskDoc, NebMethod +from emmet.core.tasks import InputDoc, OrigInputs +from emmet.core.utils import jsanitize +from emmet.core.vasp.calculation import Calculation + +from tests.conftest import assert_schemas_equal + + +@pytest.mark.parametrize("from_dir", [True, False]) +def test_neb_doc(test_dir, from_dir: bool): + if from_dir: + with TemporaryDirectory() as tmpdir: + shutil.unpack_archive(test_dir / "neb_sample_calc.zip", tmpdir, "zip") + neb_doc = NebTaskDoc.from_directory(Path(tmpdir) / "neb") + neb_doc_dict = jsanitize(neb_doc) + else: + neb_doc_dict = loadfn(test_dir / "Si_neb_doc.json.bz2", cls=None) + for k in ( + "completed_at", + "last_updated", + ): + neb_doc_dict[k] = datetime.fromisoformat(neb_doc_dict[k]["string"]) + + neb_doc = NebTaskDoc(**neb_doc_dict) + + assert neb_doc.num_images == 3 + assert len(neb_doc.image_structures) == neb_doc.num_images + assert len(neb_doc.energies) == neb_doc.num_images + assert ( + len(neb_doc.structures) == neb_doc.num_images + 2 + ) # always includes endpoints + assert isinstance(neb_doc.orig_inputs, OrigInputs) + + # test that NEB image calculations are all VASP Calculation objects + assert len(neb_doc.image_calculations) == neb_doc.num_images + for image_idx, image_calc in enumerate(neb_doc.image_calculations): + assert_schemas_equal(image_calc, neb_doc_dict["image_calculations"][image_idx]) + assert isinstance(image_calc, Calculation) + + assert isinstance(neb_doc.inputs, InputDoc) + + # check NEB method parsing + if neb_doc.inputs.incar.get("LCLIMB", False): + assert neb_doc.neb_method == NebMethod.CLIMBING_IMAGE + else: + assert neb_doc.neb_method == NebMethod.STANDARD + + # check that endpoint structures exist + assert all(isinstance(ep, Structure) for ep in neb_doc.endpoint_structures) + + # Check that image calculation dirs all have common root / expected format + assert all( + image_dir.startswith(neb_doc.dir_name) + for image_dir in neb_doc.image_directories + ) + assert all( + Path(neb_doc.image_directories[image_idx]) + == Path(neb_doc.dir_name) / f"{image_idx+1:02}" + for image_idx in range(neb_doc.num_images) + ) + + # Check that VASP objects pre-allocated for each image calc + assert len(neb_doc.image_objects) == neb_doc.num_images + + assert all( + energy == pytest.approx(neb_doc.image_calculations[idx].output.energy) + for idx, energy in enumerate(neb_doc.image_energies) + ) + assert len(neb_doc.image_energies) == neb_doc.num_images + + +def test_from_directories(test_dir): + with TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + shutil.unpack_archive(test_dir / "neb_sample_calc.zip", tmpdir, "zip") + neb_doc = NebTaskDoc.from_directories( + [tmpdir / f"relax_endpoint_{idx+1}" for idx in range(2)], + tmpdir / "neb", + ) + + assert all( + isinstance(ep_calc, Calculation) for ep_calc in neb_doc.endpoint_calculations + ) + + assert all("relax_endpoint_" in ep_dir for ep_dir in neb_doc.endpoint_directories) + + assert len(neb_doc.energies) == neb_doc.num_images + 2 + assert len(neb_doc.structures) == neb_doc.num_images + 2 + assert isinstance(neb_doc.barrier_analysis, dict) + + assert all( + neb_doc.barrier_analysis.get(k) is not None + for k in ( + "energies", + "frame_index", + "cubic_spline_pars", + "ts_frame_index", + "ts_energy", + "ts_in_frames", + "forward_barrier", + "reverse_barrier", + ) + ) + + assert all( + getattr(neb_doc, f"{direction}_barrier") + == neb_doc.barrier_analysis[f"{direction}_barrier"] + for direction in ("forward", "reverse") + ) diff --git a/test_files/Si_neb_doc.json.bz2 b/test_files/Si_neb_doc.json.bz2 new file mode 100644 index 0000000000..34264c16a3 Binary files /dev/null and b/test_files/Si_neb_doc.json.bz2 differ diff --git a/test_files/neb_sample_calc.zip b/test_files/neb_sample_calc.zip new file mode 100644 index 0000000000..0395f98496 Binary files /dev/null and b/test_files/neb_sample_calc.zip differ