From 5adceeed0175e5312c9b68f00494ecd014aa81d6 Mon Sep 17 00:00:00 2001 From: Stella Lau Date: Mon, 31 Jul 2017 10:10:16 -0700 Subject: [PATCH 1/5] Allow queueSize=0 in pool.c and update poolTests --- lib/common/pool.c | 37 ++++++++++++++++++++++++++++++++----- tests/poolTests.c | 6 +++--- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/lib/common/pool.c b/lib/common/pool.c index aeaca7e7..f51beccc 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -13,6 +13,8 @@ #include /* malloc, calloc, free */ #include "pool.h" +#include + /* ====== Compiler specifics ====== */ #if defined(_MSC_VER) # pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */ @@ -34,11 +36,18 @@ struct POOL_ctx_s { pthread_t *threads; size_t numThreads; + size_t numThreadsBusy; + /* The queue is a circular buffer */ POOL_job *queue; size_t queueHead; size_t queueTail; size_t queueSize; + + size_t jobsQueued; + + size_t marker; + /* The mutex protects the queue */ pthread_mutex_t queueMutex; /* Condition variable for pushers to wait on when the queue is full */ @@ -60,21 +69,30 @@ static void* POOL_thread(void* opaque) { for (;;) { /* Lock the mutex and wait for a non-empty queue or until shutdown */ pthread_mutex_lock(&ctx->queueMutex); - while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) { +// while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) { + while (!ctx->jobsQueued && !ctx->shutdown && !ctx->marker) { pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); } /* empty => shutting down: so stop */ - if (ctx->queueHead == ctx->queueTail) { + if (!ctx->jobsQueued && !ctx->marker) { pthread_mutex_unlock(&ctx->queueMutex); return opaque; } /* Pop a job off the queue */ - { POOL_job const job = ctx->queue[ctx->queueHead]; + { + POOL_job const job = ctx->queue[ctx->queueHead]; ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; + ctx->jobsQueued--; + ctx->numThreadsBusy++; + ctx->marker = 0; /* Unlock the mutex, signal a pusher, and run the job */ pthread_mutex_unlock(&ctx->queueMutex); pthread_cond_signal(&ctx->queuePushCond); job.function(job.opaque); + + pthread_mutex_lock(&ctx->queueMutex); + ctx->numThreadsBusy--; + pthread_mutex_unlock(&ctx->queueMutex); } } /* Unreachable */ @@ -83,7 +101,7 @@ static void* POOL_thread(void* opaque) { POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { POOL_ctx *ctx; /* Check the parameters */ - if (!numThreads || !queueSize) { return NULL; } + if (!numThreads) { return NULL; } /* Allocate the context and zero initialize */ ctx = (POOL_ctx *)calloc(1, sizeof(POOL_ctx)); if (!ctx) { return NULL; } @@ -95,6 +113,9 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { ctx->queue = (POOL_job*) malloc(ctx->queueSize * sizeof(POOL_job)); ctx->queueHead = 0; ctx->queueTail = 0; + ctx->numThreadsBusy = 0; + ctx->jobsQueued = 0; + ctx->marker = 0; (void)pthread_mutex_init(&ctx->queueMutex, NULL); (void)pthread_cond_init(&ctx->queuePushCond, NULL); (void)pthread_cond_init(&ctx->queuePopCond, NULL); @@ -161,14 +182,20 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { { POOL_job const job = {function, opaque}; /* Wait until there is space in the queue for the new job */ size_t newTail = (ctx->queueTail + 1) % ctx->queueSize; - while (ctx->queueHead == newTail && !ctx->shutdown) { + while (ctx->queueHead == newTail && !ctx->shutdown && + (ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads || + ctx->marker)) { pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); newTail = (ctx->queueTail + 1) % ctx->queueSize; } /* The queue is still going => there is space */ if (!ctx->shutdown) { + if (ctx->queueSize > 1) { + ctx->marker = 1; + } ctx->queue[ctx->queueTail] = job; ctx->queueTail = newTail; + ctx->jobsQueued++; } } pthread_mutex_unlock(&ctx->queueMutex); diff --git a/tests/poolTests.c b/tests/poolTests.c index adc5947d..09e6d6ad 100644 --- a/tests/poolTests.c +++ b/tests/poolTests.c @@ -54,7 +54,7 @@ int main(int argc, const char **argv) { size_t numThreads; for (numThreads = 1; numThreads <= 4; ++numThreads) { size_t queueSize; - for (queueSize = 1; queueSize <= 2; ++queueSize) { + for (queueSize = 0; queueSize <= 2; ++queueSize) { if (testOrder(numThreads, queueSize)) { printf("FAIL: testOrder\n"); return 1; @@ -64,7 +64,7 @@ int main(int argc, const char **argv) { printf("PASS: testOrder\n"); (void)argc; (void)argv; - return (POOL_create(0, 1) || POOL_create(1, 0)) ? printf("FAIL: testInvalid\n"), 1 - : printf("PASS: testInvalid\n"), 0; + return (POOL_create(0, 1)) ? printf("FAIL: testInvalid\n"), 1 + : printf("PASS: testInvalid\n"), 0; return 0; } From 1d76da1d87d0908f9b24d865b064ab812e491e79 Mon Sep 17 00:00:00 2001 From: Stella Lau Date: Tue, 1 Aug 2017 12:24:55 -0700 Subject: [PATCH 2/5] Replace marker with queueEmpty variable and update pool.h comment --- lib/common/pool.c | 35 +++++++++++++++-------------------- lib/common/pool.h | 1 - 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/lib/common/pool.c b/lib/common/pool.c index f51beccc..e25b1d75 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -13,8 +13,6 @@ #include /* malloc, calloc, free */ #include "pool.h" -#include - /* ====== Compiler specifics ====== */ #if defined(_MSC_VER) # pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */ @@ -36,17 +34,16 @@ struct POOL_ctx_s { pthread_t *threads; size_t numThreads; - size_t numThreadsBusy; - /* The queue is a circular buffer */ POOL_job *queue; size_t queueHead; size_t queueTail; size_t queueSize; - size_t jobsQueued; - - size_t marker; + /* The number of threads working on jobs */ + size_t numThreadsBusy; + /* Indicates if the queue is empty */ + int queueEmpty; /* The mutex protects the queue */ pthread_mutex_t queueMutex; @@ -69,12 +66,11 @@ static void* POOL_thread(void* opaque) { for (;;) { /* Lock the mutex and wait for a non-empty queue or until shutdown */ pthread_mutex_lock(&ctx->queueMutex); -// while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) { - while (!ctx->jobsQueued && !ctx->shutdown && !ctx->marker) { + while (ctx->queueEmpty && !ctx->shutdown) { pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); } /* empty => shutting down: so stop */ - if (!ctx->jobsQueued && !ctx->marker) { + if (ctx->queueEmpty) { pthread_mutex_unlock(&ctx->queueMutex); return opaque; } @@ -82,9 +78,8 @@ static void* POOL_thread(void* opaque) { { POOL_job const job = ctx->queue[ctx->queueHead]; ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; - ctx->jobsQueued--; ctx->numThreadsBusy++; - ctx->marker = 0; + ctx->queueEmpty = ctx->queueHead == ctx->queueTail; /* Unlock the mutex, signal a pusher, and run the job */ pthread_mutex_unlock(&ctx->queueMutex); pthread_cond_signal(&ctx->queuePushCond); @@ -114,8 +109,7 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { ctx->queueHead = 0; ctx->queueTail = 0; ctx->numThreadsBusy = 0; - ctx->jobsQueued = 0; - ctx->marker = 0; + ctx->queueEmpty = 1; (void)pthread_mutex_init(&ctx->queueMutex, NULL); (void)pthread_cond_init(&ctx->queuePushCond, NULL); (void)pthread_cond_init(&ctx->queuePopCond, NULL); @@ -180,22 +174,23 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { pthread_mutex_lock(&ctx->queueMutex); { POOL_job const job = {function, opaque}; - /* Wait until there is space in the queue for the new job */ + + // Wait until there is space in the queue for the new job. + // If the ctx->queueSize is 1 (the pool was created with an + // intended queueSize of 0) and there is no job already waiting, + // wait until there is a thread free for the new job. size_t newTail = (ctx->queueTail + 1) % ctx->queueSize; while (ctx->queueHead == newTail && !ctx->shutdown && (ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads || - ctx->marker)) { + !ctx->queueEmpty)) { pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); newTail = (ctx->queueTail + 1) % ctx->queueSize; } /* The queue is still going => there is space */ if (!ctx->shutdown) { - if (ctx->queueSize > 1) { - ctx->marker = 1; - } + ctx->queueEmpty = 0; ctx->queue[ctx->queueTail] = job; ctx->queueTail = newTail; - ctx->jobsQueued++; } } pthread_mutex_unlock(&ctx->queueMutex); diff --git a/lib/common/pool.h b/lib/common/pool.h index 957100f4..ed271195 100644 --- a/lib/common/pool.h +++ b/lib/common/pool.h @@ -22,7 +22,6 @@ typedef struct POOL_ctx_s POOL_ctx; * Create a thread pool with at most `numThreads` threads. * `numThreads` must be at least 1. * The maximum number of queued jobs before blocking is `queueSize`. - * `queueSize` must be at least 1. * @return : POOL_ctx pointer on success, else NULL. */ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize); From 73ba58955fa238d23bc7ba483c10b722af5e8c3b Mon Sep 17 00:00:00 2001 From: Stella Lau Date: Tue, 1 Aug 2017 20:12:06 -0700 Subject: [PATCH 3/5] Signal after finishing job when queueSize=0 --- lib/common/pool.c | 45 +++++++++++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/lib/common/pool.c b/lib/common/pool.c index e25b1d75..e140f1e8 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -66,6 +66,7 @@ static void* POOL_thread(void* opaque) { for (;;) { /* Lock the mutex and wait for a non-empty queue or until shutdown */ pthread_mutex_lock(&ctx->queueMutex); + while (ctx->queueEmpty && !ctx->shutdown) { pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); } @@ -82,12 +83,20 @@ static void* POOL_thread(void* opaque) { ctx->queueEmpty = ctx->queueHead == ctx->queueTail; /* Unlock the mutex, signal a pusher, and run the job */ pthread_mutex_unlock(&ctx->queueMutex); - pthread_cond_signal(&ctx->queuePushCond); + + if (ctx->queueSize > 1) { + pthread_cond_signal(&ctx->queuePushCond); + } + job.function(job.opaque); - pthread_mutex_lock(&ctx->queueMutex); - ctx->numThreadsBusy--; - pthread_mutex_unlock(&ctx->queueMutex); + /* If the intended queue size was 0, signal after finishing job */ + if (ctx->queueSize == 1) { + pthread_mutex_lock(&ctx->queueMutex); + ctx->numThreadsBusy--; + pthread_mutex_unlock(&ctx->queueMutex); + pthread_cond_signal(&ctx->queuePushCond); + } } } /* Unreachable */ @@ -168,6 +177,21 @@ size_t POOL_sizeof(POOL_ctx *ctx) { + ctx->numThreads * sizeof(pthread_t); } +/** + * Returns 1 if the queue is full and 0 otherwise. + * + * If the queueSize is 1 (the pool was created with an intended queueSize of 0), + * then a queue is empty if there is a thread free and no job is waiting. + */ +static int isQueueFull(POOL_ctx const* ctx) { + if (ctx->queueSize > 1) { + return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize); + } else { + return ctx->numThreadsBusy == ctx->numThreads || + !ctx->queueEmpty; + } +} + void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { POOL_ctx* const ctx = (POOL_ctx*)ctxVoid; if (!ctx) { return; } @@ -175,22 +199,15 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { pthread_mutex_lock(&ctx->queueMutex); { POOL_job const job = {function, opaque}; - // Wait until there is space in the queue for the new job. - // If the ctx->queueSize is 1 (the pool was created with an - // intended queueSize of 0) and there is no job already waiting, - // wait until there is a thread free for the new job. - size_t newTail = (ctx->queueTail + 1) % ctx->queueSize; - while (ctx->queueHead == newTail && !ctx->shutdown && - (ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads || - !ctx->queueEmpty)) { + /* Wait until there is space in the queue for the new job */ + while (isQueueFull(ctx) && !ctx->shutdown) { pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); - newTail = (ctx->queueTail + 1) % ctx->queueSize; } /* The queue is still going => there is space */ if (!ctx->shutdown) { ctx->queueEmpty = 0; ctx->queue[ctx->queueTail] = job; - ctx->queueTail = newTail; + ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize; } } pthread_mutex_unlock(&ctx->queueMutex); From 1e366f9dea1855c4ee8bd4bb8b4b41e486c95e6c Mon Sep 17 00:00:00 2001 From: Stella Lau Date: Wed, 2 Aug 2017 11:27:50 -0700 Subject: [PATCH 4/5] Add test for deadlock --- tests/poolTests.c | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/poolTests.c b/tests/poolTests.c index 09e6d6ad..d5c37aee 100644 --- a/tests/poolTests.c +++ b/tests/poolTests.c @@ -2,6 +2,7 @@ #include "threading.h" #include #include +#include #define ASSERT_TRUE(p) \ do { \ @@ -50,6 +51,26 @@ int testOrder(size_t numThreads, size_t queueSize) { return 0; } +void waitFn(void *opaque) { + (void)opaque; + usleep(100); +} + +/* Tests for deadlock */ +int testWait(size_t numThreads, size_t queueSize) { + struct data data; + POOL_ctx *ctx = POOL_create(numThreads, queueSize); + ASSERT_TRUE(ctx); + { + size_t i; + for (i = 0; i < 16; ++i) { + POOL_add(ctx, &waitFn, &data); + } + } + POOL_free(ctx); + return 0; +} + int main(int argc, const char **argv) { size_t numThreads; for (numThreads = 1; numThreads <= 4; ++numThreads) { @@ -59,6 +80,10 @@ int main(int argc, const char **argv) { printf("FAIL: testOrder\n"); return 1; } + if (testWait(numThreads, queueSize)) { + printf("FAIL: testWait\n"); + return 1; + } } } printf("PASS: testOrder\n"); From e1abc2a3677c89bd300f96081b07e8c34e69aafd Mon Sep 17 00:00:00 2001 From: Stella Lau Date: Mon, 7 Aug 2017 11:43:37 -0700 Subject: [PATCH 5/5] Switch the sleep function to UTIL_sleepMilli --- tests/poolTests.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/poolTests.c b/tests/poolTests.c index d5c37aee..9e11281b 100644 --- a/tests/poolTests.c +++ b/tests/poolTests.c @@ -1,8 +1,8 @@ #include "pool.h" #include "threading.h" +#include "util.h" #include #include -#include #define ASSERT_TRUE(p) \ do { \ @@ -53,7 +53,7 @@ int testOrder(size_t numThreads, size_t queueSize) { void waitFn(void *opaque) { (void)opaque; - usleep(100); + UTIL_sleepMilli(1); } /* Tests for deadlock */