-
Notifications
You must be signed in to change notification settings - Fork 456
prov/efa: Add support for blocking CQ read operations for RDM #11568
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
54211fb
8d02d10
d638fb5
b7bf24d
79cc073
8b8e568
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,7 @@ | |
| #include <string.h> | ||
| #include <unistd.h> | ||
| #include <sched.h> | ||
| #include <stdbool.h> | ||
| #include <sys/wait.h> | ||
| #include <sys/types.h> | ||
| #include <sys/socket.h> | ||
|
|
@@ -78,6 +79,8 @@ struct ft_context *tx_ctx_arr = NULL, *rx_ctx_arr = NULL; | |
| uint64_t remote_cq_data = 0; | ||
|
|
||
| uint64_t tx_seq, rx_seq, tx_cq_cntr, rx_cq_cntr; | ||
|
|
||
| static bool ft_fdwait_need_progress; | ||
| int (*ft_mr_alloc_func)(void); | ||
| uint64_t ft_tag = 0; | ||
| int ft_parent_proc = 0; | ||
|
|
@@ -108,6 +111,8 @@ struct timespec start, end; | |
| int listen_sock = -1; | ||
| int sock = -1; | ||
| int oob_sock = -1; | ||
| #define FT_FDWAIT_PROGRESS_INTERVAL_MS 10 | ||
|
|
||
| bool allow_rx_cq_data = true; | ||
|
|
||
| struct fi_av_attr av_attr = { | ||
|
|
@@ -768,6 +773,16 @@ int ft_open_fabric_res(void) | |
| return ft_open_domain_res(); | ||
| } | ||
|
|
||
| static inline void ft_update_fdwait_progress_requirement(struct fi_info *info) | ||
| { | ||
| bool manual_progress = info && info->domain_attr && | ||
| (info->domain_attr->data_progress == FI_PROGRESS_MANUAL || | ||
| info->domain_attr->control_progress == FI_PROGRESS_MANUAL); | ||
|
|
||
| ft_fdwait_need_progress = manual_progress && | ||
| (opts.comp_method == FT_COMP_WAIT_FD); | ||
| } | ||
|
|
||
| int ft_alloc_ep_res(struct fi_info *fi, struct fid_cq **new_txcq, | ||
| struct fid_cq **new_rxcq, struct fid_cntr **new_txcntr, | ||
| struct fid_cntr **new_rxcntr, | ||
|
|
@@ -776,6 +791,8 @@ int ft_alloc_ep_res(struct fi_info *fi, struct fid_cq **new_txcq, | |
| { | ||
| int ret; | ||
|
|
||
| ft_update_fdwait_progress_requirement(fi); | ||
|
|
||
| if (cq_attr.format == FI_CQ_FORMAT_UNSPEC) { | ||
| if (fi->caps & FI_TAGGED) | ||
| cq_attr.format = FI_CQ_FORMAT_TAGGED; | ||
|
|
@@ -1883,6 +1900,20 @@ static void ft_cleanup_mr_array(struct ft_context *ctx_arr, char **mr_bufs) | |
| } | ||
| } | ||
|
|
||
| static inline int ft_fdwait_poll_timeout(int remaining) | ||
| { | ||
| int interval; | ||
|
|
||
| if (!ft_fdwait_need_progress) | ||
| return remaining; | ||
|
|
||
| if (remaining < 0) | ||
| return FT_FDWAIT_PROGRESS_INTERVAL_MS; | ||
|
|
||
| interval = MIN(remaining, FT_FDWAIT_PROGRESS_INTERVAL_MS); | ||
| return interval > 0 ? interval : FT_FDWAIT_PROGRESS_INTERVAL_MS; | ||
| } | ||
|
|
||
| void ft_close_fids(void) | ||
| { | ||
| FT_CLOSE_FID(mc); | ||
|
|
@@ -2749,6 +2780,7 @@ static int ft_wait_for_comp(struct fid_cq *cq, uint64_t *cur, | |
|
|
||
| while (total - *cur > 0) { | ||
| ret = fi_cq_sread(cq, &comp, 1, NULL, timeout); | ||
|
|
||
| if (ret > 0) { | ||
| if (!ft_tag_is_valid(cq, &comp, tag ? tag : rx_cq_cntr)) | ||
| return -FI_EOTHER; | ||
|
|
@@ -2770,17 +2802,36 @@ static int ft_fdwait_for_comp(struct fid_cq *cq, uint64_t *cur, | |
| { | ||
| struct fi_cq_err_entry comp; | ||
| struct fid *fids[1]; | ||
| int fd, ret; | ||
| int fd, ret = 0; | ||
| int remaining = timeout; | ||
|
|
||
| fd = cq == txcq ? tx_fd : rx_fd; | ||
| fids[0] = &cq->fid; | ||
|
|
||
| while (total - *cur > 0) { | ||
| ret = fi_trywait(fabric, fids, 1); | ||
| if (ret == FI_SUCCESS) { | ||
| ret = ft_poll_fd(fd, timeout); | ||
| int wait_timeout = ft_fdwait_need_progress | ||
| ? ft_fdwait_poll_timeout(remaining) | ||
| : remaining; | ||
|
Comment on lines
+2814
to
+2816
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same condition is checked inside `ft_fdwait_poll_timeout`. |
||
|
|
||
| if (ft_fdwait_need_progress) | ||
| (void) fi_cq_read(cq, NULL, 0); | ||
|
|
||
| ret = ft_poll_fd(fd, wait_timeout); | ||
| if (ret && ret != -FI_EAGAIN) | ||
| return ret; | ||
|
|
||
| if (ret == -FI_EAGAIN) { | ||
| if (remaining >= 0) { | ||
| remaining -= wait_timeout; | ||
| if (remaining <= 0) | ||
| return -FI_EAGAIN; | ||
| } | ||
| continue; | ||
| } | ||
| } else if (ret && ret != -FI_EAGAIN) { | ||
| return ret; | ||
| } | ||
|
|
||
| ret = fi_cq_read(cq, &comp, 1); | ||
|
|
@@ -2791,9 +2842,10 @@ static int ft_fdwait_for_comp(struct fid_cq *cq, uint64_t *cur, | |
| } else if (ret < 0 && ret != -FI_EAGAIN) { | ||
| return ret; | ||
| } | ||
| ret = 0; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This becomes unnecessary as well. |
||
| } | ||
|
|
||
| return 0; | ||
| return ret; | ||
|
Comment on lines
-2796
to
+2848
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no error path reaching here, can just return 0. |
||
| } | ||
|
|
||
| int ft_read_cq(struct fid_cq *cq, uint64_t *cur, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
|
|
||
| #include "efa_cq.h" | ||
| #include "efa_prov_info.h" | ||
| #include "rdm/efa_rdm_cq.h" | ||
| #ifdef EFA_PERF_ENABLED | ||
| const char *efa_perf_counters_str[] = { | ||
| EFA_PERF_FOREACH(OFI_STR) | ||
|
|
@@ -63,16 +64,28 @@ static int efa_fabric_close(fid_t fid) | |
| static int efa_trywait(struct fid_fabric *fabric, struct fid **fids, int count) | ||
| { | ||
| struct efa_cq *efa_cq; | ||
| struct util_cq *util_cq; | ||
| struct efa_rdm_cq *rdm_cq; | ||
| struct util_wait *wait; | ||
| int ret, i; | ||
|
|
||
| for (i = 0; i < count; i++) { | ||
| if (fids[i]->fclass == FI_CLASS_CQ) { | ||
| /* Use EFA-specific CQ trywait */ | ||
| efa_cq = container_of(fids[i], struct efa_cq, util_cq.cq_fid.fid); | ||
| ret = efa_cq_trywait(efa_cq); | ||
| if (ret) | ||
| return ret; | ||
| util_cq = container_of(fids[i], struct util_cq, cq_fid.fid); | ||
|
|
||
| /* RDM CQs use util wait objects, not hardware CQ events */ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What are the benefits of using the util wait object instead of the EFA device interrupt to implement wait? Is it because there can be many device-level completions that correspond to only one libfabric completion?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The major requirement of the blocking CQ read is to avoid burning CPU in the busy loop that applications fall back to when sread is not available. Are we confident this approach meets that requirement? cc @bwbarrett
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My understanding is we'd need SHM to expose some kind of wait object (e.g. Short of that, don't we need to periodically drive progress or risk missing completions? I figured the exponential backoff strategy with a minimum interval of 1ms was appropriate since it matches backoff strategies we employ elsewhere. The |
||
| if (util_cq->wait) { | ||
| rdm_cq = container_of(util_cq, struct efa_rdm_cq, efa_cq.util_cq); | ||
| ret = efa_rdm_cq_trywait(rdm_cq); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How will it work on single node when shm is on? AFAICT it is only calling efa_rdm_cq_progress which doesn't progress shm cq.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now this seems to be contradicting to your earlier change on fabtests that adding the progress in the waitfd code path, if we think that calling a progress function outside
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Are you saying fi_trywait for the shm cq (no efa) will not progress shm completions, so you need that logic in fabtests? With your PR, fi_trywait for the efa cq (+shm as peer) will progress both, right? |
||
| if (ret) | ||
| return ret; | ||
| } else { | ||
| /* Use EFA-specific CQ trywait for non-RDM CQs */ | ||
| efa_cq = container_of(fids[i], struct efa_cq, util_cq.cq_fid.fid); | ||
| ret = efa_cq_trywait(efa_cq); | ||
| if (ret) | ||
| return ret; | ||
| } | ||
| } else { | ||
| /* Use generic util trywait logic for non-CQ types */ | ||
| switch (fids[i]->fclass) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be simplified as:
And the `interval` variable can be removed.