DESY Hbb Analysis Framework
Functions | Variables
naf_submit Namespace Reference

Functions

def basenameConfigParameter (config, name)
 
def confirm ()
 
def createCondorDirsSubmit (maindir)
 
def createCondorSubmit (jobdir)
 
def createConfigParameter (config, parameter)
 
def createJobScript (exedir, jobid, exe, cfg)
 
def get_job_log (jobdir)
 
def get_job_status (jobdir)
 
def getConfigParameter (config, parameter)
 
def job_err (jobdir)
 
def removeConfigParameter (config, parameter)
 
def replaceConfigParameter (config, parameter, newpar)
 
def resubmit (submission_dir)
 
def resubmit_expert (submission_dir)
 
def status (submission_dir, failed_only=False)
 
def submission ()
 

Variables

 action
 
 args = parser.parse_args()
 
string dash = ' ----------------------------------------------------------------------------------------------------------'
 
 default
 
 dest
 
 help
 
 int
 
 parser = ArgumentParser(prog='naf_submit.py', formatter_class=lambda prog: HelpFormatter(prog,indent_increment=6,max_help_position=80,width=280), description='Prepare, submit and check jobs to NAF HTCondor batch system',add_help=True)
 
 parser_status = parser.add_argument_group('status','show and modify status')
 
 parser_submission = parser.add_argument_group('submission','prepare and submit jobs')
 
 type
 

Function Documentation

def naf_submit.basenameConfigParameter (   config,
  name 
)

Definition at line 87 of file naf_submit.py.

87 def basenameConfigParameter( config, name ):
88  with open(config, "r") as f:
89  lines = f.readlines()
90  with open(config, "w") as f:
91  for line in lines:
92  f.write(re.sub(name, os.path.basename(name), line))
93 
94 
def basenameConfigParameter(config, name)
Definition: naf_submit.py:87
def naf_submit.confirm ( )

Definition at line 483 of file naf_submit.py.

References ConfFile_cfg.input.

Referenced by resubmit(), and resubmit_expert().

483 def confirm():
484  answer = " "
485  while answer not in ["y", "n", ""]:
486  answer = input(" Please confirm this action (Y/[N])? ").lower()
487  return (answer == "y" or answer != "")
488 
489 
490 # --- main code ---
491 
492 # parsing arguments
def confirm()
Definition: naf_submit.py:483
def naf_submit.createCondorDirsSubmit (   maindir)

Definition at line 95 of file naf_submit.py.

Referenced by submission().

96  libpath = os.environ['LD_LIBRARY_PATH']
97  condor_submit = """
98 Requirements = ( OpSysAndVer == "CentOS7" )
99 getenv = True
100 executable = $(jobdir)/job.sh
101 log = $(jobdir)/xjob_$(Cluster)_$(Process).log
102 output = $(jobdir)/xjob_$(Cluster)_$(Process).out
103 error = $(jobdir)/xjob_$(Cluster)_$(Process).err
104 environment = "LD_LIBRARY_PATH_STORED=$ENV(LD_LIBRARY_PATH)"
105 
106 queue jobdir matching dirs job_*
107  """
108  with open(f'{maindir}/jobs.submit', 'w') as condor_file:
109  print(condor_submit,file=condor_file)
110 
def createCondorDirsSubmit(maindir)
Definition: naf_submit.py:95
def naf_submit.createCondorSubmit (   jobdir)

Definition at line 111 of file naf_submit.py.

Referenced by submission().

111 def createCondorSubmit(jobdir):
112  libpath = os.environ['LD_LIBRARY_PATH']
113  condor_submit = """
114 Requirements = ( OpSysAndVer == "CentOS7" )
115 getenv = True
116 executable = job.sh
117 log = xjob_$(Cluster)_$(Process).log
118 output = xjob_$(Cluster)_$(Process).out
119 error = xjob_$(Cluster)_$(Process).err
120 environment = "LD_LIBRARY_PATH_STORED=$ENV(LD_LIBRARY_PATH)"
121 
122 queue
123  """
124  with open(f'{jobdir}/job.submit', 'w') as condor_file:
125  print(condor_submit,file=condor_file)
126 
def createCondorSubmit(jobdir)
Definition: naf_submit.py:111
def naf_submit.createConfigParameter (   config,
  parameter 
)

