diff --git a/fabtests/common/shared.c b/fabtests/common/shared.c index a8ecf1633a5..67438a604a7 100644 --- a/fabtests/common/shared.c +++ b/fabtests/common/shared.c @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -78,6 +79,8 @@ struct ft_context *tx_ctx_arr = NULL, *rx_ctx_arr = NULL; uint64_t remote_cq_data = 0; uint64_t tx_seq, rx_seq, tx_cq_cntr, rx_cq_cntr; + +static bool ft_fdwait_need_progress; int (*ft_mr_alloc_func)(void); uint64_t ft_tag = 0; int ft_parent_proc = 0; @@ -108,6 +111,8 @@ struct timespec start, end; int listen_sock = -1; int sock = -1; int oob_sock = -1; +#define FT_FDWAIT_PROGRESS_INTERVAL_MS 10 + bool allow_rx_cq_data = true; struct fi_av_attr av_attr = { @@ -768,6 +773,16 @@ int ft_open_fabric_res(void) return ft_open_domain_res(); } +static inline void ft_update_fdwait_progress_requirement(struct fi_info *info) +{ + bool manual_progress = info && info->domain_attr && + (info->domain_attr->data_progress == FI_PROGRESS_MANUAL || + info->domain_attr->control_progress == FI_PROGRESS_MANUAL); + + ft_fdwait_need_progress = manual_progress && + (opts.comp_method == FT_COMP_WAIT_FD); +} + int ft_alloc_ep_res(struct fi_info *fi, struct fid_cq **new_txcq, struct fid_cq **new_rxcq, struct fid_cntr **new_txcntr, struct fid_cntr **new_rxcntr, @@ -776,6 +791,8 @@ int ft_alloc_ep_res(struct fi_info *fi, struct fid_cq **new_txcq, { int ret; + ft_update_fdwait_progress_requirement(fi); + if (cq_attr.format == FI_CQ_FORMAT_UNSPEC) { if (fi->caps & FI_TAGGED) cq_attr.format = FI_CQ_FORMAT_TAGGED; @@ -1883,6 +1900,20 @@ static void ft_cleanup_mr_array(struct ft_context *ctx_arr, char **mr_bufs) } } +static inline int ft_fdwait_poll_timeout(int remaining) +{ + int interval; + + if (!ft_fdwait_need_progress) + return remaining; + + if (remaining < 0) + return FT_FDWAIT_PROGRESS_INTERVAL_MS; + + interval = MIN(remaining, FT_FDWAIT_PROGRESS_INTERVAL_MS); + return interval > 0 ? 
interval : 0; +} + void ft_close_fids(void) { FT_CLOSE_FID(mc); @@ -2749,6 +2780,7 @@ static int ft_wait_for_comp(struct fid_cq *cq, uint64_t *cur, while (total - *cur > 0) { ret = fi_cq_sread(cq, &comp, 1, NULL, timeout); + if (ret > 0) { if (!ft_tag_is_valid(cq, &comp, tag ? tag : rx_cq_cntr)) return -FI_EOTHER;
direct_message_size, support_sread, comp_method): +@pytest.mark.parametrize("fabric", ["efa", "efa-direct"]) +def test_rdm_pingpong_sread(cmdline_args, completion_semantic, memory_type_bi_dir, + direct_message_size, support_sread, comp_method, fabric): if not support_sread: pytest.skip("sread not supported by efa device.") + if fabric == "efa" and comp_method == "sread" and completion_semantic == "delivery_complete": + pytest.skip("Skip delivery-complete with sread to avoid manual-progress deadlock.") efa_run_client_server_test(cmdline_args, f"fi_rdm_pingpong -c {comp_method}", "short", completion_semantic, memory_type_bi_dir, - direct_message_size, fabric="efa-direct") + direct_message_size if fabric == "efa-direct" else "all", fabric=fabric) # These tests skip efa-direct because efa-direct does not diff --git a/fabtests/pytest/efa/test_rma_bw.py b/fabtests/pytest/efa/test_rma_bw.py index 43bfebe7f74..cc20627fc15 100644 --- a/fabtests/pytest/efa/test_rma_bw.py +++ b/fabtests/pytest/efa/test_rma_bw.py @@ -91,12 +91,12 @@ def test_rma_bw_sread(cmdline_args, rma_operation_type, rma_bw_completion_semant direct_rma_size, rma_bw_memory_type, support_sread, comp_method, rma_fabric): if not support_sread: pytest.skip("sread not supported by efa device.") - if rma_fabric == "efa": - pytest.skip("sread not implemented in efa fabric yet.") + if rma_fabric == "efa" and comp_method == "sread" and rma_bw_completion_semantic == "delivery_complete": + pytest.skip("Skip delivery-complete with sread to avoid manual-progress deadlock.") command = f"fi_rma_bw -e rdm -c {comp_method}" command = command + " -o " + rma_operation_type # rma_bw test with data verification takes longer to finish timeout = max(1080, cmdline_args.timeout) efa_run_client_server_test(cmdline_args, command, "short", rma_bw_completion_semantic, - rma_bw_memory_type, direct_rma_size, + rma_bw_memory_type, direct_rma_size if rma_fabric == "efa-direct" else "all", timeout=timeout, fabric=rma_fabric) diff --git 
a/include/rdma/fabric.h b/include/rdma/fabric.h index b1324ad8d82..e80a6cae1c2 100644 --- a/include/rdma/fabric.h +++ b/include/rdma/fabric.h @@ -777,6 +777,8 @@ enum fi_type { FI_TYPE_MR_ATTR, FI_TYPE_CNTR_ATTR, FI_TYPE_CQ_ERR_ENTRY, + FI_TYPE_CQ_WAIT_COND, + FI_TYPE_WAIT_OBJ, }; char *fi_tostr(const void *data, enum fi_type datatype); diff --git a/man/fi_efa.7.md b/man/fi_efa.7.md index 2aae54dd136..4fe80a4ec60 100644 --- a/man/fi_efa.7.md +++ b/man/fi_efa.7.md @@ -65,7 +65,16 @@ The following features are supported: *Completion events* : The provider supports *FI_CQ_FORMAT_CONTEXT*, *FI_CQ_FORMAT_MSG*, and *FI_CQ_FORMAT_DATA*. *FI_CQ_FORMAT_TAGGED* is supported on the `efa` fabric - of RDM endpoint. Wait objects are not currently supported. + of RDM endpoint. + + The `efa` fabric of RDM endpoint supports *FI_WAIT_FD*, *FI_WAIT_UNSPEC* and + *FI_WAIT_NONE* wait objects for blocking CQ operations (*fi_cq_sread*). + Applications should use *fi_cq_sread()* for blocking reads rather than polling + the wait fd directly, as EFA uses manual progress (*FI_PROGRESS_MANUAL*) and + requires *fi_cq_read()* calls to generate completions. The *fi_cq_sread()* + implementation handles this by driving progress internally while blocking. + + DGRAM endpoints do not support wait objects. *Modes* : The provider requires the use of *FI_MSG_PREFIX* when running over @@ -93,8 +102,10 @@ The following features are supported: # LIMITATIONS ## Completion events -- Synchronous CQ read is not supported. -- Wait objects are not currently supported. +- DGRAM endpoints do not support synchronous CQ reads (*fi_cq_sread*) or wait objects. +- For RDM endpoints with *FI_WAIT_FD*, applications must use *fi_cq_sread()* for + blocking reads. Direct polling of the wait fd without calling *fi_cq_read()* or + *fi_cq_sread()* will not work due to manual progress requirements. ## RMA operations - Completion events for RMA targets (*FI_RMA_EVENT*) is not supported. 
diff --git a/man/fi_fabric.3.md b/man/fi_fabric.3.md index ebc44edd366..d1745a974eb 100644 --- a/man/fi_fabric.3.md +++ b/man/fi_fabric.3.md @@ -195,6 +195,12 @@ datatype or field value. *FI_TYPE_CQ_ERR_ENTRY* : struct fi_cq_err_entry +*FI_TYPE_WAIT_OBJ* +: enum fi_wait_obj + +*FI_TYPE_CQ_WAIT_COND* +: enum fi_cq_wait_cond + fi_tostr() will return a pointer to an internal libfabric buffer that should not be modified, and will be overwritten the next time fi_tostr() is invoked. fi_tostr() is not thread safe. diff --git a/prov/efa/src/efa_fabric.c b/prov/efa/src/efa_fabric.c index d3ba4803e29..674c2ce731c 100644 --- a/prov/efa/src/efa_fabric.c +++ b/prov/efa/src/efa_fabric.c @@ -20,6 +20,7 @@ #include "efa_cq.h" #include "efa_prov_info.h" +#include "rdm/efa_rdm_cq.h" #ifdef EFA_PERF_ENABLED const char *efa_perf_counters_str[] = { EFA_PERF_FOREACH(OFI_STR) @@ -63,16 +64,28 @@ static int efa_fabric_close(fid_t fid) static int efa_trywait(struct fid_fabric *fabric, struct fid **fids, int count) { struct efa_cq *efa_cq; + struct util_cq *util_cq; + struct efa_rdm_cq *rdm_cq; struct util_wait *wait; int ret, i; for (i = 0; i < count; i++) { if (fids[i]->fclass == FI_CLASS_CQ) { - /* Use EFA-specific CQ trywait */ - efa_cq = container_of(fids[i], struct efa_cq, util_cq.cq_fid.fid); - ret = efa_cq_trywait(efa_cq); - if (ret) - return ret; + util_cq = container_of(fids[i], struct util_cq, cq_fid.fid); + + /* RDM CQs use util wait objects, not hardware CQ events */ + if (util_cq->wait) { + rdm_cq = container_of(util_cq, struct efa_rdm_cq, efa_cq.util_cq); + ret = efa_rdm_cq_trywait(rdm_cq); + if (ret) + return ret; + } else { + /* Use EFA-specific CQ trywait for non-RDM CQs */ + efa_cq = container_of(fids[i], struct efa_cq, util_cq.cq_fid.fid); + ret = efa_cq_trywait(efa_cq); + if (ret) + return ret; + } } else { /* Use generic util trywait logic for non-CQ types */ switch (fids[i]->fclass) { diff --git a/prov/efa/src/rdm/efa_rdm_cq.c b/prov/efa/src/rdm/efa_rdm_cq.c index 
74bd93b726b..235985bbe6d 100644 --- a/prov/efa/src/rdm/efa_rdm_cq.c +++ b/prov/efa/src/rdm/efa_rdm_cq.c @@ -8,6 +8,7 @@ #include "efa_av.h" #include "efa_cntr.h" #include "efa_rdm_pke_cmd.h" +#include "efa_rdm_peer.h" #include "efa_rdm_pke_utils.h" #include "efa_rdm_pke_nonreq.h" #include "efa_rdm_tracepoint.h" @@ -69,7 +70,7 @@ static struct fi_ops efa_rdm_cq_fi_ops = { .size = sizeof(struct fi_ops), .close = efa_rdm_cq_close, .bind = fi_no_bind, - .control = fi_no_control, + .control = ofi_cq_control, .ops_open = fi_no_ops_open, }; @@ -112,6 +113,9 @@ static void efa_rdm_cq_proc_ibv_recv_rdma_with_imm_completion( ret = ofi_cq_write(target_cq, NULL, flags, len, NULL, imm_data, 0); } + if (OFI_LIKELY(!ret && target_cq->wait)) + target_cq->wait->signal(target_cq->wait); + if (OFI_UNLIKELY(ret)) { EFA_WARN(FI_LOG_CQ, "Unable to write a cq entry for remote for RECV_RDMA operation: %s\n", @@ -844,7 +848,7 @@ static ssize_t efa_rdm_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t coun if (cq->shm_cq) { fi_cq_read(cq->shm_cq, NULL, 0); - /* + /* * fi_cq_read(cq->shm_cq, NULL, 0) will progress shm ep and write * completion to efa. Use ofi_cq_read_entries to get the number of * shm completions without progressing efa ep again. 
@@ -863,14 +867,105 @@ static ssize_t efa_rdm_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t coun return ret; } +static void efa_rdm_cq_progress(struct util_cq *cq); + +/** + * @brief Blocking CQ read with source address + * + * @param[in] cq_fid CQ file descriptor + * @param[out] buf Buffer for completion entries + * @param[in] count Maximum entries to read + * @param[out] src_addr Source addresses (optional) + * @param[in] cond Wait condition (threshold) + * @param[in] timeout Timeout in milliseconds + * @return Number of entries read or negative error code + */ +static ssize_t efa_rdm_cq_sreadfrom(struct fid_cq *cq_fid, void *buf, size_t count, + fi_addr_t *src_addr, const void *cond, int timeout) +{ + struct efa_cq *efa_cq = container_of(cq_fid, struct efa_cq, util_cq.cq_fid); + struct util_cq *util_cq = &efa_cq->util_cq; + const uint64_t endtime = ofi_timeout_time(timeout); + ssize_t ret; + size_t num_completions = 0, threshold = 1; + int wait_time = 1; + uint8_t *buffer = buf; + + /* fi_cq_sread[from] calls are invalid w/ FI_WAIT_NONE */ + if (OFI_UNLIKELY(efa_cq->wait_obj == FI_WAIT_NONE)) + return -FI_EINVAL; + + if (OFI_UNLIKELY(!count)) + return -FI_EINVAL; + + /* Require wait object for blocking operations */ + if (OFI_UNLIKELY(!util_cq->wait)) + return -FI_ENOSYS; + + /* Handle threshold condition */ + if (OFI_UNLIKELY(efa_cq->wait_cond == FI_CQ_COND_THRESHOLD && cond)) + threshold = MAX(MIN(*(const size_t *) cond, count), 1); + + while (num_completions < threshold) { + /* Try non-blocking read first */ + ret = efa_rdm_cq_readfrom(cq_fid, buffer, count - num_completions, + src_addr ? src_addr + num_completions : NULL); + + if (OFI_LIKELY(ret > 0)) { + num_completions += ret; + buffer += (ptrdiff_t) ret * efa_cq->entry_size; + continue; + } + + if (OFI_UNLIKELY(ret != 0 && ret != -FI_EAGAIN)) + return num_completions ? 
(ssize_t) num_completions : ret; + + /* Handle timeout */ + if (OFI_UNLIKELY(ofi_adjust_timeout(endtime, &timeout))) + return num_completions ? (ssize_t) num_completions : -FI_EAGAIN; + + /* Check for wakeup signal */ + if (OFI_UNLIKELY(ofi_atomic_get32(&util_cq->wakeup))) { + ofi_atomic_set32(&util_cq->wakeup, 0); + return num_completions ? (ssize_t) num_completions : -FI_EAGAIN; + } + + if (efa_cq->wait_obj == FI_WAIT_FD) { + ofi_wait(&util_cq->wait->wait_fid, timeout); + } else { + int interval = timeout >= 0 ? MIN(wait_time, timeout) : wait_time; + ofi_wait(&util_cq->wait->wait_fid, interval); + wait_time *= 2; + } + } + + return (ssize_t) num_completions; +} + +/** + * @brief Blocking CQ read for EFA RDM + * + * @param[in] cq_fid CQ file descriptor + * @param[out] buf Buffer for completion entries + * @param[in] count Maximum entries to read + * @param[in] cond Wait condition (unused) + * @param[in] timeout Timeout in milliseconds + * @return Number of entries read or negative error code + */ +static ssize_t efa_rdm_cq_sread(struct fid_cq *cq_fid, void *buf, size_t count, + const void *cond, int timeout) +{ + return efa_rdm_cq_sreadfrom(cq_fid, buf, count, NULL, cond, timeout); +} + static struct fi_ops_cq efa_rdm_cq_ops = { .size = sizeof(struct fi_ops_cq), .read = ofi_cq_read, .readfrom = efa_rdm_cq_readfrom, .readerr = ofi_cq_readerr, - .sread = fi_no_cq_sread, - .sreadfrom = fi_no_cq_sreadfrom, - .signal = fi_no_cq_signal, + .sread = efa_rdm_cq_sread, + .sreadfrom = efa_rdm_cq_sreadfrom, + .signal = ofi_cq_signal, .strerror = efa_rdm_cq_strerror, }; @@ -912,6 +1007,61 @@ static void efa_rdm_cq_progress(struct util_cq *cq) ofi_genlock_unlock(&cq->ep_list_lock); } +/** + * @brief Try to wait on RDM CQ - check if ready for blocking wait + * + * Called only when util_cq->wait exists + * + * @param[in] cq RDM CQ to check + * @return FI_SUCCESS if ready to wait, -FI_EAGAIN if completions available + */ +int efa_rdm_cq_trywait(struct efa_rdm_cq *cq) +{ + struct 
util_cq *util_cq = &cq->efa_cq.util_cq; + + /* Drive progress */ + fi_cq_read(&util_cq->cq_fid, NULL, 0); + + /* Check if completions available */ + if (!ofi_cirque_isempty(util_cq->cirq)) + return -FI_EAGAIN; + + /* Use wait object's trywait (guaranteed to exist by caller) */ + return util_cq->wait->wait_try(util_cq->wait); +} + +/** + * @brief Verify CQ wait object attributes + * + * @param[in] attr CQ attributes to verify + * @return FI_SUCCESS if valid, -FI_EINVAL if unsupported + */ +static int efa_rdm_cq_verify_wait_attr(const struct fi_cq_attr *attr) +{ + switch (attr->wait_obj) { + case FI_WAIT_NONE: + case FI_WAIT_UNSPEC: + case FI_WAIT_FD: + break; + default: + EFA_WARN(FI_LOG_CQ, "Unsupported wait object: %s\n", + fi_tostr(&attr->wait_obj, FI_TYPE_WAIT_OBJ)); + return -FI_EINVAL; + } + + switch (attr->wait_cond) { + case FI_CQ_COND_NONE: + case FI_CQ_COND_THRESHOLD: + break; + default: + EFA_WARN(FI_LOG_CQ, "Unsupported wait condition: %s\n", + fi_tostr(&attr->wait_cond, FI_TYPE_CQ_WAIT_COND)); + return -FI_EINVAL; + } + + return FI_SUCCESS; +} + /** * @brief create a CQ for EFA RDM provider * @@ -935,8 +1085,9 @@ int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, struct fi_peer_cq_context peer_cq_context = {0}; struct fi_efa_cq_init_attr efa_cq_init_attr = {0}; - if (attr->wait_obj != FI_WAIT_NONE) - return -FI_ENOSYS; + ret = efa_rdm_cq_verify_wait_attr(attr); + if (ret) + return ret; cq = calloc(1, sizeof(*cq)); if (!cq) @@ -949,12 +1100,16 @@ int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, dlist_init(&cq->ibv_cq_poll_list); cq->need_to_scan_ep_list = false; + ret = ofi_cq_init(&efa_prov, domain, attr, &cq->efa_cq.util_cq, &efa_rdm_cq_progress, context); if (ret) goto free; + cq->efa_cq.wait_obj = attr->wait_obj; + cq->efa_cq.wait_cond = attr->wait_cond; + ret = efa_cq_open_ibv_cq( attr, efa_domain->device->ibv_ctx, &cq->efa_cq.ibv_cq, &efa_cq_init_attr); @@ -965,13 +1120,10 @@ int efa_rdm_cq_open(struct 
fid_domain *domain, struct fi_cq_attr *attr, cq->efa_cq.poll_ibv_cq = efa_rdm_cq_poll_ibv_cq; - *cq_fid = &cq->efa_cq.util_cq.cq_fid; - (*cq_fid)->fid.ops = &efa_rdm_cq_fi_ops; - (*cq_fid)->ops = &efa_rdm_cq_ops; - /* open shm cq as peer cq */ if (efa_domain->shm_domain) { memcpy(&shm_cq_attr, attr, sizeof(*attr)); + shm_cq_attr.wait_obj = FI_WAIT_NONE; /* Bind ep with shm provider's cq */ shm_cq_attr.flags |= FI_PEER; peer_cq_context.size = sizeof(peer_cq_context); @@ -984,6 +1136,10 @@ int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, } } + *cq_fid = &cq->efa_cq.util_cq.cq_fid; + (*cq_fid)->fid.ops = &efa_rdm_cq_fi_ops; + (*cq_fid)->ops = &efa_rdm_cq_ops; + return 0; destroy_ibv_cq: retv = -ibv_destroy_cq(ibv_cq_ex_to_cq(cq->efa_cq.ibv_cq.ibv_cq_ex)); diff --git a/prov/efa/src/rdm/efa_rdm_cq.h b/prov/efa/src/rdm/efa_rdm_cq.h index ae6d7784eb8..df326f6c751 100644 --- a/prov/efa/src/rdm/efa_rdm_cq.h +++ b/prov/efa/src/rdm/efa_rdm_cq.h @@ -18,6 +18,8 @@ struct efa_rdm_cq { int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, struct fid_cq **cq_fid, void *context); +int efa_rdm_cq_trywait(struct efa_rdm_cq *cq); + void efa_rdm_cq_poll_ibv_cq_closing_ep(struct efa_ibv_cq *ibv_cq, struct efa_rdm_ep *closing_ep); int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq); diff --git a/prov/efa/src/rdm/efa_rdm_ope.c b/prov/efa/src/rdm/efa_rdm_ope.c index e12beed4d5b..9c2cd5fec7d 100644 --- a/prov/efa/src/rdm/efa_rdm_ope.c +++ b/prov/efa/src/rdm/efa_rdm_ope.c @@ -920,6 +920,9 @@ void efa_rdm_rxe_report_completion(struct efa_rdm_ope *rxe) return; } + if (rx_cq->wait) + rx_cq->wait->signal(rx_cq->wait); + rxe->fi_flags |= EFA_RDM_TXE_NO_COMPLETION; } @@ -1020,6 +1023,9 @@ void efa_rdm_txe_report_completion(struct efa_rdm_ope *txe) efa_rdm_txe_handle_error(txe, -ret, FI_EFA_ERR_WRITE_SEND_COMP); return; } + + if (tx_cq->wait) + tx_cq->wait->signal(tx_cq->wait); } 
efa_cntr_report_tx_completion(&txe->ep->base_ep.util_ep, txe->cq_entry.flags); diff --git a/prov/efa/test/efa_unit_test_cq.c b/prov/efa/test/efa_unit_test_cq.c index 86f38a31ead..3993b433c90 100644 --- a/prov/efa/test/efa_unit_test_cq.c +++ b/prov/efa/test/efa_unit_test_cq.c @@ -1467,6 +1467,7 @@ static void test_efa_cq_data_path_direct_status( efa_cq = container_of(cq, struct efa_cq, util_cq.cq_fid); assert_true(efa_cq->ibv_cq.data_path_direct_enabled == data_path_direct_enabled); + assert_int_equal(fi_close(&cq->fid), 0); /* Recover the mocked vendor_id */ @@ -1732,7 +1733,7 @@ void test_efa_cq_sread_einval(struct efa_resource **state) efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_DIRECT_FABRIC_NAME); efa_cq = container_of(resource->cq, struct efa_cq, util_cq.cq_fid.fid); - assert_null(efa_cq->wait_obj); + assert_int_equal(efa_cq->wait_obj, FI_WAIT_NONE); assert_null(efa_cq->ibv_cq.channel); ret = fi_cq_sread(resource->cq, &cq_entry, 1, NULL, 1); @@ -2316,4 +2317,48 @@ void test_efa_cq_read_mixed_success_error(struct efa_resource **state) will_return_maybe(efa_mock_efa_ibv_cq_start_poll_return_mock, ENOENT); assert_int_equal(fi_close(&resource->ep->fid), 0); resource->ep = NULL; -} \ No newline at end of file +} + + + +/** + * @brief Test fi_cq_sread() with count=0 returns -FI_EINVAL + */ +void test_efa_rdm_cq_sread_invalid_count(struct efa_resource **state) +{ + struct efa_resource *resource = *state; + struct fi_cq_data_entry cq_entry; + int ret; + + efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_FABRIC_NAME); + + ret = fi_cq_sread(resource->cq, &cq_entry, 0, NULL, 0); + assert_int_equal(ret, -FI_EINVAL); + + will_return_maybe(efa_mock_efa_ibv_cq_start_poll_return_mock, ENOENT); + assert_int_equal(fi_close(&resource->ep->fid), 0); + resource->ep = NULL; +} + +/** + * @brief Test fi_cq_sread() with wait object disabled returns -FI_EINVAL + */ +void test_efa_rdm_cq_sread_no_wait_obj(struct efa_resource **state) +{ + struct efa_resource 
*resource = *state; + struct fi_cq_data_entry cq_entry; + int ret; + + /* Open a temporary CQ with wait-none so no wait object exists */ + struct fid_cq *waitless_cq = NULL; + efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_FABRIC_NAME); + assert_int_equal(fi_cq_open(resource->domain, + &(struct fi_cq_attr){ .wait_obj = FI_WAIT_NONE }, + &waitless_cq, NULL), 0); + + ret = fi_cq_sread(waitless_cq, &cq_entry, 1, NULL, 0); + assert_int_equal(ret, -FI_EINVAL); + + assert_int_equal(fi_close(&waitless_cq->fid), 0); + will_return_maybe(efa_mock_efa_ibv_cq_start_poll_return_mock, ENOENT); +} diff --git a/prov/efa/test/efa_unit_tests.c b/prov/efa/test/efa_unit_tests.c index 5135d77092e..daa7eeab7b6 100644 --- a/prov/efa/test/efa_unit_tests.c +++ b/prov/efa/test/efa_unit_tests.c @@ -53,6 +53,11 @@ static int efa_unit_test_mocks_teardown(void **state) /* Reset the contents of g_efa_hmem_info from backup */ memcpy(g_efa_hmem_info, g_efa_hmem_info_backup, sizeof(g_efa_hmem_info)); + /* Provide default mock return values for CQ operations during teardown */ + if (g_efa_unit_test_mocks.efa_ibv_cq_start_poll) { + will_return_maybe(efa_mock_efa_ibv_cq_start_poll_return_mock, ENOENT); + } + efa_unit_test_resource_destruct(resource); efa_ibv_ah_limit_cnt_reset(); @@ -223,6 +228,8 @@ int main(void) cmocka_unit_test_setup_teardown(test_rdm_fallback_to_ibv_create_cq_ex_cq_read_ignore_forgotton_peer, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_ibv_cq_ex_read_ignore_removed_peer, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_rdm_cq_before_ep_enable, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_cq_sread_invalid_count, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_cq_sread_no_wait_obj, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), 
cmocka_unit_test_setup_teardown(test_efa_cq_data_path_direct_disabled_by_env, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_cq_data_path_direct_disabled_with_old_device, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_cq_data_path_direct_enabled_with_new_device, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), diff --git a/prov/efa/test/efa_unit_tests.h b/prov/efa/test/efa_unit_tests.h index 50b52249578..d78b80e5106 100644 --- a/prov/efa/test/efa_unit_tests.h +++ b/prov/efa/test/efa_unit_tests.h @@ -191,6 +191,8 @@ void test_ibv_cq_ex_read_recover_forgotten_peer_ah(); void test_rdm_fallback_to_ibv_create_cq_ex_cq_read_ignore_forgotton_peer(); void test_ibv_cq_ex_read_ignore_removed_peer(); void test_efa_rdm_cq_before_ep_enable(); +void test_efa_rdm_cq_sread_invalid_count(); +void test_efa_rdm_cq_sread_no_wait_obj(); /* begin efa_unit_test_info.c */ void test_info_open_ep_with_wrong_info(); diff --git a/src/fi_tostr.c b/src/fi_tostr.c index 1a92d6ba5c3..61178359df6 100644 --- a/src/fi_tostr.c +++ b/src/fi_tostr.c @@ -1039,6 +1039,12 @@ char *DEFAULT_SYMVER_PRE(fi_tostr_r)(char *buf, size_t len, case FI_TYPE_CQ_ERR_ENTRY: ofi_tostr_cq_err_entry(buf, len, data); break; + case FI_TYPE_CQ_WAIT_COND: + ofi_tostr_cq_wait_cond(buf, len, *enumval); + break; + case FI_TYPE_WAIT_OBJ: + ofi_tostr_wait_obj(buf, len, *enumval); + break; default: ofi_strncatf(buf, len, "Unknown type"); break;