Skip to content

Commit ea52cd5

Browse files
Replace worker process when memory pressure high (#109)
* Replace worker process when memory pressure high * Make mem limit configurable * Add to docstring * Add tests * fixup! Make mem limit configurable * Bump version * Update src/ReTestItems.jl
1 parent 0c4a9d8 commit ea52cd5

File tree

4 files changed

+77
-12
lines changed

4 files changed

+77
-12
lines changed

Project.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name = "ReTestItems"
22
uuid = "817f1d60-ba6b-4fd5-9520-3cf149f6a823"
3-
version = "1.17.0"
3+
version = "1.18.0"
44

55
[deps]
66
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"

src/ReTestItems.jl

+27-9
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ export TestSetup, TestItem, TestItemResult
1616
const RETESTITEMS_TEMP_FOLDER = mkpath(joinpath(tempdir(), "ReTestItemsTempLogsDirectory"))
1717
const DEFAULT_TESTITEM_TIMEOUT = 30*60
1818
const DEFAULT_RETRIES = 0
19+
const DEFAULT_MEMORY_THRESHOLD = 0.99
1920

2021
if isdefined(Base, :errormonitor)
2122
const errmon = Base.errormonitor
@@ -139,6 +140,14 @@ will be run.
139140
Can be used to load packages or set up the environment. Must be a `:block` expression.
140141
- `test_end_expr::Expr`: an expression that will be evaluated after each testitem is run.
141142
Can be used to verify that global state is unchanged after running a test. Must be a `:block` expression.
143+
- `memory_threshold::Real`: Sets the fraction of memory that can be in use before a worker process is
144+
restarted to free memory. Defaults to $DEFAULT_MEMORY_THRESHOLD. Only supported with `nworkers > 0`.
145+
For example, if set to 0.8, then when >80% of the available memory is in use, a worker process will be killed and
146+
replaced with a new worker before the next testitem is evaluated. The testitem will then be run on the new worker
147+
process, regardless of whether memory pressure dropped below the threshold. If the memory pressure remains above the
148+
threshold, then a worker process will again be replaced before the next testitem is evaluated.
149+
Can also be set using the `RETESTITEMS_MEMORY_THRESHOLD` environment variable.
150+
**Note**: the `memory_threshold` keyword is experimental and may be removed in future versions.
142151
- `report::Bool=false`: If `true`, write a JUnit-format XML file summarising the test results.
143152
Can also be set using the `RETESTITEMS_REPORT` environment variable. The location at which
144153
the XML report is saved can be set using the `RETESTITEMS_REPORT_LOCATION` environment variable.
@@ -189,6 +198,7 @@ function runtests(
189198
worker_init_expr::Expr=Expr(:block),
190199
testitem_timeout::Real=parse(Float64, get(ENV, "RETESTITEMS_TESTITEM_TIMEOUT", string(DEFAULT_TESTITEM_TIMEOUT))),
191200
retries::Int=parse(Int, get(ENV, "RETESTITEMS_RETRIES", string(DEFAULT_RETRIES))),
201+
memory_threshold::Real=parse(Float64, get(ENV, "RETESTITEMS_MEMORY_THRESHOLD", string(DEFAULT_MEMORY_THRESHOLD))),
192202
debug=0,
193203
name::Union{Regex,AbstractString,Nothing}=nothing,
194204
tags::Union{Symbol,AbstractVector{Symbol},Nothing}=nothing,
@@ -211,6 +221,7 @@ function runtests(
211221
end
212222
logs in LOG_DISPLAY_MODES || throw(ArgumentError("`logs` must be one of $LOG_DISPLAY_MODES, got $(repr(logs))"))
213223
report && logs == :eager && throw(ArgumentError("`report=true` is not compatible with `logs=:eager`"))
224+
(0 ≤ memory_threshold ≤ 1) || throw(ArgumentError("`memory_threshold` must be between 0 and 1, got $(repr(memory_threshold))"))
214225
# If we were given paths but none were valid, then nothing to run.
215226
!isempty(paths) && isempty(paths′) && return nothing
216227
shouldrun_combined(ti) = shouldrun(ti) && _shouldrun(name, ti.name) && _shouldrun(tags, ti.tags)
@@ -221,10 +232,10 @@ function runtests(
221232
debuglvl = Int(debug)
222233
if debuglvl > 0
223234
LoggingExtras.withlevel(LoggingExtras.Debug; verbosity=debuglvl) do
224-
_runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, verbose_results, debuglvl, report, logs)
235+
_runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs)
225236
end
226237
else
227-
return _runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, verbose_results, debuglvl, report, logs)
238+
return _runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs)
228239
end
229240
end
230241

