Skip to content

Commit cd029d9

Browse files
committed
Push tasks in working condition with hwloc
Drop compilation constants for dynamic values and allocations to make task pushing work with OpenStream.
1 parent b9c6f5c commit cd029d9

7 files changed

+49
-29
lines changed

libworkstream_df/hwloc-support.c

+13
Original file line numberDiff line numberDiff line change
@@ -483,4 +483,17 @@ void openstream_hwloc_cleanup(void) {
483483
cpuid_to_closest_numa_node = NULL;
484484
num_numa_nodes = 0;
485485
topology_depth = 0;
486+
}
487+
488+
unsigned hwloc_mem_transfer_cost(unsigned numa_node_a, unsigned numa_node_b) {
489+
// Best information source should be provided by inter-node bandwith
490+
if (pu_bandwidth_matrix_size) {
491+
return pu_bandwidth_distances[numa_node_a][numa_node_b];
492+
}
493+
// Second best information source can be extracted by looking at the latency
494+
if (pu_latency_matrix_size) {
495+
return pu_latency_distances[numa_node_a][numa_node_b];
496+
}
497+
// Assume uniform transfer cost when no other data is available
498+
return 1;
486499
}

libworkstream_df/hwloc-support.h

+2
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,6 @@ unsigned closest_numa_node_of_processing_unit(const hwloc_obj_t obj);
6060

6161
void openstream_hwloc_cleanup(void);
6262

63+
unsigned hwloc_mem_transfer_cost(unsigned numa_node_a, unsigned numa_node_b);
64+
6365
#endif // HWLOC_SUPPORT_H_

