Merge pull request #781 from stellamplau/qSize

Allow queueSize=0 in pool.c
dev
Yann Collet 2017-08-07 13:55:19 -07:00 committed by GitHub
commit 6d0a5c5728
3 changed files with 76 additions and 13 deletions

View File

@ -39,6 +39,12 @@ struct POOL_ctx_s {
size_t queueHead; size_t queueHead;
size_t queueTail; size_t queueTail;
size_t queueSize; size_t queueSize;
/* The number of threads working on jobs */
size_t numThreadsBusy;
/* Indicates if the queue is empty */
int queueEmpty;
/* The mutex protects the queue */ /* The mutex protects the queue */
pthread_mutex_t queueMutex; pthread_mutex_t queueMutex;
/* Condition variable for pushers to wait on when the queue is full */ /* Condition variable for pushers to wait on when the queue is full */
@ -60,21 +66,37 @@ static void* POOL_thread(void* opaque) {
for (;;) { for (;;) {
/* Lock the mutex and wait for a non-empty queue or until shutdown */ /* Lock the mutex and wait for a non-empty queue or until shutdown */
pthread_mutex_lock(&ctx->queueMutex); pthread_mutex_lock(&ctx->queueMutex);
while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) {
while (ctx->queueEmpty && !ctx->shutdown) {
pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
} }
/* empty => shutting down: so stop */ /* empty => shutting down: so stop */
if (ctx->queueHead == ctx->queueTail) { if (ctx->queueEmpty) {
pthread_mutex_unlock(&ctx->queueMutex); pthread_mutex_unlock(&ctx->queueMutex);
return opaque; return opaque;
} }
/* Pop a job off the queue */ /* Pop a job off the queue */
{ POOL_job const job = ctx->queue[ctx->queueHead]; {
POOL_job const job = ctx->queue[ctx->queueHead];
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
ctx->numThreadsBusy++;
ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
/* Unlock the mutex, signal a pusher, and run the job */ /* Unlock the mutex, signal a pusher, and run the job */
pthread_mutex_unlock(&ctx->queueMutex); pthread_mutex_unlock(&ctx->queueMutex);
if (ctx->queueSize > 1) {
pthread_cond_signal(&ctx->queuePushCond); pthread_cond_signal(&ctx->queuePushCond);
}
job.function(job.opaque); job.function(job.opaque);
/* If the intended queue size was 0, signal after finishing job */
if (ctx->queueSize == 1) {
pthread_mutex_lock(&ctx->queueMutex);
ctx->numThreadsBusy--;
pthread_mutex_unlock(&ctx->queueMutex);
pthread_cond_signal(&ctx->queuePushCond);
}
} }
} }
/* Unreachable */ /* Unreachable */
@ -83,7 +105,7 @@ static void* POOL_thread(void* opaque) {
POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
POOL_ctx *ctx; POOL_ctx *ctx;
/* Check the parameters */ /* Check the parameters */
if (!numThreads || !queueSize) { return NULL; } if (!numThreads) { return NULL; }
/* Allocate the context and zero initialize */ /* Allocate the context and zero initialize */
ctx = (POOL_ctx *)calloc(1, sizeof(POOL_ctx)); ctx = (POOL_ctx *)calloc(1, sizeof(POOL_ctx));
if (!ctx) { return NULL; } if (!ctx) { return NULL; }
@ -95,6 +117,8 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
ctx->queue = (POOL_job*) malloc(ctx->queueSize * sizeof(POOL_job)); ctx->queue = (POOL_job*) malloc(ctx->queueSize * sizeof(POOL_job));
ctx->queueHead = 0; ctx->queueHead = 0;
ctx->queueTail = 0; ctx->queueTail = 0;
ctx->numThreadsBusy = 0;
ctx->queueEmpty = 1;
(void)pthread_mutex_init(&ctx->queueMutex, NULL); (void)pthread_mutex_init(&ctx->queueMutex, NULL);
(void)pthread_cond_init(&ctx->queuePushCond, NULL); (void)pthread_cond_init(&ctx->queuePushCond, NULL);
(void)pthread_cond_init(&ctx->queuePopCond, NULL); (void)pthread_cond_init(&ctx->queuePopCond, NULL);
@ -153,22 +177,37 @@ size_t POOL_sizeof(POOL_ctx *ctx) {
+ ctx->numThreads * sizeof(pthread_t); + ctx->numThreads * sizeof(pthread_t);
} }
/**
* Returns 1 if the queue is full and 0 otherwise.
*
* If the queueSize is 1 (the pool was created with an intended queueSize of 0),
* then a queue is empty if there is a thread free and no job is waiting.
*/
static int isQueueFull(POOL_ctx const* ctx) {
if (ctx->queueSize > 1) {
return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
} else {
return ctx->numThreadsBusy == ctx->numThreads ||
!ctx->queueEmpty;
}
}
void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
POOL_ctx* const ctx = (POOL_ctx*)ctxVoid; POOL_ctx* const ctx = (POOL_ctx*)ctxVoid;
if (!ctx) { return; } if (!ctx) { return; }
pthread_mutex_lock(&ctx->queueMutex); pthread_mutex_lock(&ctx->queueMutex);
{ POOL_job const job = {function, opaque}; { POOL_job const job = {function, opaque};
/* Wait until there is space in the queue for the new job */ /* Wait until there is space in the queue for the new job */
size_t newTail = (ctx->queueTail + 1) % ctx->queueSize; while (isQueueFull(ctx) && !ctx->shutdown) {
while (ctx->queueHead == newTail && !ctx->shutdown) {
pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
newTail = (ctx->queueTail + 1) % ctx->queueSize;
} }
/* The queue is still going => there is space */ /* The queue is still going => there is space */
if (!ctx->shutdown) { if (!ctx->shutdown) {
ctx->queueEmpty = 0;
ctx->queue[ctx->queueTail] = job; ctx->queue[ctx->queueTail] = job;
ctx->queueTail = newTail; ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
} }
} }
pthread_mutex_unlock(&ctx->queueMutex); pthread_mutex_unlock(&ctx->queueMutex);

View File

@ -22,7 +22,6 @@ typedef struct POOL_ctx_s POOL_ctx;
* Create a thread pool with at most `numThreads` threads. * Create a thread pool with at most `numThreads` threads.
* `numThreads` must be at least 1. * `numThreads` must be at least 1.
* The maximum number of queued jobs before blocking is `queueSize`. * The maximum number of queued jobs before blocking is `queueSize`.
* `queueSize` must be at least 1.
* @return : POOL_ctx pointer on success, else NULL. * @return : POOL_ctx pointer on success, else NULL.
*/ */
POOL_ctx *POOL_create(size_t numThreads, size_t queueSize); POOL_ctx *POOL_create(size_t numThreads, size_t queueSize);

View File

@ -1,5 +1,6 @@
#include "pool.h" #include "pool.h"
#include "threading.h" #include "threading.h"
#include "util.h"
#include <stddef.h> #include <stddef.h>
#include <stdio.h> #include <stdio.h>
@ -50,21 +51,45 @@ int testOrder(size_t numThreads, size_t queueSize) {
return 0; return 0;
} }
void waitFn(void *opaque) {
(void)opaque;
UTIL_sleepMilli(1);
}
/* Tests for deadlock */
int testWait(size_t numThreads, size_t queueSize) {
struct data data;
POOL_ctx *ctx = POOL_create(numThreads, queueSize);
ASSERT_TRUE(ctx);
{
size_t i;
for (i = 0; i < 16; ++i) {
POOL_add(ctx, &waitFn, &data);
}
}
POOL_free(ctx);
return 0;
}
int main(int argc, const char **argv) { int main(int argc, const char **argv) {
size_t numThreads; size_t numThreads;
for (numThreads = 1; numThreads <= 4; ++numThreads) { for (numThreads = 1; numThreads <= 4; ++numThreads) {
size_t queueSize; size_t queueSize;
for (queueSize = 1; queueSize <= 2; ++queueSize) { for (queueSize = 0; queueSize <= 2; ++queueSize) {
if (testOrder(numThreads, queueSize)) { if (testOrder(numThreads, queueSize)) {
printf("FAIL: testOrder\n"); printf("FAIL: testOrder\n");
return 1; return 1;
} }
if (testWait(numThreads, queueSize)) {
printf("FAIL: testWait\n");
return 1;
}
} }
} }
printf("PASS: testOrder\n"); printf("PASS: testOrder\n");
(void)argc; (void)argc;
(void)argv; (void)argv;
return (POOL_create(0, 1) || POOL_create(1, 0)) ? printf("FAIL: testInvalid\n"), 1 return (POOL_create(0, 1)) ? printf("FAIL: testInvalid\n"), 1
: printf("PASS: testInvalid\n"), 0; : printf("PASS: testInvalid\n"), 0;
return 0; return 0;
} }