Interleaved computation with communication in halo exchange #881


Status: Draft. Wants to merge 144 commits into base branch main.
Commit history:

25243a9 added count to mhp algorithms (Jul 31, 2024)
c950ea3 Merge branch 'main' of https://github.com/oneapi-src/distributed-rang… (Jul 31, 2024)
7eec868 minor fix (Jul 31, 2024)
6090fdc minor fixes (Jul 31, 2024)
167702d code review fixes (Aug 2, 2024)
755c896 more code review fixes (Aug 5, 2024)
e98de3b removed redundant conditional (Aug 16, 2024)
f31b80c fixes according to pre-commit checks (Aug 25, 2024)
b847d04 Merge branch 'main' of https://github.com/oneapi-src/distributed-rang… (Sep 3, 2024)
06cc78b Merge branch 'main' of https://github.com/oneapi-src/distributed-rang… (Sep 26, 2024)
5511751 added cyclic_halo_impl and distributed_vector_dual (Nov 11, 2024)
b26665e Merge pull request #1 from quazuo/count (quazuo, Nov 11, 2024)
e1e9910 added dual_segment and refined dual_distributed_vector (Dec 2, 2024)
bdecda7 progress (Dec 25, 2024)
9e81fd7 Merge remote-tracking branch 'upstream/main' (Dec 25, 2024)
811307c tiny fix (Dec 25, 2024)
a5fdcf5 Merge branch 'main' of https://github.com/quazuo/distributed-ranges (Dec 25, 2024)
5353689 through c54b47f: 127 work-in-progress commits, all titled "prog" (Dec 25, 2024 to Jun 5, 2025)
1 change: 1 addition & 0 deletions include/dr/mp.hpp
@@ -79,3 +79,4 @@
#include <dr/mp/algorithms/transpose.hpp>
#include <dr/mp/containers/distributed_vector.hpp>
#include <dr/mp/containers/distributed_mdarray.hpp>
#include <dr/mp/containers/dual_distributed_vector.hpp>
48 changes: 47 additions & 1 deletion include/dr/mp/algorithms/for_each.hpp
@@ -18,7 +18,53 @@

