buffer pool can be invoked from multiple threads

dev
Yann Collet 2017-07-11 14:14:07 -07:00
parent ef0ff7fe7f
commit 16261e6951
3 changed files with 40 additions and 28 deletions

View File

@ -92,7 +92,7 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
* and full queues. * and full queues.
*/ */
ctx->queueSize = queueSize + 1; ctx->queueSize = queueSize + 1;
ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job)); ctx->queue = (POOL_job*) malloc(ctx->queueSize * sizeof(POOL_job));
ctx->queueHead = 0; ctx->queueHead = 0;
ctx->queueTail = 0; ctx->queueTail = 0;
pthread_mutex_init(&ctx->queueMutex, NULL); pthread_mutex_init(&ctx->queueMutex, NULL);
@ -100,7 +100,7 @@ POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
pthread_cond_init(&ctx->queuePopCond, NULL); pthread_cond_init(&ctx->queuePopCond, NULL);
ctx->shutdown = 0; ctx->shutdown = 0;
/* Allocate space for the thread handles */ /* Allocate space for the thread handles */
ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t)); ctx->threads = (pthread_t*)malloc(numThreads * sizeof(pthread_t));
ctx->numThreads = 0; ctx->numThreads = 0;
/* Check for errors */ /* Check for errors */
if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; } if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
@ -153,8 +153,8 @@ size_t POOL_sizeof(POOL_ctx *ctx) {
+ ctx->numThreads * sizeof(pthread_t); + ctx->numThreads * sizeof(pthread_t);
} }
void POOL_add(void *ctxVoid, POOL_function function, void *opaque) { void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
POOL_ctx *ctx = (POOL_ctx *)ctxVoid; POOL_ctx* const ctx = (POOL_ctx*)ctxVoid;
if (!ctx) { return; } if (!ctx) { return; }
pthread_mutex_lock(&ctx->queueMutex); pthread_mutex_lock(&ctx->queueMutex);
@ -183,22 +183,22 @@ struct POOL_ctx_s {
int data; int data;
}; };
POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
(void)numThreads; (void)numThreads;
(void)queueSize; (void)queueSize;
return (POOL_ctx *)malloc(sizeof(POOL_ctx)); return (POOL_ctx*)malloc(sizeof(POOL_ctx));
} }
void POOL_free(POOL_ctx *ctx) { void POOL_free(POOL_ctx* ctx) {
if (ctx) free(ctx); free(ctx);
} }
void POOL_add(void *ctx, POOL_function function, void *opaque) { void POOL_add(void* ctx, POOL_function function, void* opaque) {
(void)ctx; (void)ctx;
function(opaque); function(opaque);
} }
size_t POOL_sizeof(POOL_ctx *ctx) { size_t POOL_sizeof(POOL_ctx* ctx) {
if (ctx==NULL) return 0; /* supports sizeof NULL */ if (ctx==NULL) return 0; /* supports sizeof NULL */
return sizeof(*ctx); return sizeof(*ctx);
} }

View File