@@ -238,7 +249,7 @@ end
238249
# By tracking and reusing test environments, we can avoid this issue.
239250
const TEST_ENVS = Dict{String, String}()
240251

241-
function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, worker_init_expr::Expr, test_end_expr::Expr, testitem_timeout::Real, retries::Int, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol)
252+
function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, worker_init_expr::Expr, test_end_expr::Expr, testitem_timeout::Real, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol)
242253
# Don't recursively call `runtests` e.g. if we `include` a file which calls it.
243254
# So we ignore the `runtests(...)` call in `test/runtests.jl` when `runtests(...)`
244255
# was called from the command line.
@@ -258,7 +269,7 @@ function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, wor
258269
if is_running_test_runtests_jl(proj_file)
259270
# Assume this is `Pkg.test`, so test env already active.
260271
@debugv 2 "Running in current environment `$(Base.active_project())`"
261-
return _runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, verbose_results, debug, report, logs)
272+
return _runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs)
262273
else
263274
@debugv 1 "Activating test environment for `$proj_file`"
264275
orig_proj = Base.active_project()
@@ -271,7 +282,7 @@ function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, wor
271282
testenv = TestEnv.activate()
272283
TEST_ENVS[proj_file] = testenv
273284
end
274-
_runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, verbose_results, debug, report, logs)
285+
_runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs)
275286
finally
276287
Base.set_active_project(orig_proj)
277288
end
@@ -281,7 +292,7 @@ end
281292

