Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6f3bf4c
test/bench: adapt p2p tests to multi-pair tests
hzhou Aug 7, 2025
005451f
test/bench: add p2p_self test
hzhou Aug 8, 2025
2e75378
misc: add MPIR_async_test
hzhou Jan 31, 2024
74630f4
ch4/ofi: create general pipeline chunk pools
hzhou Aug 5, 2025
3f25663
ch4/ofi: wrap initialization of per-vci struct
hzhou Aug 6, 2025
9213ff5
ch4/ofi: make MPIDI_OFI_request_t a union
hzhou Aug 10, 2025
0a30064
ch4/ofi: add MPIR_CVAR_CH4_OFI_RNDV_PROTOCOL
hzhou Aug 5, 2025
255ad49
ch4/ofi: add MPIDI_OFI_RNDV_{send,recv}_hdr
hzhou Aug 7, 2025
70e6d15
ch4/ofi: avoid posting large buffer in recv
hzhou Aug 5, 2025
7bdf40b
ch4/ofi: add rndv pipeline protocol
hzhou Aug 1, 2025
d12c93b
ch4/ofi: add rndv read protocol
hzhou Aug 6, 2025
228e722
ch4/ofi: add rndv write protocol
hzhou Aug 8, 2025
b7d35ce
ch4/ofi: remove the old gpu pipeline code
hzhou Aug 7, 2025
a80ced9
ch4/ofi: remove the huge protocol
hzhou Aug 7, 2025
b5fec23
ch4/ofi: removing leftover constants
hzhou Aug 8, 2025
6acf08b
ch4/ofi: remove MPIR_CVAR_CH4_OFI_EAGER_MAX_MSG_SIZE
hzhou Aug 7, 2025
d649898
ch4/ofi: fix warnings in MPIDI_NM_progress
hzhou Aug 7, 2025
561710f
ch4/ofi: refactor and implement rndv auto selection
hzhou Aug 8, 2025
9630d42
ch4/ofi: synchronize remote_data_sz in rndv protocols
hzhou Aug 9, 2025
551c238
ch4/ofi: avoid overwriting rndv fields
hzhou Aug 10, 2025
f634d90
ch4/ofi: fix thread critical sections in rndv algorithms
hzhou Aug 11, 2025
e1a40a8
test: add tests to cover ofi rndv protocols
hzhou Aug 11, 2025
0335526
ch4/ofi: fix MPIDI_OFI_rndv_need_pack for reg_host
hzhou Aug 18, 2025
f4b1e3e
ch4/ofi: fix noinline build
hzhou Aug 20, 2025
5e8794d
ch4: pass rreq to MPIDI_NM_am_can_do_tag
hzhou Aug 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/include/mpiimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ typedef struct MPIR_Stream MPIR_Stream;
/******************* PART 3: DEVICE INDEPENDENT HEADERS **********************/
/*****************************************************************************/

#include "mpir_misc.h"
#include "mpir_dbg.h"
#include "mpir_objects.h"
#include "mpir_strerror.h"
Expand All @@ -166,6 +165,7 @@ typedef struct MPIR_Stream MPIR_Stream;
#include "mpir_mem.h"
#include "mpir_info.h"
#include "mpir_errcodes.h"
#include "mpir_misc.h"
#include "mpir_errhandler.h"
#include "mpir_attr_generic.h"
#include "mpir_contextid.h"
Expand Down
20 changes: 20 additions & 0 deletions src/include/mpir_misc.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,26 @@ typedef struct {
MPIR_request_type_t type;
} MPIR_gpu_req;

/* Poll an asynchronous request once, without blocking.
 *
 * areq    - request to poll; areq->type selects which member of areq->u is used
 * is_done - output flag. Set to 1 directly for MPIR_NULL_REQUEST; for the other
 *           request types it is written by the underlying test routine.
 *
 * The function returns void, so a failure status from either backend test
 * routine cannot be handed back to the caller.  Both statuses are therefore
 * checked with MPIR_Assertp instead of being dropped.
 */
MPL_STATIC_INLINE_PREFIX void MPIR_async_test(MPIR_gpu_req * areq, int *is_done)
{
    int err;
    switch (areq->type) {
        case MPIR_NULL_REQUEST:
            /* a dummy, immediately complete */
            *is_done = 1;
            break;
        case MPIR_TYPEREP_REQUEST:
            /* Check the status the same way the GPU branch does; after a failed
             * test the value in *is_done should not be trusted. */
            err = MPIR_Typerep_test(areq->u.y_req, is_done);
            MPIR_Assertp(err == MPI_SUCCESS);
            break;
        case MPIR_GPU_REQUEST:
            err = MPL_gpu_test(&areq->u.gpu_req, is_done);
            MPIR_Assertp(err == MPL_SUCCESS);
            break;
        default:
            /* unrecognized request type */
            MPIR_Assert(0);
            break;
    }
}

int MPIR_Localcopy(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtype,
void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype);
int MPIR_Ilocalcopy(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtype,
Expand Down
2 changes: 0 additions & 2 deletions src/include/mpir_typerep.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ int MPIR_Typerep_ipack(const void *inbuf, MPI_Aint incount, MPI_Datatype datatyp
int MPIR_Typerep_iunpack(const void *inbuf, MPI_Aint insize, void *outbuf, MPI_Aint outcount,
MPI_Datatype datatype, MPI_Aint outoffset, MPI_Aint * actual_unpack_bytes,
MPIR_Typerep_req * typerep_req, uint32_t flags);
int MPIR_Typerep_wait(MPIR_Typerep_req typerep_req);
int MPIR_Typerep_test(MPIR_Typerep_req typerep_req, int *completed);

int MPIR_Typerep_size_external32(MPI_Datatype type);
int MPIR_Typerep_pack_external(const void *inbuf, MPI_Aint incount, MPI_Datatype datatype,
Expand Down
3 changes: 3 additions & 0 deletions src/mpi/datatype/typerep/src/typerep_pre.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,7 @@ typedef struct {
#define MPIR_TYPEREP_HANDLE_NULL NULL
#endif

int MPIR_Typerep_wait(MPIR_Typerep_req typerep_req);
int MPIR_Typerep_test(MPIR_Typerep_req typerep_req, int *completed);

#endif /* TYPEREP_PRE_H_INCLUDED */
7 changes: 6 additions & 1 deletion src/mpi/misc/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,12 @@ int MPIR_Ilocalcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype se
do_localcopy(sendbuf, sendcount, sendtype, sendoffset, recvbuf, recvcount, recvtype,
recvoffset, LOCALCOPY_NONBLOCKING, &req->u.y_req);
MPIR_ERR_CHECK(mpi_errno);
req->type = MPIR_TYPEREP_REQUEST;

if (req->u.y_req.req == MPIR_TYPEREP_REQ_NULL) {
req->type = MPIR_NULL_REQUEST;
} else {
req->type = MPIR_TYPEREP_REQUEST;
}
#endif

fn_exit:
Expand Down
2 changes: 2 additions & 0 deletions src/mpid/ch4/netmod/ofi/Makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ mpi_core_sources += src/mpid/ch4/netmod/ofi/func_table.c \
src/mpid/ch4/netmod/ofi/ofi_part.c \
src/mpid/ch4/netmod/ofi/ofi_events.c \
src/mpid/ch4/netmod/ofi/ofi_rndv.c \
src/mpid/ch4/netmod/ofi/ofi_rndv_read.c \
src/mpid/ch4/netmod/ofi/ofi_pipeline.c \
src/mpid/ch4/netmod/ofi/ofi_huge.c \
src/mpid/ch4/netmod/ofi/ofi_progress.c \
src/mpid/ch4/netmod/ofi/ofi_am_events.c \
Expand Down
20 changes: 20 additions & 0 deletions src/mpid/ch4/netmod/ofi/ofi_events.c
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,26 @@ int MPIDI_OFI_dispatch_function(int vci, struct fi_cq_tagged_entry *wc, MPIR_Req
mpi_errno = MPIDI_OFI_rndv_cts_event(vci, wc, req);
break;

case MPIDI_OFI_EVENT_PIPELINE_SEND_CHUNK:
mpi_errno = MPIDI_OFI_pipeline_send_chunk_event(wc, req);
break;

case MPIDI_OFI_EVENT_PIPELINE_RECV_CHUNK:
mpi_errno = MPIDI_OFI_pipeline_recv_chunk_event(wc, req);
break;

case MPIDI_OFI_EVENT_RNDVREAD_RECV_MRS:
mpi_errno = MPIDI_OFI_rndvread_recv_mrs_event(wc, req);
break;

case MPIDI_OFI_EVENT_RNDVREAD_READ_CHUNK:
mpi_errno = MPIDI_OFI_rndvread_read_chunk_event(wc, req);
break;

case MPIDI_OFI_EVENT_RNDVREAD_ACK:
mpi_errno = MPIDI_OFI_rndvread_ack_event(wc, req);
break;

case MPIDI_OFI_EVENT_CHUNK_DONE:
mpi_errno = chunk_done_event(vci, wc, req);
break;
Expand Down
5 changes: 5 additions & 0 deletions src/mpid/ch4/netmod/ofi/ofi_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ int MPIDI_OFI_dispatch_function(int vci, struct fi_cq_tagged_entry *wc, MPIR_Req
int MPIDI_OFI_recv_rndv_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * rreq);
int MPIDI_OFI_peek_rndv_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * rreq);
int MPIDI_OFI_rndv_cts_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * req);
/* Completion-event handlers for the pipeline and rndv-read protocols.
 * Each takes the completion-queue entry (wc) and the associated request (r) and
 * returns an int status; MPIDI_OFI_dispatch_function assigns it to mpi_errno.
 * They are selected there by the MPIDI_OFI_EVENT_PIPELINE_* and
 * MPIDI_OFI_EVENT_RNDVREAD_* event ids. Unlike the rndv handlers declared just
 * before them, these do not take a vci argument. */
int MPIDI_OFI_pipeline_send_chunk_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r);
int MPIDI_OFI_pipeline_recv_chunk_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r);
int MPIDI_OFI_rndvread_recv_mrs_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r);
int MPIDI_OFI_rndvread_read_chunk_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r);
int MPIDI_OFI_rndvread_ack_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r);

