diff --git a/lib/common/pool.c b/lib/common/pool.c index d08b1de7..bd31f032 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -73,10 +73,13 @@ static void* POOL_thread(void* opaque) { while ( ctx->queueEmpty || (ctx->numThreadsBusy >= ctx->threadLimit) ) { - if (ctx->shutdown) { - ZSTD_pthread_mutex_unlock(&ctx->queueMutex); - return opaque; - } + if (ctx->shutdown) { + /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit), + * a few threads will be shutdown while !queueEmpty, + * but enough threads will remain active to finish the queue */ + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); + return opaque; + } ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); } /* Pop a job off the queue */ @@ -99,7 +102,7 @@ static void* POOL_thread(void* opaque) { ZSTD_pthread_mutex_unlock(&ctx->queueMutex); } } /* for (;;) */ - assert(0); /* Unreachable */ + assert(0); /* Unreachable */ } POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { @@ -187,10 +190,12 @@ size_t POOL_sizeof(POOL_ctx *ctx) { } -/* note : only works if no job is running ! */ +/* @return : a working pool on success, NULL on failure + * note : starting context is considered consumed. */ static POOL_ctx* POOL_resize_internal(POOL_ctx* ctx, size_t numThreads) { if (numThreads <= ctx->threadCapacity) { + if (!numThreads) return NULL; ctx->threadLimit = numThreads; return ctx; } diff --git a/lib/common/pool.h b/lib/common/pool.h index 00ea7663..caf51490 100644 --- a/lib/common/pool.h +++ b/lib/common/pool.h @@ -45,6 +45,7 @@ void POOL_free(POOL_ctx* ctx); * note : new pool context might have same address as original one, but it's not guaranteed. * consider starting context as consumed, only rely on returned one. * note 2 : only numThreads can be resized, queueSize is unchanged. + * note 3 : `numThreads` must be at least 1 */ POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads); diff --git a/tests/poolTests.c b/tests/poolTests.c index d7ff70ac..d5768967 100644 --- a/tests/poolTests.c +++ b/tests/poolTests.c @@ -169,7 +169,7 @@ typedef struct { void waitIncFn(void *opaque) { abruptEndCanary_t* test = (abruptEndCanary_t*) opaque; - UTIL_sleepMilli(1); + UTIL_sleepMilli(10); ZSTD_pthread_mutex_lock(&test->mut); test->val = test->val + 1; ZSTD_pthread_mutex_unlock(&test->mut); @@ -179,14 +179,15 @@ static int testAbruptEnding_internal(abruptEndCanary_t test) { int const nbWaits = 16; - POOL_ctx* const ctx = POOL_create(2 /*numThreads*/, nbWaits /*queueSize*/); + POOL_ctx* ctx = POOL_create(3 /*numThreads*/, nbWaits /*queueSize*/); ASSERT_TRUE(ctx); test.val = 0; { int i; for (i=0; i