diff --git a/README.md b/README.md index be43a9b..592b522 100644 --- a/README.md +++ b/README.md @@ -366,44 +366,94 @@ remove **all** output from the cell. You can restore previous behavior by callin `ProgressMeter.ijulia_behavior(:append)`. You can enable it again by calling `ProgressMeter.ijulia_behavior(:clear)`, which will also disable the warning message. -### Tips for parallel programming +### Parallel programming with `ParallelProgress` -For remote parallelization, when multiple processes or tasks are being used for a computation, -the workers should communicate back to a single task for displaying the progress bar. This -can be accomplished with a `RemoteChannel`: +`ParallelProgress` wraps an `AbstractProgress` to allow updating from other workers, +by going through a `RemoteChannel` ```julia -using ProgressMeter using Distributed +addprocs(2) +@everywhere using ProgressMeter n_steps = 20 -p = Progress(n_steps) -channel = RemoteChannel(() -> Channel{Bool}(), 1) +p = ParallelProgress(n_steps) # introduce a long-running dummy task to all workers @everywhere long_task() = sum([ 1/x for x in 1:100_000_000 ]) @time long_task() # a single execution is about 0.3 seconds -@sync begin # start two tasks which will be synced in the very end - # the first task updates the progress bar - @async while take!(channel) - next!(p) +@distributed (+) for i in 1:n_steps + long_task() + next!(p) + i^2 +end + +finish!(p) +``` + +Here, returning some number `i^2` and reducing it somehow `(+)`is necessary to make the distribution happen. +`finish!(p)` or `close(p)` makes sure that the underlying `Channel` is closed + +```julia +pu = ParallelProgress(ProgressUnknown(color=:red)) +pt = ParallelProgress(ProgressThresh(0.1, desc="Optimizing...")) +close(pu) +close(pt) +``` + +### Mutliple progressbars across multiple workers with `MutlipleProgress` + +`MultipleProgress` combines multiple progressbars, allowing them to update simultaneously across multiple workers + +```julia +using Distributed +addprocs(2) +@everywhere using ProgressMeter + +progs = [Progress(10; desc="task $i ") for i in 1:5] +mainprog = Progress(50; desc="global ") +p = MultipleProgress(progs, mainprog) +res = pmap(1:5) do i + for _ in 1:10 + sleep(rand()) + next!(p[i]) end + sleep(0.01) + myid() +end +close(p) +``` - # the second task does the computation - @async begin - @distributed (+) for i in 1:n_steps - long_task() - put!(channel, true) # trigger a progress bar update - i^2 - end - put!(channel, false) # this tells the printing task to finish +``` +global 100%|██████████████████████████████████████████████| Time: 0:00:24 +task 4 100%|██████████████████████████████████████████████| Time: 0:00:05 +task 5 100%|██████████████████████████████████████████████| Time: 0:00:05 +``` + +the progress bars can be passed in an `Vector` or a `Dict`. In the first case the main progress bar can be +updated with `p[0]`, otherwise `p[:main]` or by specifying the kwarg `main` when building the `MultipleProgress` + +If no main progress is given, one will be automatically generated. See `?MultipleProgress` for all available options. + +Additional progress bars can be added from another worker: + +```julia +p = MultipleProgress(Progress(10; desc="tasks done "); count_finishes=true) +sleep(0.1) +pmap(1:10) do i + N = rand(20:50) + p[i] = Progress(N; desc=" task $i ") + for _ in 1:N + next!(p[i]) + sleep(0.05) end end +close(p) ``` -Here, returning some number `i^2` and reducing it somehow `(+)` -is necessary to make the distribution happen. +the main progress bar can be a `Progress` or a `ProgessUnknown`, +while the other progresses can be any `AbstractProgress` ### `progress_map` diff --git a/src/ProgressMeter.jl b/src/ProgressMeter.jl index 43d308b..3f5a8f1 100644 --- a/src/ProgressMeter.jl +++ b/src/ProgressMeter.jl @@ -3,7 +3,9 @@ module ProgressMeter using Printf: @sprintf using Distributed -export Progress, ProgressThresh, ProgressUnknown, BarGlyphs, next!, update!, cancel, finish!, @showprogress, progress_map, progress_pmap, ijulia_behavior +export Progress, ProgressThresh, ProgressUnknown, BarGlyphs, next!, update!, cancel, + finish!, @showprogress, progress_map, progress_pmap, ijulia_behavior, + MultipleProgress, ParallelProgress """ `ProgressMeter` contains a suite of utilities for displaying progress @@ -1105,6 +1107,7 @@ function ncalls_map(args...) return minimum(length, args) end +include("parallel_progress.jl") include("deprecated.jl") end # module diff --git a/src/parallel_progress.jl b/src/parallel_progress.jl new file mode 100644 index 0000000..bdf32be --- /dev/null +++ b/src/parallel_progress.jl @@ -0,0 +1,380 @@ + +mutable struct ParallelProgress <: AbstractProgress + channel +end + +@enum ProgressAction begin + PP_ADD + PP_PING + PP_NEXT + PP_CANCEL + PP_FINISH + PP_UPDATE +end + +function try_put!(pp::ParallelProgress, x) + # try to put! in channel, but don't error if closed + try + put!(pp.channel, x) + catch e + if e == Base.closed_exception() || + e isa RemoteException && e.captured isa CapturedException && e.captured.ex == Base.closed_exception() + # progress has finished or been closed + pp.channel = FakeChannel() + else + rethrow() + end + end + return nothing +end + +ping(pp::ParallelProgress, args...; kw...) = try_put!(pp, (PP_PING, args, kw)) +next!(pp::ParallelProgress, args...; kw...) = try_put!(pp, (PP_NEXT, args, kw)) +cancel(pp::ParallelProgress, args...; kw...) = try_put!(pp, (PP_CANCEL, args, kw)) +finish!(pp::ParallelProgress, args...; kw...) = try_put!(pp, (PP_FINISH, args, kw)) +update!(pp::ParallelProgress, args...; kw...) = try_put!(pp, (PP_UPDATE, args, kw)) + +""" +`ParallelProgress(n; kw...)` + +works like `Progress` but can be used from other workers + +# Example: +```julia +using Distributed +addprocs() +@everywhere using ProgressMeter +prog = ParallelProgress(10; desc="test ") +pmap(1:10) do i + sleep(rand()) + next!(prog) + return myid() +end +``` +""" +function ParallelProgress(n::Integer; kw...) + return ParallelProgress(Progress(n; kw...)) +end + +""" +`ParallelProgress(p::AbstractProgress)` + +wrapper around any `Progress`, `ProgressThresh` and `ProgressUnknown` that can be used +from other workers + +""" +function ParallelProgress(progress::AbstractProgress) + channel = RemoteChannel(() -> Channel{NTuple{3,Any}}(1024)) + pp = ParallelProgress(channel) + + @async begin + try + while !has_finished(progress) && !has_finished(pp) + f, args, kw = take!(channel) + if f == PP_PING + print(args...) + elseif f == PP_NEXT + next!(progress, args...; kw...) + elseif f == PP_CANCEL + cancel(progress, args...; kw...) + break + elseif f == PP_FINISH + finish!(progress, args...; kw...) + break + elseif f == PP_UPDATE + update!(progress, args...; kw...) + end + end + catch err + println() + # channel closed should only happen from Base.close(pp), which isn't an error + if err != Base.closed_exception() + bt = catch_backtrace() + showerror(stderr, err, bt) + println() + end + finally + close(pp) + end + end + return pp +end + + +""" + FakeChannel() + +fake RemoteChannel that doesn't put anything anywhere (for allowing overshoot) +""" +struct FakeChannel end +Base.close(::FakeChannel) = nothing +Base.isready(::FakeChannel) = false +Distributed.put!(::FakeChannel, _...) = nothing + +struct MultipleChannel + channel + id +end +Distributed.put!(mc::MultipleChannel, x) = put!(mc.channel, (mc.id, x...)) + +mutable struct MultipleProgress + channel + main +end + +Base.getindex(mp::MultipleProgress, n) = ParallelProgress.(MultipleChannel.(Ref(mp.channel), n)) +Base.setindex!(mp::MultipleProgress, p::AbstractProgress, n) = (put!(mp.channel, (n, PP_ADD, (p,), ())); nothing) + +""" + MultipleProgress(progresses::AbstractDict{T, <:AbstractProgress}, + [mainprogress::AbstractProgress]; + main = T<:Number ? 0 : :main, + enabled = true, + auto_close = true, + count_finishes = false, + count_overshoot = false, + auto_reset_timer = true) + +allows to call the `progresses` and `mainprogress` from different workers + - `progresses`: contains the different progressbars, can also be an `AbstractVector` + - `mainprogress`: main progressbar, defaults to `Progress` or `ProgressUnknown`, according to `count_finishes` and whether all progresses have known length or not + - `main`: how `mainprogress` should be called. Defaults to `0` or `:main` + - `enabled`: `enabled == false` doesn't show anything and doesn't open a channel + - `auto_close`: if true, the channel will close when all progresses are finished, otherwise, when mainprogress finishes or with `close(p)` + - `count_finishes`: if false, main_progress will be the sum of the individual progressbars, if true, it will be equal to the number of finished progressbars + - `count_overshoot`: overshooting progressmeters will be counted in the main progressmeter + - `auto_reset_timer`: tinit in progresses will be reset at first call + + +# Example +```julia +using Distributed +addprocs(2) +@everywhere using ProgressMeter +p = MultipleProgress([Progress(10; desc="task \$i ") for i in 1:5], Progress(50; desc="global ")) +pmap(1:5) do x + for i in 1:10 + sleep(rand()) + next!(p[x]) + end + sleep(0.01) + myid() +end +``` +""" +function MultipleProgress(progresses::AbstractVector{<:AbstractProgress}, + mainprogress::AbstractProgress; + main = 0, + kw...) + return MultipleProgress(Dict(pairs(progresses)), mainprogress; main=main, kw...) +end + +function MultipleProgress(progresses::AbstractVector{<:AbstractProgress}; main=0, kw...) + return MultipleProgress(Dict(pairs(progresses)); main=main, kw...) +end + +function MultipleProgress(progresses::AbstractDict{T,<:AbstractProgress}; + kwmain = (), + count_finishes = false, + kw...) where T + if count_finishes + mainprogress = Progress(length(progresses); kwmain...) + elseif valtype(progresses) <: Progress + mainprogress = Progress(sum(p->p.n, values(progresses)); kwmain...) + else + mainprogress = ProgressUnknown(; kwmain...) + end + return MultipleProgress(progresses, mainprogress; count_finishes=count_finishes, kw...) +end + +function MultipleProgress(progresses::AbstractDict{T,<:AbstractProgress}, + mainprogress::AbstractProgress; + main = T<:Number ? 0 : :main, + kwmain = (), + enabled = true, + auto_close = true, + count_finishes = false, + count_overshoot = false, + auto_reset_timer = true) where T + + !enabled && return MultipleProgress(FakeChannel(), main) + + if main ∈ keys(progresses) + error("`main=$main` cannot be used in `progresses`") + end + + channel = RemoteChannel(() -> Channel{NTuple{4,Any}}(1024)) + + mp = MultipleProgress(channel, main) + @async runMultipleProgress(progresses, mainprogress, mp; + auto_close=auto_close, + count_finishes=count_finishes, + count_overshoot=count_overshoot, + auto_reset_timer=auto_reset_timer) + return mp +end + +""" + MultipleProgress(mainprogress=ProgressUnknown(); main=0, auto_close=false, kw...) + +is equivalent to + + progresses = Dict{typeof(main), AbstractProgress}() + return MultipleProgress(progresses, mainprogress; main, auto_close, kw...) + +See also: `addprogress!` +""" +function MultipleProgress(mainprogress::AbstractProgress=ProgressUnknown(); main=0, auto_close=false, kw...) + progresses = Dict{typeof(main), AbstractProgress}() + return MultipleProgress(progresses, mainprogress; main=main, auto_close=auto_close, kw...) +end + +function runMultipleProgress(progresses::AbstractDict{T,<:AbstractProgress}, + mainprogress::AbstractProgress, + mp::MultipleProgress; + auto_close = true, + count_finishes = false, + count_overshoot = false, + auto_reset_timer = true) where T + max_offsets = 1 + try + for p in values(progresses) + p.offset = -1 + end + + channel = mp.channel + taken_offsets = Set{Int}() + + mainprogress.offset = 0 + # we must make sure that 2 progresses aren't updated at the same time, + # that's why we use only one Channel + while !has_finished(mainprogress) + + p, f, args, kwt = take!(channel) + + if f == PP_PING + println(args...) + elseif p == mp.main # main progressbar + if f == PP_CANCEL + finish!(mainprogress; keep=false) + cancel(mainprogress, args...; kwt..., keep=false) + break + elseif f == PP_UPDATE + update!(mainprogress, args...; kwt..., keep=false) + elseif f == PP_NEXT + next!(mainprogress, args...; kwt..., keep=false) + elseif f == PP_FINISH + finish!(mainprogress, args...; kwt..., keep=false) + break + else + error("action `$f` not applicable to main progressmeter") + end + else + # add progress + if f == PP_ADD + if p ∈ keys(progresses) + error("key `$p` already in use") + end + newprog = args[1] + newprog.output = mainprogress.output + newprog.offset = -1 + progresses[p] = newprog + continue + end + + # if first time calling progress p + if progresses[p].offset == -1 + # find first available offset + offset = 1 + while offset ∈ taken_offsets + offset += 1 + end + max_offsets = max(max_offsets, offset) + progresses[p].offset = offset + if auto_reset_timer + progresses[p].tinit = time() + end + push!(taken_offsets, offset) + end + + already_finished = has_finished(progresses[p]) + + if f == PP_NEXT + if count_overshoot || !has_finished(progresses[p]) + next!(progresses[p], args...; kwt..., keep=false) + !count_finishes && next!(mainprogress; keep=false) + end + else + prev_p_counter = progresses[p].counter + + if f == PP_FINISH + finish!(progresses[p], args...; kwt..., keep=false) + elseif f == PP_CANCEL + finish!(progresses[p]; keep=false) + cancel(progresses[p], args...; kwt..., keep=false) + elseif f == PP_UPDATE + if !isempty(args) + value = args[1] + if !count_overshoot && progresses[p] isa Progress + value = min(value, progresses[p].n) # avoid overshoot + end + update!(progresses[p], value, args[2:end]...; kwt..., keep=false) + else + update!(progresses[p]; kwt..., keep=false) + end + end + + if !count_finishes + update!(mainprogress, mainprogress.counter-prev_p_counter+progresses[p].counter; keep=false) + end + end + + if !already_finished && has_finished(progresses[p]) + delete!(taken_offsets, progresses[p].offset) + count_finishes && next!(mainprogress; keep=false) + + if auto_close && all(has_finished, values(progresses)) + break + end + end + end + end + catch err + # channel closed should only happen from Base.close(mp), which isn't an error + if err != Base.closed_exception() + bt = catch_backtrace() + println() + showerror(stderr, err, bt) + println() + end + finally + close(mp) + println("\n"^max_offsets) + end +end + +""" + close(p::Union{ParallelProgress,MultipleProgress}) + +empties and closes the channel of the progress and replaces it with a `FakeChannel` to allow overshoot + +""" +function Base.close(p::Union{ParallelProgress,MultipleProgress}) + channel = p.channel + p.channel = FakeChannel() + while isready(channel) # empty channel to avoid waiting `put!` + take!(channel) + end + close(channel) +end + +has_finished(p::Progress) = p.counter >= p.n +has_finished(p::ProgressThresh) = p.triggered +has_finished(p::ProgressUnknown) = p.done +has_finished(p::ParallelProgress) = isfakechannel(p.channel) +has_finished(p::MultipleProgress) = isfakechannel(p.channel) + +isfakechannel(::AbstractChannel) = false +isfakechannel(::RemoteChannel) = false +isfakechannel(::FakeChannel) = true +isfakechannel(mc::MultipleChannel) = isfakechannel(mc.channel) diff --git a/test/runtests.jl b/test/runtests.jl index dd82e24..78c8b40 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,4 +1,5 @@ using ProgressMeter +using Distributed using Test if get(ENV, "CI", "false") == "true" @@ -22,6 +23,10 @@ end @testset "Threading" begin include("test_threads.jl") end +@testset "Parallel" begin + include("test_parallel.jl") + include("test_multiple.jl") +end @testset "Deprecated" begin include("deprecated.jl") end diff --git a/test/test_multiple.jl b/test/test_multiple.jl new file mode 100644 index 0000000..56fe579 --- /dev/null +++ b/test/test_multiple.jl @@ -0,0 +1,359 @@ +using Distributed +using ProgressMeter: has_finished, isfakechannel + +@testset "MultipleProgress() tests" begin + + np = nworkers() + np == 1 && @info "incomplete tests: nworkers() == 1" + @test all([@fetchfrom w @isdefined(ProgressMeter) for w in workers()]) + + println("Testing MultipleProgress") + + c = Channel(10) + @test !isfakechannel(c) + close(c) + + println("Testing update!") + p = MultipleProgress([Progress(100)]) + for _ in 1:5 + sleep(0.2) + next!(p[1]) + end + update!(p[1], 95) + for _ in 96:99 + sleep(0.2) + next!(p[1]) + end + @test !waitfor(()->isfakechannel(p.channel)) + @test !isfakechannel(p[1].channel) + @test !has_finished(p) + next!(p[1]) + @test waitfor(()->isfakechannel(p.channel)) + @test isfakechannel(p[1].channel) + @test has_finished(p) + + println("Testing MultipleProgress with custom titles and color") + p = MultipleProgress( + [Progress(10; color=:red, desc=" red "), + Progress(10; desc=" default color ")], + kwmain=(desc="yellow ", color=:yellow) + ) + for _ in 1:9 + sleep(0.1) + next!.(p[1:2]) + end + @test !waitfor(()->has_finished(p)) + next!.(p[1:2]) + @test waitfor(()->has_finished(p)) + + println("Testing with Dicts, :main changes to red") + p = MultipleProgress( + Dict(:a => Progress(10, desc="task :a "), + :b => Progress(10, desc="task :b ")), + kwmain=(desc=":main ",) + ) + @test p.main == :main + for _ in 1:9 + sleep(0.1) + next!(p[:a]) + next!(p[:b]) + end + update!(p[:main], 18, color=:red) + @test !has_finished(p) + next!(p[:a]) + next!(p[:b]) + @test waitfor(()->has_finished(p)) + + println("Testing forbidden main keys") + @test_throws ErrorException MultipleProgress( + Dict(:main => Progress(10, desc="task :a "), + :b => Progress(10, desc="task :b ")), + ) + @test_throws ErrorException MultipleProgress( + Dict(0 => Progress(10, desc="task :a "), + 1 => Progress(10, desc="task :b ")), + ) + @test_throws ErrorException MultipleProgress( + Dict(:a => Progress(10, desc="task :a "), + :b => Progress(10, desc="task :b ")); + main=:a + ) + + println("Testing adding same key twice (should display error)") + p = MultipleProgress(; main="main", auto_close=false) + p["a"] = Progress(10; color=:red) + for i in 1:10 + sleep(0.1) + next!(p["a"]) + end + @test !waitfor(()->has_finished(p)) + println() + p["a"] = Progress(100) + @test waitfor(()->has_finished(p); tmax=5tmax) + + println("Testing over-shooting and under-shooting") + p = MultipleProgress(Progress.([5, 14]), count_overshoot=false) + for _ in 1:10 + sleep(0.1) + next!.(p[1:2]) + end + finish!(p[2]) + @test waitfor(()->has_finished(p)) + + println("Testing over-shooting with count_overshoot") + p = MultipleProgress(Progress.([5, 15]), count_overshoot=true) + next!.(p[1:2]) + sleep(0.1) + next!.(p[1:2]) + for _ in 1:15 + sleep(0.1) + next!(p[2]) + end + next!(p[2]) + @test waitfor(()->has_finished(p)) + + println("Testing rapid over-shooting") + p = MultipleProgress([Progress(10)], count_overshoot=true) + next!(p[1]) + sleep(0.1) + pmap(1:10000) do _ + next!(p[1]) + end + @test waitfor(()->has_finished(p)) + + println("Testing early cancel") + p = MultipleProgress(Progress.([10, 8])) + for _ in 1:5 + sleep(0.2) + next!(p[1]) + next!(p[2]) + end + cancel(p[1]) + finish!(p[2]) + @test waitfor(()->has_finished(p)) + + println("Testing early cancel main progress") + p = MultipleProgress(Progress.([10, 8])) + for _ in 1:5 + sleep(0.2) + next!(p[1]) + next!(p[2]) + end + cancel(p[0]) + @test waitfor(()->has_finished(p)) + + println("Testing early finish main progress") + p = MultipleProgress(Progress.([10, 8])) + for _ in 1:5 + sleep(0.2) + next!(p[1]) + next!(p[2]) + end + finish!(p[0]) + @test waitfor(()->has_finished(p)) + + println("Testing next! on main progress") + p = MultipleProgress([Progress(10)]) + for _ in 1:9 + sleep(0.02) + next!(p[1]) + end + next!(p[0]) + @test waitfor(()->has_finished(p)) + + println("Testing bar remplacement with $np workers and pmap") + lengths = rand(6:10, 2*np) + progresses = [Progress(lengths[i], desc=" task $i ") for i in 1:2np] + p = MultipleProgress(progresses) + ids = pmap(1:2*np) do ip + for _ in 1:lengths[ip] + sleep(0.2) + next!(p[ip]) + end + myid() + end + @test waitfor(()->has_finished(p)) + @test length(unique(ids)) == np + + println("Testing changing color with next! and update!") + p = MultipleProgress(Progress.([12,12])) + for i in 1:12 + sleep(0.01) + if i == 3 + next!(p[1], color=:red) + next!(p[2]) + elseif i == 6 + next!(p[1]) + update!(p[2], 51, color=:blue) + else + if i == 9 + next!(p[0]; color=:yellow, step=0) + end + next!(p[1]) + next!(p[2]) + end + end + @test waitfor(()->has_finished(p)) + + println("Testing changing desc with next! and update!") + p = MultipleProgress(Progress.([10,10,10])) + for i in 1:10 + sleep(0.5) + if i == 2 + next!(p[1], desc="20% done ") + next!.(p[2:3]) + elseif i == 4 + next!.(p[[1,3]]) + update!(p[2], 5, desc="40% done ") + else + if i == 6 + update!(p[0], desc="60% done ") + elseif i == 8 + update!(p[3], desc="80% done ") + end + next!.(p[1:3]) + end + end + @test waitfor(()->has_finished(p)) + + println("Update and finish all") + p = MultipleProgress(Progress.([10,10,10])) + for i in 1:100 + rand() < 0.9 && next!(p[1]) + rand() < 0.8 && next!(p[2]) + rand() < 0.7 && next!(p[3]) + sleep(0.2) + end + next!.(p[0:3], color=:red, step=0) + sleep(0.5) + finish!.(p[1:3]) + @test waitfor(()->has_finished(p)) + + println("Testing without main progressmeter and offset 0 finishes last") + p = MultipleProgress(Progress.([10,10,10]), kwmain=(enabled=false,)) + update!(p[0], 1, color=:red, desc="I shouldn't exist ") + for i in 1:8 + next!(p[1], desc="task a ") + rand() < 0.9 && next!(p[2], desc="task b ") + rand() < 0.8 && next!(p[3], desc="task c ") + sleep(0.2) + end + @test !waitfor(()->has_finished(p)) + finish!.(p[3:-1:1]) + @test waitfor(()->has_finished(p)) + + println("Testing without main progressmeter and offset 0 finishes first (#215)") + p = MultipleProgress(Progress.([10,10,10]), kwmain=(enabled=false,)) + update!(p[0], 1, color=:red, desc="I shouldn't exist ") + for i in 1:9 + next!(p[1], desc="task a ") + rand() < 0.9 && next!(p[2], desc="task b ") + rand() < 0.8 && next!(p[3], desc="task c ") + sleep(0.2) + end + finish!.(p[1:3]) + @test waitfor(()->has_finished(p)) + + println("Testing early close (should not display error)") + p = MultipleProgress([Progress(10, desc="Close test")]) + for i in 1:3 + sleep(0.1) + next!(p[1]) + end + close(p) + @test waitfor(()->has_finished(p)) + + println("Testing errors in MultipleProgress (should display error)") + p = MultipleProgress(Progress.([10]), kwmain=(desc="Error test", color=:red)) + for i in 1:3 + sleep(0.1) + next!(p[1]) + end + next!(p[1], 1) + @test waitfor(()->has_finished(p);tmax=10tmax) + + println("Testing with showvalues (doesn't really work)") + p = MultipleProgress(Progress.([10,10])) + for i in 1:10 + sleep(0.2) + next!(p[1]; showvalues = Dict(:i=>i)) + next!(p[2]; showvalues = [i=>i, "longstring"=>"WXYZ"^i]) + end + @test waitfor(()->has_finished(p)) + print("\n"^5) + + println("Testing with enabled=false") + p = MultipleProgress(Progress.([100, 100]), Progress(200); enabled = false) + @test has_finished(p) + next!.(p[0:2]) + update!.(p[0:2]) + finish!.(p[0:2]) + cancel.(p[0:2]) + close(p) + p[3] = Progress(100) + + println("Testing MultipleProgress with ProgressUnknown as mainprogress") + p = MultipleProgress([Progress(10), ProgressUnknown(),ProgressThresh(0.1)]; count_finishes=false) + for i in 1:10 + sleep(0.2) + next!(p[1]) + next!(p[2]) + update!(p[3], 1/i) + end + @test !waitfor(()->has_finished(p)) + finish!(p[2]) + @test waitfor(()->has_finished(p)) + + println("Testing MultipleProgress with count_finishes") + p = MultipleProgress([ProgressUnknown(), Progress(5), ProgressThresh(0.5)]; + count_finishes=true, kwmain=(dt=0, desc="Tasks finished ")) + update!(p[0], 0) + for i in 1:10 + sleep(0.2) + next!(p[1]) + next!(p[2]) + update!(p[3], 1/i) + end + @test !has_finished(p) + finish!(p[1]) + @test waitfor(()->has_finished(p)) + + N = 4*np + println("Testing adding $N progresses with $np workers") + p = MultipleProgress(Progress(N, desc="tasks done "); count_finishes=true) + @test !has_finished(p) + pmap(1:N) do ip + L = rand(20:50) + p[ip] = Progress(L, desc=" $(string(ip,pad=2)) (id=$(myid())) ") + for _ in 1:L + sleep(0.05) + next!(p[ip]) + end + end + @test waitfor(()->has_finished(p)) + + N = 4*np + println("Testing adding $N mixed progresses with $np workers") + p = MultipleProgress(ProgressUnknown(spinner=true)) + pmap(1:N) do ip + L = rand(20:50) + if ip%3 == 0 + p[ip] = Progress(L, desc=" $(string(ip,pad=2)) (id=$(myid())) ") + elseif ip%3 == 1 + p[ip] = ProgressUnknown(desc=" $(string(ip,pad=2)) (id=$(myid())) ", spinner=true) + else + p[ip] = ProgressThresh(1/L, desc=" $(string(ip,pad=2)) (id=$(myid())) ") + end + for i in 1:L + sleep(0.05) + if ip%3 == 2 + update!(p[ip], 1/i) + else + next!(p[ip]) + end + end + ip%3 == 1 && finish!(p[ip]) + end + finish!(p[0]) + @test waitfor(()->has_finished(p)) +end \ No newline at end of file diff --git a/test/test_parallel.jl b/test/test_parallel.jl new file mode 100644 index 0000000..4680c6f --- /dev/null +++ b/test/test_parallel.jl @@ -0,0 +1,202 @@ +using Distributed +using ProgressMeter: has_finished + +nworkers() == 1 && addprocs(4) +@everywhere using ProgressMeter + +# additional time before checking if progressbar has finished during CI +if get(ENV, "CI", "false") == "true" + dt = 0.1 + tmax = 5 +else + dt = 0.1 + tmax = 1 +end + +function waitfor(f; tmax=tmax, dt=dt) + t0 = time() + while time() - t0 < tmax + f() && return true + sleep(dt) + end + return false +end + +@testset "ParallelProgress() tests" begin + + np = nworkers() + np == 1 && @info "incomplete tests: nworkers() == 1" + @test all([@fetchfrom w @isdefined(ProgressMeter) for w in workers()]) + + println("Testing ParallelProgress") + println("Testing simultaneous updates") + p = ParallelProgress(100) + @sync for _ in 1:10 + @async for _ in 1:10 + sleep(0.1) + next!(p) + end + end + @test waitfor(()->has_finished(p)) + + println("Testing update!") + prog = Progress(100) + p = ParallelProgress(prog) + for _ in 1:5 + sleep(0.3) + next!(p) + end + update!(p, 95) + for _ in 96:100 + sleep(0.3) + next!(p) + end + @test waitfor(()->has_finished(p)) + + println("Testing over-shooting") + p = ParallelProgress(10) + for _ in 1:100 + sleep(0.01) + next!(p) + end + @test waitfor(()->has_finished(p)) + + println("Testing under-shooting") + p = ParallelProgress(200) + for _ in 1:10 + sleep(0.1) + next!(p) + end + finish!(p) + @test waitfor(()->has_finished(p)) + + println("Testing rapid over-shooting") + p = ParallelProgress(10) + next!(p) + sleep(0.1) + for _ in 1:10000 + next!(p) + end + @test waitfor(()->has_finished(p)) + + println("Testing early cancel") + p = ParallelProgress(10) + for _ in 1:5 + sleep(0.2) + next!(p) + end + cancel(p) + @test waitfor(()->has_finished(p)) + + println("Testing across $np workers with @distributed") + n = 10 #per core + p = ParallelProgress(n*np) + @sync @distributed for _ in 1:n*np + sleep(0.2) + next!(p) + end + @test waitfor(()->has_finished(p)) + + println("Testing across $np workers with @distributed and reduce") + n = 10 #per core + p = ParallelProgress(n*np) + res = @distributed (+) for i in 1:n*np + sleep(0.2) + next!(p) + i^2 + end + @test res == sum(i->i^2, 1:n*np) + @test waitfor(()->has_finished(p)) + + println("Testing across $np workers with pmap") + n = 10 + p = ParallelProgress(n*np) + ids = pmap(1:n*np) do i + sleep(0.2) + next!(p) + return myid() + end + @test waitfor(()->has_finished(p)) + @test length(unique(ids)) == np + + println("Testing changing color with next! and update!") + p = ParallelProgress(10) + for i in 1:10 + sleep(0.5) + if i == 3 + next!(p; color=:red) + elseif i == 6 + update!(p, 7; color=:blue) + else + next!(p) + end + end + @test waitfor(()->has_finished(p)) + + println("Testing changing desc with next! and update!") + p = ParallelProgress(10) + for i in 1:10 + sleep(0.5) + if i == 3 + next!(p; desc="30% done ") + elseif i == 6 + update!(p, 7; desc="60% done ") + else + next!(p) + end + end + @test waitfor(()->has_finished(p)) + + println("Testing with showvalues") + p = ParallelProgress(20) + for i in 1:20 + sleep(0.1) + # if i < 10 + next!(p; showvalues=Dict(:i => i, "longstring" => "ABCD"^i)) + # else #? lazy broken? + # next!(p; showvalues=() -> [(:i, "$i"), ("halfdone", true)]) + # end + end + @test waitfor(()->has_finished(p)) + + println("Testing with ProgressUnknown") + p = ParallelProgress(ProgressUnknown()) + for i in 1:10 + sleep(0.2) + next!(p) + end + sleep(0.5) + update!(p, 200) + @test !waitfor(()->has_finished(p)) + finish!(p) + @test waitfor(()->has_finished(p)) + + println("Testing with ProgressThresh") + p = ParallelProgress(ProgressThresh(10)) + for i in 20:-1:0 + sleep(0.2) + update!(p, i) + end + @test waitfor(()->has_finished(p)) + + println("Testing early close (should not display error)") + p = ParallelProgress(10; desc="Close test") + for i in 1:3 + sleep(0.1) + next!(p) + end + @test !waitfor(()->has_finished(p)) + close(p) + @test waitfor(()->has_finished(p)) + + println("Testing errors in ParallelProgress (should display error)") + @test_throws MethodError next!(Progress(10), 1) + p = ParallelProgress(10, desc="Error test", color=:red) + for i in 1:3 + sleep(0.1) + next!(p) + end + next!(p, 1) + @test waitfor(()->has_finished(p); tmax=10tmax) + sleep(1) +end