MPL_STATIC_INLINE_PREFIX MPL_gpu_engine_type_t MPIDI_OFI_gpu_get_recv_engine_type(void)
{
Expand Down
2 changes: 2 additions & 0 deletions src/mpid/ch4/netmod/ofi/ofi_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ ATTRIBUTE((unused));
MPIDI_OFI_global.prov_use[nic]->domain_attr->name : "(n/a)")
#define MPIDI_OFI_DEFAULT_NIC_NAME (MPIDI_OFI_NIC_NAME(0))

/* Eager threshold currently in effect: an explicit (non -1) value of
 * MPIR_CVAR_CH4_OFI_EAGER_THRESHOLD is used as given; -1 selects
 * MPIDI_OFI_global.max_msg_size instead. */
#define MPIDI_OFI_EAGER_THRESH \
    (MPIR_CVAR_CH4_OFI_EAGER_THRESHOLD != -1 ? \
     MPIR_CVAR_CH4_OFI_EAGER_THRESHOLD : MPIDI_OFI_global.max_msg_size)

int MPIDI_OFI_progress_uninlined(int vci);
int MPIDI_OFI_handle_cq_error(int vci, int nic, ssize_t ret);
int MPIDI_OFI_flush_send_queue(void);
Expand Down
61 changes: 56 additions & 5 deletions src/mpid/ch4/netmod/ofi/ofi_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,26 @@ categories :
description : >-
If true, enable OFI triggered ops for MPI collectives.

- name : MPIR_CVAR_CH4_OFI_PIPELINE_CHUNK_SZ
category : CH4_OFI
type : int
default : 1048576
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_LOCAL
description : >-
Specifies the chunk size (in bytes) for pipeline data transfer.

- name : MPIR_CVAR_CH4_OFI_PIPELINE_NUM_CHUNKS
category : CH4_OFI
type : int
default : 32
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_LOCAL
description : >-
Specifies the number of chunk buffers for pipeline data transfer.

- name : MPIR_CVAR_CH4_OFI_ENABLE_GPU_PIPELINE
category : CH4_OFI
type : boolean
Expand Down Expand Up @@ -655,7 +675,7 @@ int MPIDI_OFI_init_local(int *tag_bits)
MPL_COMPILE_TIME_ASSERT(offsetof(struct MPIR_Request, dev.ch4.netmod) ==
offsetof(MPIDI_OFI_dynamic_process_request_t, context));
MPL_COMPILE_TIME_ASSERT(offsetof(struct MPIR_Request, dev.ch4.am.netmod_am.ofi.context) ==
offsetof(struct MPIR_Request, dev.ch4.netmod.ofi.context));
offsetof(struct MPIR_Request, dev.ch4.netmod.ofi.direct.context));
MPL_COMPILE_TIME_ASSERT(sizeof(MPIDI_Devreq_t) >= sizeof(MPIDI_OFI_request_t));

