@@ -76,6 +76,9 @@ typedef enum {
 
 /* The context for a single thread in QPTPool */
 typedef struct QPTPoolThreadData {
+    sll_t active;                     /* work items that have already been claimed by this thread */
+    pthread_mutex_t active_mutex;
+
     sll_t waiting;                    /* generally push into this queue */
     sll_t deferred;                   /* push into here if waiting queue is too big; pop when waiting queue is empty */
     pthread_mutex_t mutex;
@@ -163,6 +166,49 @@ static uint64_t steal_work(QPTPool_t *ctx, const size_t id,
     return 0;
 }
 
+/*
+ * QueuePerThreadPool normally claims an entire work queue at once
+ * instead of popping them off one at a time. This reduces the
+ * contention on the work queues. However, this scheme makes it
+ * possible to cause starvation when a work item takes a long time to
+ * complete and has more work items queued up after it, all of which
+ * have already been claimed and are not in the waiting or deferred
+ * queues.
+ *
+ * This function attempts to prevent starvation by stealing work that
+ * has already been claimed, reproducing the effect of threads popping
+ * off individual work items. This function is only called after the
+ * waiting and deferred queues have been checked and found to be
+ * empty, and so should not be called frequently.
+ */
+static uint64_t steal_active(QPTPool_t *ctx, const size_t id,
+                             const size_t start, const size_t end) {
+    QPTPoolThreadData_t *tw = &ctx->data[id];
+
+    for(size_t i = start; i < end; i++) {
+        if (i == id) {
+            continue;
+        }
+
+        QPTPoolThreadData_t *target = &ctx->data[i];
+
+        if (pthread_mutex_trylock(&target->active_mutex) == 0) {
+            if (target->active.size) {
+                const uint64_t active = target->active.size * ctx->steal.num / ctx->steal.denom;
+                if (active) {
+                    sll_move_first(&tw->waiting, &target->active, active);
+                    pthread_mutex_unlock(&target->active_mutex);
+                    return active;
+                }
+            }
+
+            pthread_mutex_unlock(&target->active_mutex);
+        }
+    }
+
+    return 0;
+}
+
 static void *worker_function(void *args) {
     timestamp_create_start(wf);
 
@@ -176,10 +222,6 @@ static void *worker_function(void *args) {
     QPTPoolThreadData_t *tw = &ctx->data[id];
 
     while (1) {
-        timestamp_create_start(wf_sll_init);
-        sll_t being_processed; /* don't bother initializing */
-        timestamp_set_end(wf_sll_init);
-
         timestamp_create_start(wf_tw_mutex_lock);
         pthread_mutex_lock(&tw->mutex);
         timestamp_set_end(wf_tw_mutex_lock);
@@ -197,7 +239,19 @@ static void *worker_function(void *args) {
                !tw->waiting.size && !tw->deferred.size &&
                ctx->incomplete) {
             if (steal_work(ctx, id, tw->steal_from, ctx->nthreads) == 0) {
-                steal_work(ctx, id, 0, tw->steal_from);
+                if (steal_work(ctx, id, 0, tw->steal_from) == 0) {
+                    /*
+                     * if still can't find anything, try the active queue
+                     *
+                     * this should only be called if there is some
+                     * work that is taking so long that the rest of
+                     * the threads have run out of work, so this
+                     * should not happen too often
+                     */
+                    if (steal_active(ctx, id, tw->steal_from, ctx->nthreads) == 0) {
+                        steal_active(ctx, id, 0, tw->steal_from);
+                    }
+                }
             }
 
             tw->steal_from = (tw->steal_from + 1) % ctx->nthreads;
@@ -232,18 +286,20 @@ static void *worker_function(void *args) {
 
         /* move entire queue into work and clear out queue */
         timestamp_create_start(wf_move_queue);
+        pthread_mutex_lock(&tw->active_mutex);
         if (tw->waiting.size) {
-            sll_move(&being_processed, &tw->waiting);
+            sll_move(&tw->active, &tw->waiting);
         }
         else {
-            sll_move(&being_processed, &tw->deferred);
+            sll_move(&tw->active, &tw->deferred);
         }
+        pthread_mutex_unlock(&tw->active_mutex);
         timestamp_set_end(wf_move_queue);
 
         #if defined(DEBUG) && defined(QPTPOOL_QUEUE_SIZE)
         pthread_mutex_lock(&ctx->mutex);
         pthread_mutex_lock(&print_mutex);
-        tw->waiting.size = being_processed.size;
+        tw->waiting.size = tw->active.size;
 
         struct timespec now;
         clock_gettime(CLOCK_MONOTONIC, &now);
@@ -269,19 +325,35 @@ static void *worker_function(void *args) {
         /* process all work */
         size_t work_count = 0;
 
+        /*
+         * pop work item off before it is processed so that if another
+         * thread steals from the active queue, the current active
+         * work will not be re-run
+         *
+         * this has the side effect of moving 2 frees into the loop
+         * instead of batching all of them after processing the work
+         *
+         * tradeoffs:
+         *     more locking
+         *     delayed work
+         *     lower memory utilization
+         */
         timestamp_create_start(wf_get_queue_head);
-        sll_node_t *w = sll_head_node(&being_processed);
+        pthread_mutex_lock(&tw->active_mutex);
+        struct queue_item *qi = (struct queue_item *) sll_pop(&tw->active);
+        pthread_mutex_unlock(&tw->active_mutex);
         timestamp_end_print(ctx->debug_buffers, id, "wf_get_queue_head", wf_get_queue_head);
 
-        while (w) {
+        while (qi) {
             timestamp_create_start(wf_process_work);
-            struct queue_item *qi = sll_node_data(w);
-
             tw->threads_successful += !qi->func(ctx, id, qi->work, ctx->args);
+            free(qi);
             timestamp_end_print(ctx->debug_buffers, id, "wf_process_work", wf_process_work);
 
             timestamp_create_start(wf_next_work);
-            w = sll_next_node(w);
+            pthread_mutex_lock(&tw->active_mutex);
+            qi = (struct queue_item *) sll_pop(&tw->active);
+            pthread_mutex_unlock(&tw->active_mutex);
             timestamp_end_print(ctx->debug_buffers, id, "wf_next_work", wf_next_work);
 
             work_count++;
@@ -290,16 +362,13 @@ static void *worker_function(void *args) {
         timestamp_set_end(wf_process_queue);
 
         timestamp_create_start(wf_cleanup);
-        sll_destroy(&being_processed, free);
         tw->threads_started += work_count;
-
         pthread_mutex_lock(&ctx->mutex);
         ctx->incomplete -= work_count;
         pthread_mutex_unlock(&ctx->mutex);
         timestamp_set_end(wf_cleanup);
 
         #if defined(DEBUG) && defined(PER_THREAD_STATS)
-        timestamp_print(ctx->debug_buffers, id, "wf_sll_init", wf_sll_init);
         timestamp_print(ctx->debug_buffers, id, "wf_tw_mutex_lock", wf_tw_mutex_lock);
         timestamp_print(ctx->debug_buffers, id, "wf_ctx_mutex_lock", wf_ctx_mutex_lock);
         timestamp_print(ctx->debug_buffers, id, "wf_wait", wf_wait);
@@ -366,6 +435,8 @@ QPTPool_t *QPTPool_init(const size_t nthreads, void *args) {
     /* set up thread data, but not threads */
     for(size_t i = 0; i < nthreads; i++) {
         QPTPoolThreadData_t *data = &ctx->data[i];
+        sll_init(&data->active);
+        pthread_mutex_init(&data->active_mutex, NULL);
         sll_init(&data->waiting);
         sll_init(&data->deferred);
         pthread_mutex_init(&data->mutex, NULL);
@@ -702,6 +773,8 @@ void QPTPool_destroy(QPTPool_t *ctx) {
          */
         sll_destroy(&data->deferred, free);
         sll_destroy(&data->waiting, free);
+        pthread_mutex_destroy(&data->active_mutex);
+        sll_destroy(&data->active, free);
     }
 
     pthread_mutex_destroy(&ctx->mutex);
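For reference, a minimal standalone sketch of the fractional steal that steal_active() performs: an idle thread moves the first size * num / denom items off another thread's already-claimed (active) queue onto its own waiting queue. The list type and helpers below are toy stand-ins rather than the project's sll_t API, and the 1/2 ratio is only an example; the real code additionally guards the victim's queue with pthread_mutex_trylock so a busy victim is skipped instead of waited on.

/* toy illustration only; not the project's sll_t API */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct node { struct node *next; int payload; } node_t;
typedef struct list { node_t *head; node_t *tail; uint64_t size; } list_t;

/* append one item to the tail of a list */
static void list_push(list_t *l, int payload) {
    node_t *n = calloc(1, sizeof(*n));
    n->payload = payload;
    if (l->tail) { l->tail->next = n; } else { l->head = n; }
    l->tail = n;
    l->size++;
}

/* assumed behavior of sll_move_first: move the first count nodes of src onto the tail of dst */
static void list_move_first(list_t *dst, list_t *src, uint64_t count) {
    while (count-- && src->head) {
        node_t *n = src->head;
        src->head = n->next;
        if (!src->head) { src->tail = NULL; }
        src->size--;
        n->next = NULL;
        if (dst->tail) { dst->tail->next = n; } else { dst->head = n; }
        dst->tail = n;
        dst->size++;
    }
}

int main(void) {
    list_t victim_active = {0};   /* items already claimed by a busy thread */
    list_t thief_waiting = {0};   /* an idle thread's waiting queue */
    for (int i = 0; i < 10; i++) { list_push(&victim_active, i); }

    /* steal num/denom of the claimed queue, mirroring ctx->steal.num / ctx->steal.denom */
    const uint64_t num = 1, denom = 2;   /* example ratio only */
    const uint64_t steal = victim_active.size * num / denom;
    if (steal) { list_move_first(&thief_waiting, &victim_active, steal); }

    printf("victim keeps %llu claimed items, thief picked up %llu\n",
           (unsigned long long) victim_active.size,
           (unsigned long long) thief_waiting.size);

    /* cleanup: drain everything into one list and free it */
    list_move_first(&thief_waiting, &victim_active, victim_active.size);
    while (thief_waiting.head) {
        node_t *n = thief_waiting.head;
        thief_waiting.head = n->next;
        free(n);
    }
    return 0;
}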