zstdmt: removed job->jobCompleted

replaced by equivalent signal job->consumed == job->srcSize.

created additional functions
ZSTD_writeLastEmptyBlock()
and
ZSTDMT_writeLastEmptyBlock()
required when it's necessary to finish a frame with a last empty job, to create an "end of frame" marker.

It avoids creating a job with srcSize==0.
This commit is contained in:
Yann Collet 2018-01-25 17:35:49 -08:00
parent 1272d8e760
commit a1d4041e69
3 changed files with 231 additions and 189 deletions

View File

@ -1824,7 +1824,7 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize)
{
DEBUGLOG(5, "ZSTD_compressBlock_internal (dstCapacity=%u) (dictLimit=%u, nextToUpdate=%u)",
DEBUGLOG(5, "ZSTD_compressBlock_internal (dstCapacity=%u, dictLimit=%u, nextToUpdate=%u)",
(U32)dstCapacity, zc->blockState.matchState.dictLimit, zc->blockState.matchState.nextToUpdate);
if (srcSize < MIN_CBLOCK_SIZE+ZSTD_blockHeaderSize+1)
return 0; /* don't even attempt compression below a certain srcSize */
@ -1837,9 +1837,9 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
if (current > zc->blockState.matchState.nextToUpdate + 384)
zc->blockState.matchState.nextToUpdate = current - MIN(192, (U32)(current - zc->blockState.matchState.nextToUpdate - 384));
}
/* find and store sequences */
{
U32 const extDict = zc->blockState.matchState.lowLimit < zc->blockState.matchState.dictLimit;
/* select and store sequences */
{ U32 const extDict = zc->blockState.matchState.lowLimit < zc->blockState.matchState.dictLimit;
size_t lastLLSize;
{ int i; for (i = 0; i < ZSTD_REP_NUM; ++i) zc->blockState.nextCBlock->rep[i] = zc->blockState.prevCBlock->rep[i]; }
if (zc->appliedParams.ldmParams.enableLdm) {
@ -1848,26 +1848,20 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
U32 rep[ZSTD_REP_NUM], ZSTD_CCtx_params const* params,
void const* src, size_t srcSize);
ZSTD_ldmBlockCompressor const ldmBlockCompressor = extDict ? ZSTD_compressBlock_ldm_extDict : ZSTD_compressBlock_ldm;
lastLLSize = ldmBlockCompressor(&zc->ldmState, &zc->blockState.matchState, &zc->seqStore, zc->blockState.nextCBlock->rep, &zc->appliedParams, src, srcSize);
} else {
} else { /* not long range mode */
ZSTD_blockCompressor const blockCompressor = ZSTD_selectBlockCompressor(zc->appliedParams.cParams.strategy, extDict);
lastLLSize = blockCompressor(&zc->blockState.matchState, &zc->seqStore, zc->blockState.nextCBlock->rep, &zc->appliedParams.cParams, src, srcSize);
}
{
const BYTE* const anchor = (const BYTE*)src + srcSize - lastLLSize;
ZSTD_storeLastLiterals(&zc->seqStore, anchor, lastLLSize);
}
}
/* encode */
{
size_t const cSize = ZSTD_compressSequences(&zc->seqStore, &zc->blockState.prevCBlock->entropy, &zc->blockState.nextCBlock->entropy, &zc->appliedParams.cParams, dst, dstCapacity, srcSize, zc->entropyWorkspace);
if (ZSTD_isError(cSize) || cSize == 0)
return cSize;
{ const BYTE* const lastLiterals = (const BYTE*)src + srcSize - lastLLSize;
ZSTD_storeLastLiterals(&zc->seqStore, lastLiterals, lastLLSize);
} }
/* encode sequences and literals */
{ size_t const cSize = ZSTD_compressSequences(&zc->seqStore, &zc->blockState.prevCBlock->entropy, &zc->blockState.nextCBlock->entropy, &zc->appliedParams.cParams, dst, dstCapacity, srcSize, zc->entropyWorkspace);
if (ZSTD_isError(cSize) || cSize == 0) return cSize;
/* confirm repcodes and entropy tables */
{
ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
{ ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
zc->blockState.prevCBlock = zc->blockState.nextCBlock;
zc->blockState.nextCBlock = tmp;
}
@ -2030,6 +2024,19 @@ static size_t ZSTD_writeFrameHeader(void* dst, size_t dstCapacity,
return pos;
}
/* ZSTD_writeLastEmptyBlock() :
 * output an empty Block with end-of-frame mark to complete a frame
 * @return : size of data written into `dst` (== ZSTD_blockHeaderSize (defined in zstd_internal.h))
 *           or an error code if `dstCapacity` is too small (<ZSTD_blockHeaderSize)
 */
size_t ZSTD_writeLastEmptyBlock(void* dst, size_t dstCapacity)
{
    if (dstCapacity < ZSTD_blockHeaderSize) return ERROR(dstSize_tooSmall);
    {   /* open a scope so the declaration stays C90-compliant (no declaration after statement) */
        U32 const cBlockHeader24 = 1 /*lastBlock*/ + (((U32)bt_raw)<<1);   /* 0 size */
        MEM_writeLE24(dst, cBlockHeader24);
        return ZSTD_blockHeaderSize;
    }
}
static void ZSTD_manageWindowContinuity(ZSTD_matchState_t* ms, void const* src, size_t srcSize)
{

View File

@ -481,4 +481,13 @@ size_t ZSTD_compress_advanced_internal(ZSTD_CCtx* cctx,
const void* dict,size_t dictSize,
ZSTD_CCtx_params params);
/* ZSTD_writeLastEmptyBlock() :
* output an empty Block with end-of-frame mark to complete a frame
* @return : size of data written into `dst` (== ZSTD_blockHeaderSize (defined in zstd_internal.h))
* or an error code if `dstCapacity` is too small (<ZSTD_blockHeaderSize)
*/
size_t ZSTD_writeLastEmptyBlock(void* dst, size_t dstCapacity);
#endif /* ZSTD_COMPRESS_H */

View File

@ -314,7 +314,6 @@ typedef struct {
size_t dstFlushed;
unsigned firstChunk;
unsigned lastChunk;
unsigned jobCompleted;
unsigned frameChecksumNeeded;
ZSTD_pthread_mutex_t* mtctx_mutex;
ZSTD_pthread_cond_t* mtctx_cond;
@ -345,7 +344,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
goto _endJob;
}
ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->dstBuff = dstBuff;
job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */
ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
@ -369,8 +368,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
if (ZSTD_isError(initError)) {
job->cSize = initError;
goto _endJob;
} }
}
} } }
if (!job->firstChunk) { /* flush and overwrite frame header when it's not first job */
size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0);
if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; }
@ -379,12 +377,9 @@ void ZSTDMT_compressChunk(void* jobDescription)
}
/* compress */
#if 0
job->cSize = (job->lastChunk) ?
ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
#else
if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */
if (sizeof(size_t) > sizeof(int))
assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */
{ int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX);
const BYTE* ip = (const BYTE*) src;
BYTE* const ostart = (BYTE*)dstBuff.start;
@ -404,7 +399,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
(U32)cSize, (U32)job->cSize);
ZSTD_pthread_cond_signal(job->mtctx_cond);
ZSTD_pthread_cond_signal(job->mtctx_cond); /* warns some more data is ready to be flushed */
ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
/* last block */
@ -416,23 +411,19 @@ void ZSTDMT_compressChunk(void* jobDescription)
ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
/* stats */
ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex); /* note : it's a mtctx mutex */
ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->cSize += cSize;
job->consumed = job->srcSize;
ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
}
#endif
} }
_endJob:
/* release */
/* release resources */
ZSTDMT_releaseCCtx(job->cctxPool, cctx);
ZSTDMT_releaseBuffer(job->bufPool, job->src);
job->src = g_nullBuffer; job->srcStart = NULL;
/* report */
ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->consumed = job->srcSize;
job->jobCompleted = 1;
ZSTD_pthread_cond_signal(job->mtctx_cond);
ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
@ -577,18 +568,18 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
mtctx->allJobsCompleted = 1;
}
static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
{
DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
while (zcs->doneJobID < zcs->nextJobID) {
unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex);
while (zcs->jobs[jobID].jobCompleted==0) {
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */
ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex);
while (mtctx->doneJobID < mtctx->nextJobID) {
unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) {
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */
ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
}
ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex);
zcs->doneJobID++;
ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
mtctx->doneJobID++;
}
}
@ -769,7 +760,7 @@ static size_t ZSTDMT_compress_advanced_internal(
mtctx->jobs[u].src = g_nullBuffer;
mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].prefixSize = dictSize;
mtctx->jobs[u].srcSize = chunkSize;
mtctx->jobs[u].srcSize = chunkSize; assert(chunkSize > 0); /* avoid job.srcSize == 0 */
mtctx->jobs[u].consumed = 0;
mtctx->jobs[u].cSize = 0;
mtctx->jobs[u].cdict = (u==0) ? cdict : NULL;
@ -782,7 +773,6 @@ static size_t ZSTDMT_compress_advanced_internal(
mtctx->jobs[u].bufPool = mtctx->bufPool;
mtctx->jobs[u].firstChunk = (u==0);
mtctx->jobs[u].lastChunk = (u==nbChunks-1);
mtctx->jobs[u].jobCompleted = 0;
mtctx->jobs[u].mtctx_mutex = &mtctx->mtctx_mutex;
mtctx->jobs[u].mtctx_cond = &mtctx->mtctx_cond;
@ -805,7 +795,7 @@ static size_t ZSTDMT_compress_advanced_internal(
for (chunkID=0; chunkID<nbChunks; chunkID++) {
DEBUGLOG(5, "waiting for chunk %u ", chunkID);
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
while (mtctx->jobs[chunkID].jobCompleted==0) {
while (mtctx->jobs[chunkID].consumed < mtctx->jobs[chunkID].srcSize) {
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
}
@ -879,7 +869,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
/* ====================================== */
size_t ZSTDMT_initCStream_internal(
ZSTDMT_CCtx* zcs,
ZSTDMT_CCtx* mtctx,
const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode,
const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
unsigned long long pledgedSrcSize)
@ -888,8 +878,8 @@ size_t ZSTDMT_initCStream_internal(
/* params are supposed to be fully validated at this point */
assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
assert(!((dict) && (cdict))); /* either dict or cdict, not both */
assert(zcs->cctxPool->totalCCtx == params.nbThreads);
zcs->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */
assert(mtctx->cctxPool->totalCCtx == params.nbThreads);
mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */
if (params.jobSize == 0) {
if (params.cParams.windowLog >= 29)
params.jobSize = ZSTDMT_JOBSIZE_MAX;
@ -898,56 +888,56 @@ size_t ZSTDMT_initCStream_internal(
}
if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
if (zcs->singleBlockingThread) {
if (mtctx->singleBlockingThread) {
ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params);
DEBUGLOG(4, "ZSTDMT_initCStream_internal: switch to single blocking thread mode");
assert(singleThreadParams.nbThreads == 0);
return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0],
return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0],
dict, dictSize, cdict,
singleThreadParams, pledgedSrcSize);
}
DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u threads", params.nbThreads);
if (zcs->allJobsCompleted == 0) { /* previous compression not correctly finished */
ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs);
zcs->allJobsCompleted = 1;
if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */
ZSTDMT_waitForAllJobsCompleted(mtctx);
ZSTDMT_releaseAllJobResources(mtctx);
mtctx->allJobsCompleted = 1;
}
zcs->params = params;
zcs->frameContentSize = pledgedSrcSize;
mtctx->params = params;
mtctx->frameContentSize = pledgedSrcSize;
if (dict) {
ZSTD_freeCDict(zcs->cdictLocal);
zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize,
ZSTD_freeCDict(mtctx->cdictLocal);
mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize,
ZSTD_dlm_byCopy, dictMode, /* note : a loadPrefix becomes an internal CDict */
params.cParams, zcs->cMem);
zcs->cdict = zcs->cdictLocal;
if (zcs->cdictLocal == NULL) return ERROR(memory_allocation);
params.cParams, mtctx->cMem);
mtctx->cdict = mtctx->cdictLocal;
if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation);
} else {
ZSTD_freeCDict(zcs->cdictLocal);
zcs->cdictLocal = NULL;
zcs->cdict = cdict;
ZSTD_freeCDict(mtctx->cdictLocal);
mtctx->cdictLocal = NULL;
mtctx->cdict = cdict;
}
assert(params.overlapSizeLog <= 9);
zcs->targetPrefixSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog));
DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(zcs->targetPrefixSize>>10));
zcs->targetSectionSize = params.jobSize;
if (zcs->targetSectionSize < ZSTDMT_JOBSIZE_MIN) zcs->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
if (zcs->targetSectionSize < zcs->targetPrefixSize) zcs->targetSectionSize = zcs->targetPrefixSize; /* job size must be >= overlap size */
DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(zcs->targetSectionSize>>10), params.jobSize);
zcs->inBuffSize = zcs->targetPrefixSize + zcs->targetSectionSize;
DEBUGLOG(4, "inBuff Size : %u KB", (U32)(zcs->inBuffSize>>10));
ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) );
zcs->inBuff.buffer = g_nullBuffer;
zcs->prefixSize = 0;
zcs->doneJobID = 0;
zcs->nextJobID = 0;
zcs->frameEnded = 0;
zcs->allJobsCompleted = 0;
zcs->consumed = 0;
zcs->produced = 0;
if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0);
mtctx->targetPrefixSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog));
DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10));
mtctx->targetSectionSize = params.jobSize;
if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */
DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize);
mtctx->inBuffSize = mtctx->targetPrefixSize + mtctx->targetSectionSize;
DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->inBuffSize>>10));
ZSTDMT_setBufferSize(mtctx->bufPool, MAX(mtctx->inBuffSize, ZSTD_compressBound(mtctx->targetSectionSize)) );
mtctx->inBuff.buffer = g_nullBuffer;
mtctx->prefixSize = 0;
mtctx->doneJobID = 0;
mtctx->nextJobID = 0;
mtctx->frameEnded = 0;
mtctx->allJobsCompleted = 0;
mtctx->consumed = 0;
mtctx->produced = 0;
if (params.fParams.checksumFlag) XXH64_reset(&mtctx->xxhState, 0);
return 0;
}
@ -982,103 +972,134 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
* pledgedSrcSize can be zero == unknown (for the time being)
* prefer using ZSTD_CONTENTSIZE_UNKNOWN,
* as `0` might mean "empty" in the future */
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize)
{
if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
if (zcs->params.nbThreads==1)
return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize);
return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, 0, zcs->params,
if (mtctx->params.nbThreads==1)
return ZSTD_resetCStream(mtctx->cctxPool->cctx[0], pledgedSrcSize);
return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, 0, mtctx->params,
pledgedSrcSize);
}
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) {
ZSTD_parameters const params = ZSTD_getParams(compressionLevel, ZSTD_CONTENTSIZE_UNKNOWN, 0);
ZSTD_CCtx_params cctxParams = zcs->params; /* retrieve sticky params */
ZSTD_CCtx_params cctxParams = mtctx->params; /* retrieve sticky params */
DEBUGLOG(4, "ZSTDMT_initCStream (cLevel=%i)", compressionLevel);
cctxParams.cParams = params.cParams;
cctxParams.fParams = params.fParams;
return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN);
return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN);
}
static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, ZSTD_EndDirective endOp)
/* ZSTDMT_writeLastEmptyBlock()
 * Write a single empty block with an end-of-frame mark
 * to finish the current frame.
 * Runs synchronously in the calling thread (no worker involved).
 * @return : 0, or an error code (allocation of the destination buffer can fail)
 */
