# * **************************************************************************
# *
# * CDTK, Chemical Dynamics Toolkit
# * A modular system for chemical dynamics applications and more
# *
# * Copyright (C) 2011, 2012, 2013, 2014, 2015, 2016
# * Oriol Vendrell, DESY, <oriol.vendrell@desy.de>
# *
#* Copyright (C) 2017, 2018, 2019
# * Ralph Welsch, DESY, <ralph.welsch@desy.de>
# *
#* Copyright (C) 2020, 2021, 2022, 2023
#* Ludger Inhester, DESY, ludger.inhester@cfel.de
#*
# * This file is part of CDTK.
# *
# * CDTK is free software: you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation, either version 3 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program. If not, see <http://www.gnu.org/licenses/>.
# *
# * **************************************************************************
#! /usr/bin/env python
import os
import datetime
import uuid
import math as mt
import numpy as np
IDLEN = 8
TINY = 1.0e-8
[docs]
class Trajectory(object):
"""
Born-Oppenheimer trajectory on a single PES
self.NDof -- int, Number of degrees of freedom
self.R -- np.array, positions
self.V -- np.array, velocities
self.M -- np.array float, masses
self.dt -- float, time-step for nuclear displacements
self.rseed -- int, default None, random seed to be set before 1st step
self.NAME -- Identification string of this trajectory. It is uniquely
generated at the initialization
self.f_Egrad -- function, provides energy and gradient on PES of state S
e,g = f(R,S)
self.ES -- electronic state for large hop electron dynamics
self.MM -- MM region instance
"""
def __init__(self, NDof=1, **opts):
self.NDof = NDof # How many degrees of freedom
self.R = np.zeros(self.NDof, float) # Pos
self.V = np.zeros(self.NDof, float) # Vel
self.M = np.ones(self.NDof, float) # Mass
self.dt = opts.get('dt', 10.0) # Time-step (au)
self.rseed = opts.get('rseed', None) # Random seed
self.t = 0.0 # Current time
self.S = 0 # Electronic state
# whether overall translation should be removed
self.rcom = False
# whether overall rotation should be removed
self.rrot = False
# whether dynamics should be calculated
self.dyn = True
self.dE_thresh = None
self.f_EGrad = opts.get('f_EGrad', None)
self._ID = uuid.uuid4().hex[0:IDLEN] # unique ID
self._CTIME = datetime.datetime.today() # creation date/time
self._BNAME = 'trj_%4i%02i%02i_' % (self._CTIME.year, self._CTIME.month, self._CTIME.day)
self.NAME = self._BNAME + self._ID # Used to name output files, etc.
self.DIR = self.NAME # directory where to log stuff
self._stepnum = 0
self.ES = None # electronic state object for Monte-Carlo electron dynamics
self.thermostat = None # thermostat
self.atomlist = None # List with atom labels
self.MM = None # MM region (MolecularMechanics object)
self.log_MOE = opts.get('log_MOE', False) # log molecular orbital energies
self.getPartial = None # partial charge calculation
self.constraints = False
self.ekin = 0
[docs]
def set_constraints(self, f_constraints):
self.constraints = True
self.getConstraintsCorrection = f_constraints
[docs]
def restart(self):
"""
Function to restart trajectory. Reads in all important values from log-files.
"""
if type(1) == type(self.rseed):
np.random.seed(self.rseed)
self.logR = [] # List of positions along trajectory
self.logV = [] # List of velocities along trajectory
self.logt = [] # List of times along the simulation
self.logKE = [] # List of Total Energy along trajectory
self.logPE = [] # List of Total Energy along trajectory
self.logWarning = [] # List of warning messages
if self.getPartial is not None:
self.logPartial = [] # Log partial charges
# get R, V, t, S, stepnum
with open(self.DIR + '/R.log', 'r') as posfile:
for line in posfile:
self._stepnum += 1
vals = line.split()
self.logt.append(float(vals[0]))
self.logR.append(np.array(list(map(float, vals[1:]))))
with open(self.DIR + '/V.log', 'r') as velfile:
for line in velfile:
vals = line.split()
self.logV.append(np.array(list(map(float, vals[1:]))))
with open(self.DIR + '/E.log', 'r') as efile:
for line in efile:
vals = line.split()
self.logKE.append(float(vals[1]))
self.logPE.append(float(vals[2]))
if self.getPartial is not None:
with open(self.DIR + '/partial.log','r') as partialfile:
for line in partialfile:
vals = line.split()
self.logPartial.append(np.array( list(map(float, vals[1:])) ))
self.t = self.logt[-1]
self.R = self.logR[-1]
self.V = self.logV[-1]
self.ekin = self.kinetic_energy()
if self.getPartial is not None:
self.partialCharges = self.logPartial[-1]
self._pot, self._grd = self.f_EGrad(self.R, self.S, self.t) # get initial POT, GRAD
[docs]
def integrate(self, tfinal=None, **opts):
"""
Integrate trajectory until tfinal
tfinal -- final time to be reached by the integrator in steps of dt.
**opts:
dt -- delta time used in this segment.
It is not a good idea to change this when continuing from a
previous integration
"""
self.dt = opts.get('dt', self.dt)
if tfinal is None:
tfinal = self.t + self.dt
if self.f_EGrad is None:
raise ValueError('self.f_EGrad is not defined, cannot integrate')
if self._stepnum == 0:
if type(1) == type(self.rseed):
np.random.seed(self.rseed)
# Initialization
self._pot, self._grd = self.f_EGrad(self.R, self.S, self.t) # get initial POT, GRAD
self.ekin = self.kinetic_energy()
# Start logging
self.init_log()
while self.t < tfinal:
self._step()
self._stepnum += 1
self.t = self.t + self.dt
self.log()
# TODO no MM step here
# Additional MM steps in between
if self.MM is not None:
if self.MM.dt < self.dt:
while self.MM.t <= self.t + self.dt:
self._mm_step()
#self._mm_stepnum += 1
self.MM.t = self.MM.t + self.MM.dt
self.log()
elif self.MM.dt == self.dt:
self._mm_step() # TODO only if separated
self.MM.t = self.MM.t + self.MM.dt
else:
raise ValueError('Check QM and MM region time steps for compatibility.')
# Check for energy conservation
if self.dE_thresh is not None:
if abs((self.logKE[-1] + self.logPE[-1] - self.logKE[0] - self.logPE[0]) /
(self.logKE[0] + self.logPE[0])) > self.dE_thresh:
raise RuntimeError('Energy is not conserved. I will write the logfiles and exit')
def _step(self, **opts):
"""
Do a classical integration step
"""
self._classical_step()
if self.getPartial is not None:
self.partialCharges = self.getPartial()
def _mm_step(self, **opts):
"""
Do a classical integration step for the MM region
"""
# TODO Update QM coords in MM
# TODO Update gradients here and pass to MM._step
# TODO Distinguish f_Egrad_QM_to_MM
# e,g = self.f_EGrad_QM_to_MM( np.append(self.R, self.MM.x_R, self.S, self.t ))
self.MM._step(**opts)
def _classical_step(self, **opts):
"""
Advance classical coordinates with a velocity Verlet step.
"""
m = self.M
x0 = self.R
v0 = self.V
g0 = self._grd
dt = self.dt
# apply thermostat
if self.thermostat != None:
v0 = self.thermostat.apply(v0, self.t - dt, self.t, 0)
# for now set MM region gradients to zero
if self.MM:
g0[3*self.MM.natoms_qm:] = 0.
# MM -> QM gradient (electrostatic covered already)
if self.MM:
# update QM coords in MM region
self._update_MM_region()
g0[:3*self.MM.natoms_qm] += self.MM.grad_mm_qm
# Classical step (Vel. Verlet)
v1 = v0 - 0.5*dt*g0/m # v(t) -> v(t+dt/2)
if self.constraints:
x1 = x0 + v1*dt
v1 += self.getConstraintsCorrection(x0, x1, dt, case="pos")
v1 = self._remove_com_motion(x0, v1, m, self.rcom, self.rrot)
# include ionization dynamics steps here
if self.ES and self.ES.state != 'doNotRun':
hopped = self.ES.step(dt)
x1 = x0 + v1*dt # x(t) -> x(t+dt)
# apply thermostat
if self.thermostat != None:
v1 = self.thermostat.apply(v1, self.t - dt, self.t, 1)
e1, g1 = self.f_EGrad(x1, self.S, self.t)
# for now set MM region gradients to zero
if self.MM:
g1[3*self.MM.natoms_qm:] = 0.
v1 = v1 - 0.5*dt*g1/m # v(t+dt/2) -> v(t+dt)
# the kinetic energy is calculated before applying velocity constraints.
# this makes a smoother transition based on gromacs data.
self.V = v1
self.ekin = self.kinetic_energy()
# apply constraints
if self.constraints:
v1 += self.getConstraintsCorrection(x1, v1, dt, case="vel")
# apply thermostat
if self.thermostat != None:
v1 = self.thermostat.apply(v1, self.t - dt, self.t, 2)
v1 = self._remove_com_motion(x1, v1, m, self.rcom, self.rrot)
# if self.dyn == True and (self.ES == None):
self._grd = g1 # save gradient for next step
self._pot = e1
self.R = x1
self.V = v1
def _update_MM_region(self):
"""
Update the QM coordinates in the MM region
"""
self.MM.qm_R = self.R[:3*self.MM.natoms_qm]
self.MM.qm_V = self.V[:3*self.MM.natoms_qm]
self.MM.update_grad_mm_qm()
self.MM.update_grad_mm_mm()
def _remove_com_motion(self, r, v, m, rcom, rrot):
"""
TODO: The inertia tensor is currently regularized with 10^-10*Id-Matrix!
This function removes the center of mass motion by setting the center of mass velocity to zero after every
velocity step in velocity verlet.
Input:
r -- 3N array of positions in bohr
v -- 3N array of velocities in au
m -- 3N array of masses in au
rcom -- Whether center of mass motion should be removed
rrot -- Whether center of mass rotation should be removed
Output:
v -- 3N array of new velocities without center of mass motion/rotation in au
"""
if rcom == False:
return v
# n = number of atoms
n = len(m) // 3
# center of mass velocity
v_com = np.sum(v.reshape(n, 3) * m.reshape(n, 3), axis=0) / np.sum(m) * 3.0
v_new = np.zeros(n*3)
# remove center of mass velocity for each atom
for i in range(n):
v_new[i*3:(i+1)*3] = v[i*3:(i+1)*3] - v_com
if rrot == False:
return v_new
# calculate overall rotation with respect to center of mass and substract
# L is overall angular momentum
L = np.zeros(3)
# Theta is inertia tensoer
Theta = np.zeros((3, 3))
# calculate center of mass
com = np.sum(r.reshape(n, 3) * m.reshape(n, 3), axis=0) / np.sum(m) * 3.0
for i in range(n):
# calculate particle position relative to center of mass
r_com = r[i*3:(i+1)*3] - com
# calculate overall angular momentum
L += (m[i*3] * np.cross(r_com, v_new[i*3:(i+1)*3]))
# calculate inertia tensor
Theta += m[i*3] * (np.dot(r_com, r_com) * np.identity(3) - np.outer(r_com, r_com))
# regularize Theta!
Theta += 1e-10 * np.identity(3)
invTheta = np.linalg.inv(Theta)
for i in range(n):
r_com = r[i*3:(i+1)*3] - com
# v_p = v_p - Th^-1 * L (X) r_p
try:
v_new[i*3:(i+1)*3] -= np.cross(np.dot(invTheta, L), r_com)
except np.linalg.linalg.LinAlgError:
return v_new
return v_new
[docs]
def kinetic_energy(self, step=None, atomlist=None):
"""
Calculates the kinetic energy for a certain timestep and a given set of atoms.
Input:
step --- timestep for which kinetic energy should be calculated given as integer.
default is None and gives the current timestep
atomlist --- list of indices for the atoms to be included, default is None and gives all.
Output:
kin --- kinetic energy
"""
kin = 0.0
# current velocities and mass
v = self.V
m = self.M
# velocities at step if given
if step != None:
v = self.logV[step]
# if only subset of atoms should be included
if atomlist != None:
for i in atomlist:
kin += 0.5*np.dot(v[i*3:(i+1)*3] * m[i*3:(i+1)*3], v[i*3:(i+1)*3])
else:
kin = 0.5*np.dot(v*m, v)
return kin
[docs]
def getCOM(self, atomlist, step=0):
"""
This function returns the center of mass of a set of atoms given by their index.
Input:
atomlist --- list of indices for the atoms
Output:
center of mass
"""
totM = 0.0
xsum = np.zeros(3)
for i in atomlist:
totM += self.M[i*3]
xsum += self.M[i*3] * self.logR[step][i*3:(i+1)*3]
return xsum / totM
[docs]
def init_log(self):
self.logR = [self.R, ] # List of positions along trajectory
self.logV = [self.V, ] # List of velocities along trajectory
self.logt = [self.t, ] # List of times along the simulation
self.logKE = [self.ekin, ] # List of Total Energy along trajectory
self.logPE = [self._pot] # List of Total Energy along trajectory
self.logWarning = [] # List of warning messages
self.logPartial = []
if self.log_MOE:
self.logMOE = [self.f_MOE(), ]
[docs]
def log(self):
self.logR.append(self.R)
self.logV.append(self.V)
self.logt.append(self.t)
self.logKE.append(self.ekin)
self.logPE.append(self._pot)
if self.getPartial is not None:
self.logPartial.append(self.partialCharges)
if self.log_MOE:
self.logMOE.append(self.f_MOE())
[docs]
def log_to_file(self, **opts):
"""
Write all log information to a set of files
"""
if not os.path.exists(self.DIR):
os.mkdir(self.DIR)
self._R_to_file() # positions
self._V_to_file() # velocities
self._E_to_file() # kinetic, potential and total energies
if self.ES is not None and self.ES.state != 'doNotRun':
self.ES.write_logs()
self._Warning_to_file() # Warning messages
if self.log_MOE:
self._MOE_to_file() # molecular orbital energies
if self.getPartial is not None:
self._partial_to_file()
self._seed_to_file()
def _issue_warning(self, msg):
"""
Write warning message to warning log-list
"""
strng = '%8i %7.4f %s\n' % (self._stepnum, self.t, msg.rstrip())
self.logWarning.append(strng)
def _MOE_to_file(self):
f = open(self.DIR + '/MOE.log', 'w')
for i, MOE in enumerate(self.logMOE):
f.write('%9.3f' % (self.logt[i],))
for moe in MOE:
f.write(' %12.6f' % (moe,))
f.write('\n')
f.flush()
f.close()
def _seed_to_file(self):
f = open(self.DIR + '/seed.log', 'w')
if self.rseed is not None:
f.write("%d \n" % self.rseed)
f.close()
def _R_to_file(self):
f = open(self.DIR + '/R.log', 'w')
for i, R in enumerate(self.logR):
f.write('%9.3f' % (self.logt[i],))
for r in R:
f.write(' %12.6f' % (r,))
f.write('\n')
f.flush()
f.close()
def _V_to_file(self):
f = open(self.DIR + '/V.log', 'w')
for i, V in enumerate(self.logV):
f.write('%9.3f' % (self.logt[i],))
for v in V:
f.write(' %12.6f' % (v,))
f.write('\n')
f.flush()
f.close()
def _E_to_file(self):
f = open(self.DIR + '/E.log', 'w')
for i in range(len(self.logt)):
f.write('%9.3f %12.6f %12.6f %12.6f\n' % (self.logt[i],
self.logKE[i],
self.logPE[i],
self.logKE[i] + self.logPE[i])
)
f.flush()
f.close()
def _partial_to_file(self):
with open(self.DIR + '/partial.log','w') as f:
for i, ps in enumerate(self.logPartial):
f.write( '%11.4f ' % ((self.logt[i]), ) )
for p in ps:
f.write(' %13.8f' % (p) )
f.write('\n')
f.flush()
f.close()
def _Warning_to_file(self):
f = open(self.DIR + '/Warnings.log', 'w')
for msg in self.logWarning:
f.write(msg.rstrip() + '\n')
f.flush()
f.close()
# -------------------------------------------------------------------------------------
# Test of a single trajectory
# -------------------------------------------------------------------------------------
# if __name__ == "__main__":
# pass