diff --git a/src/cond.c b/src/cond.c
index d67818d78..f4bb2bc42 100644
--- a/src/cond.c
+++ b/src/cond.c
@@ -222,6 +222,14 @@ int ABT_cond_wait(ABT_cond cond, ABT_mutex mutex)
  * Unlike other implementations of condition variables, a spurious wakeup never
  * occurs.
  *
+ * @note
+ * This function exhibits significantly higher CPU utilization than
+ * ABT_cond_wait() when blocking, even with the BASIC_WAIT scheduler. This
+ * is because Argobots lacks a user-space timeout mechanism, requiring
+ * continuous rescheduling of the caller to poll for `abstime`. The
+ * `--enable-sched-sleep` option can mitigate this, but it introduces a
+ * latency penalty for some workloads.
+ *
  * @contexts
  * \DOC_CONTEXT_ANY \DOC_CONTEXT_CTXSWITCH
  *
diff --git a/src/eventual.c b/src/eventual.c
index 7cf9ed133..64b56d408 100644
--- a/src/eventual.c
+++ b/src/eventual.c
@@ -205,6 +205,120 @@ int ABT_eventual_wait(ABT_eventual eventual, void **value)
     return ABT_SUCCESS;
 }
 
+/* Convert a timespec into seconds expressed as a double. */
+static inline double convert_timespec_to_sec(const struct timespec *p_ts)
+{
+    double secs;
+    secs = ((double)p_ts->tv_sec) + 1.0e-9 * ((double)p_ts->tv_nsec);
+    return secs;
+}
+
+/**
+ * @ingroup EVENTUAL
+ * @brief Wait on an eventual with a timeout.
+ *
+ * \c ABT_eventual_timedwait() blocks the caller until the eventual \c eventual
+ * is signaled by \c ABT_eventual_set() or until the absolute timeout specified
+ * by \c abstime is reached. If the eventual is already ready when this function
+ * is called, the caller will not be blocked and the value will be returned
+ * immediately.
+ *
+ * If \c abstime is reached before \c eventual becomes ready, this routine
+ * returns \c ABT_ERR_COND_TIMEDOUT without modifying \c value.
+ *
+ * If \c value is not \c NULL, \c value is set to the memory buffer of
+ * \c eventual. If \c value is not \c NULL but the size of the memory buffer of
+ * \c eventual (i.e., \c nbytes passed to \c ABT_eventual_create()) is zero,
+ * \c value is set to \c NULL.
+ *
+ * The memory buffer pointed to by \c value is deallocated when \c eventual is
+ * freed by \c ABT_eventual_free(). The memory buffer is properly aligned for
+ * storage of any type of object that has the given size. If the data written
+ * by \c ABT_eventual_set() is smaller than the size of the memory buffer of
+ * \c eventual, the contents of the memory buffer that was not written by
+ * \c ABT_eventual_set() are undefined. The contents of the memory buffer get
+ * undefined if either \c ABT_eventual_set() or \c ABT_eventual_reset() is
+ * called for \c eventual. The memory buffer is read-only, so rewriting the
+ * contents of the obtained memory buffer causes undefined behavior.
+ *
+ * \DOC_DESC_ATOMICITY_EVENTUAL_READINESS
+ *
+ * @note
+ * Like \c ABT_cond_timedwait(), this routine is expected to use noticeably
+ * more CPU than its untimed counterpart, \c ABT_eventual_wait(), while it
+ * blocks: Argobots lacks a user-space timeout mechanism, so the caller has to
+ * be rescheduled continuously to poll for \c abstime.
+ *
+ * @changev20
+ * \DOC_DESC_V1X_NOTASK{\c ABT_ERR_EVENTUAL}
+ * @endchangev20
+ *
+ * @contexts
+ * \DOC_V1X \DOC_CONTEXT_INIT_NOTASK \DOC_CONTEXT_CTXSWITCH_CONDITIONAL{
+ * \c eventual is not ready}\n
+ * \DOC_V20 \DOC_CONTEXT_INIT \DOC_CONTEXT_CTXSWITCH_CONDITIONAL{\c eventual is
+ * not ready}
+ *
+ * @errors
+ * \DOC_ERROR_SUCCESS
+ * \DOC_ERROR_INV_EVENTUAL_HANDLE{\c eventual}
+ * \DOC_ERROR_COND_TIMEDOUT
+ * \DOC_V1X \DOC_ERROR_TASK{\c ABT_ERR_EVENTUAL}
+ *
+ * @undefined
+ * \DOC_UNDEFINED_UNINIT
+ * \DOC_UNDEFINED_NULL_PTR{\c abstime}
+ * \DOC_UNDEFINED_EVENTUAL_BUFFER{\c eventual, \c value}
+ *
+ * @param[in] eventual eventual handle
+ * @param[out] value memory buffer of the eventual
+ * @param[in] abstime absolute timeout
+ * @return Error code
+ */
+int ABT_eventual_timedwait(ABT_eventual eventual, void **value,
+                           const struct timespec *abstime)
+{
+    ABTI_UB_ASSERT(ABTI_initialized());
+    ABTI_UB_ASSERT(abstime);
+
+    ABTI_local *p_local = ABTI_local_get_local();
+    ABTI_eventual *p_eventual = ABTI_eventual_get_ptr(eventual);
+    ABTI_CHECK_NULL_EVENTUAL_PTR(p_eventual);
+
+#ifndef ABT_CONFIG_ENABLE_VER_20_API
+    /* This routine cannot be called by a tasklet. */
+    if (ABTI_IS_ERROR_CHECK_ENABLED && p_local) {
+        ABTI_xstream *p_local_xstream = ABTI_local_get_xstream(p_local);
+        ABTI_CHECK_TRUE(p_local_xstream->p_thread->type &
+                            ABTI_THREAD_TYPE_YIELDABLE,
+                        ABT_ERR_EVENTUAL);
+    }
+#endif
+
+    double tar_time = convert_timespec_to_sec(abstime);
+
+    ABTD_spinlock_acquire(&p_eventual->lock);
+    if (p_eventual->ready == ABT_FALSE) {
+        ABT_bool is_timedout =
+            ABTI_waitlist_wait_timedout_and_unlock(&p_local,
+                                                   &p_eventual->waitlist,
+                                                   &p_eventual->lock, tar_time,
+                                                   ABT_SYNC_EVENT_TYPE_EVENTUAL,
+                                                   (void *)p_eventual);
+        if (is_timedout) {
+            return ABT_ERR_COND_TIMEDOUT;
+        }
+    } else {
+        ABTD_spinlock_release(&p_eventual->lock);
+    }
+    /* This value is updated outside the critical section, but it is okay since
+     * the "pointer" to the memory buffer is constant and there is no way to
+     * avoid updating this memory buffer by ABT_eventual_set() etc. */
+    if (value)
+        *value = p_eventual->value;
+    return ABT_SUCCESS;
+}
+
 /**
  * @ingroup EVENTUAL
  * @brief Check if an eventual is ready.
diff --git a/src/include/abt.h.in b/src/include/abt.h.in
index 0d9f295bb..a44f7a641 100644
--- a/src/include/abt.h.in
+++ b/src/include/abt.h.in
@@ -333,9 +333,10 @@ extern "C" {
 #define ABT_ERR_COND 41
 /**
  * @ingroup ERROR_CODE
- * @brief Error code: a return value when a condition variable is timed out.
+ * @brief Error code: a return value when a timed wait operation times out.
  *
- * This error code is used by \c ABT_cond_timedwait().
+ * This error code is used by \c ABT_cond_timedwait() and
+ * \c ABT_eventual_timedwait().
  */
 #define ABT_ERR_COND_TIMEDOUT 42
 /**
@@ -2635,6 +2636,8 @@ int ABT_rwlock_unlock(ABT_rwlock rwlock) ABT_API_PUBLIC;
 int ABT_eventual_create(int nbytes, ABT_eventual *neweventual) ABT_API_PUBLIC;
 int ABT_eventual_free(ABT_eventual *eventual) ABT_API_PUBLIC;
 int ABT_eventual_wait(ABT_eventual eventual, void **value) ABT_API_PUBLIC;
+int ABT_eventual_timedwait(ABT_eventual eventual, void **value,
+                           const struct timespec *abstime) ABT_API_PUBLIC;
 int ABT_eventual_test(ABT_eventual eventual, void **value, ABT_bool *is_ready) ABT_API_PUBLIC;
 int ABT_eventual_set(ABT_eventual eventual, void *value, int nbytes) ABT_API_PUBLIC;
 int ABT_eventual_reset(ABT_eventual eventual) ABT_API_PUBLIC;
diff --git a/src/include/abti_error.h b/src/include/abti_error.h
index 9743c04f0..8ad5cedee 100644
--- a/src/include/abti_error.h
+++ b/src/include/abti_error.h
@@ -34,7 +34,7 @@
 
 #define ABTI_STATIC_ASSERT(cond) \
     do { \
-        ((void)sizeof(char[2 * !!(cond)-1])); \
+        ((void)sizeof(char[2 * !!(cond) - 1])); \
     } while (0)
 
 #ifdef ABT_CONFIG_PRINT_ABT_ERRNO
diff --git a/test/basic/Makefile.am b/test/basic/Makefile.am
index 012ab117f..c985b9b02 100644
--- a/test/basic/Makefile.am
+++ b/test/basic/Makefile.am
@@ -70,6 +70,7 @@ TESTS = \
 	eventual_create \
 	eventual_static \
 	eventual_test \
+	eventual_timedwait \
 	barrier \
 	self_exit_to \
 	self_rank_id \
@@ -177,6 +178,7 @@ future_create_SOURCES = future_create.c
 eventual_create_SOURCES = eventual_create.c
 eventual_static_SOURCES = eventual_static.c
 eventual_test_SOURCES = eventual_test.c
+eventual_timedwait_SOURCES = eventual_timedwait.c
 barrier_SOURCES = barrier.c
 self_exit_to_SOURCES = self_exit_to.c
 self_rank_id_SOURCES = self_rank_id.c
diff --git a/test/basic/eventual_timedwait.c b/test/basic/eventual_timedwait.c
new file mode 100644
index 000000000..4b4606032
--- /dev/null
+++ b/test/basic/eventual_timedwait.c
@@ -0,0 +1,299 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
+/*
+ * See COPYRIGHT in top-level directory.
+ */
+
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include "abt.h"
+#include "abttest.h"
+
+#define DEFAULT_NUM_XSTREAMS 1
+#define DEFAULT_NUM_THREADS 4
+
+static ABT_eventual eventual = ABT_EVENTUAL_NULL;
+/* Serializes counter updates: the ULTs may run on different ESs. */
+static ABT_mutex g_counter_mutex = ABT_MUTEX_NULL;
+static int g_timeout_counter = 0;
+static int g_success_counter = 0;
+
+/* Increment a result counter while holding g_counter_mutex. */
+static void increment_counter(int *p_counter)
+{
+    int ret = ABT_mutex_lock(g_counter_mutex);
+    ATS_ERROR(ret, "ABT_mutex_lock");
+    (*p_counter)++;
+    ret = ABT_mutex_unlock(g_counter_mutex);
+    ATS_ERROR(ret, "ABT_mutex_unlock");
+}
+
+/* ULT body: wait with an already-expired deadline and expect a timeout. */
+void eventual_timeout_test(void *arg)
+{
+    int ret;
+    struct timespec ts;
+    struct timeval tv;
+    int eid;
+    ABT_unit_id tid;
+    void *value = NULL;
+
+    ret = ABT_xstream_self_rank(&eid);
+    ATS_ERROR(ret, "ABT_xstream_self_rank");
+    ret = ABT_thread_self_id(&tid);
+    ATS_ERROR(ret, "ABT_thread_self_id");
+
+    /* Set timeout to 1 second in the past to force timeout */
+    ret = gettimeofday(&tv, NULL);
+    assert(!ret);
+
+    ts.tv_sec = tv.tv_sec - 1;
+    ts.tv_nsec = tv.tv_usec * 1000;
+
+    ATS_printf(1, "[U%d:E%d] waiting with past deadline (should timeout)\n",
+               (int)tid, eid);
+    ret = ABT_eventual_timedwait(eventual, &value, &ts);
+    if (ret == ABT_ERR_COND_TIMEDOUT) {
+        increment_counter(&g_timeout_counter);
+        ATS_printf(1, "[U%d:E%d] eventual timed out\n", (int)tid, eid);
+    } else {
+        ATS_ERROR(ret, "ABT_eventual_timedwait should have timed out");
+    }
+}
+
+/* ULT body: wait with a deadline 10 seconds ahead and expect a signal. */
+void eventual_success_test(void *arg)
+{
+    int ret;
+    struct timespec ts;
+    struct timeval tv;
+    int eid;
+    ABT_unit_id tid;
+    void *value = NULL;
+
+    ret = ABT_xstream_self_rank(&eid);
+    ATS_ERROR(ret, "ABT_xstream_self_rank");
+    ret = ABT_thread_self_id(&tid);
+    ATS_ERROR(ret, "ABT_thread_self_id");
+
+    /* Set timeout to 10 seconds in the future */
+    ret = gettimeofday(&tv, NULL);
+    assert(!ret);
+
+    ts.tv_sec = tv.tv_sec + 10;
+    ts.tv_nsec = tv.tv_usec * 1000;
+
+    ATS_printf(1, "[U%d:E%d] waiting with future deadline (should succeed)\n",
+               (int)tid, eid);
+    ret = ABT_eventual_timedwait(eventual, &value, &ts);
+    if (ret == ABT_SUCCESS) {
+        increment_counter(&g_success_counter);
+        int *result = (int *)value;
+        ATS_printf(1, "[U%d:E%d] eventual signaled, value=%d\n", (int)tid, eid,
+                   result ? *result : -1);
+    } else if (ret == ABT_ERR_COND_TIMEDOUT) {
+        ATS_printf(1, "[U%d:E%d] unexpected timeout\n", (int)tid, eid);
+    } else {
+        ATS_ERROR(ret, "ABT_eventual_timedwait");
+    }
+}
+
+int main(int argc, char *argv[])
+{
+    ABT_xstream *xstreams;
+    ABT_pool *pools;
+    ABT_thread *timeout_threads;
+    ABT_thread *success_threads;
+    int num_xstreams;
+    int num_timeout_threads;
+    int num_success_threads;
+    int ret, i, pidx = 0;
+    int eid;
+    ABT_unit_id tid;
+    int test_value = 42;
+    int failed = 0;
+
+    /* Initialize */
+    ATS_read_args(argc, argv);
+    if (argc < 2) {
+        num_xstreams = DEFAULT_NUM_XSTREAMS;
+        num_timeout_threads = DEFAULT_NUM_THREADS / 2;
+        num_success_threads = DEFAULT_NUM_THREADS / 2;
+    } else {
+        num_xstreams = ATS_get_arg_val(ATS_ARG_N_ES);
+        int total_threads = ATS_get_arg_val(ATS_ARG_N_ULT);
+        num_timeout_threads = total_threads / 2;
+        num_success_threads = total_threads - num_timeout_threads;
+    }
+    ATS_init(argc, argv, num_xstreams);
+
+    ATS_printf(1, "# of ESs : %d\n", num_xstreams);
+    ATS_printf(1, "# of timeout ULTs: %d\n", num_timeout_threads);
+    ATS_printf(1, "# of success ULTs: %d\n", num_success_threads);
+
+    ret = ABT_xstream_self_rank(&eid);
+    ATS_ERROR(ret, "ABT_xstream_self_rank");
+    ret = ABT_thread_self_id(&tid);
+    ATS_ERROR(ret, "ABT_thread_self_id");
+
+    xstreams = (ABT_xstream *)malloc(num_xstreams * sizeof(ABT_xstream));
+    pools = (ABT_pool *)malloc(num_xstreams * sizeof(ABT_pool));
+    timeout_threads =
+        (ABT_thread *)malloc(num_timeout_threads * sizeof(ABT_thread));
+    success_threads =
+        (ABT_thread *)malloc(num_success_threads * sizeof(ABT_thread));
+
+    /* Create an eventual */
+    ret = ABT_eventual_create(sizeof(int), &eventual);
+    ATS_ERROR(ret, "ABT_eventual_create");
+
+    /* Create the mutex that guards the result counters */
+    ret = ABT_mutex_create(&g_counter_mutex);
+    ATS_ERROR(ret, "ABT_mutex_create");
+
+    /* Create ESs */
+    ret = ABT_xstream_self(&xstreams[0]);
+    ATS_ERROR(ret, "ABT_xstream_self");
+    for (i = 1; i < num_xstreams; i++) {
+        ret = ABT_xstream_create(ABT_SCHED_NULL, &xstreams[i]);
+        ATS_ERROR(ret, "ABT_xstream_create");
+    }
+
+    /* Get the pools */
+    for (i = 0; i < num_xstreams; i++) {
+        ret = ABT_xstream_get_main_pools(xstreams[i], 1, &pools[i]);
+        ATS_ERROR(ret, "ABT_xstream_get_main_pools");
+    }
+
+    /* Test 1: Timeout threads - should all timeout */
+    ATS_printf(1, "\n=== Test 1: Timeout test ===\n");
+    for (i = 0; i < num_timeout_threads; i++) {
+        ret = ABT_thread_create(pools[pidx], eventual_timeout_test, NULL,
+                                ABT_THREAD_ATTR_NULL, &timeout_threads[i]);
+        ATS_ERROR(ret, "ABT_thread_create");
+        pidx = (pidx + 1) % num_xstreams;
+    }
+
+    /* Give threads time to start */
+    for (i = 0; i < num_timeout_threads; i++) {
+        ABT_thread_yield();
+    }
+    /* Give extra time for cross-stream scheduling */
+    ABT_xstream_check_events(ABT_SCHED_NULL);
+
+    /* Wait for timeout threads to complete */
+    for (i = 0; i < num_timeout_threads; i++) {
+        ret = ABT_thread_free(&timeout_threads[i]);
+        ATS_ERROR(ret, "ABT_thread_free");
+    }
+
+    /* Reset the eventual for next test */
+    ret = ABT_eventual_reset(eventual);
+    ATS_ERROR(ret, "ABT_eventual_reset");
+
+    /* Test 2: Success threads - create threads that wait with future deadline
+     */
+    ATS_printf(1, "\n=== Test 2: Success test (with signal) ===\n");
+    for (i = 0; i < num_success_threads; i++) {
+        ret = ABT_thread_create(pools[pidx], eventual_success_test, NULL,
+                                ABT_THREAD_ATTR_NULL, &success_threads[i]);
+        ATS_ERROR(ret, "ABT_thread_create");
+        pidx = (pidx + 1) % num_xstreams;
+    }
+
+    /* Give threads time to start waiting */
+    for (i = 0; i < num_success_threads; i++) {
+        ABT_thread_yield();
+    }
+    /* Give extra time for cross-stream scheduling */
+    ABT_xstream_check_events(ABT_SCHED_NULL);
+
+    /* Signal the eventual */
+    ret = ABT_eventual_set(eventual, &test_value, sizeof(int));
+    ATS_ERROR(ret, "ABT_eventual_set");
+    ATS_printf(1, "[U%d:E%d] eventual_set with value=%d\n", (int)tid, eid,
+               test_value);
+
+    /* Wait for success threads to complete */
+    for (i = 0; i < num_success_threads; i++) {
+        ret = ABT_thread_free(&success_threads[i]);
+        ATS_ERROR(ret, "ABT_thread_free");
+    }
+
+    /* Test 3: Wait on already ready eventual (should return immediately) */
+    ATS_printf(1, "\n=== Test 3: Already ready test ===\n");
+    struct timespec ts;
+    struct timeval tv;
+    void *value = NULL;
+
+    ret = gettimeofday(&tv, NULL);
+    assert(!ret);
+    ts.tv_sec = tv.tv_sec + 1;
+    ts.tv_nsec = tv.tv_usec * 1000;
+
+    ret = ABT_eventual_timedwait(eventual, &value, &ts);
+    if (ret == ABT_SUCCESS) {
+        int *result = (int *)value;
+        ATS_printf(1,
+                   "[U%d:E%d] already ready eventual returned immediately, "
+                   "value=%d\n",
+                   (int)tid, eid, result ? *result : -1);
+        if (result && *result == test_value) {
+            ATS_printf(1, "Value matches expected value\n");
+        } else {
+            printf("ERROR: ready eventual returned an unexpected value\n");
+            failed = 1;
+        }
+    } else {
+        ATS_ERROR(ret, "ABT_eventual_timedwait on ready eventual");
+    }
+
+    /* Join and free ESs */
+    for (i = 1; i < num_xstreams; i++) {
+        ret = ABT_xstream_join(xstreams[i]);
+        ATS_ERROR(ret, "ABT_xstream_join");
+        ret = ABT_xstream_free(&xstreams[i]);
+        ATS_ERROR(ret, "ABT_xstream_free");
+    }
+
+    /* Free the eventual */
+    ret = ABT_eventual_free(&eventual);
+    ATS_ERROR(ret, "ABT_eventual_free");
+
+    /* Free the counter mutex */
+    ret = ABT_mutex_free(&g_counter_mutex);
+    ATS_ERROR(ret, "ABT_mutex_free");
+
+    /* Validation */
+    int expected_timeouts = num_timeout_threads;
+    int expected_success = num_success_threads;
+
+    if (g_timeout_counter != expected_timeouts) {
+        printf("ERROR: timeout_counter = %d (expected: %d)\n",
+               g_timeout_counter, expected_timeouts);
+        failed = 1;
+    }
+    if (g_success_counter != expected_success) {
+        printf("ERROR: success_counter = %d (expected: %d)\n",
+               g_success_counter, expected_success);
+        failed = 1;
+    }
+
+    if (!failed) {
+        ATS_printf(1, "\n=== All tests passed ===\n");
+        ATS_printf(1, "Timeouts: %d/%d\n", g_timeout_counter,
+                   expected_timeouts);
+        ATS_printf(1, "Success: %d/%d\n", g_success_counter, expected_success);
+    }
+
+    /* Finalize */
+    ret = ATS_finalize(failed);
+
+    free(success_threads);
+    free(timeout_threads);
+    free(pools);
+    free(xstreams);
+
+    return ret;
+}
diff --git a/test/basic/info_query.c b/test/basic/info_query.c
index b1826b23b..76c4bbe67 100644
--- a/test/basic/info_query.c
+++ b/test/basic/info_query.c
@@ -56,7 +56,7 @@ void info_query_all(ABT_bool init)
     while (p_query) {
         if (!(p_query->need_init && !init)) {
             const int idx = p_query->buffer_idx++;
-            int32_t *ptr = (int32_t *)(&p_query->buffers[1 + (idx)*3]);
+            int32_t *ptr = (int32_t *)(&p_query->buffers[1 + (idx) * 3]);
             ptr[-1] = 0x77777777;
             ptr[0] = 0x77777777;
             ptr[1] = 0x77777777;
diff --git a/test/benchmark/thread_ops.c b/test/benchmark/thread_ops.c
index f79cd3cc0..635f623bd 100644
--- a/test/benchmark/thread_ops.c
+++ b/test/benchmark/thread_ops.c
@@ -8,7 +8,7 @@
 #include "abt.h"
 #include "abttest.h"
 
-//#define TEST_MIGRATE_TO
+// #define TEST_MIGRATE_TO
 #define USE_JOIN_MANY
 #ifdef USE_JOIN_MANY
 #define ABT_THREAD_JOIN_MANY(n, tl) ABT_thread_join_many(n, tl)
diff --git a/test/benchmark/thread_ops_all.c b/test/benchmark/thread_ops_all.c
index 5f1ac2408..9059cd8b1 100644
--- a/test/benchmark/thread_ops_all.c
+++ b/test/benchmark/thread_ops_all.c
@@ -8,7 +8,7 @@
 #include "abt.h"
 #include "abttest.h"
 
-//#define TEST_MIGRATE_TO
+// #define TEST_MIGRATE_TO
 #define USE_JOIN_MANY
 #ifdef USE_JOIN_MANY
 #define ABT_THREAD_JOIN_MANY(n, tl) ABT_thread_join_many(n, tl)