introduced POOL_resize()

not complete yet :
finalize behavior in case of unfinished expansion
dev
Yann Collet 2018-06-18 20:46:39 -07:00
parent e30f13bde0
commit 1c714fda3f
2 changed files with 88 additions and 38 deletions

View File

@ -33,8 +33,9 @@ typedef struct POOL_job_s {
struct POOL_ctx_s {
ZSTD_customMem customMem;
/* Keep track of the threads */
ZSTD_pthread_t *threads;
size_t numThreads;
ZSTD_pthread_t* threads;
size_t threadCapacity;
size_t threadLimit;
/* The queue is a circular buffer */
POOL_job *queue;
@ -58,10 +59,10 @@ struct POOL_ctx_s {
};
/* POOL_thread() :
Work thread for the thread pool.
Waits for jobs and executes them.
@returns : NULL on failure else non-null.
*/
* Work thread for the thread pool.
* Waits for jobs and executes them.
* @returns : NULL on failure else non-null.
*/
static void* POOL_thread(void* opaque) {
POOL_ctx* const ctx = (POOL_ctx*)opaque;
if (!ctx) { return NULL; }
@ -103,16 +104,17 @@ POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
}
POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
ZSTD_customMem customMem) {
POOL_ctx* ctx;
/* Check the parameters */
/* Check parameters */
if (!numThreads) { return NULL; }
/* Allocate the context and zero initialize */
ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);
if (!ctx) { return NULL; }
/* Initialize the job queue.
* It needs one extra space since one space is wasted to differentiate empty
* and full queues.
* It needs one extra space since one space is wasted to differentiate
* empty and full queues.
*/
ctx->queueSize = queueSize + 1;
ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);
@ -126,7 +128,7 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customM
ctx->shutdown = 0;
/* Allocate space for the thread handles */
ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
ctx->numThreads = 0;
ctx->threadCapacity = 0;
ctx->customMem = customMem;
/* Check for errors */
if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
@ -134,11 +136,12 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customM
{ size_t i;
for (i = 0; i < numThreads; ++i) {
if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
ctx->numThreads = i;
ctx->threadCapacity = i;
POOL_free(ctx);
return NULL;
} }
ctx->numThreads = numThreads;
ctx->threadCapacity = numThreads;
ctx->threadLimit = numThreads;
}
return ctx;
}
@ -156,8 +159,8 @@ static void POOL_join(POOL_ctx* ctx) {
ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
/* Join all of the threads */
{ size_t i;
for (i = 0; i < ctx->numThreads; ++i) {
ZSTD_pthread_join(ctx->threads[i], NULL);
for (i = 0; i < ctx->threadCapacity; ++i) {
ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */
} }
}
@ -172,24 +175,72 @@ void POOL_free(POOL_ctx *ctx) {
ZSTD_free(ctx, ctx->customMem);
}
size_t POOL_sizeof(POOL_ctx *ctx) {
if (ctx==NULL) return 0; /* supports sizeof NULL */
return sizeof(*ctx)
+ ctx->queueSize * sizeof(POOL_job)
+ ctx->numThreads * sizeof(ZSTD_pthread_t);
+ ctx->threadCapacity * sizeof(ZSTD_pthread_t);
}
/* note : only works if no job is running !
* return : 1 on success, 0 on failure */
static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
{
if (ctx->numThreadsBusy > 0) return 0;
if (numThreads <= ctx->threadCapacity) {
ctx->threadLimit = numThreads;
return 1;
}
/* numThreads > threadCapacity */
{ ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
if (!threadPool) return 0;
/* Initialize additional threads */
{ size_t threadId;
for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {
break;
} }
if (threadId != numThreads) { /* not all threads successfully init */
/* how to destroy existing threads ? */
/* POOL_join destroy all existing threads, not just newly created ones */
return 0;
}
}
/* replace existing thread pool */
memcpy(threadPool, ctx->threads, ctx->threadCapacity);
ZSTD_free(ctx->threads, ctx->customMem);
ctx->threads = threadPool;
}
ctx->threadCapacity = numThreads;
ctx->threadLimit = numThreads;
return 1;
}
/* return : 1 on success, 0 on failure */
int POOL_resize(POOL_ctx* ctx, size_t numThreads)
{
int result;
if (!ctx) return 0;
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
result = POOL_resize_internal(ctx, numThreads);
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
return result;
}
/**
* Returns 1 if the queue is full and 0 otherwise.
*
* If the queueSize is 1 (the pool was created with an intended queueSize of 0),
* then a queue is empty if there is a thread free and no job is waiting.
* When queueSize is 1 (pool was created with an intended queueSize of 0),
* then a queue is empty if there is a thread free _and_ no job is waiting.
*/
static int isQueueFull(POOL_ctx const* ctx) {
if (ctx->queueSize > 1) {
return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
} else {
return ctx->numThreadsBusy == ctx->numThreads ||
return ctx->numThreadsBusy == ctx->threadLimit ||
!ctx->queueEmpty;
}
}

View File

@ -30,40 +30,39 @@ typedef struct POOL_ctx_s POOL_ctx;
*/
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize);
POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem);
POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
ZSTD_customMem customMem);
/*! POOL_free() :
Free a thread pool returned by POOL_create().
*/
* Free a thread pool returned by POOL_create().
*/
void POOL_free(POOL_ctx* ctx);
/*! POOL_sizeof() :
return memory usage of pool returned by POOL_create().
*/
* @return threadpool memory usage
* note : compatible with NULL (returns 0 in this case)
*/
size_t POOL_sizeof(POOL_ctx* ctx);
/*! POOL_function :
The function type that can be added to a thread pool.
*/
* The function type that can be added to a thread pool.
*/
typedef void (*POOL_function)(void*);
/*! POOL_add_function :
The function type for a generic thread pool add function.
*/
typedef void (*POOL_add_function)(void*, POOL_function, void*);
/*! POOL_add() :
Add the job `function(opaque)` to the thread pool. `ctx` must be valid.
Possibly blocks until there is room in the queue.
Note : The function may be executed asynchronously, so `opaque` must live until the function has been completed.
*/
* Add the job `function(opaque)` to the thread pool. `ctx` must be valid.
* Possibly blocks until there is room in the queue.
* Note : The function may be executed asynchronously,
* therefore, `opaque` must live until function has been completed.
*/
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque);
/*! POOL_tryAdd() :
Add the job `function(opaque)` to the thread pool if a worker is available.
return immediately otherwise.
@return : 1 if successful, 0 if not.
*/
* Add the job `function(opaque)` to thread pool _if_ a worker is available.
* Returns immediately even if not (does not block).
* @return : 1 if successful, 0 if not.
*/
int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque);