diff --git a/ompi/mca/part/base/part_base_frame.c b/ompi/mca/part/base/part_base_frame.c index f9da3548456..851435b9562 100644 --- a/ompi/mca/part/base/part_base_frame.c +++ b/ompi/mca/part/base/part_base_frame.c @@ -137,8 +137,8 @@ static int mca_part_base_open(mca_base_open_flag_t flags) mca_part_base_selected_component.partm_finalize = NULL; - /* Currently this uses a default with no selection criteria as there is only 1 module. */ opal_pointer_array_add(&mca_part_base_part, strdup("persist")); + opal_pointer_array_add(&mca_part_base_part, strdup("persist_aggregated")); return OMPI_SUCCESS; } diff --git a/ompi/mca/part/persist_aggregated/Makefile.am b/ompi/mca/part/persist_aggregated/Makefile.am new file mode 100644 index 00000000000..4146e0c6f96 --- /dev/null +++ b/ompi/mca/part/persist_aggregated/Makefile.am @@ -0,0 +1,55 @@ +# +# Copyright (c) 2004-2006 The Regents of the University of California. +# All rights reserved. +# Copyright (c) 2009-2024 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. +# Copyright (c) 2017 IBM Corporation. All rights reserved. +# Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). 
+ +EXTRA_DIST = post_configure.sh + +if MCA_BUILD_ompi_part_persist_aggregated_DSO +component_noinst = +component_install = mca_part_persist_aggregated.la +else +component_noinst = libmca_part_persist_aggregated.la +component_install = +endif + +local_sources = \ + part_persist_aggregated.c \ + part_persist_aggregated.h \ + part_persist_aggregated_component.c \ + part_persist_aggregated_component.h \ + part_persist_aggregated_recvreq.h \ + part_persist_aggregated_recvreq.c \ + part_persist_aggregated_request.h \ + part_persist_aggregated_request.c \ + part_persist_aggregated_sendreq.h \ + part_persist_aggregated_sendreq.c \ + schemes/part_persist_aggregated_scheme_regular.h \ + schemes/part_persist_aggregated_scheme_regular.c + +mcacomponentdir = $(ompilibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_part_persist_aggregated_la_SOURCES = $(local_sources) +mca_part_persist_aggregated_la_LIBADD = $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \ + $(part_persist_aggregated_LIBS) +mca_part_persist_aggregated_la_LDFLAGS = -module -avoid-version $(part_persist_aggregated_LDFLAGS) + +noinst_LTLIBRARIES = $(component_noinst) +libmca_part_persist_aggregated_la_SOURCES = $(local_sources) +libmca_part_persist_aggregated_la_LIBADD = $(part_persist_aggregated_LIBS) +libmca_part_persist_aggregated_la_LDFLAGS = -module -avoid-version $(part_persist_aggregated_LDFLAGS) + diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated.c b/ompi/mca/part/persist_aggregated/part_persist_aggregated.c new file mode 100644 index 00000000000..a99bd0f219b --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated.c @@ -0,0 +1,652 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2006-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. 
+ * Copyright (c) 2004-2007 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2011-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2024 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/communicator/communicator.h" +#include "ompi/mca/part/base/part_base_prequest.h" +#include "ompi/mca/part/base/base.h" + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h" + +#include "ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.h" + +static int mca_part_persist_aggregated_progress(void); +static int mca_part_persist_aggregated_precv_init(void *, size_t, size_t, ompi_datatype_t *, int, int, struct ompi_communicator_t *, struct ompi_info_t *, struct ompi_request_t **); +static int mca_part_persist_aggregated_psend_init(const void*, size_t, size_t, ompi_datatype_t*, int, int, ompi_communicator_t*, struct ompi_info_t *, ompi_request_t**); +static int mca_part_persist_aggregated_pready(size_t, size_t, ompi_request_t*); +static int mca_part_persist_aggregated_parrived(size_t, size_t, int*, ompi_request_t*); + +ompi_part_persist_aggregated_t ompi_part_persist_aggregated = { + .super = { + .part_progress = mca_part_persist_aggregated_progress, + .part_precv_init = mca_part_persist_aggregated_precv_init, + 
.part_psend_init = mca_part_persist_aggregated_psend_init, + .part_start = mca_part_persist_aggregated_start, + .part_pready = mca_part_persist_aggregated_pready, + .part_parrived = mca_part_persist_aggregated_parrived, + } +}; + +/** + * @brief selects an internal partitioning based on the user-provided partitioning + * and the mca parameters for minimal partition size and maximal partition count. + * + * More precisely, given a partitioning into p partitions of size s, computes + * an internal partitioning into p' partitions of size s' (apart from the last one, + * which has potentially different size r * s): + * p * s = (p' - 1) * s' + r * s + * where + * s' >= s + * p' <= p + * 0 < r * s <= s' + * and + * s' <= max_message_count + * p' >= min_message_size + * (given by mca parameters). + * + * @param[in] partitions number of user-provided partitions + * @param[in] count size of user-provided partitions in elements + * @param[out] internal_partitions number of internal partitions + * @param[out] factor number of public partitions corresponding to each internal + * partitions other than the last one + * @param[out] last_size number of public partitions corresponding to the last internal + * partition + */ +static inline void +part_persist_aggregated_select_internal_partitioning(size_t partitions, + size_t part_size, + size_t *internal_partitions, + size_t *factor, + size_t *remainder) +{ + size_t buffer_size = partitions * part_size; + size_t min_part_size = ompi_part_persist_aggregated.min_message_size; + size_t max_part_count = ompi_part_persist_aggregated.max_message_count; + + // check if max_part_count imposes higher lower bound on partition size + if (max_part_count > 0 && (buffer_size / max_part_count) > min_part_size) { + min_part_size = buffer_size / max_part_count; + } + + // cannot have partitions larger than buffer size + if (min_part_size > buffer_size) { + min_part_size = buffer_size; + } + + if (part_size < min_part_size) { + // have to use larger 
partititions + // solve p = (p' - 1) * a + r for a (factor) and r (remainder) + *factor = min_part_size / part_size; + *internal_partitions = partitions / *factor; + *remainder = partitions % (*internal_partitions); + + if (*remainder == 0) { // size of last partition must be set + *remainder = *factor; + } else { + // number of partitions was floored, so add 1 for last (smaller) partition + *internal_partitions += 1; + } + } else { + // can keep original partitioning + *internal_partitions = partitions; + *factor = 1; + *remainder = 1; + } +} + +/** + * This is a helper function that frees a request. This requires ompi_part_persist_aggregated.lock be held before calling. + */ +__opal_attribute_always_inline__ static inline int +mca_part_persist_aggregated_free_req(struct mca_part_persist_aggregated_request_t* req) +{ + int err = OMPI_SUCCESS; + size_t i; + opal_list_remove_item(ompi_part_persist_aggregated.progress_list, (opal_list_item_t*)req->progress_elem); + OBJ_RELEASE(req->progress_elem); + + // if on sender side, free aggregation state + if (MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND == req->req_type) { + mca_part_persist_aggregated_psend_request_t *sendreq = (mca_part_persist_aggregated_psend_request_t *) req; + part_persist_aggregate_regular_free(&sendreq->aggregation_state); + } + + for(i = 0; i < req->real_parts; i++) { + ompi_request_free(&(req->persist_reqs[i])); + } + free(req->persist_reqs); + free(req->flags); + + if( MCA_PART_PERSIST_AGGREGATED_REQUEST_PRECV == req->req_type ) { + MCA_PART_PERSIST_AGGREGATED_PRECV_REQUEST_RETURN(req); + } else { + MCA_PART_PERSIST_AGGREGATED_PSEND_REQUEST_RETURN(req); + } + return err; +} + +static void +mca_part_persist_aggregated_complete(struct mca_part_persist_aggregated_request_t* request) +{ + if(MCA_PART_PERSIST_AGGREGATED_REQUEST_PRECV == request->req_type) { + request->req_ompi.req_status.MPI_SOURCE = request->req_peer; + } else { + request->req_ompi.req_status.MPI_SOURCE = request->req_comm->c_my_rank; + 
} + request->req_ompi.req_complete_cb = NULL; + request->req_ompi.req_status.MPI_TAG = request->req_tag; + request->req_ompi.req_status._ucount = request->req_bytes; + request->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; + request->req_part_complete = true; + ompi_request_complete(&(request->req_ompi), true ); +} + +/** + * mca_part_persist_aggregated_progress is the progress function that will be registered. It handles + * both send and recv request testing and completion. It also handles freeing requests, + * after MPI_Free is called and the requests have become inactive. + */ +static int +mca_part_persist_aggregated_progress(void) +{ + mca_part_persist_aggregated_list_t *current; + int err; + size_t i; + + /* prevent re-entry, */ + int block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist_aggregated.block_entry), 1); + if(1 < block_entry) + { + block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist_aggregated.block_entry), -1); + return OMPI_SUCCESS; + } + + OPAL_THREAD_LOCK(&ompi_part_persist_aggregated.lock); + + mca_part_persist_aggregated_request_t* to_delete = NULL; + + /* Don't do anything till a function in the module is called. 
*/ + if(-1 == ompi_part_persist_aggregated.init_world) + { + OPAL_THREAD_UNLOCK(&ompi_part_persist_aggregated.lock); + block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist_aggregated.block_entry), -1); + return OMPI_SUCCESS; + } + + /* Can't do anything if we don't have world */ + if(0 == ompi_part_persist_aggregated.init_world) { + ompi_part_persist_aggregated.my_world_rank = ompi_comm_rank(&ompi_mpi_comm_world.comm); + err = ompi_comm_idup(&ompi_mpi_comm_world.comm, &ompi_part_persist_aggregated.part_comm, &ompi_part_persist_aggregated.part_comm_req); + if(err != OMPI_SUCCESS) { + exit(-1); + } + ompi_part_persist_aggregated.part_comm_ready = 0; + err = ompi_comm_idup(&ompi_mpi_comm_world.comm, &ompi_part_persist_aggregated.part_comm_setup, &ompi_part_persist_aggregated.part_comm_sreq); + if(err != OMPI_SUCCESS) { + exit(-1); + } + ompi_part_persist_aggregated.part_comm_sready = 0; + ompi_part_persist_aggregated.init_world = 1; + + OPAL_THREAD_UNLOCK(&ompi_part_persist_aggregated.lock); + block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist_aggregated.block_entry), -1); + return OMPI_SUCCESS; + } + + /* Check to see if Comms are setup */ + if(0 == ompi_part_persist_aggregated.init_comms) { + if(0 == ompi_part_persist_aggregated.part_comm_ready) { + ompi_request_test(&ompi_part_persist_aggregated.part_comm_req, &ompi_part_persist_aggregated.part_comm_ready, MPI_STATUS_IGNORE); + } + if(0 == ompi_part_persist_aggregated.part_comm_sready) { + ompi_request_test(&ompi_part_persist_aggregated.part_comm_sreq, &ompi_part_persist_aggregated.part_comm_sready, MPI_STATUS_IGNORE); + } + if(0 != ompi_part_persist_aggregated.part_comm_ready && 0 != ompi_part_persist_aggregated.part_comm_sready) { + ompi_part_persist_aggregated.init_comms = 1; + } + OPAL_THREAD_UNLOCK(&ompi_part_persist_aggregated.lock); + block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist_aggregated.block_entry), -1); + return OMPI_SUCCESS; + } + + OPAL_LIST_FOREACH(current, 
ompi_part_persist_aggregated.progress_list, mca_part_persist_aggregated_list_t) { + mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t *) current->item; + + /* Check to see if request is initilaized */ + if(false == req->initialized) { + int done = 0; + + if(true == req->flag_post_setup_recv) { + err = MCA_PML_CALL(irecv(&(req->setup_info[1]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, OMPI_ANY_SOURCE, req->my_recv_tag, ompi_part_persist_aggregated.part_comm_setup, &req->setup_req[1])); + req->flag_post_setup_recv = false; + } + + ompi_request_test(&(req->setup_req[1]), &done, MPI_STATUS_IGNORE); + + if(done) { + size_t dt_size_; + int32_t dt_size; + + if(MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND == req->req_type) { + /* parse message */ + req->world_peer = req->setup_info[1].world_rank; + + err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + dt_size = (dt_size_ > (size_t) INT_MAX) ? 
MPI_UNDEFINED : (int) dt_size_; + int32_t bytes = req->real_count * dt_size; + + /* Set up persistent sends */ + req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*(req->real_parts)); + for(i = 0; i < req->real_parts - 1; i++) { + void *buf = ((void*) (((char*)req->req_addr) + (bytes * i))); + err = MCA_PML_CALL(isend_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag+i, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist_aggregated.part_comm, &(req->persist_reqs[i]))); + } + // last transfer partition can have different size + void *buf = ((void*) (((char*)req->req_addr) + (bytes * i))); + err = MCA_PML_CALL(isend_init(buf, req->real_remainder, req->req_datatype, req->world_peer, req->my_send_tag+i, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist_aggregated.part_comm, &(req->persist_reqs[i]))); + } else { + /* parse message */ + req->world_peer = req->setup_info[1].world_rank; + req->my_send_tag = req->setup_info[1].start_tag; + req->my_recv_tag = req->setup_info[1].setup_tag; + req->real_parts = req->setup_info[1].num_parts; + req->real_count = req->setup_info[1].count; + req->real_remainder = req->setup_info[1].remainder; + + err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + dt_size = (dt_size_ > (size_t) INT_MAX) ? 
MPI_UNDEFINED : (int) dt_size_; + int32_t bytes = req->real_count * dt_size; + + /* Set up persistent sends */ + req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*(req->real_parts)); + req->flags = (int*) calloc(req->real_parts,sizeof(int)); + for(i = 0; i < req->real_parts - 1; i++) { + void *buf = ((void*) (((char*)req->req_addr) + (bytes * i))); + err = MCA_PML_CALL(irecv_init(buf, req->real_count, req->req_datatype, req->world_peer, req->my_send_tag+i, ompi_part_persist_aggregated.part_comm, &(req->persist_reqs[i]))); + } + // last transfer partition can have different size + void *buf = ((void*) (((char*)req->req_addr) + (bytes * i))); + err = MCA_PML_CALL(irecv_init(buf, req->real_remainder, req->req_datatype, req->world_peer, req->my_send_tag+i, ompi_part_persist_aggregated.part_comm, &(req->persist_reqs[i]))); + + err = req->persist_reqs[0]->req_start(req->real_parts, (&(req->persist_reqs[0]))); + + /* Send back a message */ + req->setup_info[0].world_rank = ompi_part_persist_aggregated.my_world_rank; + err = MCA_PML_CALL(isend(&(req->setup_info[0]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, req->world_peer, req->my_recv_tag, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist_aggregated.part_comm_setup, &req->setup_req[0])); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + } + + req->initialized = true; + } + } else { + if(false == req->req_part_complete && REQUEST_COMPLETED != req->req_ompi.req_complete && OMPI_REQUEST_ACTIVE == req->req_ompi.req_state) { + for(i = 0; i < req->real_parts; i++) { + + /* Check to see if partition is queued for being started. Only applicable to sends. 
*/ + if(-2 == req->flags[i]) { + err = req->persist_reqs[i]->req_start(1, (&(req->persist_reqs[i]))); + req->flags[i] = 0; + } + + if(0 == req->flags[i]) + { + ompi_request_test(&(req->persist_reqs[i]), &(req->flags[i]), MPI_STATUS_IGNORE); + if(0 != req->flags[i]) req->done_count++; + } + } + + /* Check for completion and complete the requests */ + if(req->done_count == req->real_parts) + { + req->first_send = false; + mca_part_persist_aggregated_complete(req); + } + } + + if(true == req->req_free_called && true == req->req_part_complete && REQUEST_COMPLETED == req->req_ompi.req_complete && OMPI_REQUEST_INACTIVE == req->req_ompi.req_state) { + to_delete = req; + } + } + + } + OPAL_THREAD_UNLOCK(&ompi_part_persist_aggregated.lock); + block_entry = opal_atomic_add_fetch_32(&(ompi_part_persist_aggregated.block_entry), -1); + if(to_delete) { + err = mca_part_persist_aggregated_free_req(to_delete); + if (OMPI_SUCCESS != err) { + return OMPI_ERROR; + } + } + + return OMPI_SUCCESS; +} + +static int +mca_part_persist_aggregated_precv_init(void *buf, + size_t parts, + size_t count, + ompi_datatype_t * datatype, + int src, + int tag, + struct ompi_communicator_t *comm, + struct ompi_info_t * info, + struct ompi_request_t **request) +{ + int err = OMPI_SUCCESS; + size_t dt_size_; + int dt_size; + mca_part_persist_aggregated_list_t* new_progress_elem = NULL; + + mca_part_persist_aggregated_precv_request_t *recvreq; + + /* if module hasn't been called before, flag module to init. 
*/ + if(-1 == ompi_part_persist_aggregated.init_world) + { + ompi_part_persist_aggregated.init_world = 0; + } + + /* Allocate a new request */ + MCA_PART_PERSIST_AGGREGATED_PRECV_REQUEST_ALLOC(recvreq); + if (OPAL_UNLIKELY(NULL == recvreq)) return OMPI_ERR_OUT_OF_RESOURCE; + + MCA_PART_PERSIST_AGGREGATED_PRECV_REQUEST_INIT(recvreq, ompi_proc, comm, tag, src, + datatype, buf, parts, count, flags); + + mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t *) recvreq; + + /* Set lazy initializion flags */ + req->initialized = false; + req->first_send = true; + req->flag_post_setup_recv = false; + req->flags = NULL; + /* Non-blocking receive on setup info */ + err = MCA_PML_CALL(irecv(&req->setup_info[1], sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, src, tag, comm, &req->setup_req[1])); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + + /* Compute total number of bytes */ + err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + dt_size = (dt_size_ > (size_t) INT_MAX) ? 
MPI_UNDEFINED : (int) dt_size_; + req->req_bytes = parts * count * dt_size; + + + /* Set ompi request initial values */ + req->req_ompi.req_persistent = true; + req->req_part_complete = true; + req->req_ompi.req_complete = REQUEST_COMPLETED; + req->req_ompi.req_state = OMPI_REQUEST_INACTIVE; + + /* Add element to progress engine */ + new_progress_elem = OBJ_NEW(mca_part_persist_aggregated_list_t); + new_progress_elem->item = req; + req->progress_elem = new_progress_elem; + OPAL_THREAD_LOCK(&ompi_part_persist_aggregated.lock); + opal_list_append(ompi_part_persist_aggregated.progress_list, (opal_list_item_t*)new_progress_elem); + OPAL_THREAD_UNLOCK(&ompi_part_persist_aggregated.lock); + + /* set return values */ + *request = (ompi_request_t*) recvreq; + return err; +} + +static int +mca_part_persist_aggregated_psend_init(const void* buf, + size_t parts, + size_t count, + ompi_datatype_t* datatype, + int dst, + int tag, + ompi_communicator_t* comm, + struct ompi_info_t * info, + ompi_request_t** request) +{ + int err = OMPI_SUCCESS; + size_t dt_size_; + int dt_size; + mca_part_persist_aggregated_list_t* new_progress_elem = NULL; + mca_part_persist_aggregated_psend_request_t *sendreq; + + /* if module hasn't been called before, flag module to init. */ + if(-1 == ompi_part_persist_aggregated.init_world) + { + ompi_part_persist_aggregated.init_world = 0; + } + + /* Create new request object */ + MCA_PART_PERSIST_AGGREGATED_PSEND_REQUEST_ALLOC(sendreq, comm, dst, ompi_proc); + if (OPAL_UNLIKELY(NULL == sendreq)) return OMPI_ERR_OUT_OF_RESOURCE; + MCA_PART_PERSIST_AGGREGATED_PSEND_REQUEST_INIT(sendreq, ompi_proc, comm, tag, dst, + datatype, buf, parts, count, flags); + mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t *) sendreq; + + /* Set lazy initialization variables */ + req->initialized = false; + req->first_send = true; + + + /* Determine total bytes to send. 
*/ + err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + dt_size = (dt_size_ > (size_t) INT_MAX) ? MPI_UNDEFINED : (int) dt_size_; + req->req_bytes = parts * count * dt_size; + + // select internal partitioning (i.e. real_parts) here + size_t factor, remaining_partitions; + part_persist_aggregated_select_internal_partitioning(parts, count, &req->real_parts, &factor, &remaining_partitions); + + req->real_remainder = remaining_partitions * count; // convert to number of elements + req->real_count = factor * count; + req->setup_info[0].num_parts = req->real_parts; // setup info has to contain internal partitioning + req->setup_info[0].count = req->real_count; + req->setup_info[0].remainder = req->real_remainder; + opal_output_verbose(5, ompi_part_base_framework.framework_output, "mapped given %lu*%lu partitioning to internal partitioning of %lu*%lu + %lu\n", parts, count, req->real_parts - 1, req->real_count, req->real_remainder); + + // init aggregation state + part_persist_aggregate_regular_init(&sendreq->aggregation_state, req->real_parts, factor, remaining_partitions); + + /* non-blocking send set-up data */ + req->setup_info[0].world_rank = ompi_comm_rank(&ompi_mpi_comm_world.comm); + req->setup_info[0].start_tag = ompi_part_persist_aggregated.next_send_tag; ompi_part_persist_aggregated.next_send_tag += parts; + req->my_send_tag = req->setup_info[0].start_tag; + req->setup_info[0].setup_tag = ompi_part_persist_aggregated.next_recv_tag; ompi_part_persist_aggregated.next_recv_tag++; + req->my_recv_tag = req->setup_info[0].setup_tag; + + req->flags = (int*) calloc(req->real_parts, sizeof(int)); + + err = MCA_PML_CALL(isend(&(req->setup_info[0]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, dst, tag, MCA_PML_BASE_SEND_STANDARD, comm, &req->setup_req[0])); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + + /* Non-blocking receive on setup info */ + if(1 == ompi_part_persist_aggregated.init_comms) { 
+ err = MCA_PML_CALL(irecv(&(req->setup_info[1]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, MPI_ANY_SOURCE, req->my_recv_tag, ompi_part_persist_aggregated.part_comm_setup, &req->setup_req[1])); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + req->flag_post_setup_recv = false; + } else { + req->flag_post_setup_recv = true; + } + + /* Initilaize completion variables */ + sendreq->req_base.req_ompi.req_persistent = true; + req->req_part_complete = true; + req->req_ompi.req_complete = REQUEST_COMPLETED; + req->req_ompi.req_state = OMPI_REQUEST_INACTIVE; + + /* add element to progress queue */ + new_progress_elem = OBJ_NEW(mca_part_persist_aggregated_list_t); + new_progress_elem->item = req; + req->progress_elem = new_progress_elem; + OPAL_THREAD_LOCK(&ompi_part_persist_aggregated.lock); + opal_list_append(ompi_part_persist_aggregated.progress_list, (opal_list_item_t*)new_progress_elem); + OPAL_THREAD_UNLOCK(&ompi_part_persist_aggregated.lock); + + /* Set return values */ + *request = (ompi_request_t*) sendreq; + return err; +} + +int +mca_part_persist_aggregated_start(size_t count, ompi_request_t** requests) +{ + int err = OMPI_SUCCESS; + size_t _count = count; + + for(size_t i = 0; i < _count && OMPI_SUCCESS == err; i++) { + mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t *)(requests[i]); + + // reset aggregation state here + if (MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND == req->req_type) { + mca_part_persist_aggregated_psend_request_t *sendreq = (mca_part_persist_aggregated_psend_request_t *)(req); + part_persist_aggregate_regular_reset(&sendreq->aggregation_state); + } + + /* First use is a special case, to support lazy initialization */ + if(false == req->first_send) + { + if(MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND == req->req_type) { + req->done_count = 0; + memset((void*)req->flags,0,sizeof(int32_t)*req->real_parts); + } else { + req->done_count = 0; + err = req->persist_reqs[0]->req_start(req->real_parts, 
req->persist_reqs); + memset((void*)req->flags,0,sizeof(int32_t)*req->real_parts); + } + } else { + if(MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND == req->req_type) { + req->done_count = 0; + for(size_t j = 0; j < req->real_parts && OMPI_SUCCESS == err; j++) { + req->flags[j] = -1; + } + } else { + req->done_count = 0; + } + } + req->req_ompi.req_state = OMPI_REQUEST_ACTIVE; + req->req_ompi.req_status.MPI_TAG = MPI_ANY_TAG; + req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; + req->req_ompi.req_status._cancelled = 0; + req->req_part_complete = false; + req->req_ompi.req_complete = false; + OPAL_ATOMIC_SWAP_PTR(&req->req_ompi.req_complete, REQUEST_PENDING); + } + + return err; +} + +static int +mca_part_persist_aggregated_pready(size_t min_part, + size_t max_part, + ompi_request_t* request) +{ + int err = OMPI_SUCCESS; + size_t i; + + mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t *)(request); + int flag_value; + if(true == req->initialized) + { + flag_value = 0; /* Mark partition as ready for testing */ + } + else + { + flag_value = -2; /* Mark partition as queued */ + } + + mca_part_persist_aggregated_psend_request_t *sendreq = (mca_part_persist_aggregated_psend_request_t *)(request); + int internal_part_ready; + for(i = min_part; i <= max_part && OMPI_SUCCESS == err; i++) { + part_persist_aggregate_regular_pready(&sendreq->aggregation_state, i, &internal_part_ready); + + if (-1 != internal_part_ready) { + // transfer partition is ready + if(true == req->initialized) { + err = req->persist_reqs[internal_part_ready]->req_start(1, (&(req->persist_reqs[internal_part_ready]))); + } + + req->flags[internal_part_ready] = flag_value; + } + } + + return err; +} + +static int +mca_part_persist_aggregated_parrived(size_t min_part, + size_t max_part, + int* flag, + ompi_request_t* request) +{ + int err = OMPI_SUCCESS; + size_t i; + int _flag = false; + mca_part_persist_aggregated_request_t *req = (mca_part_persist_aggregated_request_t 
*)request; + + if(0 != req->flags) { + _flag = 1; + if(req->req_parts == req->real_parts) { + for(i = min_part; i <= max_part; i++) { + _flag = _flag && req->flags[i]; + } + } else { + float convert = ((float)req->real_parts) / ((float)req->req_parts); + size_t _min = floor(convert * min_part); + size_t _max = ceil(convert * max_part); + for(i = _min; i <= _max; i++) { + _flag = _flag && req->flags[i]; + } + } + } + + if(!_flag) { + opal_progress(); + } + *flag = _flag; + return err; +} + +/** + * mca_part_persist_aggregated_free marks an entry as free called and sets the request to + * MPI_REQUEST_NULL. Note: requests get freed in the progress engine. + */ +int +mca_part_persist_aggregated_free(ompi_request_t** request) +{ + mca_part_persist_aggregated_request_t* req = *(mca_part_persist_aggregated_request_t**)request; + + if(true == req->req_free_called) return OMPI_ERROR; + req->req_free_called = true; + + *request = MPI_REQUEST_NULL; + return OMPI_SUCCESS; +} + +OBJ_CLASS_INSTANCE(mca_part_persist_aggregated_list_t, + opal_list_item_t, + NULL, + NULL); + diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated.h b/ompi/mca/part/persist_aggregated/part_persist_aggregated.h new file mode 100644 index 00000000000..812812c9038 --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated.h @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2004-2007 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2015-2024 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2017 Intel, Inc. All rights reserved + * Copyright (c) 2019-2021 The University of Tennessee at Chattanooga and The University + * of Tennessee Research Foundation. All rights reserved. + * Copyright (c) 2019-2021 Sandia National Laboratories. 
/**
 * Module state of the persist_aggregated partitioned communication component.
 * A single instance, ompi_part_persist_aggregated, is shared by all requests.
 */
