Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/mpid/ch4/shm/posix/eager/iqueue/Makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ if BUILD_CH4_SHM_POSIX_EAGER_IQUEUE

noinst_HEADERS += src/mpid/ch4/shm/posix/eager/iqueue/iqueue_send.h \
src/mpid/ch4/shm/posix/eager/iqueue/iqueue_recv.h \
src/mpid/ch4/shm/posix/eager/iqueue/iqueue_qp.h \
src/mpid/ch4/shm/posix/eager/iqueue/posix_eager_inline.h

mpi_core_sources += src/mpid/ch4/shm/posix/eager/iqueue/func_table.c \
src/mpid/ch4/shm/posix/eager/iqueue/iqueue_qp.c \
src/mpid/ch4/shm/posix/eager/iqueue/iqueue_init.c

endif
1 change: 1 addition & 0 deletions src/mpid/ch4/shm/posix/eager/iqueue/iqueue_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
#include <mpidimpl.h>
#include "mpidu_init_shm.h"
#include "iqueue_types.h"
#include "iqueue_qp.h"

#endif /* POSIX_EAGER_IQUEUE_IMPL_H_INCLUDED */
87 changes: 86 additions & 1 deletion src/mpid/ch4/shm/posix/eager/iqueue/iqueue_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,35 @@
description : >-
Size of each cell.

- name : MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_NUM_CELLS
category : CH4
type : int
default : 64
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_ALL_EQ
description : >-
The number of cells in each ring buffer.

- name : MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_CELL_SIZE
category : CH4
type : int
default : 320
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_ALL_EQ
description : >-
Size of each cell of ring buffer.

- name : MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE
category : CH4
type : boolean
default : false
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_ALL_EQ
description : >-
Control if ring buffers are enabled.
=== END_MPI_T_CVAR_INFO_BLOCK ===
*/

Expand All @@ -45,6 +74,7 @@ static int init_transport(void *slab, int vci_src, int vci_dst)

transport->num_cells = MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_NUM_CELLS;
transport->size_of_cell = MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_CELL_SIZE;
transport->qp = NULL;

if (MPIR_CVAR_CH4_SHM_POSIX_TOPO_ENABLE) {
int queue_types[2] = {
Expand Down Expand Up @@ -75,6 +105,35 @@ static int init_transport(void *slab, int vci_src, int vci_dst)
MPIDU_GENQ_SHMEM_QUEUE_TYPE__MPSC);
MPIR_ERR_CHECK(mpi_errno);

if (MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE) {
int buf_idx_base = MPIR_Process.local_rank;
int rb_size = MPIDI_POSIX_eager_iqueue_rb_size(MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_CELL_SIZE,
MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_NUM_CELLS);
char *qp_base = (char *) slab + MPIDI_POSIX_eager_iqueue_global.qp_offset;

transport->qp = MPL_malloc(sizeof(MPIDI_POSIX_eager_iqueue_qp_t *)
* MPIR_Process.local_size, MPL_MEM_SHM);
MPIR_Assert(transport->qp);

for (int peer_rank = 0; peer_rank < MPIR_Process.local_size; peer_rank++) {
if (peer_rank == MPIR_Process.local_rank) {
transport->qp[peer_rank] = NULL;
continue;
}
int send_idx = MPIR_Process.local_rank * MPIR_Process.local_size + peer_rank;
int recv_idx = peer_rank * MPIR_Process.local_size + MPIR_Process.local_rank;
char *send_slab = qp_base + send_idx * rb_size;
char *recv_slab = qp_base + recv_idx * rb_size;
MPIDI_POSIX_eager_iqueue_qp_t *qp =
MPIDI_POSIX_eager_iqueue_qp_init(send_slab, recv_slab,
MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_CELL_SIZE,
MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_NUM_CELLS,
peer_rank);
qp->cell_pool = transport->cell_pool;
transport->qp[peer_rank] = qp;
}
}

fn_exit:
return mpi_errno;
fn_fail:
Expand All @@ -93,9 +152,21 @@ int MPIDI_POSIX_iqueue_shm_size(int local_size)
MPIDU_genq_shmem_pool_size(cell_size, num_cells, local_size, num_free_queue);
int terminal_size = local_size * sizeof(MPIDU_genq_shmem_queue_u);

int slab_size = pool_size + terminal_size;
int slab_size = MPL_ROUND_UP_ALIGN(pool_size + terminal_size, sysconf(_SC_PAGESIZE));

MPIDI_POSIX_eager_iqueue_global.terminal_offset = pool_size;

if (MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE) {
int qp_cell_size = MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_CELL_SIZE;
int qp_num_cells = MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_NUM_CELLS;

int total_qp_size = MPIDI_POSIX_eager_iqueue_qp_size(qp_cell_size, qp_num_cells)
* local_size * local_size;

MPIDI_POSIX_eager_iqueue_global.qp_offset = slab_size;
slab_size += total_qp_size;
}

MPIDI_POSIX_eager_iqueue_global.slab_size = slab_size;
}

Expand Down Expand Up @@ -184,6 +255,13 @@ int MPIDI_POSIX_iqueue_finalize(void)
MPIR_ERR_CHECK(mpi_errno);

MPIDI_POSIX_eager_iqueue_global.root_slab = NULL;

if (MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE) {
for (int i = 0; i < MPIR_Process.local_size; i++) {
MPIDI_POSIX_eager_iqueue_qp_free(&transport->qp[i]);
}
MPL_free(transport->qp);
}
}

if (MPIDI_POSIX_eager_iqueue_global.all_vci_slab) {
Expand All @@ -198,6 +276,13 @@ int MPIDI_POSIX_iqueue_finalize(void)

mpi_errno = MPIDU_genq_shmem_pool_destroy(transport->cell_pool);
MPIR_ERR_CHECK(mpi_errno);

if (MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE) {
for (int i = 0; i < MPIR_Process.local_size; i++) {
MPIDI_POSIX_eager_iqueue_qp_free(&transport->qp[i]);
}
MPL_free(transport->qp);
}
}
}

Expand Down
65 changes: 65 additions & 0 deletions src/mpid/ch4/shm/posix/eager/iqueue/iqueue_qp.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) by Argonne National Laboratory
* See COPYRIGHT in top-level directory
*/

#include <mpidimpl.h>
#include "iqueue_noinline.h"

MPIDI_POSIX_eager_iqueue_qp_t *MPIDI_POSIX_eager_iqueue_qp_init(void *send_slab, void *recv_slab,
int cell_size, int num_cells,
int peer_rank)
{
MPIDI_POSIX_eager_iqueue_qp_t *qp = NULL;

MPIR_Assert(send_slab);
MPIR_Assert(recv_slab);
MPIR_Assert(MPL_CHECK_ALIGN((uintptr_t) send_slab, sysconf(_SC_PAGESIZE)));
MPIR_Assert(MPL_CHECK_ALIGN((uintptr_t) recv_slab, sysconf(_SC_PAGESIZE)));
MPIR_Assert(MPL_CHECK_ALIGN(cell_size, MPL_CACHELINE_SIZE));
MPIR_Assert(MPL_is_pof2(num_cells));
MPIR_Assert(peer_rank != MPIR_Process.local_rank);

qp = (MPIDI_POSIX_eager_iqueue_qp_t *) MPL_malloc(sizeof(MPIDI_POSIX_eager_iqueue_qp_t),
MPL_MEM_SHM);
if (qp == NULL) {
goto fn_fail;
}

qp->cell_size = cell_size;
qp->num_cells = num_cells;

qp->send.cntr = (MPIDI_POSIX_eager_iqueue_rb_cntr_t *) send_slab;
qp->send.base = send_slab + sizeof(MPIDI_POSIX_eager_iqueue_rb_cntr_t);
qp->send.next_seq = 0;
qp->send.last_ack = 0;

qp->recv.cntr = (MPIDI_POSIX_eager_iqueue_rb_cntr_t *) recv_slab;
qp->recv.base = recv_slab + sizeof(MPIDI_POSIX_eager_iqueue_rb_cntr_t);
qp->recv.next_seq = 0;
qp->recv.last_ack = 0;

qp->peer_rank = peer_rank;

/* init counter will do first-touch NUMA affinity for the recv slab */
memset(recv_slab, 0, MPIDI_POSIX_eager_iqueue_rb_size(cell_size, num_cells));

fn_exit:
return qp;
fn_fail:
if (qp) {
MPL_free(qp);
qp = NULL;
}
goto fn_exit;
}

