251 changes: 80 additions & 171 deletions src/uct/cuda/cuda_ipc/cuda_ipc.cuh
@@ -15,7 +15,6 @@
#include <cuda/atomic>

#define UCT_CUDA_IPC_IS_ALIGNED_POW2(_n, _p) (!((_n) & ((_p) - 1)))
#define UCT_CUDA_IPC_WARP_SIZE 32
#define UCT_CUDA_IPC_COPY_LOOP_UNROLL 8

UCS_F_DEVICE int4 uct_cuda_ipc_ld_global_cg(const int4* p)
@@ -60,8 +59,8 @@ uct_cuda_ipc_get_lane(unsigned &lane_id, unsigned &num_lanes)
num_lanes = 1;
break;
case UCS_DEVICE_LEVEL_WARP:
lane_id = threadIdx.x % UCT_CUDA_IPC_WARP_SIZE;
num_lanes = UCT_CUDA_IPC_WARP_SIZE;
lane_id = threadIdx.x % UCS_DEVICE_NUM_THREADS_IN_WARP;
num_lanes = UCS_DEVICE_NUM_THREADS_IN_WARP;
break;
case UCS_DEVICE_LEVEL_BLOCK:
lane_id = threadIdx.x;
@@ -74,19 +73,6 @@ uct_cuda_ipc_get_lane(unsigned &lane_id, unsigned &num_lanes)
}
}

UCS_F_DEVICE void* uct_cuda_ipc_map_remote(const uct_cuda_ipc_device_mem_element_t* elem,
uint64_t remote_address)
{
return reinterpret_cast<void*>((uintptr_t)remote_address + elem->mapped_offset);
}

UCS_F_DEVICE void uct_cuda_ipc_atomic_inc(uint64_t *dst, uint64_t inc_value)
{
cuda::atomic_ref<uint64_t, cuda::thread_scope_system> dst_ref{*dst};
dst_ref.fetch_add(inc_value, cuda::memory_order_relaxed);
cuda::atomic_thread_fence(cuda::memory_order_release, cuda::thread_scope_system);
}

template<ucs_device_level_t level>
UCS_F_DEVICE void uct_cuda_ipc_level_sync()
{
@@ -107,186 +93,109 @@ UCS_F_DEVICE void uct_cuda_ipc_level_sync()
return;
}

template<ucs_device_level_t level>
UCS_F_DEVICE void uct_cuda_ipc_copy_level(void *dst, const void *src, size_t len);

template<>
void uct_cuda_ipc_copy_level<UCS_DEVICE_LEVEL_THREAD>(void *dst, const void *src, size_t len)
{
memcpy(dst, src, len);
}

template<>
void uct_cuda_ipc_copy_level<UCS_DEVICE_LEVEL_WARP>(void *dst, const void *src, size_t len)
template<typename vec_t>
UCS_F_DEVICE void uct_cuda_ipc_try_copy_aligned(const char* &src, char* &dst,
size_t &len,
unsigned warp_id,
unsigned num_warps,
unsigned lane_id,
unsigned num_lanes)
{
using vec4 = int4;
using vec2 = int2;
unsigned int lane_id, num_lanes;
constexpr size_t vec_size = sizeof(vec_t);

uct_cuda_ipc_get_lane<UCS_DEVICE_LEVEL_WARP>(lane_id, num_lanes);
auto s1 = reinterpret_cast<const char*>(src);
auto d1 = reinterpret_cast<char *>(dst);
if (!(UCT_CUDA_IPC_IS_ALIGNED_POW2((intptr_t)src, vec_size) &&
UCT_CUDA_IPC_IS_ALIGNED_POW2((intptr_t)dst, vec_size))) {
return;
}

/* 16B-aligned fast path using vec4 */
if (UCT_CUDA_IPC_IS_ALIGNED_POW2((intptr_t)s1, sizeof(vec4)) &&
UCT_CUDA_IPC_IS_ALIGNED_POW2((intptr_t)d1, sizeof(vec4))) {
const vec4 *s4 = reinterpret_cast<const vec4*>(s1);
vec4 *d4 = reinterpret_cast<vec4*>(d1);
size_t n4 = len / sizeof(vec4);
for (size_t i = lane_id; i < n4; i += num_lanes) {
vec4 v = uct_cuda_ipc_ld_global_cg(s4 + i);
uct_cuda_ipc_st_global_cg(d4 + i, v);
auto src_vec = reinterpret_cast<const vec_t*>(src);
auto dst_vec = reinterpret_cast<vec_t*>(dst);
constexpr size_t lanes_unroll = UCS_DEVICE_NUM_THREADS_IN_WARP *
UCT_CUDA_IPC_COPY_LOOP_UNROLL;
size_t num_vectors = (len / (lanes_unroll * vec_size)) *
lanes_unroll;

for (size_t vec = warp_id * lanes_unroll + lane_id % UCS_DEVICE_NUM_THREADS_IN_WARP;
vec < num_vectors;
vec += num_warps * lanes_unroll) {
vec_t tmp[UCT_CUDA_IPC_COPY_LOOP_UNROLL];
#pragma unroll
for (int i = 0; i < UCT_CUDA_IPC_COPY_LOOP_UNROLL; i++) {
tmp[i] = uct_cuda_ipc_ld_global_cg(
src_vec + (vec + UCS_DEVICE_NUM_THREADS_IN_WARP * i));
}

len = len - n4 * sizeof(vec4);
if (len == 0) {
return;
#pragma unroll
for (int i = 0; i < UCT_CUDA_IPC_COPY_LOOP_UNROLL; i++) {
uct_cuda_ipc_st_global_cg(
dst_vec + (vec + UCS_DEVICE_NUM_THREADS_IN_WARP * i), tmp[i]);
}

s1 = reinterpret_cast<const char*>(s4 + n4);
d1 = reinterpret_cast<char*>(d4 + n4);
}

/* 8B-aligned fast path using vec2 */
if (UCT_CUDA_IPC_IS_ALIGNED_POW2((intptr_t)s1, sizeof(vec2)) &&
UCT_CUDA_IPC_IS_ALIGNED_POW2((intptr_t)d1, sizeof(vec2))) {
const vec2 *s2 = reinterpret_cast<const vec2*>(s1);
vec2 *d2 = reinterpret_cast<vec2*>(d1);
size_t n2 = len / sizeof(vec2);
for (size_t i = lane_id; i < n2; i += num_lanes) {
vec2 v2 = uct_cuda_ipc_ld_global_cg(s2 + i);
uct_cuda_ipc_st_global_cg(d2 + i, v2);
}
src_vec += num_vectors;
dst_vec += num_vectors;
len = len - num_vectors * vec_size;

len = len - n2 * sizeof(vec2);
if (len == 0) {
return;
}

s1 = reinterpret_cast<const char*>(s2 + n2);
d1 = reinterpret_cast<char*>(d2 + n2);
num_vectors = len / vec_size;
for (size_t vec = lane_id; vec < num_vectors; vec += num_lanes) {
vec_t v = uct_cuda_ipc_ld_global_cg(src_vec + vec);
uct_cuda_ipc_st_global_cg(dst_vec + vec, v);
}

/* byte tail */
for (size_t i = lane_id; i < len; i += num_lanes) {
d1[i] = s1[i];
}
len -= num_vectors * vec_size;
src = reinterpret_cast<const char*>(src_vec + num_vectors);
dst = reinterpret_cast<char*>(dst_vec + num_vectors);
}

template<>
void uct_cuda_ipc_copy_level<UCS_DEVICE_LEVEL_BLOCK>(void *dst, const void *src, size_t len)
UCS_F_DEVICE void*
uct_cuda_ipc_map_remote(const uct_cuda_ipc_device_mem_element_t* elem,
uint64_t remote_address)
{
using vec4 = int4;
using vec2 = int2;
auto s1 = reinterpret_cast<const char*>(src);
auto d1 = reinterpret_cast<char *>(dst);
const vec4 *s4;
vec4 *d4;
int warp, num_warps, idx;
size_t num_lines;

if (UCT_CUDA_IPC_IS_ALIGNED_POW2((intptr_t)s1, sizeof(vec4)) &&
UCT_CUDA_IPC_IS_ALIGNED_POW2((intptr_t)d1, sizeof(vec4))) {
vec4 tmp[UCT_CUDA_IPC_COPY_LOOP_UNROLL];
warp = threadIdx.x / UCT_CUDA_IPC_WARP_SIZE;
num_warps = blockDim.x / UCT_CUDA_IPC_WARP_SIZE;
idx = threadIdx.x % UCT_CUDA_IPC_WARP_SIZE;
s4 = reinterpret_cast<const vec4*>(s1);
d4 = reinterpret_cast<vec4*>(d1);
num_lines = (len / (UCT_CUDA_IPC_WARP_SIZE * UCT_CUDA_IPC_COPY_LOOP_UNROLL * sizeof(vec4))) *
(UCT_CUDA_IPC_WARP_SIZE * UCT_CUDA_IPC_COPY_LOOP_UNROLL);

for (size_t line = warp * UCT_CUDA_IPC_WARP_SIZE * UCT_CUDA_IPC_COPY_LOOP_UNROLL + idx; line < num_lines;
line += num_warps * UCT_CUDA_IPC_WARP_SIZE * UCT_CUDA_IPC_COPY_LOOP_UNROLL) {
#pragma unroll
for (int i = 0; i < UCT_CUDA_IPC_COPY_LOOP_UNROLL; i++) {
tmp[i] = uct_cuda_ipc_ld_global_cg(s4 + (line + UCT_CUDA_IPC_WARP_SIZE * i));
}

#pragma unroll
for (int i = 0; i < UCT_CUDA_IPC_COPY_LOOP_UNROLL; i++) {
uct_cuda_ipc_st_global_cg(d4 + (line + UCT_CUDA_IPC_WARP_SIZE * i), tmp[i]);
}
}
len = len - num_lines * sizeof(vec4);
if (len == 0) {
return;
}

s4 = s4 + num_lines;
d4 = d4 + num_lines;
num_lines = len / sizeof(vec4);
for (size_t line = threadIdx.x; line < num_lines; line += blockDim.x) {
vec4 v = uct_cuda_ipc_ld_global_cg(s4 + line);
uct_cuda_ipc_st_global_cg(d4 + line, v);
}

len = len - num_lines * sizeof(vec4);
if (len == 0) {
return;
}

s1 = reinterpret_cast<const char*>(s4 + num_lines);
d1 = reinterpret_cast<char*>(d4 + num_lines);
}

/* If not 16B-aligned, try 8B-aligned fast path using vec2 */
if (UCT_CUDA_IPC_IS_ALIGNED_POW2((intptr_t)s1, sizeof(vec2)) &&
UCT_CUDA_IPC_IS_ALIGNED_POW2((intptr_t)d1, sizeof(vec2))) {
const vec2 *s2;
vec2 *d2;
vec2 tmp2[UCT_CUDA_IPC_COPY_LOOP_UNROLL];

warp = threadIdx.x / UCT_CUDA_IPC_WARP_SIZE;
num_warps = blockDim.x / UCT_CUDA_IPC_WARP_SIZE;
idx = threadIdx.x % UCT_CUDA_IPC_WARP_SIZE;
s2 = reinterpret_cast<const vec2*>(s1);
d2 = reinterpret_cast<vec2*>(d1);
num_lines = (len / (UCT_CUDA_IPC_WARP_SIZE * UCT_CUDA_IPC_COPY_LOOP_UNROLL * sizeof(vec2))) *
(UCT_CUDA_IPC_WARP_SIZE * UCT_CUDA_IPC_COPY_LOOP_UNROLL);

for (size_t line = warp * UCT_CUDA_IPC_WARP_SIZE * UCT_CUDA_IPC_COPY_LOOP_UNROLL + idx; line < num_lines;
line += num_warps * UCT_CUDA_IPC_WARP_SIZE * UCT_CUDA_IPC_COPY_LOOP_UNROLL) {
#pragma unroll
for (int i = 0; i < UCT_CUDA_IPC_COPY_LOOP_UNROLL; i++) {
tmp2[i] = uct_cuda_ipc_ld_global_cg(s2 + (line + UCT_CUDA_IPC_WARP_SIZE * i));
}
return reinterpret_cast<void*>((uintptr_t)remote_address + elem->mapped_offset);
}

#pragma unroll
for (int i = 0; i < UCT_CUDA_IPC_COPY_LOOP_UNROLL; i++) {
uct_cuda_ipc_st_global_cg(d2 + (line + UCT_CUDA_IPC_WARP_SIZE * i), tmp2[i]);
}
}
UCS_F_DEVICE void
uct_cuda_ipc_atomic_inc(uint64_t *dst, uint64_t inc_value)
{
cuda::atomic_ref<uint64_t, cuda::thread_scope_system> dst_ref{*dst};
dst_ref.fetch_add(inc_value, cuda::memory_order_relaxed);
cuda::atomic_thread_fence(cuda::memory_order_release, cuda::thread_scope_system);
Review comment:

Isn't it a subtle race to put this fence after the increment?
What if a reader acquires the atomic and reads the data that was supposed to be released by the fence before the actual fence is executed?

Overall, this code seems to be a redundant implementation of the following one-liner:
__nv_atomic_add(dst, inc_value, __NV_ATOMIC_RELEASE, __NV_THREAD_SCOPE_SYSTEM);

Contributor Author:

This PR doesn't change the previous implementation of the atomic; it just adds unrolling to the warp put operation.
Regarding the race: even if we do it in the opposite order, i.e. fence -> relaxed RMW, it would not be enough, since only lane 0 does it before calling level_sync. So yes, it could be a race, but it depends on what guarantees the UCX API provides.
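
Illustration (not part of this PR): one way to fold the release ordering into the RMW itself, along the lines suggested above, is a release-ordered fetch_add through libcu++'s cuda::atomic_ref. The function name below is hypothetical; this is only a sketch of the alternative under discussion, not the PR's implementation.

UCS_F_DEVICE void
uct_cuda_ipc_atomic_inc_release(uint64_t *dst, uint64_t inc_value)
{
    /* Release ordering on the RMW itself: prior writes of the calling thread
     * become visible to a reader that acquires this atomic, with no separate
     * trailing fence. */
    cuda::atomic_ref<uint64_t, cuda::thread_scope_system> dst_ref{*dst};
    dst_ref.fetch_add(inc_value, cuda::memory_order_release);
}

As noted in the reply, this alone still would not order writes made by other lanes unless they synchronize before lane 0 performs the increment.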

}

len = len - num_lines * sizeof(vec2);
if (len == 0) {
return;
}
template<ucs_device_level_t level>
UCS_F_DEVICE void uct_cuda_ipc_copy_level(void *dst, const void *src, size_t len)
{
auto s1 = reinterpret_cast<const char*>(src);
auto d1 = reinterpret_cast<char *>(dst);
unsigned int lane_id, num_lanes;

s2 = s2 + num_lines;
d2 = d2 + num_lines;
num_lines = len / sizeof(vec2);
for (size_t line = threadIdx.x; line < num_lines; line += blockDim.x) {
vec2 v2 = uct_cuda_ipc_ld_global_cg(s2 + line);
uct_cuda_ipc_st_global_cg(d2 + line, v2);
}
uct_cuda_ipc_get_lane<level>(lane_id, num_lanes);

len = len - num_lines * sizeof(vec2);
if (len == 0) {
return;
}
const unsigned warp_id = lane_id / UCS_DEVICE_NUM_THREADS_IN_WARP;
const unsigned num_warps = num_lanes / UCS_DEVICE_NUM_THREADS_IN_WARP;

s1 = reinterpret_cast<const char*>(s2 + num_lines);
d1 = reinterpret_cast<char*>(d2 + num_lines);
}
uct_cuda_ipc_try_copy_aligned<int4>(s1, d1, len, warp_id, num_warps,
lane_id, num_lanes);
uct_cuda_ipc_try_copy_aligned<int2>(s1, d1, len, warp_id, num_warps,
lane_id, num_lanes);

for (size_t line = threadIdx.x; line < len; line += blockDim.x) {
for (size_t line = lane_id; line < len; line += num_lanes) {
d1[line] = s1[line];
}
}

template<>
void uct_cuda_ipc_copy_level<UCS_DEVICE_LEVEL_GRID>(void *dst, const void *src, size_t len)
__device__ __forceinline__ void
uct_cuda_ipc_copy_level<UCS_DEVICE_LEVEL_THREAD>(void *dst, const void *src,
size_t len)
{
memcpy(dst, src, len);
}

template<>
__device__ __forceinline__ void
uct_cuda_ipc_copy_level<UCS_DEVICE_LEVEL_GRID>(void *dst, const void *src,
size_t len)
{/* not implemented */}

template<ucs_device_level_t level = UCS_DEVICE_LEVEL_BLOCK>
7 changes: 5 additions & 2 deletions test/gtest/ucp/test_ucp_device.cc
@@ -256,7 +256,7 @@ UCS_TEST_P(test_ucp_device, create_fail)
}

UCP_INSTANTIATE_TEST_CASE_TLS_GPU_AWARE(test_ucp_device, rc_gda, "rc,rc_gda")

UCP_INSTANTIATE_TEST_CASE_TLS_GPU_AWARE(test_ucp_device, cuda_ipc, "rc,cuda_copy,cuda_ipc")

class test_ucp_device_kernel : public test_ucp_device {
public:
@@ -347,7 +347,8 @@ UCS_TEST_P(test_ucp_device_kernel, local_counter)

UCP_INSTANTIATE_TEST_CASE_TLS_GPU_AWARE(test_ucp_device_kernel, rc_gda,
"rc,rc_gda")

UCP_INSTANTIATE_TEST_CASE_TLS_GPU_AWARE(test_ucp_device_kernel, cuda_ipc,
"rc,cuda_copy,cuda_ipc")

class test_ucp_device_xfer : public test_ucp_device_kernel {
public:
@@ -640,3 +641,5 @@ UCS_TEST_P(test_ucp_device_xfer, counter)

UCP_INSTANTIATE_TEST_CASE_TLS_GPU_AWARE(test_ucp_device_xfer, rc_gda,
"rc,rc_gda")
UCP_INSTANTIATE_TEST_CASE_TLS_GPU_AWARE(test_ucp_device_xfer, cuda_ipc,
"rc,cuda_copy,cuda_ipc")
1 change: 0 additions & 1 deletion test/gtest/uct/cuda/test_cuda_ipc_device.cc
@@ -53,7 +53,6 @@ class test_cuda_ipc_rma : public uct_test {
CUdevice m_cuda_dev;
static const uint64_t SEED1 = 0xABClu;
static const uint64_t SEED2 = 0xDEFlu;
static const unsigned WARP_SIZE = 32;
};

UCS_TEST_P(test_cuda_ipc_rma, has_device_ep_capability)