-
Notifications
You must be signed in to change notification settings - Fork 456
prov/efa: Add support for blocking CQ read operations for RDM #11568
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
54211fb
8d02d10
d638fb5
b7bf24d
79cc073
8b8e568
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,7 @@ | |
| #include <string.h> | ||
| #include <unistd.h> | ||
| #include <sched.h> | ||
| #include <stdbool.h> | ||
| #include <sys/wait.h> | ||
| #include <sys/types.h> | ||
| #include <sys/socket.h> | ||
|
|
@@ -78,6 +79,8 @@ struct ft_context *tx_ctx_arr = NULL, *rx_ctx_arr = NULL; | |
| uint64_t remote_cq_data = 0; | ||
|
|
||
| uint64_t tx_seq, rx_seq, tx_cq_cntr, rx_cq_cntr; | ||
|
|
||
| static bool ft_fdwait_need_progress; | ||
| int (*ft_mr_alloc_func)(void); | ||
| uint64_t ft_tag = 0; | ||
| int ft_parent_proc = 0; | ||
|
|
@@ -108,6 +111,8 @@ struct timespec start, end; | |
| int listen_sock = -1; | ||
| int sock = -1; | ||
| int oob_sock = -1; | ||
| #define FT_FDWAIT_PROGRESS_INTERVAL_MS 10 | ||
|
|
||
| bool allow_rx_cq_data = true; | ||
|
|
||
| struct fi_av_attr av_attr = { | ||
|
|
@@ -768,6 +773,16 @@ int ft_open_fabric_res(void) | |
| return ft_open_domain_res(); | ||
| } | ||
|
|
||
| static inline void ft_update_fdwait_progress_requirement(struct fi_info *info) | ||
| { | ||
| bool manual_progress = info && info->domain_attr && | ||
| (info->domain_attr->data_progress == FI_PROGRESS_MANUAL || | ||
| info->domain_attr->control_progress == FI_PROGRESS_MANUAL); | ||
|
|
||
| ft_fdwait_need_progress = manual_progress && | ||
| (opts.comp_method == FT_COMP_WAIT_FD); | ||
| } | ||
|
|
||
| int ft_alloc_ep_res(struct fi_info *fi, struct fid_cq **new_txcq, | ||
| struct fid_cq **new_rxcq, struct fid_cntr **new_txcntr, | ||
| struct fid_cntr **new_rxcntr, | ||
|
|
@@ -776,6 +791,8 @@ int ft_alloc_ep_res(struct fi_info *fi, struct fid_cq **new_txcq, | |
| { | ||
| int ret; | ||
|
|
||
| ft_update_fdwait_progress_requirement(fi); | ||
|
|
||
| if (cq_attr.format == FI_CQ_FORMAT_UNSPEC) { | ||
| if (fi->caps & FI_TAGGED) | ||
| cq_attr.format = FI_CQ_FORMAT_TAGGED; | ||
|
|
@@ -1883,6 +1900,20 @@ static void ft_cleanup_mr_array(struct ft_context *ctx_arr, char **mr_bufs) | |
| } | ||
| } | ||
|
|
||
| static inline int ft_fdwait_poll_timeout(int remaining) | ||
| { | ||
| int interval; | ||
|
|
||
| if (!ft_fdwait_need_progress) | ||
| return remaining; | ||
|
|
||
| if (remaining < 0) | ||
| return FT_FDWAIT_PROGRESS_INTERVAL_MS; | ||
|
|
||
| interval = MIN(remaining, FT_FDWAIT_PROGRESS_INTERVAL_MS); | ||
| return interval > 0 ? interval : FT_FDWAIT_PROGRESS_INTERVAL_MS; | ||
| } | ||
|
|
||
| void ft_close_fids(void) | ||
| { | ||
| FT_CLOSE_FID(mc); | ||
|
|
@@ -2749,6 +2780,7 @@ static int ft_wait_for_comp(struct fid_cq *cq, uint64_t *cur, | |
|
|
||
| while (total - *cur > 0) { | ||
| ret = fi_cq_sread(cq, &comp, 1, NULL, timeout); | ||
|
|
||
| if (ret > 0) { | ||
| if (!ft_tag_is_valid(cq, &comp, tag ? tag : rx_cq_cntr)) | ||
| return -FI_EOTHER; | ||
|
|
@@ -2770,17 +2802,36 @@ static int ft_fdwait_for_comp(struct fid_cq *cq, uint64_t *cur, | |
| { | ||
| struct fi_cq_err_entry comp; | ||
| struct fid *fids[1]; | ||
| int fd, ret; | ||
| int fd, ret = 0; | ||
| int remaining = timeout; | ||
|
|
||
| fd = cq == txcq ? tx_fd : rx_fd; | ||
| fids[0] = &cq->fid; | ||
|
|
||
| while (total - *cur > 0) { | ||
| ret = fi_trywait(fabric, fids, 1); | ||
| if (ret == FI_SUCCESS) { | ||
| ret = ft_poll_fd(fd, timeout); | ||
| int wait_timeout = ft_fdwait_need_progress | ||
| ? ft_fdwait_poll_timeout(remaining) | ||
| : remaining; | ||
|
Comment on lines
+2814
to
+2816
Contributor
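There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The same condition (`ft_fdwait_need_progress`) is already checked inside `ft_fdwait_poll_timeout()`, so the ternary here is redundant; the helper can be called unconditionally. |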
||
|
|
||
| if (ft_fdwait_need_progress) | ||
| (void) fi_cq_read(cq, NULL, 0); | ||
|
|
||
| ret = ft_poll_fd(fd, wait_timeout); | ||
| if (ret && ret != -FI_EAGAIN) | ||
| return ret; | ||
|
|
||
| if (ret == -FI_EAGAIN) { | ||
| if (remaining >= 0) { | ||
| remaining -= wait_timeout; | ||
| if (remaining <= 0) | ||
| return -FI_EAGAIN; | ||
| } | ||
| continue; | ||
| } | ||
| } else if (ret && ret != -FI_EAGAIN) { | ||
| return ret; | ||
| } | ||
|
|
||
| ret = fi_cq_read(cq, &comp, 1); | ||
|
|
@@ -2791,9 +2842,10 @@ static int ft_fdwait_for_comp(struct fid_cq *cq, uint64_t *cur, | |
| } else if (ret < 0 && ret != -FI_EAGAIN) { | ||
| return ret; | ||
| } | ||
| ret = 0; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This `ret = 0;` reset becomes unnecessary as well. |
||
| } | ||
|
|
||
| return 0; | ||
| return ret; | ||
|
Comment on lines
-2796
to
+2848
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No error path reaches this point, so the function can simply `return 0`. |
||
| } | ||
|
|
||
| int ft_read_cq(struct fid_cq *cq, uint64_t *cur, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be simplified as:
And the `interval` variable can be removed,