diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 9fea4969..7b37c5b3 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -142,6 +142,10 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool) return poolSize + totalBufferSize; } +/* ZSTDMT_setBufferSize() : + * all future buffers provided by this buffer pool will have _at least_ this size + * note : it's better for all buffers to have same size, + * as they become freely interchangeable, reducing malloc/free usages and memory fragmentation */ static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const bSize) { ZSTD_pthread_mutex_lock(&bufPool->poolMutex); @@ -992,12 +996,11 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) { /* ZSTDMT_writeLastEmptyBlock() - * Write a single empty block with an end-of-frame - * to finish a frame. - * Completed synchronously. - * @return : 0, or an error code (can fail due to memory allocation) + * Write a single empty block with an end-of-frame to finish a frame. + * Job must be created from streaming variant. + * This function is always successfull if expected conditions are fulfilled. */ -static size_t ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job) +static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job) { assert(job->lastChunk == 1); assert(job->srcSize == 0); /* last chunk is empty -> will be simplified into a last empty block */ @@ -1006,15 +1009,14 @@ static size_t ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job) * It summons a dstBuffer itself, compresses into it, then releases srcBuffer, and gives result to mtctx. * When done, srcBuffer is empty, while dstBuffer is filled, and will be released by mtctx. * This shortcut will simply switch srcBuffer for dstBuffer, providing same outcome as a normal job */ - assert(job->dstBuff.start == NULL); /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */ - assert(job->srcBuff.start != NULL); /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */ - assert(job->srcBuff.size >= ZSTD_blockHeaderSize); + assert(job->dstBuff.start == NULL); /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */ + assert(job->srcBuff.start != NULL); /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */ + assert(job->srcBuff.size >= ZSTD_blockHeaderSize); /* no buffer should ever be that small */ job->dstBuff = job->srcBuff; job->srcBuff = g_nullBuffer; job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.size); assert(!ZSTD_isError(job->cSize)); assert(job->consumed == 0); - return 0; } static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp) @@ -1031,6 +1033,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS if (!mtctx->jobReady) { DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize); + assert(mtctx->jobs[jobID].srcBuff.start == NULL); /* no buffer left : supposed already released */ mtctx->jobs[jobID].srcBuff = mtctx->inBuff.buffer; mtctx->jobs[jobID].prefixStart = mtctx->inBuff.buffer.start; mtctx->jobs[jobID].prefixSize = mtctx->prefixSize; @@ -1085,7 +1088,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS if ( (srcSize == 0) && (mtctx->nextJobID>0)/*single chunk must also write frame header*/ ) { assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */ - CHECK_F( ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID) ); + ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID); mtctx->nextJobID++; return 0; } @@ -1162,10 +1165,11 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u output->pos += toWrite; job.dstFlushed += toWrite; - if ( (job.consumed == job.srcSize) - && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */ + if ( (job.consumed == job.srcSize) /* job completed */ + && (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => free this job position */ DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", mtctx->doneJobID, (U32)job.dstFlushed); + assert(job.srcBuff.start == NULL); /* srcBuff supposed already released */ ZSTDMT_releaseBuffer(mtctx->bufPool, job.dstBuff); mtctx->jobs[wJobID].dstBuff = g_nullBuffer; mtctx->consumed += job.srcSize;