Replace marker with queueEmpty variable and update pool.h comment

This commit is contained in:
Stella Lau 2017-08-01 12:24:55 -07:00
parent 5adceeed01
commit 1d76da1d87
2 changed files with 15 additions and 21 deletions

View File

@ -13,8 +13,6 @@
#include <stdlib.h> /* malloc, calloc, free */ #include <stdlib.h> /* malloc, calloc, free */
#include "pool.h" #include "pool.h"
#include <stdio.h>
/* ====== Compiler specifics ====== */ /* ====== Compiler specifics ====== */
#if defined(_MSC_VER) #if defined(_MSC_VER)
# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */ # pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */
@ -36,17 +34,16 @@ struct POOL_ctx_s {
pthread_t *threads; pthread_t *threads;
size_t numThreads; size_t numThreads;
size_t numThreadsBusy;
/* The queue is a circular buffer */ /* The queue is a circular buffer */
POOL_job *queue; POOL_job *queue;
size_t queueHead; size_t queueHead;
size_t queueTail; size_t queueTail;
size_t queueSize; size_t queueSize;
size_t jobsQueued; /* The number of threads working on jobs */
size_t numThreadsBusy;
size_t marker; /* Indicates if the queue is empty */
int queueEmpty;
/* The mutex protects the queue */ /* The mutex protects the queue */
pthread_mutex_t queueMutex; pthread_mutex_t queueMutex;
@ -69,12 +66,11 @@ static void* POOL_thread(void* opaque) {
for (;;) { for (;;) {
/* Lock the mutex and wait for a non-empty queue or until shutdown */ /* Lock the mutex and wait for a non-empty queue or until shutdown */
pthread_mutex_lock(&ctx->queueMutex); pthread_mutex_lock(&ctx->queueMutex);
// while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) { while (ctx->queueEmpty && !ctx->shutdown) {
while (!ctx->jobsQueued && !ctx->shutdown && !ctx->marker) {
pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
} }
/* empty => shutting down: so stop */ /* empty => shutting down: so stop */
if (!ctx->jobsQueued && !ctx->marker) { if (ctx->queueEmpty) {
pthread_mutex_unlock(&ctx->queueMutex); pthread_mutex_unlock(&ctx->queueMutex);
return opaque; return opaque;
} }
@ -82,9 +78,8 @@ static void* POOL_thread(void* opaque) {
{ {
POOL_job const job = ctx->queue[ctx->queueHead]; POOL_job const job = ctx->queue[ctx->queueHead];
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
ctx->jobsQueued--;
ctx->numThreadsBusy++; ctx->numThreadsBusy++;
ctx->marker = 0; ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
/* Unlock the mutex, signal a pusher, and run the job */ /* Unlock the mutex, signal a pusher, and run the job */
pthread_mutex_unlock(&ctx->queueMutex); pthread_mutex_unlock(&ctx->queueMutex);
pthread_cond_signal(&ctx->queuePushCond); pthread_cond_signal(&ctx->queuePushCond);
@ -114,8 +109,7 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
ctx->queueHead = 0; ctx->queueHead = 0;
ctx->queueTail = 0; ctx->queueTail = 0;
ctx->numThreadsBusy = 0; ctx->numThreadsBusy = 0;
ctx->jobsQueued = 0; ctx->queueEmpty = 1;
ctx->marker = 0;
(void)pthread_mutex_init(&ctx->queueMutex, NULL); (void)pthread_mutex_init(&ctx->queueMutex, NULL);
(void)pthread_cond_init(&ctx->queuePushCond, NULL); (void)pthread_cond_init(&ctx->queuePushCond, NULL);
(void)pthread_cond_init(&ctx->queuePopCond, NULL); (void)pthread_cond_init(&ctx->queuePopCond, NULL);
@ -180,22 +174,23 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
pthread_mutex_lock(&ctx->queueMutex); pthread_mutex_lock(&ctx->queueMutex);
{ POOL_job const job = {function, opaque}; { POOL_job const job = {function, opaque};
/* Wait until there is space in the queue for the new job */
// Wait until there is space in the queue for the new job.
// If the ctx->queueSize is 1 (the pool was created with an
// intended queueSize of 0) and there is no job already waiting,
// wait until there is a thread free for the new job.
size_t newTail = (ctx->queueTail + 1) % ctx->queueSize; size_t newTail = (ctx->queueTail + 1) % ctx->queueSize;
while (ctx->queueHead == newTail && !ctx->shutdown && while (ctx->queueHead == newTail && !ctx->shutdown &&
(ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads || (ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads ||
ctx->marker)) { !ctx->queueEmpty)) {
pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
newTail = (ctx->queueTail + 1) % ctx->queueSize; newTail = (ctx->queueTail + 1) % ctx->queueSize;
} }
/* The queue is still going => there is space */ /* The queue is still going => there is space */
if (!ctx->shutdown) { if (!ctx->shutdown) {
if (ctx->queueSize > 1) { ctx->queueEmpty = 0;
ctx->marker = 1;
}
ctx->queue[ctx->queueTail] = job; ctx->queue[ctx->queueTail] = job;
ctx->queueTail = newTail; ctx->queueTail = newTail;
ctx->jobsQueued++;
} }
} }
pthread_mutex_unlock(&ctx->queueMutex); pthread_mutex_unlock(&ctx->queueMutex);

View File

@ -22,7 +22,6 @@ typedef struct POOL_ctx_s POOL_ctx;
* Create a thread pool with at most `numThreads` threads. * Create a thread pool with at most `numThreads` threads.
* `numThreads` must be at least 1. * `numThreads` must be at least 1.
* The maximum number of queued jobs before blocking is `queueSize`. * The maximum number of queued jobs before blocking is `queueSize`.
* `queueSize` must be at least 1.
* @return : POOL_ctx pointer on success, else NULL. * @return : POOL_ctx pointer on success, else NULL.
*/ */
POOL_ctx *POOL_create(size_t numThreads, size_t queueSize); POOL_ctx *POOL_create(size_t numThreads, size_t queueSize);