From 1d76da1d87d0908f9b24d865b064ab812e491e79 Mon Sep 17 00:00:00 2001 From: Stella Lau Date: Tue, 1 Aug 2017 12:24:55 -0700 Subject: [PATCH] Replace marker with queueEmpty variable and update pool.h comment --- lib/common/pool.c | 35 +++++++++++++++-------------------- lib/common/pool.h | 1 - 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/lib/common/pool.c b/lib/common/pool.c index f51beccc..e25b1d75 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -13,8 +13,6 @@ #include /* malloc, calloc, free */ #include "pool.h" -#include - /* ====== Compiler specifics ====== */ #if defined(_MSC_VER) # pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */ @@ -36,17 +34,16 @@ struct POOL_ctx_s { pthread_t *threads; size_t numThreads; - size_t numThreadsBusy; - /* The queue is a circular buffer */ POOL_job *queue; size_t queueHead; size_t queueTail; size_t queueSize; - size_t jobsQueued; - - size_t marker; + /* The number of threads working on jobs */ + size_t numThreadsBusy; + /* Indicates if the queue is empty */ + int queueEmpty; /* The mutex protects the queue */ pthread_mutex_t queueMutex; @@ -69,12 +66,11 @@ static void* POOL_thread(void* opaque) { for (;;) { /* Lock the mutex and wait for a non-empty queue or until shutdown */ pthread_mutex_lock(&ctx->queueMutex); -// while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) { - while (!ctx->jobsQueued && !ctx->shutdown && !ctx->marker) { + while (ctx->queueEmpty && !ctx->shutdown) { pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); } /* empty => shutting down: so stop */ - if (!ctx->jobsQueued && !ctx->marker) { + if (ctx->queueEmpty) { pthread_mutex_unlock(&ctx->queueMutex); return opaque; } @@ -82,9 +78,8 @@ static void* POOL_thread(void* opaque) { { POOL_job const job = ctx->queue[ctx->queueHead]; ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; - ctx->jobsQueued--; ctx->numThreadsBusy++; - ctx->marker = 0; + ctx->queueEmpty = ctx->queueHead == ctx->queueTail; /* Unlock the mutex, signal a pusher, and run the job */ pthread_mutex_unlock(&ctx->queueMutex); pthread_cond_signal(&ctx->queuePushCond); @@ -114,8 +109,7 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { ctx->queueHead = 0; ctx->queueTail = 0; ctx->numThreadsBusy = 0; - ctx->jobsQueued = 0; - ctx->marker = 0; + ctx->queueEmpty = 1; (void)pthread_mutex_init(&ctx->queueMutex, NULL); (void)pthread_cond_init(&ctx->queuePushCond, NULL); (void)pthread_cond_init(&ctx->queuePopCond, NULL); @@ -180,22 +174,23 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { pthread_mutex_lock(&ctx->queueMutex); { POOL_job const job = {function, opaque}; - /* Wait until there is space in the queue for the new job */ + + // Wait until there is space in the queue for the new job. + // If the ctx->queueSize is 1 (the pool was created with an + // intended queueSize of 0) and there is no job already waiting, + // wait until there is a thread free for the new job. size_t newTail = (ctx->queueTail + 1) % ctx->queueSize; while (ctx->queueHead == newTail && !ctx->shutdown && (ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads || - ctx->marker)) { + !ctx->queueEmpty)) { pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); newTail = (ctx->queueTail + 1) % ctx->queueSize; } /* The queue is still going => there is space */ if (!ctx->shutdown) { - if (ctx->queueSize > 1) { - ctx->marker = 1; - } + ctx->queueEmpty = 0; ctx->queue[ctx->queueTail] = job; ctx->queueTail = newTail; - ctx->jobsQueued++; } } pthread_mutex_unlock(&ctx->queueMutex); diff --git a/lib/common/pool.h b/lib/common/pool.h index 957100f4..ed271195 100644 --- a/lib/common/pool.h +++ b/lib/common/pool.h @@ -22,7 +22,6 @@ typedef struct POOL_ctx_s POOL_ctx; * Create a thread pool with at most `numThreads` threads. * `numThreads` must be at least 1. * The maximum number of queued jobs before blocking is `queueSize`. - * `queueSize` must be at least 1. * @return : POOL_ctx pointer on success, else NULL. */ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize);