#* **************************************************************************
#*
#* CDTK, Chemical Dynamics Toolkit
#* A modular system for chemical dynamics applications and more
#*
#* Copyright (C) 2011, 2012, 2013, 2014, 2015, 2016
#* Oriol Vendrell, DESY, <oriol.vendrell@desy.de>
#*
#* Copyright (C) 2017, 2018, 2019
#* Ralph Welsch, DESY, <ralph.welsch@desy.de>
#*
#* Copyright (C) 2020, 2021, 2022, 2023
#* Ludger Inhester, DESY, ludger.inhester@cfel.de
#*
#* This file is part of CDTK.
#*
#* CDTK is free software: you can redistribute it and/or modify
#* it under the terms of the GNU General Public License as published by
#* the Free Software Foundation, either version 3 of the License, or
#* (at your option) any later version.
#*
#* This program is distributed in the hope that it will be useful,
#* but WITHOUT ANY WARRANTY; without even the implied warranty of
#* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#* GNU General Public License for more details.
#*
#* You should have received a copy of the GNU General Public License
#* along with this program. If not, see <http://www.gnu.org/licenses/>.
#*
#* **************************************************************************
"""
Some miscellaneous tools
"""
import os
import uuid
import re
import datetime
import numpy as np
import CDTK.Tools.Conversion as cv
TINY = 1.0e-8
[docs]
def saveDataStruct(fname,obj):
"""
Store the string representation of a data structure to a file
The repr() function is used, this function is intended only for
rather simple data structures made for example of nested lists
and dictionaries.
obj -- object to store
fname -- path to the file where to store the string
"""
string = repr(obj)
try:
f = open(fname,'w')
f.write(string)
finally:
f.close()
[docs]
def loadDataStruct(fname):
"""
Load a data structure from a file
The data structure must have been saved as a string, for example by
saveDataStruct
fname -- path to the file with the data structure
"""
f=open(fname,'r')
string = f.read()
string = 'newobject='+string
exec(string)
f.close()
return newobject
[docs]
def readFile(path):
"""
return a string with the contents of a file
path - path to the file to be read
"""
f = open(path,'r')
s = f.read()
f.close()
return s
[docs]
def fileLineParser(path,regexp,functions=[]):
"""
Parse a file line by line using a regexp to find matching values
path -- path to the file being parsed
regexp -- a valid regexp with one or more parenthesis to catch matching
values
functions -- list of transformation functions to be applied to the matching
strings, e.g. "float"
if provided should have as many elements as the number of
elements to be matched from every matching line
output:
List of lists, one list per matching line. Empty list if no matches.
"""
try:
fpath = open(path,'r')
except:
raise ValueError(path+' file could not be opened')
matchtable = []
for line in fpath.readlines():
match = re.search(regexp,line)
if match:
matchlist = match.groups()
if functions:
trafolist = []
for i,strvalue in enumerate(matchlist):
trafolist.append( functions[i](strvalue) )
matchtable.append( trafolist )
else:
matchtable.append( matchlist)
fpath.close()
return matchtable
################################################################
# Interaction with block-data files
################################################################
[docs]
def readTableFile(fname,**opts):
"""
read a file with data organized in table form and return a l[i,j] list
fname -- path to file to be read or file object
Options:
mode -- text string: "rows" (default) or "cols"
- in "rows" mode the first index runs over the rows of the table
- in "cols" mode the first index runs over the columns of the table
skip -- integer, dafault 0. New row read every <skip> rows.
"""
if isinstance(fname,str):
ftable = open(fname,'r')
elif isinstance(fname,file):
ftable = fname
else:
raise ValueError('readTableFile: wrong input type')
mode = opts.get('mode','rows')
skip = opts.get('skip',0)
table = []
count = skip
for l in ftable:
if not l.strip(): continue # ignore blank lines
if l[0] == '#': continue # ignore comments
if count < skip: # skip rows
count = count + 1
continue
ls = l.split()
ln = []
for num in ls:
ln.append(float(num))
table.append(ln)
count = 0
ftable.close()
table = np.array(table)
if mode == 'rows':
return table
elif mode == 'cols':
return table.transpose()
else:
raise ValueError('readTableFile: mode must be either "rows" or "cols"')
[docs]
def readAuto(fname,**opts):
"""
Read an autocorrelation file into a complex array
fname -- path to file to be read or file object
Options:
skip -- integer, dafault 0. New row read every <skip> rows.
"""
skip = opts.get('skip',0)
tmax = opts.get('tmax',9.0e+9)
a = readTableFile(fname,mode='cols',skip=skip)
time = np.array(a[0])
auto = []
for i in range(len(a[0])):
re = a[1][i]
im = a[2][i]
auto.append(complex(re,im))
auto = np.array(auto)
rtime = []
rauto = []
for i,t in enumerate(time):
if t < tmax:
rtime.append(t)
rauto.append(auto[i])
return np.array(rtime),np.array(rauto)
[docs]
def readColumnFile(fname,col_ix=0):
l = []
with open(fname) as f:
for line in f:
l.append(line.split()[col_ix])
f.close()
return l
[docs]
def getNextTime(finput):
"""
Return next time in data file
Input:
finput - file object with the gpop file
Returns:
time - time (float) or None if EOF is reached
"""
line = finput.readline()
while 1:
if line == '' or line == ' ': return None
if line[0] == '#':
return float(line.split()[1])
line = finput.readline()
[docs]
def getPopTable(finput,dof):
"""
Return the next grid population in table form
Input:
finput - file object with the gpop file
dof - degree of freedom to look for
Returns:
found - True/False, whether still data to be returned
table - list of lists with x,y data
"""
table = []
while 1:
line = finput.readline()
if line == '':
raise ValueError('EOF reached reading gpop file')
if line[0] == '#':
raise ValueError('Next block reached reading gpop file')
if line == '\n' or line == ' \n':
continue
if line.find('.') < 0: # head of DOF block
if int(line.split()[0]) == dof: # read block
pdim = int(line.split()[1])
for i in range(pdim):
line = finput.readline()
table.append(line)
return table
[docs]
def getDenTable(finput,dof,**args):
"""
Return the next grid reduced density in table form
Input:
finput - file object with the gden file
dof - degree of freedom to look for
Returns:
found - True/False, whether still data to be returned
table - list of lists with x,y,z data
"""
norm = args.get('norm',False)
while 1:
line = finput.readline()
if line == '':
raise ValueError('EOF reached reading gden file')
if line[0] == '#':
raise ValueError('Next block reached reading gden file')
if line == '\n' or line == ' \n':
continue
if line.find('.') < 0: # head of DOF block
if int(line.split()[0]) == dof: # read block
pdim = int(line.split()[1])
cmat = np.zeros((pdim,pdim),complex)
table = []
for i in range(pdim):
for j in range(pdim):
line = finput.readline()
sline = line.split()
table.append(sline[0]+' '+sline[1]+' ')
creal = float(line.split()[2])
cimag = float(line.split()[3])
cmat[i,j] = creal + cimag*1j
tableOut = []
s = 0
for i in range(pdim):
for j in range(pdim):
if cmat[i,i] < TINY or cmat[j,j] < TINY:
cnorm = 1.0
else:
cnorm = abs(cmat[i,j])/abs(cmat[i,i]*cmat[j,j])**0.5
c = cmat[i,j]
tline = table[s] + '%13.7f %13.7f %13.7f\n' % (c.real,c.imag,cnorm)
tableOut.append(tline)
s = s + 1
tableOut.append('\n')
return tableOut
[docs]
def getUniqueID(idlength=12):
return uuid.uuid4().hex[0:idlength]
[docs]
def getDateString():
CTIME = datetime.datetime.today()
return '%4i%02i%02i' % (CTIME.year,CTIME.month,CTIME.day)
[docs]
def listToFile(mylist,fname):
with open(fname,'w') as f:
for item in mylist:
f.write(str(item)+'\n')
f.close()
[docs]
def changeIsotopeSymbols(listin):
subst = { 'D' : 'H',
'C13' : 'C' }
listout = []
for s in listin:
listout.append( subst.get(s,s) )
return listout
[docs]
def traj2xyz(atomlist, geoms, filename='traj.xyz', comment='CDTK-generated traj file', timesteps=None):
""" Generate a trajectory in .xyz format Useful for plots with VMD
Args:
atomlist (list of strings): list of atom symbols
geoms (numpy array [step, atoms, 3 ]): geometries
filename (str, optional): output filename. Defaults to 'traj.xyz'.
comment (str, optional): comment line. Defaults to 'CDTK-generated traj file'.
timesteps (array of floats, optional): time step information. Defaults to None.
"""
with open(filename,'w') as f:
for step in range(geoms.shape[0]):
f.write(f'{len(atomlist)}\n')
if timesteps is None:
f.write(f'{comment} step={step}\n')
else:
f.write(f'{comment} step={step} time = {timesteps[step]:8.2f} au= {timesteps[step]*cv.au2fs:8.2f} fs\n')
for atom in range(geoms.shape[1]):
f.write(f'{atomlist[atom]:4s} {geoms[step,atom,0]*cv.au2an:15.6f} {geoms[step,atom,1]*cv.au2an:15.6f} {geoms[step,atom,2]*cv.au2an:15.6f}\n')
f.close()
[docs]
def traj2vtf(atomlist, Rlog, vtfname='traj', mmlist=None, disp_n_atoms=None):
"""
Generate a trajectory in .vtf format
Useful for plots with VMD
Point charges represented by "X"
Input arguments:
atomlist -- path to atomlist file
Rlog -- path to R.log file
vtfname -- vtf filename
mmlist -- path to MM region atomlist file
disp_n_atoms -- int; number of atoms to output (truncation for large systems)
"""
alist = []
count = 0
stride = 1
with open(atomlist,'r') as fatom:
s = fatom.readline()
while s:
alist.append( s[:-1] )
count += 1
if (disp_n_atoms is not None) and (count == disp_n_atoms):
break
s = fatom.readline()
fatom.close()
if mmlist is not None and os.path.exists(mmlist):
with open(mmlist,'r') as fatom:
s = fatom.readline()
while s:
alist.append( 'X' )
s = fatom.readline()
mmlist.close()
natoms = len(alist)
vtf = vtfname + '.vtf'
with open(vtf,'w') as fvtf:
fvtf.write('# VTF Trajectory\n')
fvtf.write('# atomlist at ' + atomlist + '\n')
fvtf.write('# R.log at ' + Rlog + '\n')
fvtf.write('# time step stride {t:d}\n'.format(t=stride))
fvtf.write('\n')
for i,s in enumerate(alist):
fvtf.write( 'atom %i radius 1.0 name %s\n' % (i,s) )
fvtf.write('\n')
with open(Rlog,'r') as fRlog:
l = fRlog.readline()
j = 0
while l:
if j%stride == 0:
fvtf.write('timestep\n')
ll = l.split()
t = ll.pop(0)
for i in range(natoms):
x = float( ll.pop(0) ) * cv.au2an
y = float( ll.pop(0) ) * cv.au2an
z = float( ll.pop(0) ) * cv.au2an
fvtf.write( '%12.7f %12.7f %12.7f\n' % (x,y,z) )
fvtf.write('\n')
l = fRlog.readline()
j += 1
fRlog.close()
fvtf.close()
[docs]
def atomicCoordinatesFromRlog(Rlog):
"""
Sort the entries of an Rlog file on a per atom basis
Rlog:
t0 X01 Y01 Z01 X02 Y02 Z02 ...... X0N Y0N Z0N
t1 X11 Y11 Z11 X12 Y12 Z12 ...... X1N Y1N Z1N
...
tf Xf1 Yf1 Zf1 Xf2 Yf2 Zf2 ...... XfN YfN ZfN
Returns as array
[ [ [X01 Y01 Z01 ], [X02 Y02 Z02], .... [X0N Y0N Z0N] ],
[ [X11 Y11 Z11 ], [X12 Y12 Z12], .... [X1N Y1N Z1N] ],
....
[ [Xf1 Yf1 Zf1 ], [Xf2 Yf2 Zf2], .... [XfN YfN ZfN] ]
]
"""
with open(Rlog,'r') as fRlog:
xyz = []
l = fRlog.readline()
natoms = len( l.split() )//3
while l:
a = list( map( float, l.split() ) ) # line to list
a.pop(0) # pop time entry
a = np.asarray(a)
a.shape = (natoms,3)
xyz.append(a)
l = fRlog.readline()
return np.asarray(xyz)
[docs]
def getTrajectoryTimeSteps(Rlog):
with open(Rlog,'r') as fRlog:
t = [ float( line.split().pop(0) ) for line in fRlog ]
return np.asarray(t)
[docs]
def getDataFromLog(log):
with open(log,'r') as logFile:
data = [ line.split() for line in logFile ]
return np.asarray(data)
[docs]
def getDirectoryList(folder='./', file_name=None):
""" Get trj_ subfolders in a given folder. If a file name is given, only
trj_ subfolders are returned that contain a file with that name. The
returned list is sorted.
Parameters
----------
folder : string, optional, default: './'
Folder to search for trj_ subfolders.
file_name : string, optional, default: None
If given, only trj_ subfolders are returned that contain a file with
that name.
Returns
-------
dirs : list of string
Sorted list of trj_ subfolders.
"""
allEntries = os.listdir(folder)
dirs = []
for entry in allEntries:
path = os.path.join(folder, entry)
if entry[0:4] == 'trj_' and os.path.isdir(path):
if file_name is None or os.path.isfile(os.path.join(path,
file_name)):
dirs.append(path)
dirs.sort()
return dirs
[docs]
def readRlogRestricted(rlog,col0=0,coln=1):
'''
Read log file and return array with vals starting at
col0, ending at col0 + coln
'''
r_xyz = []
with open(rlog,'r') as f:
for line in f:
vals = map(float, line.split()[col0:col0+coln])
r_xyz.append(vals)
return np.asarray(r_xyz)
################################################################
################################################################
# XML functions
################################################################
[docs]
def readTag(tag,string):
"""
In a string look for XML tag "tag" and return its content
tag - string containing the tag whose contents are to be retrieved
string - text string
returns the contents between <tag> and </tag> or None if <tag> is not
present.
"""
itag = '<'+tag+'>'
ftag = '</'+tag+'>'
if string.find(tag) == -1:
return None
istring = string.find(itag)
fstring = string.find(ftag) + len(ftag)
return string[istring:fstring]
[docs]
def readTagFromFile(tag,path):
"""
return a string with the contents between <tag> and </tag>
"""
s = readFile(path)
return readTag(tag,s)
#################################################################
# Array manipulation
#################################################################
[docs]
def ReverseArray(array):
"""
Reverse array.
Parameters:
-----------
array : 1D array_like
array to be reversed
Returns
-------
Reversed array
"""
reverse_array = np.zeros(len(array))
for i, x in enumerate(array):
reverse_array[-i-1] = x
return reverse_array
[docs]
def ShiftArray(array, shift, **option):
"""
Shift array.
Parameters:
-----------
array : 1D array_like
array to be shifted
shift : int
amount of shifting,
positive integer will shift the array to the right,
negative integer will shift the array to the left.
periodic : Bool, optional, default: True
if True, use the periodic boudary condition on the array
edge (the right most array element which is right shifted
will appear on the left most, and vice versa). If False,
the new array element which enter the array as the right
most or the left most elements will be set 0.0.
Returns:
--------
Shifted array
"""
periodic = option.get('periodic', True)
N = len(array)
shifted_array = np.zeros(N)
if periodic:
for i in xrange(N):
shifted_array[(i + shift) % N] = array[i]
else:
for i in xrange(N):
l = i + shift
if l < 0 or l >= N:
shifted_array[l % N] = 0.0
else:
shifted_array[l] = array[i]
return shifted_array