Skip to content

Commit 579b4f4

Browse files
committed
steal work that has already been claimed
QueuePerThreadPool normally claims an entire work queue at once instead of popping work items off one at a time. This reduces contention on the work queues. However, this scheme makes it possible to cause starvation: a work item that takes a long time to complete can have more work items queued up after it, all of which have already been claimed and are therefore not in the waiting or deferred queues. Added a steal_active function to prevent this starvation by stealing work that has already been claimed, reproducing the effect of threads popping off individual work items. steal_active is only called after the waiting and deferred queues have been checked and found to be empty, and so should not be called frequently.
1 parent 183a767 commit 579b4f4

File tree

4 files changed

+156
-16
lines changed

4 files changed

+156
-16
lines changed

include/SinglyLinkedList.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ struct SinglyLinkedList {
8383
typedef struct SinglyLinkedList sll_t;
8484

8585
sll_t *sll_init(sll_t *sll);
86-
sll_t *sll_push(sll_t *sll, void *data);
86+
sll_t *sll_push(sll_t *sll, void *data); /* back */
87+
void *sll_pop(sll_t *sll); /* front */
8788
sll_t *sll_move_first(sll_t *dst, sll_t *src, const uint64_t n); /* move first n from src to dst, replacing dst */
8889
sll_t *sll_move(sll_t *dst, sll_t *src); /* move all of src to dst, replacing dst */
8990
sll_t *sll_move_append_first(sll_t *dst, sll_t *src, const uint64_t n); /* move first n from src to dst, appending to dst */

src/QueuePerThreadPool.c

Lines changed: 86 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ typedef enum {
7676

7777
/* The context for a single thread in QPTPool */
7878
typedef struct QPTPoolThreadData {
79+
pthread_mutex_t active_mutex;
80+
sll_t active; /* work items that have already been claimed by this thread */
81+
7982
sll_t waiting; /* generally push into this queue */
8083
sll_t deferred; /* push into here if waiting queue is too big; pop when waiting queue is empty */
8184
pthread_mutex_t mutex;
@@ -163,6 +166,49 @@ static uint64_t steal_work(QPTPool_t *ctx, const size_t id,
163166
return 0;
164167
}
165168

169+
/*
170+
* QueuePerThreadPool normally claims an entire work queue at once
171+
* instead of popping them off one at a time. This reduces the
172+
* contention on the work queues. However, this scheme makes it
173+
* possible to cause starvation when a work item takes a long time to
174+
* complete and has more work items queued up after it, all of which
175+
* have already been claimed and are not in the waiting or deferred
176+
* queues.
177+
*
178+
* This function attempts to prevent starvation by stealing work that
179+
* has already been claimed, reproducing the effect of threads popping
180+
* off individual work items. This function is only called after the
181+
* waiting and deferred queues have been checked and found to be
182+
* empty, and so should not be called frequently.
183+
*/
184+
static uint64_t steal_active(QPTPool_t *ctx, const size_t id,
185+
const size_t start, const size_t end) {
186+
QPTPoolThreadData_t *tw = &ctx->data[id];
187+
188+
for(size_t i = start; i < end; i++) {
189+
if (i == id) {
190+
continue;
191+
}
192+
193+
QPTPoolThreadData_t *target = &ctx->data[i];
194+
195+
if (pthread_mutex_trylock(&target->active_mutex) == 0) {
196+
if (target->active.size) {
197+
const uint64_t active = target->active.size * ctx->steal.num / ctx->steal.denom;
198+
if (active) {
199+
sll_move_first(&tw->waiting, &target->active, active);
200+
pthread_mutex_unlock(&target->active_mutex);
201+
return active;
202+
}
203+
}
204+
205+
pthread_mutex_unlock(&target->active_mutex);
206+
}
207+
}
208+
209+
return 0;
210+
}
211+
166212
static void *worker_function(void *args) {
167213
timestamp_create_start(wf);
168214

@@ -176,10 +222,6 @@ static void *worker_function(void *args) {
176222
QPTPoolThreadData_t *tw = &ctx->data[id];
177223

178224
while (1) {
179-
timestamp_create_start(wf_sll_init);
180-
sll_t being_processed; /* don't bother initializing */
181-
timestamp_set_end(wf_sll_init);
182-
183225
timestamp_create_start(wf_tw_mutex_lock);
184226
pthread_mutex_lock(&tw->mutex);
185227
timestamp_set_end(wf_tw_mutex_lock);
@@ -197,7 +239,19 @@ static void *worker_function(void *args) {
197239
!tw->waiting.size && !tw->deferred.size &&
198240
ctx->incomplete) {
199241
if (steal_work(ctx, id, tw->steal_from, ctx->nthreads) == 0) {
200-
steal_work(ctx, id, 0, tw->steal_from);
242+
if (steal_work(ctx, id, 0, tw->steal_from) == 0) {
243+
/*
244+
* if still can't find anything, try the active queue
245+
*
246+
* this should only be called if there is some
247+
* work that is taking so long that the rest of
248+
* the threads have run out of work, so this
249+
* should not happen too often
250+
*/
251+
if (steal_active(ctx, id, tw->steal_from, ctx->nthreads) == 0) {
252+
steal_active(ctx, id, 0, tw->steal_from);
253+
}
254+
}
201255
}
202256

203257
tw->steal_from = (tw->steal_from + 1) % ctx->nthreads;
@@ -232,12 +286,14 @@ static void *worker_function(void *args) {
232286

233287
/* move entire queue into work and clear out queue */
234288
timestamp_create_start(wf_move_queue);
289+
pthread_mutex_lock(&tw->active_mutex);
235290
if (tw->waiting.size) {
236-
sll_move(&being_processed, &tw->waiting);
291+
sll_move(&tw->active, &tw->waiting);
237292
}
238293
else {
239-
sll_move(&being_processed, &tw->deferred);
294+
sll_move(&tw->active, &tw->deferred);
240295
}
296+
pthread_mutex_unlock(&tw->active_mutex);
241297
timestamp_set_end(wf_move_queue);
242298

243299
#if defined(DEBUG) && defined (QPTPOOL_QUEUE_SIZE)
@@ -269,19 +325,35 @@ static void *worker_function(void *args) {
269325
/* process all work */
270326
size_t work_count = 0;
271327

328+
/*
329+
* pop work item off before it is processed so that if another
330+
* thread steals from the active queue, the current active
331+
* work will not be re-run
332+
*
333+
* this has the side effect of moving 2 frees into the loop
334+
* instead of batching all of them after processing the work
335+
*
336+
* tradeoffs:
337+
* more locking
338+
* delayed work
339+
* lower memory utilization
340+
*/
272341
timestamp_create_start(wf_get_queue_head);
273-
sll_node_t *w = sll_head_node(&being_processed);
342+
pthread_mutex_lock(&tw->active_mutex);
343+
struct queue_item *qi = (struct queue_item *) sll_pop(&tw->active);
344+
pthread_mutex_unlock(&tw->active_mutex);
274345
timestamp_end_print(ctx->debug_buffers, id, "wf_get_queue_head", wf_get_queue_head);
275346

276-
while (w) {
347+
while (qi) {
277348
timestamp_create_start(wf_process_work);
278-
struct queue_item *qi = sll_node_data(w);
279-
280349
tw->threads_successful += !qi->func(ctx, id, qi->work, ctx->args);
350+
free(qi);
281351
timestamp_end_print(ctx->debug_buffers, id, "wf_process_work", wf_process_work);
282352

283353
timestamp_create_start(wf_next_work);
284-
w = sll_next_node(w);
354+
pthread_mutex_lock(&tw->active_mutex);
355+
qi = (struct queue_item *) sll_pop(&tw->active);
356+
pthread_mutex_unlock(&tw->active_mutex);
285357
timestamp_end_print(ctx->debug_buffers, id, "wf_next_work", wf_next_work);
286358

287359
work_count++;
@@ -290,16 +362,13 @@ static void *worker_function(void *args) {
290362
timestamp_set_end(wf_process_queue);
291363

292364
timestamp_create_start(wf_cleanup);
293-
sll_destroy(&being_processed, free);
294365
tw->threads_started += work_count;
295-
296366
pthread_mutex_lock(&ctx->mutex);
297367
ctx->incomplete -= work_count;
298368
pthread_mutex_unlock(&ctx->mutex);
299369
timestamp_set_end(wf_cleanup);
300370

301371
#if defined(DEBUG) && defined(PER_THREAD_STATS)
302-
timestamp_print(ctx->debug_buffers, id, "wf_sll_init", wf_sll_init);
303372
timestamp_print(ctx->debug_buffers, id, "wf_tw_mutex_lock", wf_tw_mutex_lock);
304373
timestamp_print(ctx->debug_buffers, id, "wf_ctx_mutex_lock", wf_ctx_mutex_lock);
305374
timestamp_print(ctx->debug_buffers, id, "wf_wait", wf_wait);
@@ -366,6 +435,7 @@ QPTPool_t *QPTPool_init(const size_t nthreads, void *args) {
366435
/* set up thread data, but not threads */
367436
for(size_t i = 0; i < nthreads; i++) {
368437
QPTPoolThreadData_t *data = &ctx->data[i];
438+
sll_init(&data->active);
369439
sll_init(&data->waiting);
370440
sll_init(&data->deferred);
371441
pthread_mutex_init(&data->mutex, NULL);
@@ -702,6 +772,7 @@ void QPTPool_destroy(QPTPool_t *ctx) {
702772
*/
703773
sll_destroy(&data->deferred, free);
704774
sll_destroy(&data->waiting, free);
775+
sll_destroy(&data->active, free);
705776
}
706777

707778
pthread_mutex_destroy(&ctx->mutex);

src/SinglyLinkedList.c

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,28 @@ sll_t *sll_push(sll_t *sll, void *data) {
103103
return sll;
104104
}
105105

106+
void *sll_pop(sll_t *sll) {
107+
/* Not checking arguments */
108+
109+
if (sll->size == 0) {
110+
return NULL;
111+
}
112+
113+
sll_node_t *head = sll->head;
114+
void *data = head->data;
115+
sll->head = head->next;
116+
117+
if (sll->tail == head) {
118+
sll->tail = NULL;
119+
}
120+
121+
sll->size--;
122+
123+
free(head);
124+
125+
return data;
126+
}
127+
106128
sll_t *sll_move_first(sll_t *dst, sll_t *src, const uint64_t n) {
107129
/* Not checking arguments */
108130

test/unit/googletest/SinglyLinkedList.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,52 @@ TEST(SinglyLinkedList, push) {
104104
sll_destroy(&sll, nullptr);
105105
}
106106

107+
TEST(SinglyLinkedList, pop) {
108+
const size_t count = 5;
109+
110+
sll_t sll;
111+
EXPECT_EQ(&sll, sll_init(&sll));
112+
EXPECT_EQ(sll.head, nullptr);
113+
EXPECT_EQ(sll.tail, nullptr);
114+
115+
void *data = &sll;
116+
117+
for(size_t items = 1; items <= count; items++) {
118+
// push items
119+
for(size_t i = 1; i <= items; i++) {
120+
EXPECT_EQ(&sll, sll_push(&sll, data));
121+
}
122+
EXPECT_EQ(sll.size, items);
123+
124+
// pop all but the last item
125+
for(size_t i = 1; i < items; i++) {
126+
void *popped = sll_pop(&sll);
127+
EXPECT_EQ(popped, data);
128+
129+
EXPECT_NE(sll.head, nullptr);
130+
EXPECT_NE(sll.tail, nullptr);
131+
EXPECT_EQ(sll.size, items - i);
132+
}
133+
134+
EXPECT_EQ(sll.head, sll.tail);
135+
EXPECT_EQ(sll.size, (size_t) 1);
136+
137+
// pop last item
138+
void *popped = sll_pop(&sll);
139+
EXPECT_EQ(popped, data);
140+
141+
// head and tail are NULL
142+
EXPECT_EQ(sll.head, nullptr);
143+
EXPECT_EQ(sll.tail, nullptr);
144+
EXPECT_EQ(sll.size, (size_t) 0);
145+
}
146+
147+
// pop from empty list
148+
EXPECT_EQ(sll_pop(&sll), nullptr);
149+
150+
sll_destroy(&sll, nullptr);
151+
}
152+
107153
TEST(SinglyLinkedList, move_first) {
108154
for(uint64_t i = 0; i < 5; i++) {
109155
/* push 3 items into src */

0 commit comments

Comments
 (0)