Definition at line 33 of file naf_submit.py.

Referenced by submission().

33 def createConfigParameter( config, parameter ):
34  exist = False
35  with open(config,"r") as f:
36  lines = f.readlines()
37 
38  for line in lines:
39  line = line.replace(" ","").strip()
40  if len(line) == 0:
41  continue
42  if line[0] != '#' and line.split("=")[0] == parameter:
43  exist = True
44  break
45 
46  if not exist:
47  with open(config, "w") as f:
48  f.write(parameter+" = \n")
49  for line in lines:
50  f.write(line)
51 
52 
def createConfigParameter(config, parameter)
Definition: naf_submit.py:33
def naf_submit.createJobScript (   exedir,
  jobid,
  exe,
  cfg 
)

Definition at line 127 of file naf_submit.py.

Referenced by submission().

127 def createJobScript(exedir,jobid, exe, cfg):
128  js = """
129 if [[ ! -e "{}" ]]; then
130  cd {}
131 fi
132 export LD_LIBRARY_PATH=$LD_LIBRARY_PATH_STORED
133 {} -c {}
134  """.format(cfg,jobid,exe,cfg)
135  js_name = f'{exedir}/job.sh'
136  with open(js_name, 'w') as job_script:
137  print(js,file=job_script)
138  os.chmod(js_name, 0o744)
139 
140 
def createJobScript(exedir, jobid, exe, cfg)
Definition: naf_submit.py:127
def naf_submit.get_job_log (   jobdir)

Definition at line 304 of file naf_submit.py.

Referenced by get_job_status().

304 def get_job_log(jobdir):
305  files = os.listdir(jobdir)
306  log = [ jobdir+'/'+x for x in files if x.endswith('.log') ]
307  if len(log) > 1:
308  return max(log, key=os.path.getctime)
309  elif len(log) == 1:
310  return log[0]
311  else:
312  return None
313 
314 # given the directory of a job_xxx check if there is anything in the most recent err file
def get_job_log(jobdir)
Definition: naf_submit.py:304
def naf_submit.get_job_status (   jobdir)

Definition at line 331 of file naf_submit.py.

References get_job_log().

Referenced by resubmit(), and status().

331 def get_job_status(jobdir):
332  job_status = {}
333  job_status['submission'] = False
334  job_status['termination'] = False
335  job_status['execution'] = False
336  job_status['abortion'] = False
337  job_status['error'] = True
338  job_status['jobid'] = ' '
339  log = get_job_log(jobdir)
340  if not log:
341  return job_status
342  logname = os.path.basename(log)
343  job_status['jobid'] = logname.split('.')[0].split('_')[1] + '.' + logname.split('.')[0].split('_')[2]
344  f = open(log,'r')
345  for l in f:
346  if re.search('Job submitted from host',l):
347  job_status['submission'] = True
348  if re.search('Job executing on host',l):
349  job_status['execution'] = True
350  if re.search('Job terminated.',l):
351  job_status['termination'] = True
352  if re.search('Job was evicted',l) or re.search('Job was aborted',l):
353  job_status['abortion'] = True
354  if re.search('return value 0',l):
355  job_status['error'] = False
356 
357  return job_status
358 
359 
360 
361 
def get_job_log(jobdir)
Definition: naf_submit.py:304
def get_job_status(jobdir)
Definition: naf_submit.py:331
def naf_submit.getConfigParameter (   config,
  parameter 
)

Definition at line 13 of file naf_submit.py.

Referenced by submission().

13 def getConfigParameter( config, parameter ):
14  p = None
15  exist = False
16  with open(config) as f:
17  for line in f:
18  line = line.replace(" ","").strip()
19  if len(line) == 0:
20  continue
21  if line[0] != '#' and line.split("=")[0] == parameter:
22  par = line.split("=")[1]
23  exist = True
24  p = [parameter,par]
25  if parameter=="ntuplesList" and "tools:" in p[1]:
26  ntp_path = os.getenv('CMSSW_BASE')
27  ntp_path += "/src/Analysis/Tools/data/ntuples/"
28  p[1] = p[1].replace("tools:",ntp_path)
29  break
30 
31  return p
32 
def getConfigParameter(config, parameter)
Definition: naf_submit.py:13
def naf_submit.job_err (   jobdir)

