Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 55 additions & 3 deletions fabtests/common/shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <string.h>
#include <unistd.h>
#include <sched.h>
#include <stdbool.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>
Expand Down Expand Up @@ -78,6 +79,8 @@ struct ft_context *tx_ctx_arr = NULL, *rx_ctx_arr = NULL;
uint64_t remote_cq_data = 0;

uint64_t tx_seq, rx_seq, tx_cq_cntr, rx_cq_cntr;

static bool ft_fdwait_need_progress;
int (*ft_mr_alloc_func)(void);
uint64_t ft_tag = 0;
int ft_parent_proc = 0;
Expand Down Expand Up @@ -108,6 +111,8 @@ struct timespec start, end;
int listen_sock = -1;
int sock = -1;
int oob_sock = -1;
#define FT_FDWAIT_PROGRESS_INTERVAL_MS 10

bool allow_rx_cq_data = true;

struct fi_av_attr av_attr = {
Expand Down Expand Up @@ -768,6 +773,16 @@ int ft_open_fabric_res(void)
return ft_open_domain_res();
}

static inline void ft_update_fdwait_progress_requirement(struct fi_info *info)
{
bool manual_progress = info && info->domain_attr &&
(info->domain_attr->data_progress == FI_PROGRESS_MANUAL ||
info->domain_attr->control_progress == FI_PROGRESS_MANUAL);

ft_fdwait_need_progress = manual_progress &&
(opts.comp_method == FT_COMP_WAIT_FD);
}

