@@ -26,6 +26,7 @@
 from .command_line_tool import CallbackJob, ExpressionJob
 from .context import RuntimeContext, getdefault
+from .cuda import cuda_version_and_device_count
 from .cwlprov.provenance_profile import ProvenanceProfile
 from .errors import WorkflowException
 from .job import JobBase
@@ -269,16 +270,22 @@ def __init__(self) -> None:
         self.max_ram = int(psutil.virtual_memory().available / 2**20)
         self.max_cores = float(psutil.cpu_count())
+        self.max_cuda = cuda_version_and_device_count()[1]
         self.allocated_ram = float(0)
         self.allocated_cores = float(0)
+        self.allocated_cuda: int = 0

     def select_resources(
         self, request: Dict[str, Union[int, float]], runtime_context: RuntimeContext
     ) -> Dict[str, Union[int, float]]:  # pylint: disable=unused-argument
         """Naïve check for available cpu cores and memory."""
         result: Dict[str, Union[int, float]] = {}
         maxrsc = {"cores": self.max_cores, "ram": self.max_ram}
-        for rsc in ("cores", "ram"):
+        resources_types = {"cores", "ram"}
+        if "cudaDeviceCountMin" in request or "cudaDeviceCountMax" in request:
+            maxrsc["cudaDeviceCount"] = self.max_cuda
+            resources_types.add("cudaDeviceCount")
+        for rsc in resources_types:
             rsc_min = request[rsc + "Min"]
             if rsc_min > maxrsc[rsc]:
                 raise WorkflowException(
@@ -293,9 +300,6 @@ def select_resources(
         result["tmpdirSize"] = math.ceil(request["tmpdirMin"])
         result["outdirSize"] = math.ceil(request["outdirMin"])

-        if "cudaDeviceCount" in request:
-            result["cudaDeviceCount"] = request["cudaDeviceCount"]
-
         return result

     def _runner(
@@ -326,6 +330,10 @@ def _runner(
                         self.allocated_ram -= ram
                         cores = job.builder.resources["cores"]
                         self.allocated_cores -= cores
+                        cudaDevices: int = cast(
+                            int, job.builder.resources.get("cudaDeviceCount", 0)
+                        )
+                        self.allocated_cuda -= cudaDevices
                     runtime_context.workflow_eval_lock.notify_all()

     def run_job(
@@ -349,34 +357,43 @@ def run_job(
                 if isinstance(job, JobBase):
                     ram = job.builder.resources["ram"]
                     cores = job.builder.resources["cores"]
-                    if ram > self.max_ram or cores > self.max_cores:
+                    cudaDevices = cast(int, job.builder.resources.get("cudaDeviceCount", 0))
+                    if ram > self.max_ram or cores > self.max_cores or cudaDevices > self.max_cuda:
                         _logger.error(
                             'Job "%s" cannot be run, requests more resources (%s) '
-                            "than available on this host (max ram %d, max cores %d",
+                            "than available on this host (already allocated ram is %d, "
+                            "allocated cores is %d, allocated CUDA is %d, "
+                            "max ram %d, max cores %d, max CUDA %d).",
                             job.name,
                             job.builder.resources,
                             self.allocated_ram,
                             self.allocated_cores,
+                            self.allocated_cuda,
                             self.max_ram,
                             self.max_cores,
+                            self.max_cuda,
                         )
                         self.pending_jobs.remove(job)
                         return
                     if (
                         self.allocated_ram + ram > self.max_ram
                         or self.allocated_cores + cores > self.max_cores
+                        or self.allocated_cuda + cudaDevices > self.max_cuda
                     ):
                         _logger.debug(
                             'Job "%s" cannot run yet, resources (%s) are not '
                             "available (already allocated ram is %d, allocated cores is %d, "
-                            "max ram %d, max cores %d",
+                            "allocated CUDA devices is %d, "
+                            "max ram %d, max cores %d, max CUDA %d).",
                             job.name,
                             job.builder.resources,
                             self.allocated_ram,
                             self.allocated_cores,
+                            self.allocated_cuda,
                             self.max_ram,
                             self.max_cores,
+                            self.max_cuda,
                         )
                         n += 1
                         continue
@@ -386,6 +403,8 @@ def run_job(
                     self.allocated_ram += ram
                     cores = job.builder.resources["cores"]
                     self.allocated_cores += cores
+                    cuda = cast(int, job.builder.resources.get("cudaDeviceCount", 0))
+                    self.allocated_cuda += cuda
                 self.taskqueue.add(
                     functools.partial(self._runner, job, runtime_context, TMPDIR_LOCK),
                     runtime_context.workflow_eval_lock,
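For context, a minimal standalone sketch of the min/max selection pattern this change extends. It is not cwltool's actual API: the pick helper and the hard-coded capacity values are illustrative assumptions. It shows that "cudaDeviceCount" only joins the set of checked resources when the request carries a "cudaDeviceCountMin" or "cudaDeviceCountMax" key, so CPU-only requests never consult the CUDA limit.

# Illustrative sketch only; mirrors the patched selection loop, not cwltool's API.
import math
from typing import Dict, Union


def pick(
    request: Dict[str, Union[int, float]],
    max_cores: float,
    max_ram: int,
    max_cuda: int,
) -> Dict[str, Union[int, float]]:
    result: Dict[str, Union[int, float]] = {}
    maxrsc: Dict[str, Union[int, float]] = {"cores": max_cores, "ram": max_ram}
    resources_types = {"cores", "ram"}
    # CUDA devices are only considered when the request actually asks for them.
    if "cudaDeviceCountMin" in request or "cudaDeviceCountMax" in request:
        maxrsc["cudaDeviceCount"] = max_cuda
        resources_types.add("cudaDeviceCount")
    for rsc in resources_types:
        rsc_min = request[rsc + "Min"]
        if rsc_min > maxrsc[rsc]:
            raise RuntimeError(
                f"Requested at least {rsc_min} {rsc} but only {maxrsc[rsc]} available"
            )
        # Grant up to the requested maximum, capped by the host limit
        # (assumed policy for this sketch).
        result[rsc] = math.ceil(min(request[rsc + "Max"], maxrsc[rsc]))
    return result


# CPU-only request: the CUDA limit is never consulted.
print(pick({"coresMin": 1, "coresMax": 2, "ramMin": 256, "ramMax": 1024},
           max_cores=8.0, max_ram=16384, max_cuda=0))
# GPU request: capped at the 2 devices this hypothetical host reports.
print(pick({"coresMin": 1, "coresMax": 2, "ramMin": 256, "ramMax": 1024,
            "cudaDeviceCountMin": 1, "cudaDeviceCountMax": 4},
           max_cores=8.0, max_ram=16384, max_cuda=2))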