struct ompi_part_persist_aggregated_t {
    mca_part_base_module_t super;   /* function table of the module (progress, init, start, pready, parrived) */
    int free_list_num;              /* NOTE(review): presumably free-list sizing parameters; not used in the visible code */
    int free_list_max;
    int free_list_inc;
    opal_list_t *progress_list;     /* list of mca_part_persist_aggregated_list_t elements, one per live request;
                                       walked by the progress function and modified under `lock` */

    int32_t next_send_tag;          /**< This is a counter for send tags for the actual data transfer. */
    int32_t next_recv_tag;          /* counter for the tags of setup replies; incremented once per send request */
    ompi_communicator_t *part_comm; /* This approach requires a separate tag space, so we need a dedicated communicator. */
    ompi_request_t *part_comm_req;  /* request of the non-blocking duplication that creates part_comm */
    int32_t part_comm_ready;        /* non-zero once part_comm_req has tested as complete */
    ompi_communicator_t *part_comm_setup; /* We create a second communicator to send set-up messages (rationale: these
                                             messages go in the opposite direction of normal messages, need to use MPI_ANY_SOURCE
                                             to support different communicators, and thus need to have a unique tag. Because tags
                                             are controlled by the sender in this model, we cannot assume that the tag will be
                                             unused in part_comm. */
    ompi_request_t *part_comm_sreq; /* request of the non-blocking duplication that creates part_comm_setup */
    int32_t part_comm_sready;       /* non-zero once part_comm_sreq has tested as complete */
    int32_t init_comms;             /* set to 1 by the progress function once both communicators are ready */
    int32_t init_world;             /* -1: no module function called yet, progress does nothing;
                                        0: initialization requested, progress starts the communicator duplication;
                                        1: duplication has been started */
    int32_t my_world_rank;          /* Because the back end communicators use a world rank, we need to communicate ours
                                       to set up the requests. */

