Skip to content

Commit 9e193b3

Browse files
committed
prov/efa: Add support for blocking CQ read in RDM
This adds support for `fi_cq_sread{,from}`, `fi_control` and `fi_signal` for the `efa` fabric. This also expands EFA's `fi_trywait()` to handle wait objects for Util CQs. Since the endpoint needs to progress in order to generate completions to read from, an exponential backoff strategy is employed to poll by driving progress periodically without excessively spinning the CPU for higher latency scenarios. The `timeout` specified by the user is used to cap the total time spent waiting for completions. A CQ requested with FI_WAIT_FD indicates the app intends to drive progress itself as needed, since EFA advertises FI_PROGRESS_MANUAL. In this scenario, sreadfrom will block without periodically driving progress. Signed-off-by: Darryl Abbate <[email protected]>
1 parent b81a7c5 commit 9e193b3

File tree

4 files changed

+218
-11
lines changed

4 files changed

+218
-11
lines changed

prov/efa/src/efa_fabric.c

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include "efa_cq.h"
2222
#include "efa_prov_info.h"
23+
#include "rdm/efa_rdm_cq.h"
2324
#ifdef EFA_PERF_ENABLED
2425
const char *efa_perf_counters_str[] = {
2526
EFA_PERF_FOREACH(OFI_STR)
@@ -63,16 +64,28 @@ static int efa_fabric_close(fid_t fid)
6364
static int efa_trywait(struct fid_fabric *fabric, struct fid **fids, int count)
6465
{
6566
struct efa_cq *efa_cq;
67+
struct util_cq *util_cq;
68+
struct efa_rdm_cq *rdm_cq;
6669
struct util_wait *wait;
6770
int ret, i;
6871

6972
for (i = 0; i < count; i++) {
7073
if (fids[i]->fclass == FI_CLASS_CQ) {
71-
/* Use EFA-specific CQ trywait */
72-
efa_cq = container_of(fids[i], struct efa_cq, util_cq.cq_fid.fid);
73-
ret = efa_cq_trywait(efa_cq);
74-
if (ret)
75-
return ret;
74+
util_cq = container_of(fids[i], struct util_cq, cq_fid.fid);
75+
76+
/* RDM CQs use util wait objects, not hardware CQ events */
77+
if (util_cq->wait) {
78+
rdm_cq = container_of(util_cq, struct efa_rdm_cq, efa_cq.util_cq);
79+
ret = efa_rdm_cq_trywait(rdm_cq);
80+
if (ret)
81+
return ret;
82+
} else {
83+
/* Use EFA-specific CQ trywait for non-RDM CQs */
84+
efa_cq = container_of(fids[i], struct efa_cq, util_cq.cq_fid.fid);
85+
ret = efa_cq_trywait(efa_cq);
86+
if (ret)
87+
return ret;
88+
}
7689
} else {
7790
/* Use generic util trywait logic for non-CQ types */
7891
switch (fids[i]->fclass) {

prov/efa/src/rdm/efa_rdm_cq.c

Lines changed: 192 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "efa_av.h"
99
#include "efa_cntr.h"
1010
#include "efa_rdm_pke_cmd.h"
11+
#include "efa_rdm_peer.h"
1112
#include "efa_rdm_pke_utils.h"
1213
#include "efa_rdm_pke_nonreq.h"
1314
#include "efa_rdm_tracepoint.h"
@@ -22,6 +23,28 @@ const char *efa_rdm_cq_strerror(struct fid_cq *cq_fid, int prov_errno,
2223
: efa_strerror(prov_errno);
2324
}
2425

26+
static
27+
int efa_rdm_cq_control(struct fid *fid, int command, void *arg)
28+
{
29+
const struct util_cq *util_cq = container_of(fid, struct util_cq, cq_fid.fid);
30+
31+
switch (command) {
32+
case FI_GETWAIT:
33+
if (!util_cq->wait || util_cq->wait->wait_obj != FI_WAIT_FD)
34+
return -FI_ENODATA;
35+
*(int *) arg = container_of(util_cq->wait, struct util_wait_fd, util_wait)->epoll_fd;
36+
break;
37+
case FI_GETWAITOBJ:
38+
*(enum fi_wait_obj *) arg = util_cq->wait
39+
? util_cq->wait->wait_obj
40+
: FI_WAIT_NONE;
41+
break;
42+
default:
43+
return -FI_ENOSYS;
44+
}
45+
return FI_SUCCESS;
46+
}
47+
2548
/**
2649
* @brief close a CQ of EFA RDM endpoint
2750
*
@@ -69,7 +92,7 @@ static struct fi_ops efa_rdm_cq_fi_ops = {
6992
.size = sizeof(struct fi_ops),
7093
.close = efa_rdm_cq_close,
7194
.bind = fi_no_bind,
72-
.control = fi_no_control,
95+
.control = efa_rdm_cq_control,
7396
.ops_open = fi_no_ops_open,
7497
};
7598

@@ -116,6 +139,9 @@ void efa_rdm_cq_proc_ibv_recv_rdma_with_imm_completion(
116139
ret = ofi_cq_write(target_cq, NULL, flags, len, NULL, imm_data, 0);
117140
}
118141

142+
if (OFI_LIKELY(!ret && target_cq->wait))
143+
target_cq->wait->signal(target_cq->wait);
144+
119145
if (OFI_UNLIKELY(ret)) {
120146
EFA_WARN(FI_LOG_CQ,
121147
"Unable to write a cq entry for remote for RECV_RDMA operation: %s\n",
@@ -854,14 +880,113 @@ static ssize_t efa_rdm_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t coun
854880
return ret;
855881
}
856882

883+
static void efa_rdm_cq_progress(struct util_cq *cq);
884+
885+
/**
886+
* @brief Blocking CQ read with source address
887+
*
888+
* @param[in] cq_fid CQ file descriptor
889+
* @param[out] buf Buffer for completion entries
890+
* @param[in] count Maximum entries to read
891+
* @param[out] src_addr Source addresses (optional)
892+
* @param[in] cond Wait condition (threshold)
893+
* @param[in] timeout Timeout in milliseconds
894+
* @return Number of entries read or negative error code
895+
*/
896+
static ssize_t efa_rdm_cq_sreadfrom(struct fid_cq *cq_fid, void *buf, size_t count,
897+
fi_addr_t *src_addr, const void *cond, int timeout)
898+
{
899+
struct efa_cq *efa_cq = container_of(cq_fid, struct efa_cq, util_cq.cq_fid);
900+
struct util_cq *util_cq = &efa_cq->util_cq;
901+
const uint64_t endtime = ofi_timeout_time(timeout);
902+
ssize_t ret;
903+
size_t num_completions = 0, threshold = 1;
904+
int wait_time = 1;
905+
uint8_t *buffer = buf;
906+
907+
/* fi_cq_sread[from] calls are invalid w/ FI_WAIT_NONE */
908+
if (OFI_UNLIKELY(efa_cq->wait_obj == FI_WAIT_NONE))
909+
return -FI_EINVAL;
910+
911+
if (OFI_UNLIKELY(!count))
912+
return -FI_EINVAL;
913+
914+
/* Require wait object for blocking operations */
915+
if (OFI_UNLIKELY(!util_cq->wait))
916+
return -FI_ENOSYS;
917+
918+
/* Handle threshold condition */
919+
if (OFI_UNLIKELY(efa_cq->wait_cond == FI_CQ_COND_THRESHOLD && cond))
920+
threshold = MAX(MIN(*(const size_t *) cond, count), 1);
921+
922+
while (num_completions < threshold) {
923+
/* Try non-blocking read first */
924+
ret = efa_rdm_cq_readfrom(cq_fid, buffer, count - num_completions,
925+
src_addr ? src_addr + num_completions : NULL);
926+
927+
if (OFI_LIKELY(ret > 0)) {
928+
num_completions += ret;
929+
buffer += (ptrdiff_t) ret * efa_cq->entry_size;
930+
wait_time = 1;
931+
continue;
932+
}
933+
934+
if (OFI_UNLIKELY(ret != 0 && ret != -FI_EAGAIN))
935+
return num_completions ? (ssize_t) num_completions : ret;
936+
937+
/* Handle timeout */
938+
if (OFI_UNLIKELY(ofi_adjust_timeout(endtime, &timeout)))
939+
return num_completions ? (ssize_t) num_completions : -FI_EAGAIN;
940+
941+
/* Check for wakeup signal */
942+
if (OFI_UNLIKELY(ofi_atomic_get32(&util_cq->wakeup))) {
943+
ofi_atomic_set32(&util_cq->wakeup, 0);
944+
return num_completions ? (ssize_t) num_completions : -FI_EAGAIN;
945+
}
946+
947+
/* Progress then wait */
948+
efa_rdm_cq_progress(util_cq);
949+
950+
/* Check if progress produced completions before blocking */
951+
if (OFI_LIKELY(!ofi_cirque_isempty(util_cq->cirq)))
952+
continue;
953+
954+
if (efa_cq->wait_obj == FI_WAIT_FD) {
955+
ofi_wait(&util_cq->wait->wait_fid, timeout);
956+
} else {
957+
/* Wait with exponential backoff, capped at remaining timeout */
958+
ofi_wait(&util_cq->wait->wait_fid, timeout >= 0 ? MIN(wait_time, timeout) : wait_time);
959+
wait_time *= 2;
960+
}
961+
}
962+
963+
return (ssize_t) num_completions;
964+
}
965+
966+
/**
967+
* @brief Blocking CQ read for EFA RDM
968+
*
969+
* @param[in] cq_fid CQ file descriptor
970+
* @param[out] buf Buffer for completion entries
971+
* @param[in] count Maximum entries to read
972+
* @param[in] cond Wait condition (unused)
973+
* @param[in] timeout Timeout in milliseconds
974+
* @return Number of entries read or negative error code
975+
*/
976+
static ssize_t efa_rdm_cq_sread(struct fid_cq *cq_fid, void *buf, size_t count,
977+
const void *cond, int timeout)
978+
{
979+
return efa_rdm_cq_sreadfrom(cq_fid, buf, count, NULL, cond, timeout);
980+
}
981+
857982
static struct fi_ops_cq efa_rdm_cq_ops = {
858983
.size = sizeof(struct fi_ops_cq),
859984
.read = ofi_cq_read,
860985
.readfrom = efa_rdm_cq_readfrom,
861986
.readerr = ofi_cq_readerr,
862-
.sread = fi_no_cq_sread,
863-
.sreadfrom = fi_no_cq_sreadfrom,
864-
.signal = fi_no_cq_signal,
987+
.sread = efa_rdm_cq_sread,
988+
.sreadfrom = efa_rdm_cq_sreadfrom,
989+
.signal = ofi_cq_signal,
865990
.strerror = efa_rdm_cq_strerror,
866991
};
867992

@@ -903,6 +1028,61 @@ static void efa_rdm_cq_progress(struct util_cq *cq)
9031028
ofi_genlock_unlock(&cq->ep_list_lock);
9041029
}
9051030

1031+
/**
1032+
* @brief Try to wait on RDM CQ - check if ready for blocking wait
1033+
*
1034+
* Called only when util_cq->wait exists
1035+
*
1036+
* @param[in] cq RDM CQ to check
1037+
* @return FI_SUCCESS if ready to wait, -FI_EAGAIN if completions available
1038+
*/
1039+
int efa_rdm_cq_trywait(struct efa_rdm_cq *cq)
1040+
{
1041+
struct util_cq *util_cq = &cq->efa_cq.util_cq;
1042+
1043+
/* Drive progress */
1044+
fi_cq_read(&util_cq->cq_fid, NULL, 0);
1045+
1046+
/* Check if completions available */
1047+
if (!ofi_cirque_isempty(util_cq->cirq))
1048+
return -FI_EAGAIN;
1049+
1050+
/* Use wait object's trywait (guaranteed to exist by caller) */
1051+
return util_cq->wait->wait_try(util_cq->wait);
1052+
}
1053+
1054+
/**
1055+
* @brief Verify CQ wait object attributes
1056+
*
1057+
* @param[in] attr CQ attributes to verify
1058+
* @return FI_SUCCESS if valid, -FI_EINVAL if unsupported
1059+
*/
1060+
static int efa_rdm_cq_verify_wait_attr(const struct fi_cq_attr *attr)
1061+
{
1062+
switch (attr->wait_obj) {
1063+
case FI_WAIT_NONE:
1064+
case FI_WAIT_UNSPEC:
1065+
case FI_WAIT_FD:
1066+
break;
1067+
default:
1068+
EFA_WARN(FI_LOG_CQ, "Unsupported wait object: %s\n",
1069+
fi_tostr(&attr->wait_obj, FI_TYPE_WAIT_OBJ));
1070+
return -FI_EINVAL;
1071+
}
1072+
1073+
switch (attr->wait_cond) {
1074+
case FI_CQ_COND_NONE:
1075+
case FI_CQ_COND_THRESHOLD:
1076+
break;
1077+
default:
1078+
EFA_WARN(FI_LOG_CQ, "Unsupported wait condition: %s\n",
1079+
fi_tostr(&attr->wait_cond, FI_TYPE_CQ_WAIT_COND));
1080+
return -FI_EINVAL;
1081+
}
1082+
1083+
return FI_SUCCESS;
1084+
}
1085+
9061086
/**
9071087
* @brief create a CQ for EFA RDM provider
9081088
*
@@ -926,8 +1106,9 @@ int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
9261106
struct fi_peer_cq_context peer_cq_context = {0};
9271107
struct fi_efa_cq_init_attr efa_cq_init_attr = {0};
9281108

929-
if (attr->wait_obj != FI_WAIT_NONE)
930-
return -FI_ENOSYS;
1109+
ret = efa_rdm_cq_verify_wait_attr(attr);
1110+
if (ret)
1111+
return ret;
9311112

9321113
cq = calloc(1, sizeof(*cq));
9331114
if (!cq)
@@ -940,12 +1121,16 @@ int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
9401121

9411122
dlist_init(&cq->ibv_cq_poll_list);
9421123
cq->need_to_scan_ep_list = false;
1124+
9431125
ret = ofi_cq_init(&efa_prov, domain, attr, &cq->efa_cq.util_cq,
9441126
&efa_rdm_cq_progress, context);
9451127

9461128
if (ret)
9471129
goto free;
9481130

1131+
cq->efa_cq.wait_obj = attr->wait_obj;
1132+
cq->efa_cq.wait_cond = attr->wait_cond;
1133+
9491134
ret = efa_cq_open_ibv_cq(
9501135
attr, efa_domain->device->ibv_ctx, &cq->efa_cq.ibv_cq,
9511136
&efa_cq_init_attr);
@@ -959,6 +1144,7 @@ int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
9591144
/* open shm cq as peer cq */
9601145
if (efa_domain->shm_domain) {
9611146
memcpy(&shm_cq_attr, attr, sizeof(*attr));
1147+
shm_cq_attr.wait_obj = FI_WAIT_NONE;
9621148
/* Bind ep with shm provider's cq */
9631149
shm_cq_attr.flags |= FI_PEER;
9641150
peer_cq_context.size = sizeof(peer_cq_context);

prov/efa/src/rdm/efa_rdm_cq.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ struct efa_rdm_cq {
1717
int efa_rdm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
1818
struct fid_cq **cq_fid, void *context);
1919

20+
int efa_rdm_cq_trywait(struct efa_rdm_cq *cq);
21+
2022
void efa_rdm_cq_poll_ibv_cq_closing_ep(struct efa_ibv_cq *ibv_cq, struct efa_rdm_ep *closing_ep);
2123
int efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq);
2224

prov/efa/src/rdm/efa_rdm_ope.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,9 @@ void efa_rdm_rxe_report_completion(struct efa_rdm_ope *rxe)
920920
return;
921921
}
922922

923+
if (rx_cq->wait)
924+
rx_cq->wait->signal(rx_cq->wait);
925+
923926
rxe->fi_flags |= EFA_RDM_TXE_NO_COMPLETION;
924927
}
925928

@@ -1020,6 +1023,9 @@ void efa_rdm_txe_report_completion(struct efa_rdm_ope *txe)
10201023
efa_rdm_txe_handle_error(txe, -ret, FI_EFA_ERR_WRITE_SEND_COMP);
10211024
return;
10221025
}
1026+
1027+
if (tx_cq->wait)
1028+
tx_cq->wait->signal(tx_cq->wait);
10231029
}
10241030

10251031
efa_cntr_report_tx_completion(&txe->ep->base_ep.util_ep, txe->cq_entry.flags);

0 commit comments

Comments
 (0)