ZSTDMT now supports frame checksum

dev
Yann Collet 2017-01-24 11:48:40 -08:00
parent 94364bf87a
commit 3488a4a473
2 changed files with 52 additions and 98 deletions

View File

@ -29,6 +29,8 @@
#include "threading.h" /* mutex */
#include "zstd_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
#include "zstdmt_compress.h"
#define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */
#include "xxhash.h"
/* ====== Debug ====== */
@ -217,6 +219,7 @@ typedef struct {
unsigned firstChunk;
unsigned lastChunk;
unsigned jobCompleted;
unsigned jobScanned;
pthread_mutex_t* jobCompleted_mutex;
pthread_cond_t* jobCompleted_cond;
ZSTD_parameters params;
@ -254,6 +257,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
_endJob:
PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
job->jobCompleted = 1;
job->jobScanned = 0;
pthread_cond_signal(job->jobCompleted_cond);
pthread_mutex_unlock(job->jobCompleted_mutex);
}
@ -273,6 +277,7 @@ struct ZSTDMT_CCtx_s {
size_t inBuffSize;
inBuff_t inBuff;
ZSTD_parameters params;
XXH64_state_t xxhState;
unsigned nbThreads;
unsigned jobIDMask;
unsigned doneJobID;
@ -474,7 +479,6 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
ZSTDMT_releaseAllJobResources(zcs);
zcs->allJobsCompleted = 1;
}
params.fParams.checksumFlag = 0; /* current limitation : no checksum (to be lifted in a later version) */
zcs->params = params;
if (updateDict) {
ZSTD_freeCDict(zcs->cdict); zcs->cdict = NULL;
@ -492,6 +496,7 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
zcs->nextJobID = 0;
zcs->frameEnded = 0;
zcs->allJobsCompleted = 0;
if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0);
return 0;
}
@ -518,7 +523,7 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
/* ZSTDMT_flushNextJob() :
* output : will be updated with amount of data flushed .
* blockToFlush : the function will block and wait if there is no data available to flush .
* blockToFlush : if >0, the function will block and wait if there is no data available to flush .
* @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more */
static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
{
@ -526,28 +531,43 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */
PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
while (zcs->jobs[wJobID].jobCompleted==0) {
DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); /* block when nothing available to flush */
DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);
if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush */
}
pthread_mutex_unlock(&zcs->jobCompleted_mutex);
/* compression job completed : output can be flushed */
{ ZSTDMT_jobDescription job = zcs->jobs[wJobID];
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
zcs->jobs[wJobID].cctx = NULL;
ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
zcs->jobs[wJobID].srcStart = NULL;
zcs->jobs[wJobID].src = g_nullBuffer;
if (!job.jobScanned) {
if (ZSTD_isError(job.cSize)) {
DEBUGLOG(5, "compression error detected ");
ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs);
return job.cSize;
}
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
zcs->jobs[wJobID].cctx = NULL;
DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag);
if (zcs->params.fParams.checksumFlag) {
XXH64_update(&zcs->xxhState, job.srcStart, job.srcSize);
if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */
U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
DEBUGLOG(4, "writing checksum : %08X \n", checksum);
MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
job.cSize += 4;
zcs->jobs[wJobID].cSize += 4;
} }
ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
zcs->jobs[wJobID].srcStart = NULL;
zcs->jobs[wJobID].src = g_nullBuffer;
zcs->jobs[wJobID].jobScanned = 1;
}
{ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
}
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => move to next one */
ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
zcs->jobs[wJobID].dstBuff = g_nullBuffer;
@ -583,7 +603,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
if ((cctx==NULL) || (dstBuffer.start==NULL)) {
if ((cctx==NULL) || (dstBuffer.start==NULL)) { /* cannot get resources for next job */
zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++;
ZSTDMT_waitForAllJobsCompleted(zcs);
@ -591,11 +611,12 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
return ERROR(memory_allocation);
}
DEBUGLOG(1, "preparing job %u to compress %u bytes \n", (U32)zcs->nextJobID, (U32)zcs->targetSectionSize);
DEBUGLOG(4, "preparing job %u to compress %u bytes \n", (U32)zcs->nextJobID, (U32)zcs->targetSectionSize);
zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
zcs->jobs[jobID].params = zcs->params;
if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */
zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
zcs->jobs[jobID].dict = NULL;
zcs->jobs[jobID].dictSize = 0;
@ -626,44 +647,11 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
zcs->nextJobID++;
}
/* check if there is any data available to flush */
/* check for data to flush */
ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)); /* we'll block if it wasn't possible to create new job due to saturation */
#if 0
{ unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
unsigned jobCompleted;
pthread_mutex_lock(&zcs->jobCompleted_mutex);
while (zcs->jobs[jobID].jobCompleted == 0 && zcs->inBuff.filled == zcs->inBuffSize) {
/* when no new job could be started, block until there is something to flush, ensuring forward progress */
pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
}
jobCompleted = zcs->jobs[jobID].jobCompleted;
pthread_mutex_unlock(&zcs->jobCompleted_mutex);
if (jobCompleted) {
ZSTDMT_jobDescription const job = zcs->jobs[jobID];
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
DEBUGLOG(1, "flush %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
zcs->jobs[jobID].cctx = NULL;
ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = g_nullBuffer;
if (ZSTD_isError(job.cSize)) {
ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs);
return job.cSize;
}
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
zcs->jobs[jobID].dstFlushed += toWrite;
DEBUGLOG(1, "remaining : %u bytes ", (U32)(job.cSize - job.dstFlushed));
if (zcs->jobs[jobID].dstFlushed == job.cSize) { /* output buffer fully flushed => go to next one */
ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
zcs->jobs[jobID].dstBuff = g_nullBuffer;
zcs->jobs[jobID].jobCompleted = 0;
zcs->doneJobID++;
} } }
#endif
/* recommended next input size : fill current input buffer */
return zcs->inBuffSize - zcs->inBuff.filled;
return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */
}
@ -671,7 +659,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
{
size_t const srcSize = zcs->inBuff.filled;
DEBUGLOG(1, "flushing : %u bytes to compress", (U32)srcSize);
DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize);
if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
&& (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
@ -691,6 +679,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = srcSize;
zcs->jobs[jobID].params = zcs->params;
if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */
zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
zcs->jobs[jobID].dict = NULL;
zcs->jobs[jobID].dictSize = 0;
@ -719,6 +708,8 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
zcs->inBuff.buffer = g_nullBuffer;
zcs->inBuff.filled = 0;
zcs->frameEnded = 1;
if (zcs->nextJobID == 0)
zcs->params.fParams.checksumFlag = 0; /* single chunk : checksum is calculated directly within worker thread */
}
DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
@ -729,43 +720,6 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
/* check if there is any data available to flush */
DEBUGLOG(5, "zcs->doneJobID : %u ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID);
return ZSTDMT_flushNextJob(zcs, output, 1);
#if 0
{ unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
while (zcs->jobs[wJobID].jobCompleted==0) {
DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); /* block when nothing available to flush */
pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
}
pthread_mutex_unlock(&zcs->jobCompleted_mutex);
/* compression job completed : output can be flushed */
{ ZSTDMT_jobDescription job = zcs->jobs[wJobID];
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */
ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = g_nullBuffer;
if (ZSTD_isError(job.cSize)) {
ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs);
return job.cSize;
}
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */
ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = g_nullBuffer;
zcs->jobs[wJobID].jobCompleted = 0;
zcs->doneJobID++;
} else {
zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
}
/* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
if ((zcs->doneJobID < zcs->nextJobID) || (zcs->inBuff.filled)) return 1; /* still some buffer to flush */
zcs->allJobsCompleted = zcs->frameEnded;
return 0;
} }
#endif
}

View File

@ -29,8 +29,8 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel);
ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, /**< dict can be released after init */
ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< params current limitation : no checksum ; pledgedSrcSize is optional and can be zero == unknown */
ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, /**< dict can be released after init, a local copy is preserved within zcs */
ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input);