diff --git a/Makefile.am b/Makefile.am index 6760cb0..8dd1b9d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -18,7 +18,6 @@ libarmci_la_SOURCES = src/buffer.c \ src/internals.c \ src/malloc.c \ src/gmr.c \ - src/gmr-extras.c \ src/message.c \ src/message_gop.c \ src/mutex.c \ diff --git a/README.md b/README.md index 7b8cce2..58c0e42 100644 --- a/README.md +++ b/README.md @@ -159,6 +159,11 @@ Boolean environment variables are enabled when set to a value beginning with Argument to `usleep()` to pause the progress polling loop. +`ARMCI_USE_REQUEST_ATOMICS` (boolean) + + Switch to request-based RMA (with Rget_accumulate) instead of + Fetch_and_op/Compare_and_swap plus a local flush. + ## Noncollective Groups `ARMCI_NONCOLLECTIVE_GROUPS` (boolean) diff --git a/src/armci.h b/src/armci.h index 9c0290e..7597cca 100644 --- a/src/armci.h +++ b/src/armci.h @@ -5,6 +5,9 @@ #ifndef _ARMCI_H_ #define _ARMCI_H_ +// TODO add to build system +#define USE_RMA_REQUESTS 1 + #include #define ARMCI_MPI 3 @@ -64,11 +67,16 @@ int ARMCI_PutS_flag(void *src_ptr, int src_stride_ar[/*stride_levels*/], int count[/*stride_levels+1*/], int stride_levels, int *flag, int value, int proc); - typedef struct armci_hdl_s { +#ifdef USE_RMA_REQUESTS + int batch_size; + MPI_Request single_request; // used when batch_size=0 (common case) + MPI_Request *request_array; // used when batch_size>0 +#else int target; /* we do not actually support individual completion */ int aggregate; +#endif } armci_hdl_t; diff --git a/src/armci_internals.h b/src/armci_internals.h index 405db46..b88e4c6 100644 --- a/src/armci_internals.h +++ b/src/armci_internals.h @@ -113,6 +113,7 @@ typedef struct { int rma_nocheck; /* Use MPI_MODE_NOCHECK on synchronization calls that take assertion */ int disable_shm_accumulate; /* Set the disable_shm_accumulate window info key to true */ int use_same_op; /* Set accumulate_ops=same_op window info key */ + int use_request_atomics; /* Use request-based RMA for atomic operations */ char rma_ordering[20]; /* Set accumulate_ordering= window info key */ size_t memory_limit; /* upper bound on how much memory ARMCI can allocate */ @@ -202,12 +203,12 @@ void ARMCII_Strided_to_dtype(int stride_array[/*stride_levels*/], int count[/*st int stride_levels, MPI_Datatype old_type, MPI_Datatype *new_type); int ARMCII_Iov_op_dispatch(enum ARMCII_Op_e op, void **src, void **dst, int count, int size, - int datatype, int overlapping, int same_alloc, int proc, int blocking); + int datatype, int overlapping, int same_alloc, int proc, int blocking, armci_hdl_t * handle); int ARMCII_Iov_op_batched(enum ARMCII_Op_e op, void **src, void **dst, int count, int elem_count, - MPI_Datatype type, int proc, int consrv /* if 1, batched = safe */, int blocking); + MPI_Datatype type, int proc, int consrv /* if 1, batched = safe */, int blocking, armci_hdl_t * handle); int ARMCII_Iov_op_datatype(enum ARMCII_Op_e op, void **src, void **dst, int count, int elem_count, - MPI_Datatype type, int proc, int blocking); + MPI_Datatype type, int proc, int blocking, armci_hdl_t * handle); armcii_iov_iter_t *ARMCII_Strided_to_iov_iter( void *src_ptr, int src_stride_ar[/*stride_levels*/], diff --git a/src/gmr-extras.c b/src/gmr-extras.c deleted file mode 100644 index d180b0a..0000000 --- a/src/gmr-extras.c +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Copyright (C) 2010. See COPYRIGHT in top-level directory. - */ - -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -/** One-sided get-accumulate operation. 
Source and output buffer must be private. - * - * @param[in] mreg Memory region - * @param[in] src Source address (local) - * @param[in] out Result address (local) - * @param[in] dst Destination address (remote) - * @param[in] type MPI type of the given buffers - * @param[in] count Number of elements of the given type to transfer - * @param[in] op MPI_Op to apply at the destination - * @param[in] proc Absolute process id of the target - * @return 0 on success, non-zero on failure - */ -int gmr_get_accumulate(gmr_t *mreg, void *src, void *out, void *dst, int count, MPI_Datatype type, MPI_Op op, int proc) { - ARMCII_Assert_msg(src != NULL && out != NULL, "Invalid local address(es)"); - return gmr_get_accumulate_typed(mreg, src, count, type, out, count, type, dst, count, type, op, proc); -} - -/** One-sided get-accumulate operation with typed arguments. Source and output buffer must be private. - * - * @param[in] mreg Memory region - * @param[in] src Address of source data - * @param[in] src_count Number of elements of the given type at the source - * @param[in] src_type MPI datatype of the source elements - * @param[in] out Address of output buffer (same process as the source) - * @param[in] out_count Number of elements of the given type at the ouput - * @param[in] out_type MPI datatype of the output elements - * @param[in] dst Address of destination buffer - * @param[in] dst_count Number of elements of the given type at the destination - * @param[in] dst_type MPI datatype of the destination elements - * @param[in] size Number of bytes to transfer - * @param[in] op MPI_Op to apply at the destination - * @param[in] proc Absolute process id of target process - * @return 0 on success, non-zero on failure - */ -int gmr_get_accumulate_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src_type, - void *out, int out_count, MPI_Datatype out_type, - void *dst, int dst_count, MPI_Datatype dst_type, MPI_Op op, int proc) { - - int grp_proc; - gmr_size_t disp; - MPI_Aint lb, extent; - - grp_proc = ARMCII_Translate_absolute_to_group(&mreg->group, proc); - ARMCII_Assert(grp_proc >= 0); - ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); - - // Calculate displacement from beginning of the window - if (dst == MPI_BOTTOM) - disp = 0; - else - disp = (gmr_size_t) ((uint8_t*)dst - (uint8_t*)mreg->slices[proc].base); - - // Perform checks - MPI_Type_get_true_extent(dst_type, &lb, &extent); - ARMCII_Assert_msg(disp >= 0 && disp < mreg->slices[proc].size, "Invalid remote address"); - ARMCII_Assert_msg(disp + dst_count*extent <= mreg->slices[proc].size, "Transfer is out of range"); - - MPI_Get_accumulate(src, src_count, src_type, out, out_count, out_type, grp_proc, (MPI_Aint) disp, dst_count, dst_type, op, mreg->window); - - return 0; -} - -/** One-sided fetch-and-op. Source and output buffer must be private. 
- * - * @param[in] mreg Memory region - * @param[in] src Address of source data - * @param[in] out Address of output buffer (same process as the source) - * @param[in] dst Address of destination buffer - * @param[in] type MPI datatype of the source, output and destination elements - * @param[in] op MPI_Op to apply at the destination - * @param[in] proc Absolute process id of target process - * @return 0 on success, non-zero on failure - */ -int gmr_fetch_and_op(gmr_t *mreg, void *src, void *out, void *dst, - MPI_Datatype type, MPI_Op op, int proc) { - - int grp_proc; - gmr_size_t disp; - - grp_proc = ARMCII_Translate_absolute_to_group(&mreg->group, proc); - ARMCII_Assert(grp_proc >= 0); - ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); - - /* built-in types only so no chance of seeing MPI_BOTTOM */ - disp = (gmr_size_t) ((uint8_t*)dst - (uint8_t*)mreg->slices[proc].base); - - // Perform checks - ARMCII_Assert_msg(disp >= 0 && disp < mreg->slices[proc].size, "Invalid remote address"); - ARMCII_Assert_msg(disp <= mreg->slices[proc].size, "Transfer is out of range"); - - MPI_Fetch_and_op(src, out, type, grp_proc, (MPI_Aint) disp, op, mreg->window); - - return 0; -} - -/** Lock a memory region at all targets so that one-sided operations can be performed. - * - * @param[in] mreg Memory region - * @return 0 on success, non-zero on failure - */ -int gmr_lockall(gmr_t *mreg) { - int grp_me = ARMCII_Translate_absolute_to_group(&mreg->group, ARMCI_GROUP_WORLD.rank); - - ARMCII_Assert(grp_me >= 0); - ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); - - MPI_Win_lock_all((ARMCII_GLOBAL_STATE.rma_nocheck) ? MPI_MODE_NOCHECK : 0, - mreg->window); - - return 0; -} - -/** Unlock a memory region at all targets. - * - * @param[in] mreg Memory region - * @return 0 on success, non-zero on failure - */ -int gmr_unlockall(gmr_t *mreg) { - int grp_me = ARMCII_Translate_absolute_to_group(&mreg->group, ARMCI_GROUP_WORLD.rank); - - ARMCII_Assert(grp_me >= 0); - ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); - - MPI_Win_unlock_all(mreg->window); - - return 0; -} - -/** Flush a memory region for local or remote completion. - * - * @param[in] mreg Memory region - * @param[in] proc Absolute process id of the target - * @param[in] local_only Only flush the operation locally. - * @return 0 on success, non-zero on failure - */ -int gmr_flush(gmr_t *mreg, int proc, int local_only) { - int grp_proc = ARMCII_Translate_absolute_to_group(&mreg->group, proc); - int grp_me = ARMCII_Translate_absolute_to_group(&mreg->group, ARMCI_GROUP_WORLD.rank); - - ARMCII_Assert(grp_proc >= 0 && grp_me >= 0); - ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); - ARMCII_Assert_msg(grp_proc < mreg->group.size, "grp_proc exceeds group size!"); - - if (!local_only || ARMCII_GLOBAL_STATE.end_to_end_flush) { - MPI_Win_flush(grp_proc, mreg->window); - } else { - MPI_Win_flush_local(grp_proc, mreg->window); - } - - return 0; -} - -/** Flush a memory region for remote completion to all targets. 
- * - * @param[in] mreg Memory region - * @return 0 on success, non-zero on failure - */ -int gmr_flushall(gmr_t *mreg, int local_only) { - int grp_me = ARMCII_Translate_absolute_to_group(&mreg->group, ARMCI_GROUP_WORLD.rank); - - ARMCII_Assert(grp_me >= 0); - ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); - - if (!local_only || ARMCII_GLOBAL_STATE.end_to_end_flush) { - MPI_Win_flush_all(mreg->window); - } else { - MPI_Win_flush_local_all(mreg->window); - } - - return 0; -} - -/** Sync memory region so that public and private windows are the same. - * - * @param[in] mreg Memory region - * @return 0 on success, non-zero on failure - */ -int gmr_sync(gmr_t *mreg) -{ -#if 0 - // what is the point of this? - int grp_me = ARMCII_Translate_absolute_to_group(&mreg->group, ARMCI_GROUP_WORLD.rank); - ARMCII_Assert(grp_me >= 0); -#endif - ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); - - if (!(mreg->unified)) { - MPI_Win_sync(mreg->window); - } - - return 0; -} - -void gmr_progress(void) -{ - if (ARMCII_GLOBAL_STATE.explicit_nb_progress) { - int flag; - MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ARMCI_GROUP_WORLD.comm, &flag, MPI_STATUS_IGNORE); - } - return; -} - diff --git a/src/gmr.c b/src/gmr.c index bdb2e8d..f18c71a 100644 --- a/src/gmr.c +++ b/src/gmr.c @@ -244,17 +244,17 @@ gmr_t *gmr_create(gmr_size_t local_size, void **base_ptrs, ARMCI_Group *group) { unified = false; } #else - int unified = ARMCII_Is_win_unified(mreg->window); - int print = ARMCII_GLOBAL_STATE.verbose; + const int unified = ARMCII_Is_win_unified(mreg->window); + const int print = ARMCII_GLOBAL_STATE.verbose; if (unified == 1) { mreg->unified = true; - if (print) printf("MPI_WIN_MODEL = MPI_WIN_UNIFIED\n"); + if (print > 1) printf("MPI_WIN_MODEL = MPI_WIN_UNIFIED\n"); } else if (unified == 0) { mreg->unified = false; - if (print) printf("MPI_WIN_MODEL = MPI_WIN_SEPARATE\n"); + if (print > 1) printf("MPI_WIN_MODEL = MPI_WIN_SEPARATE\n"); } else { mreg->unified = false; - if (print) printf("MPI_WIN_MODEL not available\n"); + if (print > 1) printf("MPI_WIN_MODEL not available\n"); } #endif if (!(mreg->unified) && (ARMCII_GLOBAL_STATE.shr_buf_method == ARMCII_SHR_BUF_NOGUARD) ) { @@ -453,9 +453,10 @@ gmr_t *gmr_lookup(void *ptr, int proc) { * @param[in] proc Absolute process id of target process * @return 0 on success, non-zero on failure */ -int gmr_put(gmr_t *mreg, void *src, void *dst, int size, int proc) { +int gmr_put(gmr_t *mreg, void *src, void *dst, int size, int proc, armci_hdl_t * handle) +{ ARMCII_Assert_msg(src != NULL, "Invalid local address"); - return gmr_put_typed(mreg, src, size, MPI_BYTE, dst, size, MPI_BYTE, proc); + return gmr_put_typed(mreg, src, size, MPI_BYTE, dst, size, MPI_BYTE, proc, handle); } @@ -473,8 +474,9 @@ int gmr_put(gmr_t *mreg, void *src, void *dst, int size, int proc) { * @return 0 on success, non-zero on failure */ int gmr_put_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src_type, - void *dst, int dst_count, MPI_Datatype dst_type, int proc) { - + void *dst, int dst_count, MPI_Datatype dst_type, + int proc, armci_hdl_t * handle) +{ int grp_proc; gmr_size_t disp; MPI_Aint lb, extent; @@ -484,24 +486,59 @@ int gmr_put_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src_type, ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); // Calculate displacement from beginning of the window - if (dst == MPI_BOTTOM) + if (dst == MPI_BOTTOM) { disp = 0; - 
else + } else { disp = (gmr_size_t) ((uint8_t*)dst - (uint8_t*)mreg->slices[proc].base); + } // Perform checks MPI_Type_get_true_extent(dst_type, &lb, &extent); ARMCII_Assert_msg(disp >= 0 && disp < mreg->slices[proc].size, "Invalid remote address"); ARMCII_Assert_msg(disp + dst_count*extent <= mreg->slices[proc].size, "Transfer is out of range"); +#ifdef USE_RMA_REQUESTS + + if (handle!=NULL) { + + MPI_Request req = MPI_REQUEST_NULL; + + if (ARMCII_GLOBAL_STATE.rma_atomicity) { + MPI_Raccumulate(src, src_count, src_type, grp_proc, + (MPI_Aint) disp, dst_count, dst_type, + MPI_REPLACE, mreg->window, &req); + } else { + MPI_Rput(src, src_count, src_type, grp_proc, + (MPI_Aint) disp, dst_count, dst_type, + mreg->window, &req); + } + + gmr_handle_add_request(handle, req); + + return 0; + + } + +#endif + if (ARMCII_GLOBAL_STATE.rma_atomicity) { MPI_Accumulate(src, src_count, src_type, grp_proc, - (MPI_Aint) disp, dst_count, dst_type, MPI_REPLACE, mreg->window); + (MPI_Aint) disp, dst_count, dst_type, + MPI_REPLACE, mreg->window); } else { MPI_Put(src, src_count, src_type, grp_proc, (MPI_Aint) disp, dst_count, dst_type, mreg->window); } +#ifndef USE_RMA_REQUESTS + + if (handle!=NULL) { + /* Regular (not aggregate) handles merely store the target for future flushing. */ + handle->target = grp_proc; + } + +#endif + return 0; } @@ -515,9 +552,10 @@ int gmr_put_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src_type, * @param[in] proc Absolute process id of target process * @return 0 on success, non-zero on failure */ -int gmr_get(gmr_t *mreg, void *src, void *dst, int size, int proc) { +int gmr_get(gmr_t *mreg, void *src, void *dst, int size, int proc, armci_hdl_t * handle) +{ ARMCII_Assert_msg(dst != NULL, "Invalid local address"); - return gmr_get_typed(mreg, src, size, MPI_BYTE, dst, size, MPI_BYTE, proc); + return gmr_get_typed(mreg, src, size, MPI_BYTE, dst, size, MPI_BYTE, proc, handle); } @@ -535,8 +573,9 @@ int gmr_get(gmr_t *mreg, void *src, void *dst, int size, int proc) { * @return 0 on success, non-zero on failure */ int gmr_get_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src_type, - void *dst, int dst_count, MPI_Datatype dst_type, int proc) { - + void *dst, int dst_count, MPI_Datatype dst_type, + int proc, armci_hdl_t * handle) +{ int grp_proc; gmr_size_t disp; MPI_Aint lb, extent; @@ -546,16 +585,43 @@ int gmr_get_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src_type, ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); // Calculate displacement from beginning of the window - if (src == MPI_BOTTOM) + if (src == MPI_BOTTOM) { disp = 0; - else + } else { disp = (gmr_size_t) ((uint8_t*)src - (uint8_t*)mreg->slices[proc].base); + } // Perform checks MPI_Type_get_true_extent(src_type, &lb, &extent); ARMCII_Assert_msg(disp >= 0 && disp < mreg->slices[proc].size, "Invalid remote address"); ARMCII_Assert_msg(disp + src_count*extent <= mreg->slices[proc].size, "Transfer is out of range"); +#ifdef USE_RMA_REQUESTS + + if (handle!=NULL) { + + MPI_Request req = MPI_REQUEST_NULL; + + if (ARMCII_GLOBAL_STATE.rma_atomicity) { + // Using the source type instead of MPI_BYTE works around an MPICH bug that appears with + // Intel MPI 2021.10 and Cray MPI 8.1.29 + MPI_Rget_accumulate(NULL, 0, src_type /* MPI_BYTE */, + dst, dst_count, dst_type, grp_proc, + (MPI_Aint) disp, src_count, src_type, + MPI_NO_OP, mreg->window, &req); + } else { + MPI_Rget(dst, dst_count, dst_type, grp_proc, + (MPI_Aint) disp, src_count, src_type, + 
mreg->window, &req); + } + + gmr_handle_add_request(handle, req); + + return 0; + } + +#endif + if (ARMCII_GLOBAL_STATE.rma_atomicity) { MPI_Get_accumulate(NULL, 0, MPI_BYTE, dst, dst_count, dst_type, grp_proc, (MPI_Aint) disp, src_count, src_type, MPI_NO_OP, mreg->window); @@ -564,6 +630,15 @@ int gmr_get_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src_type, (MPI_Aint) disp, src_count, src_type, mreg->window); } +#ifndef USE_RMA_REQUESTS + + if (handle!=NULL) { + /* Regular (not aggregate) handles merely store the target for future flushing. */ + handle->target = grp_proc; + } + +#endif + return 0; } @@ -578,9 +653,11 @@ int gmr_get_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src_type, * @param[in] proc Absolute process id of the target * @return 0 on success, non-zero on failure */ -int gmr_accumulate(gmr_t *mreg, void *src, void *dst, int count, MPI_Datatype type, int proc) { +int gmr_accumulate(gmr_t *mreg, void *src, void *dst, int count, MPI_Datatype type, + int proc, armci_hdl_t * handle) +{ ARMCII_Assert_msg(src != NULL, "Invalid local address"); - return gmr_accumulate_typed(mreg, src, count, type, dst, count, type, proc); + return gmr_accumulate_typed(mreg, src, count, type, dst, count, type, proc, handle); } @@ -598,8 +675,9 @@ int gmr_accumulate(gmr_t *mreg, void *src, void *dst, int count, MPI_Datatype ty * @return 0 on success, non-zero on failure */ int gmr_accumulate_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src_type, - void *dst, int dst_count, MPI_Datatype dst_type, int proc) { - + void *dst, int dst_count, MPI_Datatype dst_type, + int proc, armci_hdl_t * handle) +{ int grp_proc; gmr_size_t disp; MPI_Aint lb, extent; @@ -609,18 +687,350 @@ int gmr_accumulate_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); // Calculate displacement from beginning of the window - if (dst == MPI_BOTTOM) + if (dst == MPI_BOTTOM) { disp = 0; - else + } else { disp = (gmr_size_t) ((uint8_t*)dst - (uint8_t*)mreg->slices[proc].base); + } // Perform checks MPI_Type_get_true_extent(dst_type, &lb, &extent); ARMCII_Assert_msg(disp >= 0 && disp < mreg->slices[proc].size, "Invalid remote address"); ARMCII_Assert_msg(disp + dst_count*extent <= mreg->slices[proc].size, "Transfer is out of range"); +#ifdef USE_RMA_REQUESTS + + if (handle!=NULL) { + + MPI_Request req = MPI_REQUEST_NULL; + + MPI_Raccumulate(src, src_count, src_type, grp_proc, + (MPI_Aint) disp, dst_count, dst_type, + MPI_SUM, mreg->window, &req); + + gmr_handle_add_request(handle, req); + + return 0; + + } + +#endif + MPI_Accumulate(src, src_count, src_type, grp_proc, (MPI_Aint) disp, dst_count, dst_type, MPI_SUM, mreg->window); +#ifndef USE_RMA_REQUESTS + + if (handle!=NULL) { + /* Regular (not aggregate) handles merely store the target for future flushing. */ + handle->target = grp_proc; + } + +#endif + return 0; } +/** One-sided get-accumulate operation. Source and output buffer must be private. 
+ * + * @param[in] mreg Memory region + * @param[in] src Source address (local) + * @param[in] out Result address (local) + * @param[in] dst Destination address (remote) + * @param[in] type MPI type of the given buffers + * @param[in] count Number of elements of the given type to transfer + * @param[in] op MPI_Op to apply at the destination + * @param[in] proc Absolute process id of the target + * @return 0 on success, non-zero on failure + */ +int gmr_get_accumulate(gmr_t *mreg, void *src, void *out, void *dst, int count, + MPI_Datatype type, MPI_Op op, int proc, armci_hdl_t * handle) +{ + ARMCII_Assert_msg(src != NULL && out != NULL, "Invalid local address(es)"); + return gmr_get_accumulate_typed(mreg, src, count, type, out, count, type, dst, count, type, op, proc, handle); +} + +/** One-sided get-accumulate operation with typed arguments. Source and output buffer must be private. + * + * @param[in] mreg Memory region + * @param[in] src Address of source data + * @param[in] src_count Number of elements of the given type at the source + * @param[in] src_type MPI datatype of the source elements + * @param[in] out Address of output buffer (same process as the source) + * @param[in] out_count Number of elements of the given type at the ouput + * @param[in] out_type MPI datatype of the output elements + * @param[in] dst Address of destination buffer + * @param[in] dst_count Number of elements of the given type at the destination + * @param[in] dst_type MPI datatype of the destination elements + * @param[in] size Number of bytes to transfer + * @param[in] op MPI_Op to apply at the destination + * @param[in] proc Absolute process id of target process + * @return 0 on success, non-zero on failure + */ +int gmr_get_accumulate_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src_type, + void *out, int out_count, MPI_Datatype out_type, + void *dst, int dst_count, MPI_Datatype dst_type, + MPI_Op op, int proc, armci_hdl_t * handle) +{ + int grp_proc; + gmr_size_t disp; + MPI_Aint lb, extent; + + grp_proc = ARMCII_Translate_absolute_to_group(&mreg->group, proc); + ARMCII_Assert(grp_proc >= 0); + ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); + + // Calculate displacement from beginning of the window + if (dst == MPI_BOTTOM) { + disp = 0; + } else { + disp = (gmr_size_t) ((uint8_t*)dst - (uint8_t*)mreg->slices[proc].base); + } + + // Perform checks + MPI_Type_get_true_extent(dst_type, &lb, &extent); + ARMCII_Assert_msg(disp >= 0 && disp < mreg->slices[proc].size, "Invalid remote address"); + ARMCII_Assert_msg(disp + dst_count*extent <= mreg->slices[proc].size, "Transfer is out of range"); + +#ifdef USE_RMA_REQUESTS + + if (handle!=NULL) { + + MPI_Request req = MPI_REQUEST_NULL; + + MPI_Rget_accumulate(src, src_count, src_type, + out, out_count, out_type, + grp_proc, (MPI_Aint) disp, dst_count, dst_type, + op, mreg->window, &req); + + gmr_handle_add_request(handle, req); + + return 0; + + } + +#endif + + MPI_Get_accumulate(src, src_count, src_type, + out, out_count, out_type, + grp_proc, (MPI_Aint) disp, dst_count, dst_type, + op, mreg->window); + +#ifndef USE_RMA_REQUESTS + + if (handle!=NULL) { + /* Regular (not aggregate) handles merely store the target for future flushing. */ + handle->target = grp_proc; + } + +#endif + + return 0; +} + +/** One-sided fetch-and-op. Source and output buffer must be private. 
+ * + * @param[in] mreg Memory region + * @param[in] src Address of source data + * @param[in] out Address of output buffer (same process as the source) + * @param[in] dst Address of destination buffer + * @param[in] type MPI datatype of the source, output and destination elements + * @param[in] op MPI_Op to apply at the destination + * @param[in] proc Absolute process id of target process + * @return 0 on success, non-zero on failure + */ +int gmr_fetch_and_op(gmr_t *mreg, void *src, void *out, void *dst, + MPI_Datatype type, MPI_Op op, int proc) +{ + int grp_proc; + gmr_size_t disp; + + grp_proc = ARMCII_Translate_absolute_to_group(&mreg->group, proc); + ARMCII_Assert(grp_proc >= 0); + ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); + + /* built-in types only so no chance of seeing MPI_BOTTOM */ + disp = (gmr_size_t) ((uint8_t*)dst - (uint8_t*)mreg->slices[proc].base); + + // Perform checks + ARMCII_Assert_msg(disp >= 0 && disp < mreg->slices[proc].size, "Invalid remote address"); + ARMCII_Assert_msg(disp <= mreg->slices[proc].size, "Transfer is out of range"); + + if (ARMCII_GLOBAL_STATE.use_request_atomics) { + + MPI_Request req; + MPI_Rget_accumulate(src, 1, type, out, 1, type, grp_proc, (MPI_Aint) disp, 1, type, op, mreg->window, &req); + MPI_Wait(&req, MPI_STATUS_IGNORE); + + } else { + + MPI_Fetch_and_op(src, out, type, grp_proc, (MPI_Aint) disp, op, mreg->window); + if (ARMCII_GLOBAL_STATE.end_to_end_flush) { + MPI_Win_flush(grp_proc, mreg->window); + } else { + MPI_Win_flush_local(grp_proc, mreg->window); + } + + } + + return 0; +} + +/** Lock a memory region at all targets so that one-sided operations can be performed. + * + * @param[in] mreg Memory region + * @return 0 on success, non-zero on failure + */ +int gmr_lockall(gmr_t *mreg) { + int grp_me = ARMCII_Translate_absolute_to_group(&mreg->group, ARMCI_GROUP_WORLD.rank); + + ARMCII_Assert(grp_me >= 0); + ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); + + MPI_Win_lock_all((ARMCII_GLOBAL_STATE.rma_nocheck) ? MPI_MODE_NOCHECK : 0, + mreg->window); + + return 0; +} + +/** Unlock a memory region at all targets. + * + * @param[in] mreg Memory region + * @return 0 on success, non-zero on failure + */ +int gmr_unlockall(gmr_t *mreg) { + int grp_me = ARMCII_Translate_absolute_to_group(&mreg->group, ARMCI_GROUP_WORLD.rank); + + ARMCII_Assert(grp_me >= 0); + ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); + + MPI_Win_unlock_all(mreg->window); + + return 0; +} + +/** Flush a memory region for local or remote completion. + * + * @param[in] mreg Memory region + * @param[in] proc Absolute process id of the target + * @param[in] local_only Only flush the operation locally. + * @return 0 on success, non-zero on failure + */ +int gmr_flush(gmr_t *mreg, int proc, int local_only) { + int grp_proc = ARMCII_Translate_absolute_to_group(&mreg->group, proc); + int grp_me = ARMCII_Translate_absolute_to_group(&mreg->group, ARMCI_GROUP_WORLD.rank); + + ARMCII_Assert(grp_proc >= 0 && grp_me >= 0); + ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); + ARMCII_Assert_msg(grp_proc < mreg->group.size, "grp_proc exceeds group size!"); + + if (!local_only || ARMCII_GLOBAL_STATE.end_to_end_flush) { + MPI_Win_flush(grp_proc, mreg->window); + } else { + MPI_Win_flush_local(grp_proc, mreg->window); + } + + return 0; +} + +/** Flush a memory region for remote completion to all targets. 
+ * + * @param[in] mreg Memory region + * @return 0 on success, non-zero on failure + */ +int gmr_flushall(gmr_t *mreg, int local_only) { + int grp_me = ARMCII_Translate_absolute_to_group(&mreg->group, ARMCI_GROUP_WORLD.rank); + + ARMCII_Assert(grp_me >= 0); + ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); + + if (!local_only || ARMCII_GLOBAL_STATE.end_to_end_flush) { + MPI_Win_flush_all(mreg->window); + } else { + MPI_Win_flush_local_all(mreg->window); + } + + return 0; +} + +/** Sync memory region so that public and private windows are the same. + * + * @param[in] mreg Memory region + * @return 0 on success, non-zero on failure + */ +int gmr_sync(gmr_t *mreg) +{ +#if 0 + // what is the point of this? + int grp_me = ARMCII_Translate_absolute_to_group(&mreg->group, ARMCI_GROUP_WORLD.rank); + ARMCII_Assert(grp_me >= 0); +#endif + ARMCII_Assert_msg(mreg->window != MPI_WIN_NULL, "A non-null mreg contains a null window."); + + if (!(mreg->unified)) { + MPI_Win_sync(mreg->window); + } + + return 0; +} + +void gmr_progress(void) +{ + if (ARMCII_GLOBAL_STATE.explicit_nb_progress) { + int flag; + MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ARMCI_GROUP_WORLD.comm, &flag, MPI_STATUS_IGNORE); + } + return; +} + +void gmr_handle_add_request(armci_hdl_t * handle, MPI_Request req) +{ + if (handle->batch_size < 0) { + + ARMCII_Warning("gmr_handle_add_request passed a bogus (uninitialized) handle.\n"); + + } else if (handle->batch_size == 0) { + + if (handle->single_request != MPI_REQUEST_NULL) { + ARMCII_Warning("gmr_handle_add_request: handle is corrupt (single_request_array is not MPI_REQUEST_NULL).\n"); + } + if (handle->request_array != NULL) { + //ARMCII_Warning("gmr_handle_add_request: handle is corrupt (request_array is not NULL).\n"); + } + + handle->batch_size = 1; + handle->single_request = req; + + } else if (handle->batch_size == 1) { + + if (handle->single_request == MPI_REQUEST_NULL) { + ARMCII_Warning("gmr_handle_add_request: handle is corrupt (single_request_array is MPI_REQUEST_NULL).\n"); + } + if (handle->request_array != NULL) { + //ARMCII_Warning("gmr_handle_add_request: handle is corrupt (request_array is not NULL).\n"); + } + + // there is a single request in the handle, so we allocate space for two, + // then copy from the single request to the array and append the new one. + // we nullify the single request to make sure it is not usable. + handle->batch_size++; + handle->request_array = malloc( handle->batch_size * sizeof(MPI_Request) ); + handle->request_array[0] = handle->single_request; + handle->request_array[1] = req; + handle->single_request = MPI_REQUEST_NULL; + + } else if (handle->batch_size > 1) { + + if (handle->single_request != MPI_REQUEST_NULL) { + ARMCII_Warning("gmr_handle_add_request: handle is corrupt (single_request_array is not MPI_REQUEST_NULL).\n"); + } + if (handle->request_array == NULL) { + ARMCII_Warning("gmr_handle_add_request: handle is corrupt (request_array is NULL).\n"); + } + + // grow the allocation and append the new one. 
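+      // Growth is one request at a time: every additional request triggers a realloc
+      // of the array. (Assumption, not part of this patch: explicit-handle batches
+      // stay small, so linear growth is acceptable; a more defensive version would
+      // also check the realloc result for NULL before overwriting request_array.)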
+ handle->batch_size++; + handle->request_array = realloc( handle->request_array , handle->batch_size * sizeof(MPI_Request) ); + handle->request_array[handle->batch_size-1] = req; + + } +} diff --git a/src/gmr.h b/src/gmr.h index 5a45317..602593b 100644 --- a/src/gmr.h +++ b/src/gmr.h @@ -44,29 +44,41 @@ void gmr_destroy(gmr_t *mreg, ARMCI_Group *group); int gmr_destroy_all(void); gmr_t *gmr_lookup(void *ptr, int proc); -int gmr_get(gmr_t *mreg, void *src, void *dst, int size, int target); -int gmr_put(gmr_t *mreg, void *src, void *dst, int size, int target); -int gmr_accumulate(gmr_t *mreg, void *src, void *dst, int count, MPI_Datatype type, int proc); -int gmr_get_accumulate(gmr_t *mreg, void *src, void *out, void *dst, int count, MPI_Datatype type, - MPI_Op op, int proc); +// blocking int gmr_fetch_and_op(gmr_t *mreg, void *src, void *out, void *dst, MPI_Datatype type, MPI_Op op, int proc); +// nonblocking +int gmr_get(gmr_t *mreg, void *src, void *dst, int size, + int target, armci_hdl_t * handle); +int gmr_put(gmr_t *mreg, void *src, void *dst, int size, + int target, armci_hdl_t * handle); +int gmr_accumulate(gmr_t *mreg, void *src, void *dst, int count, MPI_Datatype type, + int proc, armci_hdl_t * handle); +int gmr_get_accumulate(gmr_t *mreg, void *src, void *out, void *dst, int count, MPI_Datatype type, + MPI_Op op, int proc, armci_hdl_t * handle); + int gmr_get_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src_type, - void *dst, int dst_count, MPI_Datatype dst_type, int proc); + void *dst, int dst_count, MPI_Datatype dst_type, + int proc, armci_hdl_t * handle); int gmr_put_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src_type, - void *dst, int dst_count, MPI_Datatype dst_type, int proc); + void *dst, int dst_count, MPI_Datatype dst_type, + int proc, armci_hdl_t * handle); int gmr_accumulate_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src_type, - void *dst, int dst_count, MPI_Datatype dst_type, int proc); + void *dst, int dst_count, MPI_Datatype dst_type, + int proc, armci_hdl_t * handle); int gmr_get_accumulate_typed(gmr_t *mreg, void *src, int src_count, MPI_Datatype src_type, - void *out, int out_count, MPI_Datatype out_type, void *dst, int dst_count, MPI_Datatype dst_type, - MPI_Op op, int proc); + void *out, int out_count, MPI_Datatype out_type, + void *dst, int dst_count, MPI_Datatype dst_type, + MPI_Op op, int proc, armci_hdl_t * handle); int gmr_lockall(gmr_t *mreg); int gmr_unlockall(gmr_t *mreg); int gmr_flush(gmr_t *mreg, int proc, int local_only); int gmr_flushall(gmr_t *mreg, int local_only); int gmr_sync(gmr_t *mreg); +int gmr_wait(armci_hdl_t * handle); void gmr_progress(void); +void gmr_handle_add_request(armci_hdl_t * handle, MPI_Request req); #endif /* HAVE_GMR_H */ diff --git a/src/groups.c b/src/groups.c index 642dae9..fb0b7f5 100644 --- a/src/groups.c +++ b/src/groups.c @@ -11,13 +11,10 @@ #include -/** The ARMCI world group. This is accessed from outside via - * ARMCI_Group_get_world. - */ +/** The ARMCI world group. This is accessed from outside via ARMCI_Group_get_world. */ ARMCI_Group ARMCI_GROUP_WORLD = {0}; ARMCI_Group ARMCI_GROUP_DEFAULT = {0}; - /** Initialize an ARMCI group's remaining fields using the communicator field. 
*/ void ARMCII_Group_init_from_comm(ARMCI_Group *group) { diff --git a/src/init_finalize.c b/src/init_finalize.c index f67ebd8..7ac5b81 100644 --- a/src/init_finalize.c +++ b/src/init_finalize.c @@ -199,6 +199,22 @@ int PARMCI_Init_thread_comm(int armci_requested, MPI_Comm comm) { } } + /* Setup groups and communicators - do this before ARMCII_Warning is called! */ + + MPI_Comm_dup(comm, &ARMCI_GROUP_WORLD.comm); + ARMCII_Group_init_from_comm(&ARMCI_GROUP_WORLD); + ARMCI_GROUP_DEFAULT = ARMCI_GROUP_WORLD; + + /* Create GOP operators */ + + MPI_Op_create(ARMCII_Absmin_op, 1 /* commute */, &ARMCI_MPI_ABSMIN_OP); + MPI_Op_create(ARMCII_Absmax_op, 1 /* commute */, &ARMCI_MPI_ABSMAX_OP); + + MPI_Op_create(ARMCII_Msg_sel_min_op, 1 /* commute */, &ARMCI_MPI_SELMIN_OP); + MPI_Op_create(ARMCII_Msg_sel_max_op, 1 /* commute */, &ARMCI_MPI_SELMAX_OP); + + /* Determine environmental settings */ + ARMCII_GLOBAL_STATE.verbose = ARMCII_Getenv_int("ARMCI_VERBOSE", 0); /* Figure out what MPI library we are using, in an attempt to work around bugs. */ @@ -238,8 +254,8 @@ int PARMCI_Init_thread_comm(int armci_requested, MPI_Comm comm) { #ifdef HAVE_PTHREADS /* Check progress thread settings */ - ARMCII_GLOBAL_STATE.progress_thread = ARMCII_Getenv_bool("ARMCI_PROGRESS_THREAD", 0); - ARMCII_GLOBAL_STATE.progress_usleep = ARMCII_Getenv_int("ARMCI_PROGRESS_USLEEP", 0); + ARMCII_GLOBAL_STATE.progress_thread = ARMCII_Getenv_bool("ARMCI_PROGRESS_THREAD", 0); + ARMCII_GLOBAL_STATE.progress_usleep = ARMCII_Getenv_int("ARMCI_PROGRESS_USLEEP", 0); if (ARMCII_GLOBAL_STATE.progress_thread && (mpi_thread_level!=MPI_THREAD_MULTIPLE)) { ARMCII_Warning("ARMCI progress thread requires MPI_THREAD_MULTIPLE (%d); progress thread disabled.\n", @@ -258,7 +274,7 @@ int PARMCI_Init_thread_comm(int armci_requested, MPI_Comm comm) { /* Check for debugging flags */ - ARMCII_GLOBAL_STATE.debug_alloc = ARMCII_Getenv_bool("ARMCI_DEBUG_ALLOC", 0); + ARMCII_GLOBAL_STATE.debug_alloc = ARMCII_Getenv_bool("ARMCI_DEBUG_ALLOC", 0); { int junk; junk = ARMCII_Getenv_bool("ARMCI_FLUSH_BARRIERS", -1); @@ -275,29 +291,32 @@ int PARMCI_Init_thread_comm(int armci_requested, MPI_Comm comm) { if (ARMCII_Getenv("ARMCI_NONCOLLECTIVE_GROUPS")) { ARMCII_GLOBAL_STATE.noncollective_groups = ARMCII_Getenv_bool("ARMCI_NONCOLLECTIVE_GROUPS", 0); } - ARMCII_GLOBAL_STATE.cache_rank_translation=ARMCII_Getenv_bool("ARMCI_CACHE_RANK_TRANSLATION", 1); - - /* Check for IOV flags */ + ARMCII_GLOBAL_STATE.cache_rank_translation = ARMCII_Getenv_bool("ARMCI_CACHE_RANK_TRANSLATION", 1); -#ifdef NO_SEATBELTS - ARMCII_GLOBAL_STATE.iov_checks = 0; -#endif - ARMCII_GLOBAL_STATE.iov_checks = ARMCII_Getenv_bool("ARMCI_IOV_CHECKS", 0); - ARMCII_GLOBAL_STATE.iov_batched_limit = ARMCII_Getenv_int("ARMCI_IOV_BATCHED_LIMIT", 0); - - if (ARMCII_GLOBAL_STATE.iov_batched_limit < 0) { - ARMCII_Warning("Ignoring invalid value for ARMCI_IOV_BATCHED_LIMIT (%d)\n", ARMCII_GLOBAL_STATE.iov_batched_limit); - ARMCII_GLOBAL_STATE.iov_batched_limit = 0; - } + /* Check for IOV and Strided flags */ #if defined(OPEN_MPI) && defined(OMPI_MAJOR_VERSION) && (OMPI_MAJOR_VERSION < 5) /* Open-MPI 5 RMA works a lot better than older releases... */ ARMCII_GLOBAL_STATE.iov_method = ARMCII_IOV_BATCHED; + ARMCII_GLOBAL_STATE.strided_method = ARMCII_STRIDED_IOV; #else - /* DIRECT leads to addr=NULL errors when ARMCI_{GetV,PutV} are used + /* IOV_DIRECT leads to addr=NULL errors when ARMCI_{GetV,PutV} are used * Jeff: Is this still true? 
*/ ARMCII_GLOBAL_STATE.iov_method = ARMCII_IOV_DIRECT; + ARMCII_GLOBAL_STATE.strided_method = ARMCII_STRIDED_DIRECT; +#endif + +#ifdef NO_SEATBELTS + ARMCII_GLOBAL_STATE.iov_checks = 0; #endif + ARMCII_GLOBAL_STATE.iov_checks = ARMCII_Getenv_bool("ARMCI_IOV_CHECKS", 0); + ARMCII_GLOBAL_STATE.iov_batched_limit = ARMCII_Getenv_int("ARMCI_IOV_BATCHED_LIMIT", 0); + + if (ARMCII_GLOBAL_STATE.iov_batched_limit < 0) { + ARMCII_Warning("Ignoring invalid value for ARMCI_IOV_BATCHED_LIMIT (%d)\n", + ARMCII_GLOBAL_STATE.iov_batched_limit); + ARMCII_GLOBAL_STATE.iov_batched_limit = 0; + } char *var = ARMCII_Getenv("ARMCI_IOV_METHOD"); if (var != NULL) { @@ -313,14 +332,6 @@ int PARMCI_Init_thread_comm(int armci_requested, MPI_Comm comm) { ARMCII_Warning("Ignoring unknown value for ARMCI_IOV_METHOD (%s)\n", var); } - /* Check for Strided flags */ - -#if defined(OPEN_MPI) - ARMCII_GLOBAL_STATE.strided_method = ARMCII_STRIDED_IOV; -#else - ARMCII_GLOBAL_STATE.strided_method = ARMCII_STRIDED_DIRECT; -#endif - var = ARMCII_Getenv("ARMCI_STRIDED_METHOD"); if (var != NULL) { if (strcmp(var, "IOV") == 0) { @@ -334,10 +345,13 @@ int PARMCI_Init_thread_comm(int armci_requested, MPI_Comm comm) { #ifdef OPEN_MPI if (ARMCII_GLOBAL_STATE.iov_method == ARMCII_IOV_DIRECT || ARMCII_GLOBAL_STATE.strided_method == ARMCII_STRIDED_DIRECT) { + if (ARMCI_GROUP_WORLD.rank == 0) { ARMCII_Warning("MPI Datatypes are broken in RMA in many versions of Open-MPI!\n"); #if defined(OMPI_MAJOR_VERSION) && (OMPI_MAJOR_VERSION == 4) - ARMCII_Warning("Open-MPI 4.0.0 RMA with datatypes is definitely broken. See https://github.com/open-mpi/ompi/issues/6275 for details.\n"); + ARMCII_Warning("Open-MPI 4.0.0 RMA with datatypes is definitely broken." + "See https://github.com/open-mpi/ompi/issues/6275 for details.\n"); #endif + } } #endif @@ -396,19 +410,15 @@ int PARMCI_Init_thread_comm(int armci_requested, MPI_Comm comm) { /* Use MPI_MODE_NOCHECK assertion */ ARMCII_GLOBAL_STATE.rma_nocheck=ARMCII_Getenv_bool("ARMCI_RMA_NOCHECK", 1); - /* Setup groups and communicators */ + /* Use request-based RMA for atomic operations */ + ARMCII_GLOBAL_STATE.use_request_atomics=ARMCII_Getenv_bool("ARMCI_USE_REQUEST_ATOMICS", 1); - MPI_Comm_dup(comm, &ARMCI_GROUP_WORLD.comm); - ARMCII_Group_init_from_comm(&ARMCI_GROUP_WORLD); - ARMCI_GROUP_DEFAULT = ARMCI_GROUP_WORLD; - - /* Create GOP operators */ - - MPI_Op_create(ARMCII_Absmin_op, 1 /* commute */, &ARMCI_MPI_ABSMIN_OP); - MPI_Op_create(ARMCII_Absmax_op, 1 /* commute */, &ARMCI_MPI_ABSMAX_OP); - - MPI_Op_create(ARMCII_Msg_sel_min_op, 1 /* commute */, &ARMCI_MPI_SELMIN_OP); - MPI_Op_create(ARMCII_Msg_sel_max_op, 1 /* commute */, &ARMCI_MPI_SELMAX_OP); + /* Use request-based RMA for ARMCI nonblocking with explicit handles */ +#ifdef USE_RMA_REQUESTS + const int use_rma_requests = 1; +#else + const int use_rma_requests = 0; +#endif #ifdef HAVE_MEMKIND_H char * pool_path; @@ -543,6 +553,8 @@ int PARMCI_Init_thread_comm(int armci_requested, MPI_Comm comm) { printf(" NO_FLUSH_LOCAL = %s\n", ARMCII_GLOBAL_STATE.end_to_end_flush ? "TRUE" : "FALSE"); printf(" RMA_NOCHECK = %s\n", ARMCII_GLOBAL_STATE.rma_nocheck ? "TRUE" : "FALSE"); printf(" MSG_BARRIER_SYNCS = %s\n", ARMCII_GLOBAL_STATE.msg_barrier_syncs ? "TRUE" : "FALSE"); + printf(" USE_REQUEST_ATOMICS = %s\n", ARMCII_GLOBAL_STATE.use_request_atomics ? "TRUE" : "FALSE"); + printf(" USE_RMA_REQUESTS = %s\n", use_rma_requests ? 
"TRUE" : "FALSE"); // compile-time option /* MPI info set on window */ printf(" USE_ALLOC_SHM = %s\n", ARMCII_GLOBAL_STATE.use_alloc_shm ? "TRUE" : "FALSE"); @@ -568,7 +580,7 @@ int PARMCI_Init_thread_comm(int armci_requested, MPI_Comm comm) { /* Update (Aug. 2022): it has reappeared in MPICH 4.x, per * https://github.com/pmodels/mpich/issues/6110 */ printf(" Warning: MPI_Win_allocate can lead to correctness issues.\n"); - if ((mpi_implementation == ARMCII_MPICH) && (mpi_impl_major == 4)) { + if ((mpi_implementation == ARMCII_MPICH) && (mpi_impl_major == 4)) { printf(" There is a good chance your implementation is affected!\n"); printf(" See https://github.com/pmodels/mpich/issues/6110 for details.\n"); } diff --git a/src/onesided.c b/src/onesided.c index e934a8b..490626a 100644 --- a/src/onesided.c +++ b/src/onesided.c @@ -104,7 +104,7 @@ int PARMCI_Get(void *src, void *dst, int size, int target) { /* Origin buffer is private */ else if (dst_mreg == NULL) { - gmr_get(src_mreg, src, dst, size, target); + gmr_get(src_mreg, src, dst, size, target, NULL /* handle */); gmr_flush(src_mreg, target, 0); /* it's a round trip so w.r.t. flush, local=remote */ } @@ -117,7 +117,7 @@ int PARMCI_Get(void *src, void *dst, int size, int target) { MPI_Alloc_mem(size, MPI_INFO_NULL, &dst_buf); ARMCII_Assert(dst_buf != NULL); - gmr_get(src_mreg, src, dst_buf, size, target); + gmr_get(src_mreg, src, dst_buf, size, target, NULL /* handle */); gmr_flush(src_mreg, target, 0); /* it's a round trip so w.r.t. flush, local=remote */ ARMCI_Copy(dst_buf, dst, size); @@ -167,7 +167,7 @@ int PARMCI_Put(void *src, void *dst, int size, int target) { /* Origin buffer is private */ else if (src_mreg == NULL) { - gmr_put(dst_mreg, src, dst, size, target); + gmr_put(dst_mreg, src, dst, size, target, NULL /* handle */); gmr_flush(dst_mreg, target, 1); /* flush_local */ } @@ -182,7 +182,7 @@ int PARMCI_Put(void *src, void *dst, int size, int target) { ARMCI_Copy(src, src_buf, size); - gmr_put(dst_mreg, src_buf, dst, size, target); + gmr_put(dst_mreg, src_buf, dst, size, target, NULL /* handle */); gmr_flush(dst_mreg, target, 1); /* flush_local */ MPI_Free_mem(src_buf); @@ -259,7 +259,7 @@ int PARMCI_Acc(int datatype, void *scale, void *src, void *dst, int bytes, int p /* TODO: Support a local accumulate operation more efficiently */ - gmr_accumulate(dst_mreg, src_buf, dst, count, type, proc); + gmr_accumulate(dst_mreg, src_buf, dst, count, type, proc, NULL /* handle */); gmr_flush(dst_mreg, proc, 1); /* flush_local */ if (src_buf != src) diff --git a/src/onesided_nb.c b/src/onesided_nb.c index f5a58c5..fb3da2b 100644 --- a/src/onesided_nb.c +++ b/src/onesided_nb.c @@ -11,24 +11,39 @@ /** Initialize Non-blocking handle. */ -void ARMCI_INIT_HANDLE(armci_hdl_t *handle) { - if (handle!=NULL) { - handle->aggregate = 1; +void ARMCI_INIT_HANDLE(armci_hdl_t *handle) +{ + ARMCII_Assert_msg(handle, "handle is NULL"); + if (1 || handle != NULL) { +#ifdef USE_RMA_REQUESTS + handle->batch_size = 0; + handle->single_request = MPI_REQUEST_NULL; + handle->request_array = NULL; +#else + handle->aggregate = 0; handle->target = -1; +#endif } else { - ARMCII_Warning("ARMCI_INIT_HANDLE given NULL handle"); + ARMCII_Warning("ARMCI_INIT_HANDLE given NULL handle.\n"); } return; } - /** Mark a handle as aggregate. 
*/ -void ARMCI_SET_AGGREGATE_HANDLE(armci_hdl_t *handle) { - if (handle!=NULL) { +void ARMCI_SET_AGGREGATE_HANDLE(armci_hdl_t *handle) +{ + ARMCII_Assert_msg(handle, "handle is NULL"); + if (1 || handle != NULL) { +#ifdef USE_RMA_REQUESTS + handle->batch_size = 0; + handle->single_request = MPI_REQUEST_NULL; + handle->request_array = NULL; +#else handle->aggregate = 1; +#endif } else { - ARMCII_Warning("ARMCI_INIT_HANDLE given NULL handle"); + ARMCII_Warning("ARMCI_SET_AGGREGATE_HANDLE given NULL handle.\n"); } return; } @@ -36,11 +51,19 @@ void ARMCI_SET_AGGREGATE_HANDLE(armci_hdl_t *handle) { /** Clear an aggregate handle. */ -void ARMCI_UNSET_AGGREGATE_HANDLE(armci_hdl_t *handle) { - if (handle!=NULL) { +void ARMCI_UNSET_AGGREGATE_HANDLE(armci_hdl_t *handle) +{ + ARMCII_Assert_msg(handle, "handle is NULL"); + if (1 || handle != NULL) { +#ifdef USE_RMA_REQUESTS + handle->batch_size = 0; + handle->single_request = MPI_REQUEST_NULL; + handle->request_array = NULL; +#else handle->aggregate = 0; +#endif } else { - ARMCII_Warning("ARMCI_INIT_HANDLE given NULL handle"); + ARMCII_Warning("ARMCI_UNSET_AGGREGATE_HANDLE given NULL handle.\n"); } return; } @@ -58,7 +81,8 @@ void ARMCI_UNSET_AGGREGATE_HANDLE(armci_hdl_t *handle) { /** Non-blocking put operation. */ -int PARMCI_NbPut(void *src, void *dst, int size, int target, armci_hdl_t *handle) { +int PARMCI_NbPut(void *src, void *dst, int size, int target, armci_hdl_t *handle) +{ gmr_t *src_mreg, *dst_mreg; dst_mreg = gmr_lookup(dst, target); @@ -76,12 +100,7 @@ int PARMCI_NbPut(void *src, void *dst, int size, int target, armci_hdl_t *handle ARMCI_Copy(src, dst, size); } else { - gmr_put(dst_mreg, src, dst, size, target); - } - - if (handle!=NULL) { - /* Regular (not aggregate) handles merely store the target for future flushing. */ - handle->target = target; + gmr_put(dst_mreg, src, dst, size, target, handle); } gmr_progress(); @@ -120,12 +139,7 @@ int PARMCI_NbGet(void *src, void *dst, int size, int target, armci_hdl_t *handle ARMCI_Copy(src, dst, size); } else { - gmr_get(src_mreg, src, dst, size, target); - } - - if (handle!=NULL) { - /* Regular (not aggregate) handles merely store the target for future flushing. */ - handle->target = target; + gmr_get(src_mreg, src, dst, size, target, handle); } gmr_progress(); @@ -192,7 +206,7 @@ int PARMCI_NbAcc(int datatype, void *scale, void *src, void *dst, int bytes, int /* TODO: Support a local accumulate operation more efficiently */ - gmr_accumulate(dst_mreg, src_buf, dst, count, type, target); + gmr_accumulate(dst_mreg, src_buf, dst, count, type, target, handle); if (src_buf != src) { /* must wait for local completion to free source buffer */ @@ -200,11 +214,6 @@ int PARMCI_NbAcc(int datatype, void *scale, void *src, void *dst, int bytes, int MPI_Free_mem(src_buf); } - if (handle!=NULL) { - /* Regular (not aggregate) handles merely store the target for future flushing. */ - handle->target = target; - } - gmr_progress(); return 0; @@ -223,7 +232,54 @@ int PARMCI_NbAcc(int datatype, void *scale, void *src, void *dst, int bytes, int /** Wait for a non-blocking operation to finish. 
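 *
 * Under USE_RMA_REQUESTS this waits on the MPI request(s) stored in the handle
 * (MPI_Wait for a single request, MPI_Waitall plus free() for a batch) and then
 * resets the handle to its initialized state. Note that completing an MPI_Rput or
 * MPI_Raccumulate request guarantees local completion only.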
*/ -int PARMCI_Wait(armci_hdl_t* handle) { +int PARMCI_Wait(armci_hdl_t* handle) +{ +#ifdef USE_RMA_REQUESTS + + ARMCII_Assert_msg(handle, "handle is NULL"); + + if (handle->batch_size < 0) { + + ARMCII_Warning("ARMCI_Wait passed a bogus (uninitialized) handle.\n"); + + } else if (handle->batch_size == 0) { + + ARMCII_Warning("ARMCI_Wait passed an inactive handle.\n"); + + } else if (handle->batch_size == 1) { + + if (handle->single_request == MPI_REQUEST_NULL) { + ARMCII_Warning("ARMCI_Wait: handle is corrupt (single_request_array is MPI_REQUEST_NULL)\n"); + } + if (handle->request_array != NULL) { + //ARMCII_Warning("ARMCI_Wait: handle is corrupt (request_array is not NULL)\n"); + } + + MPI_Wait( &(handle->single_request), MPI_STATUS_IGNORE ); + + handle->batch_size = 0; + handle->single_request = MPI_REQUEST_NULL; + handle->request_array = NULL; + + } else if (handle->batch_size > 1) { + + if (handle->single_request != MPI_REQUEST_NULL) { + ARMCII_Warning("ARMCI_Wait: handle is corrupt (single_request_array is not MPI_REQUEST_NULL)\n"); + } + if (handle->request_array == NULL) { + ARMCII_Warning("ARMCI_Wait: handle is corrupt (request_array is NULL)\n"); + } + + MPI_Waitall( handle->batch_size, handle->request_array, MPI_STATUSES_IGNORE ); + free(handle->request_array); + + handle->batch_size = 0; + handle->single_request = MPI_REQUEST_NULL; + handle->request_array = NULL; + } + +#else + gmr_t *cur_mreg = gmr_list; if(handle->aggregate > 0) { @@ -239,6 +295,9 @@ int PARMCI_Wait(armci_hdl_t* handle) { cur_mreg = cur_mreg->next; } } + +#endif + return 0; } @@ -255,8 +314,64 @@ int PARMCI_Wait(armci_hdl_t* handle) { /** Check if a non-blocking operation has finished. */ -int PARMCI_Test(armci_hdl_t* handle) { +int PARMCI_Test(armci_hdl_t* handle) +{ +#ifdef USE_RMA_REQUESTS + + int flag = 0; + + ARMCII_Assert_msg(handle, "handle is NULL"); + + if (handle->batch_size < 0) { + + ARMCII_Warning("ARMCI_Test passed a bogus (uninitialized) handle.\n"); + + } else if (handle->batch_size == 0) { + + ARMCII_Warning("ARMCI_Test passed an inactive handle.\n"); + + } else if (handle->batch_size == 1) { + + if (handle->single_request == MPI_REQUEST_NULL) { + ARMCII_Warning("ARMCI_Test: handle is corrupt (single_request_array is MPI_REQUEST_NULL)\n"); + } + if (handle->request_array != NULL) { + ARMCII_Warning("ARMCI_Test: handle is corrupt (request_array is not NULL)\n"); + } + + MPI_Test( &(handle->single_request), &flag, MPI_STATUS_IGNORE ); + + if (flag) { + handle->batch_size = 0; + handle->single_request = MPI_REQUEST_NULL; + handle->request_array = NULL; + } + + } else if (handle->batch_size > 1) { + + if (handle->single_request != MPI_REQUEST_NULL) { + ARMCII_Warning("ARMCI_Test: handle is corrupt (single_request_array is not MPI_REQUEST_NULL)\n"); + } + if (handle->request_array == NULL) { + ARMCII_Warning("ARMCI_Test: handle is corrupt (request_array is NULL)\n"); + } + + MPI_Testall( handle->batch_size, handle->request_array, &flag, MPI_STATUSES_IGNORE ); + + if (flag) { + free(handle->request_array); + handle->batch_size = 0; + handle->single_request = MPI_REQUEST_NULL; + handle->request_array = NULL; + } + } + + // no error codes are supported so we can do this + return (!flag); + +#else return PARMCI_Wait(handle); +#endif } @@ -272,7 +387,8 @@ int PARMCI_Test(armci_hdl_t* handle) { /** Wait for all outstanding non-blocking operations with implicit handles to a particular process to finish. 
*/ -int PARMCI_WaitProc(int proc) { +int PARMCI_WaitProc(int proc) +{ gmr_t *cur_mreg = gmr_list; while (cur_mreg) { @@ -295,7 +411,8 @@ int PARMCI_WaitProc(int proc) { /** Wait for all non-blocking operations with implicit (NULL) handles to finish. */ -int PARMCI_WaitAll(void) { +int PARMCI_WaitAll(void) +{ gmr_t *cur_mreg = gmr_list; while (cur_mreg) { diff --git a/src/rmw.c b/src/rmw.c index 44ad856..a45da16 100644 --- a/src/rmw.c +++ b/src/rmw.c @@ -37,8 +37,8 @@ * @param[in] value Value to add to remote location (ignored for swap). * @param[in] proc Process rank for the target buffer. */ -int PARMCI_Rmw(int op, void *ploc, void *prem, int value, int proc) { - +int PARMCI_Rmw(int op, void *ploc, void *prem, int value, int proc) +{ int is_swap = 0, is_long = 0; MPI_Datatype type; MPI_Op rop; @@ -73,33 +73,48 @@ int PARMCI_Rmw(int op, void *ploc, void *prem, int value, int proc) { /* We hold the DLA lock if (src_mreg != NULL). */ if (is_swap) { - long out_val_l, src_val_l = *((long*)ploc); - int out_val_i, src_val_i = *((int*)ploc); - - gmr_fetch_and_op(dst_mreg, - is_long ? (void*) &src_val_l : (void*) &src_val_i /* src */, - is_long ? (void*) &out_val_l : (void*) &out_val_i /* out */, - prem /* dst */, type, rop, proc); - gmr_flush(dst_mreg, proc, 0); /* it's a round trip so w.r.t. flush, local=remote */ - if (is_long) + + if (is_long) { + + long out_val_l, src_val_l = *((long*)ploc); + + // this is a blocking operation + gmr_fetch_and_op(dst_mreg, (void*) &src_val_l, (void*) &out_val_l, prem /* dst */, type, rop, proc); + *(long*) ploc = out_val_l; - else + + } else { + + int out_val_i, src_val_i = *((int*)ploc); + + // this is a blocking operation + gmr_fetch_and_op(dst_mreg, (void*) &src_val_i, (void*) &out_val_i, prem /* dst */, type, rop, proc); + *(int*) ploc = out_val_i; + + } } else /* fetch-and-add */ { - long fetch_val_l, add_val_l = value; - int fetch_val_i, add_val_i = value; - gmr_fetch_and_op(dst_mreg, - is_long ? (void*) &add_val_l : (void*) &add_val_i /* src */, - is_long ? (void*) &fetch_val_l : (void*) &fetch_val_i /* out */, - prem /* dst */, type, rop, proc); - gmr_flush(dst_mreg, proc, 0); /* it's a round trip so w.r.t. 
flush, local=remote */ + if (is_long) { + + long fetch_val_l, add_val_l = value; + + // this is a blocking operation + gmr_fetch_and_op(dst_mreg, (void*) &add_val_l, (void*) &fetch_val_l, prem /* dst */, type, rop, proc); - if (is_long) *(long*) ploc = fetch_val_l; - else + + } else { + + int fetch_val_i, add_val_i = value; + + // this is a blocking operation + gmr_fetch_and_op(dst_mreg, (void*) &add_val_i, (void*) &fetch_val_i, prem /* dst */, type, rop, proc); + *(int*) ploc = fetch_val_i; + + } } return 0; diff --git a/src/strided.c b/src/strided.c index 85370d7..2f25954 100644 --- a/src/strided.c +++ b/src/strided.c @@ -158,7 +158,7 @@ int PARMCI_PutS(void *src_ptr, int src_stride_ar[/*stride_levels*/], mreg = gmr_lookup(dst_ptr, proc); ARMCII_Assert_msg(mreg != NULL, "Invalid shared pointer"); - gmr_put_typed(mreg, src_buf, 1, src_type, dst_ptr, 1, dst_type, proc); + gmr_put_typed(mreg, src_buf, 1, src_type, dst_ptr, 1, dst_type, proc, NULL /* handle */); gmr_flush(mreg, proc, 1); /* flush_local */ MPI_Type_free(&src_type); @@ -258,7 +258,7 @@ int PARMCI_GetS(void *src_ptr, int src_stride_ar[/*stride_levels*/], mreg = gmr_lookup(src_ptr, proc); ARMCII_Assert_msg(mreg != NULL, "Invalid shared pointer"); - gmr_get_typed(mreg, src_ptr, 1, src_type, dst_buf, 1, dst_type, proc); + gmr_get_typed(mreg, src_ptr, 1, src_type, dst_buf, 1, dst_type, proc, NULL /* handle */); gmr_flush(mreg, proc, 0); /* COPY: Finish the transfer */ @@ -400,7 +400,7 @@ int PARMCI_AccS(int datatype, void *scale, mreg = gmr_lookup(dst_ptr, proc); ARMCII_Assert_msg(mreg != NULL, "Invalid shared pointer"); - gmr_accumulate_typed(mreg, src_buf, 1, src_type, dst_ptr, 1, dst_type, proc); + gmr_accumulate_typed(mreg, src_buf, 1, src_type, dst_ptr, 1, dst_type, proc, NULL /* handle */); gmr_flush(mreg, proc, 1); /* flush_local */ MPI_Type_free(&src_type); diff --git a/src/strided_nb.c b/src/strided_nb.c index 7f4f748..19e9e59 100644 --- a/src/strided_nb.c +++ b/src/strided_nb.c @@ -37,8 +37,8 @@ * @return Zero on success, error code otherwise. */ int PARMCI_NbPutS(void *src_ptr, int src_stride_ar[/*stride_levels*/], - void *dst_ptr, int dst_stride_ar[/*stride_levels*/], - int count[/*stride_levels+1*/], int stride_levels, int proc, armci_hdl_t *handle) { + void *dst_ptr, int dst_stride_ar[/*stride_levels*/], + int count[/*stride_levels+1*/], int stride_levels, int proc, armci_hdl_t * handle) { int err; @@ -87,7 +87,7 @@ int PARMCI_NbPutS(void *src_ptr, int src_stride_ar[/*stride_levels*/], mreg = gmr_lookup(dst_ptr, proc); ARMCII_Assert_msg(mreg != NULL, "Invalid shared pointer"); - gmr_put_typed(mreg, src_buf, 1, src_type, dst_ptr, 1, dst_type, proc); + gmr_put_typed(mreg, src_buf, 1, src_type, dst_ptr, 1, dst_type, proc, handle); MPI_Type_free(&src_type); MPI_Type_free(&dst_type); @@ -98,11 +98,6 @@ int PARMCI_NbPutS(void *src_ptr, int src_stride_ar[/*stride_levels*/], MPI_Free_mem(src_buf); } - if (handle!=NULL) { - /* Regular (not aggregate) handles merely store the target for future flushing. 
*/ - handle->target = proc; - } - err = 0; } else { @@ -194,7 +189,7 @@ int PARMCI_NbGetS(void *src_ptr, int src_stride_ar[/*stride_levels*/], mreg = gmr_lookup(src_ptr, proc); ARMCII_Assert_msg(mreg != NULL, "Invalid shared pointer"); - gmr_get_typed(mreg, src_ptr, 1, src_type, dst_buf, 1, dst_type, proc); + gmr_get_typed(mreg, src_ptr, 1, src_type, dst_buf, 1, dst_type, proc, handle); /* COPY: Finish the transfer */ if (dst_buf != dst_ptr) { @@ -206,11 +201,6 @@ int PARMCI_NbGetS(void *src_ptr, int src_stride_ar[/*stride_levels*/], MPI_Type_free(&src_type); MPI_Type_free(&dst_type); - if (handle!=NULL) { - /* Regular (not aggregate) handles merely store the target for future flushing. */ - handle->target = proc; - } - err = 0; } else { @@ -344,7 +334,7 @@ int PARMCI_NbAccS(int datatype, void *scale, mreg = gmr_lookup(dst_ptr, proc); ARMCII_Assert_msg(mreg != NULL, "Invalid shared pointer"); - gmr_accumulate_typed(mreg, src_buf, 1, src_type, dst_ptr, 1, dst_type, proc); + gmr_accumulate_typed(mreg, src_buf, 1, src_type, dst_ptr, 1, dst_type, proc, handle); MPI_Type_free(&src_type); MPI_Type_free(&dst_type); @@ -355,11 +345,6 @@ int PARMCI_NbAccS(int datatype, void *scale, MPI_Free_mem(src_buf); } - if (handle!=NULL) { - /* Regular (not aggregate) handles merely store the target for future flushing. */ - handle->target = proc; - } - err = 0; } else { diff --git a/src/vector.c b/src/vector.c index 4667aec..71f9964 100644 --- a/src/vector.c +++ b/src/vector.c @@ -123,21 +123,21 @@ int ARMCII_Iov_check_same_allocation(void **ptrs, int count, int proc) { * @return Zero on success, error code otherwise */ int ARMCII_Iov_op_dispatch(enum ARMCII_Op_e op, void **src, void **dst, int count, int size, - int datatype, int overlapping, int same_alloc, int proc, int blocking) { + int datatype, int overlapping, int same_alloc, int proc, + int blocking, armci_hdl_t * handle) +{ MPI_Datatype type; int type_count, type_size; if (op == ARMCII_OP_ACC) { ARMCII_Acc_type_translate(datatype, &type, &type_size); - type_count = size/type_size; - ARMCII_Assert_msg(size % type_size == 0, "Transfer size is not a multiple of type size"); } else { type = MPI_BYTE; MPI_Type_size(type, &type_size); - type_count = size/type_size; - ARMCII_Assert_msg(size % type_size == 0, "Transfer size is not a multiple of type size"); } + type_count = size/type_size; + ARMCII_Assert_msg(size % type_size == 0, "Transfer size is not a multiple of type size"); // CONSERVATIVE CASE: If remote pointers overlap or remote pointers correspond to // multiple allocations, use the safe implementation to avoid invalid MPI @@ -150,7 +150,7 @@ int ARMCII_Iov_op_dispatch(enum ARMCII_Op_e op, void **src, void **dst, int coun return ARMCII_Iov_op_safe(op, src, dst, count, type_count, type, proc); #else /* Jeff: We are going to always block when there is buffer overlap. 
@@ -159,10 +159,10 @@ int ARMCII_Iov_op_dispatch(enum ARMCII_Op_e op, void **src, void **dst, int coun
   else if (    ARMCII_GLOBAL_STATE.iov_method == ARMCII_IOV_DIRECT
             || ARMCII_GLOBAL_STATE.iov_method == ARMCII_IOV_AUTO ) {
-    return ARMCII_Iov_op_datatype(op, src, dst, count, type_count, type, proc, blocking);
+    return ARMCII_Iov_op_datatype(op, src, dst, count, type_count, type, proc, blocking, handle);
   }
   else if (ARMCII_GLOBAL_STATE.iov_method == ARMCII_IOV_BATCHED) {
-    return ARMCII_Iov_op_batched(op, src, dst, count, type_count, type, proc, 0 /* not consrv */, blocking);
+    return ARMCII_Iov_op_batched(op, src, dst, count, type_count, type, proc, 0 /* not consrv */, blocking, handle);
   }
   else {
     ARMCII_Error("unknown iov method (%d)\n", ARMCII_GLOBAL_STATE.iov_method);
@@ -228,7 +228,7 @@ int ARMCII_Iov_op_safe(enum ARMCII_Op_e op, void **src, void **dst, int count, i
  * lock/unlock pair.
  */
 int ARMCII_Iov_op_batched(enum ARMCII_Op_e op, void **src, void **dst, int count, int elem_count,
-                          MPI_Datatype type, int proc, int consrv /* if 1, batched = safe */, int blocking) {
+                          MPI_Datatype type, int proc, int consrv /* if 1, batched = safe */, int blocking, armci_hdl_t * handle) {
   int i;
   int flush_local = 1; /* used only for MPI-3 */
@@ -264,15 +264,15 @@ int ARMCII_Iov_op_batched(enum ARMCII_Op_e op, void **src, void **dst, int count
     switch(op) {
       case ARMCII_OP_PUT:
-        gmr_put(mreg, src[i], dst[i], elem_count, proc);
+        gmr_put(mreg, src[i], dst[i], elem_count, proc, handle);
         flush_local = 1;
         break;
       case ARMCII_OP_GET:
-        gmr_get(mreg, src[i], dst[i], elem_count, proc);
+        gmr_get(mreg, src[i], dst[i], elem_count, proc, handle);
         flush_local = 0;
         break;
       case ARMCII_OP_ACC:
-        gmr_accumulate(mreg, src[i], dst[i], elem_count, type, proc);
+        gmr_accumulate(mreg, src[i], dst[i], elem_count, type, proc, handle);
         flush_local = 1;
         break;
       default:
@@ -293,7 +293,8 @@ int ARMCII_Iov_op_batched(enum ARMCII_Op_e op, void **src, void **dst, int count
  * datatype to achieve a one-sided gather/scatter.
  */
 int ARMCII_Iov_op_datatype(enum ARMCII_Op_e op, void **src, void **dst, int count, int elem_count,
-                           MPI_Datatype type, int proc, int blocking) {
+                           MPI_Datatype type, int proc, int blocking, armci_hdl_t * handle)
+{
   gmr_t *mreg;
   MPI_Datatype type_loc, type_rem;
@@ -354,15 +355,15 @@ int ARMCII_Iov_op_datatype(enum ARMCII_Op_e op, void **src, void **dst, int coun
     switch(op) {
       case ARMCII_OP_PUT:
-        gmr_put_typed(mreg, MPI_BOTTOM, 1, type_loc, MPI_BOTTOM, 1, type_rem, proc);
+        gmr_put_typed(mreg, MPI_BOTTOM, 1, type_loc, MPI_BOTTOM, 1, type_rem, proc, handle);
         flush_local = 1;
         break;
       case ARMCII_OP_GET:
-        gmr_get_typed(mreg, MPI_BOTTOM, 1, type_rem, MPI_BOTTOM, 1, type_loc, proc);
+        gmr_get_typed(mreg, MPI_BOTTOM, 1, type_rem, MPI_BOTTOM, 1, type_loc, proc, handle);
         flush_local = 0;
         break;
       case ARMCII_OP_ACC:
-        gmr_accumulate_typed(mreg, MPI_BOTTOM, 1, type_loc, MPI_BOTTOM, 1, type_rem, proc);
+        gmr_accumulate_typed(mreg, MPI_BOTTOM, 1, type_loc, MPI_BOTTOM, 1, type_rem, proc, handle);
         flush_local = 1;
         break;
       default:
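Note (illustrative, not part of the patch): in the batched path above, a single call can issue several operations against the same handle, which is presumably what the batch_size/request_array pair in armci_hdl_t exists for, while single_request covers the one-operation case. A hedged sketch of how the request slots might be reserved; the helper name, allocation strategy, and ownership rules are assumptions:

    #include <stdlib.h>
    #include <mpi.h>
    #include <armci.h>

    /* Hypothetical: reserve one MPI_Request slot per IOV segment. */
    static MPI_Request *armcii_handle_reserve_sketch(armci_hdl_t *handle, int nreq)
    {
        if (nreq <= 1) {
            handle->batch_size = 0;              /* single_request covers this case */
            return &handle->single_request;
        }
        handle->batch_size    = nreq;
        handle->request_array = malloc((size_t)nreq * sizeof(MPI_Request));
        return handle->request_array;            /* caller fills and later completes these */
    }
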
@@ -398,10 +399,9 @@ int ARMCII_Iov_op_datatype(enum ARMCII_Op_e op, void **src, void **dst, int coun
  * @param[in] proc    Target process.
  * @return            Success 0, otherwise non-zero.
  */
-int PARMCI_PutV(armci_giov_t *iov, int iov_len, int proc) {
-  int v;
-
-  for (v = 0; v < iov_len; v++) {
+int PARMCI_PutV(armci_giov_t *iov, int iov_len, int proc)
+{
+  for (int v = 0; v < iov_len; v++) {
     void **src_buf;
     int overlapping, same_alloc;
@@ -413,7 +413,7 @@ int PARMCI_PutV(armci_giov_t *iov, int iov_len, int proc) {
     ARMCII_Buf_prepare_read_vec(iov[v].src_ptr_array, &src_buf, iov[v].ptr_array_len, iov[v].bytes);
 
     ARMCII_Iov_op_dispatch(ARMCII_OP_PUT, src_buf, iov[v].dst_ptr_array, iov[v].ptr_array_len, iov[v].bytes, 0,
-                           overlapping, same_alloc, proc, 1 /* blocking */);
+                           overlapping, same_alloc, proc, 1 /* blocking */, NULL);
 
     ARMCII_Buf_finish_read_vec(iov[v].src_ptr_array, src_buf, iov[v].ptr_array_len, iov[v].bytes);
   }
@@ -438,10 +438,9 @@ int PARMCI_PutV(armci_giov_t *iov, int iov_len, int proc) {
  * @param[in] proc    Target process.
  * @return            Success 0, otherwise non-zero.
  */
-int PARMCI_GetV(armci_giov_t *iov, int iov_len, int proc) {
-  int v;
-
-  for (v = 0; v < iov_len; v++) {
+int PARMCI_GetV(armci_giov_t *iov, int iov_len, int proc)
+{
+  for (int v = 0; v < iov_len; v++) {
     void **dst_buf;
     int overlapping, same_alloc;
@@ -454,7 +453,7 @@ int PARMCI_GetV(armci_giov_t *iov, int iov_len, int proc) {
     ARMCII_Buf_prepare_write_vec(iov[v].dst_ptr_array, &dst_buf, iov[v].ptr_array_len, iov[v].bytes);
 
     ARMCII_Iov_op_dispatch(ARMCII_OP_GET, iov[v].src_ptr_array, dst_buf, iov[v].ptr_array_len, iov[v].bytes, 0,
-                           overlapping, same_alloc, proc, 1 /* blocking */);
+                           overlapping, same_alloc, proc, 1 /* blocking */, NULL);
 
     ARMCII_Buf_finish_write_vec(iov[v].dst_ptr_array, dst_buf, iov[v].ptr_array_len, iov[v].bytes);
   }
@@ -479,10 +478,9 @@ int PARMCI_GetV(armci_giov_t *iov, int iov_len, int proc) {
  * @param[in] proc    Target process.
  * @return            Success 0, otherwise non-zero.
  */
-int PARMCI_AccV(int datatype, void *scale, armci_giov_t *iov, int iov_len, int proc) {
-  int v;
-
-  for (v = 0; v < iov_len; v++) {
+int PARMCI_AccV(int datatype, void *scale, armci_giov_t *iov, int iov_len, int proc)
+{
+  for (int v = 0; v < iov_len; v++) {
     void **src_buf;
     int overlapping, same_alloc;
@@ -494,7 +492,7 @@ int PARMCI_AccV(int datatype, void *scale, armci_giov_t *iov, int iov_len, int p
     ARMCII_Buf_prepare_acc_vec(iov[v].src_ptr_array, &src_buf, iov[v].ptr_array_len, iov[v].bytes, datatype, scale);
 
     ARMCII_Iov_op_dispatch(ARMCII_OP_ACC, src_buf, iov[v].dst_ptr_array, iov[v].ptr_array_len, iov[v].bytes, datatype,
-                           overlapping, same_alloc, proc, 1 /* blocking */);
+                           overlapping, same_alloc, proc, 1 /* blocking */, NULL);
 
     ARMCII_Buf_finish_acc_vec(iov[v].src_ptr_array, src_buf, iov[v].ptr_array_len, iov[v].bytes);
   }
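Note (illustrative, not part of the patch): from the caller's side the vector interface is unchanged; the blocking PARMCI_*V entry points above simply pass a NULL handle, while the nonblocking variants below thread the user's handle through. A minimal usage sketch, assuming the standard ARMCI nonblocking API (ARMCI_INIT_HANDLE, ARMCI_NbPutV, ARMCI_Wait) and buffers obtained from ARMCI_Malloc; error handling omitted:

    #include <armci.h>

    /* Put two equally sized blocks to one target and wait for local completion. */
    static void put_two_blocks(void *src0, void *src1, void *dst0, void *dst1,
                               int bytes, int target_proc)
    {
        void *src[2] = { src0, src1 };
        void *dst[2] = { dst0, dst1 };
        armci_giov_t iov;
        armci_hdl_t  hdl;

        iov.src_ptr_array = src;
        iov.dst_ptr_array = dst;
        iov.ptr_array_len = 2;
        iov.bytes         = bytes;

        ARMCI_INIT_HANDLE(&hdl);
        ARMCI_NbPutV(&iov, 1 /* iov_len */, target_proc, &hdl);  /* handle records the request(s) */
        ARMCI_Wait(&hdl);   /* after this, the src buffers may be reused */
    }
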
diff --git a/src/vector_nb.c b/src/vector_nb.c
index 4f5a98e..ed41c6c 100644
--- a/src/vector_nb.c
+++ b/src/vector_nb.c
@@ -27,15 +27,15 @@
  * @param[in] proc    Target process.
  * @return            Success 0, otherwise non-zero.
  */
-int PARMCI_NbPutV(armci_giov_t *iov, int iov_len, int proc, armci_hdl_t* handle) {
-  int v;
+int PARMCI_NbPutV(armci_giov_t *iov, int iov_len, int proc, armci_hdl_t* handle)
+{
   int blocking = 0;
 
   if (ARMCII_GLOBAL_STATE.shr_buf_method != ARMCII_SHR_BUF_NOGUARD) {
     blocking = 1;
   }
 
-  for (v = 0; v < iov_len; v++) {
+  for (int v = 0; v < iov_len; v++) {
     void **src_buf;
     int overlapping, same_alloc;
@@ -47,15 +47,10 @@ int PARMCI_NbPutV(armci_giov_t *iov, int iov_len, int proc, armci_hdl_t* handle)
     ARMCII_Buf_prepare_read_vec(iov[v].src_ptr_array, &src_buf, iov[v].ptr_array_len, iov[v].bytes);
 
     ARMCII_Iov_op_dispatch(ARMCII_OP_PUT, src_buf, iov[v].dst_ptr_array, iov[v].ptr_array_len, iov[v].bytes, 0,
-                           overlapping, same_alloc, proc, blocking);
+                           overlapping, same_alloc, proc, blocking, handle);
 
     ARMCII_Buf_finish_read_vec(iov[v].src_ptr_array, src_buf, iov[v].ptr_array_len, iov[v].bytes);
   }
 
-  if (handle!=NULL) {
-    /* Regular (not aggregate) handles merely store the target for future flushing. */
-    handle->target = proc;
-  }
-
   gmr_progress();
 
   return 0;
@@ -79,15 +74,15 @@ int PARMCI_NbPutV(armci_giov_t *iov, int iov_len, int proc, armci_hdl_t* handle)
  * @param[in] proc    Target process.
  * @return            Success 0, otherwise non-zero.
  */
-int PARMCI_NbGetV(armci_giov_t *iov, int iov_len, int proc, armci_hdl_t* handle) {
-  int v;
+int PARMCI_NbGetV(armci_giov_t *iov, int iov_len, int proc, armci_hdl_t* handle)
+{
   int blocking = 0;
 
   if (ARMCII_GLOBAL_STATE.shr_buf_method != ARMCII_SHR_BUF_NOGUARD) {
     blocking = 1;
   }
 
-  for (v = 0; v < iov_len; v++) {
+  for (int v = 0; v < iov_len; v++) {
     void **dst_buf;
     int overlapping, same_alloc;
@@ -100,15 +95,10 @@ int PARMCI_NbGetV(armci_giov_t *iov, int iov_len, int proc, armci_hdl_t* handle)
     ARMCII_Buf_prepare_write_vec(iov[v].dst_ptr_array, &dst_buf, iov[v].ptr_array_len, iov[v].bytes);
 
     ARMCII_Iov_op_dispatch(ARMCII_OP_GET, iov[v].src_ptr_array, dst_buf, iov[v].ptr_array_len, iov[v].bytes, 0,
-                           overlapping, same_alloc, proc, blocking);
+                           overlapping, same_alloc, proc, blocking, handle);
 
     ARMCII_Buf_finish_write_vec(iov[v].dst_ptr_array, dst_buf, iov[v].ptr_array_len, iov[v].bytes);
   }
 
-  if (handle!=NULL) {
-    /* Regular (not aggregate) handles merely store the target for future flushing. */
-    handle->target = proc;
-  }
-
   gmr_progress();
 
   return 0;
@@ -132,15 +122,15 @@ int PARMCI_NbGetV(armci_giov_t *iov, int iov_len, int proc, armci_hdl_t* handle)
  * @param[in] proc    Target process.
  * @return            Success 0, otherwise non-zero.
  */
-int PARMCI_NbAccV(int datatype, void *scale, armci_giov_t *iov, int iov_len, int proc, armci_hdl_t* handle) {
-  int v;
+int PARMCI_NbAccV(int datatype, void *scale, armci_giov_t *iov, int iov_len, int proc, armci_hdl_t* handle)
+{
   int blocking = 0;
 
   if (ARMCII_GLOBAL_STATE.shr_buf_method != ARMCII_SHR_BUF_NOGUARD) {
     blocking = 1;
   }
 
-  for (v = 0; v < iov_len; v++) {
+  for (int v = 0; v < iov_len; v++) {
     void **src_buf;
     int overlapping, same_alloc;
@@ -152,15 +142,10 @@ int PARMCI_NbAccV(int datatype, void *scale, armci_giov_t *iov, int iov_len, int
     ARMCII_Buf_prepare_acc_vec(iov[v].src_ptr_array, &src_buf, iov[v].ptr_array_len, iov[v].bytes, datatype, scale);
 
     ARMCII_Iov_op_dispatch(ARMCII_OP_ACC, src_buf, iov[v].dst_ptr_array, iov[v].ptr_array_len, iov[v].bytes, datatype,
-                           overlapping, same_alloc, proc, blocking);
+                           overlapping, same_alloc, proc, blocking, handle);
 
     ARMCII_Buf_finish_acc_vec(iov[v].src_ptr_array, src_buf, iov[v].ptr_array_len, iov[v].bytes);
   }
 
-  if (handle!=NULL) {
-    /* Regular (not aggregate) handles merely store the target for future flushing. */
-    handle->target = proc;
-  }
-
   gmr_progress();
 
   return 0;
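Note (illustrative, not part of the patch): with the handle->target bookkeeping removed throughout, ARMCI_Wait presumably completes the stored MPI request(s) directly. Completing a request-based RMA operation only guarantees local completion at the origin, so remote completion would still need a flush or fence. A hedged sketch of that completion path, using the batch_size/single_request/request_array fields; the function name and cleanup policy are assumptions rather than the actual implementation:

    #include <stdlib.h>
    #include <mpi.h>
    #include <armci.h>

    /* Hypothetical completion path for a request-based handle. */
    static int armcii_wait_handle_sketch(armci_hdl_t *handle)
    {
        if (handle == NULL) return 0;

        if (handle->batch_size == 0) {
            /* Common case: one outstanding request-based operation. */
            MPI_Wait(&handle->single_request, MPI_STATUS_IGNORE);
        } else {
            /* Batched IOV case: one request per segment. */
            MPI_Waitall(handle->batch_size, handle->request_array, MPI_STATUSES_IGNORE);
            free(handle->request_array);
            handle->request_array = NULL;
            handle->batch_size = 0;
        }
        return 0;
    }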