Skip to content

Commit d7972d1

Browse files
authored
Merge pull request #500 from tmaeno/master
Merging OraDBProxy and JediDBProxy
2 parents 23407eb + 595719f commit d7972d1

24 files changed

+21681
-3
lines changed

PandaPkgInfo.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
release_version = "0.6.0"
1+
release_version = "0.6.1"

pandaserver/srvcore/CoreUtils.py

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import copy
12
import datetime
23
import json
4+
import math
35
import os
46
import re
57
import subprocess
@@ -273,3 +275,215 @@ def as_python_object(dct):
273275
if "_datetime_object" in dct:
274276
return datetime.datetime.strptime(str(dct["_datetime_object"]), "%Y-%m-%d %H:%M:%S.%f")
275277
return dct
278+
279+
280+
# get effective file size
281+
def getEffectiveFileSize(fsize, startEvent, endEvent, nEvents):
282+
inMB = 1024 * 1024
283+
if fsize in [None, 0]:
284+
# use dummy size for pseudo input
285+
effectiveFsize = inMB
286+
elif nEvents is not None and startEvent is not None and endEvent is not None:
287+
# take event range into account
288+
effectiveFsize = int(float(fsize) * float(endEvent - startEvent + 1) / float(nEvents))
289+
else:
290+
effectiveFsize = fsize
291+
# use dummy size if input is too small
292+
if effectiveFsize == 0:
293+
effectiveFsize = inMB
294+
# in MB
295+
effectiveFsize = float(effectiveFsize) / inMB
296+
# return
297+
return effectiveFsize
298+
299+
300+
# get effective number of events
301+
def getEffectiveNumEvents(startEvent, endEvent, nEvents):
302+
if endEvent is not None and startEvent is not None:
303+
evtCounts = endEvent - startEvent + 1
304+
if evtCounts > 0:
305+
return evtCounts
306+
return 1
307+
if nEvents is not None:
308+
return nEvents
309+
return 1
310+
311+
312+
# get memory usage
313+
def getMemoryUsage():
314+
try:
315+
t = open(f"/proc/{os.getpid()}/status")
316+
v = t.read()
317+
t.close()
318+
value = 0
319+
for line in v.split("\n"):
320+
if line.startswith("VmRSS"):
321+
items = line.split()
322+
value = int(items[1])
323+
if items[2] in ["kB", "KB"]:
324+
value /= 1024
325+
elif items[2] in ["mB", "MB"]:
326+
pass
327+
break
328+
return int(value)
329+
except Exception:
330+
return None
331+
332+
333+
# check process
334+
def checkProcess(pid):
335+
return os.path.exists(f"/proc/{pid}/status")
336+
337+
338+
# offset for walltime
339+
wallTimeOffset = 10 * 60
340+
341+
342+
# convert config parameters
343+
def convert_config_params(itemStr):
344+
items = itemStr.split(":")
345+
newItems = []
346+
for item in items:
347+
if item == "":
348+
newItems.append(None)
349+
elif "," in item:
350+
newItems.append(item.split(","))
351+
else:
352+
try:
353+
newItems.append(int(item))
354+
except Exception:
355+
newItems.append(item)
356+
return newItems
357+
358+
359+
# parse init params
360+
def parse_init_params(par):
361+
if isinstance(par, list):
362+
return par
363+
try:
364+
return par.split("|")
365+
except Exception:
366+
return [par]
367+
368+
369+
# get config param for vo and prodSourceLabel
370+
def getConfigParam(configStr, vo, sourceLabel):
371+
try:
372+
for _ in configStr.split(","):
373+
items = configStr.split(":")
374+
vos = items[0].split("|")
375+
sourceLabels = items[1].split("|")
376+
if vo not in ["", "any"] and vo not in vos and None not in vos and "any" not in vos and "" not in vos:
377+
continue
378+
if (
379+
sourceLabel not in ["", "any"]
380+
and sourceLabel not in sourceLabels
381+
and None not in sourceLabels
382+
and "any" not in sourceLabels
383+
and "" not in sourceLabels
384+
):
385+
continue
386+
return ",".join(items[2:])
387+
except Exception:
388+
pass
389+
return None
390+
391+
392+
# get percentile until numpy 1.5.X becomes available
393+
def percentile(inList, percent, idMap):
394+
inList = sorted(copy.copy(inList))
395+
k = (len(inList) - 1) * float(percent) / 100
396+
f = math.floor(k)
397+
c = math.ceil(k)
398+
if f == c:
399+
retVal = inList[int(f)]
400+
return retVal, [retVal]
401+
val0 = inList[int(f)]
402+
val1 = inList[int(c)]
403+
d0 = val0 * (c - k)
404+
d1 = val1 * (k - f)
405+
retVal = d0 + d1
406+
return retVal, [val0, val1]
407+
408+
409+
# get max walltime and cpu count
410+
def getJobMaxWalltime(taskSpec, inputChunk, totalMasterEvents, jobSpec, siteSpec):
411+
try:
412+
if taskSpec.getCpuTime() is None:
413+
# use PQ maxtime when CPU time is not defined
414+
jobSpec.maxWalltime = siteSpec.maxtime
415+
jobSpec.maxCpuCount = siteSpec.maxtime
416+
else:
417+
jobSpec.maxWalltime = taskSpec.getCpuTime()
418+
if jobSpec.maxWalltime is not None and jobSpec.maxWalltime > 0:
419+
jobSpec.maxWalltime *= totalMasterEvents
420+
if siteSpec.coreCount > 0:
421+
jobSpec.maxWalltime /= float(siteSpec.coreCount)
422+
if siteSpec.corepower not in [0, None]:
423+
jobSpec.maxWalltime /= siteSpec.corepower
424+
if taskSpec.cpuEfficiency not in [None, 0]:
425+
jobSpec.maxWalltime /= float(taskSpec.cpuEfficiency) / 100.0
426+
if taskSpec.baseWalltime is not None:
427+
jobSpec.maxWalltime += taskSpec.baseWalltime
428+
jobSpec.maxWalltime = int(jobSpec.maxWalltime)
429+
if taskSpec.useHS06():
430+
jobSpec.maxCpuCount = jobSpec.maxWalltime
431+
except Exception:
432+
pass
433+
434+
435+
# use direct IO for job
436+
def use_direct_io_for_job(task_spec, site_spec, input_chunk):
437+
# not for merging
438+
if input_chunk and input_chunk.isMerging:
439+
return False
440+
# always
441+
if site_spec.always_use_direct_io():
442+
return True
443+
# force copy-to-scratch
444+
if task_spec.useLocalIO():
445+
return False
446+
# depends on task and site specs
447+
if task_spec.allowInputLAN() is not None and site_spec.isDirectIO():
448+
return True
449+
return False
450+
451+
452+
# stopwatch
453+
class StopWatch:
454+
"""Utility class to measure timing information."""
455+
456+
def __init__(self, identifier: str = None):
457+
self.start_time = datetime.datetime.now()
458+
self.checkpoint = self.start_time
459+
self.step_name = None
460+
self.identifier = identifier
461+
462+
def reset(self):
463+
"""Reset the stopwatch."""
464+
self.start_time = datetime.datetime.now()
465+
self.checkpoint = self.start_time
466+
self.step_name = None
467+
468+
def get_elapsed_time(self, new_step_name: str) -> str:
469+
"""Get the elapsed time since the stopwatch was started and the duration since the last checkpoint.
470+
:param new_step_name: The name of the next step.
471+
Returns:
472+
str: A string with the elapsed time and the duration since the last checkpoint.
473+
"""
474+
now = datetime.datetime.now()
475+
total_delta = now - self.start_time
476+
duration_delta = now - self.checkpoint
477+
return_str = ""
478+
if self.identifier:
479+
return_str += f"{self.identifier}: "
480+
return_str += f"elapsed {total_delta.seconds}.{int(total_delta.microseconds / 1000):03d} sec. "
481+
if self.step_name is not None:
482+
return_str += f"{self.step_name} took {duration_delta.seconds}.{int(duration_delta.microseconds / 1000):03d} sec. "
483+
if new_step_name:
484+
return_str += f"{new_step_name} started."
485+
else:
486+
return_str += "done."
487+
self.checkpoint = now
488+
self.step_name = new_step_name
489+
return return_str

0 commit comments

Comments
 (0)