Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions src/uct/cuda/cuda_ipc/cuda_ipc_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,23 @@


typedef struct uct_cuda_ipc_cache_hash_key {
pid_t pid;
CUdevice cu_device;
pid_t pid;
CUdevice cu_device;
ucs_sys_ns_t pid_ns;
} uct_cuda_ipc_cache_hash_key_t;

static UCS_F_ALWAYS_INLINE int
uct_cuda_ipc_cache_hash_equal(uct_cuda_ipc_cache_hash_key_t key1,
uct_cuda_ipc_cache_hash_key_t key2)
{
return (key1.pid == key2.pid) && (key1.cu_device == key2.cu_device);
return (key1.pid == key2.pid) && (key1.cu_device == key2.cu_device) &&
(key1.pid_ns == key2.pid_ns);
}

static UCS_F_ALWAYS_INLINE khint32_t
uct_cuda_ipc_cache_hash_func(uct_cuda_ipc_cache_hash_key_t key)
{
return kh_int_hash_func((key.pid << 8) | key.cu_device);
return kh_int64_hash_func(((key.pid << 8) | key.cu_device) ^ key.pid_ns);
}

KHASH_INIT(cuda_ipc_rem_cache, uct_cuda_ipc_cache_hash_key_t,
Expand Down Expand Up @@ -444,9 +446,9 @@ static void uct_cuda_ipc_cache_invalidate_regions(uct_cuda_ipc_cache_t *cache,
cache->name, from, to);
}

static ucs_status_t
uct_cuda_ipc_get_remote_cache(pid_t pid, CUdevice cu_dev,
uct_cuda_ipc_cache_t **cache)
static ucs_status_t uct_cuda_ipc_get_remote_cache(pid_t pid, CUdevice cu_dev,
ucs_sys_ns_t pid_ns,
uct_cuda_ipc_cache_t **cache)
{
ucs_status_t status = UCS_OK;
char target_name[64];
Expand All @@ -458,6 +460,7 @@ uct_cuda_ipc_get_remote_cache(pid_t pid, CUdevice cu_dev,

key.cu_device = cu_dev;
key.pid = pid;
key.pid_ns = pid_ns;

khiter = kh_put(cuda_ipc_rem_cache, &uct_cuda_ipc_remote_cache.hash, key,
&khret);
Expand Down Expand Up @@ -487,6 +490,7 @@ uct_cuda_ipc_get_remote_cache(pid_t pid, CUdevice cu_dev,

ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, uintptr_t d_bptr,
void *mapped_addr, CUdevice cu_dev,
ucs_sys_ns_t pid_ns,
int cache_enabled)
{
ucs_status_t status = UCS_OK;
Expand All @@ -501,7 +505,7 @@ ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, uintptr_t d_bptr,
return UCS_OK;
}

status = uct_cuda_ipc_get_remote_cache(pid, cu_dev, &cache);
status = uct_cuda_ipc_get_remote_cache(pid, cu_dev, pid_ns, &cache);
if (status != UCS_OK) {
return status;
}
Expand Down Expand Up @@ -534,10 +538,11 @@ ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, uintptr_t d_bptr,
}

UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
(key, cu_dev, mapped_addr, log_level),
uct_cuda_ipc_rkey_t *key, CUdevice cu_dev, void **mapped_addr,
ucs_log_level_t log_level)
(ext_key, cu_dev, mapped_addr, log_level),
uct_cuda_ipc_extended_rkey_t *ext_key, CUdevice cu_dev,
void **mapped_addr, ucs_log_level_t log_level)
{
uct_cuda_ipc_rkey_t *key = &ext_key->super;
uct_cuda_ipc_cache_t *cache;
ucs_status_t status;
ucs_pgt_region_t *pgt_region;
Expand All @@ -551,6 +556,7 @@ UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
}

if ((getpid() == key->pid) &&
(ucs_sys_get_ns(UCS_SYS_NS_TYPE_PID) == ext_key->pid_ns) &&
(memcmp(uuid.bytes, key->uuid.bytes, sizeof(uuid.bytes)) == 0)) {
/* TODO: added for test purpose to enable cuda_ipc tests in gtest
* mapped addrr is set to be same as d_bptr avoiding any calls to
Expand All @@ -561,7 +567,8 @@ UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_map_memhandle,
return UCS_OK;
}

status = uct_cuda_ipc_get_remote_cache(key->pid, cu_dev, &cache);
status = uct_cuda_ipc_get_remote_cache(key->pid, cu_dev, ext_key->pid_ns,
&cache);
if (status != UCS_OK) {
return status;
}
Expand Down
7 changes: 4 additions & 3 deletions src/uct/cuda/cuda_ipc/cuda_ipc_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ void uct_cuda_ipc_destroy_cache(uct_cuda_ipc_cache_t *cache);
*
* @return UCS_OK on success, or error status on failure
*/
ucs_status_t
uct_cuda_ipc_map_memhandle(uct_cuda_ipc_rkey_t *key, CUdevice cu_dev,
void **mapped_addr, ucs_log_level_t log_level);
ucs_status_t uct_cuda_ipc_map_memhandle(uct_cuda_ipc_extended_rkey_t *key,
CUdevice cu_dev, void **mapped_addr,
ucs_log_level_t log_level);


ucs_status_t uct_cuda_ipc_unmap_memhandle(pid_t pid, uintptr_t d_bptr,
void *mapped_addr, CUdevice cu_dev,
ucs_sys_ns_t pid_ns,
int cache_enabled);

#endif
10 changes: 6 additions & 4 deletions src/uct/cuda/cuda_ipc/cuda_ipc_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,10 @@ uct_cuda_ipc_post_cuda_async_copy(uct_ep_h tl_ep, uint64_t remote_addr,
goto out;
}

offset = (uintptr_t)remote_addr - (uintptr_t)key->super.d_bptr;
offset = (uintptr_t)remote_addr -
(uintptr_t)key->super.super.d_bptr;
mapped_rem_addr = (void *) ((uintptr_t) mapped_addr + offset);
ucs_assert(offset <= key->super.b_len);
ucs_assert(offset <= key->super.super.b_len);

/* round-robin */
q_desc = &ctx_rsc->queue_desc[key->stream_id % iface->config.max_streams];
Expand Down Expand Up @@ -184,8 +185,9 @@ uct_cuda_ipc_post_cuda_async_copy(uct_ep_h tl_ep, uint64_t remote_addr,
ucs_queue_push(&q_desc->event_queue, &cuda_ipc_event->super.queue);
cuda_ipc_event->super.comp = comp;
cuda_ipc_event->mapped_addr = mapped_addr;
cuda_ipc_event->d_bptr = (uintptr_t)key->super.d_bptr;
cuda_ipc_event->pid = key->super.pid;
cuda_ipc_event->d_bptr = (uintptr_t)key->super.super.d_bptr;
cuda_ipc_event->pid = key->super.super.pid;
cuda_ipc_event->pid_ns = key->super.pid_ns;
cuda_ipc_event->cuda_device = cuda_device;
ucs_trace("cuMemcpyDtoDAsync issued :%p dst:%p, src:%p len:%ld",
cuda_ipc_event, (void *) dst, (void *) src, iov[0].length);
Expand Down
4 changes: 3 additions & 1 deletion src/uct/cuda/cuda_ipc/cuda_ipc_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ static void uct_cuda_ipc_complete_event(uct_iface_h tl_iface,
cuda_ipc_event->d_bptr,
cuda_ipc_event->mapped_addr,
cuda_ipc_event->cuda_device,
cuda_ipc_event->pid_ns,
iface->config.enable_cache);
if (status != UCS_OK) {
ucs_fatal("failed to unmap addr:%p", cuda_ipc_event->mapped_addr);
Expand Down Expand Up @@ -437,7 +438,8 @@ uct_cuda_ipc_iface_mem_element_pack(uct_iface_h tl_iface,
if (ucs_unlikely(status != UCS_OK)) {
goto out;
}
cuda_ipc_mem_element.mapped_offset = UCS_PTR_BYTE_DIFF(key->super.d_bptr, mapped_addr);
cuda_ipc_mem_element.mapped_offset =
UCS_PTR_BYTE_DIFF(key->super.super.d_bptr, mapped_addr);

status = UCT_CUDADRV_FUNC_LOG_ERR(
cuMemcpyHtoD((CUdeviceptr)mem_element, &cuda_ipc_mem_element,
Expand Down
1 change: 1 addition & 0 deletions src/uct/cuda/cuda_ipc/cuda_ipc_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ typedef struct {
uintptr_t d_bptr;
pid_t pid;
CUdevice cuda_device;
ucs_sys_ns_t pid_ns;
} uct_cuda_ipc_event_desc_t;


Expand Down
32 changes: 29 additions & 3 deletions src/uct/cuda/cuda_ipc/cuda_ipc_md.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
#include <sys/types.h>
#include <unistd.h>

/* Indicates whether PID NS is contained in rkey.
* Wire compatibility isn't broken because PID is used only for caching
* purposes. */
#define UCT_CUDA_IPC_RKEY_FLAG_PID_NS UCS_BIT(31)

static ucs_config_field_t uct_cuda_ipc_md_config_table[] = {
{"", "", NULL,
ucs_offsetof(uct_cuda_ipc_md_config_t, super), UCS_CONFIG_TYPE_TABLE(uct_md_config_table)},
Expand Down Expand Up @@ -104,7 +109,9 @@ uct_cuda_ipc_md_query(uct_md_h md, uct_md_attr_v2_t *md_attr)
md_attr->reg_mem_types = UCS_BIT(UCS_MEMORY_TYPE_CUDA);
md_attr->cache_mem_types = UCS_BIT(UCS_MEMORY_TYPE_CUDA);
md_attr->access_mem_types = UCS_BIT(UCS_MEMORY_TYPE_CUDA);
md_attr->rkey_packed_size = sizeof(uct_cuda_ipc_rkey_t);
md_attr->rkey_packed_size = ucs_sys_ns_is_default(UCS_SYS_NS_TYPE_PID) ?
sizeof(uct_cuda_ipc_rkey_t) :
sizeof(uct_cuda_ipc_extended_rkey_t);
return UCS_OK;
}

Expand Down Expand Up @@ -261,6 +268,7 @@ uct_cuda_ipc_mkey_pack(uct_md_h md, uct_mem_h tl_memh, void *address,
{
uct_cuda_ipc_rkey_t *packed = mkey_buffer;
uct_cuda_ipc_memh_t *memh = tl_memh;
uct_cuda_ipc_extended_rkey_t *ext_rkey;
uct_cuda_ipc_lkey_t *key;
ucs_status_t status;

Expand All @@ -287,6 +295,14 @@ uct_cuda_ipc_mkey_pack(uct_md_h md, uct_mem_h tl_memh, void *address,
packed->d_bptr = key->d_bptr;
packed->b_len = key->b_len;

if (!ucs_sys_ns_is_default(UCS_SYS_NS_TYPE_PID)) {
ext_rkey = (uct_cuda_ipc_extended_rkey_t*)packed;
ext_rkey->pid_ns = memh->pid_ns;

ucs_assert(!(getpid() & UCT_CUDA_IPC_RKEY_FLAG_PID_NS));
packed->pid |= UCT_CUDA_IPC_RKEY_FLAG_PID_NS;
Comment on lines +298 to +303
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shasson5 IIUC if a new version sends to an older version, the receiver is going to use an incorrect pid, right?
Is it intended, or we assume there is no such scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes that's intended but it shouldn't cause a problem IMO

}

return UCT_CUDADRV_FUNC_LOG_ERR(cuDeviceGetUuid(&packed->uuid,
memh->dev_num));
}
Expand Down Expand Up @@ -318,7 +334,7 @@ uct_cuda_ipc_is_peer_accessible(uct_cuda_ipc_component_t *component,

pthread_mutex_lock(&component->lock);

cache = uct_cuda_ipc_get_dev_cache(component, &rkey->super);
cache = uct_cuda_ipc_get_dev_cache(component, &rkey->super.super);
if (ucs_unlikely(NULL == cache)) {
status = UCS_ERR_NO_RESOURCE;
goto err;
Expand Down Expand Up @@ -373,6 +389,7 @@ UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_rkey_unpack,
uct_cuda_ipc_unpacked_rkey_t *unpacked;
ucs_sys_device_t sys_dev;
ucs_status_t status;
uct_cuda_ipc_extended_rkey_t *ext_rkey;

sys_dev = UCS_PARAM_VALUE(UCT_RKEY_UNPACK_FIELD, params, sys_device,
SYS_DEVICE, UCS_SYS_DEVICE_ID_UNKNOWN);
Expand All @@ -384,7 +401,15 @@ UCS_PROFILE_FUNC(ucs_status_t, uct_cuda_ipc_rkey_unpack,
goto err;
}

unpacked->super = *packed;
unpacked->super.super = *packed;
unpacked->super.super.pid &= ~UCT_CUDA_IPC_RKEY_FLAG_PID_NS;
unpacked->super.pid_ns = 0;

/* Check if PID NS exists before using it (for wire compatibility) */
if (packed->pid & UCT_CUDA_IPC_RKEY_FLAG_PID_NS) {
ext_rkey = (uct_cuda_ipc_extended_rkey_t*)packed;
unpacked->super.pid_ns = ext_rkey->pid_ns;
}

status = uct_cuda_ipc_is_peer_accessible(com, unpacked, sys_dev);
if (status != UCS_OK) {
Expand Down Expand Up @@ -424,6 +449,7 @@ uct_cuda_ipc_mem_reg(uct_md_h md, void *address, size_t length,
/* dev_num is initialized during pack in uct_cuda_ipc_mem_add_reg */
memh->dev_num = CU_DEVICE_INVALID;
memh->pid = getpid();
memh->pid_ns = ucs_sys_get_ns(UCS_SYS_NS_TYPE_PID);
ucs_list_head_init(&memh->list);

*memh_p = memh;
Expand Down
14 changes: 12 additions & 2 deletions src/uct/cuda/cuda_ipc/cuda_ipc_md.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ typedef struct uct_cuda_ipc_md_config {
typedef struct {
pid_t pid; /* PID as key to resolve peer_map hash */
int dev_num; /* GPU Device number */
ucs_sys_ns_t pid_ns; /* PID namespace */
ucs_list_link_t list;
} uct_cuda_ipc_memh_t;

Expand Down Expand Up @@ -139,9 +140,18 @@ typedef struct {
} uct_cuda_ipc_rkey_t;


/**
* @brief cuda ipc extended remote key
*/
typedef struct {
uct_cuda_ipc_rkey_t super;
ucs_sys_ns_t pid_ns; /* PID namespace */
} uct_cuda_ipc_extended_rkey_t;


typedef struct {
uct_cuda_ipc_rkey_t super;
int stream_id;
uct_cuda_ipc_extended_rkey_t super;
int stream_id;
} uct_cuda_ipc_unpacked_rkey_t;

#endif
18 changes: 9 additions & 9 deletions test/gtest/uct/cuda/test_cuda_ipc_md.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ extern "C" {

class test_cuda_ipc_md : public test_md {
protected:
static uct_cuda_ipc_rkey_t
static uct_cuda_ipc_extended_rkey_t
unpack_common(uct_md_h md, int64_t uuid, CUdeviceptr ptr, size_t size)
{
uct_cuda_ipc_rkey_t rkey = {};
uct_cuda_ipc_extended_rkey_t rkey = {};
uct_mem_h memh;
EXPECT_UCS_OK(md->ops->mem_reg(md, (void *)ptr, size, NULL, &memh));
EXPECT_UCS_OK(md->ops->mkey_pack(md, memh, (void *)ptr, size, NULL,
&rkey));

int64_t *uuid64 = (int64_t *)rkey.uuid.bytes;
int64_t *uuid64 = (int64_t *)rkey.super.uuid.bytes;
uuid64[0] = uuid;
uuid64[1] = uuid;

Expand All @@ -43,11 +43,11 @@ class test_cuda_ipc_md : public test_md {
return rkey;
}

static uct_cuda_ipc_rkey_t unpack(uct_md_h md, int64_t uuid)
static uct_cuda_ipc_extended_rkey_t unpack(uct_md_h md, int64_t uuid)
{
CUdeviceptr ptr;
EXPECT_EQ(CUDA_SUCCESS, cuMemAlloc(&ptr, 64));
uct_cuda_ipc_rkey_t rkey = unpack_common(md, uuid, ptr, 64);
uct_cuda_ipc_extended_rkey_t rkey = unpack_common(md, uuid, ptr, 64);
EXPECT_EQ(CUDA_SUCCESS, cuMemFree(ptr));
return rkey;
}
Expand Down Expand Up @@ -87,15 +87,15 @@ class test_cuda_ipc_md : public test_md {
EXPECT_EQ(CUDA_SUCCESS, cuStreamDestroy(*cu_stream));
}

static uct_cuda_ipc_rkey_t unpack_masync(uct_md_h md, int64_t uuid)
static uct_cuda_ipc_extended_rkey_t unpack_masync(uct_md_h md, int64_t uuid)
{
size_t size = 4 * UCS_MBYTE;
CUdeviceptr ptr;
CUmemoryPool mpool;
CUstream cu_stream;

alloc_mempool(&ptr, &mpool, &cu_stream, size);
uct_cuda_ipc_rkey_t rkey = unpack_common(md, uuid, ptr, size);
uct_cuda_ipc_extended_rkey_t rkey = unpack_common(md, uuid, ptr, size);
free_mempool(&ptr, &mpool, &cu_stream);
return rkey;
}
Expand Down Expand Up @@ -163,7 +163,7 @@ UCS_TEST_P(test_cuda_ipc_md, mpack_legacy)
constexpr size_t size = 4096;
ucs::handle<uct_md_h> md;
uct_mem_h memh;
uct_cuda_ipc_rkey_t rkey;
uct_cuda_ipc_extended_rkey_t rkey;
CUdeviceptr ptr;

UCS_TEST_CREATE_HANDLE(uct_md_h, md, uct_md_close, uct_md_open,
Expand All @@ -174,7 +174,7 @@ UCS_TEST_P(test_cuda_ipc_md, mpack_legacy)
EXPECT_UCS_OK(md->ops->mkey_pack(md, memh, (void *)ptr, size, NULL,
&rkey));

EXPECT_EQ(UCT_CUDA_IPC_KEY_HANDLE_TYPE_LEGACY, rkey.ph.handle_type);
EXPECT_EQ(UCT_CUDA_IPC_KEY_HANDLE_TYPE_LEGACY, rkey.super.ph.handle_type);

uct_md_mem_dereg_params_t params;
params.field_mask = UCT_MD_MEM_DEREG_FIELD_MEMH;
Expand Down
Loading