Skip to content

Commit 1068e2e

Browse files
committed
QPTPool_enqueue_front
pushes to front of claimed queue, not waiting queue
1 parent 30bef1b commit 1068e2e

File tree

2 files changed

+82
-11
lines changed

2 files changed

+82
-11
lines changed

include/QueuePerThreadPool.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ OF SUCH DAMAGE.
6565
#ifndef QUEUE_PER_THREAD_POOL_H
6666
#define QUEUE_PER_THREAD_POOL_H
6767

68-
#include <inttypes.h>
68+
#include <stddef.h> /* size_t */
69+
#include <stdint.h> /* uint64 */
6970

7071
#ifdef __cplusplus
7172
extern "C" {
@@ -151,6 +152,7 @@ typedef enum QPTPool_enqueue_dst {
151152
#ifdef QPTPOOL_SWAP
152153
QPTPool_enqueue_SWAP,
153154
#endif
155+
QPTPool_enqueue_CLAIMED,
154156
} QPTPool_enqueue_dst_t;
155157

156158
#ifdef QPTPOOL_SWAP
@@ -181,6 +183,9 @@ int QPTPool_generic_alloc_and_deserialize(const int fd, QPTPool_f *func, void **
181183
QPTPool_enqueue_dst_t QPTPool_enqueue(QPTPool_ctx_t *ctx,
182184
QPTPool_f func, void *new_work);
183185

186+
QPTPool_enqueue_dst_t QPTPool_enqueue_front(QPTPool_ctx_t *ctx,
187+
QPTPool_f func, void *new_work);
188+
184189
#ifdef QPTPOOL_SWAP
185190
/*
186191
* Enqueue data and a function to process the data. Pushes to

src/QueuePerThreadPool.c

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -251,23 +251,48 @@ static inline void maybe_steal_work(QPTPool_t *pool, QPTPoolThreadData_t *tw, si
251251

252252
/*
253253
* wait_for_work() -
254-
* Assumes pool->mutex and tw->mutex are locked.
254+
* Assumes pool->mutex, tw->mutex, and tw->claimed are locked.
255255
*/
256256
static void wait_for_work(QPTPool_t *pool, QPTPoolThreadData_t *tw) {
257257
while (
258-
/* running, but no work in pool or current thread */
259-
((pool->state == RUNNING) && ((!pool->incomplete && !pool->swapped) ||
260-
!tw->waiting.size)) ||
261-
/*
262-
* not running and still have work in
263-
* other threads, just not this one
264-
*/
265-
((pool->state == STOPPING) && (pool->incomplete || pool->swapped) &&
266-
!tw->waiting.size)
258+
(
259+
/* running but no work, so sit here */
260+
(pool->state == RUNNING) &&
261+
(
262+
/* no claimed or waiting work in this thread */
263+
(!tw->claimed.size && !tw->waiting.size)
264+
265+
/*
266+
* OR because if there is work in other threads, sit
267+
* here and wait for signal instead of looping
268+
*/
269+
||
270+
271+
/* no work in pool */
272+
(!pool->incomplete && !pool->swapped)
273+
)
274+
)
275+
||
276+
(
277+
/*
278+
* stopping but still have work in
279+
* other threads, just not this one
280+
*/
281+
(pool->state == STOPPING) &&
282+
(
283+
!tw->claimed.size &&
284+
285+
!tw->waiting.size &&
286+
287+
(pool->incomplete || pool->swapped)
288+
)
289+
)
267290
) {
291+
pthread_mutex_unlock(&tw->claimed_mutex);
268292
pthread_mutex_unlock(&pool->mutex);
269293
pthread_cond_wait(&tw->cv, &tw->mutex);
270294
pthread_mutex_lock(&pool->mutex);
295+
pthread_mutex_lock(&tw->claimed_mutex);
271296
}
272297
}
273298

@@ -336,7 +361,10 @@ static void *worker_function(void *args) {
336361
pthread_mutex_lock(&pool->mutex);
337362

338363
maybe_steal_work(pool, tw, ctx->id);
364+
365+
pthread_mutex_lock(&tw->claimed_mutex); /* lock claimed_mutex after stealing work */
339366
wait_for_work(pool, tw);
367+
pthread_mutex_unlock(&tw->claimed_mutex);
340368

341369
/* if stopping and entire thread pool is empty, this thread can exit */
342370
if ((pool->state == STOPPING) && !pool->incomplete && !pool->swapped) {
@@ -787,6 +815,44 @@ QPTPool_enqueue_dst_t QPTPool_enqueue(QPTPool_ctx_t *ctx, QPTPool_f func, void *
787815
return ret;
788816
}
789817

818+
/* id selects the next_queue variable to use, not where the work will be placed */
819+
QPTPool_enqueue_dst_t QPTPool_enqueue_front(QPTPool_ctx_t *ctx, QPTPool_f func, void *new_work) {
820+
/* Not checking arguments */
821+
822+
struct queue_item *qi = malloc(sizeof(struct queue_item));
823+
qi->func = func; /* if no function is provided, the thread will segfault when it processes this item */
824+
qi->work = new_work;
825+
826+
QPTPool_t *pool = ctx->pool;
827+
const size_t id = ctx->id;
828+
829+
QPTPoolThreadData_t *data = &pool->data[id];
830+
pthread_mutex_lock(&data->mutex);
831+
QPTPoolThreadData_t *next = &pool->data[data->next_queue];
832+
833+
/* have to calculate next_queue before new_work is modified */
834+
data->next_queue = pool->next.func(id, data->next_queue, pool->nthreads,
835+
new_work, pool->next.args);
836+
pthread_mutex_unlock(&data->mutex);
837+
838+
QPTPool_enqueue_dst_t ret = QPTPool_enqueue_ERROR;
839+
840+
/* push to the front of claimed queue */
841+
pthread_mutex_lock(&next->mutex);
842+
pthread_mutex_lock(&next->claimed_mutex);
843+
sll_push_front(&next->claimed, qi);
844+
ret = QPTPool_enqueue_CLAIMED;
845+
pthread_mutex_lock(&pool->mutex);
846+
pool->incomplete++;
847+
pthread_mutex_unlock(&pool->mutex);
848+
849+
pthread_cond_broadcast(&next->cv);
850+
pthread_mutex_unlock(&next->claimed_mutex);
851+
pthread_mutex_unlock(&next->mutex);
852+
853+
return ret;
854+
}
855+
790856
#ifdef QPTPOOL_SWAP
791857
static int write_swap(struct Swap *swap,
792858
QPTPool_serialize_and_free_f serialize,

0 commit comments

Comments
 (0)