Fix pool for threading.h

dev
Nick Terrell 2016-12-31 19:10:47 -05:00
parent 4204e03e77
commit bb13387d7d
2 changed files with 10 additions and 10 deletions

View File

@ -51,21 +51,21 @@ static void* POOL_thread(void* opaque) {
if (!ctx) { return NULL; }
for (;;) {
/* Lock the mutex and wait for a non-empty queue or until shutdown */
if (pthread_mutex_lock(&ctx->queueMutex)) { return NULL; }
pthread_mutex_lock(&ctx->queueMutex);
while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) {
if (pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex)) { return NULL; }
pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
}
/* empty => shutting down: so stop */
if (ctx->queueHead == ctx->queueTail) {
if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; }
pthread_mutex_unlock(&ctx->queueMutex);
return opaque;
}
/* Pop a job off the queue */
{ POOL_job const job = ctx->queue[ctx->queueHead];
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
/* Unlock the mutex, signal a pusher, and run the job */
if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; }
if (pthread_cond_signal(&ctx->queuePushCond)) { return NULL; }
pthread_mutex_unlock(&ctx->queueMutex);
pthread_cond_signal(&ctx->queuePushCond);
job.function(job.opaque);
}
}
@ -73,7 +73,6 @@ static void* POOL_thread(void* opaque) {
}
POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
int err = 0;
POOL_ctx *ctx;
/* Check the parameters */
if (!numThreads || !queueSize) { return NULL; }
@ -88,15 +87,15 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job));
ctx->queueHead = 0;
ctx->queueTail = 0;
err |= pthread_mutex_init(&ctx->queueMutex, NULL);
err |= pthread_cond_init(&ctx->queuePushCond, NULL);
err |= pthread_cond_init(&ctx->queuePopCond, NULL);
pthread_mutex_init(&ctx->queueMutex, NULL);
pthread_cond_init(&ctx->queuePushCond, NULL);
pthread_cond_init(&ctx->queuePopCond, NULL);
ctx->shutdown = 0;
/* Allocate space for the thread handles */
ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t));
ctx->numThreads = 0;
/* Check for errors */
if (!ctx->threads || !ctx->queue || err) { POOL_free(ctx); return NULL; }
if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
/* Initialize the threads */
{ size_t i;
for (i = 0; i < numThreads; ++i) {

View File

@ -39,6 +39,7 @@ typedef void (*POOL_add_function)(void *, POOL_function, void *);
/*! POOL_add() :
Add the job `function(opaque)` to the thread pool.
Possibly blocks until there is room in the queue.
Note : The function may be executed asynchronously, so `opaque` must live until the function has been completed.
*/
void POOL_add(void *ctx, POOL_function function, void *opaque);