    uint32_t min_message_size;      /* parameters to control internal partitioning */
    uint32_t max_message_count;     /* a value of 0 disables the bound on the number of internal partitions */

    opal_atomic_int32_t block_entry; /* re-entry guard of the progress function: a caller that does not
                                        increment it to exactly 1 returns immediately */
    opal_mutex_t lock;               /* held by the progress function while it works, and around appends to progress_list */
};
typedef struct ompi_part_persist_aggregated_t ompi_part_persist_aggregated_t;
+ * Copyright (c) 2010-2012 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2013-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2024 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated.h" + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_component.h" + +static int mca_part_persist_aggregated_component_register(void); +static int mca_part_persist_aggregated_component_open(void); +static int mca_part_persist_aggregated_component_close(void); +static mca_part_base_module_t* mca_part_persist_aggregated_component_init( int* priority, + bool enable_progress_threads, bool enable_mpi_threads); +static int mca_part_persist_aggregated_component_fini(void); + +mca_part_base_component_4_0_0_t mca_part_persist_aggregated_component = { + /* First, the mca_base_component_t struct containing meta + * information about the component itself */ + + .partm_version = { + MCA_PART_BASE_VERSION_2_0_0, + + .mca_component_name = "persist_aggregated", + MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION, + OMPI_RELEASE_VERSION), + .mca_open_component = mca_part_persist_aggregated_component_open, + .mca_close_component = mca_part_persist_aggregated_component_close, + .mca_register_component_params = mca_part_persist_aggregated_component_register, + }, + .partm_data = { + /* This component is not checkpoint ready */ + MCA_BASE_METADATA_PARAM_NONE + 
}, + + .partm_init = mca_part_persist_aggregated_component_init, + .partm_finalize = mca_part_persist_aggregated_component_fini, +}; + +static int +mca_part_persist_aggregated_component_register(void) +{ + ompi_part_persist_aggregated.free_list_num = 4; + (void) mca_base_component_var_register(&mca_part_persist_aggregated_component.partm_version, "free_list_num", + "Initial size of request free lists", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &ompi_part_persist_aggregated.free_list_num); + + ompi_part_persist_aggregated.free_list_max = -1; + (void) mca_base_component_var_register(&mca_part_persist_aggregated_component.partm_version, "free_list_max", + "Maximum size of request free lists", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &ompi_part_persist_aggregated.free_list_max); + + ompi_part_persist_aggregated.free_list_inc = 64; + (void) mca_base_component_var_register(&mca_part_persist_aggregated_component.partm_version, "free_list_inc", + "Number of elements to add when growing request free lists", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &ompi_part_persist_aggregated.free_list_inc); + + // variable for minimal internal partition size + ompi_part_persist_aggregated.min_message_size = 4096; + (void) mca_base_component_var_register(&mca_part_persist_aggregated_component.partm_version, "min_message_size", + "Minimal size of transferred messages (internal partitions)", + MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &ompi_part_persist_aggregated.min_message_size); + + // variable for maximal internal partition count + ompi_part_persist_aggregated.max_message_count = 4096; + (void) mca_base_component_var_register(&mca_part_persist_aggregated_component.partm_version, "max_message_count", + "Maximal number of transferred messages (internal partitions)", + 
MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &ompi_part_persist_aggregated.max_message_count); + + + return OPAL_SUCCESS; +} + +static void mca_part_persist_aggregated_init_lists(void) +{ + opal_free_list_init (&mca_part_base_precv_requests, + sizeof(mca_part_persist_aggregated_precv_request_t), + opal_cache_line_size, + OBJ_CLASS(mca_part_persist_aggregated_precv_request_t), + 0,opal_cache_line_size, + ompi_part_persist_aggregated.free_list_num, + ompi_part_persist_aggregated.free_list_max, + ompi_part_persist_aggregated.free_list_inc, + NULL, 0, NULL, NULL, NULL); + opal_free_list_init (&mca_part_base_psend_requests, + sizeof(mca_part_persist_aggregated_psend_request_t), + opal_cache_line_size, + OBJ_CLASS(mca_part_persist_aggregated_psend_request_t), + 0,opal_cache_line_size, + ompi_part_persist_aggregated.free_list_num, + ompi_part_persist_aggregated.free_list_max, + ompi_part_persist_aggregated.free_list_inc, + NULL, 0, NULL, NULL, NULL); + ompi_part_persist_aggregated.progress_list = OBJ_NEW(opal_list_t); +} + +static int +mca_part_persist_aggregated_component_open(void) +{ + OBJ_CONSTRUCT(&ompi_part_persist_aggregated.lock, opal_mutex_t); + + ompi_part_persist_aggregated.next_send_tag = 0; /**< This is a counter for send tags for the actual data transfer. 
*/ + ompi_part_persist_aggregated.next_recv_tag = 0; + + mca_part_persist_aggregated_init_lists(); + + ompi_part_persist_aggregated.init_comms = 0; + ompi_part_persist_aggregated.init_world = -1; + + ompi_part_persist_aggregated.part_comm_ready = 0; + ompi_part_persist_aggregated.part_comm_ready = 0; + + ompi_part_persist_aggregated.block_entry = 0; + return OMPI_SUCCESS; +} + + +static int +mca_part_persist_aggregated_component_close(void) +{ + OBJ_DESTRUCT(&ompi_part_persist_aggregated.lock); + return OMPI_SUCCESS; +} + + +static mca_part_base_module_t* +mca_part_persist_aggregated_component_init(int* priority, + bool enable_progress_threads, + bool enable_mpi_threads) +{ + *priority = 1; + + opal_output_verbose( 10, 0, + "in persist part priority is %d\n", *priority); + + return &ompi_part_persist_aggregated.super; +} + + +static int +mca_part_persist_aggregated_component_fini(void) +{ + return OMPI_SUCCESS; +} + diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_component.h b/ompi/mca/part/persist_aggregated/part_persist_aggregated_component.h new file mode 100644 index 00000000000..1688b08bfff --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_component.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + */ + +#ifndef MCA_PART_RMA_COMPONENT_H +#define MCA_PART_RMA_COMPONENT_H + +BEGIN_C_DECLS + +/* + * PART module functions. + */ +OMPI_DECLSPEC extern mca_part_base_component_4_0_0_t mca_part_persist_aggregated_component; + +END_C_DECLS + +#endif diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.c b/ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.c new file mode 100644 index 00000000000..641a2afd4bd --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.c @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2007 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h" + + +static void +mca_part_persist_aggregated_precv_request_construct(mca_part_persist_aggregated_precv_request_t* recvreq) +{ + recvreq->req_base.req_ompi.req_start = mca_part_persist_aggregated_start; + recvreq->req_base.req_ompi.req_free = mca_part_persist_aggregated_free; + recvreq->req_base.req_ompi.req_cancel = NULL; + recvreq->req_base.req_ompi.req_persistent = true; + OBJ_CONSTRUCT( &(recvreq->req_base.req_convertor), opal_convertor_t ); +} + + +OBJ_CLASS_INSTANCE(mca_part_persist_aggregated_precv_request_t, + mca_part_persist_aggregated_request_t, + mca_part_persist_aggregated_precv_request_construct, + NULL); + diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h b/ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h new file mode 100644 index 00000000000..c252cd629fc --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_recvreq.h @@ -0,0 +1,105 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2013 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2012-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2017 Intel, Inc. 
All rights reserved + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PART_PERSIST_AGGREGATED_RECVREQ_H +#define PART_PERSIST_AGGREGATED_RECVREQ_H + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h" +#include "ompi/mca/part/base/part_base_precvreq.h" + +struct mca_part_persist_aggregated_precv_request_t { + mca_part_persist_aggregated_request_t req_base; +}; +typedef struct mca_part_persist_aggregated_precv_request_t mca_part_persist_aggregated_precv_request_t; +OBJ_CLASS_DECLARATION(mca_part_persist_aggregated_precv_request_t); + +/** + * Allocate a recv request from the modules free list. + * + * @param rc (OUT) OMPI_SUCCESS or error status on failure. + * @return Receive request. + */ +#define MCA_PART_PERSIST_AGGREGATED_PRECV_REQUEST_ALLOC(precvreq) \ +do { \ + precvreq = (mca_part_persist_aggregated_precv_request_t*) \ + opal_free_list_get (&mca_part_base_precv_requests); \ + precvreq->req_base.req_type = MCA_PART_PERSIST_AGGREGATED_REQUEST_PRECV; \ + } while (0) + +/** + * Initialize a receive request with call parameters. + * + * @param request (IN) Receive request. + * @param addr (IN) User buffer. + * @param count (IN) Number of elements of indicated datatype. + * @param datatype (IN) User defined datatype. + * @param src (IN) Source rank w/in the communicator. + * @param comm (IN) Communicator. + * @param persistent (IN) Is this a ersistent request. 
+ */ +#define MCA_PART_PERSIST_AGGREGATED_PRECV_REQUEST_INIT( request, \ + ompi_proc, \ + comm, \ + tag, \ + src, \ + datatype, \ + addr, \ + parts, \ + count, \ + flags ) \ +do { \ + OBJ_RETAIN(comm); \ + OMPI_DATATYPE_RETAIN(datatype); \ + (request)->req_base.req_comm = comm; \ + (request)->req_base.req_datatype = datatype; \ + (request)->req_base.req_ompi.req_mpi_object.comm = comm; \ + (request)->req_base.req_ompi.req_status.MPI_SOURCE = src; \ + (request)->req_base.req_ompi.req_status.MPI_TAG = tag; \ + (request)->req_base.req_part_complete = true; \ + (request)->req_base.req_ompi.req_status._ucount = count; \ + (request)->req_base.req_free_called = false; \ + (request)->req_base.req_addr = addr; /**< pointer to application buffer */\ + (request)->req_base.req_parts = parts; /**< number of partitions */\ + (request)->req_base.req_count = count; /**< count of user datatype elements */\ + (request)->req_base.req_peer = src; /**< peer process - rank w/in this communicator */\ + (request)->req_base.req_tag = tag; \ +} while(0) + +/** + * Free the PART receive request + */ +#define MCA_PART_PERSIST_AGGREGATED_PRECV_REQUEST_RETURN(recvreq) \ +{ \ + OBJ_RELEASE((recvreq)->req_comm); \ + OMPI_DATATYPE_RELEASE((recvreq)->req_datatype); \ + OMPI_REQUEST_FINI(&(recvreq)->req_ompi); \ + opal_convertor_cleanup( &((recvreq)->req_convertor) ); \ + opal_free_list_return ( &mca_part_base_precv_requests, \ + (opal_free_list_item_t*)(recvreq)); \ +} + +#endif + + diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.c b/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.c new file mode 100644 index 00000000000..3e5b08cb01b --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.c @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. 
+ * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h" + + +static void mca_part_persist_aggregated_request_construct( mca_part_persist_aggregated_request_t* req) { + OBJ_CONSTRUCT(&req->req_convertor, opal_convertor_t); + req->req_ompi.req_type = OMPI_REQUEST_PART; +} + +static void mca_part_persist_aggregated_request_destruct( mca_part_persist_aggregated_request_t* req) { + OBJ_DESTRUCT(&req->req_convertor); +} + +OBJ_CLASS_INSTANCE(mca_part_persist_aggregated_request_t, + ompi_request_t, + mca_part_persist_aggregated_request_construct, + mca_part_persist_aggregated_request_destruct); diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h b/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h new file mode 100644 index 00000000000..7a59c6ef4f7 --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2016 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. 
#ifndef PART_PERSIST_AGGREGATED_REQUEST_H
#define PART_PERSIST_AGGREGATED_REQUEST_H

