Skip to content

Commit

Permalink
fix try_lock_mode and add modes: none, global, global_b
Browse files Browse the repository at this point in the history
  • Loading branch information
JiakunYan committed Mar 22, 2024
1 parent ca0f3bc commit 6dcb6a8
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 27 deletions.
3 changes: 3 additions & 0 deletions lci/api/lci.h
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,9 @@ typedef enum {
LCI_BACKEND_TRY_LOCK_SEND = 1,
LCI_BACKEND_TRY_LOCK_RECV = 1 << 1,
LCI_BACKEND_TRY_LOCK_POLL = 1 << 2,
LCI_BACKEND_TRY_LOCK_GLOBAL = 1 << 3,
LCI_BACKEND_LOCK_GLOBAL = 1 << 4,
LCI_BACKEND_TRY_LOCK_MODE_MAX = 1 << 5,
} LCI_backend_try_lock_mode_t;
extern uint64_t LCI_BACKEND_TRY_LOCK_MODE;

Expand Down
1 change: 1 addition & 0 deletions lci/backend/ibv/server_ibv.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_ibv_qp_extra_t {
};

typedef struct LCISI_endpoint_t {
struct LCISI_endpoint_super_t super;
struct LCISI_server_t* server;
// Connections O(N)
struct ibv_td* td;
Expand Down
2 changes: 0 additions & 2 deletions lci/backend/ofi/server_ofi.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ void LCISD_endpoint_init(LCIS_server_t server_pp, LCIS_endpoint_t* endpoint_pp,
endpoint_p->server->endpoints[endpoint_p->server->endpoint_count++] =
endpoint_p;
endpoint_p->is_single_threaded = single_threaded;
LCIU_spinlock_init(&endpoint_p->lock);
if (!LCI_OFI_CXI_TRY_NO_HACK &&
strcmp(endpoint_p->server->info->fabric_attr->prov_name, "cxi") == 0 &&
endpoint_p->server->info->domain_attr->mr_mode & FI_MR_ENDPOINT &&
Expand Down Expand Up @@ -230,5 +229,4 @@ void LCISD_endpoint_fina(LCIS_endpoint_t endpoint_pp)
FI_SAFECALL(fi_close((struct fid*)&endpoint_p->ep->fid));
FI_SAFECALL(fi_close((struct fid*)&endpoint_p->cq->fid));
FI_SAFECALL(fi_close((struct fid*)&endpoint_p->av->fid));
LCIU_spinlock_fina(&endpoint_p->lock);
}
6 changes: 3 additions & 3 deletions lci/backend/ofi/server_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@

#define LCISI_OFI_CS_TRY_ENTER(endpoint_p, mode, ret) \
if (LCI_BACKEND_TRY_LOCK_MODE & mode && !endpoint_p->is_single_threaded && \
!LCIU_try_acquire_spinlock(&endpoint_p->lock)) \
!LCIU_try_acquire_spinlock(&endpoint_p->super.lock)) \
return ret;

#define LCISI_OFI_CS_EXIT(endpoint_p, mode) \
if (LCI_BACKEND_TRY_LOCK_MODE & mode && !endpoint_p->is_single_threaded) \
LCIU_release_spinlock(&endpoint_p->lock);
LCIU_release_spinlock(&endpoint_p->super.lock);

struct LCISI_endpoint_t;

Expand All @@ -46,13 +46,13 @@ typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_server_t {
} LCISI_server_t;

typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_endpoint_t {
struct LCISI_endpoint_super_t super;
LCISI_server_t* server;
struct fid_ep* ep;
struct fid_cq* cq;
struct fid_av* av;
fi_addr_t* peer_addrs;
bool is_single_threaded;
LCIU_spinlock_t lock;
} LCISI_endpoint_t;

extern int g_next_rdma_key;
Expand Down
61 changes: 51 additions & 10 deletions lci/backend/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ typedef struct LCID_server_opaque_t* LCIS_server_t;

struct LCID_endpoint_opaque_t;
typedef struct LCID_endpoint_opaque_t* LCIS_endpoint_t;
struct LCISI_endpoint_super_t {
LCIU_spinlock_t lock;
};

typedef uint64_t LCIS_offset_t;

Expand Down Expand Up @@ -98,6 +101,20 @@ static inline LCI_error_t LCISD_post_recv(LCIS_endpoint_t endpoint_pp,
#include "backend/ucx/lcisi_ucx_detail.h"
#endif

#define LCIS_endpoint_super(endpoint) (((LCISI_endpoint_t*)endpoint)->super)

#define LCISI_CS_ENTER(endpoint, ret) \
if (LCI_BACKEND_TRY_LOCK_MODE == LCI_BACKEND_TRY_LOCK_GLOBAL && \
!LCIU_try_acquire_spinlock(&LCIS_endpoint_super(endpoint).lock)) \
return ret; \
else if (LCI_BACKEND_TRY_LOCK_MODE == LCI_BACKEND_LOCK_GLOBAL) \
LCIU_acquire_spinlock(&LCIS_endpoint_super(endpoint).lock);

#define LCISI_CS_EXIT(endpoint) \
if (LCI_BACKEND_TRY_LOCK_MODE == LCI_BACKEND_TRY_LOCK_GLOBAL || \
LCI_BACKEND_TRY_LOCK_MODE == LCI_BACKEND_LOCK_GLOBAL) \
LCIU_release_spinlock(&LCIS_endpoint_super(endpoint).lock);

/* Wrapper functions */
static inline void LCIS_server_init(LCI_device_t device, LCIS_server_t* s)
{
Expand Down Expand Up @@ -136,24 +153,30 @@ static inline void LCIS_endpoint_init(LCIS_server_t server_pp,
LCIS_endpoint_t* endpoint_pp,
bool single_threaded)
{
return LCISD_endpoint_init(server_pp, endpoint_pp, single_threaded);
LCISD_endpoint_init(server_pp, endpoint_pp, single_threaded);
LCIU_spinlock_init(&LCIS_endpoint_super(*endpoint_pp).lock);
}

static inline void LCIS_endpoint_fina(LCIS_endpoint_t endpoint_pp)
{
return LCISD_endpoint_fina(endpoint_pp);
LCIU_spinlock_fina(&LCIS_endpoint_super(endpoint_pp).lock);
LCISD_endpoint_fina(endpoint_pp);
}

static inline int LCIS_poll_cq(LCIS_endpoint_t endpoint_pp,
LCIS_cq_entry_t* entry)
{
return LCISD_poll_cq(endpoint_pp, entry);
LCISI_CS_ENTER(endpoint_pp, 0);
int ret = LCISD_poll_cq(endpoint_pp, entry);
LCISI_CS_EXIT(endpoint_pp);
return ret;
}

static inline LCI_error_t LCIS_post_sends(LCIS_endpoint_t endpoint_pp, int rank,
void* buf, size_t size,
LCIS_meta_t meta)
{
LCISI_CS_ENTER(endpoint_pp, LCI_ERR_RETRY);
LCI_DBG_Log(LCI_LOG_TRACE, "server",
"LCIS_post_sends: rank %d buf %p size %lu meta %d\n", rank, buf,
size, meta);
Expand All @@ -166,16 +189,18 @@ static inline LCI_error_t LCIS_post_sends(LCIS_endpoint_t endpoint_pp, int rank,
} else if (ret == LCI_ERR_RETRY_LOCK) {
LCII_PCOUNTER_ADD(net_send_failed_lock, 1);
ret = LCI_ERR_RETRY;
} else {
} else if (ret == LCI_ERR_RETRY_NOMEM) {
LCII_PCOUNTER_ADD(net_send_failed_nomem, 1);
ret = LCI_ERR_RETRY;
}
LCISI_CS_EXIT(endpoint_pp);
return ret;
}
static inline LCI_error_t LCIS_post_send(LCIS_endpoint_t endpoint_pp, int rank,
void* buf, size_t size, LCIS_mr_t mr,
LCIS_meta_t meta, void* ctx)
{
LCISI_CS_ENTER(endpoint_pp, LCI_ERR_RETRY);
LCI_DBG_Log(LCI_LOG_TRACE, "server",
"LCIS_post_send: rank %d buf %p size %lu mr %p meta %d ctx %p\n",
rank, buf, size, mr.mr_p, meta, ctx);
Expand All @@ -189,16 +214,18 @@ static inline LCI_error_t LCIS_post_send(LCIS_endpoint_t endpoint_pp, int rank,
} else if (ret == LCI_ERR_RETRY_LOCK) {
LCII_PCOUNTER_ADD(net_send_failed_lock, 1);
ret = LCI_ERR_RETRY;
} else {
} else if (ret == LCI_ERR_RETRY_NOMEM) {
LCII_PCOUNTER_ADD(net_send_failed_nomem, 1);
ret = LCI_ERR_RETRY;
}
LCISI_CS_EXIT(endpoint_pp);
return ret;
}
static inline LCI_error_t LCIS_post_puts(LCIS_endpoint_t endpoint_pp, int rank,
void* buf, size_t size, uintptr_t base,
LCIS_offset_t offset, LCIS_rkey_t rkey)
{
LCISI_CS_ENTER(endpoint_pp, LCI_ERR_RETRY);
LCI_DBG_Log(LCI_LOG_TRACE, "server",
"LCIS_post_puts: rank %d buf %p size %lu base %p offset %lu "
"rkey %lu\n",
Expand All @@ -213,17 +240,19 @@ static inline LCI_error_t LCIS_post_puts(LCIS_endpoint_t endpoint_pp, int rank,
} else if (ret == LCI_ERR_RETRY_LOCK) {
LCII_PCOUNTER_ADD(net_send_failed_lock, 1);
ret = LCI_ERR_RETRY;
} else {
} else if (ret == LCI_ERR_RETRY_NOMEM) {
LCII_PCOUNTER_ADD(net_send_failed_nomem, 1);
ret = LCI_ERR_RETRY;
}
LCISI_CS_EXIT(endpoint_pp);
return ret;
}
static inline LCI_error_t LCIS_post_put(LCIS_endpoint_t endpoint_pp, int rank,
void* buf, size_t size, LCIS_mr_t mr,
uintptr_t base, LCIS_offset_t offset,
LCIS_rkey_t rkey, void* ctx)
{
LCISI_CS_ENTER(endpoint_pp, LCI_ERR_RETRY);
LCI_DBG_Log(LCI_LOG_TRACE, "server",
"LCIS_post_put: rank %d buf %p size %lu mr %p base %p "
"offset %lu rkey %lu ctx %p\n",
Expand All @@ -238,10 +267,11 @@ static inline LCI_error_t LCIS_post_put(LCIS_endpoint_t endpoint_pp, int rank,
} else if (ret == LCI_ERR_RETRY_LOCK) {
LCII_PCOUNTER_ADD(net_send_failed_lock, 1);
ret = LCI_ERR_RETRY;
} else {
} else if (ret == LCI_ERR_RETRY_NOMEM) {
LCII_PCOUNTER_ADD(net_send_failed_nomem, 1);
ret = LCI_ERR_RETRY;
}
LCISI_CS_EXIT(endpoint_pp);
return ret;
}
static inline LCI_error_t LCIS_post_putImms(LCIS_endpoint_t endpoint_pp,
Expand All @@ -250,6 +280,7 @@ static inline LCI_error_t LCIS_post_putImms(LCIS_endpoint_t endpoint_pp,
LCIS_offset_t offset,
LCIS_rkey_t rkey, uint32_t meta)
{
LCISI_CS_ENTER(endpoint_pp, LCI_ERR_RETRY);
LCI_DBG_Log(LCI_LOG_TRACE, "server",
"LCIS_post_putImms: rank %d buf %p size %lu base %p offset %lu "
"rkey %lu meta %d\n",
Expand All @@ -264,10 +295,11 @@ static inline LCI_error_t LCIS_post_putImms(LCIS_endpoint_t endpoint_pp,
} else if (ret == LCI_ERR_RETRY_LOCK) {
LCII_PCOUNTER_ADD(net_send_failed_lock, 1);
ret = LCI_ERR_RETRY;
} else {
} else if (ret == LCI_ERR_RETRY_NOMEM) {
LCII_PCOUNTER_ADD(net_send_failed_nomem, 1);
ret = LCI_ERR_RETRY;
}
LCISI_CS_EXIT(endpoint_pp);
return ret;
}
static inline LCI_error_t LCIS_post_putImm(LCIS_endpoint_t endpoint_pp,
Expand All @@ -277,6 +309,7 @@ static inline LCI_error_t LCIS_post_putImm(LCIS_endpoint_t endpoint_pp,
LCIS_rkey_t rkey, LCIS_meta_t meta,
void* ctx)
{
LCISI_CS_ENTER(endpoint_pp, LCI_ERR_RETRY);
LCI_DBG_Log(LCI_LOG_TRACE, "server",
"LCIS_post_putImm: rank %d buf %p size %lu mr %p base %p "
"offset %lu rkey %lu meta %u ctx %p\n",
Expand All @@ -291,20 +324,28 @@ static inline LCI_error_t LCIS_post_putImm(LCIS_endpoint_t endpoint_pp,
} else if (ret == LCI_ERR_RETRY_LOCK) {
LCII_PCOUNTER_ADD(net_send_failed_lock, 1);
ret = LCI_ERR_RETRY;
} else {
} else if (ret == LCI_ERR_RETRY_NOMEM) {
LCII_PCOUNTER_ADD(net_send_failed_nomem, 1);
ret = LCI_ERR_RETRY;
}
LCISI_CS_EXIT(endpoint_pp);
return ret;
}
static inline LCI_error_t LCIS_post_recv(LCIS_endpoint_t endpoint_pp, void* buf,
uint32_t size, LCIS_mr_t mr, void* ctx)
{
LCISI_CS_ENTER(endpoint_pp, LCI_ERR_RETRY);
LCI_DBG_Log(LCI_LOG_TRACE, "server",
"LCIS_post_recv: buf %p size %u mr %p user_context %p\n", buf,
size, mr.mr_p, ctx);
LCI_error_t ret = LCISD_post_recv(endpoint_pp, buf, size, mr, ctx);
if (ret == LCI_OK) LCII_PCOUNTER_ADD(net_recv_posted, 1);
if (ret == LCI_OK) {
LCII_PCOUNTER_ADD(net_recv_posted, 1);
} else if (ret == LCI_ERR_RETRY_LOCK) {
LCII_PCOUNTER_ADD(net_recv_failed_lock, 1);
ret = LCI_ERR_RETRY;
}
LCISI_CS_EXIT(endpoint_pp);
return ret;
}

Expand Down
1 change: 1 addition & 0 deletions lci/backend/ucx/server_ucx.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_server_t {
} LCISI_server_t;

typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_endpoint_t {
struct LCISI_endpoint_super_t super;
LCISI_server_t* server;
ucp_worker_h worker;
ucp_address_t* if_address;
Expand Down
1 change: 1 addition & 0 deletions lci/profile/performance_counter.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extern LCT_pcounter_ctx_t LCII_pcounter_ctx;
_macro(net_sends_posted) \
_macro(net_send_posted) \
_macro(net_recv_posted) \
_macro(net_recv_failed_lock) \
_macro(net_send_comp) \
_macro(net_recv_comp) \
_macro(net_send_failed_lock) \
Expand Down
14 changes: 10 additions & 4 deletions lci/runtime/env.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,26 @@ void LCII_env_init(int num_proc, int rank)
LCI_OFI_CXI_TRY_NO_HACK = LCIU_getenv_or("LCI_OFI_CXI_TRY_NO_HACK", false);
{
// default value
LCI_BACKEND_TRY_LOCK_MODE = LCI_BACKEND_TRY_LOCK_SEND |
LCI_BACKEND_TRY_LOCK_RECV |
LCI_BACKEND_TRY_LOCK_POLL;
LCI_BACKEND_TRY_LOCK_MODE = 0;
// if users explicitly set the value
char* p = getenv("LCI_BACKEND_TRY_LOCK_MODE");
if (p) {
LCT_dict_str_int_t dict[] = {
{"none", 0},
{"send", LCI_BACKEND_TRY_LOCK_SEND},
{"recv", LCI_BACKEND_TRY_LOCK_RECV},
{"poll", LCI_BACKEND_TRY_LOCK_POLL},
{"global", LCI_BACKEND_TRY_LOCK_GLOBAL},
{"global_b", LCI_BACKEND_LOCK_GLOBAL},
};
LCI_BACKEND_TRY_LOCK_MODE =
LCT_parse_arg(dict, sizeof(dict) / sizeof(dict[0]), p, ";");
LCT_parse_arg(dict, sizeof(dict) / sizeof(dict[0]), p, ",");
}
LCI_Assert(LCI_BACKEND_TRY_LOCK_MODE < LCI_BACKEND_TRY_LOCK_MODE_MAX,
"Unexpected LCI_BACKEND_TRY_LOCK_MODE %d",
LCI_BACKEND_TRY_LOCK_MODE);
LCI_Log(LCI_LOG_INFO, "env", "set LCI_BACKEND_TRY_LOCK_MODE to be %d\n",
LCI_BACKEND_TRY_LOCK_MODE);
}
LCI_UCX_USE_TRY_LOCK = LCIU_getenv_or("LCI_UCX_USE_TRY_LOCK", 0);
LCI_UCX_PROGRESS_FOCUSED = LCIU_getenv_or("LCI_UCX_PROGRESS_FOCUSED", 0);
Expand Down
4 changes: 2 additions & 2 deletions lct/pcounter/pcounter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,11 @@ struct ctx_t {
mode_str = p;
}
if (mode_str == "on-the-fly") {
dump_ofilename = "lct_pcounter.%.out";
dump_ofilename = std::string("lct_pcounter.") + LCT_hostname + ".%.out";
dump_record_on_the_fly = true;
record_interval = 1000000;
} else if (mode_str == "on-the-fly-lw") {
dump_ofilename = "lct_pcounter.%.out";
dump_ofilename = std::string("lct_pcounter.") + LCT_hostname + ".%.out";
dump_record_on_the_fly = true;
dump_record_on_the_fly_lw = true;
record_interval = 1000000;
Expand Down
4 changes: 2 additions & 2 deletions lct/pmi/pmi_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ void LCT_pmi_getname(int rank, char* key, char* value)
}
void LCT_pmi_barrier()
{
LCT_DBG_Log(LCT_log_ctx_default, LCT_LOG_TRACE, "pmi", "enter pmi barrier\n");
LCT_Log(LCT_log_ctx_default, LCT_LOG_DEBUG, "pmi", "enter pmi barrier\n");
lcti_pmi_ops.barrier();
LCT_DBG_Log(LCT_log_ctx_default, LCT_LOG_TRACE, "pmi", "leave pmi barrier\n");
LCT_Log(LCT_log_ctx_default, LCT_LOG_DEBUG, "pmi", "leave pmi barrier\n");
}
void LCT_pmi_finalize() { lcti_pmi_ops.finalize(); }
7 changes: 3 additions & 4 deletions lct/util/string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,13 @@ uint64_t LCT_parse_arg(LCT_dict_str_int_t dict[], int count, const char* key,
}
start_pos = end_pos + 1;
// process the word
uint64_t cur_val;
bool succeed = LCT_str_int_search(dict, count, word.c_str(), 0,
reinterpret_cast<int*>(&cur_val));
int cur_val;
bool succeed = LCT_str_int_search(dict, count, word.c_str(), 0, &cur_val);
if (!succeed)
LCT_Warn(LCT_log_ctx_default, "Unknown word %s in key %s\n", word.c_str(),
key);
else
ret |= cur_val;
ret |= (uint64_t)cur_val;
}
return ret;
}

0 comments on commit 6dcb6a8

Please sign in to comment.