
Commit 108e83a

Add DaggerMPI subpackage for MPI integrations
Allows the `DArray` to participate in MPI operations by building on the newly added support for "MPI-style" partitioning (where each rank stores only its local partition), via a new `MPIBlocks` partitioner. This commit also implements MPI-powered `distribute` and `reduce` operations for `MPIBlocks`-partitioned arrays, supporting a variety of distribution schemes and data-transfer modes.
Parent: eee71ec
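As a rough illustration of what the new subpackage enables (a hypothetical sketch, not part of the commit): the snippet below distributes a matrix across ranks with the new `MPIBlocks` partitioner and then reduces it across ranks. It assumes an MPI launch such as `mpiexec -n 4 julia script.jl`, and the argument order and keyword defaults shown mirror the methods added in this commit.

# Hypothetical usage sketch; assumes `mpiexec -n 4 julia script.jl` and the
# `distribute`/`reduce` methods added by this commit.
using MPI, Dagger, DaggerMPI

MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)

# The root rank holds the full 4x4 matrix; every rank ends up with one 2x2 block.
x = rank == 0 ? reshape(collect(1:16), 4, 4) : nothing
dx = distribute(Matrix{Int}, x, MPIBlocks(2, 2), comm, 0)

# With the defaults (acrossranks=true, root=nothing) this performs an
# MPI.Allreduce, so every rank receives the global sum.
total = reduce(+, dx)

MPI.Finalize()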

File tree

2 files changed: +108 −0 lines

lib/DaggerMPI/Project.toml (+8 lines)

@@ -0,0 +1,8 @@
name = "DaggerMPI"
uuid = "37bfb287-2338-4693-8557-581796463535"
authors = ["Felipe de Alcântara Tomé <[email protected]>", "Julian P Samaroo <[email protected]>"]
version = "0.1.0"

[deps]
Dagger = "d58978e5-989f-55fb-8d15-ea34adc7bf54"
MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"
lib/DaggerMPI/src/DaggerMPI.jl (+100 lines)

@@ -0,0 +1,100 @@
module DaggerMPI
using Dagger
import Base: reduce, fetch, cat
using MPI

export MPIBlocks

struct MPIBlocks{N} <: Dagger.AbstractSingleBlocks{N}
    blocksize::NTuple{N, Int}
end
MPIBlocks(xs::Int...) = MPIBlocks(xs)

function Dagger.distribute(::Type{A},
                           x::Union{AbstractArray, Nothing},
                           dist::MPIBlocks,
                           comm::MPI.Comm=MPI.COMM_WORLD,
                           root::Integer=0) where {A<:AbstractArray{T, N}} where {T, N}
    isroot = MPI.Comm_rank(comm) == root

    # TODO: Make better load balancing

    data = Array{T, N}(undef, dist.blocksize)
    if isroot
        cs = Array{T, N}(undef, size(x))
        parts = partition(dist, domain(x))
        idx = 1
        for part in parts
            cs[idx:(idx - 1 + prod(dist.blocksize))] = x[part]
            idx += prod(dist.blocksize)
        end
        MPI.Scatter!(MPI.UBuffer(cs, div(length(cs), MPI.Comm_size(comm))), data, comm, root=root)
    else
        MPI.Scatter!(nothing, data, comm, root=root)
    end

    data = Dagger.tochunk(data)

    return Dagger.DArray(T, domain(data), domain(data), data, dist)
end

function Dagger.distribute(::Type{A},
                           dist::MPIBlocks,
                           comm::MPI.Comm=MPI.COMM_WORLD,
                           root::Integer=0) where {A<:AbstractArray{T, N}} where {T, N}
    return distribute(A, nothing, dist, comm, root)
end

function Dagger.distribute(x::AbstractArray,
                           dist::MPIBlocks,
                           comm::MPI.Comm=MPI.COMM_WORLD,
                           root::Integer=0)
    return distribute(typeof(x), x, dist, comm, root)
end

function Base.reduce(f::Function, x::Dagger.DArray{T,N,MPIBlocks{N}};
                     dims=nothing,
                     comm=MPI.COMM_WORLD, root=nothing, acrossranks::Bool=true) where {T,N}
    if dims === nothing
        if !acrossranks
            return fetch(Dagger.reduce_async(f,x))
        elseif root === nothing
            return MPI.Allreduce(fetch(Dagger.reduce_async(f,x)), f, comm)
        else
            return MPI.Reduce(fetch(Dagger.reduce_async(f,x)), f, comm; root)
        end
    else
        if dims isa Int
            dims = (dims,)
        end
        d = reduce(x.domain, dims=dims)
        ds = reduce(x.subdomains[1], dims=dims)
        if !acrossranks
            thunks = Dagger.spawn(b->reduce(f, b, dims=dims), x.chunks[1])
            return Dagger.DArray(T, d, ds, thunks, x.partitioning; concat=x.concat)
        else
            tmp = collect(reduce(f, x, comm=comm, root=root, dims=dims, acrossranks=false))
            if root === nothing
                h = UInt(0)
                for dim in 1:N
                    if dim in dims
                        continue
                    end
                    h = hash(x.subdomains[1].indexes[dim], h)
                end
                h = abs(Base.unsafe_trunc(Int32, h))
                newc = MPI.Comm_split(comm, h, MPI.Comm_rank(comm))
                chunks = Dagger.tochunk(reshape(MPI.Allreduce(tmp, f, newc), size(tmp)))
            else
                rcvbuf = MPI.Reduce(tmp, f, comm; root)
                if root != MPI.Comm_rank(comm)
                    return nothing
                end
                chunks = Dagger.tochunk(reshape(rcvbuf, size(tmp)))
            end
            return Dagger.DArray(T, d, ds, chunks, x.partitioning; concat=x.concat)
        end
    end
end

end # module
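For reference, a hypothetical sketch of the transfer modes exposed by the `reduce` method above (assuming `dx` is an `MPIBlocks`-partitioned `DArray` as built in the earlier sketch, running under MPI):

# Hypothetical examples of the reduce keyword combinations added in this commit.

# Rank-local reduction only: no MPI communication, each rank reduces its own block.
local_sum = reduce(+, dx, acrossranks=false)

# Cross-rank reduction (the default): uses MPI.Allreduce, so the result arrives on all ranks.
global_sum = reduce(+, dx)

# Cross-rank reduction gathered on one rank: uses MPI.Reduce; non-root ranks get `nothing`.
root_sum = reduce(+, dx, root=0)

# Dimension-wise reduction: ranks whose blocks share the remaining dimensions are
# grouped via MPI.Comm_split, and the result is a new MPIBlocks-partitioned DArray.
col_sums = reduce(+, dx, dims=1)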