void MPIDI_POSIX_eager_iqueue_qp_free(MPIDI_POSIX_eager_iqueue_qp_t ** qp)
{
if (*qp == NULL) {
return;
}

MPL_free(*qp);
qp = NULL;
}
93 changes: 93 additions & 0 deletions src/mpid/ch4/shm/posix/eager/iqueue/iqueue_qp.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (C) by Argonne National Laboratory
* See COPYRIGHT in top-level directory
*/

#ifndef POSIX_EAGER_IQUEUE_QP_H_INCLUDED
#define POSIX_EAGER_IQUEUE_QP_H_INCLUDED

#include <mpidimpl.h>

MPIDI_POSIX_eager_iqueue_qp_t *MPIDI_POSIX_eager_iqueue_qp_init(void *send_slab, void *recv_slab,
int cell_size, int num_cells,
int peer_rank);
void MPIDI_POSIX_eager_iqueue_qp_free(MPIDI_POSIX_eager_iqueue_qp_t ** qp);

MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_eager_iqueue_rb_size(int cell_size, int num_cells)
{
return MPL_ROUND_UP_ALIGN((sizeof(MPIDI_POSIX_eager_iqueue_rb_cntr_t) + num_cells * cell_size),
sysconf(_SC_PAGESIZE));
}

MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_eager_iqueue_qp_size(int cell_size, int num_cells)
{
return 2 * MPIDI_POSIX_eager_iqueue_rb_size(cell_size, num_cells);
}

MPL_STATIC_INLINE_PREFIX
int MPIDI_POSIX_eager_iqueue_qp_cntr_to_idx(MPIDI_POSIX_eager_iqueue_qp_t * qp, uint64_t cntr)
{
return cntr & (qp->num_cells - 1);
}

MPL_STATIC_INLINE_PREFIX void
*MPIDI_POSIX_eager_iqueue_qp_get_send_cell(MPIDI_POSIX_eager_iqueue_qp_t * qp)
{
char *cell = NULL;
if (qp->send.next_seq - qp->send.last_ack == qp->num_cells) {
uint64_t new_ack = MPL_atomic_acquire_load_uint64(&qp->send.cntr->ack);
if (new_ack == qp->send.last_ack) {
return NULL;
} else {
for (int i = qp->send.last_ack; i < new_ack; i++) {
MPIDI_POSIX_eager_iqueue_cell_ext_t *tmp = (MPIDI_POSIX_eager_iqueue_cell_ext_t *)
(qp->send.base +
MPIDI_POSIX_eager_iqueue_qp_cntr_to_idx(qp, i) * qp->cell_size);
if (tmp->base.type & MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_BUF) {
cell = MPIDU_genq_shmem_pool_handle_to_cell(qp->cell_pool, tmp->buf_handle);
MPIDU_genq_shmem_pool_cell_free(qp->cell_pool, cell);
}
}
}
qp->send.last_ack = new_ack;
}

cell = qp->send.base + MPIDI_POSIX_eager_iqueue_qp_cntr_to_idx(qp, qp->send.next_seq)
* qp->cell_size;

return cell;
}

MPL_STATIC_INLINE_PREFIX void
*MPIDI_POSIX_eager_iqueue_qp_get_recv_cell(MPIDI_POSIX_eager_iqueue_qp_t * qp)
{
char *cell = NULL;
if (qp->recv.last_ack == qp->recv.next_seq) {
uint64_t new_seq = MPL_atomic_acquire_load_uint64(&qp->recv.cntr->seq);
if (new_seq == qp->recv.next_seq) {
return NULL;
}
qp->recv.next_seq = new_seq;
}

cell = qp->recv.base + MPIDI_POSIX_eager_iqueue_qp_cntr_to_idx(qp, qp->recv.last_ack)
* qp->cell_size;

return cell;
}

MPL_STATIC_INLINE_PREFIX void
MPIDI_POSIX_eager_iqueue_qp_send_commit(MPIDI_POSIX_eager_iqueue_qp_t * qp)
{
qp->send.next_seq++;
MPL_atomic_release_store_uint64(&qp->send.cntr->seq, qp->send.next_seq);
}

MPL_STATIC_INLINE_PREFIX void
MPIDI_POSIX_eager_iqueue_qp_recv_complete(MPIDI_POSIX_eager_iqueue_qp_t * qp)
{
qp->recv.last_ack++;
MPL_atomic_release_store_uint64(&qp->recv.cntr->ack, qp->recv.last_ack);
}

#endif /* POSIX_EAGER_IQUEUE_QP_H_INCLUDED */
45 changes: 38 additions & 7 deletions src/mpid/ch4/shm/posix/eager/iqueue/iqueue_recv.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,45 @@ MPIDI_POSIX_eager_recv_begin(int vci, MPIDI_POSIX_eager_recv_transaction_t * tra
for (int vci_src = 0; vci_src < max_vcis; vci_src++) {
transport = MPIDI_POSIX_eager_iqueue_get_transport(vci_src, vci);

MPIDU_genq_shmem_queue_dequeue(transport->cell_pool, transport->my_terminal,
(void **) &cell);
if (MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE) {
MPIDI_POSIX_eager_iqueue_qp_t *qp = NULL;
for (int i = 0; i < MPIR_Process.local_size; i++) {
if (i == MPIR_Process.local_rank)
continue;

qp = transport->qp[i];
cell = (MPIDI_POSIX_eager_iqueue_cell_t *)
MPIDI_POSIX_eager_iqueue_qp_get_recv_cell(qp);
if (cell == NULL) {
continue;
} else {
break;
}
}
} else {
MPIDU_genq_shmem_queue_dequeue(transport->cell_pool, transport->my_terminal,
(void **) &cell);
}

if (cell) {
transaction->src_local_rank = cell->from;
transaction->src_vci = vci_src;
transaction->dst_vci = vci;
transaction->payload = MPIDI_POSIX_EAGER_IQUEUE_CELL_PAYLOAD(cell);
transaction->payload_sz = cell->payload_size;

if (likely(cell->type == MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_HDR)) {
if (cell->type & MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_BUF) {
uint64_t handle = ((MPIDI_POSIX_eager_iqueue_cell_ext_t *) cell)->buf_handle;
/* payload should be buffer mapped from the handle */
transaction->payload = MPIDU_genq_shmem_pool_handle_to_cell(transport->cell_pool,
handle);
} else {
transaction->payload = MPIDI_POSIX_EAGER_IQUEUE_CELL_PAYLOAD(cell);
}

if (likely(cell->type & MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_HDR)) {
transaction->msg_hdr = &cell->am_header;
} else {
MPIR_Assert(cell->type == MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_DATA);
MPIR_Assert(cell->type & MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_DATA);
transaction->msg_hdr = NULL;
}

Expand Down Expand Up @@ -66,9 +92,14 @@ MPIDI_POSIX_eager_recv_commit(MPIDI_POSIX_eager_recv_transaction_t * transaction
MPIR_FUNC_ENTER;

transport = MPIDI_POSIX_eager_iqueue_get_transport(transaction->src_vci, transaction->dst_vci);
cell = (MPIDI_POSIX_eager_iqueue_cell_t *) transaction->transport.iqueue.pointer_to_cell;

MPIDU_genq_shmem_pool_cell_free(transport->cell_pool, cell);
if (MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE) {
MPIDI_POSIX_eager_iqueue_qp_t *qp = transport->qp[transaction->src_local_rank];
MPIDI_POSIX_eager_iqueue_qp_recv_complete(qp);
} else {
cell = (MPIDI_POSIX_eager_iqueue_cell_t *) transaction->transport.iqueue.pointer_to_cell;
MPIDU_genq_shmem_pool_cell_free(transport->cell_pool, cell);
}

MPIR_FUNC_EXIT;
}
Expand Down
Loading