Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 44 additions & 6 deletions src/ucm/cuda/cudamem.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@
#include <string.h>


/* Create a body of CUDA memory allocation replacement function */
#define UCM_CUDA_ALLOC_FUNC_DEBUG(_name, _retval, _success, _size, _ptr_type, _ref, \
_args_fmt, ...) \
_retval ucm_##_name(_ptr_type _ref ptr_arg, \
UCS_FUNC_DEFINE_ARGS(__VA_ARGS__)) \
{ \
_ptr_type ptr; \
_retval ret; \
\
ucm_event_enter(); \
ret = ucm_orig_##_name(ptr_arg, UCS_FUNC_PASS_ARGS(__VA_ARGS__)); \
if (ret == (_success)) { \
ptr = _ref ptr_arg; \
ucm_warn("%s(" _args_fmt ") allocated %p", __func__, \
UCS_FUNC_PASS_ARGS(__VA_ARGS__), (void*)ptr); \
ucm_cuda_dispatch_mem_alloc((CUdeviceptr)ptr, (_size)); \
} \
ucm_event_leave(); \
return ret; \
}

/* Create a body of CUDA memory allocation replacement function */
#define UCM_CUDA_ALLOC_FUNC(_name, _retval, _success, _size, _ptr_type, _ref, \
_args_fmt, ...) \
Expand Down Expand Up @@ -61,6 +82,23 @@
return ret; \
}

