#!/usr/bin/env python
#* **************************************************************************
#*
#* CDTK, Chemical Dynamics Toolkit
#* A modular system for chemical dynamics applications and more
#*
#* Copyright (C) 2011, 2012, 2013, 2014, 2015, 2016
#* Oriol Vendrell, DESY, <oriol.vendrell@desy.de>
#*
#* Copyright (C) 2017, 2018, 2019
#* Ralph Welsch, DESY, <ralph.welsch@desy.de>
#*
#* Copyright (C) 2020, 2021, 2022, 2023
#* Ludger Inhester, DESY, ludger.inhester@cfel.de
#*
#* This file is part of CDTK.
#*
#* CDTK is free software: you can redistribute it and/or modify
#* it under the terms of the GNU General Public License as published by
#* the Free Software Foundation, either version 3 of the License, or
#* (at your option) any later version.
#*
#* This program is distributed in the hope that it will be useful,
#* but WITHOUT ANY WARRANTY; without even the implied warranty of
#* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#* GNU General Public License for more details.
#*
#* You should have received a copy of the GNU General Public License
#* along with this program. If not, see <http://www.gnu.org/licenses/>.
#*
#* **************************************************************************
from builtins import str
from builtins import range
import sys
import os
import shutil
from optparse import OptionParser
import time
import numpy as np
import scipy
import math
import CDTK.Dynamics.IniConSampler as ico
import CDTK.Dynamics.Thermostat as ts
import CDTK.Dynamics.Trajectory as trj
import CDTK.Dynamics.Constraints as constraints
import CDTK.Interfaces.WrapperInterface as win
import CDTK.Tools.NormalModeAnalysis as nmo
import CDTK.Tools.Inputfile as inf
import CDTK.Tools.Conversion as cv
import CDTK.Tools.Utils as uti
IDLEN = 8
TINY = 1.0e-8
PREFIX = 'ensmbl'
[docs]
def start():
# --------------------------------------------------------------------------
# Parse command line options
# --------------------------------------------------------------------------
parser=OptionParser()
parser.add_option('-i','--input-file',
dest='input_file',
type='str',
default=None,
help='Path to input file')
parser.add_option('-d','--project-dir',
dest='project_dir',
type='str',
default=None,
help='Project name used to for output directory')
parser.add_option('-H','--hessian-file',
dest='hessian_file',
type='str',
default=None,
help='ascii file with Hessian matrix as a 2D table')
parser.add_option('-N','--normal-modes-frequency-file',
dest='normalmode_freq_file',
type='str',
default=None,
help='ascii file with normal mode frequencies and coordinates')
parser.add_option('-a', '--add',
dest='doAdd',
action='store_true',
default=False,
help='Adds new initial conditions to a folder.')
parser.add_option('-n', '--noDirec',
dest='noDirec',
action='store_true',
default=False,
help='Do not create directory, but write everything to two files.')
parser.add_option('-r','--rho',
dest='rho',
action='store_true',
default=False,
help='Generates histogram of x/v.')
parser.add_option('--r-displacement',
dest='rDisplacement',
type=str,
default=None,
help='position displacement for each mode')
parser.add_option('--v-displacement',
dest='vDisplacement',
type=str,
default=None,
help='velocity displacement for each mode')
opts, args = parser.parse_args(sys.argv[1:])
# --------------------------------------------------------------------------
# Parse input file
# --------------------------------------------------------------------------
if opts.input_file:
basename = opts.input_file.split('.')[0]
I = inf.Inputfile(opts.input_file).sections
else:
raise ValueError('No input file specified')
# --------------------------------------------------------------------------
# Check for existance of Hessian and Normal mode files
# --------------------------------------------------------------------------
if opts.hessian_file:
hessian = uti.readTableFile(opts.hessian_file)
nmfreq = None
nmcoord = None
elif opts.normalmode_freq_file:
nmfreq = uti.readfirstline_File(opts.normalmode_freq_file)
nmcoord = uti.readTable_exceptfirstline_File(opts.normalmode_freq_file)
hessian = None
elif I['sampling']['type'][0] == 'boltzmann':
hessian = None
nmfreq = None
nmcoord = None
elif I['sampling']['type'][0] == 'monte-carlo':
hessian = None
nmfreq = None
nmcoord = None
elif I['sampling']['type'][0] == 'rpsh':
hessian = None
nmfreq = None
nmcoord = None
elif I['sampling']['type'][0] == 'thermostat':
hessian = None
nmfreq = None
nmcoord = None
elif I['sampling']['type'][0] == 'wignerNEU' or I['sampling']['type'][0] == 'wignerNP':
hessian = None
nmfreq = None
nmcoord = None
else:
raise ValueError('No Hessian/Normal mode file specified')
# --------------------------------------------------------------------------
# Set projectname
# --------------------------------------------------------------------------
projdir = opts.project_dir
if not projdir:
_id = uti.getUniqueID(IDLEN)
_date = uti.getDateString()
projdir = PREFIX + '_' + basename + '_' + _date + '_' + _id
else:
projdir = PREFIX + '_' + projdir
# --------------------------------------------------------------------------
# Create name directory, copy input and Hessian files, change to it
# --------------------------------------------------------------------------
if not os.path.exists(projdir):
os.mkdir(projdir)
shutil.copy(opts.input_file,projdir+'/input_xsample')
if I['sampling']['type'][0] == 'zero_point_energy' or I['sampling']['type'][0] == 'wigner':
if 'hessian' in I['sampling']:
shutil.copy(opts.hessian_file,projdir+'/hessian')
elif 'normalmodes' in I['sampling']:
shutil.copy(opts.normalmode_freq_file,projdir+'/normalmodes')
currentDir = os.getcwd()
os.chdir(projdir)
nshift = 0
## if we are supposed to add
if opts.doAdd == True:
# obtain number of alread existing trj
nshift = len(uti.getDirectoryList())
# --------------------------------------------------------------------------
# Read atom list and reference coordinates X0
# --------------------------------------------------------------------------
atomlist = I['cartpos']['atomlist']
nat = len(atomlist)
atomnums = []
for atom in atomlist:
atomnums.append(cv.periodicTable[atom]['atomic_number'])
atommass = []
atommass3 = []
for atom in atomlist:
atommass.append(cv.periodicTable[atom]['atomic_mass'])
atommass3.append(cv.periodicTable[atom]['atomic_mass'])
atommass3.append(cv.periodicTable[atom]['atomic_mass'])
atommass3.append(cv.periodicTable[atom]['atomic_mass'])
m = np.asarray(atommass3)*cv.am2au
xcart = np.array( I['cartpos']['coordinates'], float )
if I['system']['xunit'][0] == 'an':
xcart = xcart * cv.an2au
vcart = np.array( I['cartvel']['coordinates'], float )
uti.listToFile(atomlist,'atomlist')
uti.listToFile(atomnums,'atomnums')
uti.listToFile(atommass,'atommass')
# --------------------------------------------------------------------------
# Sampling of initial conditions
# --------------------------------------------------------------------------
# nsample = number of samples to produce
nsample = int( I['sampling']['nsample'][0] ) - nshift
# nstride = number of samples to skip; relevant for monte carlo sampling
nstride = int( I['sampling'].get('nstride', [1])[0] )
# nobtain = number of samples to actually produce
nobtain = nsample * nstride
# qn = quantum number of the harmonic mode
qn = I['sampling'].get('qn',None)
if qn != None:
quantum_numbers = [int(i) for i in qn]
else:
quantum_numbers = np.zeros(len(xcart))
if 'linear' in I['sampling']:
is_linear = True
else:
is_linear = False
if '1d' in I['sampling']:
is_1d = True
else:
is_1d = False
# Sample by using the hessian, harmonic oscillators and
if I['sampling']['type'][0] == 'zero_point_energy':
quantum_numbers = I['sampling'].get('quantum_numbers',None)
X,V = ico.sampler_zero_point_energy(nsample,
xcart,
m,
hessian, nmfreq, nmcoord,
quantum_numbers,
is_linear)
if len(I['sampling']['type']) > 1 and I['sampling']['type'][1] == 'rotation':
X, V = ico.rotation(X, V)
# Sample by using Monte-Carlo scheme and exp(-\beta H) or long trajectories
elif I['sampling']['type'][0] == 'boltzmann' or I['sampling']['type'][0] == 'thermostat' or I['sampling']['type'][0] == 'wignerNEU' or I['sampling']['type'][0] == 'wignerNP' or I['sampling']['type'][0] == 'wignerHarmonic':
# --------------------------------------------------------------------------
# Create interface to quantum chemistry engine (QCE)
# --------------------------------------------------------------------------
is_keepinp = False
is_keeplog = False
if 'keepinp' in I['system']: is_keepinp = True
if 'keeplog' in I['system']: is_keeplog = True
if I['system']['qchemistry'] == ['gamess']:
import CDTK.Interfaces.GamessUSInterface as gin
QCE = gin.gamess()
QCE.is_storeinp = is_keepinp # Keep inp file of every calculation performed
QCE.is_storelog = is_keeplog # Keep log file of every calculation performed
QCE.atomicSymbols = uti.changeIsotopeSymbols(atomlist)
QCE.atomicNumbers = atomnums
IG = I.get('gamess',None) # IG -> $gamess input section, if any
if IG:
QCE.init_input_options( IG )
if I['system']['qchemistry'] == ['molcas']:
import CDTK.Interfaces.MolcasInterface as mol
QCE = mol.molcas()
QCE.is_storeinp = is_keepinp # Keep inp file of every calculation performed
QCE.is_storelog = is_keeplog # Keep log file of every calculation performed
QCE.atomicSymbols = uti.changeIsotopeSymbols(atomlist)
QCE.atomicNumbers = atomnums
IM = I.get('molcas',None) # IM -> $molcas input section, if any
if IM:
QCE.init_input_options( IM )
if I['system']['qchemistry'] == ['analytic']:
import CDTK.Interfaces.AnalyticInterface as aint
QCE = aint.analytic()
QCE.is_storeinp = is_keepinp # Keep inp file of every calculation performed
QCE.is_storelog = is_keeplog # Keep log file of every calculation performed
QCE.atomicSymbols = uti.changeIsotopeSymbols(atomlist)
QCE.atomicNumbers = atomnums
IA = I.get('analytic',None) # IA -> $analytic input section, if any
if IA:
QCE.init_input_options( IA )
avec = IA.get('a', None)
if avec != None:
QCE.a = np.array(avec, dtype=float)
# for XMolecule as a quantum chemistry tool
if I['system']['qchemistry'] == ['xmolecule']:
import CDTK.Interfaces.XMoleculeInterface as xmim
QCE = xmim.xmolecule()
QCE.is_storeinp = is_keepinp # Keep inp file of every calculation performed
QCE.is_storelog = is_keeplog # Keep log file of every calculation performed
QCE.atomicSymbols = uti.changeIsotopeSymbols(atomlist)
QCE.atomicNumbers = atomnums
QCE.atomicMasses = np.array(atommass3,float)*cv.am2au
IX = I.get('xmolecule',None) # IG -> $xmolecule input section, if any
if IX:
# also give position to prepare the initialization
QCE.init_input_options( IX, atomlist, xcart)
else:
print("Add good error message - RW TODO")
exit(-1)
if I['system']['qchemistry'] == ['embedding']:
import CDTK.Interfaces.EmbeddingInterface as emb
IM = I.get('embedding',None)
IM['quantum_options'] = (I.get(IM['quantum'][0]))
QCE = emb.embedding(atomlist, atomnums, atommass3, xcart, **IM)
# --------------------------------------------------------------------------
# Attach quantum chemistry engine to general wrapper interface
# --------------------------------------------------------------------------
W = win.wrapperInterface()
W.QCE = QCE
if I['sampling']['type'][0] == 'boltzmann':
energy = W.f_E(xcart)
## TODO RW: this needs to be adapted to work with RPMD
if I.get('rpmd', None) != None:
print("Boltzmann sampling not implemented for RPMD - please go adhead and code it!!")
exit(-1)
# desired acceptance ratio
accept = float( I['sampling'].get('accept', [-1])[0] )
# initial step
step = float( I['sampling'].get('step', [0.01])[0] )
# temperature
temperature = float( I['sampling'].get('temperature', [300.0])[0] )
sampler = ico.IniConSampler(xcart, W.f_E, T=temperature, M=m, NSAMPLES=nobtain, RefEnergy=energy,stepnorm=step)
# if a reasonable acceptance ratio is given then run burn-in with 5000 steps
# TODO RW: add flexibility on steps used for burnin
if accept > 0.0 and accept < 1.0:
sampler.burnin(steps=5000, targetRate=accept)
ncol = sampler._log['ncol']
ntot = sampler._log['ntot']
nabs = sampler._log['NABS']
ratio = sampler._log['ratio']
sys.stdout.write('After Burnin ======= \n')
sys.stdout.write('N Collected: %i\n' % (ncol,))
sys.stdout.write('N Calls (collection): %i\n' % (ntot,))
sys.stdout.write('N Calls (absolute): %i\n' % (nabs,))
sys.stdout.write('Acceptance ratio: %6.3f\n' % (ratio,))
sys.stdout.write('Step size: %6.3f\n' % (sampler._optimalStepNorm,))
sys.stdout.write('After Burnin ======= \n')
# Use the sampler to obtain positions and velocities distributed according to exp(-beta H)
X,V = sampler.sampler_boltzmann()
ncol = sampler._log['ncol']
ntot = sampler._log['ntot']
nabs = sampler._log['NABS']
ratio = sampler._log['ratio']
# obtain energies and histogram them
energies = sampler._log['energy']
hist, bin_edges = np.histogram(energies[::nstride], bins=500, density=True)
sys.stdout.write('N Collected: %i\n' % (ncol,))
sys.stdout.write('N Calls (collection): %i\n' % (ntot,))
sys.stdout.write('N Calls (absolute): %i\n' % (nabs,))
sys.stdout.write('Acceptance ratio: %6.3f\n' % (ratio,))
sys.stdout.write('Step size: %6.3f\n' % (sampler._optimalStepNorm,))
f = open('energies', 'w')
for i, val in enumerate(hi1st):
v = (bin_edges[i] + bin_edges[i+1]) / 2.0
f.write(str(v) + ' ' + str(val) + '\n')
f.close()
# # testing with histogramming!
# V = np.array(V)
# X = np.array(X)
# vel = []
# pos = []
# for i in range(nsample):
# vel.append(V[i, 0])
# pos.append(X[i, 0])
# hist, bin_edges = np.histogram(vel, bins=200, density=True)
# f = open('vel', 'w')
# for i, val in enumerate(hist):
# v = (bin_edges[i] + bin_edges[i+1]) / 2.0
# f.write(str(v) + ' ' + str(val) + '\n')
# f.close()
# hist, bin_edges = np.histogram(pos, bins=200, density=True)
# f = open('pos', 'w')
# for i, val in enumerate(hist):
# v = (bin_edges[i] + bin_edges[i+1]) / 2.0
# f.write(str(v) + ' ' + str(val) + '\n')
# f.close()
elif I['sampling']['type'][0] == 'thermostat':
# temperature
temperature = float( I['sampling'].get('temperature', [300.0])[0] )
# timestep for trajectories
dt = cv.numval( I['sampling'].get('dt', ['1.0', 'fs']), 'time' )
# timestep for sampling
st = cv.numval( I['sampling'].get('st', ['1.0', 'fs']), 'time' )
total_t = st * nsample
ts_type = I['thermostat'].get('type', 'massive-andersen')[0]
# number of beads
nbeads = [int(n) for n in I.get('rpmd', {'beads': ['1']}).get('beads', ['1'])]
# set up thermostatt
thermostat = ts.Thermostat(type=ts_type, temp=temperature, nb = nbeads, dt=st)
thermostat.M = m
# W.setStorage(max(nbeads))
## create trajectory object
try:
rseed = int( I['system']['rseed'][0] )
except:
rseed = None
try:
rcom = I['sampling']['rcom'][0].lower() == 'yes'.lower()
except:
rcom = False
try:
rrot = I['sampling']['rrot'][0].lower() == 'yes'.lower()
except:
rrot = False
trajectory = trj.Trajectory(3*nat)
trajectory.rseed = rseed
trajectory.DIR = os.getcwd()
trajectory.R = xcart.flatten()
trajectory.V = vcart.flatten()
trajectory.M = np.array(atommass3,float)*cv.am2au
trajectory.dt = dt
trajectory.f_EGrad = W.f_EGrad_GS
trajectory.rcom = rcom
trajectory.rrot = rrot
trajectory.thermostat = thermostat
# trajectory._init_rpmd(1.0 / (cv.FPC_K_B_EV * cv.ev2au * temperature) , nbeads)
if I['system']['qchemistry'] == ['embedding']:
cs = constraints.Constraints(f_constraint = QCE.getConstraints)
trajectory.const = cs
## propagate and save
X = []
V = []
X_rp = []
V_rp = []
energies = []
energiesC = []
energiesET = []
simtime = st
modn = nobtain//10
if modn < 1:
modn = 1
for i in range(nobtain):
trajectory.integrate(simtime, log=False) # dt step
simtime += st
X.append(trajectory.R.copy())
V.append(trajectory.V.copy())
if I.get('rpmd', None) is not None:
X_rp.append([rrp.copy() for rrp in trajectory.R_rp])
V_rp.append([vrp.copy() for vrp in trajectory.V_rp])
energies.append(trajectory.getEnergy() / trajectory.nmax)
energiesC.append(trajectory.getCentroidEnergy())
prefac = trajectory.nmax * trajectory.M[0] / (2.0 * np.pi * trajectory.beta)
et = 1.0 # np.power(prefac, trajectory.NDof*trajectory.nmax/2.0)
# print trajectory.M
# print trajectory.nmax
# print trajectory.beta
# print et
# print trajectory.nmax/(2.0*trajectory.beta)
# print trajectory.spring_rp()
# print trajectory.pot_rp()
# print X[-1]
# exit(-1)
energiesET.append(et*(trajectory.NDof*trajectory.nmax/(2.0*trajectory.beta) + trajectory.spring_rp()+trajectory.pot_rp())/ trajectory.nmax)
if i%modn == 0:
print("Sampled ", i, " of ", nsample)
try:
hist, bin_edges = np.histogram(energies[::nstride], bins=500, density=False)
histC, bin_edgesC = np.histogram(energiesC[::nstride], bins=500, density=False)
histET, bin_edgesET = np.histogram(energiesET[::nstride], bins=500, density=False)
f = open('energies', 'w')
for i, val in enumerate(hist):
v = (bin_edges[i] + bin_edges[i+1]) / 2.0
f.write(str(v) + ' ' + str(val) + '\n')
f.close()
eav = np.average(energies)
e2av = np.average(np.array(energies)*np.array(energies))
print("<E_rp> = ", eav)
print("<E_rp^2> = ", e2av)
print("\Delta E_rp = \sqrt(<E_rp^2> - <E_rp>^2) = ", np.sqrt(e2av - eav**2))
f = open('energiesC', 'w')
for i, val in enumerate(histC):
v = (bin_edgesC[i] + bin_edgesC[i+1]) / 2.0
f.write(str(v) + ' ' + str(val) + '\n')
f.close()
eav = np.average(energiesC)
e2av = np.average(np.array(energiesC)*np.array(energiesC))
print("<E_c> = ", eav)
print("<E_c^2> = ", e2av)
print("\Delta E_c = \sqrt(<E_c^2> - <E_c>^2) = ", np.sqrt(e2av - eav**2))
f = open('energiesET', 'w')
for i, val in enumerate(histET):
v = (bin_edgesET[i] + bin_edgesET[i+1]) / 2.0
f.write(str(v) + ' ' + str(val) + '\n')
f.close()
eav = np.average(energiesET)
e2av = np.average(np.array(energiesET)*np.array(energiesET))
print("<E_et> = ", eav)
print("<E_et^2> = ", e2av)
print("\Delta E_et = \sqrt(<E_et^2> - <E_et>^2) = ", np.sqrt(e2av - eav**2))
except:
f = open('energies', 'w')
for i, val in enumerate(energies):
f.write(str(i) + ' ' + str(val) + '\n')
f.close()
f = open('energiesC', 'w')
for i, val in enumerate(energiesC):
f.write(str(i) + ' ' + str(val) + '\n')
f.close()
elif I['sampling']['type'][0] == 'wignerNEU':
## TODO RW: this needs to be adapted to work with RPMD
if I.get('rpmd', None) != None:
print("Wigner sampling not implemented for RPMD - please go adhead and code it!!")
exit(-1)
# Neither Hessian nor normal modes given as input
# optimize geometry
if hessian is None and nmfreq is None:
#from scipy.optimize import minimize
#minimum = minimize(W.f_E, xcart)
#xcart = np.array(minimum.x)
#print minimum
#print " "
# calculate hessian
import CDTK.Tools.NumericalDerivatives as ND
hessian = ND.hessian(W.f_E, xcart)
print(hessian)
print("----")
##### TODO RW: MISSING ROTATIONAL SAMPLING!
energy = W.f_E(xcart)
# desired acceptance ratio
accept = float( I['sampling'].get('accept', [-1])[0] )
# initial step
step = [float(i) for i in I['sampling'].get('step', [0.01, 1.0])]
# temperature
temperature = float( I['sampling'].get('temperature', [0.0])[0] )
# modes
modes = [int(i) for i in I['sampling'].get('modes', list(range(len(xcart))) )]
if temperature < 1e-6:
dist='wigner0K'
temperature = 0.0
else:
dist='wigner'
# create sampler object
sampler = ico.IniConSampler(xcart, W.f_E, T=temperature, M=m, DIST=dist, NSAMPLES=nobtain, RefEnergy=energy,stepnorm=step,qnum=quantum_numbers, RefGeom=xcart,modes=modes)
# if a reasonable acceptance ratio is given then run burn-in with 5000 steps
# TODO RW: add flexibility on steps used for burnin
if accept > 0.0 and accept < 1.0:
sampler.burnin(steps=5000, targetRate=accept, hessian=hessian, is_linear=is_linear,modes=modes)
ncol = sampler._log['ncol']
ntot = sampler._log['ntot']
nabs = sampler._log['NABS']
ratio = sampler._log['ratio']
sys.stdout.write('After Burnin =======')
sys.stdout.write('N Collected: %i\n' % (ncol,))
sys.stdout.write('N Calls (collection): %i\n' % (ntot,))
sys.stdout.write('N Calls (absolute): %i\n' % (nabs,))
sys.stdout.write('Acceptance ratio: %6.3f\n' % (ratio,))
try:
sys.stdout.write('Step size: %6.3f\n' % (sampler._optimalStepNorm,))
except TypeError:
sys.stdout.write('Step size: %8.5f %8.5f\n' % (sampler._optimalStepNorm[0],sampler._optimalStepNorm[1],))
sys.stdout.write('After Burnin =======')
sampler._optimalStepNorm = step
# Use the sampler to obtain positions and velocities distributed according to exp(-beta H)
X,V = sampler.sampler_boltzmann(hessian,is_linear,modes)
ncol = sampler._log['ncol']
ntot = sampler._log['ntot']
nabs = sampler._log['NABS']
ratio = sampler._log['ratio']
sys.stdout.write('N Collected: %i\n' % (ncol,))
sys.stdout.write('N Calls (collection): %i\n' % (ntot,))
sys.stdout.write('N Calls (absolute): %i\n' % (nabs,))
sys.stdout.write('Acceptance ratio: %6.3f\n' % (ratio,))
try:
sys.stdout.write('Step size: %6.3f\n' % (sampler._optimalStepNorm,))
except TypeError:
sys.stdout.write('Step size: %8.5f %8.5f\n' % (sampler._optimalStepNorm[0],sampler._optimalStepNorm[1],))
# obtain energies and histogram them
energies = sampler._log['energy']
try:
hist, bin_edges = np.histogram(energies[::nstride], bins=500, density=False)
# For wigner the energies are in the full potential, not in the Harmonic appximation used in the sampling
f = open('energies', 'w')
for i, val in enumerate(hist):
v = (bin_edges[i] + bin_edges[i+1]) / 2.0
f.write(str(v) + ' ' + str(val) + '\n')
f.close()
eav = np.average(energies)
e2av = np.average(np.array(energies)*np.array(energies))
print("<E> = ", eav)
print("<E^2> = ", e2av)
print("\Delta E = \sqrt(<E^2> - <E>^2) = ", np.sqrt(e2av - eav**2))
except:
f = open('energies', 'w')
for i, val in enumerate(energies):
f.write(str(i) + ' ' + str(val) + '\n')
f.close()
elif I['sampling']['type'][0] == 'wignerNP' or I['sampling']['type'][0] == 'wignerHarmonic':
# New simpler implementation of Wigner sampling adapted from analysis branch.
start_time = time.time()
# Get modes to be sampled from input.
# Possible for 'mode' in input file are: 'linear', 'nonlinear' or a list of integers representing the mode indices.
# If nothing is given, all modes are sampled.
modes = I['sampling'].get('modes') # Here 'modes' is stored as a list of strings internally.
# 'm' here is mass array for each dof.
if modes is None or modes == [] or len(m) < 6:
modelist = list(range(len(m)))
elif modes[0] == 'linear':
modelist = list(range(5, len(m)))
elif modes[0] == 'nonlinear':
modelist = list(range(6, len(m)))
else:
modelist = [int(i) for i in modes]
assert(len(modelist) > 0), "No modes selected for sampling!"
# Get temperature from input if given, if not set to 0 K.
temperature = float( I['sampling'].get('temperature', [0.0])[0] )
if temperature < 1e-6:
dist='wigner0K'
temperature = 0.0
else:
dist='wigner'
print("Starting Wigner sampling.")
## TODO RW: this needs to be adapted to work with RPMD
if I.get('rpmd', None) is not None:
print("Wigner sampling not implemented for RPMD - please go adhead and code it!!")
exit(-1)
# Neither Hessian nor normal modes given as input
# optimize geometry
if hessian is None and nmfreq is None:
#from scipy.optimize import minimize
#minimum = minimize(W.f_E, xcart)
#xcart = np.array(minimum.x)
#print minimum
#print " "
# calculate hessian
import CDTK.Tools.NumericalDerivatives as ND
hessian = ND.hessian(W.f_E, xcart)
print("Hessian calculation done in {: .2f} s.".format(time.time() - start_time))
w,L = nmo.normal_modes_from_HessianCart(hessian,m)
Lx,mu = nmo.q_mw_to_x(m,L)
print("\n Wigner sampling:")
#print("\n frequencies: ", w)
#print("\n frequencies: in cm^-1", w/cv.ic2au)
print("\n All modes:\n")
for i in range(w.shape[0]):
print("Mode %d Frequency %e a.u. = %f cm-1" % (i,w[i],w[i]/cv.ic2au))
print("\n Selected modes: ", modelist)
w_normalModes = np.array(w[modelist], dtype=float)
M_normalModes = np.array(mu[modelist], dtype=float)
x_normalModes = np.array(Lx[modelist], dtype=float)
X = []
V = []
# X_nm = []
# V_nm = []
n = len(w_normalModes)
if opts.rDisplacement or opts.vDisplacement is not None:
if opts.rDisplacement is not None:
opts.rDisplacement= eval(opts.rDisplacement)
for iN in range(n):
f = np.zeros(n)
f[iN] = 1
for ix in opts.rDisplacement:
print("mode %d pos - displacement %d\n" % (modelist[iN],ix))
xx = ix * np.sqrt(1.0 / (2.0*f*w_normalModes*M_normalModes))
pp = np.zeros(xx.shape)
X.append(np.array(xcart) + np.dot(x_normalModes.transpose(), xx))
V.append(np.dot(x_normalModes.transpose(), pp)/ m)
if opts.vDisplacement is not None:
opts.vDisplacement= eval(opts.vDisplacement)
for iN in range(n):
f = np.zeros(n)
f[iN] = 1
for iv in opts.vDisplacement:
print("mode %d vel - displacement %d\n" % (modelist[iN],iv))
xx = 0.
pp = iv * np.sqrt((w_normalModes*M_normalModes)/ (2.0*f))
X.append(np.array(xcart) + np.dot(x_normalModes.transpose(), xx))
V.append(np.dot(x_normalModes.transpose(), pp)/ m)
else:
for i in range(nobtain):
if dist == 'wigner0K':
f = np.ones(n)
elif dist == 'wigner':
KBAU = cv.FPC_K_B_EV * cv.ev2au # Boltzmann constant in au.
beta = 1.0 / (temperature * KBAU)
f = np.tanh(beta*w_normalModes / 2.0)
else:
print("Unknown Dist.")
exit(-1)
xx = np.random.normal(np.zeros(n), np.sqrt(1.0 / (2.0*f*w_normalModes*M_normalModes)))
pp = np.random.normal(np.zeros(n), np.sqrt((w_normalModes*M_normalModes)/ (2.0*f)))
# X_nm.append(xx)
# V_nm.append(pp)
X.append(np.array(xcart) + np.dot(x_normalModes.transpose(), xx))
V.append(np.dot(x_normalModes.transpose(), pp)/ m)
print("Wigner sampling done in {: .2f} s.".format(time.time() - start_time))
print("Analysis:")
print(" Mean geometry - start geometry")
print(np.array(X).mean(axis=0) - xcart)
print(" Mean velocity")
print(np.array(V).mean(axis=0))
P = np.array(V) * np.array(m)
print(" Mean momentum")
print(P.mean(axis=0))
print(" Stdev momentum")
print(P.std(axis=0))
cov = np.cov((np.array(X)*np.sqrt(m[np.newaxis,:])).T)
evalues, evec = np.linalg.eigh(cov)
print(" Frequencies obtained from covariance of positions ")
for i,ev in enumerate(evalues[::-1]):
if ev < 1e-5:
continue
print("Mode %d Frequency %e a.u. = %f cm-1" % (
i, 1./(2*ev), 1./(2*ev)/cv.ic2au))
print((evec[:,evec.shape[1]-i-1]/np.sqrt(m)).reshape((-1,3)))
cov = np.cov((np.array(V)*np.sqrt(m[np.newaxis,:])).T)
evalues, evec = np.linalg.eigh(cov)
print(" Frequencies obtained from covariance of velocities ")
for i,ev in enumerate(evalues):
if ev < 1e-5:
continue
print("Mode %d Frequency %e a.u. = %f cm-1" % (
i, (2*ev), (2*ev)/cv.ic2au))
print((evec[:,i]/np.sqrt(m)).reshape((-1,3)))
# Sample for monte carlo electron dynamics
elif I['sampling']['type'][0] == 'monte-carlo':
X = []
V = []
for n in range(nsample):
X.append(xcart)
V.append(vcart)
# Sample for simple ring-polymer surface hopping codes
elif I['sampling']['type'][0] == 'rpsh':
X = []
V = []
X_rp = []
V_rp = []
# width of position distribution
isigma_x = [float(val) for val in I['sampling'].get('sigma_x', [0.0])]
if len(isigma_x) == 1:
sigma_x = np.array(isigma_x*3*nat)
else:
sigma_x = np.array(isigma_x)
# width of velocity distribution
isigma_v = [float(val) for val in I['sampling'].get('sigma_v', [0.0])]
if len(isigma_v) == 1:
sigma_v = np.array(isigma_v*3*nat)
else:
sigma_v = np.array(isigma_v)
# get bead number
nbeads = [int(n) for n in I.get('rpmd', {'beads': ['1']}).get('beads', ['1'])]
if len(nbeads) == 1:
nb = np.array(nbeads*3*nat)
else:
nb = np.array(nbeads)
# get beta
beta = I.get('rpmd', {'beta': ['1.0']}).get('beta', ['1.0'])[0]
# normal modes or beads
mode = I['sampling'].get('mode', ['beads'])[0].lower()
if mode != 'beads' and mode != 'nm':
print("Unknown mode! Please implement!")
exit(-1)
if mode == 'nm':
trajectory = trj.Trajectory(3*nat)
trajectory.R = xcart.flatten()
trajectory.V = vcart.flatten()
trajectory.M = np.array(atommass3,float)*cv.am2au
for n in range(nsample):
# sample bead positions and momenta
xx_rp = []
vv_rp = []
for i in range(3*nat):
if sigma_x[i] == 0:
xx_rp.append(np.array([xcart[i]]*nb[i]))
else:
# sample bead positions to give gaussian distribution
if mode == 'beads':
xx_rp.append(np.random.normal(xcart[i], sigma_x[i], nb[i]))
## TODO sample normal modes to give correct gaussian distribution - needs to be implemented
elif mode == 'nm':
trajectory._init_rpmd(beta, nb, w0 = sigma_x)
xx_rp.append(trajectory.R_rp[i].copy())
if sigma_v[i] == 0:
vv_rp.append(np.array([vcart[i]]*nb[i]))
else:
if mode == 'beads':
vv_rp.append(np.random.normal(vcart[i], sigma_v[i], nb[i]))
## TODO sample normal modes to give correct gaussian distribution - needs to be implemented
elif mode == 'nm':
trajectory._init_rpmd(beta, nb)
xx_rp.append(trajectory.V_rp[i].copy())
X_rp.append([rrp.copy() for rrp in xx_rp])
V_rp.append([vrp.copy() for vrp in vv_rp])
# calculate means
xx = np.zeros(3*nat)
vv = np.zeros(3*nat)
for i in range(3*nat):
xx[i] = np.sum(xx_rp[i], axis=0) /float(nb[i])
vv[i] = np.sum(vv_rp[i], axis=0) /float(nb[i])
X.append(xx.copy())
V.append(vv.copy())
else:
raise ValueError('No sampling method specified')
if opts.rho == True:
n = len(X[0])
XX = np.array(X)
VV = np.array(V)
for j in range(n):
hist, bin_edges = np.histogram(XX[::nstride][:,j], bins='auto', density=False)
f = open('pos_' + str(j), 'w')
for i, val in enumerate(hist):
v = (bin_edges[i] + bin_edges[i+1]) / 2.0
f.write(str(v) + ' ' + str(val) + '\n')
f.close()
hist, bin_edges = np.histogram(VV[::nstride][:,j], bins='auto', density=False)
f = open('vel_' + str(j), 'w')
for i, val in enumerate(hist):
v = (bin_edges[i] + bin_edges[i+1]) / 2.0
f.write(str(v) + ' ' + str(val) + '\n')
f.close()
# --------------------------------------------------------------------------
# Write down ensemble of initial conditions to the corresponding directories
# --------------------------------------------------------------------------
if opts.noDirec == True:
try:
outfileX = open('x_rp', 'w')
for i,x_rp in enumerate(X_rp[::nstride]):
outfileX.write(str(i) + ' ')
for item in x_rp:
for it in item:
outfileX.write(str(it)+' ')
outfileX.write('\n')
outfileX.close()
outfileV = open('v_rp', 'w')
for i,v_rp in enumerate(V_rp[::nstride]):
outfileV.write(str(i) + ' ')
for item in v_rp:
for it in item:
outfileV.write(str(it)+' ')
outfileV.write('\n')
outfileV.close()
except:
print("Ring-polymer stuff not sampled/written!")
outfileX = open('x', 'w')
for i,x in enumerate(X[::nstride]):
outfileX.write(str(i) + ' ')
for item in x:
outfileX.write(str(item)+' ')
outfileX.write('\n')
outfileX.close()
# print np.array(X[::nstride])[:,0]
hist, bin_edges = np.histogram(np.array(X[::nstride])[:,0], bins='auto', density=False)
f = open('x_hist', 'w')
for i, val in enumerate(hist):
v = (bin_edges[i] + bin_edges[i+1]) / 2.0
f.write(str(v) + ' ' + str(val) + '\n')
f.close()
hist, bin_edges = np.histogram(np.array(V[::nstride])[:,0], bins='auto', density=False)
f = open('v_hist', 'w')
for i, val in enumerate(hist):
v = (bin_edges[i] + bin_edges[i+1]) / 2.0
f.write(str(v) + ' ' + str(val) + '\n')
f.close()
outfileV = open('v', 'w')
for i,v in enumerate(V[::nstride]):
outfileV.write(str(i) + ' ')
for item in v:
outfileV.write(str(item)+' ')
outfileV.write('\n')
outfileV.close()
exit(0)
for i,x in enumerate(X[::nstride]):
v = V[i*nstride]
dirname = 'trj_'+'%07i' % (i+nshift,)
if not os.path.exists(dirname):
os.mkdir(dirname)
shutil.copy('atomlist',dirname+'/atomlist')
shutil.copy('atomnums',dirname+'/atomnums')
shutil.copy('atommass',dirname+'/atommass')
uti.listToFile(x,dirname+'/atompos')
uti.listToFile(v,dirname+'/atomvel')
if os.path.exists('./topol'):
shutil.copy('topol',dirname+'/topol')
try:
for i,x_rp in enumerate(X_rp[::nstride]):
v_rp = V_rp[i*nstride]
dirname = 'trj_'+'%07i' % (i+nshift,)
if not os.path.exists(dirname):
os.mkdir(dirname)
uti.doublelistToFile(x_rp,dirname+'/atompos_rp')
uti.doublelistToFile(v_rp,dirname+'/atomvel_rp')
except:
print("Ring-polymer stuff not sampled/written!")
os.chdir(currentDir)
if __name__ == "__main__":
start()