Definition at line 315 of file naf_submit.py.

Referenced by status().

315 def job_err(jobdir):
316  files = os.listdir(jobdir)
317  errs = [ jobdir+'/'+x for x in files if x.endswith('.err') ]
318  if len(errs) > 1:
319  err = max(errs, key=os.path.getctime)
320  elif len(errs) == 1:
321  err = errs[0]
322  else:
323  # if err file does not exist, there is no error
324  return False
325 
326  return (os.path.getsize(err) > 0)
327 
328 
329 
330 # given the directory of a job_xxxx check the status in the most recent log file
def job_err(jobdir)
Definition: naf_submit.py:315
def naf_submit.removeConfigParameter (   config,
  parameter 
)

Definition at line 73 of file naf_submit.py.

Referenced by submission().

73 def removeConfigParameter( config, parameter ):
74  with open(config, "r") as f:
75  lines = f.readlines()
76  with open(config, "w") as f:
77  for line in lines:
78  l = line.replace(" ","").strip()
79  if len(l) < 1:
80  f.write(line)
81  continue
82  if l[0] != '#' and l.split('=')[0] == parameter:
83  continue;
84  f.write(line)
85 
86 
def removeConfigParameter(config, parameter)
Definition: naf_submit.py:73
def naf_submit.replaceConfigParameter (   config,
  parameter,
  newpar 
)

Definition at line 53 of file naf_submit.py.

Referenced by submission().

53 def replaceConfigParameter( config, parameter, newpar ):
54  par = None
55  with open(config, "r") as f:
56  lines = f.readlines()
57  with open(config, "w") as f:
58  for line in lines:
59  l = line.replace(" ","").strip()
60  if len(l) < 1:
61  f.write(line)
62  continue
63  if l[0] != '#' and l.split('=')[0] == parameter:
64  if l.split('=')[1] == "" :
65  par = "="
66  newpar = " = " + newpar
67  else:
68  par = l.split('=')[1]
69  f.write(re.sub(par, newpar, line))
70  else:
71  f.write(line)
72 
def replaceConfigParameter(config, parameter, newpar)
Definition: naf_submit.py:53
def naf_submit.resubmit (   submission_dir)

Definition at line 428 of file naf_submit.py.

References confirm(), get_job_status(), and status().

428 def resubmit(submission_dir):
429  failed_jobs = status(submission_dir,True)
430  if failed_jobs == 0:
431  return
432  jobs_dir = os.listdir(submission_dir)
433  jobs_dir = [ x for x in jobs_dir if 'job_' in x ]
434  jobs_dir.sort()
435  cwd = os.getcwd()
436  print(' ')
437  print(' *** Resubmit failed jobs ***')
438  print(dash)
439  confirmed = confirm()
440  if not confirmed:
441  print(dash)
442  print(' ')
443  return
444  for jj in jobs_dir:
445  j = submission_dir+'/'+jj
446  js = get_job_status(j)
447  if js['abortion'] or ( js['termination'] and js['error'] ):
448  os.chdir(j)
449  os.system('condor_submit job.submit')
450  os.chdir(cwd)
451  print(dash)
452  print(' ')
453  os.chdir(cwd)
454  sleep(2)
455  status(submission_dir)
456 
457 
458 
459 # resubmit jobs expert mode
def confirm()
Definition: naf_submit.py:483
def status(submission_dir, failed_only=False)
Definition: naf_submit.py:362
def resubmit(submission_dir)
Definition: naf_submit.py:428
def get_job_status(jobdir)
Definition: naf_submit.py:331
def naf_submit.resubmit_expert (   submission_dir)

Definition at line 460 of file naf_submit.py.

References confirm(), and status().

460 def resubmit_expert(submission_dir):
461  failed_jobs = status(submission_dir,True)
462  if failed_jobs == 0:
463  return
464  cwd = os.getcwd()
465 # submission_dir = args.resubmit_expert
466  os.chdir(submission_dir)
467  print(' ')
468  print(' *** Resubmit jobs (EXPERT) ***')
469  print(dash)
470  confirmed = confirm()
471  if not confirmed:
472  print(dash)
473  print(' ')
474  return
475  os.system('condor_submit jobs.submit')
476  print(dash)
477  print(' ')
478  os.chdir(cwd)
479  sleep(2)
480  status(submission_dir)
481 
482 
def confirm()
Definition: naf_submit.py:483
def status(submission_dir, failed_only=False)
Definition: naf_submit.py:362
def resubmit_expert(submission_dir)
Definition: naf_submit.py:460
def naf_submit.status (   submission_dir,
  failed_only = False 
)

Definition at line 362 of file naf_submit.py.

References get_job_status(), and job_err().

Referenced by analysis::tools::Jet.associatePartons(), analysis::ntuple::Candidates< T >.Kinematics(), analysis::tools::Candidate.matchTo(), analysis::tools::RecoTrack.quality(), resubmit(), resubmit_expert(), and submission().

362 def status(submission_dir, failed_only=False):
363  failed_jobs = 0
364 # submission_dir = args.status
365  finished_dir = submission_dir+'/finished_jobs'
366  jobs_dir = os.listdir(submission_dir)
367  jobs_dir = [ x for x in jobs_dir if 'job_' in x ]
368  jobs_dir.sort()
369  print(' ')
370  header = ' *** STATUS OF JOBS ***'
371  if failed_only:
372  header = ' *** FAILED JOBS ***'
373  print(header)
374  print('\n '+submission_dir)
375  print(' ')
376  dash= ' ----------------------------------------------------------------------------------------------------------'
377  print(dash)
378  print(' job finished running submitted aborted error condor_id (latest)')
379  print(dash)
380  for jj in jobs_dir:
381  j = submission_dir+'/'+jj
382  js = get_job_status(j)
383  je = job_err(j)
384  finished = ' '
385  running = ' '
386  submitted = ' '
387  aborted = ' '
388  error = ' ' # check
389  jobid = js['jobid']
390  if js['termination']:
391  finished = '\u2705' # check
392  error = '\u2705 ' # check
393 # if js['error']:
394 # finished = '\u2705'
395  if js['error'] and not js['abortion']:
396  error = '\u274C' # red x
397  if not js['abortion'] and not js['error']:
398  # search for finished.txt before moving job dir, guarantee that all everything finished
399  if os.path.isfile(j+'/finished.txt'):
400  sleep(1)
401  move(j,finished_dir)
402  else:
403  finished = '\u26A0\uFE0F ' # warning
404 # elif js['execution'] and not js['abortion'] and not js['error']:
405  elif js['execution'] and not js['abortion']:
406  running = '\u2705'
407  elif js['submission'] and not js['abortion']:
408  submitted = '\u2705'
409  if js['abortion']:
410  aborted = '\u274C ' # red cross
411 
412  if not ( js['submission'] or js['termination'] or js['execution'] or js['abortion'] ):
413  submitted = '\u26A0\uFE0F ' # warning
414  if js['abortion'] or ( js['termination'] and js['error']):
415  failed_jobs +=1
416  if failed_only and (not (js['abortion'] or ( js['termination'] and js['error']))):
417  continue
418  print(' '+jj+' '+finished+' '+running+' '+submitted+' '+aborted+' '+error+' '+jobid)
419  if len(jobs_dir) == 0:
420  print(' No jobs to be checked!')
421  print(dash)
422  if not failed_only:
423  print('\n N.B.: Good *finished* jobs will no longer appear in future "--status" calls')
424 
425  return failed_jobs
426 
427 # resubmit jobs according to their status
def status(submission_dir, failed_only=False)
Definition: naf_submit.py:362
def job_err(jobdir)
Definition: naf_submit.py:315
def get_job_status(jobdir)
Definition: naf_submit.py:331
def naf_submit.submission ( )

Definition at line 141 of file naf_submit.py.

References createCondorDirsSubmit(), createCondorSubmit(), createConfigParameter(), createJobScript(), getConfigParameter(), int, removeConfigParameter(), replaceConfigParameter(), and status().

142 
143  ntuples = args.ntuples
144  json = args.json
145  config = args.config
146  events_max = -1
147  if args.events_max:
148  events_max = args.events_max
149  test = args.njobs
150 
151  configMC = getConfigParameter( config, "isMC" )
152  isMC = configMC[1] == 'true'
153 
154  if test:
155  print('TEST MODE:', test, 'jobs')
156 
157  configNtuples = None
158  # get parameter from configuration
159  if config:
160  if not os.path.isfile(config):
161  print ("Configuration file does not exist")
162  quit()
163  configNtuples = getConfigParameter( config, "ntuplesList" )
164  if not ntuples:
165  if configNtuples:
166  ntuples = configNtuples[1]
167  if not ntuples:
168  print ("*error* You must define the parameter ntuplesList in your configuration.")
169  quit()
170  if not isMC:
171  configJson = getConfigParameter( config, "json" )
172  if not json:
173  if configJson:
174  json = configJson[1]
175  else:
176  json = None
177 
178  # checking if require files exist
179  if ntuples:
180  if not os.path.isfile(ntuples):
181  print ("Ntuples list file does not exist")
182  quit()
183  if json:
184  if (not 'tools:' in json) and (not os.path.isfile(json)):
185  print ("Json file does not exist")
186  quit()
187 
188  # directory where the jobs will be stored
189  maindir = "Condor_"+os.path.basename(args.exe)
190  if config:
191  maindir = maindir+"_"+ os.path.splitext(os.path.basename(config))[0]
192  if args.label:
193  maindir += '_'+args.label
194  cwd = os.getcwd()
195  if os.path.exists(cwd+"/"+maindir):
196  print ('')
197  print (bcolors.FAIL,'>>>>>>', maindir,"already exists. Rename or remove it and then resubmit",bcolors.ENDC)
198  status(maindir)
199  quit()
200  os.mkdir(maindir)
201  os.mkdir(maindir+'/finished_jobs')
202 
203  # splitting the file list
204  if ntuples:
205  pid = os.getpid()
206  tmpdir = ".tmp_" + str(pid)
207  os.mkdir(tmpdir)
208  copyfile(ntuples, tmpdir+"/"+os.path.basename(ntuples))
209  if config:
210  copyfile(config, tmpdir+"/"+os.path.basename(config))
211  os.chdir(tmpdir)
212  if config:
213  # replace json and ntuples in the local exe config by their basenames
214  if json:
215  if (not 'tools:' in json):
216  createConfigParameter(os.path.basename(config),'json')
217  replaceConfigParameter(os.path.basename(config), 'json', os.path.basename(json))
218  else:
219  removeConfigParameter(os.path.basename(config),'json')
220  # ntuples list
221  createConfigParameter(os.path.basename(config),'ntuplesList')
222  replaceConfigParameter(os.path.basename(config), 'ntuplesList', os.path.basename(ntuples))
223  if events_max:
224  replaceConfigParameter(os.path.basename(config), 'eventsMax', events_max)
225 
226 
227  splitcmd = "split.csh" + " " + str(args.nfiles) + " " + os.path.basename(ntuples)
228  os.system(splitcmd)
229  os.remove(os.path.basename(ntuples)) # not needed anymore after the splitting
230  files = glob.glob('.*_x????.txt')
231  files.sort()
232  os.chdir(cwd)
233 
234  # loop over the splitted files, each will correspond to a job on the NAF
235 
236  for i,f in enumerate(files):
237  if test:
238  if i >= int(test) and int(test)>=0:
239  os.chdir(cwd)
240  break
241  jobnum = os.path.splitext(f)[0][-4:]
242  jobid = "job_"+jobnum
243  exedir = maindir+"/"+jobid
244  os.mkdir(exedir)
245  # moving stuff to the proper directories
246  move(tmpdir+"/"+f,exedir+"/"+os.path.basename(ntuples))
247  if json:
248  if not 'tools:' in json:
249  copyfile(json, exedir+"/"+os.path.basename(json))
250  if config:
251  copyfile(tmpdir+"/"+os.path.basename(config),exedir+"/"+os.path.basename(config))
252  condorcmd = "condor_scripts.csh" + " " + jobid + " " + args.exe + " " + os.path.basename(config)
253  createCondorSubmit(cwd+'/'+exedir)
254  createJobScript(exedir,jobid,args.exe,os.path.basename(config))
255  else:
256  condorcmd = "condor_scripts.csh" + " " + jobid + " " + args.exe
257  # make the submissions
258  # os.chdir(exedir)
259  jobf = open(f'{exedir}/seed.txt', 'w+')
260  jobf.write(str(int(jobnum)+1))
261  # print >> jobf, int(jobnum)+1
262  jobf.close()
263  print ("Creating ",jobid,"...")
264  # os.system(condorcmd)
265  # if not test:
266  # os.chdir(exedir)
267  # os.system('condor_submit job.submit')
268  # sleep(0.2)
269  # back to original directory
270  # os.chdir(cwd)
271 
272  createCondorDirsSubmit(cwd+'/'+maindir)
273  if not test:
274  os.chdir(cwd+'/'+maindir)
275  os.system('condor_submit jobs.submit')
276 
277  sleep(2)
278  os.chdir(cwd)
279  status(maindir)
280 
281  else:
282  exedir = maindir+"/job_0000"
283  os.mkdir(exedir)
284  if os.path.isfile(args.exe):
285  copyfile(args.exe, exedir+"/"+os.path.basename(args.exe))
286  os.chdir(exedir)
287  jobf = open('./seed.txt', 'w+')
288  print >> jobf, 1
289  jobf.close()
290  condorcmd = "condor_scripts.csh job_0000" + " " + os.path.basename(args.exe)
291  os.system(condorcmd)
292  if not test:
293  os.system('condor_submit job.submit')
294  os.chdir(cwd)
295 
296  # remove the temporary directory
297  os.chdir(cwd)
298  if ntuples:
299  os.remove(tmpdir+"/"+os.path.basename(config))
300  rmtree(tmpdir)
301 
302 
303 # given the directory of a job_xxx return the most recent log file
def replaceConfigParameter(config, parameter, newpar)
Definition: naf_submit.py:53
def status(submission_dir, failed_only=False)
Definition: naf_submit.py:362
def createCondorDirsSubmit(maindir)
Definition: naf_submit.py:95
def submission()
Definition: naf_submit.py:141
def getConfigParameter(config, parameter)
Definition: naf_submit.py:13
def createJobScript(exedir, jobid, exe, cfg)
Definition: naf_submit.py:127
def createCondorSubmit(jobdir)
Definition: naf_submit.py:111
def removeConfigParameter(config, parameter)
Definition: naf_submit.py:73
def createConfigParameter(config, parameter)
Definition: naf_submit.py:33

Variable Documentation

naf_submit.action

Definition at line 508 of file naf_submit.py.

naf_submit.args = parser.parse_args()

Definition at line 511 of file naf_submit.py.

string naf_submit.dash = ' ----------------------------------------------------------------------------------------------------------'

Definition at line 513 of file naf_submit.py.

naf_submit.default

Definition at line 499 of file naf_submit.py.

naf_submit.dest

Definition at line 496 of file naf_submit.py.

naf_submit.help

Definition at line 496 of file naf_submit.py.

naf_submit.int

Definition at line 499 of file naf_submit.py.

Referenced by submission().

naf_submit.parser = ArgumentParser(prog='naf_submit.py', formatter_class=lambda prog: HelpFormatter(prog,indent_increment=6,max_help_position=80,width=280), description='Prepare, submit and check jobs to NAF HTCondor batch system',add_help=True)

Definition at line 493 of file naf_submit.py.

naf_submit.parser_status = parser.add_argument_group('status','show and modify status')

Definition at line 506 of file naf_submit.py.

naf_submit.parser_submission = parser.add_argument_group('submission','prepare and submit jobs')

Definition at line 494 of file naf_submit.py.

naf_submit.type

Definition at line 499 of file naf_submit.py.