diff --git a/NanoGardener/python/framework/PostProcMaker.py b/NanoGardener/python/framework/PostProcMaker.py
index 733112c8f..57ff86a6c 100644
--- a/NanoGardener/python/framework/PostProcMaker.py
+++ b/NanoGardener/python/framework/PostProcMaker.py
@@ -342,15 +342,22 @@ def submitJobs(self,iProd,iStep):
     if not self._iniStep == 'Prod' : bpostFix='____'+self._iniStep

     # Make job directories
+    ## remember: py files will be created on EOS (not on AFS, for space limitations)
     if JOB_DIR_SPLIT :
       jDir = jobDir+'/NanoGardening__'+iProd+'__'+iStep
+      jDirPy = jobDirPy+'/NanoGardening__'+iProd+'__'+iStep
       for iSample in self._targetDic :
         if not os.path.exists(jDir+'/'+iSample) : os.system('mkdir -p '+jDir+'/'+iSample)
+        if not os.path.exists(jDirPy+'/'+iSample) : os.system('mkdir -p '+jDirPy+'/'+iSample)
     else:
       jDir = jobDir+'/NanoGardening__'+iProd
+      jDirPy = jobDirPy+'/NanoGardening__'+iProd
       if not os.path.exists(jDir) : os.system('mkdir -p '+jDir)
+      if not os.path.exists(jDirPy) : os.system('mkdir -p '+jDirPy)
     wDir = workDir+'/NanoGardening__'+iProd
+    wDirPy = workDirPy+'/NanoGardening__'+iProd
     if not os.path.exists(wDir) : os.system('mkdir -p '+wDir)
+    if not os.path.exists(wDirPy) : os.system('mkdir -p '+wDirPy)

     # prepare targetList
     targetList = []
@@ -402,9 +409,9 @@ def submitJobs(self,iProd,iStep):
         if iTarget in targetList :
           # Create python
           if JOB_DIR_SPLIT :
-            pyFile=jDir+'/'+iSample+'/NanoGardening__'+iProd+'__'+iStep+'__'+iTarget+bpostFix+'.py'
+            pyFile=jDirPy+'/'+iSample+'/NanoGardening__'+iProd+'__'+iStep+'__'+iTarget+bpostFix+'.py'
           else:
-            pyFile=jDir+'/NanoGardening__'+iProd+'__'+iStep+'__'+iTarget+bpostFix+'.py'
+            pyFile=jDirPy+'/NanoGardening__'+iProd+'__'+iStep+'__'+iTarget+bpostFix+'.py'
           if os.path.isfile(pyFile) : os.system('rm '+pyFile)
           outFile=self._treeFilePrefix+iTarget+'__'+iStep+'.root'
           jsonFilter = self._Productions[iProd]['jsonFile'] if 'jsonFile' in self._Productions[iProd].keys() else None
