#!/usr/bin/env python
#* **************************************************************************
#*
#* CDTK, Chemical Dynamics Toolkit
#* A modular system for chemical dynamics applications and more
#*
#* Copyright (C) 2011, 2012, 2013, 2014, 2015, 2016
#* Oriol Vendrell, DESY, <oriol.vendrell@desy.de>
#*
#* Copyright (C) 2017, 2018, 2019
#* Ralph Welsch, DESY, <ralph.welsch@desy.de>
#*
#* Copyright (C) 2020, 2021, 2022, 2023
#* Ludger Inhester, DESY, ludger.inhester@cfel.de
#*
#* This file is part of CDTK.
#*
#* CDTK is free software: you can redistribute it and/or modify
#* it under the terms of the GNU General Public License as published by
#* the Free Software Foundation, either version 3 of the License, or
#* (at your option) any later version.
#*
#* This program is distributed in the hope that it will be useful,
#* but WITHOUT ANY WARRANTY; without even the implied warranty of
#* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#* GNU General Public License for more details.
#*
#* You should have received a copy of the GNU General Public License
#* along with this program. If not, see <http://www.gnu.org/licenses/>.
#*
#* **************************************************************************
"""
converter for cdtj to mdtraj format
"""
import argparse
import h5py
import numpy as np
import json
import CDTK.Tools.Conversion as cv
import os
[docs]
def parseCommandLine():
parser = argparse.ArgumentParser(
description='read cdtk trajectory dir and convert it into hdf5-mdtraj format.')
parser.add_argument('dir', type=str,
help='cdtk dir')
parser.add_argument('-o',
dest='outputfile',
default='out.h5',
type=str,
help='hdf5 outputfilename')
args = parser.parse_args()
return args
[docs]
def readLog(filename):
"""reads log files
Args:
filename (str): _description_
Returns:
list of dictionaries: each element of the list as key time and value
"""
data = []
with open(filename, 'r') as f:
for line in f:
ls = line.split()
time = float(ls[0])
ldata = np.array([float(a) for a in ls[1:]])
data.append({'time': time, 'value': ldata})
return data
[docs]
def readCoordLog(filename):
data = []
with open(filename, 'r') as f:
for line in f:
ls = line.split()
time = float(ls[0])
geom = np.array([float(a) for a in ls[1:]])
geom = geom.reshape((geom.shape[0]//3, 3))
data.append({'time': time, 'geom': geom})
return data
[docs]
def readOccLog(filename):
data = []
with open(filename, 'r') as f:
for line in f:
ls = line.split()
time = float(ls[0])
occ = [ int(o) for o in ''.join(ls[1:]).strip('[]').split(',')]
data.append({
'time': time,
'occ': occ
})
return data
[docs]
def readPulseLog(filename):
data = []
with open(filename, 'r') as f:
for line in f:
ls = line.split()
time = float(ls[0])
pulse = float(ls[1])
data.append({
'time': time,
'pulse': pulse
})
return data
[docs]
def readEventLog(filename):
data = []
with open(filename, 'r') as f:
for line in f:
ls = line.split()
time = float(ls[0])
eType = ls[1]
energy = ls[-1]
rate = ls[-2]
occ = [ int(o) for o in ''.join(ls[2:-2]).strip('[]').split(',')]
data.append({
'time': time,
'type': eType,
'energy': energy,
'rate': rate,
'occ': occ
})
return data
[docs]
def readAtomData(filename, convertType=float):
data = []
with open(filename, 'r') as f:
for line in f:
data.append(convertType(line.rstrip()))
return data
[docs]
def readEnergyLog(filename):
data = []
with open(filename, 'r') as f:
for line in f:
ls = line.split()
time = float(ls[0])
ekin = float(ls[1])
epot = float(ls[2])
etot = float(ls[3])
data.append({
'time': time,
'ekin': ekin,
'epot': epot,
'etot': etot,
})
return data
[docs]
def readVadLog(filename):
data = []
with open(filename, 'r') as f:
for line in f:
ls = line.split()
time = float(ls[0])
vAd = np.array(list(map(float, ls[1:])))
data.append({
'time': time,
'vAd': vAd,
})
return data
[docs]
def readCurrentChargeLog(filename):
data = []
with open(filename, 'r') as f:
for line in f:
ls = line.split()
time = float(ls[0])
charge = np.array([float(l) for l in ls[1:]])
data.append({
'time': time,
'charge': charge
})
return data
[docs]
def readCoefLog(filename):
data = []
with open(filename, 'r') as f:
for line in f:
ls = line.split()
time = float(ls[0])
coef = np.array([float(l) for l in ls[1:]])
data.append({
'time': time,
'coef': coef
})
return data
[docs]
def readStateLog(filename):
data = []
with open(filename, 'r') as f:
for line in f:
ls = line.split()
time = float(ls[0])
state = int(ls[1])
data.append({'time': time, 'state': state})
return data
[docs]
def readTrivCrossLog(filename):
data = []
if not os.path.exists(filename):
return None
with open(filename, 'r') as f:
for line in f:
if line.startswith('#'):
continue
ls = line.split()
time = float(ls[0])
sFrom = int(ls[1])
sTo = int(ls[2])
data.append({'time': time, 'from': sFrom, 'to': sTo})
return data
[docs]
def readChargeLog(filename):
data = []
with open(filename, 'r') as f:
for line in f:
ls = line.split()
if len(ls) < 2:
continue
time = float(ls[0])
state = int(ls[1])
charge = np.array([float(l) for l in ls[2:]])
data.append({
'time': time,
'state': state,
'charge': charge
})
return data
[docs]
def readDir(dirname):
atommass = readAtomData(dirname + '/atommass', float)
atomtype = readAtomData(dirname + '/atomlist', str)
posData = readCoordLog(dirname + '/R.log')
velData = readCoordLog(dirname + '/V.log')
energyData = readEnergyLog(dirname + '/E.log')
if os.path.exists(dirname + '/V_ad.log'):
vAdData = readVadLog(dirname + '/V_ad.log')
else:
vAdData = None
if os.path.exists(dirname + '/currentMulliken.log'):
currentChargeData = readCurrentChargeLog(dirname + '/currentMulliken.log')
else:
currentChargeData = None
if os.path.exists(dirname + '/trivialCrossings.log'):
trivCrossData = readTrivCrossLog(dirname + '/trivialCrossings.log')
else:
trivCrossData = None
if os.path.exists(dirname + '/C.log'):
coefData = readCoefLog(dirname + '/C.log')
else:
coefData = None
if os.path.exists(dirname + '/S.log'):
stateData = readStateLog(dirname + '/S.log')
else:
stateData = None
if os.path.exists(dirname + '/mulliken.log'):
chargeData = readChargeLog(dirname + '/mulliken.log')
else:
chargeData = None
if os.path.exists(dirname + '/charge.log'):
totalChargeData = readLog(dirname + '/charge.log')
else:
totalChargeData = None
if os.path.exists(dirname + '/occupation.log'):
occData = readOccLog(dirname + '/occupation.log')
else:
occData = None
if os.path.exists(dirname + '/event.log'):
eventData = readEventLog(dirname + '/event.log')
else:
eventData = None
if os.path.exists(dirname + '/partial.log'):
partialChargeData = readLog(dirname + '/partial.log')
else:
partialChargeData = None
if os.path.exists(dirname + '/pulse.log'):
pulseData = readPulseLog(dirname + '/pulse.log')
else:
pulseData = None
return {
'atomtype': atomtype,
'atommass': atommass,
'posData': posData,
'velData': velData,
'energyData': energyData,
'partialChargeData': partialChargeData,
'totalChargeData': totalChargeData,
'eventData': eventData,
'occData': occData,
'pulseData': pulseData,
'stateData': stateData,
'chargeData': chargeData,
'currentChargeData': currentChargeData,
'trivCrossData': trivCrossData,
'energyData': energyData,
'vAdData': vAdData,
'coefData': coefData,
}
[docs]
def start():
args = parseCommandLine()
data = readDir(args.dir)
with h5py.File(args.outputfile, 'w') as f:
f.attrs['Convention'] = 'Pande'
f.attrs["ConventionVersion"] = "1.1"
f.attrs["program"] = "CDTK"
f.attrs["programVersion"] = "unkown" # TODO find that
f.attrs["title"] = f"{args.dir}"
f['coordinates'] = np.array(
[d['geom'] for d in data['posData']], dtype='float32') * cv.au2an * 0.1
f['coordinates'].attrs["units"] = 'nanometers'
f['velocities'] = np.array([d['geom'] for d in data['velData']],
dtype='float32') * cv.au2an * 0.1 / (cv.au2fs * 1.e-3)
f['velocities'].attrs["units"] = 'nanometers/picosecond'
time = np.array([d['time'] for d in data['posData']], dtype='float32')
f['time'] = np.array([d['time'] for d in data['posData']],
dtype='float32') * (cv.au2fs * 1.e-3)
f['time'].attrs["units"] = 'picosecond'
atoms = [{'element': el, "name": el + str(i+1), "mass": m, "index": i}
for i, (el, m) in enumerate(zip(data["atomtype"], data["atommass"]))]
residues = [{
"index": 0,
"name": "HET",
"resSeq": 1,
'atoms': atoms
}]
chains = [
{'index': 0,
'residues': residues}
]
topology = {
'bonds': [],
'chains': chains,
}
f.create_dataset("topology", data=[
json.dumps(topology).encode('ascii')])
# check that time is consitent between energy and pos Data: difference smaller than 1% of time step
assert( abs((np.array([d["time"] for d in data["energyData"]]) - np.array([d["time"] for d in data["posData"]]) < \
0.01 * (data["posData"][1]["time"]-data["posData"][0]["time"])).all() ))
f['eKin'] = np.array([d['ekin'] for d in data['energyData']],
dtype='float32')
f['eKin'].attrs['units'] = 'Hartree'
f['ePot'] = np.array([d['epot'] for d in data['energyData']],
dtype='float32')
f['ePot'].attrs['units'] = 'Hartree'
f['eTot'] = np.array([d['etot'] for d in data['energyData']],
dtype='float32')
f['eTot'].attrs['units'] = 'Hartree'
if data['pulseData'] is not None:
f['pulse'] = np.array([d['pulse'] for d in data['pulseData']],
dtype='float32')
f['pulse'].attrs['units'] = 'unclear'
if data['stateData'] is not None:
f['state'] = np.array([d['state'] for d in data['stateData']])
if data['currentChargeData'] is not None:
f['currentCharge'] = np.array([d['charge'] for d in data['currentChargeData']], dtype='float32')
if data['coefData'] is not None:
coefs = np.array([d['coef'] for d in data['coefData']], dtype='float32')
f['coefs'] = coefs.copy()
# in the simulation data the swap in the coefficients when a trivial crossing occurs
# appears one step too early (for technical reasons ... )
# here we correct for this
if data['trivCrossData'] is not None:
for d in data['trivCrossData']:
s = np.argmin(abs(time - d['time']))
if s > 0:
f['coefs'][s-1, [2 * d['from'], 2 * d['from'] + 1 ]] = coefs[s-1, [2 * d['to'], 2 * d['to'] + 1 ]]
if data['totalChargeData'] is not None:
# time difference in pos vs charge data is smaller than 1% of time step
assert( abs((np.array([d["time"] for d in data["totalChargeData"]]) - np.array([d["time"] for d in data["posData"]]) < \
0.01 * (data["posData"][1]["time"]-data["posData"][0]["time"])).all() ))
f['charge'] = np.array([d['value'][0] for d in data['totalChargeData']], dtype='float32')
if data['partialChargeData'] is not None:
f['partialCharge'] = np.array([d['value'] for d in data['partialChargeData']], dtype='float32')
if data['eventData'] is not None:
f['eventTime'] = np.array([d['time'] for d in data['eventData']], dtype='float32')
f['eventType'] = np.array([d['type'] for d in data['eventData']], dtype='|S1')
f['eventEnergy'] = np.array([d['energy'] for d in data['eventData']], dtype='float32')
f['eventOcc'] = np.array([d['occ'] for d in data['eventData']], dtype='int')
if data['chargeData'] is not None:
states = np.unique([d['state'] for d in data['chargeData']])
for s in states:
f[f'chargeState{s}'] = np.array([d['charge'] for d in data['chargeData'] if d['state']==s], dtype='float32')
f.close()
if __name__ == "__main__":
start()