removed mtctx->cstream

use the first cctx in pool when ZSTDMT is used in single-thread mode
now that cctx and cstream are the same object.
dev
Yann Collet 2017-05-30 16:37:19 -07:00
parent beb62b15a8
commit 5bcef1ada2
1 changed files with 44 additions and 22 deletions

View File

@ -27,7 +27,13 @@
/* ====== Debug ====== */
#if defined(ZSTDMT_DEBUG)
#if defined(ZSTDMT_DEBUG) && (ZSTDMT_DEBUG>=1)
# include <assert.h>
#else
# define assert(condition) ((void)0)
#endif
#if defined(ZSTDMT_DEBUG) && (ZSTDMT_DEBUG>=2)
# include <stdio.h>
# include <unistd.h>
@ -309,7 +315,6 @@ struct ZSTDMT_CCtx_s {
size_t sectionSize;
ZSTD_customMem cMem;
ZSTD_CDict* cdict;
ZSTD_CStream* cstream;
};
ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
@ -343,11 +348,6 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
ZSTDMT_freeCCtx(mtctx);
return NULL;
}
if (nbThreads==1) {
mtctx->cstream = ZSTD_createCStream_advanced(cMem);
if (!mtctx->cstream) {
ZSTDMT_freeCCtx(mtctx); return NULL;
} }
pthread_mutex_init(&mtctx->jobCompleted_mutex, NULL); /* Todo : check init function return */
pthread_cond_init(&mtctx->jobCompleted_cond, NULL);
DEBUGLOG(4, "mt_cctx created, for %u threads", nbThreads);
@ -387,7 +387,6 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
ZSTD_free(mtctx->jobs, mtctx->cMem);
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
ZSTD_freeCDict(mtctx->cdict);
ZSTD_freeCStream(mtctx->cstream);
pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
pthread_cond_destroy(&mtctx->jobCompleted_cond);
ZSTD_free(mtctx, mtctx->cMem);
@ -540,8 +539,12 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
ZSTD_parameters params, unsigned long long pledgedSrcSize)
{
ZSTD_customMem const cmem = { NULL, NULL, NULL };
DEBUGLOG(3, "Started new compression, with windowLog : %u", params.cParams.windowLog);
if (zcs->nbThreads==1) return ZSTD_initCStream_advanced(zcs->cstream, dict, dictSize, params, pledgedSrcSize);
DEBUGLOG(3, "Started new compression, with windowLog : %u",
params.cParams.windowLog);
if (zcs->nbThreads==1)
return ZSTD_initCStream_advanced(zcs->cctxPool->cctx[0],
dict, dictSize,
params, pledgedSrcSize);
if (zcs->allJobsCompleted == 0) { /* previous job not correctly finished */
ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs);
@ -587,7 +590,8 @@ size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs,
* pledgedSrcSize is optional and can be zero == unknown */
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
{
if (zcs->nbThreads==1) return ZSTD_resetCStream(zcs->cstream, pledgedSrcSize);
if (zcs->nbThreads==1)
return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize);
return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0, zcs->params, pledgedSrcSize);
}
@ -612,13 +616,16 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
return ERROR(memory_allocation);
}
DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ", zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize);
DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ",
zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize);
zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = srcSize;
zcs->jobs[jobID].dictSize = zcs->dictSize; /* note : zcs->inBuff.filled is presumed >= srcSize + dictSize */
zcs->jobs[jobID].dictSize = zcs->dictSize;
assert(zcs->inBuff.filled >= srcSize + zcs->dictSize);
zcs->jobs[jobID].params = zcs->params;
if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */
/* do not calculate checksum within sections, but write it in header for first section */
if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
zcs->jobs[jobID].dstBuff = dstBuffer;
@ -643,8 +650,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
}
DEBUGLOG(5, "inBuff filled to %u", (U32)zcs->inBuff.filled);
zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize;
DEBUGLOG(5, "new job : filled to %u, with %u dict and %u src", (U32)zcs->inBuff.filled, (U32)newDictSize, (U32)(zcs->inBuff.filled - newDictSize));
memmove(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize, zcs->inBuff.filled);
DEBUGLOG(5, "new job : filled to %u, with %u dict and %u src",
(U32)zcs->inBuff.filled, (U32)newDictSize,
(U32)(zcs->inBuff.filled - newDictSize));
memmove(zcs->inBuff.buffer.start,
(const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize,
zcs->inBuff.filled);
DEBUGLOG(5, "new inBuff pre-filled");
zcs->dictSize = newDictSize;
} else {
@ -653,10 +664,16 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
zcs->dictSize = 0;
zcs->frameEnded = 1;
if (zcs->nextJobID == 0)
zcs->params.fParams.checksumFlag = 0; /* single chunk : checksum is calculated directly within worker thread */
/* single chunk exception : checksum is calculated directly within worker thread */
zcs->params.fParams.checksumFlag = 0;
}
DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
zcs->nextJobID,
(U32)zcs->jobs[jobID].srcSize,
zcs->jobs[jobID].lastChunk,
zcs->doneJobID,
zcs->doneJobID & zcs->jobIDMask);
POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */
zcs->nextJobID++;
return 0;
@ -729,8 +746,11 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize;
if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */
if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input);
if (zcs->frameEnded)
/* current frame being ended. Only flush is allowed. Restart with init */
return ERROR(stage_wrong);
if (zcs->nbThreads==1)
return ZSTD_compressStream(zcs->cctxPool->cctx[0], output, input);
/* fill input buffer */
{ size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
@ -770,12 +790,14 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{
if (zcs->nbThreads==1) return ZSTD_flushStream(zcs->cstream, output);
if (zcs->nbThreads==1)
return ZSTD_flushStream(zcs->cctxPool->cctx[0], output);
return ZSTDMT_flushStream_internal(zcs, output, 0);
}
size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{
if (zcs->nbThreads==1) return ZSTD_endStream(zcs->cstream, output);
if (zcs->nbThreads==1)
return ZSTD_endStream(zcs->cctxPool->cctx[0], output);
return ZSTDMT_flushStream_internal(zcs, output, 1);
}