@ -19,11 +19,11 @@ extern "C" {
typedef struct POOL_ctx_s POOL_ctx; typedef struct POOL_ctx_s POOL_ctx;
/*! POOL_create() : /*! POOL_create() :
Create a thread pool with at most `numThreads` threads. * Create a thread pool with at most `numThreads` threads.
`numThreads` must be at least 1. * `numThreads` must be at least 1.
The maximum number of queued jobs before blocking is `queueSize`. * The maximum number of queued jobs before blocking is `queueSize`.
`queueSize` must be at least 1. * `queueSize` must be at least 1.
@return : The POOL_ctx pointer on success else NULL. * @return : POOL_ctx pointer on success, else NULL.
*/ */
POOL_ctx *POOL_create(size_t numThreads, size_t queueSize); POOL_ctx *POOL_create(size_t numThreads, size_t queueSize);

View File

@ -73,6 +73,7 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
/* ===== Buffer Pool ===== */ /* ===== Buffer Pool ===== */
/* a single Buffer Pool can be invoked from multiple threads in parallel */
typedef struct buffer_s { typedef struct buffer_s {
void* start; void* start;
@ -82,6 +83,7 @@ typedef struct buffer_s {
static const buffer_t g_nullBuffer = { NULL, 0 }; static const buffer_t g_nullBuffer = { NULL, 0 };
typedef struct ZSTDMT_bufferPool_s { typedef struct ZSTDMT_bufferPool_s {
pthread_mutex_t poolMutex;
unsigned totalBuffers; unsigned totalBuffers;
unsigned nbBuffers; unsigned nbBuffers;
ZSTD_customMem cMem; ZSTD_customMem cMem;
@ -94,6 +96,7 @@ static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads, ZSTD_custo
ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc( ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc(
sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem); sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
if (bufPool==NULL) return NULL; if (bufPool==NULL) return NULL;
pthread_mutex_init(&bufPool->poolMutex, NULL);
bufPool->totalBuffers = maxNbBuffers; bufPool->totalBuffers = maxNbBuffers;
bufPool->nbBuffers = 0; bufPool->nbBuffers = 0;
bufPool->cMem = cMem; bufPool->cMem = cMem;
@ -106,6 +109,7 @@ static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
if (!bufPool) return; /* compatibility with free on NULL */ if (!bufPool) return; /* compatibility with free on NULL */
for (u=0; u<bufPool->totalBuffers; u++) for (u=0; u<bufPool->totalBuffers; u++)
ZSTD_free(bufPool->bTable[u].start, bufPool->cMem); ZSTD_free(bufPool->bTable[u].start, bufPool->cMem);
pthread_mutex_destroy(&bufPool->poolMutex);
ZSTD_free(bufPool, bufPool->cMem); ZSTD_free(bufPool, bufPool->cMem);
} }
@ -116,31 +120,37 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
+ (bufPool->totalBuffers - 1) * sizeof(buffer_t); + (bufPool->totalBuffers - 1) * sizeof(buffer_t);
unsigned u; unsigned u;
size_t totalBufferSize = 0; size_t totalBufferSize = 0;
pthread_mutex_lock(&bufPool->poolMutex);
for (u=0; u<bufPool->totalBuffers; u++) for (u=0; u<bufPool->totalBuffers; u++)
totalBufferSize += bufPool->bTable[u].size; totalBufferSize += bufPool->bTable[u].size;
pthread_mutex_unlock(&bufPool->poolMutex);
return poolSize + totalBufferSize; return poolSize + totalBufferSize;
} }
/** ZSTDMT_getBuffer() : /** ZSTDMT_getBuffer() :
* assumption : invocation from main thread only ! */ * assumption : invocation from main thread only ! */
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize) static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool, size_t bSize)
{ {
DEBUGLOG(2, "ZSTDMT_getBuffer"); DEBUGLOG(2, "ZSTDMT_getBuffer");
if (pool->nbBuffers) { /* try to use an existing buffer */ pthread_mutex_lock(&bufPool->poolMutex);
buffer_t const buf = pool->bTable[--(pool->nbBuffers)]; if (bufPool->nbBuffers) { /* try to use an existing buffer */
buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)];
size_t const availBufferSize = buf.size; size_t const availBufferSize = buf.size;
if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) {
/* large enough, but not too much */ /* large enough, but not too much */
pthread_mutex_unlock(&bufPool->poolMutex);
return buf; return buf;
}
/* size conditions not respected : scratch this buffer, create new one */ /* size conditions not respected : scratch this buffer, create new one */
DEBUGLOG(2, "existing buffer does not meet size conditions => freeing"); DEBUGLOG(2, "existing buffer does not meet size conditions => freeing");
ZSTD_free(buf.start, pool->cMem); ZSTD_free(buf.start, bufPool->cMem);
} }
pthread_mutex_unlock(&bufPool->poolMutex);
/* create new buffer */ /* create new buffer */
DEBUGLOG(2, "create a new buffer"); DEBUGLOG(2, "create a new buffer");
{ buffer_t buffer; { buffer_t buffer;
void* const start = ZSTD_malloc(bSize, pool->cMem); void* const start = ZSTD_malloc(bSize, bufPool->cMem);
if (start==NULL) bSize = 0; if (start==NULL) bSize = 0;
buffer.start = start; /* note : start can be NULL if malloc fails ! */ buffer.start = start; /* note : start can be NULL if malloc fails ! */
buffer.size = bSize; buffer.size = bSize;
@ -149,23 +159,25 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
} }
/* store buffer for later re-use, up to pool capacity */ /* store buffer for later re-use, up to pool capacity */
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf) static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
{ {
DEBUGLOG(2, "ZSTDMT_releaseBuffer"); DEBUGLOG(2, "ZSTDMT_releaseBuffer");
if (buf.start == NULL) return; /* release on NULL */ if (buf.start == NULL) return; /* release on NULL */
if (pool->nbBuffers < pool->totalBuffers) { pthread_mutex_lock(&bufPool->poolMutex);
pool->bTable[pool->nbBuffers++] = buf; /* store for later re-use */ if (bufPool->nbBuffers < bufPool->totalBuffers) {
bufPool->bTable[bufPool->nbBuffers++] = buf; /* store for later re-use */
pthread_mutex_unlock(&bufPool->poolMutex);
return; return;
} }
pthread_mutex_unlock(&bufPool->poolMutex);
/* Reached bufferPool capacity (should not happen) */ /* Reached bufferPool capacity (should not happen) */
DEBUGLOG(2, "buffer pool capacity reached => freeing "); DEBUGLOG(2, "buffer pool capacity reached => freeing ");
ZSTD_free(buf.start, pool->cMem); ZSTD_free(buf.start, bufPool->cMem);
} }
/* ===== CCtx Pool ===== */ /* ===== CCtx Pool ===== */
/* a single CCtx Pool can be invoked from multiple threads in parallel */
/* a single cctxPool can be called from multiple threads in parallel */
typedef struct { typedef struct {
pthread_mutex_t poolMutex; pthread_mutex_t poolMutex;
@ -314,7 +326,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
job->cSize = (job->lastChunk) ? job->cSize = (job->lastChunk) ?
ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize); ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u)", DEBUGLOG(2, "compressed %u bytes into %u bytes (first:%u) (last:%u)",
(unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk); (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize)); DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize));