Source code for CDTK.cdtk2hdf5

#!/usr/bin/env python
#*  **************************************************************************
#*
#*  CDTK, Chemical Dynamics Toolkit
#*  A modular system for chemical dynamics applications and more
#*
#*  Copyright (C) 2011, 2012, 2013, 2014, 2015, 2016
#*  Oriol Vendrell, DESY, <oriol.vendrell@desy.de>
#*
#*  Copyright (C) 2017, 2018, 2019
#*  Ralph Welsch, DESY, <ralph.welsch@desy.de>
#*
#*  Copyright (C) 2020, 2021, 2022, 2023
#*  Ludger Inhester, DESY, ludger.inhester@cfel.de
#*
#*  This file is part of CDTK.
#*
#*  CDTK is free software: you can redistribute it and/or modify
#*  it under the terms of the GNU General Public License as published by
#*  the Free Software Foundation, either version 3 of the License, or
#*  (at your option) any later version.
#*
#*  This program is distributed in the hope that it will be useful,
#*  but WITHOUT ANY WARRANTY; without even the implied warranty of
#*  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#*  GNU General Public License for more details.
#*
#*  You should have received a copy of the GNU General Public License
#*  along with this program.  If not, see <http://www.gnu.org/licenses/>.
#*
#*  **************************************************************************

""" 
converter for cdtj to mdtraj format
"""
 
import argparse
import h5py
import numpy as np
import json
import CDTK.Tools.Conversion as cv
import os
 
[docs] def parseCommandLine(): parser = argparse.ArgumentParser( description='read cdtk trajectory dir and convert it into hdf5-mdtraj format.') parser.add_argument('dir', type=str, help='cdtk dir') parser.add_argument('-o', dest='outputfile', default='out.h5', type=str, help='hdf5 outputfilename') args = parser.parse_args() return args
[docs] def readLog(filename): """reads log files Args: filename (str): _description_ Returns: list of dictionaries: each element of the list as key time and value """ data = [] with open(filename, 'r') as f: for line in f: ls = line.split() time = float(ls[0]) ldata = np.array([float(a) for a in ls[1:]]) data.append({'time': time, 'value': ldata}) return data
[docs] def readCoordLog(filename): data = [] with open(filename, 'r') as f: for line in f: ls = line.split() time = float(ls[0]) geom = np.array([float(a) for a in ls[1:]]) geom = geom.reshape((geom.shape[0]//3, 3)) data.append({'time': time, 'geom': geom}) return data
[docs] def readOccLog(filename): data = [] with open(filename, 'r') as f: for line in f: ls = line.split() time = float(ls[0]) occ = [ int(o) for o in ''.join(ls[1:]).strip('[]').split(',')] data.append({ 'time': time, 'occ': occ }) return data
[docs] def readPulseLog(filename): data = [] with open(filename, 'r') as f: for line in f: ls = line.split() time = float(ls[0]) pulse = float(ls[1]) data.append({ 'time': time, 'pulse': pulse }) return data
[docs] def readEventLog(filename): data = [] with open(filename, 'r') as f: for line in f: ls = line.split() time = float(ls[0]) eType = ls[1] energy = ls[-1] rate = ls[-2] occ = [ int(o) for o in ''.join(ls[2:-2]).strip('[]').split(',')] data.append({ 'time': time, 'type': eType, 'energy': energy, 'rate': rate, 'occ': occ }) return data
[docs] def readAtomData(filename, convertType=float): data = [] with open(filename, 'r') as f: for line in f: data.append(convertType(line.rstrip())) return data
[docs] def readEnergyLog(filename): data = [] with open(filename, 'r') as f: for line in f: ls = line.split() time = float(ls[0]) ekin = float(ls[1]) epot = float(ls[2]) etot = float(ls[3]) data.append({ 'time': time, 'ekin': ekin, 'epot': epot, 'etot': etot, }) return data
[docs] def readVadLog(filename): data = [] with open(filename, 'r') as f: for line in f: ls = line.split() time = float(ls[0]) vAd = np.array(list(map(float, ls[1:]))) data.append({ 'time': time, 'vAd': vAd, }) return data
[docs] def readCurrentChargeLog(filename): data = [] with open(filename, 'r') as f: for line in f: ls = line.split() time = float(ls[0]) charge = np.array([float(l) for l in ls[1:]]) data.append({ 'time': time, 'charge': charge }) return data
[docs] def readCoefLog(filename): data = [] with open(filename, 'r') as f: for line in f: ls = line.split() time = float(ls[0]) coef = np.array([float(l) for l in ls[1:]]) data.append({ 'time': time, 'coef': coef }) return data
[docs] def readStateLog(filename): data = [] with open(filename, 'r') as f: for line in f: ls = line.split() time = float(ls[0]) state = int(ls[1]) data.append({'time': time, 'state': state}) return data
[docs] def readTrivCrossLog(filename): data = [] if not os.path.exists(filename): return None with open(filename, 'r') as f: for line in f: if line.startswith('#'): continue ls = line.split() time = float(ls[0]) sFrom = int(ls[1]) sTo = int(ls[2]) data.append({'time': time, 'from': sFrom, 'to': sTo}) return data
[docs] def readChargeLog(filename): data = [] with open(filename, 'r') as f: for line in f: ls = line.split() if len(ls) < 2: continue time = float(ls[0]) state = int(ls[1]) charge = np.array([float(l) for l in ls[2:]]) data.append({ 'time': time, 'state': state, 'charge': charge }) return data
[docs] def readDir(dirname): atommass = readAtomData(dirname + '/atommass', float) atomtype = readAtomData(dirname + '/atomlist', str) posData = readCoordLog(dirname + '/R.log') velData = readCoordLog(dirname + '/V.log') energyData = readEnergyLog(dirname + '/E.log') if os.path.exists(dirname + '/V_ad.log'): vAdData = readVadLog(dirname + '/V_ad.log') else: vAdData = None if os.path.exists(dirname + '/currentMulliken.log'): currentChargeData = readCurrentChargeLog(dirname + '/currentMulliken.log') else: currentChargeData = None if os.path.exists(dirname + '/trivialCrossings.log'): trivCrossData = readTrivCrossLog(dirname + '/trivialCrossings.log') else: trivCrossData = None if os.path.exists(dirname + '/C.log'): coefData = readCoefLog(dirname + '/C.log') else: coefData = None if os.path.exists(dirname + '/S.log'): stateData = readStateLog(dirname + '/S.log') else: stateData = None if os.path.exists(dirname + '/mulliken.log'): chargeData = readChargeLog(dirname + '/mulliken.log') else: chargeData = None if os.path.exists(dirname + '/charge.log'): totalChargeData = readLog(dirname + '/charge.log') else: totalChargeData = None if os.path.exists(dirname + '/occupation.log'): occData = readOccLog(dirname + '/occupation.log') else: occData = None if os.path.exists(dirname + '/event.log'): eventData = readEventLog(dirname + '/event.log') else: eventData = None if os.path.exists(dirname + '/partial.log'): partialChargeData = readLog(dirname + '/partial.log') else: partialChargeData = None if os.path.exists(dirname + '/pulse.log'): pulseData = readPulseLog(dirname + '/pulse.log') else: pulseData = None return { 'atomtype': atomtype, 'atommass': atommass, 'posData': posData, 'velData': velData, 'energyData': energyData, 'partialChargeData': partialChargeData, 'totalChargeData': totalChargeData, 'eventData': eventData, 'occData': occData, 'pulseData': pulseData, 'stateData': stateData, 'chargeData': chargeData, 'currentChargeData': currentChargeData, 'trivCrossData': trivCrossData, 'energyData': energyData, 'vAdData': vAdData, 'coefData': coefData, }
[docs] def start(): args = parseCommandLine() data = readDir(args.dir) with h5py.File(args.outputfile, 'w') as f: f.attrs['Convention'] = 'Pande' f.attrs["ConventionVersion"] = "1.1" f.attrs["program"] = "CDTK" f.attrs["programVersion"] = "unkown" # TODO find that f.attrs["title"] = f"{args.dir}" f['coordinates'] = np.array( [d['geom'] for d in data['posData']], dtype='float32') * cv.au2an * 0.1 f['coordinates'].attrs["units"] = 'nanometers' f['velocities'] = np.array([d['geom'] for d in data['velData']], dtype='float32') * cv.au2an * 0.1 / (cv.au2fs * 1.e-3) f['velocities'].attrs["units"] = 'nanometers/picosecond' time = np.array([d['time'] for d in data['posData']], dtype='float32') f['time'] = np.array([d['time'] for d in data['posData']], dtype='float32') * (cv.au2fs * 1.e-3) f['time'].attrs["units"] = 'picosecond' atoms = [{'element': el, "name": el + str(i+1), "mass": m, "index": i} for i, (el, m) in enumerate(zip(data["atomtype"], data["atommass"]))] residues = [{ "index": 0, "name": "HET", "resSeq": 1, 'atoms': atoms }] chains = [ {'index': 0, 'residues': residues} ] topology = { 'bonds': [], 'chains': chains, } f.create_dataset("topology", data=[ json.dumps(topology).encode('ascii')]) # check that time is consitent between energy and pos Data: difference smaller than 1% of time step assert( abs((np.array([d["time"] for d in data["energyData"]]) - np.array([d["time"] for d in data["posData"]]) < \ 0.01 * (data["posData"][1]["time"]-data["posData"][0]["time"])).all() )) f['eKin'] = np.array([d['ekin'] for d in data['energyData']], dtype='float32') f['eKin'].attrs['units'] = 'Hartree' f['ePot'] = np.array([d['epot'] for d in data['energyData']], dtype='float32') f['ePot'].attrs['units'] = 'Hartree' f['eTot'] = np.array([d['etot'] for d in data['energyData']], dtype='float32') f['eTot'].attrs['units'] = 'Hartree' if data['pulseData'] is not None: f['pulse'] = np.array([d['pulse'] for d in data['pulseData']], dtype='float32') f['pulse'].attrs['units'] = 'unclear' if data['stateData'] is not None: f['state'] = np.array([d['state'] for d in data['stateData']]) if data['currentChargeData'] is not None: f['currentCharge'] = np.array([d['charge'] for d in data['currentChargeData']], dtype='float32') if data['coefData'] is not None: coefs = np.array([d['coef'] for d in data['coefData']], dtype='float32') f['coefs'] = coefs.copy() # in the simulation data the swap in the coefficients when a trivial crossing occurs # appears one step too early (for technical reasons ... ) # here we correct for this if data['trivCrossData'] is not None: for d in data['trivCrossData']: s = np.argmin(abs(time - d['time'])) if s > 0: f['coefs'][s-1, [2 * d['from'], 2 * d['from'] + 1 ]] = coefs[s-1, [2 * d['to'], 2 * d['to'] + 1 ]] if data['totalChargeData'] is not None: # time difference in pos vs charge data is smaller than 1% of time step assert( abs((np.array([d["time"] for d in data["totalChargeData"]]) - np.array([d["time"] for d in data["posData"]]) < \ 0.01 * (data["posData"][1]["time"]-data["posData"][0]["time"])).all() )) f['charge'] = np.array([d['value'][0] for d in data['totalChargeData']], dtype='float32') if data['partialChargeData'] is not None: f['partialCharge'] = np.array([d['value'] for d in data['partialChargeData']], dtype='float32') if data['eventData'] is not None: f['eventTime'] = np.array([d['time'] for d in data['eventData']], dtype='float32') f['eventType'] = np.array([d['type'] for d in data['eventData']], dtype='|S1') f['eventEnergy'] = np.array([d['energy'] for d in data['eventData']], dtype='float32') f['eventOcc'] = np.array([d['occ'] for d in data['eventData']], dtype='int') if data['chargeData'] is not None: states = np.unique([d['state'] for d in data['chargeData']]) for s in states: f[f'chargeState{s}'] = np.array([d['charge'] for d in data['chargeData'] if d['state']==s], dtype='float32') f.close()
if __name__ == "__main__": start()