Source code for CDTK.Dynamics.Trajectory

# *  **************************************************************************
# *
# *  CDTK, Chemical Dynamics Toolkit
# *  A modular system for chemical dynamics applications and more
# *
# *  Copyright (C) 2011, 2012, 2013, 2014, 2015, 2016
# *  Oriol Vendrell, DESY, <oriol.vendrell@desy.de>
# *
#*  Copyright (C) 2017, 2018, 2019
# *  Ralph Welsch, DESY, <ralph.welsch@desy.de>
# *
#*  Copyright (C) 2020, 2021, 2022, 2023
#*  Ludger Inhester, DESY, ludger.inhester@cfel.de
#*
# *  This file is part of CDTK.
# *
# *  CDTK is free software: you can redistribute it and/or modify
# *  it under the terms of the GNU General Public License as published by
# *  the Free Software Foundation, either version 3 of the License, or
# *  (at your option) any later version.
# *
# *  This program is distributed in the hope that it will be useful,
# *  but WITHOUT ANY WARRANTY; without even the implied warranty of
# *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# *  GNU General Public License for more details.
# *
# *  You should have received a copy of the GNU General Public License
# *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
# *
# *  **************************************************************************
#! /usr/bin/env python

import os
import datetime
import uuid
import math as mt

import numpy as np

IDLEN = 8
TINY = 1.0e-8


