diff --git a/lib/common/pool.c b/lib/common/pool.c index 749fa4f2..06d8a5f5 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -92,7 +92,7 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { * and full queues. */ ctx->queueSize = queueSize + 1; - ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job)); + ctx->queue = (POOL_job*) malloc(ctx->queueSize * sizeof(POOL_job)); ctx->queueHead = 0; ctx->queueTail = 0; pthread_mutex_init(&ctx->queueMutex, NULL); @@ -100,7 +100,7 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { pthread_cond_init(&ctx->queuePopCond, NULL); ctx->shutdown = 0; /* Allocate space for the thread handles */ - ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t)); + ctx->threads = (pthread_t*)malloc(numThreads * sizeof(pthread_t)); ctx->numThreads = 0; /* Check for errors */ if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; } @@ -153,8 +153,8 @@ size_t POOL_sizeof(POOL_ctx *ctx) { + ctx->numThreads * sizeof(pthread_t); } -void POOL_add(void *ctxVoid, POOL_function function, void *opaque) { - POOL_ctx *ctx = (POOL_ctx *)ctxVoid; +void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { + POOL_ctx* const ctx = (POOL_ctx*)ctxVoid; if (!ctx) { return; } pthread_mutex_lock(&ctx->queueMutex); @@ -183,22 +183,22 @@ struct POOL_ctx_s { int data; }; -POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { +POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { (void)numThreads; (void)queueSize; - return (POOL_ctx *)malloc(sizeof(POOL_ctx)); + return (POOL_ctx*)malloc(sizeof(POOL_ctx)); } -void POOL_free(POOL_ctx *ctx) { - if (ctx) free(ctx); +void POOL_free(POOL_ctx* ctx) { + free(ctx); } -void POOL_add(void *ctx, POOL_function function, void *opaque) { +void POOL_add(void* ctx, POOL_function function, void* opaque) { (void)ctx; function(opaque); } -size_t POOL_sizeof(POOL_ctx *ctx) { +size_t POOL_sizeof(POOL_ctx* ctx) { if (ctx==NULL) return 0; /* supports sizeof NULL */ return sizeof(*ctx); } diff --git a/lib/common/pool.h b/lib/common/pool.h index 386cd674..957100f4 100644 --- a/lib/common/pool.h +++ b/lib/common/pool.h @@ -19,11 +19,11 @@ extern "C" { typedef struct POOL_ctx_s POOL_ctx; /*! POOL_create() : - Create a thread pool with at most `numThreads` threads. - `numThreads` must be at least 1. - The maximum number of queued jobs before blocking is `queueSize`. - `queueSize` must be at least 1. - @return : The POOL_ctx pointer on success else NULL. + * Create a thread pool with at most `numThreads` threads. + * `numThreads` must be at least 1. + * The maximum number of queued jobs before blocking is `queueSize`. + * `queueSize` must be at least 1. + * @return : POOL_ctx pointer on success, else NULL. */ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize); diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 255b9c7e..c4547c81 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -73,6 +73,7 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void) /* ===== Buffer Pool ===== */ +/* a single Buffer Pool can be invoked from multiple threads in parallel */ typedef struct buffer_s { void* start; @@ -82,6 +83,7 @@ typedef struct buffer_s { static const buffer_t g_nullBuffer = { NULL, 0 }; typedef struct ZSTDMT_bufferPool_s { + pthread_mutex_t poolMutex; unsigned totalBuffers; unsigned nbBuffers; ZSTD_customMem cMem; @@ -94,6 +96,7 @@ static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads, ZSTD_custo ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc( sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem); if (bufPool==NULL) return NULL; + pthread_mutex_init(&bufPool->poolMutex, NULL); bufPool->totalBuffers = maxNbBuffers; bufPool->nbBuffers = 0; bufPool->cMem = cMem; @@ -106,6 +109,7 @@ static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool) if (!bufPool) return; /* compatibility with free on NULL */ for (u=0; utotalBuffers; u++) ZSTD_free(bufPool->bTable[u].start, bufPool->cMem); + pthread_mutex_destroy(&bufPool->poolMutex); ZSTD_free(bufPool, bufPool->cMem); } @@ -116,31 +120,37 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool) + (bufPool->totalBuffers - 1) * sizeof(buffer_t); unsigned u; size_t totalBufferSize = 0; + pthread_mutex_lock(&bufPool->poolMutex); for (u=0; utotalBuffers; u++) totalBufferSize += bufPool->bTable[u].size; + pthread_mutex_unlock(&bufPool->poolMutex); return poolSize + totalBufferSize; } /** ZSTDMT_getBuffer() : * assumption : invocation from main thread only ! */ -static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize) +static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool, size_t bSize) { DEBUGLOG(2, "ZSTDMT_getBuffer"); - if (pool->nbBuffers) { /* try to use an existing buffer */ - buffer_t const buf = pool->bTable[--(pool->nbBuffers)]; + pthread_mutex_lock(&bufPool->poolMutex); + if (bufPool->nbBuffers) { /* try to use an existing buffer */ + buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)]; size_t const availBufferSize = buf.size; - if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) + if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) { /* large enough, but not too much */ + pthread_mutex_unlock(&bufPool->poolMutex); return buf; + } /* size conditions not respected : scratch this buffer, create new one */ DEBUGLOG(2, "existing buffer does not meet size conditions => freeing"); - ZSTD_free(buf.start, pool->cMem); + ZSTD_free(buf.start, bufPool->cMem); } + pthread_mutex_unlock(&bufPool->poolMutex); /* create new buffer */ DEBUGLOG(2, "create a new buffer"); { buffer_t buffer; - void* const start = ZSTD_malloc(bSize, pool->cMem); + void* const start = ZSTD_malloc(bSize, bufPool->cMem); if (start==NULL) bSize = 0; buffer.start = start; /* note : start can be NULL if malloc fails ! */ buffer.size = bSize; @@ -149,23 +159,25 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize) } /* store buffer for later re-use, up to pool capacity */ -static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf) +static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) { DEBUGLOG(2, "ZSTDMT_releaseBuffer"); if (buf.start == NULL) return; /* release on NULL */ - if (pool->nbBuffers < pool->totalBuffers) { - pool->bTable[pool->nbBuffers++] = buf; /* store for later re-use */ + pthread_mutex_lock(&bufPool->poolMutex); + if (bufPool->nbBuffers < bufPool->totalBuffers) { + bufPool->bTable[bufPool->nbBuffers++] = buf; /* store for later re-use */ + pthread_mutex_unlock(&bufPool->poolMutex); return; } + pthread_mutex_unlock(&bufPool->poolMutex); /* Reached bufferPool capacity (should not happen) */ DEBUGLOG(2, "buffer pool capacity reached => freeing "); - ZSTD_free(buf.start, pool->cMem); + ZSTD_free(buf.start, bufPool->cMem); } /* ===== CCtx Pool ===== */ - -/* a single cctxPool can be called from multiple threads in parallel */ +/* a single CCtx Pool can be invoked from multiple threads in parallel */ typedef struct { pthread_mutex_t poolMutex; @@ -314,7 +326,7 @@ void ZSTDMT_compressChunk(void* jobDescription) job->cSize = (job->lastChunk) ? ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize); - DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u)", + DEBUGLOG(2, "compressed %u bytes into %u bytes (first:%u) (last:%u)", (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk); DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize));