diff --git a/include/rdma/fi_ext.h b/include/rdma/fi_ext.h index 9932ba0cb93..b44b53ae5fc 100644 --- a/include/rdma/fi_ext.h +++ b/include/rdma/fi_ext.h @@ -77,6 +77,7 @@ enum { FI_OPT_EFA_SENDRECV_IN_ORDER_ALIGNED_128_BYTES, /* bool */ FI_OPT_EFA_WRITE_IN_ORDER_ALIGNED_128_BYTES, /* bool */ FI_OPT_EFA_HOMOGENEOUS_PEERS, /* bool */ + FI_OPT_EFA_CQ_FLOW_CONTROL, /* bool */ }; struct fi_fid_export { diff --git a/prov/efa/src/efa_base_ep.c b/prov/efa/src/efa_base_ep.c index 9e8855e1ca6..5343b1d2df7 100644 --- a/prov/efa/src/efa_base_ep.c +++ b/prov/efa/src/efa_base_ep.c @@ -341,8 +341,17 @@ static int efa_base_ep_create_qp(struct efa_base_ep *base_ep, assert(tx_cq->unsolicited_write_recv_enabled == rx_cq->unsolicited_write_recv_enabled); - /* If user intend to post rx buffer for cq data, we shouldn't enable unsolicited write recv */ - use_unsolicited_write_recv = tx_cq->unsolicited_write_recv_enabled && !(base_ep->info->mode & FI_RX_CQ_DATA); + if (EFA_INFO_TYPE_IS_DIRECT(base_ep->info)) { + /* If the user intends to post rx buffers for cq data, we shouldn't + * enable unsolicited write recv */ + use_unsolicited_write_recv = + tx_cq->unsolicited_write_recv_enabled && !(base_ep->info->mode & FI_RX_CQ_DATA); + } else { + /* RDM full protocol doesn't support FI_RX_CQ_DATA. + * Use FI_OPT_EFA_CQ_FLOW_CONTROL to disable unsolicited write recv. 
*/ + use_unsolicited_write_recv = + tx_cq->unsolicited_write_recv_enabled && !base_ep->cq_flow_control; + } EFA_INFO(FI_LOG_EP_CTRL, "creating QP with unsolicited write recv status: %d\n", use_unsolicited_write_recv); ret = efa_qp_create(&base_ep->qp, &attr_ex, base_ep->info->tx_attr->tclass, use_unsolicited_write_recv); diff --git a/prov/efa/src/efa_base_ep.h b/prov/efa/src/efa_base_ep.h index 382bebf4dfb..3d0b3a3e8e4 100644 --- a/prov/efa/src/efa_base_ep.h +++ b/prov/efa/src/efa_base_ep.h @@ -93,6 +93,7 @@ struct efa_base_ep { /* Only used by RDM ep type */ struct efa_qp *user_recv_qp; /* Separate qp to receive pkts posted by users */ struct efa_recv_wr *user_recv_wr_vec; + bool cq_flow_control; /* set via FI_OPT_EFA_CQ_FLOW_CONTROL; reduces the likelihood of cq overflow */ }; int efa_base_ep_bind_av(struct efa_base_ep *base_ep, struct efa_av *av); diff --git a/prov/efa/src/efa_ep.c b/prov/efa/src/efa_ep.c index 65a5a2a5699..37d160fbce9 100644 --- a/prov/efa/src/efa_ep.c +++ b/prov/efa/src/efa_ep.c @@ -171,6 +171,18 @@ static int efa_ep_setopt(fid_t fid, int level, int optname, const void *optval, /* no op as efa direct ep will not handshake with peers */ case FI_OPT_EFA_HOMOGENEOUS_PEERS: break; + case FI_OPT_EFA_CQ_FLOW_CONTROL: + if (optlen != sizeof(bool)) + return -FI_EINVAL; + /* Validate before storing so a rejected request leaves the ep unchanged */ + if (*(bool *)optval && !(ep->info->mode & FI_RX_CQ_DATA)) { + EFA_WARN(FI_LOG_EP_CTRL, + "FI_RX_CQ_DATA is required when cq flow " + "control is enabled.\n"); + return -FI_EOPNOTSUPP; + } + ep->cq_flow_control = *(bool *)optval; + break; default: EFA_INFO(FI_LOG_EP_CTRL, "Unknown / unsupported endpoint option\n"); return -FI_ENOPROTOOPT; diff --git a/prov/efa/src/rdm/efa_rdm_ep_fiops.c b/prov/efa/src/rdm/efa_rdm_ep_fiops.c index 303aaac11d6..761c7c018dc 100644 --- a/prov/efa/src/rdm/efa_rdm_ep_fiops.c +++ b/prov/efa/src/rdm/efa_rdm_ep_fiops.c @@ -1748,6 +1748,11 @@ static int efa_rdm_ep_setopt(fid_t fid, int level, int optname, return -FI_EINVAL; efa_rdm_ep->homogeneous_peers = *(bool *)optval; break; + case 
FI_OPT_EFA_CQ_FLOW_CONTROL: + if (optlen != sizeof(bool)) + return -FI_EINVAL; + efa_rdm_ep->base_ep.cq_flow_control = *(bool *)optval; + break; default: EFA_INFO(FI_LOG_EP_CTRL, "Unknown endpoint option\n"); return -FI_ENOPROTOOPT; diff --git a/prov/efa/src/rdm/efa_rdm_ep_utils.c b/prov/efa/src/rdm/efa_rdm_ep_utils.c index 67d75a116ad..04eb25d2627 100644 --- a/prov/efa/src/rdm/efa_rdm_ep_utils.c +++ b/prov/efa/src/rdm/efa_rdm_ep_utils.c @@ -297,6 +297,18 @@ int efa_rdm_ep_post_user_recv_buf(struct efa_rdm_ep *ep, struct efa_rdm_ope *rxe assert(rxe->iov_count > 0 && rxe->iov_count <= ep->base_ep.info->rx_attr->iov_limit); assert(rxe->iov[0].iov_len >= ep->msg_prefix_size); + + if (ep->user_rx_pkts_posted >= ep->efa_max_outstanding_rx_ops) { + EFA_WARN(FI_LOG_EP_CTRL, + "RX CQ is full. Increase the CQ size or poll the cq " + "before posting more recv. " + "user_rx_pkts_posted=%zu, " + "efa_max_outstanding_rx_ops=%zu\n", + ep->user_rx_pkts_posted, + ep->efa_max_outstanding_rx_ops); + return -FI_EAGAIN; + } + pkt_entry = efa_rdm_pke_alloc(ep, ep->user_rx_pkt_pool, EFA_RDM_PKE_FROM_USER_RX_POOL); if (OFI_UNLIKELY(!pkt_entry)) return -FI_EAGAIN; @@ -858,7 +870,17 @@ int efa_rdm_ep_bulk_post_internal_rx_pkts(struct efa_rdm_ep *ep) if (ep->efa_rx_pkts_to_post < MIN(efa_env.internal_rx_refill_threshold, efa_rdm_ep_get_rx_pool_size(ep))) return 0; - assert(ep->efa_rx_pkts_to_post + ep->efa_rx_pkts_posted <= ep->efa_max_outstanding_rx_ops); + if (ep->efa_rx_pkts_to_post + ep->efa_rx_pkts_posted > ep->efa_max_outstanding_rx_ops) { + EFA_WARN(FI_LOG_EP_CTRL, + "RX CQ is full. Increase the CQ size or poll the cq " + "before posting more recv. 
efa_rx_pkts_to_post: %zu, " + "efa_rx_pkts_posted: %zu, efa_max_outstanding_rx_ops: " + "%zu\n", + ep->efa_rx_pkts_to_post, ep->efa_rx_pkts_posted, + ep->efa_max_outstanding_rx_ops); + return -FI_EAGAIN; + } + for (i = 0; i < ep->efa_rx_pkts_to_post; ++i) { ep->pke_vec[i] = efa_rdm_pke_alloc(ep, ep->efa_rx_pkt_pool, EFA_RDM_PKE_FROM_EFA_RX_POOL); @@ -1026,8 +1048,9 @@ void efa_rdm_ep_post_internal_rx_pkts(struct efa_rdm_ep *ep) assert(ep->efa_rx_pkts_to_post + ep->efa_rx_pkts_posted + ep->efa_rx_pkts_held == efa_rdm_ep_get_rx_pool_size(ep)); err = efa_rdm_ep_bulk_post_internal_rx_pkts(ep); - if (err) - goto err_exit; + /* -FI_EAGAIN is not fatal here: skip the error path and return normally */ + if (err && err != -FI_EAGAIN) + goto err_exit; return; diff --git a/prov/efa/src/rdm/efa_rdm_msg.c b/prov/efa/src/rdm/efa_rdm_msg.c index d2e67a2fcbe..ec24a6a00bf 100644 --- a/prov/efa/src/rdm/efa_rdm_msg.c +++ b/prov/efa/src/rdm/efa_rdm_msg.c @@ -937,6 +937,17 @@ ssize_t efa_rdm_msg_generic_recv(struct efa_rdm_ep *ep, const struct fi_msg *msg efa_rdm_tracepoint(recv_begin_msg_context, (size_t) msg->context, (size_t) msg->addr); + if (ep->efa_rx_pkts_to_post + ep->efa_rx_pkts_posted > ep->efa_max_outstanding_rx_ops) { + EFA_WARN(FI_LOG_EP_DATA, + "RX CQ is full. Increase the CQ size or poll the cq " + "before posting more recv. 
efa_rx_pkts_to_post: %zu, " + "efa_rx_pkts_posted: %zu, efa_max_outstanding_rx_ops: " + "%zu\n", + ep->efa_rx_pkts_to_post, ep->efa_rx_pkts_posted, + ep->efa_max_outstanding_rx_ops); + return -FI_EAGAIN; + } + ret = efa_rdm_attempt_to_sync_memops_iov(ep, (struct iovec *)msg->msg_iov, msg->desc, msg->iov_count); if (ret) return ret; diff --git a/prov/efa/test/efa_unit_test_ep.c b/prov/efa/test/efa_unit_test_ep.c index 6fc65ae02d9..46bc7acf42e 100644 --- a/prov/efa/test/efa_unit_test_ep.c +++ b/prov/efa/test/efa_unit_test_ep.c @@ -1877,3 +1877,69 @@ void test_efa_base_ep_disable_unsolicited_write_recv_with_rx_cq_data(struct efa_ /* When FI_RX_CQ_DATA is set, unsolicited write recv should be disabled */ assert_false(efa_base_ep->qp->unsolicited_write_recv_enabled); } + +/** + * @brief Test that unsolicited write recv is disabled when FI_OPT_EFA_CQ_FLOW_CONTROL is set + */ +void test_efa_rdm_ep_setopt_cq_flow_control(struct efa_resource **state) +{ + struct efa_resource *resource = *state; + struct efa_rdm_ep *ep; + bool optval = true; + + efa_unit_test_resource_construct_ep_not_enabled(resource, FI_EP_RDM, EFA_FABRIC_NAME); + + ep = container_of(resource->ep, struct efa_rdm_ep, + base_ep.util_ep.ep_fid); + assert_false(ep->base_ep.cq_flow_control); + assert_int_equal(fi_setopt(&resource->ep->fid, FI_OPT_ENDPOINT, + FI_OPT_EFA_CQ_FLOW_CONTROL, &optval, + sizeof(optval)), + FI_SUCCESS); + assert_true(ep->base_ep.cq_flow_control); + assert_int_equal(fi_enable(resource->ep), 0); + assert_false(ep->base_ep.qp->unsolicited_write_recv_enabled); +} + +/** + * @brief Test setting FI_OPT_EFA_CQ_FLOW_CONTROL will fail without FI_RX_CQ_DATA in efa direct + */ +void test_efa_direct_ep_setopt_cq_flow_control_no_rx_cq_data(struct efa_resource **state) +{ + struct efa_resource *resource = *state; + bool optval = true; + + efa_unit_test_resource_construct_ep_not_enabled(resource, FI_EP_RDM, EFA_DIRECT_FABRIC_NAME); + assert_int_equal(fi_setopt(&resource->ep->fid, FI_OPT_ENDPOINT, + 
FI_OPT_EFA_CQ_FLOW_CONTROL, &optval, + sizeof(optval)), + -FI_EOPNOTSUPP); +} + +/** + * @brief Test setting FI_OPT_EFA_CQ_FLOW_CONTROL with FI_RX_CQ_DATA will disable unsolicited write recv + */ +void test_efa_direct_ep_setopt_cq_flow_control_with_rx_cq_data(struct efa_resource **state) +{ + struct efa_resource *resource = *state; + struct efa_base_ep *efa_base_ep; + bool optval = true; + + resource->hints = efa_unit_test_alloc_hints(FI_EP_RDM, EFA_DIRECT_FABRIC_NAME); + assert_non_null(resource->hints); + + resource->hints->mode |= FI_RX_CQ_DATA; + + efa_unit_test_resource_construct_with_hints(resource, FI_EP_RDM, FI_VERSION(1, 18), + resource->hints, false, true); + + efa_base_ep = container_of(resource->ep, struct efa_base_ep, util_ep.ep_fid); + assert_false(efa_base_ep->cq_flow_control); + assert_int_equal(fi_setopt(&resource->ep->fid, FI_OPT_ENDPOINT, + FI_OPT_EFA_CQ_FLOW_CONTROL, &optval, + sizeof(optval)), + FI_SUCCESS); + assert_true(efa_base_ep->cq_flow_control); + assert_int_equal(fi_enable(resource->ep), 0); + assert_false(efa_base_ep->qp->unsolicited_write_recv_enabled); +} diff --git a/prov/efa/test/efa_unit_tests.c b/prov/efa/test/efa_unit_tests.c index 5135d77092e..0cc4ab10a5a 100644 --- a/prov/efa/test/efa_unit_tests.c +++ b/prov/efa/test/efa_unit_tests.c @@ -198,6 +198,9 @@ int main(void) cmocka_unit_test_setup_teardown(test_efa_ep_lock_type_mutex, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_rdm_ep_shm_ep_different_info, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_base_ep_disable_unsolicited_write_recv_with_rx_cq_data, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_rdm_ep_setopt_cq_flow_control, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_direct_ep_setopt_cq_flow_control_no_rx_cq_data, efa_unit_test_mocks_setup, 
efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_direct_ep_setopt_cq_flow_control_with_rx_cq_data, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), /* end efa_unit_test_ep.c */ /* begin efa_unit_test_cq.c */ diff --git a/prov/efa/test/efa_unit_tests.h b/prov/efa/test/efa_unit_tests.h index 50b52249578..e117718483f 100644 --- a/prov/efa/test/efa_unit_tests.h +++ b/prov/efa/test/efa_unit_tests.h @@ -409,6 +409,9 @@ void test_efa_ep_lock_type_no_op(); void test_efa_ep_lock_type_mutex(); void test_efa_rdm_ep_shm_ep_different_info(); void test_efa_base_ep_disable_unsolicited_write_recv_with_rx_cq_data(); +void test_efa_rdm_ep_setopt_cq_flow_control(); +void test_efa_direct_ep_setopt_cq_flow_control_no_rx_cq_data(); +void test_efa_direct_ep_setopt_cq_flow_control_with_rx_cq_data(); /* begin efa_unit_test_data_path_direct.c */ void test_efa_data_path_direct_rdma_read_multiple_sge_fail();