ZSTDMT streaming : fall back to (regular) single thread mode

when nbThreads==1
dev
Yann Collet 2017-01-23 01:43:58 -08:00
parent 84581ff8d7
commit 1cbf251e43
1 changed files with 21 additions and 9 deletions

View File

@ -172,7 +172,10 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads)
ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) calloc(1, sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*));
if (!cctxPool) return NULL;
cctxPool->totalCCtx = nbThreads;
cctxPool->availCCtx = 0;
cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */
cctxPool->cctx[0] = ZSTD_createCCtx();
if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; }
DEBUGLOG(1, "cctxPool created, with %u threads", nbThreads);
return cctxPool;
}
@ -278,6 +281,7 @@ struct ZSTDMT_CCtx_s {
unsigned allJobsCompleted;
unsigned long long frameContentSize;
ZSTD_CDict* cdict;
ZSTD_CStream* cstream;
ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */
};
@ -287,7 +291,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
U32 const minNbJobs = nbThreads + 2;
U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1;
U32 const nbJobs = 1 << nbJobsLog2;
DEBUGLOG(4, "nbThreads : %u ; minNbJobs : %u ; nbJobsLog2 : %u ; nbJobs : %u \n",
DEBUGLOG(5, "nbThreads : %u ; minNbJobs : %u ; nbJobsLog2 : %u ; nbJobs : %u \n",
nbThreads, minNbJobs, nbJobsLog2, nbJobs);
if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbJobs*sizeof(ZSTDMT_jobDescription));
@ -302,8 +306,14 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
ZSTDMT_freeCCtx(cctx);
return NULL;
}
if (nbThreads==1) {
cctx->cstream = ZSTD_createCStream();
if (!cctx->cstream) {
ZSTDMT_freeCCtx(cctx); return NULL;
} }
pthread_mutex_init(&cctx->jobCompleted_mutex, NULL); /* Todo : check init function return */
pthread_cond_init(&cctx->jobCompleted_cond, NULL);
DEBUGLOG(4, "mt_cctx created, for %u threads \n", nbThreads);
return cctx;
}
@ -329,11 +339,12 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
{
if (mtctx==NULL) return 0; /* compatible with free on NULL */
ZSTD_freeCDict(mtctx->cdict);
POOL_free(mtctx->factory);
if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */
ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources into pools first */
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
ZSTD_freeCDict(mtctx->cdict);
ZSTD_freeCStream(mtctx->cstream);
pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
pthread_cond_destroy(&mtctx->jobCompleted_cond);
free(mtctx);
@ -361,12 +372,8 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
params.fParams.contentSizeFlag = 1;
if (nbChunks==1) { /* fallback to single-thread mode */
size_t result;
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(mtctx->cctxPool);
if (!cctx) return ERROR(memory_allocation);
result = ZSTD_compressCCtx(mtctx->cctxPool->cctx[0], dst, dstCapacity, src, srcSize, compressionLevel);
ZSTDMT_releaseCCtx(mtctx->cctxPool, cctx);
return result;
ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
return ZSTD_compressCCtx(cctx, dst, dstCapacity, src, srcSize, compressionLevel);
}
{ unsigned u;
@ -461,6 +468,7 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
ZSTD_parameters params, unsigned long long pledgedSrcSize)
{
ZSTD_customMem const cmem = { NULL, NULL, NULL };
if (zcs->nbThreads==1) return ZSTD_initCStream_advanced(zcs->cstream, dict, dictSize, params, pledgedSrcSize);
if (zcs->allJobsCompleted == 0) { /* previous job not correctly finished */
ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs);
@ -498,6 +506,7 @@ size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs,
* pledgedSrcSize is optional and can be zero == unknown */
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
{
if (zcs->nbThreads==1) return ZSTD_resetCStream(zcs->cstream, pledgedSrcSize);
return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0, zcs->params, pledgedSrcSize);
}
@ -510,6 +519,7 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */
if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input);
/* fill input buffer */
{ size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
@ -708,10 +718,12 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{
if (zcs->nbThreads==1) return ZSTD_flushStream(zcs->cstream, output);
return ZSTDMT_flushStream_internal(zcs, output, 0);
}
size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{
if (zcs->nbThreads==1) return ZSTD_endStream(zcs->cstream, output);
return ZSTDMT_flushStream_internal(zcs, output, 1);
}