#include "ompi/mca/part/base/part_base_psendreq.h"
#include "ompi/mca/part/part.h"
#include "opal/sys/atomic.h"
/**
 * Type of request.
 */
typedef enum {
    MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND,
    MCA_PART_PERSIST_AGGREGATED_REQUEST_PRECV,
    MCA_PART_PERSIST_AGGREGATED_REQUEST_NULL
} mca_part_persist_aggregated_request_type_t;

/* Forward declaration; only a pointer to the list element is stored below. */
struct mca_part_persist_aggregated_list_t;

/**
 * Payload of a setup message (stored per request in setup_info[]).
 *
 * NOTE(review): field meanings below are inferred from the identically
 * themed request fields (real_parts, real_count, real_remainder,
 * real_dt_size, world_peer) -- confirm against the code that fills them in.
 */
struct ompi_mca_persist_setup_t {
   int world_rank;      /* presumably the sender's rank in MPI_COMM_WORLD */
   int start_tag;
   int setup_tag;
   size_t num_parts;    /* presumably the internal number of partitions */
   size_t dt_size;      /* presumably the size of the sender's datatype */
   size_t count;
   size_t remainder;    /* presumably the size of the last internal partition */
};


/**
 *  Base type for PART PERSIST requests
 */
struct mca_part_persist_aggregated_request_t {

/* START: These fields have to match the definition of the mca_part_base_request_t */
    ompi_request_t req_ompi;              /**< base request */
    volatile int32_t req_part_complete;   /**< flag indicating if the pt-2-pt layer is done with this request */
    volatile int32_t req_free_called;     /**< flag indicating if the user has freed this request */
    mca_part_persist_aggregated_request_type_t req_type;       /**< MPI request type - used for test */
    struct ompi_communicator_t *req_comm; /**< communicator pointer */
    struct ompi_datatype_t *req_datatype; /**< pointer to data type */
    opal_convertor_t req_convertor;       /**< always need the convertor */

    const void *req_addr;                 /**< pointer to application buffer */
    size_t req_parts;                     /**< number of partitions */
    size_t req_count;                     /**< count of user datatype elements */
    int32_t req_peer;                     /**< peer process - rank w/in this communicator */
    int32_t req_tag;                      /**< user defined tag */
    struct ompi_proc_t* req_proc;         /**< peer process */

/* END: These fields have to match the definition of the mca_part_base_request_t */

    size_t  req_bytes;                    /**< bytes for completion status */

    size_t  real_parts;                   /**< internal number of partitions */
    size_t  real_count;
    size_t  real_remainder;               /**< size of last internal partition (in elements) */
    size_t  real_dt_size;                 /**< receiver needs to know how large the sender's datatype is. */
    size_t  part_size;

    ompi_request_t** persist_reqs;            /**< requests for persistent sends/recvs */
    ompi_request_t* setup_req [2];                /**< Request structure for setup messages */


    int32_t req_partitions_send;          /**< Send side number of partitions */
    int32_t req_partitions_recv;          /**< Recv side number of partitions */

    /* NOTE(review): the two descriptions below call these per-request fields
     * "counters"; the module-wide counters are next_send_tag/next_recv_tag.
     * Presumably these hold the tags assigned to this request -- confirm. */
    int32_t my_send_tag;                  /**< This is a counter for send tags for the actual data transfer. */
    int32_t my_recv_tag;                  /**< This is a counter for receive tags, for incoming setup messages. */

    int32_t world_peer;                   /**< peer's rank in MPI_COMM_WORLD */

    int32_t initialized;                  /**< flag for initialized state */
    int32_t first_send;                   /**< flag for whether the first send has happened */
    int32_t flag_post_setup_recv;
    size_t  done_count;                   /**< counter for the number of partitions marked ready */

    int32_t *flags;               /**< array of flags to determine whether a partition has arrived */

    struct ompi_mca_persist_setup_t setup_info[2]; /**< Setup info to send during initialization. */

    struct mca_part_persist_aggregated_list_t* progress_elem; /**< pointer to progress list element for removal during free. */

};
typedef struct mca_part_persist_aggregated_request_t mca_part_persist_aggregated_request_t;
OBJ_CLASS_DECLARATION(mca_part_persist_aggregated_request_t);

#endif
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated.h" +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h" + + +static void mca_part_persist_aggregated_psend_request_construct(mca_part_persist_aggregated_psend_request_t* sendreq) +{ + /* no need to reinit for every send -- never changes */ + sendreq->req_base.req_ompi.req_start = mca_part_persist_aggregated_start; + sendreq->req_base.req_ompi.req_free = mca_part_persist_aggregated_free; + sendreq->req_base.req_ompi.req_persistent = true; + sendreq->req_base.req_ompi.req_cancel = NULL; +} + +OBJ_CLASS_INSTANCE(mca_part_persist_aggregated_psend_request_t, + mca_part_persist_aggregated_request_t, + mca_part_persist_aggregated_psend_request_construct, + NULL); + diff --git a/ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h b/ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h new file mode 100644 index 00000000000..40f72a25d67 --- /dev/null +++ b/ompi/mca/part/persist_aggregated/part_persist_aggregated_sendreq.h @@ -0,0 +1,101 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2013 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2015-2017 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2015 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2017 Intel, Inc. 
All rights reserved + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PART_PERSIST_AGGREGATED_PSENDREQ_H +#define PART_PERSIST_AGGREGATED_PSENDREQ_H + +#include "ompi/mca/part/persist_aggregated/part_persist_aggregated_request.h" +#include "ompi/mca/part/base/part_base_psendreq.h" +#include "ompi/mca/part/part.h" +#include "opal/prefetch.h" + +#include "ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.h" + + +struct mca_part_persist_aggregated_psend_request_t { + mca_part_persist_aggregated_request_t req_base; + + struct part_persist_aggregation_state aggregation_state; +}; +typedef struct mca_part_persist_aggregated_psend_request_t mca_part_persist_aggregated_psend_request_t; +OBJ_CLASS_DECLARATION(mca_part_persist_aggregated_psend_request_t); + + +#define MCA_PART_PERSIST_AGGREGATED_PSEND_REQUEST_ALLOC(sendreq, comm, dst, \ + ompi_proc) \ +do { \ + sendreq = (mca_part_persist_aggregated_psend_request_t*) \ + opal_free_list_wait (&mca_part_base_psend_requests); \ + sendreq->req_base.req_type = MCA_PART_PERSIST_AGGREGATED_REQUEST_PSEND; \ +} while(0) + +#define MCA_PART_PERSIST_AGGREGATED_PSEND_REQUEST_INIT( req_send, \ + ompi_proc, \ + comm, \ + tag, \ + dst, \ + datatype, \ + buf, \ + parts, \ + count, \ + flags ) \ + do { \ + OMPI_REQUEST_INIT(&(sendreq->req_base.req_ompi), \ + false); \ + OBJ_RETAIN(comm); \ + OMPI_DATATYPE_RETAIN(datatype); \ + (req_send)->req_base.req_comm = comm; \ + (req_send)->req_base.req_datatype = datatype; \ + (req_send)->req_base.req_ompi.req_mpi_object.comm = comm; \ + (req_send)->req_base.req_ompi.req_status.MPI_SOURCE = \ + comm->c_my_rank; \ + (req_send)->req_base.req_ompi.req_status.MPI_TAG = tag; \ + (req_send)->req_base.req_part_complete = true; \ + 
(req_send)->req_base.req_ompi.req_status._ucount = count; \ + (req_send)->req_base.req_free_called = false; \ + (req_send)->req_base.req_addr = buf; /**< pointer to application buffer */\ + (req_send)->req_base.req_parts = parts; /**< number of partitions */\ + (req_send)->req_base.req_count = count; /**< count of user datatype elements */\ + (req_send)->req_base.req_peer = dst; /**< peer process - rank w/in this communicator */\ + (req_send)->req_base.req_tag = tag; /**< user defined tag */\ + } while(0) + +/* + * Release resources associated with a request + */ +#define MCA_PART_PERSIST_AGGREGATED_PSEND_REQUEST_RETURN(sendreq) \ + { \ + /* Let the base handle the reference counts */ \ + OMPI_DATATYPE_RELEASE(sendreq->req_datatype); \ + OBJ_RELEASE(sendreq->req_comm); \ + OMPI_REQUEST_FINI(&sendreq->req_ompi); \ + opal_convertor_cleanup( &(sendreq->req_convertor) ); \ + opal_free_list_return ( &mca_part_base_psend_requests, \ + (opal_free_list_item_t*)sendreq); \ + } + +#endif diff --git a/ompi/mca/part/persist_aggregated/post_configure.sh b/ompi/mca/part/persist_aggregated/post_configure.sh new file mode 100644 index 00000000000..d06ec572027 --- /dev/null +++ b/ompi/mca/part/persist_aggregated/post_configure.sh @@ -0,0 +1 @@ +DIRECT_CALL_HEADER="ompi/mca/part/rma/part_rma.h" diff --git a/ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.c b/ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.c new file mode 100644 index 00000000000..0a1f084e116 --- /dev/null +++ b/ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.c @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +#include "part_persist_aggregated_scheme_regular.h" + +#include +#include + +// converts the index of a public partition to the index of its corresponding internal partition +static int internal_partition(struct part_persist_aggregation_state *state, int public_part) +{ + return public_part / state->aggregation_count; +} + +void part_persist_aggregate_regular_init(struct part_persist_aggregation_state *state, + int internal_partition_count, int factor, + int last_internal_partition_size) +{ + state->public_partition_count = (internal_partition_count - 1) * factor + + last_internal_partition_size; + state->internal_partition_count = internal_partition_count; + + // number of user-partitions per internal partition (except for the last one) + state->aggregation_count = factor; + // number of user-partitions corresponding to the last internal partition + state->last_internal_partition_size = last_internal_partition_size; + + // initialize counters + state->public_parts_ready = (opal_atomic_int32_t *) calloc(state->internal_partition_count, + sizeof(opal_atomic_uint32_t)); +} + +void part_persist_aggregate_regular_reset(struct part_persist_aggregation_state *state) +{ + // reset flags + if (NULL != state->public_parts_ready) { + memset((void *) state->public_parts_ready, 0, + state->internal_partition_count * sizeof(opal_atomic_uint32_t)); + } +} + +static inline int is_last_partition(struct part_persist_aggregation_state *state, int partition) +{ + return (partition == state->internal_partition_count - 1); +} + +static inline int num_public_parts(struct part_persist_aggregation_state *state, int partition) +{ + return is_last_partition(state, partition) ? 
state->last_internal_partition_size + : state->aggregation_count; +} + +void part_persist_aggregate_regular_pready(struct part_persist_aggregation_state *state, + int partition, int *available_partition) +{ + int internal_part = internal_partition(state, partition); + + // this is the new value (after adding) + int count = opal_atomic_add_fetch_32(&state->public_parts_ready[internal_part], 1); + + // push to buffer if internal partition is ready + if (count == num_public_parts(state, internal_part)) { + *available_partition = internal_part; + } else { + *available_partition = -1; + } +} + +void part_persist_aggregate_regular_free(struct part_persist_aggregation_state *state) +{ + if (state->public_parts_ready != NULL) + free((void *) state->public_parts_ready); + state->public_parts_ready = NULL; +} \ No newline at end of file diff --git a/ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.h b/ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.h new file mode 100644 index 00000000000..d3e8279a1ed --- /dev/null +++ b/ompi/mca/part/persist_aggregated/schemes/part_persist_aggregated_scheme_regular.h @@ -0,0 +1,95 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2024 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +/** + * @file + * This file defines a simple message aggregation scheme: + * A user-provided partitioning into n partitions is mapped + * to an internal partitioning of ceil(n/k) partitions where + * each internal partition corresponds to k public ones + * (with the last partition potentially having a lower size). + * The factor k can be defined to optimize the internal + * number/size of internal partitions. 
+ */ + +#ifndef PART_PERSIST_AGGREGATED_SCHEME_REGULAR_H +#define PART_PERSIST_AGGREGATED_SCHEME_REGULAR_H + +#include "ompi_config.h" + +#include "opal/include/opal/sys/atomic.h" + +/** + * @brief tracks the number of pready calls corresponding to internal partitions + * + */ +struct part_persist_aggregation_state { + // counters for each internal partition + opal_atomic_int32_t *public_parts_ready; + + // number of public partitions + int public_partition_count; + + // number of internal partitions + int internal_partition_count; + + // how many public partitions are aggregated into an internal one + int aggregation_count; + // number of public partitions corresponding to last internal partition + int last_internal_partition_size; +}; + +/** + * @brief initializes the aggregation state + * + * @param[out] state pointer to aggregation state object + * @param[in] internal_partition_count number of internal partitions (i.e. number of messages + * per partitioned transfer) + * @param[in] factor number of public partitions corresponding to each + * internal one other than the last + * @param[in] last_internal_partition_size number of public partitions corresponding to the last + * internal partition internal partition + */ +OMPI_DECLSPEC void part_persist_aggregate_regular_init(struct part_persist_aggregation_state *state, + int internal_partition_count, int factor, + int last_internal_partition_size); + +/** + * @brief resets the aggregation state + * + * @param[out] state pointer to aggregation state object + */ +OMPI_DECLSPEC void +part_persist_aggregate_regular_reset(struct part_persist_aggregation_state *state); + +/** + * @brief marks a public partition as ready and optionally outputs an internal partition that can be + * sent. 
+ * + * @param[in,out] state pointer to aggregation state object + * @param[in] partition index of the public partition to mark ready + * @param[out] available_partition index of the corresponding internal partition if it is ready, + * otherwise -1 + */ +OMPI_DECLSPEC void +part_persist_aggregate_regular_pready(struct part_persist_aggregation_state *state, int partition, + int *available_partition); + +/** + * @brief destroys the aggregation scheme + * + * @param[in,out] state pointer to aggregation state object + */ +OMPI_DECLSPEC void +part_persist_aggregate_regular_free(struct part_persist_aggregation_state *state); + +#endif \ No newline at end of file