int err;
Expand Down Expand Up @@ -794,8 +814,7 @@ int MPIDI_OFI_init_local(int *tag_bits)
MPIR_Assert(MPIDI_OFI_DEFAULT_SHORT_SEND_SIZE <= MPIR_CVAR_CH4_PACK_BUFFER_SIZE);

MPIDI_OFI_global.num_vcis = 1;
MPIDI_OFI_am_init(0);
MPIDI_OFI_am_post_recv(0, 0);
MPIDI_OFI_init_per_vci(0);

fn_exit:
*tag_bits = MPIDI_OFI_TAG_BITS;
Expand Down Expand Up @@ -984,6 +1003,10 @@ int MPIDI_OFI_mpi_finalize_hook(void)

MPIDIU_map_destroy(MPIDI_OFI_global.win_map);

for (int vci = 0; vci < MPIDI_OFI_global.num_vcis; vci++) {
MPIDU_genq_private_pool_destroy(MPIDI_OFI_global.per_vci[vci].pipeline_pool);
}

if (MPIDI_OFI_ENABLE_AM) {
for (int vci = 0; vci < MPIDI_OFI_global.num_vcis; vci++) {
while (MPIDI_OFI_global.per_vci[vci].am_unordered_msgs) {
Expand Down Expand Up @@ -1584,7 +1607,35 @@ static void dump_global_settings(void)

/* static functions for AM */

int MPIDI_OFI_am_init(int vci)
static int am_init(int vci);
static int am_post_recv(int vci, int nic);

/* Per-VCI initialization for the OFI netmod.
 *
 * Performs, in order: creation of the VCI's pipeline chunk pool, am_init(vci),
 * and am_post_recv(vci, 0).  Each step's status is passed to MPIR_ERR_CHECK,
 * which is expected to branch to fn_fail on error.
 *
 * vci - index into MPIDI_OFI_global.per_vci[] of the VCI being set up
 *
 * Returns mpi_errno (MPI_SUCCESS when every step succeeds).
 */
int MPIDI_OFI_init_per_vci(int vci)
{
int mpi_errno = MPI_SUCCESS;

/* Create chunk buffer pool (for pipeline etc.) */
/* Cell size and cell count come from the PIPELINE_CHUNK_SZ / PIPELINE_NUM_CHUNKS
 * CVARs; cells are obtained and released through host_alloc_registered /
 * host_free_registered.
 * NOTE(review): the chunk count is passed for two consecutive arguments,
 * presumably the per-block and maximum cell counts; confirm against
 * MPIDU_genq_private_pool_create. */
mpi_errno = MPIDU_genq_private_pool_create(MPIR_CVAR_CH4_OFI_PIPELINE_CHUNK_SZ,
MPIR_CVAR_CH4_OFI_PIPELINE_NUM_CHUNKS,
MPIR_CVAR_CH4_OFI_PIPELINE_NUM_CHUNKS,
host_alloc_registered,
host_free_registered,
&MPIDI_OFI_global.per_vci[vci].pipeline_pool);
MPIR_ERR_CHECK(mpi_errno);

/* Active-message setup for this VCI */
mpi_errno = am_init(vci);
MPIR_ERR_CHECK(mpi_errno);

/* Post the active-message receive; the second argument is the nic index, fixed at 0 here */
mpi_errno = am_post_recv(vci, 0);
MPIR_ERR_CHECK(mpi_errno);

fn_exit:
return mpi_errno;
fn_fail:
/* no cleanup is performed in this function on failure; just return the error */
goto fn_exit;
}

static int am_init(int vci)
{
int mpi_errno = MPI_SUCCESS;

Expand Down Expand Up @@ -1633,7 +1684,7 @@ int MPIDI_OFI_am_init(int vci)
goto fn_exit;
}

int MPIDI_OFI_am_post_recv(int vci, int nic)
static int am_post_recv(int vci, int nic)
{
int mpi_errno = MPI_SUCCESS;

Expand Down
3 changes: 1 addition & 2 deletions src/mpid/ch4/netmod/ofi/ofi_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ void MPIDI_OFI_update_global_settings(struct fi_info *prov);
bool MPIDI_OFI_nic_already_used(const struct fi_info *prov, struct fi_info **others, int nic_count);

int MPIDI_OFI_create_vci_context(int vci, int nic);
int MPIDI_OFI_am_init(int vci);
int MPIDI_OFI_am_post_recv(int vci, int nic);
int MPIDI_OFI_init_per_vci(int vci);

bool MPIDI_OFI_nic_is_up(struct fi_info *prov);

Expand Down
Loading