#* **************************************************************************
#*
#* CDTK, Chemical Dynamics Toolkit
#* A modular system for chemical dynamics applications and more
#*
#* Copyright (C) 2011, 2012, 2013, 2014, 2015, 2016
#* Oriol Vendrell, DESY, <oriol.vendrell@desy.de>
#*
#* Copyright (C) 2017, 2018, 2019
#* Ralph Welsch, DESY, <ralph.welsch@desy.de>
#*
#* Copyright (C) 2020, 2021, 2022, 2023
#* Ludger Inhester, DESY, ludger.inhester@cfel.de
#*
#* This file is part of CDTK.
#*
#* CDTK is free software: you can redistribute it and/or modify
#* it under the terms of the GNU General Public License as published by
#* the Free Software Foundation, either version 3 of the License, or
#* (at your option) any later version.
#*
#* This program is distributed in the hope that it will be useful,
#* but WITHOUT ANY WARRANTY; without even the implied warranty of
#* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#* GNU General Public License for more details.
#*
#* You should have received a copy of the GNU General Public License
#* along with this program. If not, see <http://www.gnu.org/licenses/>.
#*
#* **************************************************************************
#! /usr/bin/env python
import os
import sys
import datetime
import shelve
import uuid
from time import time
import numpy as np
import CDTK.Tools.Conversion as conv
from . import MDIntegrators as mdi
from . import Tools as ntool
IDLEN = 12
SEP = '#--------------------------------------------------#\n'
[docs]
class SimulationBox(object):
"""
Bunch of atoms given by their cartesian coordinates.
Provides functionality to follow and analyze their classical dynamics
"""
def __init__(self,**opts):
self.X = opts.get('X',None) # 3N coordinates of the atoms
self.V = opts.get('V',None) # 3N velocities of the atoms
self.DT = opts.get('DT',10.0) # Time-step
self.M = opts.get('M',None) # 3N masses of each Cartesian coordinate
self.ndim = opts.get('ndim',3) # spatial dimensions
self.atomSymbols = opts.get('atomSymbols',[]) # list of atoms
self.external_E = opts.get('external_E',None) # energy function
self.external_G = opts.get('external_G',None) # gradient function
self.internals = opts.get('internals',False) # if true, coordinates are regarded as internal
self.isrestart = opts.get('restart',False) # whether to restart calculation from intermediate time
self.islogfile = opts.get('logfile',True) # produce a logfile
self.sh_method = opts.get('sh_method','LZ') # surface hopp method
self.label = opts.get('label',None)
self.unit = opts.get('unit','Bohr') # unit. default Bohr. optional Angstrom
self.constraints = opts.get('constraints',None)
# [stop] protection mechanism
self.save_secure = opts.get('save_secure',False)
self.use_runtime = opts.get('use_runtime',False)
self.runtime_usable = opts.get('runtime_usable',0.01) # usable runtime [h]
self.TIME = 0.0 # Current time
self._g = None # 3N gradient vector
self.TrajX = [] # List of positions along trajectory
self.KinE = [] # List of Total Energy along trajectory
self.PotE = [] # List of Total Energy along trajectory
self.TotE = [] # List of Total Energy along trajectory
self.TrajV = [] # List of velocities along trajectory
self.TotE = [] # List of total energies along trajectory
self.TrajCharge = [] # List of Charges along trajectory
self._ID = uuid.uuid4().hex[0:IDLEN] # unique ID
self._CTIME = datetime.datetime.today() # creation date/time
self._BNAME = 'sb_%4i%02i%02i_' % (self._CTIME.year,self._CTIME.month,self._CTIME.day)
if self.label: self._BNAME = self._BNAME + self.label + '_'
self._NAME = self._BNAME + self._ID
self._debug = False # If true provide output to screen
self._has_sh = False # True if working in combination with a SurfaceHopping object
self._sh = None # SurfaceHopping object
self._has_ed = False # True if working in combination with an EhrenfestDyn object
self._ed = None # EhrenfestDyn object
self._has_ob = False # True if working in combination with an Observable object
self._ob = None # Observable object
self._stepnum = 0 # Classical step number
self._has_pd = False # True if working in combination with state population dynamics (X-ray ionization and decay)
self._pd = None # state population dynamics object
[docs]
def integrate(self,**opts):
"""
Integrate up to tfinal
"""
self.DT = opts.get('dt',self.DT)
tfinal = opts.get('tfinal',self.TIME+self.DT)
dxgrad = opts.get('dxgrad',0.001) # step for numerical energy differentiation
is_first_restart = True
if self.external_E is None:
raise ValueError('No energy function (self.external_E) defined!')
if self.external_G is None: # fall back to numerical gradient
self.external_G = self._getNumGrad(dx=dxgrad)
if self.TIME >= tfinal: return
if len(self.TrajX) == 0: # log status fot TIME=0
# [stop] protection mechanism
self.stop_protect(init=True)
#
if self._has_ed:
# initializing density matrix at TIME=0 for Ehrenfest dynamics
self._ed._updateDensityMat(self.X,init=True)
self.state = self._ed.state
self._g = self.external_G(self.X)
if self.sh_method == 'FS' and self._has_sh:
# initializing quantum steps at TIME=0
self._sh._is_Hopp(self.X,init=True)
self.state = self._sh.state
self._updateLog(init=True)
if self.isrestart and is_first_restart: # log status for TIME of restart
# [stop] protection mechanism
self.stop_protect(init=True)
#
is_first_restart = False
self._stepnum = 0
self._stepnum_restart = self._stepnum
if self._has_ed:
# initializing density matrix at TIME=0 for Ehrenfest dynamics
self._ed._updateDensityMat(self.X,init=True,restart=True)
self.state = self._ed.state
self._g = self.external_G(self.X)
if self.sh_method == 'FS' and self._has_sh:
# initializing quantum steps at TIME=0
self._sh._is_Hopp(self.X,init=True,restart=True)
self.state = self._sh.state
self._updateLog(init=True)
while self.TIME < tfinal:
# [stop] protection mechanism
self.stop_protect()
#
self._step(integrator='VelocityVerlet')
self._stepnum += 1
# if PD calculation integrate pop dynamics
if self._has_pd:
self._pd._propagatePop()
self.state=self._pd.occ
# If ED calculation, ask ED module to provide gradient function via energies calculated based on density matrix
if self._has_ed:
self._ed._updateDensityMat(self.X)
self.state = self._ed.state
# If SH calculation, ask SH module to determine possible switching before kinetic energy check.
if self.sh_method == 'FS' and self._has_sh:
self._sh._is_Hopp(self.X)
self.TIME += self.DT
# If SH calculation, ask SH module for possible jump.
if self._has_sh and self._sh.is_hopp:
xnew,vnew,is_hopp = self._sh.switch_condition()
if is_hopp:
self.X = xnew
self.V = vnew
self._g = self._sh.gradient_s
self._sh.state = self._sh.hop_state
self.state = self._sh.state
self._updateLog()
self._checkpoint()
[docs]
def stop_protect(self,**opts):
"""
Protect the running processes via [stop] mechanism
optional arguments:
is_init - whether to be the initialization step
default: False
"""
is_init = opts.get('init',False)
file_stop = "stop" # [stop] file
#
_is_stop = False # whether should stop the running process
#
if is_init:
# create [stop] file
if not os.path.exists(file_stop):
open(file_stop, 'w').close()
# runtime
if self.use_runtime:
self.runtime_start = time()
else:
# check status of [stop] file
if not os.path.exists(file_stop):
_is_stop = True
# check runtime
if self.use_runtime:
# determine time elapsed [h]
time_elapsed = (time() - self.runtime_start) / 3600 # [s] - [h]
if time_elapsed >= self.runtime_usable:
sys.stdout.write("Usable runtime is met\n")
_is_stop = True
if _is_stop:
# softly bring the running calculation to intermediate completion
sys.stdout.write("QDTK soft landing\n")
self._checkpoint()
sys.exit(0)
[docs]
def add_population_dynamics_cntrl(self,pd):
self._has_pd = True
self._pd = pd
self._pd.simbox=self
[docs]
def add_surface_hopping_cntrl(self,sh):
"""
Add a surface-hopping object.
SH objects provide energy and gradient functions
"""
self._has_sh = True
self._sh = sh # crosslink objects
sh.simbox = self # "
self.external_E = sh.get_funcE()
self.external_G = sh.get_funcG()
if not self.isrestart:
self.State = []
if self.sh_method == 'FS' and self._has_sh:
self.phase_track = np.ones(self._sh.nstates,float) # The phase of electronic wavefunction
[docs]
def add_field_coupling_cntrl(self,fc):
"""
Add a field-coupling object.
FC objects provide field-coupling functions. invoked from sh.simbox._fc.
Loaded after the surface hopping object
"""
self._has_fc = True
self._fc = fc # crosslink objects
fc.simbox = self
[docs]
def add_ehrenfest_dynamics_cntrl(self,ed):
"""
Add an Ehrenfest-dynamics object.
ED objects provide energy and gradient functions
"""
self._has_ed = True
self._ed = ed # cross link objects
ed.simbox = self # "
self.external_E = ed.get_funcE()
self.external_G = ed.get_funcG()
self.reduced_mass = self.getReducedMass()
if not self.isrestart:
self.State = []
if self._has_ed:
self.phase_track = np.ones(self._ed.nstates,float) # The phase of electronic wavefunction
[docs]
def add_observable_cntrl(self,ob):
"""
Add an observable object
OB objects provide observable properties for the molecule
"""
if not self._has_ob or self.isrestart:
self._has_ob = True
# init the observable objects
# assume to contain several single observable objects
self._ob = []
ob.simbox = self # "
self._ob.append(ob) # cross link objects
if not self.isrestart:
self.TrajXeh = [] # List of Observable. [position of the electronic hole]
self.TrajTAS = [] # List of Observable. [transient absorption spectra]
self.TrajCharge = [] # List of Observable. [positive charge on each atom]
[docs]
def getReducedMass(self):
"""
Return the reduced mass of the molecule
"""
natoms = len(self.X) / self.ndim
masses = self.M.copy()
masses.shape = (natoms,self.ndim)
_rm_s = 0.0
for i in range(natoms):
_rm_s += 1.0 / masses[i,0]
reduced_mass = 1.0 / _rm_s
return reduced_mass
[docs]
def removeCenterMassMotion(self):
"""
Remove center of mass translational motion
"""
if self.internals: return # do nothing for internal coordinates
ncoords = self.X.size
nat = ncoords/self.ndim
mv = self.M * self.V # 3N momenta from velocities
mv.shape = (nat,self.ndim)
ptot = self.getMomentum()
mv = mv - ptot/nat # remove total momentum
mv.shape = (ncoords,)
self.V = mv / self.M # velocities from rescaled momenta)
[docs]
def getMomentum(self,**opts):
"""
Return total momentum vector (in [au])
Optional arguments:
- indices: tuple with indices of a group of atoms
- stepnum: step number to consider
- time: time to consider
** use time OR stepnum
"""
indices = opts.get('indices',None)
stepnum = opts.get('stepnum',None)
time = opts.get('time',None)
ncoords = self.X.size
if time is not None:
stepnum = self._indexfromtime(time)
if stepnum is None:
v = self.V
else:
v = self.TrajV[stepnum]
mv = self.M * v
mv.shape = (ncoords/self.ndim,self.ndim)
if indices is None:
ptot = mv.sum(axis=0)
else:
ptot = np.zeros(self.ndim,float)
for i in indices:
ptot = ptot + mv[i,:]
return ptot
[docs]
def getKineticEnergy(self,**opts):
"""
Total kinetic energy of the system of particles
Optional arguments:
- stepnum: step number to consider
- time: time to consider
** use time OR stepnum
"""
stepnum = opts.get('stepnum',None)
time = opts.get('time',None)
if time is not None:
stepnum = self._indexfromtime(time)
if stepnum is None:
v = self.V
else:
v = self.TrajV[stepnum]
return 0.5 * np.dot(self.M*v,v)
[docs]
def getTranslationalEnergy(self,**opts):
"""
Translational energy of the center of mass of the system of particles
Optional arguments:
- stepnum: step number to consider
- time: time to consider
** use time OR stepnum
"""
stepnum = opts.get('stepnum',None)
time = opts.get('time',None)
if time is not None:
stepnum = self._indexfromtime(time)
if stepnum is None:
v = self.V.copy()
else:
v = self.TrajV[stepnum].copy()
m = self.M.copy()
v.shape=(len(v)/self.ndim,self.ndim)
m.shape=(len(m)/self.ndim,self.ndim)
et = ntool.etrans(v,m[:,0],ndim=self.ndim)
return et
[docs]
def getRotationalEnergy(self,**opts):
"""
Rotational energy around the center of mass of the system of particles
Optional arguments:
- stepnum: step number to consider
- time: time to consider
** use time OR stepnum
"""
stepnum = opts.get('stepnum',None)
time = opts.get('time',None)
if time is not None:
stepnum = self._indexfromtime(time)
if stepnum is None:
x = self.X.copy()
v = self.V.copy()
else:
x = self.TrajX[stepnum].copy()
v = self.TrajV[stepnum].copy()
m = self.M.copy()
x.shape=(len(x)/self.ndim,self.ndim)
v.shape=(len(v)/self.ndim,self.ndim)
m.shape=(len(m)/self.ndim,self.ndim)
er = ntool.erot(x,v,m[:,0])
return er
[docs]
def getVibrationalEnergy(self,**opts):
"""
Internal energy of the system of particles
Optional arguments:
- stepnum: step number to consider
- time: time to consider
** use time OR stepnum
"""
stepnum = opts.get('stepnum',None)
time = opts.get('time',None)
if time is not None:
stepnum = self._indexfromtime(time)
if stepnum is None:
x = self.X.copy()
v = self.V.copy()
else:
x = self.TrajX[stepnum].copy()
v = self.TrajV[stepnum].copy()
m = self.M.copy()
m.shape=(len(m)/self.ndim,self.ndim)
x.shape=(len(x)/self.ndim,self.ndim)
v.shape=(len(v)/self.ndim,self.ndim)
ev = ntool.evib(x,v,m[:,0],ndim=self.ndim)
return ev
[docs]
def getVibrationalMomentum(self,**opts):
"""
Internal momentum of the system of particles
Optional arguments:
- stepnum: step number to consider
- time: time to consider
** use time OR stepnum
"""
stepnum = opts.get('stepnum',None)
time = opts.get('time',None)
if time is not None:
stepnum = self._indexfromtime(time)
if stepnum is None:
x = self.X.copy()
v = self.V.copy()
else:
x = self.TrajX[stepnum].copy()
v = self.TrajV[stepnum].copy()
m = self.M.copy()
m.shape=(len(m)/self.ndim,self.ndim)
x.shape=(len(x)/self.ndim,self.ndim)
v.shape=(len(v)/self.ndim,self.ndim)
pv = ntool.pvib(x,v,m[:,0],ndim=self.ndim)
return pv
[docs]
def getPotEnergy(self,**opts):
"""
Return potential energy (last time or stored) of the system (a.u.)
Optional arguments:
- stepnum: step number to consider
- time: time to consider
** use time OR stepnum
"""
stepnum = opts.get('stepnum',None)
time = opts.get('time',None)
if time is not None:
stepnum = self._indexfromtime(time)
if stepnum is None:
e = self.PotE[-1]
else:
e = self.PotE[stepnum]
return e
[docs]
def centerOfMassVelocity(self,**opts):
"""
Compute the velocity vector of the center of mass
Optional arguments:
- stepnum: step number to consider
- time: time to consider
** use time OR stepnum
"""
if self.ndim != 3:
raise ValueError('Function centerOfMassVelocity is currently for three spatial dimensions')
stepnum = opts.get('stepnum',None)
time = opts.get('time',None)
if time is not None:
stepnum = self._indexfromtime(time)
if stepnum is None:
vv = self.V
else:
vv = self.TrajV[stepnum]
totM = self.M.sum()/float(self.ndim)
v = vv.copy()
m = self.M.copy()
v.shape = (len(v)/self.ndim,self.ndim)
m.shape = (len(m)/self.ndim,self.ndim)
px = np.dot(v[:,0],m[:,0])
py = np.dot(v[:,1],m[:,1])
pz = np.dot(v[:,2],m[:,2])
vcm = np.array([px,py,pz])/totM
return vcm
[docs]
def getAdiabaticPopulation(self):
"""
Return the adiabatic populations and the norm of electronic wavefunction
"""
norm_el = 0.0
adpop = []
for amp_el in self._sh.c:
adp_s = abs(amp_el.conjugate() * amp_el)
norm_el = norm_el + adp_s
adpop.append(adp_s)
return adpop, norm_el
[docs]
def save(self,**opts):
"""
Save object using pickle
optional arguments:
- fname: filename to save to. If not given a random
hash will be used
"""
# Process arguments
fname = opts.get('fname',self._NAME)
# Undefine functions, these cannot be saved!
fC = self.constraints
fE = self.external_E
fG = self.external_G
sh = self._sh
ed = self._ed
ob = self._ob
pd = self._pd
if self._has_pd:
self._pd.write_chkpt(fname+'.xmol')
if self._has_sh:
self.state_save = sh.state
if self.sh_method == 'FS':
self.c_wavefunc_save = sh.c.copy()
self.energy_reference = sh.energy_reference.copy()
if self._has_ed:
self.state_save = ed.state
self.rho_densitymat_save = ed.rho.copy()
self.energy_reference = ed.energy_reference.copy()
self.T0 = ed.T0
self.constraints = None
self.external_E = None
self.external_G = None
self._sh = None
self._ed = None
self._ob = None
self._pd = None
# Start save
s = shelve.open(fname,protocol=2)
s['sbox'] = self
s.close()
self.constraints = fC
self.external_E = fE
self.external_G = fG
self._sh = sh
self._ed = ed
self._ob = ob
self._pd = pd
# secure the file
TIME_c = str(self.TIME)
#fname_sec = fname.upper() + '_SEC_' + TIME_c
#
fname_sec = fname.upper() + '_SEC'
if self.save_secure:
EXEC = 'cp ' + fname + ' ' + fname_sec
status = os.system(EXEC)
[docs]
def load(self,a_fname,**opts):
"""
Load a simulationbox object from file
"""
try:
s = shelve.open(a_fname,protocol=2)
except:
msg = 'File '+a_fname+' could not be accessed for reading'
raise ValueError(msg)
b = s['sbox']
return b
[docs]
def loadDirectory(self,a_dir,match=None,**opts):
"""
Load all simulationbox objects stored in a directory
Input arguments:
a_dir -- directory where to load simbox objects from
match -- (optional) load only files that match "match"
"""
sboxes = []
try:
listFiles = os.listdir(a_dir)
except:
raise ValueError('Could not open directory '+a_dir)
for filename in listFiles:
pathname = os.path.join(a_dir,filename)
if os.path.isdir(pathname):
sboxes = sboxes + self.loadDirectory(pathname,match=match)
else:
if match is not None:
if filename.find(match) == -1:
continue
try:
sbox = self.load(pathname)
sboxes.append(sbox)
except:
continue
return sboxes
[docs]
def traj2VTF(self,**opts):
"""
Generate a trajectory in .vtf format
Useful for plots with VMD
"""
if self.internals: return
fileName = opts.get('fileName',self._NAME+'.vtf')
out = file(fileName,'w')
ncoords = self.X.size
nat = ncoords/self.ndim
for i,s in enumerate(self.atomSymbols):
out.write( 'atom %i radius 1.0 name %s\n' % (i,s) )
out.write('\n')
for geom in self.TrajX:
out.write('timestep\n')
geom.shape = (nat,self.ndim)
for atom in geom:
for coord in atom:
out.write(' %12.7f' % (coord*conv.au2an))
out.write('\n')
out.write('\n')
geom.shape = (ncoords,)
[docs]
def getPropertyTable(self,funcX=None,filename=None,usevel=False,**opts):
"""
Return property as a function of time
Works on a single SimulationBox instance
Input arguments:
funcX -- (optional) function of the coordinates
may have more than one return value
filename -- (optional) file where to store the data in table form
usevel -- (optional) if True, velocities are
used instead of positions
on return the following arrays:
time, ekin, etrans, erot, evib, epot
or
time, func_val
if funcX provided
Options
finalTime -- final time for property analysis over patial time domain
"""
finalTime = opts.get('finalTime',None)
if finalTime is not None:
# analyse over partial time domain terminating at [finalTime]
if sbox.TIME < finalTime:
return None
else:
times = np.linspace(0.0,finalTime,num=int(finalTime/sbox.DT)+1)
else:
# analyse over full time domain
times = np.linspace(0.0,self.TIME,num=len(self.TrajX))
tlen = len(times)
if funcX is not None:
if not usevel:
fval = np.array([funcX(x) for x in self.TrajX[:tlen]])
else:
fval = np.array([funcX(x) for x in self.TrajV[:tlen]])
if filename is not None:
fout = file(filename,'w')
fout.write('# t[as] func( X(t) )\n')
for i,t in enumerate(times):
fout.write('%13.7f ' % (t,))
if isinstance(fval[i],np.ndarray): # multidimensional funcX
for v in fval[i]: fout.write(' %13.7f' % (v,))
else:
fout.write(' %13.7f' % (fval[i],))
fout.write('\n')
fout.close()
return times,fval
else:
ekin = np.array([self.getKineticEnergy(time=t) for t in times])
etra = np.array([self.getTranslationalEnergy(time=t) for t in times])
erot = np.array([self.getRotationalEnergy(time=t) for t in times])
evib = np.array([self.getVibrationalEnergy(time=t) for t in times])
epot = np.array([self.getPotEnergy(time=t) for t in times])
if filename is not None:
fout = file(filename,'w')
fout.write('# time[as] E_kin E_trans E_rot E_vib\n')
for i,t in enumerate(times):
fout.write('%13.7f %13.7f %13.7f %13.7f %13.7f %13.7f\n' %
(t,ekin[i],etra[i],erot[i],evib[i],epot[i]))
fout.close()
return times,ekin,etra,erot,evib,epot
[docs]
def getPropertyTableDir_F(self,a_dir,a_funcX,filename=None,match=None,usevel=False,**opts):
"""
Return an ensemble average property as a function of time
Input arguments:
a_dir -- directory with SimulationBox to be averaged
a_funcX -- function of the coordinates;
may have more than one return value
filename -- (optional) file where to store the data in table form
match -- load only files matching match
usevel -- (optional) if True, velocities are used instead of positions
On return two arrays with:
time, func_val
Options
finalTime -- final time for property analysis over patial time domain
"""
finalTime = opts.get('finalTime',None)
lboxes = self.loadDirectory(a_dir,match=match)
if finalTime is not None:
times = np.linspace(0.0,finalTime,num=int(finalTime/lbox[0].DT)+1)
else:
times = np.linspace(0.0,lboxes[0].TIME,num=len(lboxes[0].TrajX))
N = len(lboxes) # number of systems
S = len(times) # number of time steps
data = []
for sb in lboxes:
if finalTime is not None:
tab = sb.getPropertyTable(funX=a_funcX,usevel=usevel,finalTime=finalTime)
if tab is not None:
data.append(tab[1])
else:
data.append(sb.getPropertyTable(funcX=a_funcX,usevel=usevel)[1])
adata = np.array(data) # indices of adata: system,timestep,property
if len(adata.shape) == 2: # single property
fval = np.array([adata[:,it].sum()/N for it in range(S)])
if filename is not None:
fout = file(filename,'w')
fout.write('# t[as] func( X(t) )\n')
for i,t in enumerate(times):
fout.write('%13.7f %13.7f\n' % (t,fval[i]))
fout.close()
if len(adata.shape) == 3: # multiple properties
nprop = adata.shape[2]
fval = []
for p in range(nprop):
fval.append( np.array([adata[:,it,p].sum()/N for it in range(S)]) )
fval = np.array(fval)
if filename is not None:
fout = file(filename,'w')
fout.write('# t[as] func( X(t) )\n')
for i,t in enumerate(times):
fout.write('%13.7f ' % (t,))
for p in range(nprop):
fout.write(' %13.7f' % (fval[p,i]))
fout.write('\n')
fout.close()
return times,fval
[docs]
def getPropertyTableDir_E(self,a_dir,filename=None,match=None,**opts):
"""
Return ensemble averages of various energies
Input arguments:
a_dir -- directory with SimulationBox to be averaged
filename -- (optional) file where to store the data in table form
match -- load only files matching a_match
On return two arrays with:
time, func_val
Options
finalTime -- final time for property analysis over patial time domain
"""
finalTime = opts.get('finalTime',None)
lboxes = self.loadDirectory(a_dir,match=match)
if finalTime is not None:
times = np.linspace(0.0,finalTime,num=int(finalTime/lbox[0].DT)+1)
else:
times = np.linspace(0.0,lboxes[0].TIME,num=len(lboxes[0].TrajX))
data = []
for sb in lboxes:
data.append(sb.getPropertyTable(finalTime=finalTime))
adata = np.array(data)
N = len(adata[:,0,0]) # number of systems
S = len(adata[0,0,:]) # number of time steps
ekin = np.array([adata[:,1,it].sum()/N for it in range(S)])
etra = np.array([adata[:,2,it].sum()/N for it in range(S)])
erot = np.array([adata[:,3,it].sum()/N for it in range(S)])
evib = np.array([adata[:,4,it].sum()/N for it in range(S)])
epot = np.array([adata[:,5,it].sum()/N for it in range(S)])
if filename is not None:
fout = file(filename,'w')
fout.write('# time[as] E_kin E_trans E_rot E_vib\n')
for i,t in enumerate(times):
fout.write('%13.7f %13.7f %13.7f %13.7f %13.7f %13.7f\n' %
(t,ekin[i],etra[i],erot[i],evib[i],epot[i]))
fout.close()
return times,ekin,etra,erot,evib,epot
[docs]
def getProperty2D_F(self,a_dir,a_funcX,a_filename,match=None):
"""
Return a 2D function of the system in block form, one block per time step
Input arguments:
a_dir -- directory with SimulationBox to be averaged
a_filename -- file where to store the data in table form
match -- load only files matching a_match
On return two arrays with:
time, func_val
"""
lboxes = self.loadDirectory(a_dir,match=match)
times = np.linspace(0.0,lboxes[0].TIME,num=len(lboxes[0].TrajX))
N = len(lboxes) # number of systems
S = len(times) # number of time steps
data = []
for sb in lboxes:
data.append(sb.getPropertyTable(funcX=a_funcX)[1])
adata = np.array(data) # indices of adata: system,timestep,property
fout = file(a_filename,'w')
for i,t in enumerate(times):
for n in range(N):
fout.write('%13.7f %13.7f %i %13.7f\n' % (adata[n,i,0],adata[n,i,1],1,t))
fout.write('\n\n')
fout.close()
[docs]
def reset(self):
self.X = self.TrajX[0]
self.V = self.TrajV[0]
self.TIME = 0.0
self.TrajX = []
self.TrajV = []
self.KinE = []
self.PotE = []
self.TotE = []
if self._has_sh or self._has_ed:
self.State = []
def _updateLog(self,**opts):
is_first = opts.get('init',False)
# Initialize logfile
if is_first and self.islogfile and not self.isrestart:
self.logfile = file(self._NAME+'.log','w')
self.logfile.write(SEP)
self.logfile.write('# '+self._NAME+'\n')
if self.sh_method == 'FS' and self._has_sh:
self.logfile.write(SEP)
self.logfile.write('# global energy shift %16.7f has been applied.\n' % (self._sh.energy_reference))
self.logfile.write(SEP)
self.logfile.write('# step time KinE PotE TotE <state>\n')
self.logfile_e = file(self._NAME+'_e.log','w')
self.logfile_e.write('# step time KinE PotE TotE <state>\n')
self.logfile_ep = file(self._NAME+'_ep.log','w')
self.logfile_ep.write('# The potential energy of each electronic state.\n')
if self.sh_method == 'FS' and self._has_sh:
self.logfile_ep.write('# Fewest-switching surface-hopping.\n')
elif self._has_ed:
self.logfile_ep.write('# Ehrenfest dynamics. The Last column of energy is the Ehrenfest energy from density matrix.\n')
self.logfile_es = file(self._NAME+'_es.log','w')
self.logfile_es.write('# step time <state>\n')
self.logfile_adp = file(self._NAME+'_adp.log','w')
self.logfile_adp.write('# The adiabatic populations. The Last column is the norm of electronic wavefunction.\n')
if self.sh_method == 'FS' and self._has_sh:
self.logfile_adp.write('# Fewest-switching surface-hopping\n')
elif self._has_ed:
self.logfile_adp.write('# Ehrenfest dynamics\n')
# Appending logfile for [restart] calculation
if is_first and self.islogfile and self.isrestart:
self.logfile = file(self._NAME+'.log','a+w')
self.logfile.write(SEP)
self.logfile.write('# '+self._NAME+'\n')
if self.sh_method == 'FS' and self._has_sh:
self.logfile.write(SEP)
self.logfile.write('# global energy shift %16.7f has been applied.\n' % (self._sh.energy_reference))
self.logfile.write(SEP)
self.logfile.write('# step time KinE PotE TotE <state>\n')
self.logfile_e = file(self._NAME+'_e.log','a+w')
self.logfile_e.write('# step time KinE PotE TotE <state>\n')
self.logfile_ep = file(self._NAME+'_ep.log','a+w')
self.logfile_ep.write('# The potential energy of each electronic state.\n')
if self.sh_method == 'FS' and self._has_sh:
self.logfile_ep.write('# Fewest-switching surface-hopping.\n')
elif self._has_ed:
self.logfile_ep.write('# Ehrenfest dynamics. The Last column of energy is the Ehrenfest energy from density matrix.\n')
self.logfile_es = file(self._NAME+'_es.log','a+w')
self.logfile_es.write('# step time <state>\n')
self.logfile_adp = file(self._NAME+'_adp.log','a+w')
self.logfile_adp.write('# The adiabatic populations. The Last column is the norm of electronic wavefunction.\n')
if self.sh_method == 'FS' and self._has_sh:
self.logfile_adp.write('# Fewest-switching surface-hopping\n')
elif self._has_ed:
self.logfile_adp.write('# Ehrenfest dynamics\n')
kinE = self.getKineticEnergy()
if self._sh is None:
potE = self.external_E(self.X)
else:
potE = self._sh.e_adiabatic[self._sh.state]
totE = potE + kinE
if self.sh_method == 'FS' and self._has_sh:
adpop, norm_el = self.getAdiabaticPopulation()
# compute observables
if self._has_ob:
for _ob_s in self._ob:
if _ob_s._ONAME == 'ob_eh':
# position of electronic hole
self.Xeh = _ob_s.getElecHolePosition(self.X, self.state)
elif _ob_s._ONAME == 'ob_tas':
# transient absorption spectra
self.TAS = _ob_s.getDipoleEnergyTAS(self.X, self.state)
elif _ob_s._ONAME == 'ob_charge':
# positive charge on each atom
self.Charge = _ob_s.getCharge(self.state)
else:
pass
self.KinE.append(kinE)
self.PotE.append(potE)
self.TotE.append(totE)
if not self.isrestart:
self.TrajX.append(self.X)
self.TrajV.append(self.V)
if self._has_sh:
self.State.append(self._sh.state)
if self._has_ed:
self.State.append(self._ed.state)
if self._has_ob:
for _ob_s in self._ob:
if _ob_s._ONAME == 'ob_eh':
self.TrajXeh.append(self.Xeh)
elif _ob_s._ONAME == 'ob_tas':
self.TrajTAS.append(self.TAS)
elif _ob_s._ONAME == 'ob_charge':
self.TrajCharge.append(self.Charge)
else:
pass
elif self.isrestart and not is_first:
# avoid double record at the point of restart
self.TrajX.append(self.X)
self.TrajV.append(self.V)
if self._has_sh:
self.State.append(self._sh.state)
if self._has_ed:
self.State.append(self._ed.state)
if self._has_ob:
for _ob_s in self._ob:
if _ob_s._ONAME == 'ob_eh':
self.TrajXeh.append(self.Xeh)
elif _ob_s._ONAME == 'ob_tas':
self.TrajTAS.append(self.TAS)
elif _ob_s._ONAME == 'ob_charge':
self.TrajCharge.append(self.Charge)
else:
pass
if self.islogfile:
if not self.isrestart:
stepstr = " %i %10.6f %15.12f %15.12f %15.12f" % (
self._stepnum,self.TIME,kinE,potE,totE)
# potential energy of each electronic state
epstr = "%6i %16.7f" % (
self._stepnum,self.TIME)
if self._has_sh:
for potE_s in self._sh.e_adiabatic:
epstr = epstr + " %16.7f" % (potE_s,)
epstr = epstr + " %3i\n" % (self._sh.state,)
elif self._has_ed:
for potE_s in self._ed.e_adiabatic:
epstr = epstr + " %16.7f" % (potE_s,)
epstr = epstr + " %16.7f" % (self._ed.e_ehrenfest)
epstr = epstr + " %3i\n" % (self._ed.state,)
else:
epstr = epstr + " \n"
if self._has_sh:
# energies and current electronic state
stepstr = stepstr + " %i\n" % (self._sh.state,)
# current electronic state
esstr = "%6i %16.7f %i\n" % (
self._stepnum,self.TIME,self._sh.state)
# adiabatic populations
if self.sh_method == 'FS':
adpstr = "%6i %16.7f" % (
self._stepnum,self.TIME)
for adp_s in adpop:
adpstr = adpstr + " %16.7f" % (adp_s)
adpstr = adpstr + " %16.7f\n" % (norm_el,)
elif self._has_ed:
# energies and current decoherence state
stepstr = stepstr + " %i\n" % (self._ed.state,)
# current decoherence state
esstr = "%6i %16.7f %i\n" % (
self._stepnum,self.TIME,self._ed.state)
elif self._has_pd:
stepstr = stepstr + " %s \n" % (''.join(map(str,self._pd.occ)))
else:
stepstr = stepstr + " \n"
elif self.isrestart:
stepstr = " %i %10.6f %15.12f %15.12f %15.12f" % (
(self._stepnum_restart+self._stepnum),self.TIME,kinE,potE,totE)
# potential energy of each electronic state
epstr = "%6i %16.7f" % (
(self._stepnum_restart+self._stepnum),self.TIME)
if self._has_sh:
for potE_s in self._sh.e_adiabatic:
epstr = epstr + " %16.7f" % (potE_s,)
epstr = epstr + " %3i\n" % (self._sh.state,)
elif self._has_ed:
for potE_s in self._ed.e_adiabatic:
epstr = epstr + " %16.7f" % (potE_s,)
epstr = epstr + " %16.7f" % (self._ed.e_ehrenfest)
epstr = epstr + " %3i\n" % (self._ed.state,)
else:
epstr = epstr + " \n"
if self._has_sh:
# energies and current electronic state
stepstr = stepstr + " %i\n" % (self._sh.state,)
# current electronic state
esstr = "%6i %16.7f %i\n" % (
(self._stepnum_restart+self._stepnum),self.TIME,self._sh.state)
# adiabatic populations
if self.sh_method == 'FS':
adpstr = "%6i %16.7f" % (
(self._stepnum_restart+self._stepnum),self.TIME)
for adp_s in adpop:
adpstr = adpstr + " %16.7f" % (adp_s)
adpstr = adpstr + " %16.7f\n" % (norm_el,)
elif self._has_ed:
# energies and current decoherence state
stepstr = stepstr + " %i\n" % (self._ed.state,)
# current decoherence state
esstr = "%6i %16.7f %i\n" % (
(self._stepnum_restart+self._stepnum),self.TIME,self._ed.state)
elif self._has_pd:
stepstr = stepstr + " %s \n" % (''.join(map(str,self._pd.occ)))
else:
stepstr = stepstr + " \n"
if not self.isrestart:
self.logfile.write(SEP)
self.logfile.write(stepstr)
self.logfile_e.write(stepstr)
if self._has_sh:
self.logfile_ep.write(epstr)
self.logfile_es.write(esstr)
elif self._has_ed:
self.logfile_ep.write(epstr)
if self.sh_method == 'FS' and self._has_sh:
self.logfile_adp.write(adpstr)
elif self.isrestart and not is_first:
# avoid double recording
self.logfile.write(SEP)
self.logfile.write(stepstr)
self.logfile_e.write(stepstr)
if self._has_sh:
self.logfile_ep.write(epstr)
self.logfile_es.write(esstr)
elif self._has_ed:
self.logfile_ep.write(epstr)
if self.sh_method == 'FS' and self._has_sh:
self.logfile_adp.write(adpstr)
if not is_first:
if self._has_sh:
self._sh._update_logfile()
elif self._has_ed:
self._ed._update_logfile()
# logfile for observables
if self._has_ob:
for _ob_s in self._ob:
_ob_s._update_logfile()
self.logfile.write(SEP)
self.logfile.write('\n')
def _step(self,**opts):
"""
Do an integration step
"""
integrator = opts.get('integrator','VelocityVerlet')
if integrator == 'VelocityVerlet':
(self.X,self.V,self._g) = mdi.velocityVerlet(self.X,
self.V,
self._g,
self.M,
self.DT,
self.external_G,
constraints=self.constraints)
def _checkpoint(self):
"""
Use the normal saving mechanism to checkpoint
"""
self.save(fname=self._NAME+'.rst')
self.traj2VTF()
## def _getTimeDeriv(self,**opts):
##chage this part
def _getNumGrad(self,**opts):
dx = opts.get('dx',0.001) # step for numerical differentiation
def g(x):
grad = np.zeros(x.size,float)
for i in range(x.size):
x1 = x.copy()
x1[i] = x1[i] + 0.5*dx
f1 = self.external_E(x1)
x0 = x.copy()
x0[i] = x0[i] - 0.5*dx
f0 = self.external_E(x0)
grad[i] = (f1-f0)/dx
return grad
return g
def _indexfromtime(self,a_t):
dt = self.DT
tf = float(a_t)
i,r = divmod(tf,dt)
if r <= 0.5*dt:
i = int(i)
else:
i = int(i) + 1
return i
[docs]
def load(filename):
"""
Load a SimulationBox from file and return it
"""
s = SimulationBox().load(filename)
return s