int ft_alloc_ep_res(struct fi_info *fi, struct fid_cq **new_txcq,
struct fid_cq **new_rxcq, struct fid_cntr **new_txcntr,
struct fid_cntr **new_rxcntr,
Expand All @@ -776,6 +791,8 @@ int ft_alloc_ep_res(struct fi_info *fi, struct fid_cq **new_txcq,
{
int ret;

ft_update_fdwait_progress_requirement(fi);

if (cq_attr.format == FI_CQ_FORMAT_UNSPEC) {
if (fi->caps & FI_TAGGED)
cq_attr.format = FI_CQ_FORMAT_TAGGED;
Expand Down Expand Up @@ -1883,6 +1900,20 @@ static void ft_cleanup_mr_array(struct ft_context *ctx_arr, char **mr_bufs)
}
}

static inline int ft_fdwait_poll_timeout(int remaining)
{
int interval;

if (!ft_fdwait_need_progress)
return remaining;

if (remaining < 0)
return FT_FDWAIT_PROGRESS_INTERVAL_MS;

interval = MIN(remaining, FT_FDWAIT_PROGRESS_INTERVAL_MS);
return interval > 0 ? interval : FT_FDWAIT_PROGRESS_INTERVAL_MS;
Comment on lines +1910 to +1914
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be simplified as:

	if (remaining <= 0)
		return FT_FDWAIT_PROGRESS_INTERVAL_MS;

	return MIN(remaining, FT_FDWAIT_INTERVAL_MS);

And the interval variable can be removed,

}

void ft_close_fids(void)
{
FT_CLOSE_FID(mc);
Expand Down Expand Up @@ -2749,6 +2780,7 @@ static int ft_wait_for_comp(struct fid_cq *cq, uint64_t *cur,

while (total - *cur > 0) {
ret = fi_cq_sread(cq, &comp, 1, NULL, timeout);

if (ret > 0) {
if (!ft_tag_is_valid(cq, &comp, tag ? tag : rx_cq_cntr))
return -FI_EOTHER;
Expand All @@ -2770,17 +2802,36 @@ static int ft_fdwait_for_comp(struct fid_cq *cq, uint64_t *cur,
{
struct fi_cq_err_entry comp;
struct fid *fids[1];
int fd, ret;
int fd, ret = 0;
int remaining = timeout;

fd = cq == txcq ? tx_fd : rx_fd;
fids[0] = &cq->fid;

while (total - *cur > 0) {
ret = fi_trywait(fabric, fids, 1);
if (ret == FI_SUCCESS) {
ret = ft_poll_fd(fd, timeout);
int wait_timeout = ft_fdwait_need_progress
? ft_fdwait_poll_timeout(remaining)
: remaining;
Comment on lines +2814 to +2816
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same condition is checked inside ft_fdwait_poll_timeout(). can eliminate this variable and use ft_fdwait_poll_timeout(remaining) directly.


if (ft_fdwait_need_progress)
(void) fi_cq_read(cq, NULL, 0);

ret = ft_poll_fd(fd, wait_timeout);
if (ret && ret != -FI_EAGAIN)
return ret;

if (ret == -FI_EAGAIN) {
if (remaining >= 0) {
remaining -= wait_timeout;
if (remaining <= 0)
return -FI_EAGAIN;
}
continue;
}
} else if (ret && ret != -FI_EAGAIN) {
return ret;
}

ret = fi_cq_read(cq, &comp, 1);
Expand All @@ -2791,9 +2842,10 @@ static int ft_fdwait_for_comp(struct fid_cq *cq, uint64_t *cur,
} else if (ret < 0 && ret != -FI_EAGAIN) {
return ret;
}
ret = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This becomes unnecessary as well.

}

return 0;
return ret;
Comment on lines -2796 to +2848
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no error path reaching here, can just return 0.

}

int ft_read_cq(struct fid_cq *cq, uint64_t *cur,
Expand Down
8 changes: 6 additions & 2 deletions fabtests/pytest/efa/test_rdm.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,16 @@ def test_rdm_bw_zcpy_recv_use_fi_more(cmdline_args, memory_type, zcpy_recv_max_m

@pytest.mark.functional
@pytest.mark.parametrize("comp_method", ["sread", "fd"])
def test_rdm_pingpong_sread(cmdline_args, completion_semantic, memory_type_bi_dir, direct_message_size, support_sread, comp_method):
@pytest.mark.parametrize("fabric", ["efa", "efa-direct"])
def test_rdm_pingpong_sread(cmdline_args, completion_semantic, memory_type_bi_dir,
direct_message_size, support_sread, comp_method, fabric):
if not support_sread:
pytest.skip("sread not supported by efa device.")
if fabric == "efa" and comp_method == "sread" and completion_semantic == "delivery_complete":
pytest.skip("Skip delivery-complete with sread to avoid manual-progress deadlock.")
efa_run_client_server_test(cmdline_args, f"fi_rdm_pingpong -c {comp_method}", "short",
completion_semantic, memory_type_bi_dir,
direct_message_size, fabric="efa-direct")
direct_message_size if fabric == "efa-direct" else "all", fabric=fabric)


# These tests skip efa-direct because efa-direct does not
Expand Down
6 changes: 3 additions & 3 deletions fabtests/pytest/efa/test_rma_bw.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ def test_rma_bw_sread(cmdline_args, rma_operation_type, rma_bw_completion_semant
direct_rma_size, rma_bw_memory_type, support_sread, comp_method, rma_fabric):
if not support_sread:
pytest.skip("sread not supported by efa device.")
if rma_fabric == "efa":
pytest.skip("sread not implemented in efa fabric yet.")
if rma_fabric == "efa" and comp_method == "sread" and rma_bw_completion_semantic == "delivery_complete":
pytest.skip("Skip delivery-complete with sread to avoid manual-progress deadlock.")
command = f"fi_rma_bw -e rdm -c {comp_method}"
command = command + " -o " + rma_operation_type
# rma_bw test with data verification takes longer to finish
timeout = max(1080, cmdline_args.timeout)
efa_run_client_server_test(cmdline_args, command, "short", rma_bw_completion_semantic,
rma_bw_memory_type, direct_rma_size,
rma_bw_memory_type, direct_rma_size if rma_fabric == "efa-direct" else "all",
timeout=timeout, fabric=rma_fabric)
2 changes: 2 additions & 0 deletions include/rdma/fabric.h
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,8 @@ enum fi_type {
FI_TYPE_MR_ATTR,
FI_TYPE_CNTR_ATTR,
FI_TYPE_CQ_ERR_ENTRY,
FI_TYPE_CQ_WAIT_COND,
FI_TYPE_WAIT_OBJ,
};

char *fi_tostr(const void *data, enum fi_type datatype);
Expand Down
17 changes: 14 additions & 3 deletions man/fi_efa.7.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,16 @@ The following features are supported:
*Completion events*
: The provider supports *FI_CQ_FORMAT_CONTEXT*, *FI_CQ_FORMAT_MSG*, and
*FI_CQ_FORMAT_DATA*. *FI_CQ_FORMAT_TAGGED* is supported on the `efa` fabric
of RDM endpoint. Wait objects are not currently supported.
of RDM endpoint.

The `efa` fabric of RDM endpoint supports *FI_WAIT_FD*, *FI_WAIT_UNSPEC* and
*FI_WAIT_NONE* wait objects for blocking CQ operations (*fi_cq_sread*).
Applications should use *fi_cq_sread()* for blocking reads rather than polling
the wait fd directly, as EFA uses manual progress (*FI_PROGRESS_MANUAL*) and
requires *fi_cq_read()* calls to generate completions. The *fi_cq_sread()*
implementation handles this by driving progress internally while blocking.

DGRAM endpoints do not support wait objects.

*Modes*
: The provider requires the use of *FI_MSG_PREFIX* when running over
Expand Down Expand Up @@ -93,8 +102,10 @@ The following features are supported:
# LIMITATIONS

## Completion events
- Synchronous CQ read is not supported.
- Wait objects are not currently supported.
- DGRAM endpoints do not support synchronous CQ reads (*fi_cq_sread*) or wait objects.
- For RDM endpoints with *FI_WAIT_FD*, applications must use *fi_cq_sread()* for
blocking reads. Direct polling of the wait fd without calling *fi_cq_read()* or
*fi_cq_sread()* will not work due to manual progress requirements.

## RMA operations
- Completion events for RMA targets (*FI_RMA_EVENT*) is not supported.
Expand Down
6 changes: 6 additions & 0 deletions man/fi_fabric.3.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ datatype or field value.
*FI_TYPE_CQ_ERR_ENTRY*
: struct fi_cq_err_entry

*FI_TYPE_WAIT_OBJ*
: enum fi_wait_obj

*FI_TYPE_CQ_WAIT_COND*
: enum fi_cq_wait_cond

fi_tostr() will return a pointer to an internal libfabric buffer that
should not be modified, and will be overwritten the next time
fi_tostr() is invoked. fi_tostr() is not thread safe.
Expand Down
23 changes: 18 additions & 5 deletions prov/efa/src/efa_fabric.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "efa_cq.h"
#include "efa_prov_info.h"
#include "rdm/efa_rdm_cq.h"
#ifdef EFA_PERF_ENABLED
const char *efa_perf_counters_str[] = {
EFA_PERF_FOREACH(OFI_STR)
Expand Down Expand Up @@ -63,16 +64,28 @@ static int efa_fabric_close(fid_t fid)
static int efa_trywait(struct fid_fabric *fabric, struct fid **fids, int count)
{
struct efa_cq *efa_cq;
struct util_cq *util_cq;
struct efa_rdm_cq *rdm_cq;
struct util_wait *wait;
int ret, i;

for (i = 0; i < count; i++) {
if (fids[i]->fclass == FI_CLASS_CQ) {
/* Use EFA-specific CQ trywait */
efa_cq = container_of(fids[i], struct efa_cq, util_cq.cq_fid.fid);
ret = efa_cq_trywait(efa_cq);
if (ret)
return ret;
util_cq = container_of(fids[i], struct util_cq, cq_fid.fid);

/* RDM CQs use util wait objects, not hardware CQ events */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are the benefits of using util wait instead of efa device interrupt to implement wait? Is it because there can be too many device level completions that is only for 1 libfabric completion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The major requirement of the blocking cq read is to avoid burning the CPU which was the busy loop that application will do when sread is not available. Are we confident this approach is meeting this requirement? cc @bwbarrett

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is we'd need SHM to expose some kind of wait object (e.g. FI_WAIT_FD) in order to implement any kind of multiplexing with EFA device interrupts. I'd be more than happy to engineer a solution but I figured that would be out of scope given the context/urgency of the feature. Could we consider this a follow-up/future improvement?

Short of that, don't we need to periodically drive progress or risk missing completions? I figured the exponential backoff strategy with a minimum interval of 1ms was appropriate since it matches backoff strategies we employ elsewhere. The ofi_wait() calls block on the util CQ FD between retries, so it's not "spinning," per se.

if (util_cq->wait) {
rdm_cq = container_of(util_cq, struct efa_rdm_cq, efa_cq.util_cq);
ret = efa_rdm_cq_trywait(rdm_cq);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will it work on single node when shm is on? AFAICT it is only calling efa_rdm_cq_progress which doesn't progress shm cq.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

efa_rdm_cq_trywait() should now be progressing the SHM CQ via fi_cq_read()

Copy link
Contributor

@shijin-aws shijin-aws Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now this seems to be contradicting to your earlier change on fabtests that adding the progress in the waitfd code path, if we think that calling a progress function outside fi_trywait for manual_progress provider is required, do we still need a progress function inside fi_trywait ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fi_trywait won't process SHM completions and move them into the util CQ. My comment in efa_rdm_cq_trywait is probably misleading.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fi_trywait won't process SHM completions and move them into the util CQ.

Are u saying fi_trywait for shm cq (no efa) will not progress shm completions, so you need that logic in fabtests? With your PR, fi_trywait for efa cq (+shm as peer) will progress both, right?

if (ret)
return ret;
} else {
/* Use EFA-specific CQ trywait for non-RDM CQs */
efa_cq = container_of(fids[i], struct efa_cq, util_cq.cq_fid.fid);
ret = efa_cq_trywait(efa_cq);
if (ret)
return ret;
}
} else {
/* Use generic util trywait logic for non-CQ types */
switch (fids[i]->fclass) {
Expand Down
Loading