/* Create a body of CUDA memory release replacement function */
#define UCM_CUDA_FREE_FUNC_DEBUG(_name, _mem_type, _retval, _ptr_arg, _size, \
_args_fmt, ...) \
_retval ucm_##_name(UCS_FUNC_DEFINE_ARGS(__VA_ARGS__)) \
{ \
_retval ret; \
\
ucm_event_enter(); \
ucm_warn("%s(" _args_fmt ")", __func__, \
UCS_FUNC_PASS_ARGS(__VA_ARGS__)); \
ucm_cuda_dispatch_mem_free((CUdeviceptr)(_ptr_arg), _size, _mem_type, \
#_name); \
ret = ucm_orig_##_name(UCS_FUNC_PASS_ARGS(__VA_ARGS__)); \
ucm_event_leave(); \
return ret; \
}

#define UCM_CUDA_GET_GLOBAL_FUNC(_name, _obj_type) \
CUresult ucm_##_name(CUdeviceptr *dptr, size_t *bytes, _obj_type obj, \
const char *name) \
Expand Down Expand Up @@ -200,8 +238,8 @@ static void ucm_cuda_dispatch_mem_free(CUdeviceptr ptr, size_t length,
/* Driver API replacements */
UCM_CUDA_ALLOC_FUNC(cuMemAlloc, CUresult, CUDA_SUCCESS, arg0, CUdeviceptr, *,
"size=%zu", size_t)
UCM_CUDA_ALLOC_FUNC(cuMemAlloc_v2, CUresult, CUDA_SUCCESS, arg0, CUdeviceptr, *,
"size=%zu", size_t)
UCM_CUDA_ALLOC_FUNC_DEBUG(cuMemAlloc_v2, CUresult, CUDA_SUCCESS, arg0, CUdeviceptr, *,
"size=%zu", size_t)
UCM_CUDA_ALLOC_FUNC(cuMemAllocManaged, CUresult, CUDA_SUCCESS, arg0,
CUdeviceptr, *, "size=%zu flags=0x%x", size_t, unsigned)
UCM_CUDA_ALLOC_FUNC(cuMemAllocPitch, CUresult, CUDA_SUCCESS,
Expand All @@ -222,10 +260,10 @@ UCM_CUDA_ALLOC_FUNC(cuMemAllocFromPoolAsync, CUresult, CUDA_SUCCESS, arg0,
CUdeviceptr, *, "size=%zu pool=%p stream=%p", size_t,
CUmemoryPool, CUstream)
#endif
UCM_CUDA_FREE_FUNC(cuMemFree, UCS_MEMORY_TYPE_CUDA, CUresult, arg0, 0,
"ptr=0x%llx", CUdeviceptr)
UCM_CUDA_FREE_FUNC(cuMemFree_v2, UCS_MEMORY_TYPE_CUDA, CUresult, arg0, 0,
"ptr=0x%llx", CUdeviceptr)
UCM_CUDA_FREE_FUNC_DEBUG(cuMemFree, UCS_MEMORY_TYPE_CUDA, CUresult, arg0, 0,
"ptr=0x%llx", CUdeviceptr)
UCM_CUDA_FREE_FUNC_DEBUG(cuMemFree_v2, UCS_MEMORY_TYPE_CUDA, CUresult, arg0, 0,
"ptr=0x%llx", CUdeviceptr)
UCM_CUDA_FREE_FUNC(cuMemFreeHost, UCS_MEMORY_TYPE_HOST, CUresult, arg0, 0,
"ptr=%p", void*)
UCM_CUDA_FREE_FUNC(cuMemFreeHost_v2, UCS_MEMORY_TYPE_HOST, CUresult, arg0, 0,
Expand Down
12 changes: 12 additions & 0 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,18 @@ static ucs_config_field_t ucp_context_config_table[] = {
"even if invalidation workflow isn't supported",
ucs_offsetof(ucp_context_config_t, rndv_errh_ppln_enable), UCS_CONFIG_TYPE_BOOL},

{"RNDV_PIPELINE_FRAG_FC_ENABLE", "y",
"Enable fragment-level flow control in rendezvous pipeline protocol.\n"
"When enabled, limits the number of outstanding fragments to prevent\n"
"staging buffer resource exhaustion",
ucs_offsetof(ucp_context_config_t, rndv_ppln_frag_fc_enable), UCS_CONFIG_TYPE_BOOL},

{"RNDV_PIPELINE_FRAG_WND_SIZE", "1000",
"Maximum number of outstanding fragments allowed in rendezvous pipeline\n"
"protocol when fragment flow control is enabled. This limits the number\n"
"of staging buffers allocated at any given time",
ucs_offsetof(ucp_context_config_t, rndv_ppln_frag_wnd_size), UCS_CONFIG_TYPE_UINT},

{"FLUSH_WORKER_EPS", "y",
"Enable flushing the worker by flushing its endpoints. Allows completing\n"
"the flush operation in a bounded time even if there are new requests on\n"
Expand Down
4 changes: 4 additions & 0 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ typedef struct ucp_context_config {
int rndv_shm_ppln_enable;
/** Enable error handling for rndv pipeline protocol */
int rndv_errh_ppln_enable;
/** Enable fragment throttling for rndv pipeline protocol */
int rndv_ppln_frag_fc_enable;
/** Maximum outstanding fragments in rndv pipeline (window size) */
size_t rndv_ppln_frag_wnd_size;
/** Threshold for using tag matching offload capabilities. Smaller buffers
* will not be posted to the transport. */
size_t tm_thresh;
Expand Down
6 changes: 6 additions & 0 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,12 @@ struct ucp_request {
struct {
/* Size to send in ack message */
ssize_t ack_data_size;

/* Throttling state for fragment flow control */
size_t outstanding_frags; /* Current outstanding fragments */
size_t max_outstanding; /* Max allowed outstanding fragments */
size_t total_frags; /* Total number of fragments */
size_t next_frag_idx; /* Next fragment to send */
} ppln;

/* Used by rndv/rkey_ptr */
Expand Down
33 changes: 33 additions & 0 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -2519,6 +2519,13 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
worker->counters.ep_closures = 0;
worker->counters.ep_failures = 0;

/* Initialize RNDV pipeline flow control statistics */
worker->rndv_ppln_fc_stats.num_ppln_super_reqs = 0;
worker->rndv_ppln_fc_stats.total_frags = 0;
worker->rndv_ppln_fc_stats.max_super_reqs = 0;
worker->rndv_ppln_fc_stats.max_total_frags = 0;
worker->rndv_ppln_fc_stats.progress_count = 0;

/* Copy user flags, and mask-out unsupported flags for compatibility */
worker->flags = UCP_PARAM_VALUE(WORKER, params, flags, FLAGS, 0) &
UCS_MASK(UCP_WORKER_INTERNAL_FLAGS_SHIFT);
Expand Down Expand Up @@ -2922,6 +2929,15 @@ void ucp_worker_destroy(ucp_worker_h worker)
{
ucs_debug("destroy worker %p", worker);

/* Log final RNDV pipeline flow control statistics */
if (worker->context->config.ext.rndv_ppln_frag_fc_enable &&
(worker->rndv_ppln_fc_stats.max_super_reqs > 0)) {
ucs_info("RNDV_PPLN_FC final stats: "
"peak super_reqs=%zu peak_frags=%zu",
worker->rndv_ppln_fc_stats.max_super_reqs,
worker->rndv_ppln_fc_stats.max_total_frags);
}

UCS_ASYNC_BLOCK(&worker->async);
uct_worker_progress_unregister_safe(worker->uct, &worker->keepalive.cb_id);
ucp_worker_usage_tracker_destroy(worker);
Expand Down Expand Up @@ -3074,6 +3090,23 @@ unsigned ucp_worker_progress(ucp_worker_h worker)
/* coverity[assert_side_effect] */
ucs_assert(--worker->inprogress == 0);

/* Periodically log RNDV pipeline flow control statistics */
if (worker->context->config.ext.rndv_ppln_frag_fc_enable) {
/* Log every 10M progress calls (lightweight check - just a counter) */
if (++worker->rndv_ppln_fc_stats.progress_count >= 10000) {
if (worker->rndv_ppln_fc_stats.num_ppln_super_reqs > 0 ||
worker->rndv_ppln_fc_stats.max_super_reqs > 0) {
ucs_warn("RNDV_PPLN_FC: current super_reqs=%zu frags=%zu | "
"peak super_reqs=%zu frags=%zu",
worker->rndv_ppln_fc_stats.num_ppln_super_reqs,
worker->rndv_ppln_fc_stats.total_frags,
worker->rndv_ppln_fc_stats.max_super_reqs,
worker->rndv_ppln_fc_stats.max_total_frags);
}
worker->rndv_ppln_fc_stats.progress_count = 0;
}
}

UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);

return count;
Expand Down
9 changes: 9 additions & 0 deletions src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,15 @@ typedef struct ucp_worker {
unsigned rkey_config_count; /* Current number of rkey configurations */
ucp_rkey_config_t rkey_config[UCP_WORKER_MAX_RKEY_CONFIG];

/* RNDV pipeline flow control statistics */
struct {
size_t num_ppln_super_reqs; /* Number of active pipeline super requests */
size_t total_frags; /* Total outstanding fragments across all reqs */
size_t max_super_reqs; /* Peak number of super requests */
size_t max_total_frags; /* Peak total fragments */
size_t progress_count; /* Progress call counter for periodic logging */
} rndv_ppln_fc_stats;

struct {
int timerfd; /* Timer needed to signal to user's fd when
* the next keepalive round must be done */
Expand Down
Loading
Loading