Source code for CDTK.analyze

#!/usr/bin/env python

#*  **************************************************************************
#*
#*  CDTK, Chemical Dynamics Toolkit
#*  A modular system for chemical dynamics applications and more
#*
#*  Copyright (C) 2011, 2012, 2013, 2014, 2015, 2016
#*  Oriol Vendrell, DESY, <oriol.vendrell@desy.de>
#*
#*  Copyright (C) 2017, 2018, 2019
#*  Ralph Welsch, DESY, <ralph.welsch@desy.de>
#*
#*  Copyright (C) 2020, 2021, 2022, 2023
#*  Ludger Inhester, DESY, ludger.inhester@cfel.de
#*
#*  This file is part of CDTK.
#*
#*  CDTK is free software: you can redistribute it and/or modify
#*  it under the terms of the GNU General Public License as published by
#*  the Free Software Foundation, either version 3 of the License, or
#*  (at your option) any later version.
#*
#*  This program is distributed in the hope that it will be useful,
#*  but WITHOUT ANY WARRANTY; without even the implied warranty of
#*  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#*  GNU General Public License for more details.
#*
#*  You should have received a copy of the GNU General Public License
#*  along with this program.  If not, see <http://www.gnu.org/licenses/>.
#*
#*  **************************************************************************


import sys
import itertools
import re
import pickle
import shlex

from scipy import interpolate
from optparse import OptionParser

import numpy as np
import numpy.linalg as la

import CDTK.Tools.Utils as uti
import CDTK.Tools.Conversion as conv

# This program does several different analysis schemes for a given set of trajectories
# and also calculated the associated fourier spectrum (TODO: fourier trafor to be implemented!).
#
# The trajectories need to be saved with the -p command from xpyder (pickle).
#
# The analysis is based on the calculation of correlation functions of the type
# <A B(t)>, where A and B correspond to two 'observables'. Setting 'A' to be the
# identity also expectation values can be calculated. One can also request the
# calculation of said correlation function at a given timestep (i.e. the first one, the
# last one, ...). More details in the help text.
#
# The final data can be analysed in several ways. The mean value averaged over all
# trajectories can be calculated as well as other functions (standard deviation, percentile, ...).
# Alternatively, the quantity calculated can be histogrammed with different ways to define the
# histograms. More details in the help text.

# For all values obtained the standard error of the respective quantity is calculated as
# the standard deviation of that quantity obtained from bootstrapping!

# The 'commands' tuple holds most of the definitions needed:
# 0: string defining observable A
# 1: string defining observable B
# 2: string defining the value used for projection
# 3: string defining the operation for combining A and B
# 4: string requesting histograms and defining the bins to be used
# 5: string defining the function to be calculated of the given values (mean, std, ...)
# 6: string defining the output file

