Source code for CDTK.Tools.Utils

#*  **************************************************************************
#*
#*  CDTK, Chemical Dynamics Toolkit
#*  A modular system for chemical dynamics applications and more
#*
#*  Copyright (C) 2011, 2012, 2013, 2014, 2015, 2016
#*  Oriol Vendrell, DESY, <oriol.vendrell@desy.de>
#*
#*  Copyright (C) 2017, 2018, 2019
#*  Ralph Welsch, DESY, <ralph.welsch@desy.de>
#*
#*  Copyright (C) 2020, 2021, 2022, 2023
#*  Ludger Inhester, DESY, ludger.inhester@cfel.de
#*
#*  This file is part of CDTK.
#*
#*  CDTK is free software: you can redistribute it and/or modify
#*  it under the terms of the GNU General Public License as published by
#*  the Free Software Foundation, either version 3 of the License, or
#*  (at your option) any later version.
#*
#*  This program is distributed in the hope that it will be useful,
#*  but WITHOUT ANY WARRANTY; without even the implied warranty of
#*  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#*  GNU General Public License for more details.
#*
#*  You should have received a copy of the GNU General Public License
#*  along with this program.  If not, see <http://www.gnu.org/licenses/>.
#*
#*  **************************************************************************

"""
Some miscellaneous tools
"""

import os
import uuid
import re
import datetime

import numpy as np

import CDTK.Tools.Conversion as cv

TINY = 1.0e-8