[docs] class Trajectory(object): """ Born-Oppenheimer trajectory on a single PES self.NDof -- int, Number of degrees of freedom self.R -- np.array, positions self.V -- np.array, velocities self.M -- np.array float, masses self.dt -- float, time-step for nuclear displacements self.rseed -- int, default None, random seed to be set before 1st step self.NAME -- Identification string of this trajectory. It is uniquely generated at the initialization self.f_Egrad -- function, provides energy and gradient on PES of state S e,g = f(R,S) self.ES -- electronic state for large hop electron dynamics self.MM -- MM region instance """ def __init__(self, NDof=1, **opts): self.NDof = NDof # How many degrees of freedom self.R = np.zeros(self.NDof, float) # Pos self.V = np.zeros(self.NDof, float) # Vel self.M = np.ones(self.NDof, float) # Mass self.dt = opts.get('dt', 10.0) # Time-step (au) self.rseed = opts.get('rseed', None) # Random seed self.t = 0.0 # Current time self.S = 0 # Electronic state # whether overall translation should be removed self.rcom = False # whether overall rotation should be removed self.rrot = False # whether dynamics should be calculated self.dyn = True self.dE_thresh = None self.f_EGrad = opts.get('f_EGrad', None) self._ID = uuid.uuid4().hex[0:IDLEN] # unique ID self._CTIME = datetime.datetime.today() # creation date/time self._BNAME = 'trj_%4i%02i%02i_' % (self._CTIME.year, self._CTIME.month, self._CTIME.day) self.NAME = self._BNAME + self._ID # Used to name output files, etc. self.DIR = self.NAME # directory where to log stuff self._stepnum = 0 self.ES = None # electronic state object for Monte-Carlo electron dynamics self.thermostat = None # thermostat self.atomlist = None # List with atom labels self.MM = None # MM region (MolecularMechanics object) self.log_MOE = opts.get('log_MOE', False) # log molecular orbital energies self.getPartial = None # partial charge calculation self.constraints = False self.ekin = 0
[docs] def set_constraints(self, f_constraints): self.constraints = True self.getConstraintsCorrection = f_constraints
[docs] def restart(self): """ Function to restart trajectory. Reads in all important values from log-files. """ if type(1) == type(self.rseed): np.random.seed(self.rseed) self.logR = [] # List of positions along trajectory self.logV = [] # List of velocities along trajectory self.logt = [] # List of times along the simulation self.logKE = [] # List of Total Energy along trajectory self.logPE = [] # List of Total Energy along trajectory self.logWarning = [] # List of warning messages if self.getPartial is not None: self.logPartial = [] # Log partial charges # get R, V, t, S, stepnum with open(self.DIR + '/R.log', 'r') as posfile: for line in posfile: self._stepnum += 1 vals = line.split() self.logt.append(float(vals[0])) self.logR.append(np.array(list(map(float, vals[1:])))) with open(self.DIR + '/V.log', 'r') as velfile: for line in velfile: vals = line.split() self.logV.append(np.array(list(map(float, vals[1:])))) with open(self.DIR + '/E.log', 'r') as efile: for line in efile: vals = line.split() self.logKE.append(float(vals[1])) self.logPE.append(float(vals[2])) if self.getPartial is not None: with open(self.DIR + '/partial.log','r') as partialfile: for line in partialfile: vals = line.split() self.logPartial.append(np.array( list(map(float, vals[1:])) )) self.t = self.logt[-1] self.R = self.logR[-1] self.V = self.logV[-1] self.ekin = self.kinetic_energy() if self.getPartial is not None: self.partialCharges = self.logPartial[-1] self._pot, self._grd = self.f_EGrad(self.R, self.S, self.t) # get initial POT, GRAD
[docs] def integrate(self, tfinal=None, **opts): """ Integrate trajectory until tfinal tfinal -- final time to be reached by the integrator in steps of dt. **opts: dt -- delta time used in this segment. It is not a good idea to change this when continuing from a previous integration """ self.dt = opts.get('dt', self.dt) if tfinal is None: tfinal = self.t + self.dt if self.f_EGrad is None: raise ValueError('self.f_EGrad is not defined, cannot integrate') if self._stepnum == 0: if type(1) == type(self.rseed): np.random.seed(self.rseed) # Initialization self._pot, self._grd = self.f_EGrad(self.R, self.S, self.t) # get initial POT, GRAD self.ekin = self.kinetic_energy() # Start logging self.init_log() while self.t < tfinal: self._step() self._stepnum += 1 self.t = self.t + self.dt self.log() # TODO no MM step here # Additional MM steps in between if self.MM is not None: if self.MM.dt < self.dt: while self.MM.t <= self.t + self.dt: self._mm_step() #self._mm_stepnum += 1 self.MM.t = self.MM.t + self.MM.dt self.log() elif self.MM.dt == self.dt: self._mm_step() # TODO only if separated self.MM.t = self.MM.t + self.MM.dt else: raise ValueError('Check QM and MM region time steps for compatibility.') # Check for energy conservation if self.dE_thresh is not None: if abs((self.logKE[-1] + self.logPE[-1] - self.logKE[0] - self.logPE[0]) / (self.logKE[0] + self.logPE[0])) > self.dE_thresh: raise RuntimeError('Energy is not conserved. I will write the logfiles and exit')
def _step(self, **opts): """ Do a classical integration step """ self._classical_step() if self.getPartial is not None: self.partialCharges = self.getPartial() def _mm_step(self, **opts): """ Do a classical integration step for the MM region """ # TODO Update QM coords in MM # TODO Update gradients here and pass to MM._step # TODO Distinguish f_Egrad_QM_to_MM # e,g = self.f_EGrad_QM_to_MM( np.append(self.R, self.MM.x_R, self.S, self.t )) self.MM._step(**opts) def _classical_step(self, **opts): """ Advance classical coordinates with a velocity Verlet step. """ m = self.M x0 = self.R v0 = self.V g0 = self._grd dt = self.dt # apply thermostat if self.thermostat != None: v0 = self.thermostat.apply(v0, self.t - dt, self.t, 0) # for now set MM region gradients to zero if self.MM: g0[3*self.MM.natoms_qm:] = 0. # MM -> QM gradient (electrostatic covered already) if self.MM: # update QM coords in MM region self._update_MM_region() g0[:3*self.MM.natoms_qm] += self.MM.grad_mm_qm # Classical step (Vel. Verlet) v1 = v0 - 0.5*dt*g0/m # v(t) -> v(t+dt/2) if self.constraints: x1 = x0 + v1*dt v1 += self.getConstraintsCorrection(x0, x1, dt, case="pos") v1 = self._remove_com_motion(x0, v1, m, self.rcom, self.rrot) # include ionization dynamics steps here if self.ES and self.ES.state != 'doNotRun': hopped = self.ES.step(dt) x1 = x0 + v1*dt # x(t) -> x(t+dt) # apply thermostat if self.thermostat != None: v1 = self.thermostat.apply(v1, self.t - dt, self.t, 1) e1, g1 = self.f_EGrad(x1, self.S, self.t) # for now set MM region gradients to zero if self.MM: g1[3*self.MM.natoms_qm:] = 0. v1 = v1 - 0.5*dt*g1/m # v(t+dt/2) -> v(t+dt) # the kinetic energy is calculated before applying velocity constraints. # this makes a smoother transition based on gromacs data. self.V = v1 self.ekin = self.kinetic_energy() # apply constraints if self.constraints: v1 += self.getConstraintsCorrection(x1, v1, dt, case="vel") # apply thermostat if self.thermostat != None: v1 = self.thermostat.apply(v1, self.t - dt, self.t, 2) v1 = self._remove_com_motion(x1, v1, m, self.rcom, self.rrot) # if self.dyn == True and (self.ES == None): self._grd = g1 # save gradient for next step self._pot = e1 self.R = x1 self.V = v1 def _update_MM_region(self): """ Update the QM coordinates in the MM region """ self.MM.qm_R = self.R[:3*self.MM.natoms_qm] self.MM.qm_V = self.V[:3*self.MM.natoms_qm] self.MM.update_grad_mm_qm() self.MM.update_grad_mm_mm() def _remove_com_motion(self, r, v, m, rcom, rrot): """ TODO: The inertia tensor is currently regularized with 10^-10*Id-Matrix! This function removes the center of mass motion by setting the center of mass velocity to zero after every velocity step in velocity verlet. Input: r -- 3N array of positions in bohr v -- 3N array of velocities in au m -- 3N array of masses in au rcom -- Whether center of mass motion should be removed rrot -- Whether center of mass rotation should be removed Output: v -- 3N array of new velocities without center of mass motion/rotation in au """ if rcom == False: return v # n = number of atoms n = len(m) // 3 # center of mass velocity v_com = np.sum(v.reshape(n, 3) * m.reshape(n, 3), axis=0) / np.sum(m) * 3.0 v_new = np.zeros(n*3) # remove center of mass velocity for each atom for i in range(n): v_new[i*3:(i+1)*3] = v[i*3:(i+1)*3] - v_com if rrot == False: return v_new # calculate overall rotation with respect to center of mass and substract # L is overall angular momentum L = np.zeros(3) # Theta is inertia tensoer Theta = np.zeros((3, 3)) # calculate center of mass com = np.sum(r.reshape(n, 3) * m.reshape(n, 3), axis=0) / np.sum(m) * 3.0 for i in range(n): # calculate particle position relative to center of mass r_com = r[i*3:(i+1)*3] - com # calculate overall angular momentum L += (m[i*3] * np.cross(r_com, v_new[i*3:(i+1)*3])) # calculate inertia tensor Theta += m[i*3] * (np.dot(r_com, r_com) * np.identity(3) - np.outer(r_com, r_com)) # regularize Theta! Theta += 1e-10 * np.identity(3) invTheta = np.linalg.inv(Theta) for i in range(n): r_com = r[i*3:(i+1)*3] - com # v_p = v_p - Th^-1 * L (X) r_p try: v_new[i*3:(i+1)*3] -= np.cross(np.dot(invTheta, L), r_com) except np.linalg.linalg.LinAlgError: return v_new return v_new
[docs] def kinetic_energy(self, step=None, atomlist=None): """ Calculates the kinetic energy for a certain timestep and a given set of atoms. Input: step --- timestep for which kinetic energy should be calculated given as integer. default is None and gives the current timestep atomlist --- list of indices for the atoms to be included, default is None and gives all. Output: kin --- kinetic energy """ kin = 0.0 # current velocities and mass v = self.V m = self.M # velocities at step if given if step != None: v = self.logV[step] # if only subset of atoms should be included if atomlist != None: for i in atomlist: kin += 0.5*np.dot(v[i*3:(i+1)*3] * m[i*3:(i+1)*3], v[i*3:(i+1)*3]) else: kin = 0.5*np.dot(v*m, v) return kin
[docs] def getCOM(self, atomlist, step=0): """ This function returns the center of mass of a set of atoms given by their index. Input: atomlist --- list of indices for the atoms Output: center of mass """ totM = 0.0 xsum = np.zeros(3) for i in atomlist: totM += self.M[i*3] xsum += self.M[i*3] * self.logR[step][i*3:(i+1)*3] return xsum / totM
[docs] def init_log(self): self.logR = [self.R, ] # List of positions along trajectory self.logV = [self.V, ] # List of velocities along trajectory self.logt = [self.t, ] # List of times along the simulation self.logKE = [self.ekin, ] # List of Total Energy along trajectory self.logPE = [self._pot] # List of Total Energy along trajectory self.logWarning = [] # List of warning messages self.logPartial = [] if self.log_MOE: self.logMOE = [self.f_MOE(), ]
[docs] def log(self): self.logR.append(self.R) self.logV.append(self.V) self.logt.append(self.t) self.logKE.append(self.ekin) self.logPE.append(self._pot) if self.getPartial is not None: self.logPartial.append(self.partialCharges) if self.log_MOE: self.logMOE.append(self.f_MOE())
[docs] def log_to_file(self, **opts): """ Write all log information to a set of files """ if not os.path.exists(self.DIR): os.mkdir(self.DIR) self._R_to_file() # positions self._V_to_file() # velocities self._E_to_file() # kinetic, potential and total energies if self.ES is not None and self.ES.state != 'doNotRun': self.ES.write_logs() self._Warning_to_file() # Warning messages if self.log_MOE: self._MOE_to_file() # molecular orbital energies if self.getPartial is not None: self._partial_to_file() self._seed_to_file()
def _issue_warning(self, msg): """ Write warning message to warning log-list """ strng = '%8i %7.4f %s\n' % (self._stepnum, self.t, msg.rstrip()) self.logWarning.append(strng) def _MOE_to_file(self): f = open(self.DIR + '/MOE.log', 'w') for i, MOE in enumerate(self.logMOE): f.write('%9.3f' % (self.logt[i],)) for moe in MOE: f.write(' %12.6f' % (moe,)) f.write('\n') f.flush() f.close() def _seed_to_file(self): f = open(self.DIR + '/seed.log', 'w') if self.rseed is not None: f.write("%d \n" % self.rseed) f.close() def _R_to_file(self): f = open(self.DIR + '/R.log', 'w') for i, R in enumerate(self.logR): f.write('%9.3f' % (self.logt[i],)) for r in R: f.write(' %12.6f' % (r,)) f.write('\n') f.flush() f.close() def _V_to_file(self): f = open(self.DIR + '/V.log', 'w') for i, V in enumerate(self.logV): f.write('%9.3f' % (self.logt[i],)) for v in V: f.write(' %12.6f' % (v,)) f.write('\n') f.flush() f.close() def _E_to_file(self): f = open(self.DIR + '/E.log', 'w') for i in range(len(self.logt)): f.write('%9.3f %12.6f %12.6f %12.6f\n' % (self.logt[i], self.logKE[i], self.logPE[i], self.logKE[i] + self.logPE[i]) ) f.flush() f.close() def _partial_to_file(self): with open(self.DIR + '/partial.log','w') as f: for i, ps in enumerate(self.logPartial): f.write( '%11.4f ' % ((self.logt[i]), ) ) for p in ps: f.write(' %13.8f' % (p) ) f.write('\n') f.flush() f.close() def _Warning_to_file(self): f = open(self.DIR + '/Warnings.log', 'w') for msg in self.logWarning: f.write(msg.rstrip() + '\n') f.flush() f.close()
# ------------------------------------------------------------------------------------- # Test of a single trajectory # ------------------------------------------------------------------------------------- # if __name__ == "__main__": # pass