[docs] def getBins(doHis, nvalues, correl): """ This function creates a set of bins for histogramming the data calculated. Input: TODO --- doHis --- either an integer number that defines the number of bins to be used, 0 for integer steps (in both cases upper and lower bounds are chosen as highest and lowest value), or three numbers defining lower bound, upper bound as well as step or a valid command for numpy.histogram nvalues --- number of values calculated correl --- correlation function values calculated Output: bins --- array holding the definition of the bins """ # arrays to hold the lower and upper bounds minb = np.zeros(nvalues) maxb = np.zeros(nvalues) bins = [] # Iterate over all values calculated for k in range(nvalues): # check the string defining the type of bins requested vals = doHis.split(',') # 1 part -> get number of bins requested if len(vals) == 1: try: n = int(vals[0]) except ValueError: n = None # do integer bins; step of bins = 1 if n == 0: ### TODO get these numbers minb[k] = int(np.floor(np.nanmin(correl[:, :, k])))-0.5 maxb[k] = int(np.ceil(np.nanmax(correl[:, :, k])))+0.51 bins.append(np.arange(minb[k], maxb[k])) # do certain number of bins with upper and lower bound given by the data elif n > 0: # do not take zeros into account if nozero == True: try: minb[k] = np.nanmin(np.take(correl[:, :, k], correl[:, :, k].flatten().nonzero())) - 1e-7 except ValueError: minb[k] = -1e-7 try: maxb[k] = np.nanmax(np.take(correl[:, :, k], correl[:, :, k].flatten().nonzero())) + 1e-7 except ValueError: maxb[k] = 1e-7 # do take zeros into account else: minb[k] = np.nanmin(correl[:, :, k]) - 1e-7 maxb[k] = np.nanmax(correl[:, :, k]) + 1e-7 # create set of bins bins.append(np.linspace(minb[k], maxb[k], int(doHis)+1)) # this should be a reasonable string for np.histogram else: bins.append(vals[0]) # 3 parts -> minimum value, maximum value and step size elif len(vals) == 3: minb[k] = float(vals[0]) maxb[k] = float(vals[1]) + float(vals[2]) bins.append(np.arange(minb[k], maxb[k], float(vals[2]))) return bins
[docs] def bootstrap(correl, doHis, func=np.nanmean, nbootstrap=1000, nozero=False): """ This function performs bootstraping to obtain meaningful variances for the calculated quantities. Either a histogram or a certain quantity (i.e. mean, std) given as a function. Input: correl --- an array of values to perform bootstrapping on; NaN values are ignored doHis --- either an integer number that defines the number of bins to be used, 0 for integer steps (in both cases upper and lower bounds are chosen as highest and lowest value), or three numbers defining lower bound, upper bound as well as step or a valid command for numpy.histogram func --- function defining the value to be calculated (i.e. mean, std, etc.) nbootstrap --- number of bootstrapping steps nzero --- if zero values should be omitted for bin calculation Output: mean --- mean values of the quantity requested (i.e. mean or histogram) sigma --- standard errors for the quantities (i.e. standard deviations of the mean or bin value) bins --- definition of bins if histogramming is used ntraj --- number of trajectories used for each value; i.e. number of values not NaN or Inf """ # reshape for better processing if len(correl.shape) == 2: nsamples, nsteps = correl.shape nvalues = 1 else: nsamples, nsteps, nvalues = correl.shape correl = correl.reshape(nsamples, nsteps, nvalues) # calculate the number of trajectories that do not have NaN or Inf for each timestep and values ntraj = np.zeros((nsteps, nvalues), dtype=int) for j in range(nsteps): for k in range(nvalues): if type(correl[:, j, k][0]) == np.ndarray: ntraj[j, k] = len(correl[:, j, k]) else: ntraj[j, k] = len(correl[np.isfinite(correl[:, j, k])]) # set up structure for means values = np.zeros((nsteps, nbootstrap, nvalues)).tolist() # create bins for histogram bins = [] if doHis != None: bins = getBins(doHis, nvalues, correl) # iterate as often as given for i in range(nbootstrap): # get indices for bootstrap resampling; 1 bootstrap iteration = no random sampling! if nbootstrap == 1: resample_i = np.array(list(range(nsamples))) else: resample_i = np.random.randint(nsamples, size=nsamples) # for each time-step calculate quantities for all values for j in range(nsteps): if doHis != None: values[j][i] = [] # iterate over all calculated values for k in range(nvalues): # do histogram if doHis != None: r_correl = correl[resample_i, j, k] if type(r_correl[0]) == np.ndarray: r_correl = np.concatenate( r_correl, axis=0 ) else: r_correl = r_correl[np.isfinite(r_correl)] nt = float(len(r_correl)) val, bi = np.histogram(r_correl, bins=bins[k], density=False) values[j][i] += (val/nt).tolist() bins[k] = bi # call given function (mean, std, ...) else: values[j][i][k] = func(correl[resample_i, j, k]) values = np.array(values).reshape((nsteps, nbootstrap, -1)) nvalues = values.shape[2] # calculate bootstraped mean of quantity requested and corresponding std. deviation mean = np.zeros((nsteps, nvalues)).tolist() sigma = np.zeros((nsteps, nvalues)).tolist() mean = np.mean(values, axis=1) sigma = np.std(values, axis=1) return np.array(mean), np.array(sigma), bins, ntraj
[docs] def red(c0, c, redString): """ This function calculates the correlation between two values at time 0 and t. The correlation might be calculated for a single value by multiplication, for a set of values in an array as an elemnt-wise multiplication (i.e. [c0[0]*c[0], c0[1]*c[1], ...) or for vector-valued quantities by a dot product. Other options might be needed here. Input: c0 --- value or values at time 0 c --- value or values at time t redString --- definition on how to combine the values. 'dot' does a dot-product of both values 'mult' does a multiplication; element-wise (i.e. [c0[0]*c[0], c0[1]*c[1], ...) if arrays are given. Output: The connected value or values. """ if redString == 'dot': return np.dot(c0, c) elif redString == 'mult': return c0 * c else: print('Reduction string invalid. redString = ', redStrin, ' not implemented.') exit(-1)
[docs] def angle(optionsString, pValue, traj, i, val): """ Does perform operations related to angles. If no options given it will not work. Valid options are as follows: -1 <a>, -2 <b>, -3 <b> given: calculate the angle between the the sites. A site can be defined as an atom by its number or as a center of mass by giving m and a comma separated list of atoms, i.e., -1 m,0,3,4. Optionally, the third argument can also be a space fixed axis given as -3 s,0,0,1 and then the angle between the vector given by the first two sites and that axis is calculated. -p <a> given: this option projects it onto a certain range. Valid are below a value (<,A) or above a value (>,A) or fully bound ranges (A,<,B) - also several ranges can be given as A,<,B,C,<,D, .. . If within the given value the function returns 1.0, else the given pValue or NaN if -v is given. -v given: Requests NaN values for projection. -n given: Normalization for angle projections required. Input: optionsString --- Input flags given pValue --- value used for projections traj --- trajectory object to analyze i --- time step given as integer val --- value from previous operation Output: values obtained from the position operation combined with old value. """ obsparser = OptionParser(usage="Options for +angle or +ang", add_help_option=False) obsparser.add_option('-h', '--help', dest='help', action='store_true', default=False, help='Prints this help page.') obsparser.add_option('-1', '--angle1', dest='ang1', type='str', default=None, help='An angle between the three given (d1, d2 and d3) sites should be calculated. A site can be defined as an atom by its number or as a center of mass by giving m and a set of atoms by their numbers with comma sepatation.') obsparser.add_option('-2', '--angle2', dest='ang2', type='str', default=None, help='An angle between the three given (d1, d2 and d3) sites should be calculated. A site can be defined as an atom by its number or as a center of mass by giving m and a set of atoms by their numbers with comma sepatation.') obsparser.add_option('-3', '--angle3', dest='ang3', type='str', default=None, help='An angle between the three given (d1, d2 and d3) sites should be calculated. A site can be defined as an atom by its number or as a center of mass by giving m and a set of atoms by their numbers with comma sepatation. The third site can also be given by a space fixed vector given by an s and then the three components.') obsparser.add_option('-p', '--project', dest='proj', type='str', default=None, help='Take the angle given and project onto a certain range. Valid are fully bound ranges (A,<,B), or below a value (<,A) or above a value (>,A).') obsparser.add_option('-n', '--normalize', dest='doNorm', action='store_true', default=False, help='If normalization is required') obsparser.add_option('-v', '--nanvalue', dest='doNan', action='store_true', default=False, help='If only this projection is supposed to give Nans.') opts, args = obsparser.parse_args(optionsString.split()) if opts.help == True: obsparser.print_help() return 0.0 cval = None # obtain first site: vals = opts.ang1.split(',') # center of mass if vals[0] == 'm': coord1 = traj.getCOM([int(v) for v in vals[1:]], i) else: n1 = int(vals[0]) coord1 = traj.logR[i][n1*3:(n1+1)*3] # obtain second site: vals = opts.ang2.split(',') # center of mass if vals[0] == 'm': coord2 = traj.getCOM([int(v) for v in vals[1:]], i) else: n2 = int(vals[0]) coord2 = traj.logR[i][n2*3:(n2+1)*3] # obtain third site: vals = opts.ang3.split(',') # center of mass if vals[0] == 'm': coord3 = traj.getCOM([int(v) for v in vals[1:]], i) # second vector given as third site! elif vals[0] == 's': coord3 = None vec2 = np.array(vals[1:4], dtype=float) else: n2 = int(vals[0]) coord2 = traj.logR[i][n2*3:(n2+1)*3] # get first vector: vec1 = coord1 - coord2 norm1 = np.linalg.norm( vec1 ) # get second vector if coord3 != None: vec2 = coord3 - coord2 norm2 = np.linalg.norm( vec2 ) # get angle dp = np.dot(vec1, vec2)/ norm1 / norm2 angle = np.arccos( np.clip(dp, -1, 1) ) cval = angle # if we want to project the angle onto a certain interval if opts.proj != None: vals = opts.proj.split(',') # case: above a value; > A if vals[0] == '>': if cval > float(vals[1]): cval = 1.0 else: if opts.doNan == True: cval = np.NaN else: cval = pValue # case: below a value; < A if vals[0] == '<': if cval < float(vals[1]): cval = 1.0 else: if opts.doNan == True: cval = np.NaN else: cval = pValue if len(vals) == 3: # case: between value; < A < if vals[1] == '<': if cval > float(vals[0]) and cval < float(vals[2]): cval = 1.0 if opts.doNorm == True: cval /= (2.0 * np.pi * (np.cos(float(vals[0])) - np.cos(float(vals[2]))) ) else: if opts.doNan == True: cval = np.NaN else: cval = pValue # several cases: between value; < A < elif len(vals) > 3: for i in range(0, len(vals), 3): vals2 = vals[i:i+3] if vals2[1] == '<': if angle > float(vals2[0]) and angle < float(vals2[2]): cval = 1.0 if opts.doNorm == True: cval /= (2.0 * np.pi * (np.cos(float(vals2[0])) - np.cos(float(vals2[2]))) ) break else: if opts.doNan == True: cval = np.NaN else: cval = pValue ## Combine with old values try: if cval == None or val == None: return pValue else: return cval*val except: return cval*val
[docs] def position(optionsString, pValue, traj, i, val): """ Does perform operations related to positions. If no options given it will return None. Valid options are as follows: -1 <a>, -2 <b> given: calculate the distance between the two sites. A site can be defined as an atom by its number or as a center of mass by giving m and a comma separated list of atoms, i.e., -1 m,0,3,4. -x <a> given: coordinate value of a given coordinates, e.g., -x 0, gives the x coordinate of the first atom, or -x 0,3,7 gives the x coordinate of the first and second atom and the y coordinate of the third atom. Alternatively, also the 3D coordinate of a center of mass can be obtained by giving m and a comma separated list of atoms. Single atoms can be obtained as center of mass of that atom, i.e., -x m,1. -p <a> given: if a single value is calculated (i.e. a distance or single coordinate) this option projects it onto a certain range. Valid are fully bound ranges (A,<,B), or below a value (<,A) or above a value (>,A). If within the given value the function returns 1.0, else the given pValue. -d <a> given: checks whether a given site is dissociated, i.e., if its distance to all other atoms is larger than a certain number. The number to compare is given first and then the atom or site to look at is given, e.g., -d 7.0,2 or -d 7.0,m,0,3,4. Input: optionsString --- Input flags given pValue --- value used for projections traj --- trajectory object to analyze i --- time step given as integer val --- value from previous operation Output: values obtained from the position operation combined with old value. """ obsparser = OptionParser(usage="Options for +position or +pos", add_help_option=False) obsparser.add_option('-h', '--help', dest='help', action='store_true', default=False, help='Prints this help page.') obsparser.add_option('-1', '--distance1', dest='dis1', type='str', default=None, help='A distance between the two given (d1 and d2) sites should be calculated. A site can be defined as an atom by its number or as a center of mass by giving m and a set of atoms by their numbers with comma sepatation.') obsparser.add_option('-2', '--distance2', dest='dis2', type='str', default=None, help='A distance between the two given (d1 and d2) sites should be calculated. A site can be defined as an atom by its number or as a center of mass by giving m and a set of atoms by their numbers with comma sepatation.') obsparser.add_option('-p', '--project', dest='proj', type='str', default=None, help='Take the distance or coordinate value given and project onto a certain range. Valid are fully bound ranges (A,<,B), or below a value (<,A) or above a value (>,A).') obsparser.add_option('-x', '--x', dest='coord', type='str', default=None, help='A specific coordinate or a list of coorinates. Numbered starting with 0, i.e., 0: x-coordinate of first atom, 1: y-coordinated of first atom, etc. If we want the center of mass of the list we have to give a m first.') obsparser.add_option('-d', '--dissociated', dest='diss', type='str', default=None, help='Check wheather the given atom or list of atoms is beyond a certain distance d to all other atoms. Format -d d,n or -d d,m,n0,n1,n2') opts, args = obsparser.parse_args(optionsString.split()) if opts.help == True: obsparser.print_help() return 0.0 cval = None # if we want to look at dissociation if opts.diss != None: # obtain dissociation parameter vals = opts.diss.split(',') diss_distance = float(vals[0]) # center of mass if vals[1] == 'm': atoms = [int(v) for v in vals[2:]] coord = traj.getCOM(atoms, i) # or regular coordinates else: atoms = [int(vals[1])] coord = traj.logR[i][atoms[0]] cval = 1.0 # check distance to all other atoms n = len(traj.logR[i]) // 3 for j in range(n): # if atom not in site definition, get distance if not j in atoms: d = np.linalg.norm(coord - traj.logR[i][j*3:(j+1)*3]) # if not dissociated in one case, break; i.e. lazy evaluation if d < diss_distance: cval = pValue break # if we want to look at coordinates if opts.coord != None: # obtain coordinate list vals = opts.coord.split(',') # center of mass if vals[0] == 'm': cval = traj.getCOM([int(v) for v in vals[1:]], i) # certain coordinate else: cval = np.zeros(len(vals)) for j, v in enumerate(vals): cval[j] = traj.logR[i][int(v)] # if we want to look at distances if opts.dis1 != None and opts.dis2 != None: # obtain first site: vals = opts.dis1.split(',') # center of mass if vals[0] == 'm': coord1 = traj.getCOM([int(v) for v in vals[1:]], i) else: n1 = int(vals[0]) coord1 = traj.logR[i][n1*3:(n1+1)*3] # obtain second site: vals = opts.dis2.split(',') # center of mass if vals[0] == 'm': coord2 = traj.getCOM([int(v) for v in vals[1:]], i) else: n2 = int(vals[0]) coord2 = traj.logR[i][n2*3:(n2+1)*3] # calculate distance cval = np.linalg.norm( coord1 - coord2 ) # if we want to project the distance/coordinate onto a certain interval if opts.proj != None: vals = opts.proj.split(',') # case: above a value; > A if vals[0] == '>': if cval > float(vals[1]): cval = 1.0 else: cval = pValue # case: below a value; < A if vals[0] == '<': if cval < float(vals[1]): cval = 1.0 else: cval = pValue # case: between values < A < if vals[1] == '<': if cval > float(vals[0]) and cval < float(vals[2]): cval = 1.0 else: cval = pValue ## Combine with old values try: if cval == None or val == None: return pValue else: return cval*val except: return cval*val
[docs] def velocity(optionsString, pValue, traj, i, val): """ Does perform operations related to velocities. If no options given it will return None. Note: adapted from RPMD branch. Valid options are as follows: -x <a> given: velocity value of a given coordinates, e.g., -x 0, gives the x velocity of the first atom, or -x 0,3,7 gives the x velocity of the first and second atom and the y velocity of the third atom. Alternatively, also the 3D velocity of a center of mass can be obtained by giving m and a comma separated list of atoms. Single atoms can be obtained as center of mass of that atom, i.e., -x m,1. Input: optionsString --- Input flags given pValue --- value used for projections traj --- trajectory object to analyze i --- time step given as integer val --- value from previous operation Output: values obtained from the velocity operation combined with old value. """ obsparser = OptionParser(usage="Options for +velocity or +vel", add_help_option=False) obsparser.add_option('-h', '--help', dest='help', action='store_true', default=False, help='Prints this help page.') obsparser.add_option('-A','--A', dest='obsA', type='str', default=None, help='First observable') obsparser.add_option('-x', '--x', dest='coord', type='str', default=None, help='A specific coordinate or a list of coorinates. Numbered starting with 0, i.e., 0: x-coordinate of first atom, 1: y-coordinated of first atom, etc. If we want the center of mass of the list we have to give a m first.') opts, args = obsparser.parse_args(optionsString.split()) if opts.help == True: obsparser.print_help() return 0.0 cval = None # if we want to look at coordinates if opts.coord != None: # obtain coordinate list vals = opts.coord.split(',') # center of mass if vals[0] == 'm': print("Center of mass velocity not implemented!") exit(-1) # certain coordinate else: cval = np.zeros(len(vals)) for j, v in enumerate(vals): cval[j] = traj.logV[i][int(v)] return cval*val
[docs] def charge(optionsString, pValue, traj, i, val): """ Does perform operations related to charges. If no options given it will return the charge of the system. Valid options are as follows: -t given: give back list of partial charges. -n <a> given: if -t is given, only consider the atoms given here as a list -p <a>: project the calculated (partial) charge rounded to an integer to the given value. Gives 1.0 for matches to the given value, or pValue if not. Input: optionsString --- Input flags given pValue --- value used for projections traj --- trajectory object to analyze i --- time step given as integer val --- value from previous operation Output: values obtained from the charge operation combined with old value. """ obsparser = OptionParser(usage="Options for +charge or +cha", add_help_option=False) obsparser.add_option('-h', '--help', dest='help', action='store_true', default=False, help='Prints this help page.') obsparser.add_option('-p', '--project', dest='project', type='int', action='store', default=None, help='Project only onto given charge, partial charges are rounded to integers') obsparser.add_option('-t', '--partial', dest='doPartial', action='store_true', default=False, help='Do analysis for partial charges.') obsparser.add_option('-n', '--numbers', dest='atoms', type='str', default=None, help='Atoms to look at given as comma separated list.') opts, args = obsparser.parse_args(optionsString.split()) if opts.help == True: obsparser.print_help() return 0.0 cval = 1.0 # get partial charges for either all atoms or a single site if opts.doPartial == True: if opts.atoms == None: cval = np.array(traj.ES.log['partial'][i][1:]) else: vals = opts.atoms.split(',') cval = np.array([traj.ES.log['partial'][i][int(j)+1] for j in vals]) else: cval = np.array([traj.ES.log['charge'][i][1]], dtype=float) # project onto certain charge value if opts.project != None: proj_q = int(opts.project) for j, v in enumerate(cval): if round(v) != proj_q: cval[j] = pValue else: cval[j] = 1.0 ## Combine with old values return cval*val
[docs] def state(optionsString, pValue, traj, i, val): """ Does perform operations related to the electronic state in surface hopping. Valid options are as follows: -p <a>: project results only for given state. Input: optionsString --- Input flags given pValue --- value used for projections traj --- trajectory object to analyze i --- time step given as integer val --- value from previous operation Output: values obtained from the state operation combined with old value. """ obsparser = OptionParser(usage="Options for +state", add_help_option=False) obsparser.add_option('-h', '--help', dest='help', action='store_true', default=False, help='Prints this help page.') obsparser.add_option('-p', '--project', dest='project', type='int', action='store', default=None, help='Project only onto given state.') opts, args = obsparser.parse_args(optionsString.split()) if opts.help == True: obsparser.print_help() return 0.0 # make sure the trajectory has a state hasS = hasattr(traj, 'logS') # get either the state from the variable or from an logged event if hasS == True: cval = traj.logS[i] else: cval = int(traj.ES.log['event'][-1][2]) # project onto certain state if opts.project != None: if cval == opts.project: cval = 1.0 else: cval = pValue ## Combine with old values return cval*val
[docs] def spec(optionsString, pValue, traj, i, val): """ Does perform operations related to electronic spectra (Auger, Photoionisation, Flouresence, ...). TODO check if this really works!! Valid options are as follows: -a: Auger spectrum -i: Photoionization spectrum -f: Flouresence spectrum Input: optionsString --- Input flags given pValue --- value used for projections traj --- trajectory object to analyze i --- time step given as integer val --- value from previous operation Output: values obtained from the spec operation combined with old value. """ obsparser = OptionParser(usage="Options for +spectrum or +spec", add_help_option=False) obsparser.add_option('-h', '--help', dest='help', action='store_true', default=False, help='Prints this help page.') obsparser.add_option('-a', '--auger', dest='doAuger', action='store_true', default=False, help='Do analysis for auger decays.') obsparser.add_option('-i', '--photoion', dest='doPhoto', action='store_true', default=False, help='Do analysis for photoionization.') obsparser.add_option('-f', '--flouresence', dest='doFlouro', action='store_true', default=False, help='Do analysis for fouresence.') opts, args = obsparser.parse_args(optionsString.split()) if opts.help == True: obsparser.print_help() return 0.0 cval = 1.0 if opts.doAuger == True: cval = [np.NaN] for p in traj.ES.log['event']: if p[1].lower() == 'a': cval[0] = float(p[4]) elif opts.doPhoto == True: cval = [] for p in traj.ES.log['event']: if p[1].lower() == 'p': cval.append(float(p[4])) elif opts.doFlouro == True: cval = [] for p in traj.ES.log['event']: if p[1].lower() == 'f': cval.append(float(p[4])) cval = np.array(cval) ## Combine with old values return cval*val
[docs] def kinetic(optionsString, pValue, traj, i, val): """ Does perform operations related to kinetic energies. If no options given it will return the full kinetic energy of the system. Valid options are as follows: -n <a> given: Comma separated list of atoms to be taken into account. -s given: if given, the kinetic energy for each atom is calculated separatly and returned as an array -k <a> given: The remaining KER (TODO what is actually done here?) is calculated for each atom (currently -n is ignored by this option, but should be implemented); implies -s. Input: optionsString --- Input flags given pValue --- value used for projections traj --- trajectory object to analyze i --- time step given as integer val --- value from previous operation Output: values obtained from the kinetic operation combined with old value. """ obsparser = OptionParser(usage="Options for +kinetic or +kin", add_help_option=False) obsparser.add_option('-h', '--help', dest='help', action='store_true', default=False, help='Prints this help page.') obsparser.add_option('-n', '--numbers', dest='atoms', type='str', default=None, help='Atoms to look at given as comma separated list.') obsparser.add_option('-s', '--separate', dest='sep', action='store_true', default=False, help='Consider atoms separately.') obsparser.add_option('-k', '--ker', dest='ker', action='store_true', default=False, help='Calculate KER, implies -s.') opts, args = obsparser.parse_args(optionsString.split()) if opts.help == True: obsparser.print_help() return 0.0 cval = 1.0 # TODO implement -n option here # KER to be calculated if opts.ker == True: n = traj.NDof // 3 cval = np.zeros(n) # get kinetic energy first for j in range(n): cval[j] = traj.kinetic_energy(step=i, atomlist=[j]) # get remaining contribution by coulomb repulsion for k in range(n): if k == j: continue else: red_mass = traj.M[k*3] / (traj.M[k*3] + traj.M[j*3]) qq = traj.ES.log['partial'][i][k+1] * traj.ES.log['partial'][i][j+1] d = np.linalg.norm( traj.logR[i][j*3:(j+1)*3] - traj.logR[i][k*3:(k+1)*3]) cval[j] += red_mass * qq / d return cval*val # take all atoms into account if opts.atoms == None: # separately if opts.sep == True: n = traj.NDof // 3 cval = np.zeros(n) for j in range(n): cval[j] = traj.kinetic_energy(step=i, atomlist=[j]) # full kinetic energy else: cval = traj.kinetic_energy(step=i) # only a subset of atoms else: vals = opts.atoms.split(',') n = len(vals) # separately if opts.sep == True: cval = np.zeros(n) for j in range(n): cval[j] = traj.kinetic_energy(step=i, atomlist=[int(vals[j])]) # combined else: cval = traj.kinetic_energy(step=i, atomlist=[int(v) for v in vals]) ## Combine with old values return cval*val
[docs] def operate(opString, optionsString, projValue, traj, i, val): """ Perform an operation to obtain a quantity from a trajectory at time step i. Results are combined with results from previous operations given in 'val'. The operation is given by the 'opString' with options given by the 'optionsString'. Input: opString --- string defining the operation to be performed. Valid: 'pos': position related things 'vel': velocity related things 'kin': kinetic energy related things 'cha': charge related things 'spec': auger, flouressence or photo-electron related things 'state' : state in surface hopping related 'id' : idendity optionsString --- string with additional options given to the operation projValue --- projection value traj --- Trajectory data i --- time-step as integer val --- value obtained from previous operations Return: quantity obtained by this operation combined with given older value. """ if projValue != 'None': pValue = float(projValue) else: pValue = float('nan') if opString == 'id' or opString == 'identity': return 1.0 elif opString == 'pos' or opString == 'position' : return position(optionsString, pValue, traj, i, val) elif opString == 'ang' or opString == 'angle': return angle(optionsString, pValue, traj, i, val) elif opString == 'vel' or opString == 'velocity': return velocity(optionsString, pValue, traj, i, val) elif opString == 'cha' or opString == 'charge': return charge(optionsString, pValue, traj, i, val) elif opString == 'kin' or opString == 'kinetic': return kinetic(optionsString, pValue, traj, i, val) elif opString == 'spec' or opString == 'spectrum': return spec(optionsString, pValue, traj, i, val) elif opString == 'state' or opString == 'state': return state(optionsString, pValue, traj, i, val) else: print('operation invalid. opString = ', opString, ' not implemented.') exit(-1)
[docs] def observable(obsString, projValue, traj, i): """ Function that calculates a quantity as defined in the 'obsString' from a trajectory at time-step 'i'. Input: obsString --- String defining the quantity to be calculated. projValue --- Value used for any projections traj --- Trajectory data i --- time-step as integer """ # number of operations to be performed # pattern = r'([0-9]+);.*' # n2 = int(re.findall(pattern, obsString)[0]) # print n n = obsString.count('+') # if n != n2: # print "Wrong! n=", n, "n2=", n2 # stop # list of operations to be performed pattern = r'\+(\S+)' operations = re.findall(pattern, obsString) # list of options for each operation options = [] # iterate over all, but last for j in range(len(operations) - 1): offset = len(operations[j]) start = obsString.find(operations[j]) end = obsString.find('+' + operations[j+1]) options.append(obsString[start+offset:end]) # last one separate offset = len(operations[-1]) start = obsString.find(operations[-1]) options.append(obsString[start+offset:]) # go through all operatons val = 1.0 for j, op in enumerate(operations): val = operate(op, options[j], projValue, traj, i, val) # do lazy evaluation try: if val == None: return val except: continue return val
[docs] def doFourrier(): """ TODO implement! """ return
[docs] def outputBins(fout, form, bins): """ Funtion to write bin definition if histogram was requested Input: fout --- output file handle form --- If given, prints results in an alternative formats. Possible values: "time": standard format with one line per time and all values along the columns. "values" one line per value and time along the columns. "single" with values first and empty line and then std errors. bins --- Bin edges if histograms were requested. Output: The bin data is written. """ fout.write("#### A histogram was requested, the center of the histograms are given below: \n") # standard format - time and then value stderror if form.lower() == 'time': fout.write("# 1: Time ") shift = 0 # iterate over bins for subs in bins: fout.write(" ||| ") # get bin center and write it as well as mentioning the stderror for i in range(len(subs) - 1): binc = (subs[i] + subs[i+1]) / 2.0 fout.write(str(2*i + 2 + shift) + ": " + str(binc) + " " + str(2*i + 3 + shift) + ": stderr ") shift += (2*len(subs) - 2) fout.write("\n") elif form.lower() == 'single': fout.write("# ") shift = 0 # iterate over bins for subs in bins: # get bin center and write it as well as mentioning the stderror for i in range(len(subs) - 1): binc = (subs[i] + subs[i+1]) / 2.0 fout.write(str(i + 1 + shift) + ": " + str(binc) + " ") shift += (len(subs)) fout.write("\n") elif form.lower() == 'values': shift = 0 # iterate over bins for subs in bins: fout.write("# ||| ") # get bin center and write it as well as mentioning the stderror for i in range(len(subs) - 1): binc = (subs[i] + subs[i+1]) / 2.0 fout.write(str(2*i + 2 + shift) + ": " + str(binc) + " " + str(2*i + 3 + shift) + ": stderr ") shift += (2*len(subs) - 2) fout.write("\n") return
[docs] def outputNtraj(fout, time, ntraj): """ Funtion to write the number of trajectories contributing. Input: fout --- output file handle. time --- array giving the timestamps. ntraj --- array giving the number of trajectories. Output: The number of trajectories are written. """ fout.write("### Number of trajectories used for this file is: \n") for t, nl in zip(time, ntraj): fout.write("# " + str(t) + " ") for n in nl: fout.write(str(n) + " ") fout.write(" \n") return
[docs] def outputData(m, commands, nbootstrap, step, form, bins, time, mean, sigma, ntraj): """ Funtion to write data to files. Input: m --- index of current command. commands --- dictonary of commands as tuples. They are essentially the input flags for the main program with the following structure: (opA, opB, projection, red, his, output) opA and opB define the operations for <A(0) B(t)> red defines how to combined A and B, i.e. multiplication, dot product, ... his defines if histogram should be produced and number of bins (0=automatic) output defines output file nbootstrap --- number of bootstrap resamplings to be performed step --- Only give results for one timestep. -1 is last, 0 is first, None is all. form --- If given, prints results in an alternative formats. Possible values: "time": standard format with one line per time and all values along the columns. "values" one line per value and time along the columns. "single" with values first and empty line and then std errors. bins --- Bin edges if histograms were requested. time --- array giving the timestamps. mean --- Mean value of the requested data as calculated by the bootstrapping. sigma --- Standard deviations of the mean of the requested data as calculated by the bootstrapping. ntraj --- array giving the number of trajectories. Output: The data is written to files with names given in the command. The first column is always time, and then there is a column for each value directly followed by its standard error. """ # output file sout = commands[m][6] with open(sout, 'w') as fout: # write out some header information fout.write("#### Output file generated by analyze.py for the following commands: \n") fout.write("#### -A " + str(commands[m][0]) + " \n") fout.write("#### -B " + str(commands[m][1]) + " \n") fout.write("#### -p " + str(commands[m][2]) + " \n") fout.write("#### -r " + str(commands[m][3]) + " \n") fout.write("#### -s " + str(commands[m][4]) + " \n") fout.write("#### -v " + str(commands[m][5]) + " \n") fout.write("#### -t " + str(step) + " \n") fout.write("#### -c " + str(form) + " \n") fout.write("#### Standard deviations of the quantities calculated are calculated employing a bootstrapping approach with " + str(nbootstrap) + " samples. \n") # output bins if histogram was requested if (commands[m][4] != None): outputBins(fout, form, bins) # iterate over data and output for t, me, si in zip(time, mean, sigma): # standard format - time and then value stderror if form.lower() == 'time': fout.write('{0: >12.3f} '.format(t) ) for valm, vals in zip(np.array(me).flatten(), np.array(si).flatten()): fout.write('{0: >12.3e} {1: >12.3e} '.format(valm, vals)) fout.write('\n') # alternative format - all values in one line and then all stderrors elif form.lower() == 'single': for valm in np.array(me).flatten(): fout.write('{0: >12.3f} '.format(valm)) fout.write('\n \n') for vals in np.array(si).flatten(): fout.write('{0: >12.3f} '.format(vals)) fout.write('\n') # value format - each value has a row and times are for each column if form.lower() == 'values': # write times to the header line too fout.write("# 1: Values ") for t in time: fout.write(str(t) + " ") nvalues = len(bins) ntime = len(time) # reshape for better plotting mean = mean.reshape(ntime, -1, nvalues, order='F') sigma = sigma.reshape(ntime, -1, nvalues, order='F') shift = 0 # iterate over bins for k, subs in enumerate(bins): fout.write("\n") # get bin center and write respective values for i in range(len(subs) - 1): binc = (subs[i] + subs[i+1]) / 2.0 fout.write(str(binc) + " ") for valm, vals in zip(np.array(mean)[:, i, k], np.array(sigma)[:, i, k]): fout.write('{0: >12.3f} {1: >12.3f} '.format(valm, vals)) fout.write('\n') fout.write('#### \n') ## print number of trajectories that contribute if None given as projection if (commands[m][2] == 'None'): outputNtraj(fout, time, ntraj) return
[docs] def correlation_function(commands, dirs, nbootstrap=1000, step=None, form='time', nozero=False): """ Function to calculate a set of correlation functions or expectation values given by the command string and for the trajectories given in the dirs folder. Standard errors of the calculated quantities are calculated using bootstrapping. Input: commands --- dictonary of commands as tuples. They are essentially the input flags for the main program with the following structure: (opA, opB, projection, red, his, output) opA and opB define the operations for <A(0) B(t)> red defines how to combined A and B, i.e. multiplication, dot product, ... his defines if histogram should be produced and number of bins (0=automatic) output defines output file dirs --- list of directories containing the pickled trajectory objects nbootstrap --- number of bootstrap resamplings to be performed step --- Only give results for one timestep. -1 is last, 0 is first, None is all. form --- If given, prints results in an alternative formats. Possible values: "time": standard format with one line per time and all values along the columns. "values" one line per value and time along the columns. "single" with values first and empty line and then std errors. nozero --- If true ignore zero values in bin calculation. Output: The data is written to files with names given in the command. The first column is always time, and then there is a column for each value directly followed by its standard error. """ ncom = len(commands) c0 = np.zeros(ncom).tolist() correl = [[] for m in range(ncom)] time = [] # Loop over directories, Read data and add to corrlation function for j, d in enumerate(dirs): # load pickled file picklefile = open(d + '/pickle.dat', "rb" ) traj = pickle.load(picklefile) picklefile.close() # Initialize corrlation function array # all times in the trajectory if step == None: n = len(traj.logR) # only one timestep requested else: n = 1 time = np.zeros(n) for m in range(ncom): correl[m].append([]) correl[m][j] = np.zeros(n).tolist() ## Value at t=0 c0[m] = observable(commands[m][0], commands[m][2], traj, 0) ## Iterate over trajectory for i in range(n): s = i if step != None: s = step time[i] = traj.logt[s] ## and all commands for m in range(ncom): c = observable(commands[m][1], commands[m][2], traj, s) # update correlation function correl[m][j][i] = red(c0[m], c, commands[m][3]) for m in range(ncom): ## do statistic here ## obtain mean/std/percentile/... and corresponding sigma if commands[m][5] == 'mean': func = np.nanmean elif commands[m][5] == 'std': func = np.nanstd elif "percentile" in commands[m][5]: pe = float(commands[m][5].split()[1]) func = (lambda x: np.nanpercentile(x, pe)) else: print('Value function string invalid. ', m, '-th valString = ', commands[m][5], ' not implemented.') exit(-1) mean, sigma, bins, ntraj = bootstrap(np.array(correl[m]), commands[m][4], func=func, nbootstrap=nbootstrap, nozero=nozero) # TODO also implement fourier trafo here! # Write correlation function and std error to file outputData(m, commands, nbootstrap, step, form, bins, time, mean, sigma, ntraj) return
[docs] def doParse(filename, directory, parser): """ Parse a file of commands. For documentation of commands see documentation for input flags of the whole program. The commands need to be given in the same way. Different commands can be given separated by lines starting with #. Input: filename --- Name of the file to be parsed as string directory --- Directory for input/output parser --- Set up parser to interpret commands Output: commands --- dictonary of commands """ i = 0 commands = {} cline = "" with open(filename, 'r') as inf: for line in inf: # separator for different commands; line starting with # if line[0] == '#': # already read command needs to be parsed if cline != "": opts, args = parser.parse_args(shlex.split(cline)) commands[i] = (opts.obsA, opts.obsB, opts.projection, opts.red, opts.his, opts.func, directory + opts.outfile) i += 1 cline = "" else: cline += line.rstrip() cline += ' ' # already read command needs to be parsed if cline != "": opts, args = parser.parse_args(shlex.split(cline)) commands[i] = (opts.obsA, opts.obsB, opts.projection, opts.red, opts.his, opts.func, directory + opts.outfile) return commands
[docs] def getParser(): """ Function defining the input options parser. Output: parser --- option parser object. """ usage = "\n \n This program does several different analysis schemes for a given set of trajectories and also calculated the associated fourier spectrum (TODO: fourier trafor to be implemented!). \n \n The trajectories need to be saved with the -p command from xpyder (pickle). \n \n The analysis is based on the calculation of correlation functions of the type <A B(t)>, where A and B correspond to two 'observables'. Setting 'A' to be the identity also expectation values can be calculated. One can also request the calculation of said correlation function at a given timestep (i.e. the first one, the last one, ...). More details in the help text below. \n \n The final data can be analysed in several ways. The mean value averaged over all trajectories can be calculated as well as other functions (standard deviation, percentile, ...). Alternatively, the quantity calculated can be histogrammed with different ways to define the histograms. Several ways to format the output file are also possible. More details in the help text. \n \n For all values obtained the standard error of the respective quantity is calculated as the standard deviation of that quantity obtained from bootstrapping! \n \n The type of correlation function is given as follows. These options can be given as command line options or in a file given with -f <file>. In the file more than one quantity can be defined. Each quantity is terminated by a line starting with ###. \n -A <Operations> and -B <operations> define the observable. Each observable can be comprised of different operations. Each single operation is given with a + sign and and identifier (e.g., +position, +velocity, +kinetic, ...) and apropriate options. The options are detailed below for each possible operation. \n Examples: \n -A '+charge' -B '1; +charge' would calculate a charge-charge correlation function. \n -A '+identity' -B '+posision -1 0 -2 1' would calculate the time-dependent expectation value of the distance between atom 0 and 1. \n -A '+identity' -B '+charge -t -p 0 +kinetic -k' would calculate the time-dependent expectation value of the released kinetic energy for each atom that has a partial charge between 0.5 and 1.5 \n \n -r <red> defines how to interpret A B(t), i.e., which operation to perform to combine A and B. \n \n -p <value> the value to use for unsuccesful projections (usually 0.0 or None). \n \n -v <value> the property to calculate for AB, i.e., the mean or standard deviation. \n \n -o <file> defines the output file, which always contains time as first column, then each of the values calculated (bin center values are given as comments) and the corresponding error directly after each value. \n \n -s <options> to produce a histogram of the calculated data. Details below. Other general options are: \n \n Several other options can be given only as command line options: -d <directory>, -c <format>, -n <Nbootstrap>, -t <timestep>, -z; which are detailed below." # Parse command line options parser=OptionParser(usage=usage, add_help_option=False) parser.add_option('-d','--dir', dest='dir', type='str', default='.', help='Name of directory contraining trj folders. This option cannot be given to individual commands and has to be given as a command line option. ') parser.add_option('-f', '--file', dest='File', action='store', default=None, help='Use inputfile instead of command line input. Valid commands in the input file are: -A, -B, -p, -r, -s, -v, -o') parser.add_option('-n', '--nbootstrap', dest='nbootstrap', type='int', action='store', default=200, help='Number of bootstrapping steps. This option cannot be given to individual commands and has to be given as a command line option.') parser.add_option('-t', '--timestep', dest='step', type='int', action='store', default=None, help='Only calculate results for a given timestep; -1 is last timestep. 0 is first timestep. Default is None - give results for all times. This option cannot be given to individual commands and has to be given as a command line option.') parser.add_option('-c', '--changeformat', dest='form', default='time', help='If given, prints results in an alternative formats. Possible values: "time": standard format with one line per time and all values along the columns. "values" one line per value and time along the columns. "single" with values first and empty line and then std errors. This option cannot be given to individual commands and has to be given as a command line option.') parser.add_option('-z', '--nozero', dest='nozero', action='store_true', default=False, help='Ignore zero values for bin calculation. This option cannot be given to individual commands and has to be given as a command line option.') parser.add_option('-h', '--help', dest='help', action='store_true', default=False, help='Prints this help page.') parser.add_option('-A','--A', dest='obsA', type='str', default=None, help='First observable. TODO help on how to set up.') parser.add_option('-B','--B', dest='obsB', type='str', default=None, help='Second observable. TODO help on how to set up.') parser.add_option('-p', '--projection', dest='projection', type='str', action='store', default='0.0', help='Value for unsuccessful projection.') parser.add_option('-r','--reduce', dest='red', type='str', default='mult', help='String describing the reduction process, i.e., how to combine observable A and B. Valid: \'mult\' for multiplication and \'dot\' for dot product.') parser.add_option('-s', '--spread', dest='his', action='store', type='str', default=None, help='Do analysis for distribution using a histogram with given either the number of bins, 0 for integer step bins, 3 floats defining minimum, maximum and step or a string which is a valid option for numpy.histogram.') parser.add_option('-v', '--value', dest='func', type='str', action='store', default='mean', help='Function to calculate, i.e. mean, std, percentile x.') parser.add_option('-o','--output', dest='outfile', type='str', default='correl.dat', help='Output file. Written in to the given directory.') return parser
[docs] def start(): """ Wrapper for starting the program. Needed for good unit test coding. """ parser = getParser() opts, args = parser.parse_args(sys.argv[1:]) directory = opts.dir + '/' commands = {} if opts.File != None: commands = doParse(opts.File, directory, parser) else: commands[0] = (opts.obsA, opts.obsB, opts.projection, opts.red, opts.his, opts.func, directory + opts.outfile) # called without any option or -h set -> print help for everything if not sys.argv[1:] or opts.help == True: parser.print_help() print("") ostring = "-h" position(ostring, None, None, None, None) print("") ostring = "-h" angle(ostring, None, None, None, None) print("") ostring = "-h" charge(ostring, None, None, None, None) print("") ostring = "-h" velocity(ostring, None, None, None, None) print("") ostring = "-h" kinetic(ostring, None, None, None, None) print("") ostring = "-h" state(ostring, None, None, None, None) print("") ostring = "-h" spec(ostring, None, None, None, None) print("") sys.exit(0) # Obtain list of trj directories dirs = uti.getDirectoryList(directory, "pickle.dat") # calulate correlation function correlation_function(commands, dirs, opts.nbootstrap, opts.step, opts.form, opts.nozero)
if __name__ == "__main__": start()