[docs] def saveDataStruct(fname,obj): """ Store the string representation of a data structure to a file The repr() function is used, this function is intended only for rather simple data structures made for example of nested lists and dictionaries. obj -- object to store fname -- path to the file where to store the string """ string = repr(obj) try: f = open(fname,'w') f.write(string) finally: f.close()
[docs] def loadDataStruct(fname): """ Load a data structure from a file The data structure must have been saved as a string, for example by saveDataStruct fname -- path to the file with the data structure """ f=open(fname,'r') string = f.read() string = 'newobject='+string exec(string) f.close() return newobject
[docs] def readFile(path): """ return a string with the contents of a file path - path to the file to be read """ f = open(path,'r') s = f.read() f.close() return s
[docs] def fileLineParser(path,regexp,functions=[]): """ Parse a file line by line using a regexp to find matching values path -- path to the file being parsed regexp -- a valid regexp with one or more parenthesis to catch matching values functions -- list of transformation functions to be applied to the matching strings, e.g. "float" if provided should have as many elements as the number of elements to be matched from every matching line output: List of lists, one list per matching line. Empty list if no matches. """ try: fpath = open(path,'r') except: raise ValueError(path+' file could not be opened') matchtable = [] for line in fpath.readlines(): match = re.search(regexp,line) if match: matchlist = match.groups() if functions: trafolist = [] for i,strvalue in enumerate(matchlist): trafolist.append( functions[i](strvalue) ) matchtable.append( trafolist ) else: matchtable.append( matchlist) fpath.close() return matchtable
################################################################ # Interaction with block-data files ################################################################
[docs] def readTableFile(fname,**opts): """ read a file with data organized in table form and return a l[i,j] list fname -- path to file to be read or file object Options: mode -- text string: "rows" (default) or "cols" - in "rows" mode the first index runs over the rows of the table - in "cols" mode the first index runs over the columns of the table skip -- integer, dafault 0. New row read every <skip> rows. """ if isinstance(fname,str): ftable = open(fname,'r') elif isinstance(fname,file): ftable = fname else: raise ValueError('readTableFile: wrong input type') mode = opts.get('mode','rows') skip = opts.get('skip',0) table = [] count = skip for l in ftable: if not l.strip(): continue # ignore blank lines if l[0] == '#': continue # ignore comments if count < skip: # skip rows count = count + 1 continue ls = l.split() ln = [] for num in ls: ln.append(float(num)) table.append(ln) count = 0 ftable.close() table = np.array(table) if mode == 'rows': return table elif mode == 'cols': return table.transpose() else: raise ValueError('readTableFile: mode must be either "rows" or "cols"')
[docs] def readAuto(fname,**opts): """ Read an autocorrelation file into a complex array fname -- path to file to be read or file object Options: skip -- integer, dafault 0. New row read every <skip> rows. """ skip = opts.get('skip',0) tmax = opts.get('tmax',9.0e+9) a = readTableFile(fname,mode='cols',skip=skip) time = np.array(a[0]) auto = [] for i in range(len(a[0])): re = a[1][i] im = a[2][i] auto.append(complex(re,im)) auto = np.array(auto) rtime = [] rauto = [] for i,t in enumerate(time): if t < tmax: rtime.append(t) rauto.append(auto[i]) return np.array(rtime),np.array(rauto)
[docs] def readColumnFile(fname,col_ix=0): l = [] with open(fname) as f: for line in f: l.append(line.split()[col_ix]) f.close() return l
[docs] def getNextTime(finput): """ Return next time in data file Input: finput - file object with the gpop file Returns: time - time (float) or None if EOF is reached """ line = finput.readline() while 1: if line == '' or line == ' ': return None if line[0] == '#': return float(line.split()[1]) line = finput.readline()
[docs] def getPopTable(finput,dof): """ Return the next grid population in table form Input: finput - file object with the gpop file dof - degree of freedom to look for Returns: found - True/False, whether still data to be returned table - list of lists with x,y data """ table = [] while 1: line = finput.readline() if line == '': raise ValueError('EOF reached reading gpop file') if line[0] == '#': raise ValueError('Next block reached reading gpop file') if line == '\n' or line == ' \n': continue if line.find('.') < 0: # head of DOF block if int(line.split()[0]) == dof: # read block pdim = int(line.split()[1]) for i in range(pdim): line = finput.readline() table.append(line) return table
[docs] def getDenTable(finput,dof,**args): """ Return the next grid reduced density in table form Input: finput - file object with the gden file dof - degree of freedom to look for Returns: found - True/False, whether still data to be returned table - list of lists with x,y,z data """ norm = args.get('norm',False) while 1: line = finput.readline() if line == '': raise ValueError('EOF reached reading gden file') if line[0] == '#': raise ValueError('Next block reached reading gden file') if line == '\n' or line == ' \n': continue if line.find('.') < 0: # head of DOF block if int(line.split()[0]) == dof: # read block pdim = int(line.split()[1]) cmat = np.zeros((pdim,pdim),complex) table = [] for i in range(pdim): for j in range(pdim): line = finput.readline() sline = line.split() table.append(sline[0]+' '+sline[1]+' ') creal = float(line.split()[2]) cimag = float(line.split()[3]) cmat[i,j] = creal + cimag*1j tableOut = [] s = 0 for i in range(pdim): for j in range(pdim): if cmat[i,i] < TINY or cmat[j,j] < TINY: cnorm = 1.0 else: cnorm = abs(cmat[i,j])/abs(cmat[i,i]*cmat[j,j])**0.5 c = cmat[i,j] tline = table[s] + '%13.7f %13.7f %13.7f\n' % (c.real,c.imag,cnorm) tableOut.append(tline) s = s + 1 tableOut.append('\n') return tableOut
[docs] def getUniqueID(idlength=12): return uuid.uuid4().hex[0:idlength]
[docs] def getDateString(): CTIME = datetime.datetime.today() return '%4i%02i%02i' % (CTIME.year,CTIME.month,CTIME.day)
[docs] def listToFile(mylist,fname): with open(fname,'w') as f: for item in mylist: f.write(str(item)+'\n') f.close()
[docs] def changeIsotopeSymbols(listin): subst = { 'D' : 'H', 'C13' : 'C' } listout = [] for s in listin: listout.append( subst.get(s,s) ) return listout
[docs] def traj2xyz(atomlist, geoms, filename='traj.xyz', comment='CDTK-generated traj file', timesteps=None): """ Generate a trajectory in .xyz format Useful for plots with VMD Args: atomlist (list of strings): list of atom symbols geoms (numpy array [step, atoms, 3 ]): geometries filename (str, optional): output filename. Defaults to 'traj.xyz'. comment (str, optional): comment line. Defaults to 'CDTK-generated traj file'. timesteps (array of floats, optional): time step information. Defaults to None. """ with open(filename,'w') as f: for step in range(geoms.shape[0]): f.write(f'{len(atomlist)}\n') if timesteps is None: f.write(f'{comment} step={step}\n') else: f.write(f'{comment} step={step} time = {timesteps[step]:8.2f} au= {timesteps[step]*cv.au2fs:8.2f} fs\n') for atom in range(geoms.shape[1]): f.write(f'{atomlist[atom]:4s} {geoms[step,atom,0]*cv.au2an:15.6f} {geoms[step,atom,1]*cv.au2an:15.6f} {geoms[step,atom,2]*cv.au2an:15.6f}\n') f.close()
[docs] def traj2vtf(atomlist, Rlog, vtfname='traj', mmlist=None, disp_n_atoms=None): """ Generate a trajectory in .vtf format Useful for plots with VMD Point charges represented by "X" Input arguments: atomlist -- path to atomlist file Rlog -- path to R.log file vtfname -- vtf filename mmlist -- path to MM region atomlist file disp_n_atoms -- int; number of atoms to output (truncation for large systems) """ alist = [] count = 0 stride = 1 with open(atomlist,'r') as fatom: s = fatom.readline() while s: alist.append( s[:-1] ) count += 1 if (disp_n_atoms is not None) and (count == disp_n_atoms): break s = fatom.readline() fatom.close() if mmlist is not None and os.path.exists(mmlist): with open(mmlist,'r') as fatom: s = fatom.readline() while s: alist.append( 'X' ) s = fatom.readline() mmlist.close() natoms = len(alist) vtf = vtfname + '.vtf' with open(vtf,'w') as fvtf: fvtf.write('# VTF Trajectory\n') fvtf.write('# atomlist at ' + atomlist + '\n') fvtf.write('# R.log at ' + Rlog + '\n') fvtf.write('# time step stride {t:d}\n'.format(t=stride)) fvtf.write('\n') for i,s in enumerate(alist): fvtf.write( 'atom %i radius 1.0 name %s\n' % (i,s) ) fvtf.write('\n') with open(Rlog,'r') as fRlog: l = fRlog.readline() j = 0 while l: if j%stride == 0: fvtf.write('timestep\n') ll = l.split() t = ll.pop(0) for i in range(natoms): x = float( ll.pop(0) ) * cv.au2an y = float( ll.pop(0) ) * cv.au2an z = float( ll.pop(0) ) * cv.au2an fvtf.write( '%12.7f %12.7f %12.7f\n' % (x,y,z) ) fvtf.write('\n') l = fRlog.readline() j += 1 fRlog.close() fvtf.close()
[docs] def atomicCoordinatesFromRlog(Rlog): """ Sort the entries of an Rlog file on a per atom basis Rlog: t0 X01 Y01 Z01 X02 Y02 Z02 ...... X0N Y0N Z0N t1 X11 Y11 Z11 X12 Y12 Z12 ...... X1N Y1N Z1N ... tf Xf1 Yf1 Zf1 Xf2 Yf2 Zf2 ...... XfN YfN ZfN Returns as array [ [ [X01 Y01 Z01 ], [X02 Y02 Z02], .... [X0N Y0N Z0N] ], [ [X11 Y11 Z11 ], [X12 Y12 Z12], .... [X1N Y1N Z1N] ], .... [ [Xf1 Yf1 Zf1 ], [Xf2 Yf2 Zf2], .... [XfN YfN ZfN] ] ] """ with open(Rlog,'r') as fRlog: xyz = [] l = fRlog.readline() natoms = len( l.split() )//3 while l: a = list( map( float, l.split() ) ) # line to list a.pop(0) # pop time entry a = np.asarray(a) a.shape = (natoms,3) xyz.append(a) l = fRlog.readline() return np.asarray(xyz)
[docs] def getTrajectoryTimeSteps(Rlog): with open(Rlog,'r') as fRlog: t = [ float( line.split().pop(0) ) for line in fRlog ] return np.asarray(t)
[docs] def getDataFromLog(log): with open(log,'r') as logFile: data = [ line.split() for line in logFile ] return np.asarray(data)
[docs] def getDirectoryList(folder='./', file_name=None): """ Get trj_ subfolders in a given folder. If a file name is given, only trj_ subfolders are returned that contain a file with that name. The returned list is sorted. Parameters ---------- folder : string, optional, default: './' Folder to search for trj_ subfolders. file_name : string, optional, default: None If given, only trj_ subfolders are returned that contain a file with that name. Returns ------- dirs : list of string Sorted list of trj_ subfolders. """ allEntries = os.listdir(folder) dirs = [] for entry in allEntries: path = os.path.join(folder, entry) if entry[0:4] == 'trj_' and os.path.isdir(path): if file_name is None or os.path.isfile(os.path.join(path, file_name)): dirs.append(path) dirs.sort() return dirs
[docs] def readRlogRestricted(rlog,col0=0,coln=1): ''' Read log file and return array with vals starting at col0, ending at col0 + coln ''' r_xyz = [] with open(rlog,'r') as f: for line in f: vals = map(float, line.split()[col0:col0+coln]) r_xyz.append(vals) return np.asarray(r_xyz)
################################################################ ################################################################ # XML functions ################################################################
[docs] def readTag(tag,string): """ In a string look for XML tag "tag" and return its content tag - string containing the tag whose contents are to be retrieved string - text string returns the contents between <tag> and </tag> or None if <tag> is not present. """ itag = '<'+tag+'>' ftag = '</'+tag+'>' if string.find(tag) == -1: return None istring = string.find(itag) fstring = string.find(ftag) + len(ftag) return string[istring:fstring]
[docs] def readTagFromFile(tag,path): """ return a string with the contents between <tag> and </tag> """ s = readFile(path) return readTag(tag,s)
################################################################# # Array manipulation #################################################################
[docs] def ReverseArray(array): """ Reverse array. Parameters: ----------- array : 1D array_like array to be reversed Returns ------- Reversed array """ reverse_array = np.zeros(len(array)) for i, x in enumerate(array): reverse_array[-i-1] = x return reverse_array
[docs] def ShiftArray(array, shift, **option): """ Shift array. Parameters: ----------- array : 1D array_like array to be shifted shift : int amount of shifting, positive integer will shift the array to the right, negative integer will shift the array to the left. periodic : Bool, optional, default: True if True, use the periodic boudary condition on the array edge (the right most array element which is right shifted will appear on the left most, and vice versa). If False, the new array element which enter the array as the right most or the left most elements will be set 0.0. Returns: -------- Shifted array """ periodic = option.get('periodic', True) N = len(array) shifted_array = np.zeros(N) if periodic: for i in xrange(N): shifted_array[(i + shift) % N] = array[i] else: for i in xrange(N): l = i + shift if l < 0 or l >= N: shifted_array[l % N] = 0.0 else: shifted_array[l] = array[i] return shifted_array