diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 9e7754b8..0b91ad4e 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -29,6 +29,8 @@ #include "threading.h" /* mutex */ #include "zstd_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */ #include "zstdmt_compress.h" +#define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */ +#include "xxhash.h" /* ====== Debug ====== */ @@ -217,6 +219,7 @@ typedef struct { unsigned firstChunk; unsigned lastChunk; unsigned jobCompleted; + unsigned jobScanned; pthread_mutex_t* jobCompleted_mutex; pthread_cond_t* jobCompleted_cond; ZSTD_parameters params; @@ -254,6 +257,7 @@ void ZSTDMT_compressChunk(void* jobDescription) _endJob: PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); job->jobCompleted = 1; + job->jobScanned = 0; pthread_cond_signal(job->jobCompleted_cond); pthread_mutex_unlock(job->jobCompleted_mutex); } @@ -273,6 +277,7 @@ struct ZSTDMT_CCtx_s { size_t inBuffSize; inBuff_t inBuff; ZSTD_parameters params; + XXH64_state_t xxhState; unsigned nbThreads; unsigned jobIDMask; unsigned doneJobID; @@ -474,7 +479,6 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, ZSTDMT_releaseAllJobResources(zcs); zcs->allJobsCompleted = 1; } - params.fParams.checksumFlag = 0; /* current limitation : no checksum (to be lifted in a later version) */ zcs->params = params; if (updateDict) { ZSTD_freeCDict(zcs->cdict); zcs->cdict = NULL; @@ -492,6 +496,7 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, zcs->nextJobID = 0; zcs->frameEnded = 0; zcs->allJobsCompleted = 0; + if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0); return 0; } @@ -518,7 +523,7 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { /* ZSTDMT_flushNextJob() : * output : will be updated with amount of data flushed . - * blockToFlush : the function will block and wait if there is no data available to flush . + * blockToFlush : if >0, the function will block and wait if there is no data available to flush . * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more */ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush) { @@ -526,28 +531,43 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */ PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); while (zcs->jobs[wJobID].jobCompleted==0) { - DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); /* block when nothing available to flush */ - if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */ - pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); + DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); + if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */ + pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush */ } pthread_mutex_unlock(&zcs->jobCompleted_mutex); /* compression job completed : output can be flushed */ { ZSTDMT_jobDescription job = zcs->jobs[wJobID]; - size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); - DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); - ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); - zcs->jobs[wJobID].cctx = NULL; - ZSTDMT_releaseBuffer(zcs->buffPool, job.src); - zcs->jobs[wJobID].srcStart = NULL; - zcs->jobs[wJobID].src = g_nullBuffer; - if (ZSTD_isError(job.cSize)) { - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); - return job.cSize; + if (!job.jobScanned) { + if (ZSTD_isError(job.cSize)) { + DEBUGLOG(5, "compression error detected "); + ZSTDMT_waitForAllJobsCompleted(zcs); + ZSTDMT_releaseAllJobResources(zcs); + return job.cSize; + } + ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); + zcs->jobs[wJobID].cctx = NULL; + DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag); + if (zcs->params.fParams.checksumFlag) { + XXH64_update(&zcs->xxhState, job.srcStart, job.srcSize); + if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */ + U32 const checksum = (U32)XXH64_digest(&zcs->xxhState); + DEBUGLOG(4, "writing checksum : %08X \n", checksum); + MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum); + job.cSize += 4; + zcs->jobs[wJobID].cSize += 4; + } } + ZSTDMT_releaseBuffer(zcs->buffPool, job.src); + zcs->jobs[wJobID].srcStart = NULL; + zcs->jobs[wJobID].src = g_nullBuffer; + zcs->jobs[wJobID].jobScanned = 1; + } + { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); + DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); + memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); + output->pos += toWrite; + job.dstFlushed += toWrite; } - memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); - output->pos += toWrite; - job.dstFlushed += toWrite; if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => move to next one */ ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = g_nullBuffer; @@ -583,7 +603,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; - if ((cctx==NULL) || (dstBuffer.start==NULL)) { + if ((cctx==NULL) || (dstBuffer.start==NULL)) { /* cannot get resources for next job */ zcs->jobs[jobID].jobCompleted = 1; zcs->nextJobID++; ZSTDMT_waitForAllJobsCompleted(zcs); @@ -591,11 +611,12 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu return ERROR(memory_allocation); } - DEBUGLOG(1, "preparing job %u to compress %u bytes \n", (U32)zcs->nextJobID, (U32)zcs->targetSectionSize); + DEBUGLOG(4, "preparing job %u to compress %u bytes \n", (U32)zcs->nextJobID, (U32)zcs->targetSectionSize); zcs->jobs[jobID].src = zcs->inBuff.buffer; zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcSize = zcs->targetSectionSize; zcs->jobs[jobID].params = zcs->params; + if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */ zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; zcs->jobs[jobID].dict = NULL; zcs->jobs[jobID].dictSize = 0; @@ -626,44 +647,11 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu zcs->nextJobID++; } - /* check if there is any data available to flush */ + /* check for data to flush */ ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)); /* we'll block if it wasn't possible to create new job due to saturation */ -#if 0 - { unsigned const jobID = zcs->doneJobID & zcs->jobIDMask; - unsigned jobCompleted; - pthread_mutex_lock(&zcs->jobCompleted_mutex); - while (zcs->jobs[jobID].jobCompleted == 0 && zcs->inBuff.filled == zcs->inBuffSize) { - /* when no new job could be started, block until there is something to flush, ensuring forward progress */ - pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); - } - jobCompleted = zcs->jobs[jobID].jobCompleted; - pthread_mutex_unlock(&zcs->jobCompleted_mutex); - if (jobCompleted) { - ZSTDMT_jobDescription const job = zcs->jobs[jobID]; - size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); - DEBUGLOG(1, "flush %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); - ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); - zcs->jobs[jobID].cctx = NULL; - ZSTDMT_releaseBuffer(zcs->buffPool, job.src); - zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = g_nullBuffer; - if (ZSTD_isError(job.cSize)) { - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); - return job.cSize; - } - memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); - output->pos += toWrite; - zcs->jobs[jobID].dstFlushed += toWrite; - DEBUGLOG(1, "remaining : %u bytes ", (U32)(job.cSize - job.dstFlushed)); - if (zcs->jobs[jobID].dstFlushed == job.cSize) { /* output buffer fully flushed => go to next one */ - ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); - zcs->jobs[jobID].dstBuff = g_nullBuffer; - zcs->jobs[jobID].jobCompleted = 0; - zcs->doneJobID++; - } } } -#endif + /* recommended next input size : fill current input buffer */ - return zcs->inBuffSize - zcs->inBuff.filled; + return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ } @@ -671,7 +659,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp { size_t const srcSize = zcs->inBuff.filled; - DEBUGLOG(1, "flushing : %u bytes to compress", (U32)srcSize); + DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize); if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded)) && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { size_t const dstBufferCapacity = ZSTD_compressBound(srcSize); @@ -691,6 +679,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcSize = srcSize; zcs->jobs[jobID].params = zcs->params; + if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */ zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; zcs->jobs[jobID].dict = NULL; zcs->jobs[jobID].dictSize = 0; @@ -719,6 +708,8 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp zcs->inBuff.buffer = g_nullBuffer; zcs->inBuff.filled = 0; zcs->frameEnded = 1; + if (zcs->nextJobID == 0) + zcs->params.fParams.checksumFlag = 0; /* single chunk : checksum is calculated directly within worker thread */ } DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask); @@ -729,43 +720,6 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp /* check if there is any data available to flush */ DEBUGLOG(5, "zcs->doneJobID : %u ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID); return ZSTDMT_flushNextJob(zcs, output, 1); - -#if 0 - { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; - PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); - while (zcs->jobs[wJobID].jobCompleted==0) { - DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); /* block when nothing available to flush */ - pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); - } - pthread_mutex_unlock(&zcs->jobCompleted_mutex); - /* compression job completed : output can be flushed */ - { ZSTDMT_jobDescription job = zcs->jobs[wJobID]; - size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); - DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); - ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */ - ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer; - if (ZSTD_isError(job.cSize)) { - ZSTDMT_waitForAllJobsCompleted(zcs); - ZSTDMT_releaseAllJobResources(zcs); - return job.cSize; - } - memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); - output->pos += toWrite; - job.dstFlushed += toWrite; - if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */ - ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = g_nullBuffer; - zcs->jobs[wJobID].jobCompleted = 0; - zcs->doneJobID++; - } else { - zcs->jobs[wJobID].dstFlushed = job.dstFlushed; - } - /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */ - if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); - if ((zcs->doneJobID < zcs->nextJobID) || (zcs->inBuff.filled)) return 1; /* still some buffer to flush */ - zcs->allJobsCompleted = zcs->frameEnded; - return 0; - } } -#endif } diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index bdc4caab..84d25f73 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -28,9 +28,9 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx, /* === Streaming functions === */ ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel); -ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */ -ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, /**< dict can be released after init */ - ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< params current limitation : no checksum ; pledgedSrcSize is optional and can be zero == unknown */ +ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */ +ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, /**< dict can be released after init, a local copy is preserved within zcs */ + ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */ ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input);