libworkstream_df/profiling.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ void init_wqueue_counters(wstream_df_thread_p th) {
292292
#if ALLOW_PUSHES
293293
th->steals_pushed = 0;
294294
th->pushes_fails = 0;
295-
memset(th->pushes_mem, 0, sizeof(th->pushes_mem));
295+
th->pushes_mem = calloc(topology_depth, sizeof(*th->pushes_mem));
296296
#endif
297297

298298
th->reuse_addr = 0;
@@ -434,7 +434,7 @@ void dump_wqueue_counters (unsigned int num_workers, wstream_df_thread_p* wstrea
434434
{
435435
#ifdef WS_PAPI_PROFILE
436436
#ifdef DUMP_NUMA_COUNTERS
437-
for(int i = 0; i < MAX_NUMA_NODES; i++) {
437+
for(int i = 0; i < num_numa_nodes; i++) {
438438
dump_numa_counters_single(numa_node_by_id(i));
439439
}
440440
#endif

libworkstream_df/profiling.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ struct wstream_df_thread;
1212
struct wstream_df_numa_node;
1313
extern unsigned wstream_num_workers;
1414

15-
#if ALLOW_PUSHES
15+
#if ALLOW_PUSHES && WQUEUE_PROFILE
1616
#define WSTREAM_DF_THREAD_WQUEUE_PROFILE_PUSH_FIELDS \
1717
unsigned long long steals_pushed; \
18-
unsigned long long pushes_mem[MEM_NUM_LEVELS]; \
18+
unsigned long long *pushes_mem; \
1919
unsigned long long pushes_fails;
2020
#else
2121
#define WSTREAM_DF_THREAD_WQUEUE_PROFILE_PUSH_FIELDS

libworkstream_df/work_distribution.c

+24-21
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ void import_pushes(wstream_df_thread_p cthread)
118118
}
119119
}
120120

121-
int work_push_beneficial_max_writer(wstream_df_frame_p fp, wstream_df_thread_p cthread, int num_workers, int* target_worker)
121+
int work_push_beneficial_max_writer(wstream_df_frame_p fp, wstream_df_thread_p cthread, int num_workers, unsigned* target_worker)
122122
{
123123
unsigned int max_worker;
124124
int max_data;
@@ -223,7 +223,7 @@ int work_push_beneficial_split_owner_chain(wstream_df_frame_p fp, wstream_df_thr
223223
unsigned int max_worker;
224224
int numa_node_id;
225225
int max_data;
226-
size_t data[MAX_NUMA_NODES];
226+
size_t data[num_numa_nodes];
227227
wstream_df_numa_node_p numa_node;
228228
unsigned int rand_idx;
229229

@@ -247,7 +247,7 @@ int work_push_beneficial_split_owner_chain(wstream_df_frame_p fp, wstream_df_thr
247247
max_data = data[cthread->numa_node->id];
248248
numa_node_id = cthread->numa_node->id;
249249

250-
for(int i = 0; i < MAX_NUMA_NODES; i++) {
250+
for(unsigned i = 0; i < num_numa_nodes; i++) {
251251
if((int)data[i] > max_data) {
252252
max_data = data[i];
253253
numa_node_id = i;
@@ -272,13 +272,13 @@ int work_push_beneficial_split_owner_chain_inner_mw(wstream_df_frame_p fp, wstre
272272
unsigned int max_worker;
273273
int numa_node_id;
274274
int max_data;
275-
size_t data[MAX_NUMA_NODES];
275+
size_t data[num_numa_nodes];
276276
wstream_df_numa_node_p numa_node;
277277
unsigned int rand_idx;
278278
int node_id;
279279

280280
#if defined(PUSH_EQUAL_RANDOM)
281-
size_t others[MAX_NUMA_NODES];
281+
size_t others[num_numa_nodes];
282282
int num_others = 0;
283283
#endif
284284

@@ -308,14 +308,14 @@ int work_push_beneficial_split_owner_chain_inner_mw(wstream_df_frame_p fp, wstre
308308
numa_node_id = cthread->numa_node->id;
309309

310310
#if defined(PUSH_EQUAL_SEQ)
311-
for(int i = 0; i < MAX_NUMA_NODES; i++) {
311+
for(unsigned i = 0; i < num_numa_nodes; i++) {
312312
if((int)data[i] > max_data) {
313313
max_data = data[i];
314314
numa_node_id = i;
315315
}
316316
}
317317
#elif defined(PUSH_EQUAL_RANDOM)
318-
for(int i = 0; i < MAX_NUMA_NODES; i++) {
318+
for(unsigned i = 0; i < num_numa_nodes; i++) {
319319
if((int)data[i] > max_data)
320320
others[num_others++] = i;
321321

@@ -368,9 +368,8 @@ int work_push_beneficial_split_score_nodes(wstream_df_frame_p fp, wstream_df_thr
368368
{
369369
unsigned int max_worker;
370370
int numa_node_id;
371-
int max_data;
372-
size_t data[MAX_NUMA_NODES];
373-
size_t scores[MAX_NUMA_NODES];
371+
size_t data[num_numa_nodes];
372+
size_t scores[num_numa_nodes];
374373
size_t min_score;
375374
wstream_df_numa_node_p numa_node;
376375
int factor;
@@ -379,7 +378,7 @@ int work_push_beneficial_split_score_nodes(wstream_df_frame_p fp, wstream_df_thr
379378
int input_size = 0;
380379

381380
#if defined(PUSH_EQUAL_RANDOM)
382-
size_t others[MAX_NUMA_NODES];
381+
size_t others[num_numa_nodes];
383382
int num_others = 0;
384383
#endif
385384

@@ -390,8 +389,10 @@ int work_push_beneficial_split_score_nodes(wstream_df_frame_p fp, wstream_df_thr
390389
/* By default assume that data is going to be reused */
391390
if(vi->reuse_data_view)
392391
node_id = slab_numa_node_of(vi->reuse_data_view->data);
392+
#if USE_BROADCAST_TABLES
393393
else if(vi->broadcast_table) /* Peek view with deferred copy */
394394
node_id = -1;
395+
#endif // USE_BROADCAST_TABLES
395396
else
396397
node_id = slab_numa_node_of(vi->data);
397398

@@ -407,22 +408,22 @@ int work_push_beneficial_split_score_nodes(wstream_df_frame_p fp, wstream_df_thr
407408
if(input_size < PUSH_MIN_FRAME_SIZE)
408409
return 0;
409410

410-
for(int target_node = 0; target_node < MAX_NUMA_NODES; target_node++)
411-
for(int source_node = 0; source_node < MAX_NUMA_NODES; source_node++)
412-
scores[target_node] += data[source_node] * mem_transfer_costs(target_node, source_node);
411+
for(unsigned target_node = 0; target_node < num_numa_nodes; target_node++)
412+
for(unsigned source_node = 0; source_node < num_numa_nodes; source_node++)
413+
scores[target_node] += data[source_node] * hwloc_mem_transfer_cost(target_node, source_node);
413414

414415
min_score = scores[cthread->numa_node->id];
415416
numa_node_id = cthread->numa_node->id;
416417

417418
#if defined(PUSH_EQUAL_SEQ)
418-
for(int i = 0; i < MAX_NUMA_NODES; i++) {
419+
for(unsigned i = 0; i < num_numa_nodes; i++) {
419420
if(scores[i] < min_score) {
420421
min_score = scores[i];
421422
numa_node_id = i;
422423
}
423424
}
424425
#elif defined(PUSH_EQUAL_RANDOM)
425-
for(int i = 0; i < MAX_NUMA_NODES; i++) {
426+
for(int i = 0; i < num_numa_nodes; i++) {
426427
if(scores[i] == min_score)
427428
others[num_others++] = i;
428429

@@ -466,7 +467,7 @@ int work_push_beneficial_split_score_nodes(wstream_df_frame_p fp, wstream_df_thr
466467
* of the worker suited best for execution in target_worker. Otherwise 0 is
467468
* returned.
468469
*/
469-
int work_push_beneficial(wstream_df_frame_p fp, wstream_df_thread_p cthread, int num_workers, int* target_worker)
470+
int work_push_beneficial(wstream_df_frame_p fp, wstream_df_thread_p cthread, int num_workers, wstream_df_thread_p* wstream_df_worker_threads, int* target_worker)
470471
{
471472
int res;
472473
unsigned int lcl_target_worker;
@@ -496,7 +497,7 @@ int work_push_beneficial(wstream_df_frame_p fp, wstream_df_thread_p cthread, int
496497
if(/* Only migrate to a different worker */
497498
lcl_target_worker != cthread->worker_id &&
498499
/* Do not migrate to workers that are too close in the memory hierarchy */
499-
mem_lowest_common_level(cthread->worker_id, worker_id_to_cpu(lcl_target_worker)) >= PUSH_MIN_MEM_LEVEL)
500+
level_of_common_ancestor(cthread->cpu, wstream_df_worker_threads[lcl_target_worker]->cpu) >= PUSH_MIN_MEM_LEVEL)
500501
{
501502
*target_worker = lcl_target_worker;
502503
return 1;
@@ -517,7 +518,6 @@ int work_try_push(wstream_df_frame_p fp,
517518
{
518519
int level;
519520
int curr_owner;
520-
int fp_size;
521521

522522
/* Save current owner for statistics and update new owner */
523523
curr_owner = fp->last_owner;
@@ -526,11 +526,14 @@ int work_try_push(wstream_df_frame_p fp,
526526
/* We need to copy frame attributes used afterwards as the frame will
527527
* be under control of the target worker once it is pushed.
528528
*/
529-
fp_size = fp->size;
529+
530+
#if ALLOW_WQEVENT_SAMPLING
531+
int fp_size = fp->size;
532+
#endif // ALLOW_WQEVENT_SAMPLING
530533

531534
if(fifo_pushback(&wstream_df_worker_threads[target_worker]->push_fifo, fp)) {
532535
/* Push was successful, update traces and statistics */
533-
level = mem_lowest_common_level(cthread->worker_id, worker_id_to_cpu(target_worker));
536+
level = level_of_common_ancestor(cthread->cpu, wstream_df_worker_threads[target_worker]->cpu);
534537
inc_wqueue_counter(&cthread->pushes_mem[level], 1);
535538

536539
trace_push(cthread, target_worker, worker_id_to_cpu(target_worker), fp_size, fp);

libworkstream_df/work_distribution.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ void reorder_pushes(wstream_df_thread_p cthread);
1010

1111
#if ALLOW_PUSHES
1212
void import_pushes(wstream_df_thread_p cthread);
13-
int work_push_beneficial(wstream_df_frame_p fp, wstream_df_thread_p cthread, int num_workers, int* target_worker);
13+
int work_push_beneficial(wstream_df_frame_p fp, wstream_df_thread_p cthread, int num_workers, wstream_df_thread_p* wstream_df_worker_threads, int* target_worker);
1414
int work_try_push(wstream_df_frame_p fp, int target_worker, wstream_df_thread_p cthread, wstream_df_thread_p* wstream_df_worker_threads);
1515
#endif
1616

libworkstream_df/wstream_df.c

+5-3
Original file line numberDiff line numberDiff line change
@@ -397,8 +397,9 @@ tdecrease_n (void *data, size_t n, bool is_write)
397397
#if ALLOW_PUSHES
398398
int target_worker;
399399
/* Check whether the frame should be pushed somewhere else */
400-
int beneficial = work_push_beneficial(fp, cthread, wstream_num_workers,
401-
&target_worker);
400+
int beneficial =
401+
work_push_beneficial(fp, cthread, wstream_num_workers,
402+
wstream_df_worker_threads, &target_worker);
402403

403404
#ifdef PUSH_ONLY_IF_NOT_STOLEN_AND_CACHE_EMPTY
404405
int curr_stolen = (cthread->current_frame &&
@@ -904,11 +905,12 @@ __attribute__((__optimize__("O1"))) static void worker_thread(void) {
904905

905906
trace_state_change(cthread, WORKER_STATE_SEEKING);
906907
while (true) {
907-
if (cthread->yield)
908+
if (cthread->yield) {
908909
while (true) {
909910
struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000000};
910911
nanosleep(&ts, NULL);
911912
}
913+
}
912914

913915
#if ALLOW_PUSHES
914916
#if !ALLOW_PUSH_REORDER

0 commit comments

Comments
 (0)