namespace dr::mp {

/// Collective for_each on distributed range
// TODO: this concept is not yet enforced correctly; the reason is unclear
template <typename R>
concept dual_vector_range =
dr::distributed_range<R> && requires(R &r) { dr::ranges::segments(r)[0].is_compute(); };

void partial_for_each(dual_vector_range auto &&dr, auto op) {
dr::drlog.debug(dr::logger::for_each, "partial_for_each: parallel execution\n");
if (rng::empty(dr)) {
return;
}

auto is_local = [](const auto &segment) {
return dr::ranges::rank(segment) == default_comm().rank();
};

for (auto &seg : dr::ranges::segments(dr) | rng::views::filter(is_local)) {
if (!seg.is_compute()) {
seg.swap_state();
continue;
}

auto b = dr::ranges::local(rng::begin(seg));
auto s = rng::subrange(b, b + rng::distance(seg));

if (mp::use_sycl()) {
dr::drlog.debug(" using sycl\n");

assert(rng::distance(s) > 0);
#ifdef SYCL_LANGUAGE_VERSION
dr::__detail::parallel_for(
dr::mp::sycl_queue(), sycl::range<1>(rng::distance(s)),
[first = rng::begin(s), op](auto idx) { op(first[idx]); })
.wait();
#else
assert(false);
#endif
} else {
dr::drlog.debug(" using cpu\n");
rng::for_each(s, op);
}

seg.swap_state();
}
barrier();
}

// Collective for_each on distributed range
void for_each(dr::distributed_range auto &&dr, auto op) {
dr::drlog.debug(dr::logger::for_each, "for_each: parallel execution\n");
if (rng::empty(dr)) {
338 changes: 338 additions & 0 deletions include/dr/mp/containers/dual_distributed_vector.hpp
@@ -0,0 +1,338 @@
// SPDX-FileCopyrightText: Intel Corporation
//
// SPDX-License-Identifier: BSD-3-Clause

#pragma once

#include <dr/mp/allocator.hpp>
#include <dr/mp/containers/distribution.hpp>
#include <dr/mp/containers/dual_segment.hpp>

namespace dr::mp {

static constexpr std::size_t DUAL_SEGMENTS_PER_PROC = 2;

class DualMpiBackend {
Review comment (Contributor Author): what is the difference between the DualMpiBackend and MpiBackend types? If none, please use one type.

Reply (Contributor): It's a leftover from when I was experimenting with changing some code in the backend; thanks for pointing it out.

dr::rma_window win_;

public:
void *allocate(std::size_t data_size) {
assert(data_size > 0);
void *data = __detail::allocator<std::byte>().allocate(data_size);
DRLOG("called MPI allocate({}) -> got:{}", data_size, data);
win_.create(default_comm(), data, data_size);
active_wins().insert(win_.mpi_win());
return data;
}

void deallocate(void *data, std::size_t data_size) {
assert(data_size > 0);
DRLOG("calling MPI deallocate ({}, data_size:{})", data, data_size);
active_wins().erase(win_.mpi_win());
win_.free();
__detail::allocator<std::byte>().deallocate(static_cast<std::byte *>(data),
data_size);
}

void getmem(void *dst, std::size_t offset, std::size_t datalen,
int segment_index) {
const std::size_t peer = get_peer(segment_index);

DRLOG("calling MPI get(dst:{}, "
"segm_offset:{}, size:{}, peer:{})",
dst, offset, datalen, peer);

#if (MPI_VERSION >= 4) || \
(defined(I_MPI_NUMVERSION) && (I_MPI_NUMVERSION > 20211200000))
// 64-bit API inside
win_.get(dst, datalen, peer, offset);
#else
for (std::size_t remainder = datalen, off = 0UL; remainder > 0;) {
std::size_t s = std::min(remainder, (std::size_t)INT_MAX);
DRLOG("{}:{} win_.get total {} now {} bytes at off {}, dst offset {}",
default_comm().rank(), __LINE__, datalen, s, off, offset + off);
win_.get((uint8_t *)dst + off, s, peer, offset + off);
off += s;
remainder -= s;
}
#endif
}

void putmem(void const *src, std::size_t offset, std::size_t datalen,
int segment_index) {
const std::size_t peer = get_peer(segment_index);

DRLOG("calling MPI put(segm_offset:{}, "
"src:{}, size:{}, peer:{})",
offset, src, datalen, peer);

#if (MPI_VERSION >= 4) || \
(defined(I_MPI_NUMVERSION) && (I_MPI_NUMVERSION > 20211200000))
// 64-bit API inside
win_.put(src, datalen, peer, offset);
#else
for (std::size_t remainder = datalen, off = 0UL; remainder > 0;) {
std::size_t s = std::min(remainder, (std::size_t)INT_MAX);
DRLOG("{}:{} win_.put {} bytes at off {}, dst offset {}",
default_comm().rank(), __LINE__, s, off, offset + off);
win_.put((uint8_t *)src + off, s, peer, offset + off);
off += s;
remainder -= s;
}
#endif
}

std::size_t getrank() { return win_.communicator().rank(); }

void fence() { win_.fence(); }

private:
std::size_t get_peer(const std::size_t segment_index) {
const auto size = win_.communicator().size();
return segment_index < size ? segment_index : 2 * size - segment_index - 1;
}
};

/// Distributed vector with two segments per process, so computation on one
/// segment can be interleaved with halo exchange on the other
template <typename T, class BackendT = DualMpiBackend>
class dual_distributed_vector {

public:
using value_type = T;
using size_type = std::size_t;
using difference_type = std::ptrdiff_t;
using backend_type = BackendT;

class iterator {
public:
using iterator_category = std::random_access_iterator_tag;
using value_type = typename dual_distributed_vector::value_type;
using difference_type = typename dual_distributed_vector::difference_type;

iterator() {}
iterator(const dual_distributed_vector *parent, difference_type offset)
: parent_(parent), offset_(offset) {}

auto operator+(difference_type n) const {
return iterator(parent_, offset_ + n);
}
friend auto operator+(difference_type n, const iterator &other) {
return other + n;
}
auto operator-(difference_type n) const {
return iterator(parent_, offset_ - n);
}
auto operator-(iterator other) const { return offset_ - other.offset_; }

auto &operator+=(difference_type n) {
offset_ += n;
return *this;
}
auto &operator-=(difference_type n) {
offset_ -= n;
return *this;
}
auto &operator++() {
offset_++;
return *this;
}
auto operator++(int) {
auto old = *this;
offset_++;
return old;
}
auto &operator--() {
offset_--;
return *this;
}
auto operator--(int) {
auto old = *this;
offset_--;
return old;
}

bool operator==(iterator other) const {
if (parent_ == nullptr || other.parent_ == nullptr) {
return false;
} else {
return offset_ == other.offset_;
}
}
auto operator<=>(iterator other) const {
assert(parent_ == other.parent_);
return offset_ <=> other.offset_;
}

auto operator*() const {
auto segment_size = parent_->segment_size_;
return parent_
->segments()[offset_ / segment_size][offset_ % segment_size];
}
auto operator[](difference_type n) const { return *(*this + n); }

auto local() {
auto segment_size = parent_->segment_size_;
return (parent_->segments()[offset_ / segment_size].begin() +
offset_ % segment_size)
.local();
}

//
// Support for distributed ranges
//
// distributed iterator provides segments
// remote iterator provides local
//
auto segments() {
return dr::__detail::drop_segments(parent_->segments(), offset_);
}

private:
const dual_distributed_vector *parent_ = nullptr;
difference_type offset_;
};

// Do not copy.
// We need a move constructor for the implementation of the reduce algorithm.
dual_distributed_vector(const dual_distributed_vector &) = delete;
dual_distributed_vector &operator=(const dual_distributed_vector &) = delete;
dual_distributed_vector(dual_distributed_vector &&) { assert(false); }

/// Constructor
dual_distributed_vector(std::size_t size = 0,
distribution dist = distribution()) {
init(size, dist);
}

/// Constructor
dual_distributed_vector(std::size_t size, value_type fill_value,
distribution dist = distribution()) {
init(size, dist);
mp::fill(*this, fill_value);
}

~dual_distributed_vector() {
if (finalized()) return;

for (size_t i = 0; i < DUAL_SEGMENTS_PER_PROC; i++) {
fence(i);

if (datas_[i] != nullptr) {
backends_[i].deallocate(datas_[i], data_size_ * sizeof(value_type));
}

delete halos_[i];
}

delete halo_;
}

/// Returns iterator to beginning
auto begin() const { return iterator(this, 0); }
/// Returns iterator to end
auto end() const { return begin() + size_; }

/// Returns size
auto size() const { return size_; }
/// Returns reference using index
auto operator[](difference_type n) const { return *(begin() + n); }

auto &halo() const { return *halo_; }

auto segments() const { return rng::views::all(segments_); }
auto segments() { return rng::views::all(segments_); }

__attribute__((unused))
void fence(const std::size_t i) { backends_[i].fence(); }

auto res_idx(const std::size_t segment_index) const {
return segment_index < default_comm().size() ? 0 : 1;
}

backend_type& backend(const std::size_t segment_index) {
return backends_[res_idx(segment_index)];
}
const backend_type& backend(const std::size_t segment_index) const {
return backends_[res_idx(segment_index)];
}

T *data(const std::size_t segment_index) {
return datas_[res_idx(segment_index)];
}

std::size_t data_size() const { return data_size_; }

private:
void init(auto size, auto dist) {
size_ = size;
distribution_ = dist;

// determine the distribution of data
auto comm_size = default_comm().size(); // dr-style ignore
auto hb = dist.halo();
std::size_t gran = dist.granularity();
// TODO: make this an error that is reported back to user
assert(size % gran == 0 && "size must be a multiple of the granularity");
assert(hb.prev % gran == 0 && "halo prev must be a multiple of the granularity");
assert(hb.next % gran == 0 && "halo next must be a multiple of the granularity");

std::size_t segment_count = comm_size * DUAL_SEGMENTS_PER_PROC;
auto proc_segments_size = gran * std::max({
(size / gran + segment_count - 1) / segment_count,
hb.prev / gran,
hb.next / gran});
segment_size_ = proc_segments_size;

std::size_t actual_segment_count_ =
size_ / segment_size_ + (size_ % segment_size_ == 0 ? 0 : 1);
assert(actual_segment_count_ <= segment_count
&& "there must be at most 2 segments per process");

data_size_ = segment_size_ + hb.prev + hb.next;

for (std::size_t i = 0; i < DUAL_SEGMENTS_PER_PROC; i++) {
if (size_ > 0) {
datas_.push_back(static_cast<T *>(backends_[i].allocate(data_size_ * sizeof(value_type))));
std::memset(datas_[i], 69, data_size_ * sizeof(value_type));
halos_.push_back(new dual_span_halo<T>(default_comm(), datas_[i], data_size_, hb, i == 1));
}
}

halo_ = new cyclic_span_halo<T>(halos_);

std::size_t segment_index = 0;
for (std::size_t i = 0; i < size; i += segment_size_) {
segments_.emplace_back(this, segment_index++,
std::min(segment_size_, size - i), data_size_);
}

for (size_t i = 0; i < default_comm().size(); i++) {
segments_[default_comm().size() + i].swap_state();
}

for (size_t i = 0; i < DUAL_SEGMENTS_PER_PROC; i++) {
fence(i);
}
}

friend dual_dv_segment_iterator<dual_distributed_vector>;

std::size_t segment_size_ = 0;
std::size_t data_size_ = 0; // size + halo

std::vector<dual_span_halo<T> *> halos_;
std::vector<T *> datas_;
cyclic_span_halo<T> *halo_;

distribution distribution_;
std::size_t size_;
std::vector<dual_dv_segment<dual_distributed_vector>> segments_;
Review comment (Contributor Author): if these 2 lines are the only differences between the ordinary and dual vector, then let's pass different template parameters and make the segment type a template parameter.

Reply (Contributor): init() is different; there are also multiple pointers to local segments (std::vector<T *> datas_) and additional member functions that return the local segment currently suited for computation. Although I agree that the two classes might be merged, as long as it's still not fully developed I'd rather keep them split and merge them a bit later, once it fully works.

std::vector<backend_type> backends_{DUAL_SEGMENTS_PER_PROC};
};

template <typename T, typename B>
auto &halo(const dual_distributed_vector<T, B> &dv) {
return dv.halo();
}

} // namespace dr::mp