static size_t ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
{
    buffer_t lastBlockBuff;
    assert(job->srcSize == 0);           /* only meaningful for a job with no input */
    assert(job->lastChunk == 1);         /* must be the chunk that closes the frame */
    assert(job->firstChunk == 0);        /* first chunk needs to create frame header too */
    assert(job->dstBuff.start == NULL);  /* invoked from streaming variant only */
    lastBlockBuff = ZSTDMT_getBuffer(job->bufPool);
    if (lastBlockBuff.start == NULL) return ERROR(memory_allocation);
    job->dstBuff = lastBlockBuff;        /* will be released by ZSTDMT_flushProduced() */
    assert(lastBlockBuff.size >= ZSTD_blockHeaderSize);
    job->cSize = ZSTD_writeLastEmptyBlock(lastBlockBuff.start, lastBlockBuff.size);
    assert(!ZSTD_isError(job->cSize));
    assert(job->consumed == 0);
    return 0;
}
static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp)
{
unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask;
int const endFrame = (endOp == ZSTD_e_end);
if (zcs->nextJobID > zcs->doneJobID + zcs->jobIDMask) {
if (mtctx->nextJobID > mtctx->doneJobID + mtctx->jobIDMask) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: will not create new job : table is full");
assert((zcs->nextJobID & zcs->jobIDMask) == (zcs->doneJobID & zcs->jobIDMask));
assert((mtctx->nextJobID & mtctx->jobIDMask) == (mtctx->doneJobID & mtctx->jobIDMask));
return 0;
}
if (!zcs->jobReady) {
if (!mtctx->jobReady) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize);
zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = srcSize;
zcs->jobs[jobID].consumed = 0;
zcs->jobs[jobID].cSize = 0;
zcs->jobs[jobID].prefixSize = zcs->prefixSize;
assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize);
zcs->jobs[jobID].params = zcs->params;
mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize);
mtctx->jobs[jobID].src = mtctx->inBuff.buffer;
mtctx->jobs[jobID].srcStart = mtctx->inBuff.buffer.start;
mtctx->jobs[jobID].srcSize = srcSize;
mtctx->jobs[jobID].consumed = 0;
mtctx->jobs[jobID].cSize = 0;
mtctx->jobs[jobID].prefixSize = mtctx->prefixSize;
assert(mtctx->inBuff.filled >= srcSize + mtctx->prefixSize);
mtctx->jobs[jobID].params = mtctx->params;
/* do not calculate checksum within sections, but write it in header for first section */
if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
zcs->jobs[jobID].dstBuff = g_nullBuffer;
zcs->jobs[jobID].cctxPool = zcs->cctxPool;
zcs->jobs[jobID].bufPool = zcs->bufPool;
zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
zcs->jobs[jobID].lastChunk = endFrame;
zcs->jobs[jobID].jobCompleted = 0;
zcs->jobs[jobID].frameChecksumNeeded = endFrame && (zcs->nextJobID>0) && zcs->params.fParams.checksumFlag;
zcs->jobs[jobID].dstFlushed = 0;
zcs->jobs[jobID].mtctx_mutex = &zcs->mtctx_mutex;
zcs->jobs[jobID].mtctx_cond = &zcs->mtctx_cond;
if (mtctx->nextJobID) mtctx->jobs[jobID].params.fParams.checksumFlag = 0;
mtctx->jobs[jobID].cdict = mtctx->nextJobID==0 ? mtctx->cdict : NULL;
mtctx->jobs[jobID].fullFrameSize = mtctx->frameContentSize;
mtctx->jobs[jobID].dstBuff = g_nullBuffer;
mtctx->jobs[jobID].cctxPool = mtctx->cctxPool;
mtctx->jobs[jobID].bufPool = mtctx->bufPool;
mtctx->jobs[jobID].firstChunk = (mtctx->nextJobID==0);
mtctx->jobs[jobID].lastChunk = endFrame;
mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
mtctx->jobs[jobID].dstFlushed = 0;
mtctx->jobs[jobID].mtctx_mutex = &mtctx->mtctx_mutex;
mtctx->jobs[jobID].mtctx_cond = &mtctx->mtctx_cond;
if (zcs->params.fParams.checksumFlag)
XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize);
if (mtctx->params.fParams.checksumFlag)
XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->prefixSize, srcSize);
/* get a new buffer for next input */
if (!endFrame) {
size_t const newPrefixSize = MIN(srcSize + zcs->prefixSize, zcs->targetPrefixSize);
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++;
ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs);
size_t const newPrefixSize = MIN(srcSize + mtctx->prefixSize, mtctx->targetPrefixSize);
mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);
if (mtctx->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
mtctx->jobs[jobID].srcSize = mtctx->jobs[jobID].consumed = 0;
mtctx->nextJobID++;
ZSTDMT_waitForAllJobsCompleted(mtctx);
ZSTDMT_releaseAllJobResources(mtctx);
return ERROR(memory_allocation);
}
zcs->inBuff.filled -= srcSize + zcs->prefixSize - newPrefixSize;
memmove(zcs->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */
(const char*)zcs->jobs[jobID].srcStart + zcs->prefixSize + srcSize - newPrefixSize,
zcs->inBuff.filled);
zcs->prefixSize = newPrefixSize;
mtctx->inBuff.filled -= srcSize + mtctx->prefixSize - newPrefixSize;
memmove(mtctx->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */
(const char*)mtctx->jobs[jobID].srcStart + mtctx->prefixSize + srcSize - newPrefixSize,
mtctx->inBuff.filled);
mtctx->prefixSize = newPrefixSize;
} else { /* endFrame==1 => no need for another input buffer */
zcs->inBuff.buffer = g_nullBuffer;
zcs->inBuff.filled = 0;
zcs->prefixSize = 0;
zcs->frameEnded = endFrame;
if (zcs->nextJobID == 0) {
/* single chunk exception : checksum is calculated directly within worker thread */
zcs->params.fParams.checksumFlag = 0;
} } }
mtctx->inBuff.buffer = g_nullBuffer;
mtctx->inBuff.filled = 0;
mtctx->prefixSize = 0;
mtctx->frameEnded = endFrame;
if (mtctx->nextJobID == 0) {
/* single chunk exception : checksum is already calculated directly within worker thread */
mtctx->params.fParams.checksumFlag = 0;
} }
DEBUGLOG(2, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
zcs->nextJobID,
(U32)zcs->jobs[jobID].srcSize,
zcs->jobs[jobID].lastChunk,
zcs->doneJobID,
zcs->doneJobID & zcs->jobIDMask);
if (POOL_tryAdd(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID])) {
zcs->nextJobID++;
zcs->jobReady = 0;
if ( (srcSize == 0)
&& (mtctx->nextJobID>0)/*single chunk must also write frame header*/ ) {
assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */
CHECK_F( ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID) );
mtctx->nextJobID++;
return 0;
}
}
DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
mtctx->nextJobID,
(U32)mtctx->jobs[jobID].srcSize,
mtctx->jobs[jobID].lastChunk,
mtctx->doneJobID,
mtctx->doneJobID & mtctx->jobIDMask);
if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[jobID])) {
mtctx->nextJobID++;
mtctx->jobReady = 0;
} else {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", zcs->nextJobID);
zcs->jobReady = 1;
DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", mtctx->nextJobID);
mtctx->jobReady = 1;
}
return 0;
}
@ -1088,73 +1109,78 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, ZSTD
* `output` : `pos` will be updated with amount of data flushed .
* `blockToFlush` : if >0, the function will block and wait if there is no data available to flush .
* @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
{
unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask;
DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush);
assert(output->size >= output->pos);
ZSTD_PTHREAD_MUTEX_LOCK(&zcs->mtctx_mutex);
if (blockToFlush && (zcs->doneJobID < zcs->nextJobID)) {
while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
if (zcs->jobs[wJobID].jobCompleted==1) break;
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
if (blockToFlush && (mtctx->doneJobID < mtctx->nextJobID)) {
assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize);
while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) {
if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].srcSize) {
DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond",
mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].srcSize);
break;
}
DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
ZSTD_pthread_cond_wait(&zcs->mtctx_cond, &zcs->mtctx_mutex); /* block when nothing available to flush but more to come */
mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex); /* block when nothing available to flush but more to come */
} }
/* some output is available to be flushed */
{ ZSTDMT_jobDescription job = zcs->jobs[wJobID];
ZSTD_pthread_mutex_unlock(&zcs->mtctx_mutex);
{ ZSTDMT_jobDescription job = mtctx->jobs[wJobID];
ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
if (ZSTD_isError(job.cSize)) {
DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
zcs->doneJobID, ZSTD_getErrorName(job.cSize));
ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs);
mtctx->doneJobID, ZSTD_getErrorName(job.cSize));
ZSTDMT_waitForAllJobsCompleted(mtctx);
ZSTDMT_releaseAllJobResources(mtctx);
return job.cSize;
}
/* add frame checksum if necessary (can only happen once) */
if ( job.jobCompleted
assert(job.consumed <= job.srcSize);
if ( (job.consumed == job.srcSize)
&& job.frameChecksumNeeded ) {
U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
U32 const checksum = (U32)XXH64_digest(&mtctx->xxhState);
DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);
MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
job.cSize += 4;
zcs->jobs[wJobID].cSize += 4;
zcs->jobs[wJobID].frameChecksumNeeded = 0;
mtctx->jobs[wJobID].cSize += 4;
mtctx->jobs[wJobID].frameChecksumNeeded = 0;
}
assert(job.cSize >= job.dstFlushed);
if (job.dstBuff.start != NULL) { /* one buffer present : some job is ongoing */
if (job.dstBuff.start != NULL) { /* dst buffer present : some work is ongoing or completed */
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%.1f%%)",
(U32)toWrite, zcs->doneJobID, (double)job.consumed / job.srcSize * 100);
(U32)toWrite, mtctx->doneJobID, (double)job.consumed / (job.srcSize + !job.srcSize /*avoid div0*/) * 100);
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
if ( job.jobCompleted
if ( (job.consumed == job.srcSize)
&& (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */
DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
zcs->doneJobID, (U32)job.dstFlushed);
ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
zcs->jobs[wJobID].dstBuff = g_nullBuffer;
zcs->jobs[wJobID].jobCompleted = 0;
zcs->consumed += job.srcSize;
zcs->produced += job.cSize;
zcs->doneJobID++;
mtctx->doneJobID, (U32)job.dstFlushed);
ZSTDMT_releaseBuffer(mtctx->bufPool, job.dstBuff);
mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
mtctx->consumed += job.srcSize;
mtctx->produced += job.cSize;
mtctx->doneJobID++;
} else {
zcs->jobs[wJobID].dstFlushed = job.dstFlushed; /* remember how much was flushed for next attempt */
mtctx->jobs[wJobID].dstFlushed = job.dstFlushed; /* remember how much was flushed for next attempt */
} }
/* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
if (job.srcSize > job.consumed) return 1; /* current job not completely compressed */
}
if (zcs->doneJobID < zcs->nextJobID) return 1; /* some more jobs to flush */
if (zcs->jobReady) return 1; /* one job is ready and queued! */
if (zcs->inBuff.filled > 0) return 1; /* input not empty */
zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */
if (end == ZSTD_e_end) return !zcs->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */
if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs to flush */
if (mtctx->jobReady) return 1; /* one job is ready and queued! */
if (mtctx->inBuff.filled > 0) return 1; /* input not empty */
mtctx->allJobsCompleted = mtctx->frameEnded; /* last frame entirely flushed */
if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */
return 0; /* everything flushed */
}
@ -1241,12 +1267,12 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
}
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
CHECK_F( ZSTDMT_compressStream_generic(zcs, output, input, ZSTD_e_continue) );
CHECK_F( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) );
/* recommended next input size : fill current input buffer */
return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */
return mtctx->inBuffSize - mtctx->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */
}