282293
function _runtests_in_current_env(
283294
shouldrun, paths, projectfile::String, nworkers::Int, nworker_threads, worker_init_expr::Expr, test_end_expr::Expr,
284-
testitem_timeout::Real, retries::Int, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol,
295+
testitem_timeout::Real, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol,
285296
)
286297
start_time = time()
287298
proj_name = something(Pkg.Types.read_project(projectfile).name, "")
@@ -346,7 +357,7 @@ function _runtests_in_current_env(
346357
ti = starting[i]
347358
@spawn begin
348359
with_logger(original_logger) do
349-
manage_worker($w, $proj_name, $testitems, $ti, $nworker_threads, $worker_init_expr, $test_end_expr, $testitem_timeout, $retries, $verbose_results, $debug, $report, $logs)
360+
manage_worker($w, $proj_name, $testitems, $ti, $nworker_threads, $worker_init_expr, $test_end_expr, $testitem_timeout, $retries, $memory_threshold, $verbose_results, $debug, $report, $logs)
350361
end
351362
end
352363
end
@@ -454,13 +465,20 @@ function record_test_error!(testitem, msg, elapsed_seconds::Real=0.0)
454465
end
455466

456467
function manage_worker(
457-
worker::Worker, proj_name, testitems, testitem, nworker_threads, worker_init_expr::Expr, test_end_expr::Expr,
458-
timeout::Real, retries::Int, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol
468+
worker::Worker, proj_name::AbstractString, testitems::TestItems, testitem::Union{TestItem,Nothing}, nworker_threads, worker_init_expr::Expr, test_end_expr::Expr,
469+
timeout::Real, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol
459470
)
460471
ntestitems = length(testitems.testitems)
461472
run_number = 1
473+
memory_threshold_percent = 100*memory_threshold
462474
while testitem !== nothing
463475
ch = Channel{TestItemResult}(1)
476+
if memory_percent() > memory_threshold_percent
477+
@warn "Memory usage ($(Base.Ryu.writefixed(memory_percent(), 1))%) is higher than threshold ($(Base.Ryu.writefixed(memory_threshold_percent, 1))%). Restarting worker process to try to free memory."
478+
terminate!(worker)
479+
wait(worker)
480+
worker = robust_start_worker(proj_name, nworker_threads, worker_init_expr, ntestitems)
481+
end
464482
testitem.workerid[] = worker.pid
465483
fut = remote_eval(worker, :(ReTestItems.runtestitem($testitem, GLOBAL_TEST_CONTEXT; test_end_expr=$(QuoteNode(test_end_expr)), verbose_results=$verbose_results, logs=$(QuoteNode(logs)))))
466484
max_runs = 1 + max(retries, testitem.retries)

src/log_capture.jl

+4-2
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,13 @@ _has_logs(ti::TestItem, i=nothing) = (path = logpath(ti, i); (isfile(path) && fi
140140
# Stats to help diagnose OOM issues.
141141
_mem_watermark() = string(
142142
# Tracks the peak memory usage of a process / worker
143-
"maxrss ", lpad(Base.Ryu.writefixed(100 * Float64(Sys.maxrss()/Sys.total_memory()), 1), 4),
143+
"maxrss ", lpad(Base.Ryu.writefixed(maxrss_percent(), 1), 4),
144144
# Total memory pressure on the machine
145-
"% | mem ", lpad(Base.Ryu.writefixed(100 * Float64(1 - (Sys.free_memory()/Sys.total_memory())), 1), 4),
145+
"% | mem ", lpad(Base.Ryu.writefixed(memory_percent(), 1), 4),
146146
"% | "
147147
)
148+
"""
    maxrss_percent()

Return the peak resident set size of the current process (`Sys.maxrss()`) expressed as a
percentage of the machine's total memory (`Sys.total_memory()`).
"""
function maxrss_percent()
    peak_fraction = Sys.maxrss() / Sys.total_memory()
    return 100 * Float64(peak_fraction)
end
149+
"""
    memory_percent()

Return the percentage of the machine's total memory that is currently not free, computed
as `100 * (1 - Sys.free_memory() / Sys.total_memory())`.
"""
function memory_percent()
    free_fraction = Sys.free_memory() / Sys.total_memory()
    return 100 * Float64(1 - free_fraction)
end
148150

149151
"""
150152
print_errors_and_captured_logs(ti::TestItem, run_number::Int; logs=:batched, errors_first=false)

test/integrationtests.jl

+45
Original file line numberDiff line numberDiff line change
@@ -946,4 +946,49 @@ end
946946
end
947947
end
948948

949+
@testset "Replace workers when we hit memory threshold" begin
950+
using IOCapture
951+
file = joinpath(TEST_FILES_DIR, "_happy_tests.jl")
952+
try
953+
# monkey-patch the internal `memory_percent` function to return a fixed value, so we
954+
# can control if we hit the `memory_threshold`.
955+
@eval ReTestItems.memory_percent() = 83.1
956+
expected_warning = "Warning: Memory usage (83.1%) is higher than threshold (7.0%). Restarting worker process to try to free memory."
957+
958+
# Pass `memory_threshold` keyword, and hit the memory threshold.
959+
c1 = IOCapture.capture() do
960+
encased_testset(()->runtests(file; nworkers=1, memory_threshold=0.07))
961+
end
962+
results1 = c1.value
963+
@test all_passed(results1)
964+
@test contains(c1.output, expected_warning)
965+
966+
# Set the `RETESTITEMS_MEMORY_THRESHOLD` env variable, and hit the memory threshold.
967+
c2 = IOCapture.capture() do
968+
withenv("RETESTITEMS_MEMORY_THRESHOLD" => 0.07) do
969+
encased_testset(()->runtests(file; nworkers=1))
970+
end
971+
end
972+
results2 = c2.value
973+
@test all_passed(results2)
974+
@test contains(c2.output, expected_warning)
975+
976+
# Set the memory_threshold, but don't hit it.
977+
c3 = IOCapture.capture() do
978+
withenv("RETESTITEMS_MEMORY_THRESHOLD" => 0.9) do
979+
encased_testset(()->runtests(file; nworkers=1))
980+
end
981+
end
982+
results3 = c3.value
983+
@test all_passed(results3)
984+
@test !contains(c3.output, expected_warning)
985+
finally
986+
# Undo the monkey-patch by restoring the real definition of `memory_percent`
# (the share of total memory that is not free), not the peak-RSS formula used by
# `maxrss_percent`; otherwise later tests would compare the wrong quantity against
# `memory_threshold`.
@eval ReTestItems.memory_percent() = 100 * Float64(1 - (Sys.free_memory()/Sys.total_memory()))
987+
end
988+
xx = 99
989+
err_msg = "ArgumentError: `memory_threshold` must be between 0 and 1, got $xx"
990+
expected_err = VERSION < v"1.8" ? ArgumentError : err_msg
991+
@test_throws expected_err runtests(file; nworkers=1, memory_threshold=xx)
992+
end
993+
949994
end # integrationtests.jl testset

0 commit comments

Comments
 (0)