diff --git a/ShapeAnalysis/python/ShapeFactoryMulti.py b/ShapeAnalysis/python/ShapeFactoryMulti.py
index 0cfbd0663..5ce21f53d 100644
--- a/ShapeAnalysis/python/ShapeFactoryMulti.py
+++ b/ShapeAnalysis/python/ShapeFactoryMulti.py
@@ -83,14 +83,14 @@ def makeNominals(self, inputDir, outputDir, variables, cuts, samples, nuisances,
         for line in linesToAdd:
             ROOT.gROOT.ProcessLineSync(line)

-        print " supercut = ", supercut
+        # print " supercut = ", supercut

         if number != 99999 :
             outputFileName = outputDir+'/plots_'+self._tag+"_"+str(number)+".root"
         else :
             outputFileName = outputDir+'/plots_'+self._tag+".root"

-        print " outputFileName = ", outputFileName
+        # print " outputFileName = ", outputFileName

         os.system ("mkdir -p " + outputDir + "/")
         ROOT.TH1.SetDefaultSumw2(True)
@@ -105,7 +105,7 @@ def makeNominals(self, inputDir, outputDir, variables, cuts, samples, nuisances,
         print ''
         print '   '
         for cutName, cut in self._cuts.iteritems():
-            print "cut = ", cutName, " :: ", self._cuts[cutName]
+            # print "cut = ", cutName, " :: ", self._cuts[cutName]
             if type(cut) is dict and 'categories' in cut:
                 for catname in cut['categories']:
                     outFile.mkdir(cutName + '_' + catname)
@@ -132,11 +132,11 @@ def makeNominals(self, inputDir, outputDir, variables, cuts, samples, nuisances,
                 line += variable['class']
             elif 'tree' in variable:
                 line += 'tree (%d branches)' % len(variable['tree'])
-            print line
-            if 'range' in variable:
-                print "  range:", variable['range']
-            if 'samples' in variable:
-                print "  samples:", variable['samples']
+            # print line
+            # if 'range' in variable:
+            #     print "  range:", variable['range']
+            # if 'samples' in variable:
+            #     print "  samples:", variable['samples']

         print ''
         print '   '
@@ -145,10 +145,10 @@ def makeNominals(self, inputDir, outputDir, variables, cuts, samples, nuisances,
             line = "  nuisance = " + nuisanceName
             if 'name' in nuisance:
                 line += " :: " + nuisance['name']
-            print line
-            if 'kind' in nuisance:
-                print "  kind:", nuisance['kind']
-                print "  type:", nuisance['type']
+            # print line
+            # if 'kind' in nuisance:
+            #     print "  kind:", nuisance['kind']
+            #     print "  type:", nuisance['type']

             if nuisanceName == "stat":
                 for item in nuisance["samples"].itervalues():
@@ -167,8 +167,8 @@ def makeNominals(self, inputDir, outputDir, variables, cuts, samples, nuisances,

         # One MultiDraw per sample = tree
         for sampleName, sample in self._samples.iteritems():
-            print "  sample =", sampleName
-            print "    name:", sample['name']
+            # print "  sample =", sampleName
+            # print "    name:", sample['name']
             #print "    weight:", sample['weight']

             if 'outputFormat' in sample:
@@ -1173,9 +1173,9 @@ def _connectInputs(self, process, filenames, inputDir, skipMissingFiles, friends
         if "sdfarm" in os.uname()[1]:
             inputDir = inputDir.replace("xrootd","xrd")

-        print "  connectInputs from", inputDir
+        # print "  connectInputs from", inputDir

-        print '    (%d files)' % len(filenames)
+        # print '    (%d files)' % len(filenames)

         drawer = ROOT.multidraw.MultiDraw(self._treeName)
         drawer.setWeightBranch('')
@@ -1437,6 +1437,27 @@ def _make_reweight(weight):
 def postprocess_nuisance_variations(nuisance, samples, cuts, variables, outFile):
     twosided = ('OneSided' not in nuisance or not nuisance['OneSided'])

+    total_folders = 0
+    for cutName, cut in cuts.iteritems():
+        if 'cuts' in nuisance and cutName not in nuisance['cuts']:
+            continue
+
+        if 'categories' in cut:
+            catsuffixes = ['_' + catname for catname in cut['categories']]
+        else:
+            catsuffixes = ['']
+
+        for catsuffix in catsuffixes:
+            for variableName, variable in variables.iteritems():
+                if 'tree' in variable:
+                    continue
+
+                if 'cuts' in variable and cutName not in variable['cuts']:
+                    continue
+
+                total_folders += 1
+
+    processed_folders = 0
     for cutName, cut in cuts.iteritems():
         if 'cuts' in nuisance and cutName not in nuisance['cuts']:
             continue
@@ -1457,6 +1478,8 @@ def postprocess_nuisance_variations(nuisance, samples, cuts, variables, outFile)
             dname = cutName + catsuffix + '/' + variableName
             outDir = outFile.GetDirectory(dname)
             outDir.cd()
+            processed_folders += 1
+            print('Processing folder %d/%d' % (processed_folders, total_folders))

             for sampleName, sample in samples.iteritems():
                 if sampleName not in nuisance['samples']:
diff --git a/ShapeAnalysis/scripts/mkShapesMulti.py b/ShapeAnalysis/scripts/mkShapesMulti.py
index d8ce53d23..381a681c6 100755
--- a/ShapeAnalysis/scripts/mkShapesMulti.py
+++ b/ShapeAnalysis/scripts/mkShapesMulti.py
@@ -239,7 +239,8 @@ def makeTargetList(options, samples):
     parser.add_option('--batchQueue'     , dest='batchQueue'     , help='Queue on batch'                         , default='')
     parser.add_option('--batchSplit'     , dest="batchSplit"     , help="Splitting mode for batch jobs"          , default=[], type='string' , action='callback' , callback=list_maker('batchSplit',','))
     parser.add_option('--doHadd'         , dest='doHadd'         , help='Hadd for batch mode'                    , default=False)
-    parser.add_option('--redoStat'       , dest='redoStat'       , help='redo stat uncertainty'                  , default=False)
+    parser.add_option('--redoStat'       , dest='redoStat'       , help='redo stat uncertainty'                  , default=False)
+    parser.add_option('--Silent'         , dest='silent'         , help='do not write condor .log, .err and .out files' , action='store_false', default=True)
     parser.add_option('--doThreads'      , dest='doThreads'      , help='switch to multi-threading mode'         , default=False)
     parser.add_option('--nThreads'       , dest='numThreads'     , help='number of threads for multi-threading'  , default=1, type='int')
     parser.add_option('--doNotCleanup'   , dest='doNotCleanup'   , help='do not remove additional support files' , action='store_true', default=False)
@@ -383,7 +384,7 @@ def makeTargetList(options, samples):
     else :
       use_singularity = False

     bpostFix=''
-    jobs = batchJobs('mkShapes',opt.tag,stepList,targetList,','.join(batchSplit),bpostFix,JOB_DIR_SPLIT_READY=True,USE_SINGULARITY=use_singularity)
+    jobs = batchJobs('mkShapes',opt.tag,stepList,targetList,','.join(batchSplit),bpostFix,JOB_DIR_SPLIT_READY=True,USE_SINGULARITY=use_singularity,makeout=opt.silent)
     jobs.nThreads = nThreads
     jobs.AddPy2Sh()
diff --git a/Tools/python/batchTools.py b/Tools/python/batchTools.py
index 99bc886d3..7d93414ea 100644
--- a/Tools/python/batchTools.py
+++ b/Tools/python/batchTools.py
@@ -56,10 +56,16 @@ def __init__ (self,baseName,prodName,stepList,targetList,batchSplit,postFix='',u
        StepName = ''
        for iStep in stepList : StepName+=iStep
        self.subDir = jobDir+'/'+baseName+'__'+prodName+'__'+StepName
+       self.subDirPy = jobDirPy+'/'+baseName+'__'+prodName+'__'+StepName
      else:
        self.subDir = jobDir+'/'+baseName+'__'+prodName
+       self.subDirPy = jobDirPy+'/'+baseName+'__'+prodName
+     if not os.path.exists(jobDir) : os.system('mkdir -p '+jobDir)
+     if not os.path.exists(jobDirPy) : os.system('mkdir -p '+jobDirPy)
+     self.nThreads = 1
+     self.makeout = makeout

      #print stepList
      #print batchSplit
@@ -95,16 +101,18 @@ def __init__ (self,baseName,prodName,stepList,targetList,batchSplit,postFix='',u

     # Create job and init files (loop on Steps,Targets)
     if not os.path.exists(self.subDir) : os.system('mkdir -p '+self.subDir)
+    if not os.path.exists(self.subDirPy) : os.system('mkdir -p '+self.subDirPy)
     CMSSW=os.environ["CMSSW_BASE"]
     SCRAMARCH=os.environ["SCRAM_ARCH"]
     for jName in self.jobsList:
       if JOB_DIR_SPLIT and self.JOB_DIR_SPLIT_READY :
         subDirExtra = '/' + jName.split('__')[3]
         if not os.path.exists(self.subDir+subDirExtra) : os.system('mkdir -p '+self.subDir+subDirExtra)
+        if not os.path.exists(self.subDirPy+subDirExtra) : os.system('mkdir -p '+self.subDirPy+subDirExtra)
       else:
         subDirExtra =''
       jFile = open(self.subDir+subDirExtra+'/'+jName+'.sh','w')
-      if usePython : pFile = open(self.subDir+subDirExtra+'/'+jName+'.py','w')
+      if usePython : pFile = open(self.subDirPy+subDirExtra+'/'+jName+'.py','w')
       jFile.write('#!/bin/bash\n')
       if self.USE_SINGULARITY :
         jFileSing = open(self.subDir+subDirExtra+'/'+jName+'_Sing.sh','w')
@@ -116,6 +124,7 @@ def __init__ (self,baseName,prodName,stepList,targetList,batchSplit,postFix='',u
         jFile.write('#$ -cwd\n')

       jFile.write('export X509_USER_PROXY=/afs/cern.ch/user/'+os.environ["USER"][:1]+'/'+os.environ["USER"]+'/.proxy\n')
+      jFile.write('export EOS_MGM_URL=root://eoscms.cern.ch\n')
       if 'latino' in hostName:
         jFile.write('export X509_USER_PROXY=/eos/user/'+os.environ["USER"][:1]+'/'+os.environ["USER"]+'/.proxy\n')
         jFile.write('export EOS_MGM_URL=root://eoscms.cern.ch\n')
@@ -262,7 +271,7 @@ def InitPy (self,command):
        subDirExtra = '/' + jName.split('__')[3]
      else:
        subDirExtra =''
-     pFile = open(self.subDir+subDirExtra+'/'+jName+'.py','a')
+     pFile = open(self.subDirPy+subDirExtra+'/'+jName+'.py','a')
      pFile.write(command+'\n')
      pFile.close()
@@ -273,7 +282,10 @@ def AddPy2Sh(self):
      else:
        subDirExtra =''
      jFile = open(self.subDir+subDirExtra+'/'+jName+'.sh','a')
-     command = 'python '+self.subDir+subDirExtra+'/'+jName+'.py'
+     command = 'xrdcp root://eosuser.cern.ch/'+self.subDirPy+subDirExtra+'/'+jName+'.py .'
+     jFile.write(command+'\n')
+     jFile.write('ls -l'+'\n')
+     command = 'python '+jName+'.py'
      jFile.write(command+'\n')
      jFile.close()
@@ -283,8 +295,8 @@ def AddPy (self,iStep,iTarget,command):
        subDirExtra = '/' + jName.split('__')[3]
      else:
        subDirExtra =''
-     print 'Adding to ',self.subDir+subDirExtra+'/'+jName
-     pFile = open(self.subDir+subDirExtra+'/'+jName+'.py','a')
+     print 'Adding to ',self.subDirPy+subDirExtra+'/'+jName
+     pFile = open(self.subDirPy+subDirExtra+'/'+jName+'.py','a')
      pFile.write(command+'\n')
      pFile.close()
@@ -294,7 +306,7 @@ def GetPyName (self,iStep,iTarget) :
       subDirExtra = '/' + jName.split('__')[3]
     else:
       subDirExtra =''
-    return self.subDir+subDirExtra+'/'+jName+'.py'
+    return self.subDirPy+subDirExtra+'/'+jName+'.py'

   def Sub(self,queue='longlunch',IiheWallTime='168:00:00',optTodo=False):
     # Submit host name to identify the environment
@@ -344,15 +356,21 @@ def Sub(self,queue='longlunch',IiheWallTime='168:00:00',optTodo=False):
          jdsFile.write('executable = '+self.subDir+subDirExtra+'/'+jName+'.sh\n')
          jdsFile.write('universe = vanilla\n')
          #jdsFile.write('use_x509userproxy = true\n')
-         jdsFile.write('output = '+self.subDir+subDirExtra+'/'+jName+'.out\n')
-         jdsFile.write('error = '+self.subDir+subDirExtra+'/'+jName+'.err\n')
-         jdsFile.write('log = '+self.subDir+subDirExtra+'/'+jName+'.log\n')
+         if self.makeout:
+           jdsFile.write('output = '+self.subDir+subDirExtra+'/'+jName+'.out\n')
+           jdsFile.write('error = '+self.subDir+subDirExtra+'/'+jName+'.err\n')
+           jdsFile.write('log = '+self.subDir+subDirExtra+'/'+jName+'.log\n')
+         else:
+           jdsFile.write('output = /dev/null\n')
+           jdsFile.write('error = /dev/null\n')
+           jdsFile.write('log = /dev/null\n')
          if CONDOR_ACCOUNTING_GROUP:
            jdsFile.write('+AccountingGroup = '+CONDOR_ACCOUNTING_GROUP+'\n')
            jdsFile.write('accounting_group = '+CONDOR_ACCOUNTING_GROUP+'\n')
          if AUTO_CONDOR_RETRY:
            jdsFile.write('on_exit_hold = (ExitBySignal == True) || (ExitCode != 0)\n')
            jdsFile.write('periodic_release = (NumJobStarts < 3) && ((CurrentTime - EnteredCurrentStatus) > (60*3))\n')
+         jdsFile.write('MY.WantOS = "el7"\n')
          jdsFile.write('request_cpus = '+str(self.nThreads)+'\n')
          jdsFile.write('+JobFlavour = "'+queue+'"\n')
          jdsFile.write('queue\n')
@@ -466,9 +484,15 @@ def Sub(self,queue='longlunch',IiheWallTime='168:00:00',optTodo=False):

      jds = 'executable = $(JName).sh\n'
      jds += 'universe = vanilla\n'
-     jds += 'output = $(JName).out\n'
-     jds += 'error = $(JName).err\n'
-     jds += 'log = $(JName).log\n'
+     if self.makeout:
+       jds += 'output = $(JName).out\n'
+       jds += 'error = $(JName).err\n'
+       jds += 'log = $(JName).log\n'
+     else:
+       jds += 'output = /dev/null\n'
+       jds += 'error = /dev/null\n'
+       jds += 'log = /dev/null\n'
+     jds += 'MY.WantOS = "el7"\n'
      #jds += 'use_x509userproxy = true\n'
      jds += 'request_cpus = '+str(self.nThreads)+'\n'
      if CONDOR_ACCOUNTING_GROUP:
@@ -714,9 +738,15 @@ def batchResub(Dir='ALL',queue='longlunch',requestCpus=1,IiheWallTime='168:00:00
          jdsFile = open(subDir+'/'+jName+'.jds','w')
          jdsFile.write('executable = '+subDir+'/'+jName+'.sh\n')
          jdsFile.write('universe = vanilla\n')
-         jdsFile.write('output = '+subDir+'/'+jName+'.out\n')
-         jdsFile.write('error = '+subDir+'/'+jName+'.err\n')
-         jdsFile.write('log = '+subDir+'/'+jName+'.log\n')
+         makeout = True  # assumption: batchResub is a free function, 'self' is undefined here; keep logs by default
+         if makeout:
+           jdsFile.write('output = '+subDir+'/'+jName+'.out\n')
+           jdsFile.write('error = '+subDir+'/'+jName+'.err\n')
+           jdsFile.write('log = '+subDir+'/'+jName+'.log\n')
+         else:
+           jdsFile.write('output = /dev/null\n')
+           jdsFile.write('error = /dev/null\n')
+           jdsFile.write('log = /dev/null\n')
          jdsFile.write('request_cpus = '+str(requestCpus)+'\n')
          if CONDOR_ACCOUNTING_GROUP:
            jdsFile.write('+AccountingGroup = '+CONDOR_ACCOUNTING_GROUP+'\n')
@@ -792,9 +822,15 @@ def batchResub(Dir='ALL',queue='longlunch',requestCpus=1,IiheWallTime='168:00:00
     if scheduler == 'condor':
       jds = 'executable = '+subDir+'/$(JName).sh\n'
       jds += 'universe = vanilla\n'
-      jds += 'output = '+subDir+'/$(JName).out\n'
-      jds += 'error = '+subDir+'/$(JName).err\n'
-      jds += 'log = '+subDir+'/$(JName).log\n'
+      makeout = True  # same assumption as above: no 'self' inside batchResub
+      if makeout:
+        jds += 'output = '+subDir+'/$(JName).out\n'
+        jds += 'error = '+subDir+'/$(JName).err\n'
+        jds += 'log = '+subDir+'/$(JName).log\n'
+      else:
+        jds += 'output = /dev/null\n'
+        jds += 'error = /dev/null\n'
+        jds += 'log = /dev/null\n'
       jds += 'request_cpus = '+str(requestCpus)+'\n'
       jds += '+JobFlavour = "'+queue+'"\n'
       if CONDOR_ACCOUNTING_GROUP:
diff --git a/Tools/python/userConfig_TEMPLATE.py b/Tools/python/userConfig_TEMPLATE.py
index 7018571b3..1119f671c 100644
--- a/Tools/python/userConfig_TEMPLATE.py
+++ b/Tools/python/userConfig_TEMPLATE.py
@@ -2,4 +2,9 @@
 baseDir = '/afs/cern.ch/user/x/xjanssen/cms/HWW2015/'
 jobDir  = baseDir+'jobs/'
 workDir = baseDir+'workspace/'
+
+baseDirPy = '/eos/user/a/abulla/private/'
+jobDirPy  = baseDirPy+'jobs/'
+workDirPy = baseDirPy+'workspace/'
+
 jobDirSplit = True
diff --git a/Tools/scripts/haddfast b/Tools/scripts/haddfast
index 57f4aef92..35df9f134 100755
--- a/Tools/scripts/haddfast
+++ b/Tools/scripts/haddfast
@@ -30,7 +30,7 @@ def merge(indir, outdir, pname=''):
     nnew = 0
     nadd = 0

-    LOG.debug(pname + '/' + indir.GetName())
+    LOG.debug('Merging from %s', pname + '/' + indir.GetName())

     # Collect all existing keys in outdir into a dict to avoid calling outdir.Get() (slow)
     # Pick up only the latest versions (highest key cycle number)
@@ -64,37 +64,41 @@ def merge(indir, outdir, pname=''):
     for name, key in inkeys.iteritems():
         obj = key.ReadObj()

-        if obj.IsA() is tdirectoryfile:
-            # If the input object is a directory, recurse
+        try:
+            if obj.IsA() is tdirectoryfile:
+                # If the input object is a directory, recurse

-            try:
-                outsubdir = outcont[name]
-            except KeyError:
-                outsubdir = outdir.mkdir(name)
+                try:
+                    outsubdir = outcont[name]
+                except KeyError:
+                    outsubdir = outdir.mkdir(name)

-            nsubnew, nsubadd = merge(obj, outsubdir, pname + '/' + indir.GetName())
-            nnew += nsubnew
-            nadd += nsubadd
+                nsubnew, nsubadd = merge(obj, outsubdir, pname + '/' + indir.GetName())
+                nnew += nsubnew
+                nadd += nsubadd

-        else:
-            # Write to outdir or add to an existing histogram
-
-            outdir.cd()
-            try:
-                outhist = outcont[name]
-            except KeyError:
-                obj.SetDirectory(outdir)
-                obj.Write()
-                nnew += 1
             else:
-                outhist.Add(obj)
-                outhist.Write(name)
-                outhist.Delete()
-                nadd += 1
+                # Write to outdir or add to an existing histogram
+
+                outdir.cd()
+                try:
+                    outhist = outcont[name]
+                except KeyError:
+                    obj.SetDirectory(outdir)
+                    obj.Write()
+                    nnew += 1
+                else:
+                    outhist.Add(obj)
+                    outhist.Write(name)
+                    outhist.Delete()
+                    nadd += 1

-        # Delete the object from memory immediately
-        # Using TObject::Delete because TDirectory::Delete does different things
-        ROOT.TObject.Delete(obj)
+            # Delete the object from memory immediately
+            ROOT.TObject.Delete(obj)
+
+        except Exception as e:
+            LOG.error('Error processing object %s in %s: %s', name, pname, e)
+            raise  # Re-raise the exception after logging

     return nnew, nadd
@@ -104,63 +108,73 @@ def writeto(sourcePaths, targetPath, eosDownload=False):
     Target is closed after processing each source to clean memory.
     """

-    LOG.info('merge %s -> %s', sourcePaths, targetPath)
+    LOG.info('Starting merge from %s to %s', sourcePaths, targetPath)

     target = ROOT.TFile.Open(targetPath, 'recreate')
-    # This is critical (and safe) - see https://root-forum.cern.ch/t/tfile-close-slow/24179
     ROOT.gROOT.GetListOfFiles().Remove(target)

     _nadd = 0

     for path in sourcePaths:
-        pathOrig = path
-        pathReal = os.path.realpath(pathOrig)
-        if eosDownload and pathReal.startswith('/eos'):
-            for _ in range(5):
-                with tempfile.NamedTemporaryFile(suffix='.root', delete=False) as tmp:
-                    pass
-                proc = subprocess.Popen(['xrdcp', '-f', 'root://eoscms.cern.ch/' + pathReal, tmp.name])
-                proc.communicate()
-                if proc.returncode == 0:
-                    path = tmp.name
-                    break
-                else:
-                    try:
-                        os.unlink(tmp.name)
-                    except:
-                        pass
-                    time.sleep(5)
-            else:
-                raise RuntimeError('Failed to download ' + pathOrig)
+        LOG.info('Processing file: %s', path)  # Log the current file being processed
+
+        try:
+            pathOrig = path
+            pathReal = os.path.realpath(pathOrig)
+
+            if eosDownload and pathReal.startswith('/eos'):
+                for _ in range(5):
+                    with tempfile.NamedTemporaryFile(suffix='.root', delete=False) as tmp:
+                        pass
+                    proc = subprocess.Popen(['xrdcp', '-f', 'root://eoscms.cern.ch/' + pathReal, tmp.name])
+                    proc.communicate()
+                    if proc.returncode == 0:
+                        path = tmp.name
+                        break
+                    else:
+                        try:
+                            os.unlink(tmp.name)
+                        except Exception as e:
+                            LOG.error('Error deleting temp file %s: %s', tmp.name, e)
+                        time.sleep(5)
+                else:
+                    LOG.error('Failed to download %s', pathOrig)
+                    raise RuntimeError('Failed to download ' + pathOrig)

-        start = time.time()
-        source = ROOT.TFile.Open(path)
-        ROOT.gROOT.GetListOfFiles().Remove(source)
+            start = time.time()
+            source = ROOT.TFile.Open(path)
+            ROOT.gROOT.GetListOfFiles().Remove(source)

-        nnew, nadd = merge(source, target)
+            nnew, nadd = merge(source, target)

-        source.Close()
-        target.Close() # closing target at each iteration to flush out in-memory objects
+            source.Close()
+            target.Close()  # Closing target to flush out in-memory objects

-        LOG.info('%s -> %s: %d new, %d merged (%.1f s)', pathOrig, targetPath, nnew, nadd, time.time() - start)
+            LOG.info('Finished processing %s: %d new, %d merged (%.1f s)', pathOrig, nnew, nadd, time.time() - start)
+
+            _nadd += nadd
+            if pathOrig != sourcePaths[-1]:
+                if _nadd > 1000000:
+                    # Purge duplicate keys by compressing
+                    os.rename(targetPath, targetPath + '.tmp')
+                    writeto([targetPath + '.tmp'], targetPath)
+                    os.unlink(targetPath + '.tmp')
+                    _nadd = 0
+
+                target = ROOT.TFile.Open(targetPath, 'update')
+                ROOT.gROOT.GetListOfFiles().Remove(target)
+
+            if eosDownload and pathReal.startswith('/eos'):
+                try:
+                    os.unlink(path)
+                except Exception as e:
+                    LOG.error('Error deleting file %s: %s', path, e)
+
+        except Exception as e:
+            LOG.error('Error processing file %s: %s', path, e)
+            raise  # Re-raise the exception after logging

-        _nadd += nadd
-        if pathOrig != sourcePaths[-1]:
-            if _nadd > 1000000:
-                # purge duplicate keys by compressing
-                os.rename(targetPath, targetPath + '.tmp')
-                writeto([targetPath + '.tmp'], targetPath)
-                os.unlink(targetPath + '.tmp')
-                _nadd = 0
-
-            target = ROOT.TFile.Open(targetPath, 'update')
-            ROOT.gROOT.GetListOfFiles().Remove(target)
-        if eosDownload and pathReal.startswith('/eos'):
-            try:
-                os.unlink(path)
-            except:
-                pass

 if __name__ == '__main__':
     sys.argv = _argv
@@ -188,6 +202,7 @@ if __name__ == '__main__':
             sourcePaths.extend(glob.glob(path))
         else:
             sourcePaths.append(path)
+    total_files = len(sourcePaths)  # Count the total number of input files

     if args.writeDirect and not args.compress:
         targetName = args.target
@@ -210,8 +225,10 @@ if __name__ == '__main__':
                 proc.join()

                 if proc.exitcode != 0:
+                    LOG.error('Merge failed for process %s.', proc.name)
                     raise RuntimeError('Merge failed.')

+                LOG.info('Process %s completed.', proc.name)
                 processes.remove((proc, targetPath))
                 allSources.append((targetPath, True))
@@ -231,9 +248,17 @@ if __name__ == '__main__':
         if len(allSources) > 1:
             jobid = 'proc%d' % ijob
-
-            sources[jobid] = [allSources.pop(), allSources.pop()]
-            sourcePaths = [s[0] for s in sources[jobid]]
+
+            # Log the list of remaining files before starting a job
+            remaining_files = len(allSources)
+            LOG.info('Remaining files: %d/%d', remaining_files, total_files)
+
+            # Select and log the files for the current job
+            selected_files = [allSources.pop(0), allSources.pop(0)]
+            LOG.info('Starting job %s with sources %s', jobid, [s[0] for s in selected_files])
+
+            sources[jobid] = selected_files
+            sourcePaths = [s[0] for s in selected_files]
             dtemp = tempfile.mkdtemp()
             targetPath = dtemp + '/tmp.root'
             proc = multiprocessing.Process(target=writeto, args=(sourcePaths, targetPath, args.eosDownload), name=jobid)
@@ -253,7 +278,7 @@ if __name__ == '__main__':
             shutil.rmtree(os.path.dirname(allSources[0][0]))
         else:
             shutil.copyfile(allSources[0][0], targetName)
-
+
     if args.compress:
         LOG.info('Compressing')
         writeto([targetName], args.target)
@@ -265,3 +290,5 @@ if __name__ == '__main__':

     shutil.move(targetName, args.target)
     shutil.rmtree(os.path.dirname(targetName))
+
+