diff --git a/.gitignore b/.gitignore index 64b07fd..7b38327 100644 --- a/.gitignore +++ b/.gitignore @@ -107,3 +107,9 @@ ENV/ # In-tree generated files */_version.py +scripts/test_stability_protocol/ +*pdb +*dcd +*pt +test_stability_protocol/ +guardowl/data/drugbank/ \ No newline at end of file diff --git a/guardowl/benchmark.py b/guardowl/benchmark.py index 67a9baa..9c52cc2 100644 --- a/guardowl/benchmark.py +++ b/guardowl/benchmark.py @@ -100,7 +100,7 @@ def run(self) -> None: print(f"{self.implementation=} {self.platform=}") potential = MLPotential(self.nnp) - system = self.system_factory.initialize_pure_ml_system( + system = self.system_factory.initialize_ml_system( potential, self.testsystem.topology, self.remove_constraints, diff --git a/guardowl/data/drugbank.tar.gz b/guardowl/data/drugbank.tar.gz new file mode 100644 index 0000000..a92dc7d Binary files /dev/null and b/guardowl/data/drugbank.tar.gz differ diff --git a/guardowl/parameters.py b/guardowl/parameters.py index e7f6ab9..a8aedf6 100644 --- a/guardowl/parameters.py +++ b/guardowl/parameters.py @@ -7,6 +7,24 @@ @dataclass class BaseParameters: + """ + Base class for simulation parameters. + + Attributes + ---------- + system : System + The OpenMM System object to be used for the simulation. + platform : Platform + The OpenMM Platform object specifying the computation platform. + testsystem : TestSystem + The test system object containing the molecular system setup. + output_folder : str + Directory path where output files will be saved. + log_file_name : str + Filename for the log file. + + """ + system: System platform: Platform testsystem: TestSystem @@ -16,7 +34,36 @@ class BaseParameters: @dataclass class MinimizationTestParameters(BaseParameters): - convergence_criteria: unit.Quantity = field(default_factory=1.0 * unit.kilojoule_per_mole / unit.nanometer) + """ + Parameters specific to minimization tests. + + Attributes + ---------- + convergence_criteria : unit.Quantity + The energy convergence criteria for the minimization process. + env : str + The environment of the simulation (e.g., 'vacuum', 'solution'). + device_index : int + Index of the GPU device to use for the simulation. + temperature : unit.Quantity + The temperature at which the simulation is performed. + ensemble : str + The statistical ensemble for the simulation (e.g., 'NVT'). + + """ + + convergence_criteria: unit.Quantity = field( + default_factory=lambda: unit.Quantity( + 0.5, unit.kilojoule_per_mole / unit.angstrom + ) + ) + env: str = "vacuum" + device_index: int = 0 + temperature: unit.Quantity = field( + default_factory=lambda: unit.Quantity(300, unit.kelvin) + ) + + ensemble: str = "NVT" @dataclass @@ -28,29 +75,24 @@ class StabilityTestParameters(BaseParameters): Attributes ---------- protocol_length : int - Length of the protocol in time units. + The duration of the test protocol. temperature : unit.Quantity - Temperature of the simulation. - ensemble : str - Ensemble type ('NVT', 'NPT', etc.). + The temperature at which the test is conducted. + env : str + The environment for the simulation (e.g., 'vacuum', 'solution'). simulated_annealing : bool - Whether simulated annealing is to be used. - system : System - The OpenMM System object. - platform : Platform - The OpenMM Platform object. - testsystem : TestSystem - The test system for the simulation. - output_folder : str - Path to the output folder. - log_file_name : str - Name of the log file. + Flag to indicate if simulated annealing is used. state_data_reporter : StateDataReporter - The OpenMM StateDataReporter object. + The OpenMM StateDataReporter object for logging. + device_index : int + Index of the GPU device to use for the simulation. + ensemble : Optional[str] + The statistical ensemble for the simulation (e.g., 'NVT', 'NPT'). None if not applicable. + """ protocol_length: int - temperature: Union[int, List[int]] + temperature: unit.Quantity env: str simulated_annealing: bool state_data_reporter: StateDataReporter diff --git a/guardowl/protocols.py b/guardowl/protocols.py index c0d5cac..7ecc337 100644 --- a/guardowl/protocols.py +++ b/guardowl/protocols.py @@ -1,309 +1,23 @@ import csv import os import sys -from abc import ABC -from dataclasses import dataclass, field -from typing import List, TextIO, Tuple, Union, Optional +from typing import List, Tuple, Union, Optional, Dict -import mdtraj as md import numpy as np -import openmm from loguru import logger as log -from openmm import Platform, State, System, unit -from openmm.app import DCDReporter, PDBFile, Simulation, StateDataReporter, Topology -from openmmtools.testsystems import TestSystem +from openmm import State, System, unit, Platform +from openmm.app import Simulation, StateDataReporter, Topology -from .simulation import SimulationFactory from .parameters import ( StabilityTestParameters, - DOFTestParameters, MinimizationTestParameters, + DOFTestParameters, ) +from .reporter import ContinuousProgressReporter +from .simulation import initialize_ml_system -def initialize_ml_system(nnp: str, topology: Topology, implementation: str) -> System: - from openmmml import MLPotential - - from guardowl.simulation import SystemFactory - - nnp_instance = MLPotential(nnp) - system = SystemFactory().initialize_pure_ml_system( - nnp_instance, topology, implementation=implementation - ) - return system - - -# StateDataReporter with custom print function -class ContinuousProgressReporter(object): - """A class for reporting the progress of a simulation continuously. - - Parameters - ---------- - iostream : TextIO - Output stream to write the progress report to. - total_steps : int - Total number of steps in the simulation. - reportInterval : int - Interval at which to report the progress. - - Attributes - ---------- - _out : TextIO - The output stream. - _reportInterval : int - The report interval. - _total_steps : int - Total number of steps. - """ - - def __init__(self, iostream: TextIO, total_steps: int, reportInterval: int): - self._out = iostream - self._reportInterval = reportInterval - self._total_steps = total_steps - - def describeNextReport(self, simulation: Simulation) -> Tuple: - """ - Returns information about the next report. - - Parameters - ---------- - simulation : Simulation - The simulation to report on. - - Returns - ------- - Tuple - A tuple containing information about the next report. - """ - steps = self._reportInterval - simulation.currentStep % self._reportInterval - return (steps, False, False, True, False, None) - - def report(self, simulation: Simulation, state: State) -> None: - """ - Reports the progress of the simulation. - - Parameters - ---------- - simulation : Simulation - The simulation to report on. - state : State - The state of the simulation. - """ - progress = 100.0 * simulation.currentStep / self._total_steps - self._out.write(f"\rProgress: {progress:.2f}") - self._out.flush() - - -class DOFTest(ABC): - """Abstract base class for DOF tests. - - Attributes - ---------- - potential_simulation_factory : SimulationFactory - Factory to generate simulation instances. - """ - - def __init__(self) -> None: - self.potential_simulation_factory = SimulationFactory() - - def _set_bond_length( - self, position: unit.Quantity, atom1: int, atom2: int, length: float - ) -> unit.Quantity: - """ - Sets the length of a bond between two atoms. - - Parameters - ---------- - position : unit.Quantity - The positions of the atoms. - atom1 : int - The index of the first atom. - atom2 : int - The index of the second atom. - length : float - The desired length of the bond. - - Returns - ------- - unit.Quantity - The new positions of the atoms. - """ - diff = (position[atom2] - position[atom1]).value_in_unit(unit.angstrom) - normalized_diff = diff / np.linalg.norm(diff) - new_positions = position.copy() - new_positions[atom2] = position[atom1] + normalized_diff * unit.Quantity( - length, unit.angstrom - ) - log.debug(f"{position=}") - log.debug(f"{new_positions=}") - return unit.Quantity(new_positions, unit.angstrom) - - def _perform_bond_scan( - self, - qsim: Simulation, - parameters: DOFTestParameters, - ) -> Tuple[List, List, List]: - """ - Performs a scan of the length of a bond between two atoms. - - Parameters - ---------- - qsim : Simulation - The simulation to perform the scan on. - parameters : DOFTestParameters - The parameters for the DOF test. - - Returns - ------- - Tuple[List, List, List] - A tuple containing the conformations, potential energies, and bond lengths. - """ - assert parameters.bond is not None - bond_atom1, bond_atom2 = parameters.bond - initial_pos = parameters.testsystem.positions - conformations, potential_energy, bond_length = [], [], [] - for b in np.linspace(0, 8, 100): # in angstrom - new_pos = self._set_bond_length( - initial_pos, - bond_atom1, - bond_atom2, - b, - ) - qsim.context.setPositions(new_pos) - energy = qsim.context.getState(getEnergy=True).getPotentialEnergy() - potential_energy.append(energy.value_in_unit(unit.kilojoule_per_mole)) - conformations.append(new_pos.value_in_unit(unit.nano * unit.meter)) - bond_length.append(b) - - def describeNextReport(self, simulation: Simulation) -> Tuple: - steps = self._reportInterval - simulation.currentStep % self._reportInterval - return (steps, False, False, True, False, None) - - def report(self, simulation: Simulation, state: State) -> None: - progress = 100.0 * simulation.currentStep / self._total_steps - self._out.write(f"\rProgress: {progress:.2f}") - self._out.flush() - - -@dataclass -class DOFTestParameters: - """ - A dataclass for storing parameters for a degree of freedom (DOF) test. - - Attributes - ---------- - protocol_length : int - The length of the protocol to run. - temperature : unit.Quantity - The temperature to run the simulation at. - ensemble : str - The ensemble to simulate. - simulated_annealing : bool - Whether to use simulated annealing. - system : System - The system to simulate. - platform : Platform - The platform to run the simulation on. - testsystem : TestSystem - The test system to simulate. - output_folder : str - The path to the output folder. - log_file_name : str - The name of the log file. - state_data_reporter : StateDataReporter - The reporter to use for state data. - """ - - system: System - platform: Platform - testsystem: TestSystem - output_folder: str - log_file_name: str - bond: List = field(default_factory=lambda: []) - angle: List = field(default_factory=lambda: []) - torsion: List = field(default_factory=lambda: []) - - -class DOFTest(ABC): - def __init__(self) -> None: - """ - Initializes a new instance of the StabilityTest class. - """ - self.potential_simulation_factory = SimulationFactory() - - def _set_bond_length( - self, position: unit.Quantity, atom1: int, atom2: int, length: float - ) -> unit.Quantity: - """ - Sets the bond length between two atoms in a given position. - - Parameters - ---------- - position : unit.Quantity - The initial position of the atoms. - atom1 : int - The index of the first atom. - atom2 : int - The index of the second atom. - length : float - The desired bond length. - - Returns - ------- - unit.Quantity - The new position of the atoms with the updated bond length. - """ - diff = (position[atom2] - position[atom1]).value_in_unit(unit.angstrom) - normalized_diff = diff / np.linalg.norm(diff) - new_positions = position.copy() - new_positions[atom2] = position[atom1] + normalized_diff * unit.Quantity( - length, unit.angstrom - ) - log.debug(f"{position=}") - log.debug(f"{new_positions=}") - return unit.Quantity(new_positions, unit.angstrom) - - def _perform_bond_scan( - self, - qsim: Simulation, - parameters: DOFTestParameters, - ) -> Tuple[List, List, List]: - """ - Performs a scan of the bond length between two atoms in a given position. - - Parameters - ---------- - qsim : Simulation - The simulation object. - parameters : DOFTestParameters - The parameters for the degree of freedom test. - - Returns - ------- - Tuple[List, List, List] - A tuple containing the potential energy, conformations, and bond length. - """ - assert parameters.bond is not None - bond_atom1, bond_atom2 = parameters.bond - initial_pos = parameters.testsystem.positions - conformations, potential_energy, bond_length = [], [], [] - for b in np.linspace(0, 8, 100): # in angstrom - new_pos = self._set_bond_length( - initial_pos, - bond_atom1, - bond_atom2, - b, - ) - qsim.context.setPositions(new_pos) - energy = qsim.context.getState(getEnergy=True).getPotentialEnergy() - potential_energy.append(energy.value_in_unit(unit.kilojoule_per_mole)) - conformations.append(new_pos.value_in_unit(unit.nano * unit.meter)) - bond_length.append(b) - log.debug(f"{b=}, {energy=}") - return (potential_energy, conformations, bond_length) # - - -class StabilityTest(ABC): +class StabilityTest: """ Abstract base class for stability tests on molecular systems. """ @@ -323,6 +37,8 @@ def __init__(self) -> None: ------- None """ + from .simulation import SimulationFactory + self.potential_simulation_factory = SimulationFactory() self.implemented_ensembles = ["npt", "nvt", "nve"] @@ -330,27 +46,7 @@ def __init__(self) -> None: def _get_name(cls) -> str: return cls.__name__ - def _run_simulation( - self, - parameters: StabilityTestParameters, - temperature: unit.Quantity, - ) -> None: - """ - Runs a simulation for stability tests on molecular systems. - - This method runs a simulation for stability tests on molecular systems. It takes in a StabilityTestParameters object and a temperature as input parameters. It checks if the simulated annealing flag is set to True or False and if the ensemble is implemented in the protocol. It then creates a simulation object using the SimulationFactory class and sets the positions of the system. It minimizes the energy of the system and runs a simulated annealing molecular dynamics simulation if the simulated annealing flag is set to True. Finally, it writes out the simulation data to files. - - Parameters - ---------- - parameters: StabilityTestParameters - The parameters for the stability test. - temperature: unit.Quantity - The temperature of the simulation. - - Returns - ------- - None - """ + def _assert_input(self, parameters: StabilityTestParameters): assert parameters.simulated_annealing in [ True, False, @@ -382,19 +78,17 @@ def _run_simulation( """ ) - system = parameters.system + @staticmethod + def _run_simulation( + parameters: StabilityTestParameters, + sim: Simulation, + ) -> None: + from openmm.app import DCDReporter, PDBFile + from pathlib import Path - qsim = SimulationFactory.create_simulation( - system, - parameters.testsystem.topology, - platform=parameters.platform, - temperature=temperature, - env=parameters.env, - device_index=parameters.device_index, - ensemble=ensemble, - ) + output_folder = Path(parameters.output_folder) + output_folder.mkdir(parents=True, exist_ok=True) - os.makedirs(parameters.output_folder, exist_ok=True) output_file_name = f"{parameters.output_folder}/{parameters.log_file_name}" PDBFile.writeFile( @@ -406,32 +100,18 @@ def _run_simulation( parameters.state_data_reporter._out = open( f"{output_file_name}.csv", "w" ) # NOTE: write to file - if parameters.state_data_reporter._step is False: + if not parameters.state_data_reporter._step: log.info("Setting step to True") parameters.state_data_reporter._step = True - qsim.context.setPositions(parameters.testsystem.positions) - - qsim.minimizeEnergy() - if parameters.simulated_annealing: - print("Running Simulated Annealing MD") - # every 1000 steps raise the temperature by 5 K, ending at 325 K - for temp in np.linspace(0, 300, 60): - qsim.step(100) - temp = unit.Quantity(temp, unit.kelvin) - qsim.integrator.setTemperature(temp) - if ensemble == "npt": - barostat = parameters.system.getForce(barostate_force_id) - barostat.setDefaultTemperature(temp) - - qsim.reporters.append( + sim.reporters.append( DCDReporter( f"{output_file_name}.dcd", parameters.state_data_reporter._reportInterval, ) ) # NOTE: align write out frequency of state reporter with dcd reporter - qsim.reporters.append(parameters.state_data_reporter) - qsim.reporters.append( + sim.reporters.append(parameters.state_data_reporter) + sim.reporters.append( ContinuousProgressReporter( sys.stdout, reportInterval=100, @@ -439,33 +119,98 @@ def _run_simulation( ) ) - qsim.step(parameters.protocol_length) + sim.step(parameters.protocol_length) + + @staticmethod + def _setup_simulation( + parameters: StabilityTestParameters, + minimization_tolerance: unit.Quantity = 1 + * unit.kilojoule_per_mole + / unit.angstrom, + minimize: bool = True, + ) -> Simulation: + """ + Sets up and optionally minimizes a simulation based on the provided parameters, + and runs simulated annealing if specified. + + Parameters + ---------- + parameters : StabilityTestParameters + The parameters defining the simulation environment, system, and conditions under which the simulation will be run. + minimization_tolerance : unit.Quantity, optional + The energy tolerance to which the system will be minimized. Default is 1 kJ/mol/Å. + minimize : bool, optional + Flag to determine whether the system should be energy-minimized before simulation. Default is True. + + Returns + ------- + Simulation + The configured OpenMM Simulation object. + + """ + + from .simulation import SimulationFactory + + system = parameters.system + + sim = SimulationFactory.create_simulation( + system, + parameters.testsystem.topology, + platform=parameters.platform, + temperature=parameters.temperature, + env=parameters.env, + device_index=parameters.device_index, + ensemble=parameters.ensemble, + ) + + # Set initial positions + sim.context.setPositions(parameters.testsystem.positions) + + # Perform energy minimization if requested + if minimize: + log.info("Minimizing energy") + log.debug(f"{minimization_tolerance=}") + sim.minimizeEnergy(tolerance=minimization_tolerance) + log.info("Energy minimization complete.") + + # Execute simulated annealing if enabled + if getattr(parameters, "simulated_annealing", False): + log.info("Running Simulated Annealing MD...") + # every 100 steps raise the temperature by 10 K, ending at simulation temperatue + for temp in np.linspace( + 0, parameters.temperature.unit_in_quantity(unit.kelvin), 10 + ): + sim.step(100) + temp = unit.Quantity(temp, unit.kelvin) + sim.integrator.setTemperature(temp) + if parameters.output_folderensemble == "npt": + # FIXME + barostat = parameters.system.getForce(barostate_force_id) + barostat.setDefaultTemperature(temp) + + return sim def perform_stability_test(self, params: StabilityTestParameters) -> None: raise NotImplementedError() -class BondProfileProtocol(DOFTest): +from abc import ABC, abstractmethod + + +class DOFTest(ABC): def __init__(self) -> None: """ - Initializes the StabilityTest class by calling the constructor of its parent class. + Initializes the DOFTest class, setting up the required simulation factory for conducting tests. """ - super().__init__() + from .simulation import SimulationFactory - def perform_bond_scan(self, parameters: DOFTestParameters) -> None: - """ - Performs a bond scan on the given system and test system using the given parameters. + self.potential_simulation_factory = SimulationFactory() - Parameters: - ----------- - parameters : DOFTestParameters - The parameters to use for the bond scan. + def setup_simulation(self, parameters: DOFTestParameters) -> Simulation: + from openmm.app import PDBFile + from .simulation import SimulationFactory - Returns: - -------- - None - """ - qsim = SimulationFactory.create_simulation( + sim = SimulationFactory.create_simulation( parameters.system, parameters.testsystem.topology, platform=parameters.platform, @@ -473,26 +218,132 @@ def perform_bond_scan(self, parameters: DOFTestParameters) -> None: env="vacuum", ) - PDBFile.writeFile( - parameters.testsystem.topology, - parameters.testsystem.positions, - open(f"{parameters.output_folder}/{parameters.log_file_name}.pdb", "w"), - ) + # write pdb file + pdb_path = f"{parameters.output_folder}/{parameters.log_file_name}.pdb" + with open(pdb_path, "w") as pdb_file: - (potential_energy, conformations, bond_length) = super()._perform_bond_scan( - qsim, parameters + PDBFile.writeFile( + parameters.testsystem.topology, + parameters.testsystem.positions, + pdb_path, + ) + return sim + + def perform_scan(self, parameters: DOFTestParameters) -> None: + """ + Conducts a bond length scan for a specified system and saves the results. + + Parameters + ---------- + parameters : DOFTestParameters + The parameters defining the bond scan, including the system, platform, and output details. + """ + import mdtraj as md + + sim = self.setup_simulation(parameters) + potential_energy, conformations, bond_length = self.perform_DOF_scan( + sim, parameters ) + + file_path = f"{parameters.output_folder}/{parameters.log_file_name}" md.Trajectory(conformations, parameters.testsystem.topology).save( - f"{parameters.output_folder}/{parameters.log_file_name}.dcd" + f"{file_path}.dcd" ) # write csv file generated from bond_length and potential_energy - with open( - f"{parameters.output_folder}/{parameters.log_file_name}.csv", "w" - ) as f: + with open(f"{file_path}.csv", "w") as f: writer = csv.writer(f) writer.writerow(["bond distance [A]", "potential energy [kJ/mol]"]) writer.writerows(zip(bond_length, potential_energy)) + @abstractmethod + def perform_DOF_scan( + self, + sim: Simulation, + parameters: DOFTestParameters, + ): + pass + + +class BondProfileProtocol(DOFTest): + def __init__(self) -> None: + """ + Initializes the BondProfileProtocol class. + """ + super().__init__() + + def set_bond_length( + self, position: unit.Quantity, atom1: int, atom2: int, length: float + ) -> unit.Quantity: + """ + Adjusts the bond length between two specified atoms in a given set of positions. + + Parameters + ---------- + position : unit.Quantity + The current positions of all atoms in the system. + atom1 : int + The index of the first atom in the bond. + atom2 : int + The index of the second atom in the bond. + length : float + The desired bond length (in angstroms). + + Returns + ------- + unit.Quantity + The updated positions of all atoms in the system. + """ + diff = (position[atom2] - position[atom1]).value_in_unit(unit.angstrom) + normalized_diff = diff / np.linalg.norm(diff) + new_positions = position.copy() + new_positions[atom2] = position[atom1] + normalized_diff * unit.Quantity( + length, unit.angstrom + ) + return unit.Quantity(new_positions, unit.angstrom) + + def perform_DOF_scan( + self, + sim: Simulation, + parameters: DOFTestParameters, + ) -> Tuple[List[float], List[unit.Quantity], List[float]]: + """ + Performs a scan of the bond length between two atoms in a given position. + + Parameters + ---------- + sim : Simulation + The simulation object. + parameters : DOFTestParameters + The parameters for the degree of freedom test. + + Returns + ------- + Tuple[List[float], List[unit.Quantity], List[float]] + Lists of potential energies, conformations, and bond lengths, respectively. + """ + assert parameters.bond, "Bond parameters must be specified for a bond scan." + bond_atom1, bond_atom2 = parameters.bond + initial_pos = parameters.testsystem.positions + conformations, potential_energy, bond_length = [], [], [] + + for b in np.linspace(0, 8, 80): # in angstrom + new_pos = self.set_bond_length( + initial_pos, + bond_atom1, + bond_atom2, + b, + ) + sim.context.setPositions(new_pos) + state = sim.context.getState(getEnergy=True, getPositions=True) + energy = state.getPotentialEnergy() + potential_energy.append(energy.value_in_unit(unit.kilojoule_per_mole)) + conformations.append( + state.getPositions(asNumpy=True).value_in_unit(unit.nanometer) + ) + bond_length.append(b) + + return (potential_energy, conformations * unit.nanometer, bond_length) + class PropagationProtocol(StabilityTest): def __init__(self) -> None: @@ -511,7 +362,57 @@ def perform_stability_test( ) -> None: assert isinstance(parms.temperature, int) - self._run_simulation(parms, parms.temperature * unit.kelvin) + parms.log_file_name = f"{parms.log_file_name}_{parms.temperature}" + + self._assert_input(parms) + qsim = self._setup_simulation(parms) + self._run_simulation(parms, qsim) + + +class MinimizationProtocol(StabilityTest): + def __init__(self) -> None: + """ + Initializes the MinimizationProtocol class by calling the constructor of its parent class and setting the ensemble. + + Returns: + -------- + None + """ + super().__init__() + + def _assert_input(self, parameters: MinimizationTestParameters): + log.debug( + f""" +------------------------------------------------------------------------------------ +Minimization test parameters: +params.convergence_criteria: {parameters.convergence_criteria} +params.env: {parameters.env} +params.platform: {parameters.platform.getName()} +params.device_index: {parameters.device_index} +params.output_folder: {parameters.output_folder} +params.log_file_name: {parameters.log_file_name} +------------------------------------------------------------------------------------ + """ + ) + + def perform_stability_test( + self, parms: MinimizationTestParameters, minimize: bool = True + ) -> State: + from openmm.app import PDBFile + + self._assert_input(parms) + qsim = self._setup_simulation( + parms, minimization_tolerance=parms.convergence_criteria, minimize=minimize + ) + + output_file_name = f"{parms.output_folder}/{parms.log_file_name}" + state = qsim.context.getState(getPositions=True, getEnergy=True) + PDBFile.writeFile( + parms.testsystem.topology, + state.getPositions(), + open(f"{output_file_name}.pdb", "w"), + ) + return state class MultiTemperatureProtocol(PropagationProtocol): @@ -538,18 +439,22 @@ def perform_stability_test(self, parms: StabilityTestParameters) -> None: -------- None """ - log_file_name_ = parms.log_file_name + import dataclasses + if not isinstance(parms.temperature, list): raise RuntimeError( "You need to provide mutliple temperatures to run the MultiTemperatureProtocol." ) + for temperature in parms.temperature: - parms.log_file_name = f"{log_file_name_}_{temperature}" - log.info("Running simulation at temperature: {temperature} K") - self._run_simulation( - parms, - temperature * unit.kelvin, - ) + _parms = dataclasses.replace(parms) + _parms.temperature = temperature + _parms.log_file_name = f"{parms.log_file_name}_{temperature}K" + log.info(f"Running simulation at temperature: {temperature} K") + self._assert_input(_parms) + + qsim = self._setup_simulation(_parms) + self._run_simulation(_parms, qsim) def run_hipen_protocol( @@ -558,40 +463,59 @@ def run_hipen_protocol( implementation: str, temperature: Union[int, List[int]], reporter: StateDataReporter, - platform: openmm.Platform, + platform: Platform, output_folder: str, device_index: int = 0, nr_of_simulation_steps: int = 5_000_000, ): """ - Perform a stability test for a hipen molecule in vacuum - at multiple temperatures with a nnp/implementation. - :param hipen_idx: The index of the hipen molecule to simulate. - :param nnp: The neural network potential to use. - :param implementation: The implementation to use. - :param nr_of_simulation_steps: The number of simulation steps to perform (default=5_000_000). + Executes stability tests for specified hipen molecules in vacuum using a neural network potential (NNP) + with a specific implementation at multiple temperatures. + + Parameters + ---------- + hipen_idx : Union[int, List[int]] + The index or indices of the hipen molecule(s) to simulate. + nnp : str + The neural network potential to use for the simulation. + implementation : str + The specific implementation of the NNP. + temperature : Union[int, List[int]] + The temperature or list of temperatures at which to perform the simulations. + Multiple temperatures trigger a multi-temperature protocol. + reporter : StateDataReporter + The OpenMM StateDataReporter for logging simulation progress. + platform : Platform + The OpenMM Platform on which to run the simulations. + output_folder : str + The directory path where output files will be saved. + device_index : int, optional + The index of the GPU device to use for the simulations, defaults to 0. + nr_of_simulation_steps : int, optional + The total number of simulation steps to perform, defaults to 5,000,000. + """ - from guardowl.testsystems import HipenTestsystemFactory, hipen_systems + from guardowl.testsystems import SmallMoleculeTestsystemFactory, hipen_systems def _run_protocol(hipen_idx: int): name = list(hipen_systems.keys())[hipen_idx] - print( - f""" ------------------------------------------------------------------------------------- -| Performing vacuum stability test for {name} from the hipen dataset in vacuum. -| The simulation will use the {nnp} potential with the {implementation} implementation. ------------------------------------------------------------------------------------- - """ + log.info( + f"Performing vacuum stability test for {name} using {nnp} with {implementation}." ) - testsystem = HipenTestsystemFactory().generate_testsystems(name) + testsystem = SmallMoleculeTestsystemFactory().generate_testsystems_from_name( + name + ) system = initialize_ml_system(nnp, testsystem.topology, implementation) log_file_name = f"vacuum_{name}_{nnp}_{implementation}" - if isinstance(temperature, int): - stability_test = PropagationProtocol() - else: - stability_test = MultiTemperatureProtocol() + + # Select protocol based on whether temperature is a list or a single value + stability_test = ( + MultiTemperatureProtocol() + if isinstance(temperature, list) + else PropagationProtocol() + ) params = StabilityTestParameters( protocol_length=nr_of_simulation_steps, @@ -609,13 +533,14 @@ def _run_protocol(hipen_idx: int): ) stability_test.perform_stability_test(params) - print(f"\nSaving {params.log_file_name} files to {params.output_folder}") + log.info(f"\nSaving {params.log_file_name} files to {params.output_folder}") - if isinstance(hipen_idx, int): - _run_protocol(hipen_idx) - else: - for hipen_idx_ in hipen_idx: - _run_protocol(hipen_idx_) + # Run protocol for each specified hipen index + if isinstance(hipen_idx, int): + _run_protocol(hipen_idx) + else: + for idx in hipen_idx: + _run_protocol(idx) def run_waterbox_protocol( @@ -625,7 +550,7 @@ def run_waterbox_protocol( implementation: str, temperature: Union[int, List[int]], reporter: StateDataReporter, - platform: openmm.Platform, + platform: Platform, output_folder: str, device_index: int = 0, annealing: bool = False, @@ -633,34 +558,55 @@ def run_waterbox_protocol( nr_of_equilibrium_steps: int = 50_000, ): """ - Perform a stability test for a waterbox with a given edge size - in PBC in an ensemble and with a nnp/implementation. - :param edge_length: The edge length of the waterbox in Angstrom. - :param ensemble: The ensemble to simulate in. - :param nnp: The neural network potential to use. - :param implementation: The implementation to use. - :param annealing: Whether to perform simulated annealing (default=False). - :param nr_of_simulation_steps: The number of simulation steps to perform (default=5_000_000). + Performs a stability test on a waterbox system with specified edge length using a + neural network potential (NNP) and implementation in a given ensemble at multiple temperatures. + + Parameters + ---------- + edge_length : int + The edge length of the waterbox in Angstroms. + ensemble : str + The ensemble to simulate (e.g., 'NVT', 'NPT'). + nnp : str + The neural network potential to use. + implementation : str + The specific implementation of the NNP. + temperature : Union[int, List[int]] + The simulation temperature or list of temperatures for multi-temperature protocols. + reporter : StateDataReporter + The OpenMM StateDataReporter for logging simulation progress. + platform : Platform + The OpenMM Platform on which to run the simulation. + output_folder : str + Directory where output files will be saved. + device_index : int, optional + The index of the GPU device to use, defaults to 0. + annealing : bool, optional + Whether to perform simulated annealing, defaults to False. + nr_of_simulation_steps : int, optional + Total number of simulation steps, defaults to 5,000,000. + nr_of_equilibrium_steps : int, optional + Number of equilibrium steps before the stability test, defaults to 50,000. + """ + log.info( + f"Initiating waterbox stability test: {edge_length}A edge, {nnp} potential, {implementation} implementation, {ensemble} ensemble." + ) from openmm import unit from guardowl.testsystems import WaterboxTestsystemFactory - print( - f""" ------------------------------------------------------------------------------------- -| Performing waterbox stability test for a {edge_length} A waterbox in PBC. -| The simulation will use the {nnp} potential with the {implementation} implementation. ------------------------------------------------------------------------------------- - """ - ) - testsystem = WaterboxTestsystemFactory().generate_testsystems( edge_length * unit.angstrom, nr_of_equilibrium_steps ) system = initialize_ml_system(nnp, testsystem.topology, implementation) log_file_name = f"waterbox_{edge_length}A_{nnp}_{implementation}_{ensemble}" - log.info(f"Writing to {log_file_name}") + if isinstance(temperature, list): + log_file_name += f"_multi-temp" + else: + log_file_name += f"_{temperature}K" + + log.info(f"Simulation output will be written to {log_file_name}") stability_test = PropagationProtocol() @@ -680,18 +626,18 @@ def run_waterbox_protocol( ) stability_test.perform_stability_test(params) - print(f"\nSaving {params.log_file_name} files to {params.output_folder}") + log.info(f"Simulation files saved to {output_folder}") def run_pure_liquid_protocol( - molecule_name: Tuple[str, List[str]], - nr_of_molecule: Tuple[int, List[int]], + molecule_name: Union[str, List[str]], + nr_of_molecule: Union[int, List[int]], ensemble: str, nnp: str, implementation: str, temperature: Union[int, List[int]], reporter: StateDataReporter, - platform: openmm.Platform, + platform: Platform, output_folder: str, device_index: int = 0, annealing: bool = False, @@ -699,33 +645,51 @@ def run_pure_liquid_protocol( nr_of_equilibration_steps: int = 50_000, ): """ - Perform a stability test for a pure liquid with a given number of molecules - in PBC in an ensemble and with a nnp/implementation. - :param molecule_name: The name of the solvent molecule (ethane, butane, propane, methanol, cyclohexane, isobutane). - :param nr_of_molecule: The number of solvent molecules. - :param ensemble: The ensemble to simulate in. - :param nnp: The neural network potential to use. - :param implementation: The implementation to use. - :param annealing: Whether to perform simulated annealing (default=False). - :param nr_of_simulation_steps: The number of simulation steps to perform (default=5_000_000). + Executes stability tests for specified pure liquid systems, each containing a defined number of molecules, using a neural network potential (NNP) with a specified implementation at various temperatures. + + Parameters + ---------- + molecule_name : Union[str, List[str]] + The name or list of names of the solvent molecule(s) to simulate (e.g., 'ethane', 'butane'). + nr_of_molecule : Union[int, List[int]] + The number or list of numbers of solvent molecules for each solvent type to simulate. + ensemble : str + The ensemble to simulate (e.g., 'NVT', 'NPT'). + nnp : str + The neural network potential to use. + implementation : str + The specific implementation of the NNP. + temperature : Union[int, List[int]] + The simulation temperature(s) for the stability test. + reporter : StateDataReporter + The OpenMM StateDataReporter for logging simulation progress. + platform : Platform + The OpenMM Platform on which to run the simulation. + output_folder : str + The directory where output files will be saved. + device_index : int, optional + The index of the GPU device to use, defaults to 0. + annealing : bool, optional + Whether to perform simulated annealing, defaults to False. + nr_of_simulation_steps : int, optional + The total number of simulation steps, defaults to 5,000,000. + nr_of_equilibration_steps : int, optional + The number of equilibration steps before the stability test, defaults to 50,000. + """ from guardowl.testsystems import PureLiquidTestsystemFactory - if isinstance(molecule_name, str): - molecule_name_ = [molecule_name] - nr_of_molecule_ = [nr_of_molecule] - else: - molecule_name_ = molecule_name - nr_of_molecule_ = nr_of_molecule + # Ensure inputs are listified for uniform processing + molecule_names = ( + [molecule_name] if isinstance(molecule_name, str) else molecule_name + ) + nr_of_molecules = ( + [nr_of_molecule] if isinstance(nr_of_molecule, int) else nr_of_molecule + ) - for name, n_atoms in zip(molecule_name_, nr_of_molecule_): - print( - f""" - ------------------------------------------------------------------------------------ - | Performing pure liquid stability test for {n_atoms} {name} in PBC. - | The simulation will use the {nnp} potential with the {implementation} implementation. - ------------------------------------------------------------------------------------ - """ + for name, n_atoms in zip(molecule_names, nr_of_molecules): + log.info( + f"Initiating pure liquid stability test for {n_atoms} molecules of {name} at {temperature}K." ) testsystem = PureLiquidTestsystemFactory().generate_testsystems( @@ -735,10 +699,12 @@ def run_pure_liquid_protocol( ) system = initialize_ml_system(nnp, testsystem.topology, implementation) - log_file_name = ( - f"pure_liquid_{name}_{n_atoms}_{nnp}_{implementation}_{ensemble}" + temperature_str = ( + f"{temperature}K" if isinstance(temperature, int) else "multi-temp" ) - log.info(f"Writing to {log_file_name}") + log_file_name = f"pure_liquid_{name}_{n_atoms}_{nnp}_{implementation}_{ensemble}_{temperature_str}" + + log.info(f"Simulation output will be written to {log_file_name}") stability_test = PropagationProtocol() @@ -758,7 +724,7 @@ def run_pure_liquid_protocol( ) stability_test.perform_stability_test(params) - print(f"\nSaving {params.log_file_name} files to {params.output_folder}") + log.info(f"Simulation files saved to {output_folder}") def run_alanine_dipeptide_protocol( @@ -766,7 +732,7 @@ def run_alanine_dipeptide_protocol( implementation: str, temperature: int, reporter: StateDataReporter, - platform: openmm.Platform, + platform: Platform, output_folder: str, device_index: int = 0, ensemble: Optional[str] = None, @@ -775,34 +741,45 @@ def run_alanine_dipeptide_protocol( env: str = "vacuum", ): """ - Perform a stability test for an alanine dipeptide in water - in PBC in an ensemble and with a nnp/implementation. - :param env: The environment to simulate in (either vacuum or solution). - :param nnp: The neural network potential to use. - :param implementation: The implementation to use. - :param ensemble: The ensemble to simulate in (default=''). - :param nr_of_simulation_steps: The number of simulation steps to perform (default=5_000_000). - """ - from guardowl.testsystems import AlaninDipeptideTestsystemFactory + Executes a stability test for an alanine dipeptide system within specified environmental conditions using a neural network potential (NNP) and its implementation. - print( - f""" ------------------------------------------------------------------------------------- -| Performing alanine dipeptide stability test in {env}. -| The simulation will use the {nnp} potential with the {implementation} implementation. ------------------------------------------------------------------------------------- - """ + Parameters + ---------- + nnp : str + The neural network potential to use for the simulation. + implementation : str + The specific implementation of the NNP. + temperature : int + The temperature at which to perform the simulation, in Kelvin. + reporter : StateDataReporter + The OpenMM StateDataReporter for logging simulation progress. + platform : Platform + The OpenMM Platform on which to run the simulation. + output_folder : str + The directory where output files will be saved. + device_index : int, optional + The index of the GPU device to use, defaults to 0. + ensemble : Optional[str], optional + The ensemble to simulate (e.g., 'NVT', 'NPT'), defaults to None. + annealing : bool, optional + Whether to perform simulated annealing, defaults to False. + nr_of_simulation_steps : int, optional + The total number of simulation steps, defaults to 5,000,000. + env : str, optional + The environment to simulate in ('vacuum' or 'solution'), defaults to 'vacuum'. + + """ + log.info( + f"Initiating alanine dipeptide stability test in {env} using {nnp} potential with {implementation} implementation." ) + from guardowl.testsystems import AlaninDipeptideTestsystemFactory testsystem = AlaninDipeptideTestsystemFactory().generate_testsystems(env=env) system = initialize_ml_system(nnp, testsystem.topology, implementation) - assert env in ["vacuum", "solution"], f"Invalid input: {env}" - if env == "vacuum": - log_file_name = f"alanine_dipeptide_{env}_{nnp}_{implementation}" - else: - log_file_name = f"alanine_dipeptide_{env}_{nnp}_{implementation}_{ensemble}" + env_str = "vacuum" if env == "vacuum" else f"{env}_{ensemble}" + log_file_name = f"alanine_dipeptide_{env_str}_{nnp}_{implementation}_{temperature}K" - log.info(f"Writing to {log_file_name}") + log.info(f"Simulation output will be written to {log_file_name}") stability_test = PropagationProtocol() @@ -822,55 +799,255 @@ def run_alanine_dipeptide_protocol( ) stability_test.perform_stability_test(params) - print(f"\nSaving {params.log_file_name} files to {params.output_folder}") + log.info(f"Simulation files saved to {output_folder}") def run_DOF_scan( nnp: str, implementation: str, - DOF_definition: dict, - platform: openmm.Platform, + DOF_definition: Dict[str, list], + platform: Platform, output_folder: str, name: str = "ethanol", ): """ - Perform a scan on a selected DOF. - :param nnp: The neural network potential to use. - :param implementation: The implementation to use. - :param DOF_definition: The DOF that is scanned. Allowed key values are 'bond', 'angle' and 'torsion', - :param name: The name of the molecule to simulation (default='ethanol'). - values are a list of appropriate atom indices. + Executes a scan over a specified degree of freedom (DOF) for a given molecule using a neural + network potential (NNP) and its implementation. + + Parameters + ---------- + nnp : str + The neural network potential to use for the simulation. + implementation : str + The specific implementation of the NNP. + DOF_definition : Dict[str, list] + The degrees of freedom to scan. Supported keys are 'bond', 'angle', and 'torsion'. Each key maps to a list of atom indices defining the DOF. + platform : Platform + The OpenMM Platform on which to run the simulation. + output_folder : str + The directory where output files will be saved. + name : str, optional + The name of the molecule for simulation, defaults to 'ethanol'. + """ + log.info( + f"Initiating DOF scan for {name} using {nnp} with {implementation} implementation." + ) from guardowl.protocols import BondProfileProtocol, DOFTestParameters from guardowl.testsystems import SmallMoleculeTestsystemFactory - print( - f""" ------------------------------------------------------------------------------------- -| Performing scan on a selected DOG for {name}. -| The scan will use the {nnp} potential with the {implementation} implementation. ------------------------------------------------------------------------------------- - """ + testsystem = SmallMoleculeTestsystemFactory().generate_testsystems_from_name(name) + system = initialize_ml_system(nnp, testsystem.topology, implementation) + + log_file_name = f"DOF_scan_{name}_{nnp}_{implementation}" + + if "bond" in DOF_definition: + protocol = BondProfileProtocol() + dof_type = "bond" + elif "angle" in DOF_definition: + raise NotImplementedError("Angle DOF scans are not yet implemented.") + elif "torsion" in DOF_definition: + raise NotImplementedError("Torsion DOF scans are not yet implemented.") + else: + raise ValueError( + "Unsupported DOF type. Supported types are: 'bond', 'angle', 'torsion'." + ) + + params = DOFTestParameters( + system=system, + platform=platform, + testsystem=testsystem, + output_folder=output_folder, + log_file_name=log_file_name, + **DOF_definition, ) + log.info( + f"Performing {dof_type} scan with DOF definition: {DOF_definition[dof_type]}" + ) + protocol.perform_scan(params) + log.info(f"Scan results saved to {output_folder}") - testsystem = SmallMoleculeTestsystemFactory().generate_testsystems(name) - system = initialize_ml_system(nnp, testsystem.topology, implementation) - log_file_name = f"vacuum_{testsystem.testsystem_name}_{nnp}_{implementation}" +def run_detect_minimum( + nnp: str, + implementation: str, + platform: Platform, + output_folder: str, + percentage: int = 10, + only_molecules_below_10_heavy_atoms: bool = False, +) -> Dict[str, Tuple[float, float]]: + """ + Performs a minimization test on a subset of molecules from the DrugBank database, comparing the energy minimized conformations between DFT and a specified neural network potential (NNP). + + Parameters + ---------- + nnp : str + The neural network potential to use for the minimization test. + implementation : str + The implementation details of the neural network potential. + platform : Platform + The OpenMM Platform to perform simulations on. + output_folder : str + The directory where output files will be saved. + percentage : int, optional + The percentage of the total number of molecules to test, defaults to 10. + only_molecules_below_10_heavy_atoms : bool, optional + If True, only tests molecules with fewer than 10 heavy atoms, defaults to False. + + Returns + ------- + Dict[str, Tuple[float, float]] + A dictionary with molecule names as keys and tuples of RMSD and energy difference as values. + """ + from guardowl.testsystems import SmallMoleculeTestsystemFactory + import mdtraj as md + from .utils import ( + extract_drugbank_tar_gz, + _generate_file_list_for_minimization_test, + _generate_input_for_minimization_test, + ) + + # Extract DrugBank tar.gz file and prepare input files + extract_drugbank_tar_gz() + files = _generate_file_list_for_minimization_test(shuffle=True) + + # calculate the number of molecules to test + nr_of_molecules = files["total_number_of_systems"] + nr_of_molecules_to_test = int(nr_of_molecules * (percentage / 100)) + + score = {} + counter = 0 + + log.info( + f"Performing minimization for {nr_of_molecules_to_test} molecules using {nnp} with {implementation}." + ) + + for (minimized_file, minimized_position), ( + start_file, + start_position, + ) in _generate_input_for_minimization_test(files): + log.info(f"Minimization test: {counter}/{nr_of_molecules_to_test}") + + from openff.toolkit.topology import Molecule + + # Extract directory and name of the molecule file + working_dir = "".join(start_file.split("/")[-1]) + name = os.path.basename(working_dir.removesuffix(".xyz")) + + sdf_file = "".join(start_file.split(".")[0]) + ".sdf" + mol = Molecule.from_file(sdf_file, allow_undefined_stereo=True) + + + + # test if not implemented elements are in molecule, if yes skip + def _contains_unknown_elements(mol: Molecule) -> bool: + for atom in mol.atoms: + if atom.atomic_number >= 15: + log.debug(f"Skipping {name} because it contains unknown elements") + return True + return False + + # test if molecules has below 10 heavy atoms, if yes return True + def _below_10_heavy_atoms(mol: Molecule) -> bool: + heavy_atoms = 0 + for atom in mol.atoms: + if atom.atomic_number != 1: + heavy_atoms += 1 + if heavy_atoms > 10: + log.debug( + f"Skipping {name} because it has more than 10 heavy atoms: {heavy_atoms} heavy atoms" + ) + return False + log.debug( + f"Using {name} because it has less than 10 heavy atoms: {heavy_atoms} heavy atoms" + ) + return True - if DOF_definition["bond"]: - stability_test = BondProfileProtocol() - params = DOFTestParameters( + # ANI-2x is trained on limited elements, if molecule contains unknown elements skip + if _contains_unknown_elements(mol): + continue + + if only_molecules_below_10_heavy_atoms: + if not _below_10_heavy_atoms(mol): + continue + + ######################################### + ######################################### + # initialize the system that has been minimized using DFT + from openff.interchange.exceptions import UnassignedBondError + + try: + reference_testsystem = ( + SmallMoleculeTestsystemFactory().generate_testsystems_from_sdf(sdf_file) + ) + except (ValueError, UnassignedBondError) as e: + log.warning(f"Skipping {name} because of {e}") + continue + # set the minimized positions + reference_testsystem.positions = minimized_position + system = initialize_ml_system( + nnp, reference_testsystem.topology, implementation + ) + log_file_name = f"ref_{name}_{nnp}_{implementation}" + + params = MinimizationTestParameters( + platform=platform, system=system, + testsystem=reference_testsystem, + output_folder=output_folder, + log_file_name=log_file_name, + ) + + state = MinimizationProtocol().perform_stability_test(params, minimize=False) + reference_energy = state.getPotentialEnergy() + + reference_traj = md.Trajectory( + reference_testsystem.positions, reference_testsystem.topology + ) + ######################################### + ######################################### + # initialize the system that will be minimized using NNPs + + minimize_testsystem = reference_testsystem + minimize_testsystem.positions = start_position + + system = initialize_ml_system(nnp, minimize_testsystem.topology, implementation) + log_file_name = f"minimize_{name}_{nnp}_{implementation}" + + params = MinimizationTestParameters( platform=platform, - testsystem=testsystem, + system=system, + testsystem=minimize_testsystem, output_folder=output_folder, log_file_name=log_file_name, - bond=DOF_definition["bond"], ) - stability_test.perform_bond_scan(params) - elif DOF_definition["angle"]: - raise NotImplementedError - elif DOF_definition["torsion"]: - raise NotImplementedError + + state = MinimizationProtocol().perform_stability_test(params, minimize=True) + + minimized_energy = state.getPotentialEnergy() + minimize_testsystem.positions = state.getPositions(asNumpy=True) + + # calculate the energy error between the NNP minimized and the DFT minimized conformation + d_energy = abs(reference_energy - minimized_energy) + + minimized_traj = md.Trajectory( + minimize_testsystem.positions, minimize_testsystem.topology + ) + + # calculate the RMSD between the NNP minimized and the DFT minimized conformation + _score_minimized = md.rmsd(minimized_traj, reference_traj)[0] + + log.debug(f"RMSD: {_score_minimized}; Energy error: {d_energy}") + score[name] = (_score_minimized, d_energy._value) + + counter += 1 + if counter >= nr_of_molecules_to_test: + break + + # print the results to stdout + print(f"{'Name':<40} {'RMSD [A]':<20} {'Energy error [kJ/mol]':<20}") + for name, (rmsd, energy_error) in score.items(): + print(f"{name:<40} {rmsd:<20} {energy_error:<20}") + + return score diff --git a/guardowl/reporter.py b/guardowl/reporter.py new file mode 100644 index 0000000..cfe70be --- /dev/null +++ b/guardowl/reporter.py @@ -0,0 +1,63 @@ +from typing import TextIO, Tuple +from openmm import State, System, unit, Platform +from openmm.app import Simulation, StateDataReporter, Topology + +# StateDataReporter with custom print function +class ContinuousProgressReporter(object): + """A class for reporting the progress of a simulation continuously. + + Parameters + ---------- + iostream : TextIO + Output stream to write the progress report to. + total_steps : int + Total number of steps in the simulation. + reportInterval : int + Interval at which to report the progress. + + Attributes + ---------- + _out : TextIO + The output stream. + _reportInterval : int + The report interval. + _total_steps : int + Total number of steps. + """ + + def __init__(self, iostream: TextIO, total_steps: int, reportInterval: int): + self._out = iostream + self._reportInterval = reportInterval + self._total_steps = total_steps + + def describeNextReport(self, simulation: Simulation) -> Tuple: + """ + Returns information about the next report. + + Parameters + ---------- + simulation : Simulation + The simulation to report on. + + Returns + ------- + Tuple + A tuple containing information about the next report. + """ + steps = self._reportInterval - simulation.currentStep % self._reportInterval + return (steps, False, False, True, False, None) + + def report(self, simulation: Simulation, state: State) -> None: + """ + Reports the progress of the simulation. + + Parameters + ---------- + simulation : Simulation + The simulation to report on. + state : State + The state of the simulation. + """ + progress = 100.0 * simulation.currentStep / self._total_steps + self._out.write(f"\rProgress: {progress:.2f}") + self._out.flush() diff --git a/guardowl/setup.py b/guardowl/setup.py index 261c7fc..c484541 100644 --- a/guardowl/setup.py +++ b/guardowl/setup.py @@ -9,60 +9,65 @@ forcefield = ForceField("openff_unconstrained-2.0.0.offxml") -def generate_molecule(smiles: str, nr_of_conformations: int = 10) -> Molecule: +def generate_molecule_from_smiles( + smiles: str, nr_of_conformations: int = 10 +) -> Molecule: """ - Generate an openFF Molecule instance from a SMILES string and generate conformers. + Generates an OpenFF Molecule instance from a SMILES string and generates conformers for it. Parameters ---------- smiles : str The SMILES string representing the molecule. nr_of_conformations : int, optional - The number of conformers to generate for the molecule. Defaults to 10. + The number of conformers to generate for the molecule, by default 10. Returns ------- Molecule - The generated openFF Molecule instance with conformers. - + An OpenFF Molecule instance with generated conformers. """ molecule = Molecule.from_smiles(smiles, hydrogens_are_explicit=False) molecule.generate_conformers(n_conformers=nr_of_conformations) - # TODO: make sure that conformations are deterministic - # NOTE: packmole, Modeller, PDBfixer to solvate return molecule -def create_system_from_mol( - mol: Molecule, env: str = "vacuum" -) -> Tuple[System, Topology]: +def create_system_from_mol(mol: Molecule) -> Tuple[System, Topology]: """ - Create an OpenMM System and Topology instance from an openFF Molecule. + Creates an OpenMM System and Topology from an OpenFF Molecule. Parameters ---------- mol : Molecule - The openFF Molecule instance to convert into an OpenMM system. - env : str, optional - The environment in which the system should be generated. Must be one of 'waterbox', 'vacuum', or 'complex'. Defaults to 'vacuum'. + The OpenFF Molecule instance to convert into an OpenMM system. Returns ------- Tuple[System, Topology] - A tuple containing the generated OpenMM System and Topology instances. - - Raises - ------ - AssertionError - If the environment is not one of 'waterbox', 'vacuum', or 'complex'. + A tuple containing the generated OpenMM System and the corresponding Topology. """ - assert mol.n_conformers > 0 - log.debug("Using openff ...") - log.debug(f"Generating system in {env}") - assert env in ("waterbox", "vacuum", "complex") - ################### - log.debug(f"{env=}") + assert mol.n_conformers > 0, "Molecule must have at least one conformer." + log.debug("Generating OpenMM system from OpenFF molecule.") topology = mol.to_topology() system = forcefield.create_openmm_system(topology) - return (system, topology) + return (system, topology.to_openmm()) + + +def generate_molecule_from_sdf(path: str) -> Molecule: + """ + Generates an OpenFF Molecule instance from an SDF file. + + Parameters + ---------- + path : str + The file path to the SDF file. + + Returns + ------- + Molecule + An OpenFF Molecule instance loaded from the SDF file. + """ + mol = Molecule.from_file(path) + log.info(f"Molecule loaded from SDF file: {path}") + return mol diff --git a/guardowl/simulation.py b/guardowl/simulation.py index 3675cfe..5f4c2b2 100644 --- a/guardowl/simulation.py +++ b/guardowl/simulation.py @@ -76,9 +76,40 @@ def create_simulation( ) +def initialize_ml_system(nnp: str, topology: Topology, implementation: str) -> System: + """ + Initializes a machine learning system with the given neural network potential, + topology, and implementation details. + + Parameters + ---------- + nnp : str + The name or identifier of the neural network potential. + topology : Topology + The topology of the system to be initialized. + implementation : str + The specific implementation to use for the machine learning potential. + + Returns + ------- + system : System + The initialized OpenMM System object. + """ + + from openmmml import MLPotential + + from guardowl.simulation import SystemFactory + + nnp_instance = MLPotential(nnp) + system = SystemFactory().initialize_ml_system( + nnp_instance, topology, implementation=implementation + ) + return system + + class SystemFactory: @staticmethod - def initialize_pure_ml_system( + def initialize_ml_system( potential: Type[MLPotential], topology: Topology, remove_constraints: bool = True, @@ -103,6 +134,11 @@ def initialize_pure_ml_system( System The OpenMM System object. + Examples + -------- + >>> potential = MLPotential + >>> topology = Topology() + >>> system = initialize_pure_ml_system(potential, topology) """ # create system & simulation instance if implementation: diff --git a/guardowl/tests/conftest.py b/guardowl/tests/conftest.py index 80204a6..a149d37 100644 --- a/guardowl/tests/conftest.py +++ b/guardowl/tests/conftest.py @@ -4,14 +4,29 @@ from openff.toolkit.topology import Topology, Molecule from openmm import System -from guardowl.setup import create_system_from_mol, generate_molecule +from guardowl.setup import create_system_from_mol, generate_molecule_from_smiles from guardowl.testsystems import hipen_systems @pytest.fixture(scope="session") -def generate_hipen_system() -> Tuple[System, Topology, Molecule]: +def single_hipen_system() -> Tuple[System, Topology, Molecule]: + """ + Generate a hipen system. + + Returns: + A tuple containing the generated system, topology, and molecule. + """ name = list(hipen_systems.keys())[1] smiles = hipen_systems[name] - mol = generate_molecule(smiles) + mol = generate_molecule_from_smiles(smiles) system, topology = create_system_from_mol(mol) return (system, topology, mol) + + +@pytest.fixture(scope="session") +def tmp_dir(tmpdir_factory): + # Create a temporary directory for the session + temp_dir = tmpdir_factory.mktemp("data") + + # Yield the temporary directory path to the tests + yield temp_dir diff --git a/guardowl/tests/data/156613987/156613987.sdf b/guardowl/tests/data/156613987/156613987.sdf new file mode 100644 index 0000000..f327f60 --- /dev/null +++ b/guardowl/tests/data/156613987/156613987.sdf @@ -0,0 +1,191 @@ +156613987 + OpenBabel11072317473D + + 33 32 0 0 1 0 0 0 0 0999 V2000 + 1.8323 -0.7109 0.5118 O 0 0 0 0 0 0 0 0 0 0 0 0 + -0.2105 1.6824 -1.3758 O 0 0 0 0 0 0 0 0 0 0 0 0 + -0.4475 -0.2620 1.7084 O 0 0 0 0 0 0 0 0 0 0 0 0 + -2.7555 -0.1585 0.0337 O 0 0 0 0 0 0 0 0 0 0 0 0 + 4.3864 0.9264 -0.4882 O 0 0 0 0 0 0 0 0 0 0 0 0 + 1.8539 -2.0695 -1.3564 O 0 0 0 0 0 0 0 0 0 0 0 0 + -4.8587 0.7932 0.1181 O 0 0 0 0 0 0 0 0 0 0 0 0 + 0.0671 0.5865 -0.5056 C 0 0 1 0 0 0 0 0 0 0 0 0 + 1.5905 0.4309 -0.3121 C 0 0 1 0 0 0 0 0 0 0 0 0 + -0.6842 0.8270 0.8178 C 0 0 1 0 0 0 0 0 0 0 0 0 + 2.2234 1.6596 0.3480 C 0 0 0 0 0 0 0 0 0 0 0 0 + -2.1958 0.9965 0.6464 C 0 0 0 0 0 0 0 0 0 0 0 0 + 3.7229 1.7290 0.1592 C 0 0 0 0 0 0 0 0 0 0 0 0 + 1.9424 -1.9003 -0.1478 C 0 0 0 0 0 0 0 0 0 0 0 0 + -4.1066 -0.1267 -0.1735 C 0 0 0 0 0 0 0 0 0 0 0 0 + 2.1929 -3.0013 0.8376 C 0 0 0 0 0 0 0 0 0 0 0 0 + -4.5529 -1.4023 -0.8216 C 0 0 0 0 0 0 0 0 0 0 0 0 + -0.3226 -0.3264 -0.9719 H 0 0 0 0 0 0 0 0 0 0 0 0 + 2.0532 0.3077 -1.3008 H 0 0 0 0 0 0 0 0 0 0 0 0 + -0.3109 1.7394 1.2969 H 0 0 0 0 0 0 0 0 0 0 0 0 + 1.8227 2.5882 -0.0719 H 0 0 0 0 0 0 0 0 0 0 0 0 + 2.0497 1.6562 1.4298 H 0 0 0 0 0 0 0 0 0 0 0 0 + -2.4129 1.8794 0.0344 H 0 0 0 0 0 0 0 0 0 0 0 0 + -2.6609 1.1257 1.6314 H 0 0 0 0 0 0 0 0 0 0 0 0 + 0.2503 1.5127 -2.2151 H 0 0 0 0 0 0 0 0 0 0 0 0 + -0.8913 -0.0575 2.5491 H 0 0 0 0 0 0 0 0 0 0 0 0 + 4.2123 2.5870 0.6520 H 0 0 0 0 0 0 0 0 0 0 0 0 + 3.1263 -2.8131 1.3740 H 0 0 0 0 0 0 0 0 0 0 0 0 + 1.3547 -3.0714 1.5353 H 0 0 0 0 0 0 0 0 0 0 0 0 + 2.2830 -3.9519 0.3042 H 0 0 0 0 0 0 0 0 0 0 0 0 + -4.0566 -1.5198 -1.7882 H 0 0 0 0 0 0 0 0 0 0 0 0 + -4.3279 -2.2483 -0.1675 H 0 0 0 0 0 0 0 0 0 0 0 0 + -5.6336 -1.3667 -0.9867 H 0 0 0 0 0 0 0 0 0 0 0 0 + 1 9 1 0 0 0 0 + 1 14 1 0 0 0 0 + 2 8 1 0 0 0 0 + 2 25 1 0 0 0 0 + 3 10 1 0 0 0 0 + 3 26 1 0 0 0 0 + 4 12 1 0 0 0 0 + 4 15 1 0 0 0 0 + 5 13 2 0 0 0 0 + 6 14 2 0 0 0 0 + 7 15 2 0 0 0 0 + 8 9 1 0 0 0 0 + 8 10 1 0 0 0 0 + 8 18 1 6 0 0 0 + 9 11 1 0 0 0 0 + 9 19 1 6 0 0 0 + 10 12 1 0 0 0 0 + 10 20 1 1 0 0 0 + 11 13 1 0 0 0 0 + 11 21 1 0 0 0 0 + 11 22 1 0 0 0 0 + 12 23 1 0 0 0 0 + 12 24 1 0 0 0 0 + 13 27 1 0 0 0 0 + 14 16 1 0 0 0 0 + 15 17 1 0 0 0 0 + 16 28 1 0 0 0 0 + 16 29 1 0 0 0 0 + 16 30 1 0 0 0 0 + 17 31 1 0 0 0 0 + 17 32 1 0 0 0 0 + 17 33 1 0 0 0 0 +M END +> +DB17192 + +> +drugbank + +> +PUBCHEM + +> +https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/inchikey/BYPTXUKSDKAPKO-OPRDCNLKSA-N/SDF?record_type=3d + +> +[H][C@@](O)(COC(C)=O)[C@@]([H])(O)[C@@]([H])(CC=O)OC(C)=O + +> +InChI=1S/C10H16O7/c1-6(12)16-5-8(14)10(15)9(3-4-11)17-7(2)13/h4,8-10,14-15H,3,5H2,1-2H3/t8-,9-,10-/m1/s1 + +> +BYPTXUKSDKAPKO-OPRDCNLKSA-N + +> +C10H16O7 + +> +248.231 + +> +248.089602855 + +> +5 + +> +33 + +> +-1.3146369446636756e-06 + +> +23.354368581827025 + +> +1 + +> +2 + +> +0 + +> +0 + +> +(3R,4R,5R)-6-(acetyloxy)-4,5-dihydroxy-1-oxohexan-3-yl acetate + +> +-0.72 + +> +-1.9709512816666668 + +> +-0.46 + +> +0 + +> +0 + +> +0 + +> +0 + +> +14.35808691156093 + +> +12.90518799939591 + +> +-3.531566236525625 + +> +110.13000000000001 + +> +54.3104 + +> +9 + +> +1 + +> +8.64e+01 g/l + +> +altiratinib + +> +0 + +> +DB17192 + +> +investigational + +> +WP-1122 + +> +3,6-di-O-acetyl-2-deoxy-D-glucopyranose + +$$$$ diff --git a/guardowl/tests/data/156613987/156613987.xyz b/guardowl/tests/data/156613987/156613987.xyz new file mode 100644 index 0000000..67f3acb --- /dev/null +++ b/guardowl/tests/data/156613987/156613987.xyz @@ -0,0 +1,35 @@ +33 +156613987 +O 1.83230 -0.71090 0.51180 +O -0.21050 1.68240 -1.37580 +O -0.44750 -0.26200 1.70840 +O -2.75550 -0.15850 0.03370 +O 4.38640 0.92640 -0.48820 +O 1.85390 -2.06950 -1.35640 +O -4.85870 0.79320 0.11810 +C 0.06710 0.58650 -0.50560 +C 1.59050 0.43090 -0.31210 +C -0.68420 0.82700 0.81780 +C 2.22340 1.65960 0.34800 +C -2.19580 0.99650 0.64640 +C 3.72290 1.72900 0.15920 +C 1.94240 -1.90030 -0.14780 +C -4.10660 -0.12670 -0.17350 +C 2.19290 -3.00130 0.83760 +C -4.55290 -1.40230 -0.82160 +H -0.32260 -0.32640 -0.97190 +H 2.05320 0.30770 -1.30080 +H -0.31090 1.73940 1.29690 +H 1.82270 2.58820 -0.07190 +H 2.04970 1.65620 1.42980 +H -2.41290 1.87940 0.03440 +H -2.66090 1.12570 1.63140 +H 0.25030 1.51270 -2.21510 +H -0.89130 -0.05750 2.54910 +H 4.21230 2.58700 0.65200 +H 3.12630 -2.81310 1.37400 +H 1.35470 -3.07140 1.53530 +H 2.28300 -3.95190 0.30420 +H -4.05660 -1.51980 -1.78820 +H -4.32790 -2.24830 -0.16750 +H -5.63360 -1.36670 -0.98670 diff --git a/guardowl/tests/data/156613987/orca_input.inp b/guardowl/tests/data/156613987/orca_input.inp new file mode 100644 index 0000000..ac57b8d --- /dev/null +++ b/guardowl/tests/data/156613987/orca_input.inp @@ -0,0 +1,2 @@ +! RI BP86 def2-SVP def2/J D3BJ TIGHTSCF Opt +* xyzfile 0 1 156613987.xyz diff --git a/guardowl/tests/data/156613987/orca_input.xyz b/guardowl/tests/data/156613987/orca_input.xyz new file mode 100644 index 0000000..a23a866 --- /dev/null +++ b/guardowl/tests/data/156613987/orca_input.xyz @@ -0,0 +1,35 @@ +33 +Coordinates from ORCA-job orca_input + O 1.18702266774795 -0.65798320796024 0.49456422888783 + O 0.11054752230325 2.29403953291484 -1.49279649563007 + O -0.50972310761747 1.14852101405307 1.82611663419241 + O -2.77876044528026 0.21113770444711 -0.37515622485842 + O 4.13324511218764 0.17316740424119 -0.12040402986321 + O 0.41723167072750 -1.78741187495747 -1.32200638853450 + O -1.93523086595974 -1.08031856534254 1.29695456456318 + C -0.05658337810682 1.15700299572259 -0.64236402223814 + C 1.31998704586275 0.56285211502650 -0.27688115859637 + C -0.90305642148798 1.64515327748260 0.56847567295788 + C 2.21061027416841 1.49551116776215 0.52333533823152 + C -2.40802792986414 1.45508438130153 0.28857275654689 + C 3.66473044789974 1.08580981509283 0.53355196460722 + C 0.76143953232462 -1.77022340566304 -0.14857967474482 + C -2.50000534820022 -0.96807544163626 0.21170791683584 + C 0.76343708444517 -2.95633858459973 0.78463025627339 + C -2.89729959666180 -2.13212517973498 -0.65486847672934 + H -0.59525310640049 0.35493846334446 -1.18751596017744 + H 1.82048750447055 0.29221001256787 -1.23241693989311 + H -0.74357852687866 2.74599619576005 0.60751296024301 + H 2.14628511102479 2.51778763853841 0.08477957361294 + H 1.82862401941898 1.58425835310064 1.56312075546376 + H -2.76161663346435 2.22422467400944 -0.42203128636475 + H -2.96866565775235 1.54139006706264 1.24211599798961 + H 0.35963871754954 1.96378557716889 -2.37620682268722 + H -0.87545000170551 0.23117068879972 1.87611135239397 + H 4.32385824847405 1.71075217268249 1.21164545765260 + H 1.80208531816031 -3.18034645405722 1.10071042386211 + H 0.17473283611104 -2.71671365222381 1.69064750352759 + H 0.33311778541096 -3.83366895934089 0.27148519290483 + H -3.73848622601826 -1.87946232341514 -1.32474579474523 + H -2.01118240058036 -2.38773417429539 -1.27546364258101 + H -3.13856125230880 -3.00319142785233 -0.01960163310294 diff --git a/guardowl/tests/test_simulation.py b/guardowl/tests/test_simulation.py index 2dff995..3727459 100644 --- a/guardowl/tests/test_simulation.py +++ b/guardowl/tests/test_simulation.py @@ -1,4 +1,8 @@ +from typing import Literal, Tuple import numpy as np +from openff.toolkit.topology.molecule import Molecule +from openff.toolkit.topology.topology import Topology +from openmm.openmm import System import pytest from openff.units.openmm import to_openmm from openmm import unit @@ -17,13 +21,15 @@ "nnp, e_lamb_0, e_lamb_1", [("ani2x", 295.1235918998718, -2346060.437261855)] ) def test_generate_simulation_instance( - nnp: str, e_lamb_0: float, e_lamb_1: float, generate_hipen_system + nnp: str, + e_lamb_0: float, + e_lamb_1: float, + single_hipen_system: Tuple[System, Topology, Molecule], ) -> None: """Test if we can generate a simulation instance""" # set up system and topology and define ml region - system, topology, mol = generate_hipen_system - topology = topology.to_openmm() + system, topology, mol = single_hipen_system platform = get_fastest_platform() qml = MLPotential(nnp) ######################################################## @@ -49,7 +55,7 @@ def test_generate_simulation_instance( ######################################################## # create ML simulation rsim = SimulationFactory.create_simulation( - SystemFactory().initialize_pure_ml_system( + SystemFactory().initialize_ml_system( qml, topology, ), @@ -77,19 +83,22 @@ def test_generate_simulation_instance( @pytest.mark.parametrize("nnp, implementation", get_available_nnps_and_implementation()) -def test_simulating(nnp: str, implementation: str, generate_hipen_system) -> None: +def test_simulating( + nnp: str, + implementation: str, + single_hipen_system: Tuple[System, Topology, Molecule], +) -> None: """Test if we can run a simulation for a number of steps""" # set up system and topology and define ml region - system, topology, mol = generate_hipen_system + system, topology, mol = single_hipen_system qml = MLPotential(nnp) platform = get_fastest_platform() - topology = topology.to_openmm() ######################################################## # ---------------------------# # generate pure ML simulation qsim = SimulationFactory.create_simulation( - SystemFactory().initialize_pure_ml_system( + SystemFactory().initialize_ml_system( qml, topology, implementation=implementation, @@ -111,7 +120,10 @@ def test_simulating(nnp: str, implementation: str, generate_hipen_system) -> Non @pytest.mark.parametrize( "nnp, implementation", gpu_memory_constrained_nnps_and_implementation ) -def test_pure_liquid_simulation(nnp, implementation): +def test_pure_liquid_simulation( + nnp: tuple[Literal["ani2x"], Literal["torchani"]], + implementation: tuple[Literal["ani2x"], Literal["torchani"]], +): from guardowl.testsystems import PureLiquidTestsystemFactory factory = PureLiquidTestsystemFactory() @@ -124,7 +136,7 @@ def test_pure_liquid_simulation(nnp, implementation): # ---------------------------# # generate pure ML simulation qsim = SimulationFactory.create_simulation( - SystemFactory().initialize_pure_ml_system( + SystemFactory().initialize_ml_system( qml, liquid_box.topology, implementation=implementation, diff --git a/guardowl/tests/test_stability_protocol.py b/guardowl/tests/test_stability_protocol.py index 6d1dc83..a63bb4f 100644 --- a/guardowl/tests/test_stability_protocol.py +++ b/guardowl/tests/test_stability_protocol.py @@ -15,7 +15,6 @@ ) from guardowl.simulation import SystemFactory from guardowl.testsystems import ( - HipenTestsystemFactory, SmallMoleculeTestsystemFactory, WaterboxTestsystemFactory, ) @@ -30,10 +29,10 @@ def test_setup_vacuum_protocol_individual_parts(nnp: str, implementation: str) - platform = get_fastest_platform() name = "ZINC00107550" - testsystem = HipenTestsystemFactory().generate_testsystems(name) + testsystem = SmallMoleculeTestsystemFactory().generate_testsystems_from_name(name) nnp_instance = MLPotential(nnp) - system = SystemFactory().initialize_pure_ml_system( + system = SystemFactory().initialize_ml_system( nnp_instance, testsystem.topology, implementation=implementation, @@ -104,7 +103,7 @@ def test_run_vacuum_protocol(nnp: str, implementation: str) -> None: @pytest.mark.parametrize("ensemble", ["NVE", "NVT", "NpT"]) @pytest.mark.parametrize("nnp, implementation", get_available_nnps_and_implementation()) def test_setup_waterbox_protocol_individual_parts( - ensemble: str, nnp: str, implementation: str + ensemble: str, nnp: str, implementation: str, temperature: int = 300 ) -> None: """Test if we can run a simulation for a number of steps""" @@ -117,14 +116,16 @@ def test_setup_waterbox_protocol_individual_parts( ) nnp_instance = MLPotential(nnp) - system = SystemFactory().initialize_pure_ml_system( + system = SystemFactory().initialize_ml_system( nnp_instance, testsystem.topology, implementation=implementation, ) output_folder = "test_stability_protocol" - log_file_name = f"waterbox_{edge_size}A_{nnp}_{implementation}_{ensemble}" + log_file_name = ( + f"waterbox_{edge_size}A_{nnp}_{implementation}_{ensemble}_{temperature}K" + ) Path(output_folder).mkdir(parents=True, exist_ok=True) stability_test = PropagationProtocol() @@ -143,7 +144,7 @@ def test_setup_waterbox_protocol_individual_parts( params = StabilityTestParameters( protocol_length=5, - temperature=300, + temperature=temperature, ensemble=ensemble, simulated_annealing=False, system=system, @@ -180,7 +181,7 @@ def test_run_waterbox_protocol(ensemble: str, nnp: str, implementation: str) -> output_folder = "test_stability_protocol" run_waterbox_protocol( - 10, + 5, ensemble, nnp, implementation, @@ -269,18 +270,20 @@ def test_DOF_protocol(nnp: str, implementation: str) -> None: # ---------------------------# platform = get_fastest_platform() - testsystem = SmallMoleculeTestsystemFactory().generate_testsystems(name="ethanol") + testsystem = SmallMoleculeTestsystemFactory().generate_testsystems_from_name( + name="ethanol" + ) nnp_instance = MLPotential(nnp) - system = SystemFactory().initialize_pure_ml_system( + system = SystemFactory().initialize_ml_system( nnp_instance, testsystem.topology, implementation=implementation, ) output_folder = "test_stability_protocol" - log_file_name = f"vacuum_{testsystem.testsystem_name}_{nnp}_{implementation}" + log_file_name = f"vacuum_{testsystem.name}_{nnp}_{implementation}" Path(output_folder).mkdir(parents=True, exist_ok=True) stability_test = BondProfileProtocol() @@ -293,4 +296,76 @@ def test_DOF_protocol(nnp: str, implementation: str) -> None: bond=[0, 3], ) - stability_test.perform_bond_scan(params) + stability_test.perform_scan(params) + + +import os + +IN_GITHUB_ACTIONS = os.getenv("GITHUB_ACTIONS") == "true" + + +@pytest.mark.skipif( + IN_GITHUB_ACTIONS, reason="Github Actions does not return the same file order" +) +def test_input_generation_for_minimization_tests(): + from guardowl.utils import ( + _generate_input_for_minimization_test, + _generate_file_list_for_minimization_test, + extract_drugbank_tar_gz, + ) + import numpy as np + + # extract tar.gz data + extract_drugbank_tar_gz() + # read in file names and build Iterators + files = _generate_file_list_for_minimization_test() + # read in first file + (minimized_file, minimized_position), (start_file, start_position) = next( + _generate_input_for_minimization_test(files) + ) + # test if the file base is the same + assert ( + "/".join(minimized_file.split("/")[-6:]) + == "guardowl/data/drugbank/owl/49957/orca_input.xyz" + ) + assert "".join(minimized_file.split("/")[:-1]) == "".join( + start_file.split("/")[:-1] + ) + + assert np.allclose(minimized_position[0], [5.07249404, -0.21016912, -0.0933702]) + + # now shuffel + files = _generate_file_list_for_minimization_test(shuffle=True) + (minimized_file, minimized_position), (start_file, start_position) = next( + _generate_input_for_minimization_test(files) + ) + assert not ( + "/".join(minimized_file.split("/")[-6:]) + == "guardowl/data/drugbank/owl/49957/orca_input.xyz" + ) + assert "".join(minimized_file.split("/")[:-1]) == "".join( + start_file.split("/")[:-1] + ) + # generate mol from sdf file + sdf_file = "".join(start_file.split(".")[0]) + ".sdf" + reference_testsystem = ( + SmallMoleculeTestsystemFactory().generate_testsystems_from_sdf(sdf_file) + ) + # set positions + reference_testsystem.positions = minimized_position + + +@pytest.mark.parametrize("nnp, implementation", get_available_nnps_and_implementation()) +def test_run_detect_minimum(nnp, implementation, tmp_dir): + from guardowl.protocols import run_detect_minimum + + platform = get_fastest_platform() + + run_detect_minimum( + nnp, + implementation, + platform, + tmp_dir, + percentage=0.1, + only_molecules_below_10_heavy_atoms=True, + ) diff --git a/guardowl/tests/test_stability_test.py b/guardowl/tests/test_stability_test.py index 170ca13..448749f 100644 --- a/guardowl/tests/test_stability_test.py +++ b/guardowl/tests/test_stability_test.py @@ -11,3 +11,6 @@ def test_stability_test_imported(): """Sample test, will always pass so long as import statement worked.""" assert "guardowl" in sys.modules + + + \ No newline at end of file diff --git a/guardowl/tests/test_system.py b/guardowl/tests/test_system.py index 64e9df8..89ce446 100644 --- a/guardowl/tests/test_system.py +++ b/guardowl/tests/test_system.py @@ -1,19 +1,54 @@ -from guardowl.setup import create_system_from_mol, generate_molecule import pytest -from guardowl.testsystems import PureLiquidTestsystemFactory, PureLiquidBoxTestSystem +from guardowl.testsystems import ( + PureLiquidTestsystemFactory, + SmallMoleculeTestsystemFactory, +) +from guardowl.utils import get_data_filename + + +def test_generate_small_molecule(tmp_dir) -> None: + """Test if we can generate a small molecule""" + factory = SmallMoleculeTestsystemFactory() + sdf_file = f"{get_data_filename('tests/data/156613987')}/156613987.sdf" + system = factory.generate_testsystems_from_sdf(sdf_file) + from openmm.app import PDBFile + + print(system.topology) # + PDBFile.writeFile( + system.topology, + system.positions, + open(f"{tmp_dir}/tmp1.pdb", "w"), + ) + + import copy + + system_copy = copy.copy(system) + print(system_copy.topology) + top = system_copy.topology + for atom in top.atoms(): + print(atom.index, atom.name, atom.residue.name) -def test_generate_molecule(generate_hipen_system) -> None: + for bond in top.bonds(): + print(bond) + + PDBFile.writeFile( + system.topology, + system_copy.positions, + open(f"{tmp_dir}/tmp2.pdb", "w"), + ) + + +def test_generate_molecule(single_hipen_system) -> None: """Test if we can generate a molecule instance""" - system, top, mol = generate_hipen_system + system, top, mol = single_hipen_system assert mol.n_conformers >= 1 -def test_generate_system_top_instances(generate_hipen_system) -> None: +def test_generate_system_top_instances(single_hipen_system) -> None: """Test if we can generate a system/topology instance""" - system, topology, mol = generate_hipen_system - topology = topology.to_openmm() - + system, topology, mol = single_hipen_system + assert system.getNumParticles() > 0 indices = [atom.index for atom in topology.atoms()] assert len(indices) > 0 @@ -21,11 +56,11 @@ def test_generate_system_top_instances(generate_hipen_system) -> None: @pytest.mark.parametrize( "molecule_name", PureLiquidTestsystemFactory._AVAILABLE_SYSTEM.keys() ) -@pytest.mark.parametrize( - "nr_of_copies",[100,200] -) +@pytest.mark.parametrize("nr_of_copies", [100, 200]) def test_generate_pure_liquids(molecule_name, nr_of_copies) -> None: """ "Test if we can generate a pure liquid""" factory = PureLiquidTestsystemFactory() - factory.generate_testsystems(name=molecule_name, nr_of_copies=nr_of_copies, nr_of_equilibration_steps=1) + factory.generate_testsystems( + name=molecule_name, nr_of_copies=nr_of_copies, nr_of_equilibration_steps=1 + ) diff --git a/guardowl/tests/test_utils.py b/guardowl/tests/test_utils.py new file mode 100644 index 0000000..fb11e14 --- /dev/null +++ b/guardowl/tests/test_utils.py @@ -0,0 +1,17 @@ +def test_input_expension_for_pure_liquids(): + molecule_name = ["ethane", "butane"] + nr_of_molecules = [100, 200, 300] + molecule_name_ = molecule_name * len(nr_of_molecules) + nr_of_molecule_ = [ + element for element in nr_of_molecules for _ in range(len(molecule_name)) + ] + counter = 0 + for i, j in zip(molecule_name_, nr_of_molecule_): + print(i, j) + counter += 1 + assert counter == 6 + + +def test_tar_gz_drugbank_extraction(): + from guardowl.utils import extract_drugbank_tar_gz + extract_drugbank_tar_gz() diff --git a/guardowl/testsystems.py b/guardowl/testsystems.py index 1646067..0b9a51d 100644 --- a/guardowl/testsystems.py +++ b/guardowl/testsystems.py @@ -12,9 +12,9 @@ WaterBox, ) from openmmtools.utils import get_fastest_platform +from openff.toolkit.topology import Molecule, Topology from .constants import collision_rate, stepsize, temperature -from .setup import create_system_from_mol, generate_molecule class PureLiquidBoxTestSystem(TestSystem): @@ -46,12 +46,13 @@ def __init__(self, molecule_name: str, nr_of_copies: int): if n_atoms < 50: edge_length = 10 else: - edge_length = np.round(0.003813 * n_atoms) + 22 + edge_length = np.round(0.002 * n_atoms) + 15 # NOTE: original regression line Y = 0.003813*X + 19.27 log.debug(f"Calculated intial {edge_length} Angstrom for {n_atoms} atoms") success = False # Repeat until sucess is True while not success: increase_packing = 0 + fail_counter = 0 try: log.debug( f"Trying to pack {nr_of_copies} copies of {molecule_name} in box with edge length {edge_length+increase_packing} ..." @@ -65,8 +66,13 @@ def __init__(self, molecule_name: str, nr_of_copies: int): ) success = True except Exception as e: + fail_counter += 1 log.error(f"Packmol failed with the following error: {e}") - increase_packing += 2 + increase_packing += 1 + if fail_counter > 10: + raise RuntimeError( + f"Packmol failed with the following error: {e}" + ) from e log.debug("Packmol has finished ...") sage = ForceField("openff-2.0.0.offxml") @@ -80,25 +86,7 @@ def __init__(self, molecule_name: str, nr_of_copies: int): ) -class BaseMoleculeTestSystem: - """Base class for molecule test systems. - - This class encapsulates the common functionality for creating - molecule-based test systems. - """ - - def __init__(self, name: str, smiles: str): - self.testsystem_name = name - self.smiles = smiles - - mol = generate_molecule(self.smiles) - self.system, topology = create_system_from_mol(mol) - self.topology = topology.to_openmm() - self.positions = to_openmm(mol.conformers[0]) - self.mol = mol - - -class SmallMoleculeVacuumTestSystem(BaseMoleculeTestSystem): +class SmallMoleculeVacuumTestSystem: """Class for small molecule in vacuum test systems. Parameters @@ -109,69 +97,77 @@ class SmallMoleculeVacuumTestSystem(BaseMoleculeTestSystem): SMILES string of the molecule. """ - pass # All functionality is currently in the base class - + def __init__(self, name, system, topology, positions): + self.name = name + self.system = system + self.topology = topology + self.positions = positions -class HipenSystemVacuum(BaseMoleculeTestSystem): - """Class for HiPen molecule in vacuum test systems. - - Parameters - ---------- - zink_id : str - ZINC identifier for the molecule. - smiles : str - SMILES string of the molecule. - """ + def __copy__(self): + from copy import deepcopy - def __init__(self, zink_id: str, smiles: str): - super().__init__(zink_id, smiles) - self.zink_id = zink_id + return SmallMoleculeVacuumTestSystem( + self.name, + deepcopy(self.system), + deepcopy(self.topology), + deepcopy(self.positions), + ) -class HipenTestsystemFactory: - """Factory for generating HiPen test systems. +class SmallMoleculeTestsystemFactory: + """Factory for generating SmallMoleculeVacuum test systems. - This factory class provides methods to generate HiPen test systems. + This factory class provides methods to generate SmallMoleculeVacuum test systems. """ def __init__(self) -> None: - """Factory that returns HipenSystemVacuum""" - self.hipen_systems = hipen_systems - self.name = "hipen_testsystem" + pass - def generate_testsystems(self, name: str) -> HipenSystemVacuum: - """Generate a HiPen test system. + @staticmethod + def generate_testsystems_from_mol( + mol: Molecule, name: Optional[str] = None + ) -> SmallMoleculeVacuumTestSystem: + """Generate a SmallMoleculeVacuum test system. Parameters ---------- + mol : openff.toolkit.topology.Molecule + Molecule to generate test system from. name : str Name of the test system to generate. Returns ------- - HipenSystemVacuum + SmallMoleculeVacuum Generated test system. """ - return HipenSystemVacuum(name, hipen_systems[name]) + from .setup import create_system_from_mol + system, topology = create_system_from_mol(mol) + positions = to_openmm(mol.conformers[0]) + return SmallMoleculeVacuumTestSystem(name, system, topology, positions) -class SmallMoleculeTestsystemFactory: - """Factory for generating SmallMoleculeVacuum test systems. + def generate_testsystem_from_smiles(self, smiles: str, name: Optional[str] = None): + """Generate a SmallMoleculeVacuum test system. - This factory class provides methods to generate SmallMoleculeVacuum test systems. - """ + Parameters + ---------- + smiles : str + SMILES string of the molecule. - def __init__(self) -> None: - """Factory that returns SmallMoleculeTestsystems""" - self._mols = { - "ethanol": "OCC", - "methanol": "OC", - "propanol": "OCC", - "methane": "C", - } - - @lru_cache(maxsize=None) - def generate_testsystems(self, name: str) -> SmallMoleculeVacuumTestSystem: + Returns + ------- + SmallMoleculeVacuum + Generated test system. + """ + from .setup import generate_molecule_from_smiles + + mol = generate_molecule_from_smiles(smiles) + return self.generate_testsystems_from_mol(mol, name) + + def generate_testsystems_from_name( + self, name: str + ) -> SmallMoleculeVacuumTestSystem: """Generate a SmallMoleculeVacuum test system. Parameters @@ -184,12 +180,35 @@ def generate_testsystems(self, name: str) -> SmallMoleculeVacuumTestSystem: SmallMoleculeVacuum Generated test system. """ - if name not in list(self._mols.keys()): + if name in list(standard_test_systems.keys()): + return self.generate_testsystem_from_smiles( + standard_test_systems[name], name + ) + elif name in list(hipen_systems.keys()): + return self.generate_testsystem_from_smiles(hipen_systems[name], name) + else: raise RuntimeError( - f"Molecule is not in the list of available systems: {self._mols.keys()}" + f"Molecule is not in the list of available systems: {hipen_systems.keys()} and {standard_test_systems.keys()}" ) - return SmallMoleculeVacuumTestSystem(name, self._mols[name]) + def generate_testsystems_from_sdf(self, path: str) -> SmallMoleculeVacuumTestSystem: + """Generate a SmallMoleculeVacuum test system. + + Parameters + ---------- + path : str + Path to the sdf file. + + Returns + ------- + SmallMoleculeVacuum + Generated test system. + """ + from .setup import generate_molecule_from_sdf + + log.debug(f"Generating test system from {path}") + mol = generate_molecule_from_sdf(path) + return self.generate_testsystems_from_mol(mol) class LiquidTestsystemFactory: @@ -226,9 +245,8 @@ def _run_equilibration( sim.context.setPositions(testsystem.positions) sim.step(nr_of_steps) state = sim.context.getState(getPositions=True) - testsystem.positions = ( - state.getPositions() - ) # pylint: disable=unexpected-keyword-arg + testsystem.positions = state.getPositions() + testsystem.box_vectors = state.getPeriodicBoxVectors() return testsystem @@ -402,3 +420,24 @@ def generate_testsystems(self, name: str) -> TestSystem: "ZINC06568023": r"O=C(NNC(=O)c1ccccc1)c1ccccc1", # hipen 21 "ZINC33381936": r"O=S(=O)(O/N=C1/CCc2ccccc21)c1ccc(Cl)cc1", # hipen 22 } + +standard_test_systems = { + "ethanol": "CCO", + "methanol": "CO", + "methane": "C", + "propane": "CCC", + "butane": "CCCC", + "pentane": "CCCCC", + "hexane": "CCCCCC", + "cylohexane": "C1CCCCC1", + "isobutane": "CC(C)C", + "isopentane": "CCC(C)C", + "propanol": "CCCO", + "acetylacetone": "CC(=O)CC(=O)C", + "acetone": "CC(=O)C", + "acetamide": "CC(=O)N", + "acetonitrile": "CC#N", + "aceticacid": "CC(=O)O", + "acetaldehyde": "CC=O", + "benzene": "c1ccccc1", +} diff --git a/guardowl/utils.py b/guardowl/utils.py index 30a3a04..e482a72 100644 --- a/guardowl/utils.py +++ b/guardowl/utils.py @@ -1,5 +1,8 @@ import os +from typing import Tuple, List, Optional, Dict, Iterator +from loguru import logger as log + available_nnps_and_implementation = [ ("ani2x", "nnpops"), ("ani2x", "torchani"), @@ -13,6 +16,7 @@ ("ani2x", "torchani"), ] + def get_available_nnps_and_implementation() -> list: """Return a list of available neural network potentials and implementations""" IN_GITHUB_ACTIONS = os.getenv("GITHUB_ACTIONS") == "true" @@ -62,3 +66,100 @@ def _set_loglevel(level="WARNING"): logger.remove() # Remove all handlers added so far, including the default one. logger.add(sys.stderr, level=level) + + +def extract_drugbank_tar_gz(): + """ + Extracts a .tar.gz archive to the specified path. + """ + import tarfile + import importlib.resources as pkg_resources + + # This assumes 'my_package.data' is the package and 'filename' is the file in the 'data' directory + with pkg_resources.path("guardowl.data", "drugbank.tar.gz") as DATA_DIR: + with pkg_resources.path("guardowl.data", "drugbank") as extract_path: + log.info(f"{DATA_DIR=}") + log.info(f"{extract_path=}") + # Check if the extraction is already done + if not os.path.exists(extract_path): + with tarfile.open(DATA_DIR, "r:gz") as tar: + tar.extractall(path=extract_path) + log.debug(f"Extracted {DATA_DIR} to {extract_path}") + else: + log.debug(f"Extraction already done") + + +def _generate_file_list_for_minimization_test( + dir_name: str = "drugbank", shuffle: bool = False +) -> Dict[str, List]: + import os + + import importlib.resources as pkg_resources + import numpy as np + + # This assumes 'my_package.data' is the package and 'filename' is the file in the 'data' directory + with pkg_resources.path("guardowl.data", f"{dir_name}") as DATA_DIR: + log.info("Reading in data for minimzation test") + log.debug(f"Reading from {DATA_DIR}") + # read in all directories in DATA_DIR + directories = [x[0] for x in os.walk(DATA_DIR)] + if shuffle: + np.random.shuffle(directories) + # read in all xyz files in directories + minimized_xyz_files = [] + start_xyz_files = [] + sdf_files = [] + for directory in directories: + all_files = os.listdir(directory) + # test if there is a xyz, orca and sdf file in the list and only then continue + test_orca = any("orca" in file for file in all_files) + test_xyz = any("xyz" in file for file in all_files) + test_sdf = any("sdf" in file for file in all_files) + if (test_orca and test_xyz and test_sdf) == False: + log.debug(f"Skipping {directory}") + continue + + for file in all_files: + if file.endswith(".xyz"): + if file.startswith("orca"): + minimized_xyz_files.append(os.path.join(directory, file)) + else: + start_xyz_files.append(os.path.join(directory, file)) + sdf_files.append( + os.path.join(directory, file.replace(".xyz", ".sdf")) + ) + + return { + "minimized_xyz_files": minimized_xyz_files, + "unminimized_xyz_files": start_xyz_files, + "sdf_files": sdf_files, + "directories": directories, + "total_number_of_systems": len(minimized_xyz_files), + } + + +def _generate_input_for_minimization_test( + files: Dict[str, List] +) -> Iterator[Tuple[Tuple[str, List], Tuple[str, List]]]: + # read in coordinates from xyz files + import numpy as np + from openmm import unit + + def read_positions(files) -> Iterator[Tuple[str, List]]: + for file in files: + with open(file, "r") as f: + lines = f.readlines() + positions = unit.Quantity( + np.array( + [[float(x) for x in line.split()[1:]] for line in lines[2:]] + ), + unit.angstrom, + ) + yield file, positions + + minimized_xyz_files = files["minimized_xyz_files"] + start_xyz_files = files["unminimized_xyz_files"] + minimized_positions = read_positions(minimized_xyz_files) + start_positions = read_positions(start_xyz_files) + + return zip(minimized_positions, start_positions) diff --git a/guardowl/vis.py b/guardowl/vis.py index 3297717..35091ea 100644 --- a/guardowl/vis.py +++ b/guardowl/vis.py @@ -20,6 +20,7 @@ class MonitoringPlotter: """ def __init__(self, traj_file: str, top_file: str, data_file: str) -> None: + self.data_file = data_file self.canvas = widgets.Output() self.md_traj_instance = md.load(traj_file, top=top_file) self.x_label_names = ['#"Step"', "Time (ps)", "bond distance [A]"] @@ -122,11 +123,18 @@ def generate_summary( """ # generate x axis labels - if '#"Step"' in self.data.keys(): - frames = [idx for idx, _ in enumerate(self.data['#"Step"'])] - elif "bond distance [A]" in self.data.keys(): - # frames = self.data["bond distance [A]"] - frames = [idx for idx, _ in enumerate(self.data["bond distance [A]"])] + nr_of_frames = -1 + try: + nr_of_frames = len(self.data['#"Step"']) + except KeyError as e: + log.debug(e) + try: + nr_of_frames = len(self.data["bond distance [A]"]) + except KeyError as e: + log.debug(e) + + assert nr_of_frames > 0, f"No frames found in data file: {self.data_file}" + frames = [idx for idx in range(nr_of_frames)] labels, observable_data = self._generate_report_data(bonded_scan=bonded_scan) label_to_data_map = {labels[i]: observable_data[i] for i in range(len(labels))} @@ -228,7 +236,7 @@ def generate_summary( verticalalignment="top", bbox=props, ) - except KeyError as e: + except (KeyError, AttributeError) as e: log.debug(e) try: diff --git a/scripts/alanine.yaml b/scripts/alanine.yaml new file mode 100644 index 0000000..b8e6be0 --- /dev/null +++ b/scripts/alanine.yaml @@ -0,0 +1,10 @@ +# config.yaml +tests: + - protocol: "perform_alanine_dipeptide_protocol" + env: "solution" + ensemble: "npt" + nnp: "ani2x" + implementation: "torchani" + annealing: false + nr_of_simulation_steps: 500_000 + temperature: 300 diff --git a/scripts/perform_stability_tests.py b/scripts/perform_stability_tests.py index 2a28d10..96fac93 100644 --- a/scripts/perform_stability_tests.py +++ b/scripts/perform_stability_tests.py @@ -120,6 +120,8 @@ def _setup_logging(): if __name__ == "__main__": import typer import warnings + import torch + torch._C._jit_set_nvfuser_enabled(False) _setup_logging() with warnings.catch_warnings(): diff --git a/scripts/test_config.yaml b/scripts/test_config.yaml index a22d960..cfdafc4 100644 --- a/scripts/test_config.yaml +++ b/scripts/test_config.yaml @@ -6,7 +6,7 @@ tests: nnp: "ani2x" implementation: "torchani" annealing: false - nr_of_simulation_steps: 50_000 + nr_of_simulation_steps: 50 temperature: 300 - protocol: "alanine_dipeptide_protocol" @@ -15,49 +15,41 @@ tests: nnp: "ani2x" implementation: "torchani" annealing: false - nr_of_simulation_steps: 50_000 + nr_of_simulation_steps: 5 temperature: 300 - protocol: "hipen_protocol" - hipen_idx: - [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] + hipen_idx: [0, 1, 15] nnp: "ani2x" implementation: "torchani" - temperature: 300 - nr_of_simulation_steps: 50_000 - - - protocol: "hipen_protocol" - hipen_idx: 1 - nnp: "ani2x" - implementation: "nnpops" temperature: [300, 400, 500] - nr_of_simulation_steps: 1_000_000 + nr_of_simulation_steps: 50 - protocol: "waterbox_protocol" edge_length: 30 - ensemble: "NVT" + ensemble: "npt" nnp: "ani2x" - implementation: "nnpops" + implementation: "torchani" annealing: false - nr_of_simulation_steps: 500_000 + nr_of_simulation_steps: 50 temperature: 300 - protocol: "waterbox_protocol" edge_length: 30 ensemble: "npt" nnp: "ani2x" - implementation: "nnpops" + implementation: "torchani" annealing: false - nr_of_simulation_steps: 500_000 - temperature: 300 + nr_of_simulation_steps: 50 + temperature: 500 - protocol: "pure_liquid_protocol" - molecule_name: ["ethane", "butane"] - nr_of_molecule: [100, 200] + molecule_name: ["ethane", "butane", "methanol"] + nr_of_molecule: [100, 200, 300] ensemble: "npt" nnp: "ani2x" implementation: "torchani" annealing: false - nr_of_simulation_steps: 500_000 - nr_of_equilibration_steps: 5_000 + nr_of_simulation_steps: 50 + nr_of_equilibration_steps: 50 temperature: 300