@@ -311,16 +311,29 @@ function _runtests_in_current_env(
311
311
end
312
312
end
313
313
elseif ! isempty (testitems. testitems)
314
- # spawn a task per worker to start and manage the lifetime of the worker
315
- # get starting test items for each worker
314
+ # Use the logger that was set before we eval'd any user code to avoid world age
315
+ # issues when logging https://github.com/JuliaLang/julia/issues/33865
316
+ original_logger = current_logger ()
317
+ # Wait for all workers to be started so we can throw as soon as possible if
318
+ # we were unable to start the requested number of workers
319
+ @info " Starting test workers"
320
+ workers = Vector {Worker} (undef, nworkers)
321
+ ntestitems = length (testitems. testitems)
322
+ @sync for i in 1 : nworkers
323
+ @spawn begin
324
+ with_logger (original_logger) do
325
+ $ workers[$ i] = robust_start_worker ($ proj_name, $ nworker_threads, $ worker_init_expr, $ ntestitems; worker_num= $ i)
326
+ end
327
+ end
328
+ end
329
+ # Now all workers are started, we can begin processing test items.
330
+ @info " Starting evaluating test items"
316
331
starting = get_starting_testitems (testitems, nworkers)
317
- @sync for i = 1 : nworkers
332
+ @sync for (i, w) in enumerate (workers)
318
333
ti = starting[i]
319
334
@spawn begin
320
- # Wrapping with the logger that was set before we eval'd any user code to
321
- # avoid world age issues when logging https://github.com/JuliaLang/julia/issues/33865
322
- with_logger (current_logger ()) do
323
- start_and_manage_worker ($ proj_name, $ testitems, $ ti, $ nworker_threads, $ worker_init_expr, $ testitem_timeout, $ retries, $ verbose_results, $ debug, $ report, $ logs)
335
+ with_logger (original_logger) do
336
+ manage_worker ($ w, $ proj_name, $ testitems, $ ti, $ nworker_threads, $ worker_init_expr, $ testitem_timeout, $ retries, $ verbose_results, $ debug, $ report, $ logs)
324
337
end
325
338
end
326
339
end
@@ -337,21 +350,51 @@ function _runtests_in_current_env(
337
350
return nothing
338
351
end
339
352
340
# Start a new `Worker` with `nworker_threads` threads and evaluate `worker_init_expr` on it.
#
# Arguments:
# - `proj_name`, `ntestitems`: forwarded to `ReTestItems.TestContext` on the worker.
# - `nworker_threads`: interpolated into the `threads` keyword passed to `Worker`.
# - `worker_init_expr`: an expression whose `args` are spliced into the worker's setup code.
# - `worker_num`: only used to label the start-up log message; it is not persisted as part
#   of the worker. When `nothing` (the default) the label is omitted.
#
# Returns the started `Worker`. Any exception raised while evaluating the setup code on the
# worker propagates to the caller.
function start_worker(proj_name, nworker_threads, worker_init_expr, ntestitems; worker_num=nothing)
    w = Worker(; threads="$nworker_threads")
    # Compare with `===`: `==` is overloadable and may return `missing`, which is not a
    # valid condition for the ternary. Identity comparison always yields a `Bool`.
    i = worker_num === nothing ? "" : " $worker_num"
    # remote_fetch here because we want to make sure the worker is all setup before
    # starting to eval testitems
    remote_fetch(w, quote
        using ReTestItems, Test
        Test.TESTSET_PRINT_ENABLE[] = false
        const GLOBAL_TEST_CONTEXT = ReTestItems.TestContext($proj_name, $ntestitems)
        GLOBAL_TEST_CONTEXT.setups_evaled = ReTestItems.TestSetupModules()
        @info "Starting test worker$($i) on pid = $(Libc.getpid()), with $(Threads.nthreads()) threads"
        $(worker_init_expr.args...)
        nothing
    end)
    return w
end
354
370
371
# Workers may terminate while starting up (e.g. because of the `worker_init_expr`), so
# worker start-up is retried rather than failing on the first termination.
# Both values below are currently arbitrary: we want at least one retry, and a short pause
# between attempts in case resources need to be cleaned up before a new worker can start
# successfully.
const _NRETRIES = 2             # number of retries after the initial attempt
const _RETRY_DELAY_SECONDS = 1  # pause between consecutive attempts, in seconds
378
+
379
# Start a worker via `start_worker`, forwarding all positional and keyword arguments.
# If start-up fails and `_worker_terminated` approves a retry, wait `_RETRY_DELAY_SECONDS`
# seconds and try again, up to `_NRETRIES` retries.
# The exception is rethrown if the retries are exhausted or if `_worker_terminated`
# rejects it (i.e. it is something other than a `WorkerTerminatedException`).
function robust_start_worker(args...; kwargs...)
    retry_delays = fill(_RETRY_DELAY_SECONDS, _NRETRIES)
    start_with_retries = retry(start_worker; delays=retry_delays, check=_worker_terminated)
    return start_with_retries(args...; kwargs...)
end
387
+
388
# `check` callback for `retry`: decide whether a failed worker start should be retried.
# Returns `true` (after logging an error) only for a `WorkerTerminatedException`; any other
# exception returns `false`, which makes `retry` rethrow it.
# `state` is the iteration state of the `delays` collection handed to `retry`; the retry
# number reported in the log is derived from it as `state - 1`.
function _worker_terminated(state, exception)
    exception isa WorkerTerminatedException || return false
    retry_num = state - 1
    @error "$(exception.worker) terminated unexpectedly. Starting new worker (retry $retry_num/$_NRETRIES)."
    return true
end
397
+
355
398
any_non_pass (ts:: DefaultTestSet ) = ts. anynonpass
356
399
357
400
function record_timeout! (testitem, run_number:: Int , timeout_limit:: Real )
@@ -396,12 +439,11 @@ function record_test_error!(testitem, msg, elapsed_seconds::Real=0.0)
396
439
return testitem
397
440
end
398
441
399
- function start_and_manage_worker (
400
- proj_name, testitems, testitem, nworker_threads, worker_init_expr,
442
+ function manage_worker (
443
+ worker :: Worker , proj_name, testitems, testitem, nworker_threads, worker_init_expr,
401
444
timeout:: Real , retries:: Int , verbose_results:: Bool , debug:: Int , report:: Bool , logs:: Symbol
402
445
)
403
446
ntestitems = length (testitems. testitems)
404
- worker = start_worker (proj_name, nworker_threads, worker_init_expr, ntestitems)
405
447
run_number = 1
406
448
while testitem != = nothing
407
449
ch = Channel {TestItemResult} (1 )
@@ -479,7 +521,7 @@ function start_and_manage_worker(
479
521
end
480
522
# The worker was terminated, so replace it unless there are no more testitems to run
481
523
if testitem != = nothing
482
- worker = start_worker (proj_name, nworker_threads, worker_init_expr, ntestitems)
524
+ worker = robust_start_worker (proj_name, nworker_threads, worker_init_expr, ntestitems)
483
525
end
484
526
# Now loop back around to reschedule the testitem
485
527
continue
0 commit comments