From 58ecf13e025c3601391d0c001ef2369eaa0d747c Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Sat, 13 Jan 2018 13:18:57 -0800 Subject: [PATCH 1/9] zstdmt : can compress at block granularity offering perspective of more accurate progression report. --- lib/compress/zstdmt_compress.c | 101 +++++++++++++++++++++------------ programs/fileio.c | 2 +- 2 files changed, 67 insertions(+), 36 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index e51edf12..3a5b58a7 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -22,6 +22,7 @@ /* ====== Dependencies ====== */ #include /* memcpy, memset */ +#include /* INT_MAX */ #include "pool.h" /* threadpool */ #include "threading.h" /* mutex */ #include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */ @@ -129,7 +130,7 @@ static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool) static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool) { size_t const poolSize = sizeof(*bufPool) - + (bufPool->totalBuffers - 1) * sizeof(buffer_t); + + (bufPool->totalBuffers - 1) * sizeof(buffer_t); unsigned u; size_t totalBufferSize = 0; ZSTD_pthread_mutex_lock(&bufPool->poolMutex); @@ -201,20 +202,6 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) ZSTD_free(buf.start, bufPool->cMem); } -/* Sets parameters relevant to the compression job, initializing others to - * default values. Notably, nbThreads should probably be zero. */ -static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params) -{ - ZSTD_CCtx_params jobParams; - memset(&jobParams, 0, sizeof(jobParams)); - - jobParams.cParams = params.cParams; - jobParams.fParams = params.fParams; - jobParams.compressionLevel = params.compressionLevel; - - jobParams.ldmParams = params.ldmParams; - return jobParams; -} /* ===== CCtx Pool ===== */ /* a single CCtx Pool can be invoked from multiple threads in parallel */ @@ -305,13 +292,16 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) } -/* ===== Thread worker ===== */ +/* ------------------------------------------ */ +/* ===== Thread worker ===== */ +/* ------------------------------------------ */ typedef struct { buffer_t src; const void* srcStart; size_t prefixSize; size_t srcSize; + size_t readSize; buffer_t dstBuff; size_t cSize; size_t dstFlushed; @@ -328,21 +318,19 @@ typedef struct { unsigned long long fullFrameSize; } ZSTDMT_jobDescription; -/* ZSTDMT_compressChunk() : POOL_function type */ +/* ZSTDMT_compressChunk() is a POOL_function type */ void ZSTDMT_compressChunk(void* jobDescription) { ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); const void* const src = (const char*)job->srcStart + job->prefixSize; buffer_t dstBuff = job->dstBuff; - DEBUGLOG(5, "ZSTDMT_compressChunk: job (first:%u) (last:%u) : prefixSize %u, srcSize %u ", - job->firstChunk, job->lastChunk, (U32)job->prefixSize, (U32)job->srcSize); + /* ressources */ if (cctx==NULL) { job->cSize = ERROR(memory_allocation); goto _endJob; } - if (dstBuff.start == NULL) { dstBuff = ZSTDMT_getBuffer(job->bufPool); if (dstBuff.start==NULL) { @@ -350,30 +338,26 @@ void ZSTDMT_compressChunk(void* jobDescription) goto _endJob; } job->dstBuff = dstBuff; - DEBUGLOG(5, "ZSTDMT_compressChunk: received dstBuff of size %u", (U32)dstBuff.size); } + /* init */ if (job->cdict) { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dm_auto, job->cdict, job->params, job->fullFrameSize); - DEBUGLOG(4, "ZSTDMT_compressChunk: init using CDict (windowLog=%u)", job->params.cParams.windowLog); assert(job->firstChunk); /* only allowed for first job */ if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } } else { /* srcStart points at reloaded section */ U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : ZSTD_CONTENTSIZE_UNKNOWN; ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */ - size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk); - if (ZSTD_isError(forceWindowError)) { - DEBUGLOG(5, "ZSTD_CCtxParam_setParameter error : %s ", ZSTD_getErrorName(forceWindowError)); - job->cSize = forceWindowError; - goto _endJob; - } - DEBUGLOG(5, "ZSTDMT_compressChunk: invoking ZSTD_compressBegin_advanced_internal with windowLog = %u ", jobParams.cParams.windowLog); + { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk); + if (ZSTD_isError(forceWindowError)) { + job->cSize = forceWindowError; + goto _endJob; + } } { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, job->srcStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ NULL, jobParams, pledgedSrcSize); if (ZSTD_isError(initError)) { - DEBUGLOG(5, "ZSTD_compressBegin_advanced_internal error : %s ", ZSTD_getErrorName(initError)); job->cSize = initError; goto _endJob; } } @@ -384,19 +368,50 @@ void ZSTDMT_compressChunk(void* jobDescription) ZSTD_invalidateRepCodes(cctx); } - DEBUGLOG(5, "Compressing into dstBuff of size %u", (U32)dstBuff.size); - DEBUG_PRINTHEX(6, job->srcStart, 12); + /* compress */ +#if 1 job->cSize = (job->lastChunk) ? ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize); - DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u) ", - (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk); - DEBUGLOG(5, "dstBuff.size : %u ; => %s ", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize)); +#else + if (sizeof(size_t) > sizeof(int)) + assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */ + { int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX); + const BYTE* ip = (const BYTE*) src; + BYTE* const ostart = (BYTE*)dstBuff.start; + BYTE* op = ostart; + BYTE* oend = op + dstBuff.size; + int blockNb; + job->cSize = 0; + for (blockNb = 0; blockNb < nbBlocks-1; blockNb++) { + size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX); + if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } + ip += ZSTD_BLOCKSIZE_MAX; + op += cSize; assert(op < oend); + /* stats */ + job->cSize += cSize; + job->readSize = ZSTD_BLOCKSIZE_MAX * (blockNb+1); + } + /* last block */ + { size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1); + size_t const lastBlockSize = (lastBlockSize1==0) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1; + size_t const cSize = (job->lastChunk) ? + ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) : + ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); + if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } + /* stats */ + job->cSize += cSize; + job->readSize = ZSTD_BLOCKSIZE_MAX * (blockNb+1); + } + } +#endif _endJob: + /* release */ ZSTDMT_releaseCCtx(job->cctxPool, cctx); ZSTDMT_releaseBuffer(job->bufPool, job->src); job->src = g_nullBuffer; job->srcStart = NULL; + /* report */ ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); job->jobCompleted = 1; job->jobScanned = 0; @@ -440,6 +455,21 @@ struct ZSTDMT_CCtx_s { const ZSTD_CDict* cdict; }; +/* Sets parameters relevant to the compression job, initializing others to + * default values. Notably, nbThreads should probably be zero. */ +static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params) +{ + ZSTD_CCtx_params jobParams; + memset(&jobParams, 0, sizeof(jobParams)); + + jobParams.cParams = params.cParams; + jobParams.fParams = params.fParams; + jobParams.compressionLevel = params.compressionLevel; + + jobParams.ldmParams = params.ldmParams; + return jobParams; +} + static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem) { U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1; @@ -908,6 +938,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi zcs->jobs[jobID].src = zcs->inBuff.buffer; zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcSize = srcSize; + zcs->jobs[jobID].readSize = 0; zcs->jobs[jobID].prefixSize = zcs->dictSize; assert(zcs->inBuff.filled >= srcSize + zcs->dictSize); zcs->jobs[jobID].params = zcs->params; diff --git a/programs/fileio.c b/programs/fileio.c index 887cbeb3..3ae2d405 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -828,7 +828,7 @@ finish: /* Status */ DISPLAYLEVEL(2, "\r%79s\r", ""); DISPLAYLEVEL(2,"%-20s :%6.2f%% (%6llu => %6llu bytes, %s) \n", srcFileName, - (double)compressedfilesize/(readsize+(!readsize) /* avoid div by zero */ )*100, + (double)compressedfilesize / (readsize+(!readsize)/*avoid div by zero*/) * 100, (unsigned long long)readsize, (unsigned long long) compressedfilesize, dstFileName); From 2e23333094e6df6f84b7e21b2dcf64deff88ec30 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Tue, 16 Jan 2018 15:28:43 -0800 Subject: [PATCH 2/9] ZSTDMT can now work in non-blocking mode with 1 thread it still fallbacks to single-thread blocking invocation when input is small (<1job) or when invoking ZSTDMT_compress(), which is blocking. Also : fixed a bug in new block-granular compression routine. --- lib/compress/zstd_compress.c | 3 +- lib/compress/zstdmt_compress.c | 71 ++++++++++++++++++-------------- lib/compress/zstdmt_compress.h | 2 +- lib/decompress/zstd_decompress.c | 12 +++--- tests/zstreamtest.c | 7 ++++ 5 files changed, 57 insertions(+), 38 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 49f73dc5..d2703f23 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -1962,7 +1962,8 @@ static size_t ZSTD_compressContinue_internal (ZSTD_CCtx* cctx, { size_t fhSize = 0; - DEBUGLOG(5, "ZSTD_compressContinue_internal, stage: %u", cctx->stage); + DEBUGLOG(5, "ZSTD_compressContinue_internal, stage: %u, srcSize: %u", + cctx->stage, (U32)srcSize); if (cctx->stage==ZSTDcs_created) return ERROR(stage_wrong); /* missing init (ZSTD_compressBegin) */ if (frame && (cctx->stage==ZSTDcs_init)) { diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 3a5b58a7..47b8923a 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -365,11 +365,12 @@ void ZSTDMT_compressChunk(void* jobDescription) if (!job->firstChunk) { /* flush and overwrite frame header when it's not first job */ size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0); if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; } + DEBUGLOG(5, "ZSTDMT_compressChunk: flush and overwrite %u bytes of frame header (not first chunk)", (U32)hSize); ZSTD_invalidateRepCodes(cctx); } /* compress */ -#if 1 +#if 0 job->cSize = (job->lastChunk) ? ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize); @@ -382,26 +383,28 @@ void ZSTDMT_compressChunk(void* jobDescription) BYTE* op = ostart; BYTE* oend = op + dstBuff.size; int blockNb; + DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks); job->cSize = 0; - for (blockNb = 0; blockNb < nbBlocks-1; blockNb++) { + for (blockNb = 1; blockNb < nbBlocks; blockNb++) { size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX); if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } ip += ZSTD_BLOCKSIZE_MAX; op += cSize; assert(op < oend); /* stats */ job->cSize += cSize; - job->readSize = ZSTD_BLOCKSIZE_MAX * (blockNb+1); + job->readSize = ZSTD_BLOCKSIZE_MAX * blockNb; } /* last block */ - { size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1); - size_t const lastBlockSize = (lastBlockSize1==0) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1; + if ((nbBlocks > 0) | job->lastChunk /*need to output a "last block" flag*/ ) { + size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1); + size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=ZSTD_BLOCKSIZE_MAX)) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1; size_t const cSize = (job->lastChunk) ? ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) : ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } /* stats */ job->cSize += cSize; - job->readSize = ZSTD_BLOCKSIZE_MAX * (blockNb+1); + job->readSize = job->srcSize; } } #endif @@ -443,7 +446,7 @@ struct ZSTDMT_CCtx_s { size_t targetDictSize; inBuff_t inBuff; XXH64_state_t xxhState; - unsigned singleThreaded; + unsigned singleBlockingThread; unsigned jobIDMask; unsigned doneJobID; unsigned nextJobID; @@ -457,7 +460,7 @@ struct ZSTDMT_CCtx_s { /* Sets parameters relevant to the compression job, initializing others to * default values. Notably, nbThreads should probably be zero. */ -static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params) +static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) { ZSTD_CCtx_params jobParams; memset(&jobParams, 0, sizeof(jobParams)); @@ -646,17 +649,21 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, /* ===== Multi-threaded compression ===== */ /* ------------------------------------------ */ -static unsigned computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbThreads) { - size_t const chunkSizeTarget = (size_t)1 << (windowLog + 2); - size_t const chunkMaxSize = chunkSizeTarget << 2; - size_t const passSizeMax = chunkMaxSize * nbThreads; - unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1; - unsigned const nbChunksLarge = multiplier * nbThreads; - unsigned const nbChunksMax = (unsigned)(srcSize / chunkSizeTarget) + 1; - unsigned const nbChunksSmall = MIN(nbChunksMax, nbThreads); - return (multiplier>1) ? nbChunksLarge : nbChunksSmall; -} +static unsigned ZSTDMT_computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbThreads) { + assert(nbThreads>0); + { size_t const chunkSizeTarget = (size_t)1 << (windowLog + 2); + size_t const chunkMaxSize = chunkSizeTarget << 2; + size_t const passSizeMax = chunkMaxSize * nbThreads; + unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1; + unsigned const nbChunksLarge = multiplier * nbThreads; + unsigned const nbChunksMax = (unsigned)(srcSize / chunkSizeTarget) + 1; + unsigned const nbChunksSmall = MIN(nbChunksMax, nbThreads); + return (multiplier>1) ? nbChunksLarge : nbChunksSmall; +} } +/* ZSTDMT_compress_advanced_internal() : + * This is a blocking function : it will only give back control to caller after finishing its compression job. + */ static size_t ZSTDMT_compress_advanced_internal( ZSTDMT_CCtx* mtctx, void* dst, size_t dstCapacity, @@ -664,10 +671,10 @@ static size_t ZSTDMT_compress_advanced_internal( const ZSTD_CDict* cdict, ZSTD_CCtx_params const params) { - ZSTD_CCtx_params const jobParams = ZSTDMT_makeJobCCtxParams(params); + ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params); unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog; size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog); - unsigned nbChunks = computeNbChunks(srcSize, params.cParams.windowLog, params.nbThreads); + unsigned nbChunks = ZSTDMT_computeNbChunks(srcSize, params.cParams.windowLog, params.nbThreads); size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks; size_t const avgChunkSize = (((proposedChunkSize-1) & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ const char* const srcStart = (const char*)src; @@ -678,14 +685,16 @@ static size_t ZSTDMT_compress_advanced_internal( assert(jobParams.nbThreads == 0); assert(mtctx->cctxPool->totalCCtx == params.nbThreads); - DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbChunks=%2u (rawSize=%u bytes; fixedSize=%u) ", + DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbChunks=%2u (rawSize=%u bytes; fixedSize=%u) ", nbChunks, (U32)proposedChunkSize, (U32)avgChunkSize); - if (nbChunks==1) { /* fallback to single-thread mode */ + + if ((nbChunks==1) | (params.nbThreads<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */ ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams); return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams); } - assert(avgChunkSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), which is required for compressWithinDst */ + + assert(avgChunkSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgChunkSize) ); XXH64_reset(&xxh64, 0); @@ -695,6 +704,7 @@ static size_t ZSTDMT_compress_advanced_internal( mtctx->jobIDMask = 0; mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, mtctx->cMem); if (mtctx->jobs==NULL) return ERROR(memory_allocation); + assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0)); /* ensure nbJobs is a power of 2 */ mtctx->jobIDMask = nbJobs - 1; } @@ -827,10 +837,10 @@ size_t ZSTDMT_initCStream_internal( assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!((dict) && (cdict))); /* either dict or cdict, not both */ assert(zcs->cctxPool->totalCCtx == params.nbThreads); - zcs->singleThreaded = (params.nbThreads==1) | (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ + zcs->singleBlockingThread = pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN; /* do not trigger multi-threading when srcSize is too small */ - if (zcs->singleThreaded) { - ZSTD_CCtx_params const singleThreadParams = ZSTDMT_makeJobCCtxParams(params); + if (zcs->singleBlockingThread) { + ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params); DEBUGLOG(4, "single thread mode"); assert(singleThreadParams.nbThreads == 0); return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0], @@ -921,10 +931,11 @@ size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize) } size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { - ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0); + ZSTD_parameters const params = ZSTD_getParams(compressionLevel, ZSTD_CONTENTSIZE_UNKNOWN, 0); ZSTD_CCtx_params cctxParams = zcs->params; cctxParams.cParams = params.cParams; cctxParams.fParams = params.fParams; + DEBUGLOG(4, "ZSTDMT_initCStream (cLevel=%i)", compressionLevel); return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN); } @@ -1071,7 +1082,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, assert(output->pos <= output->size); assert(input->pos <= input->size); - if (mtctx->singleThreaded) { /* delegate to single-thread (synchronous) */ + if (mtctx->singleBlockingThread) { /* delegate to single-thread (synchronous) */ return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp); } @@ -1166,7 +1177,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* ou size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) { DEBUGLOG(5, "ZSTDMT_flushStream"); - if (mtctx->singleThreaded) + if (mtctx->singleBlockingThread) return ZSTD_flushStream(mtctx->cctxPool->cctx[0], output); return ZSTDMT_flushStream_internal(mtctx, output, 0 /* endFrame */); } @@ -1174,7 +1185,7 @@ size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) { DEBUGLOG(4, "ZSTDMT_endStream"); - if (mtctx->singleThreaded) + if (mtctx->singleBlockingThread) return ZSTD_endStream(mtctx->cctxPool->cctx[0], output); return ZSTDMT_flushStream_internal(mtctx, output, 1 /* endFrame */); } diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index d12f0adb..b6e68684 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -85,7 +85,7 @@ ZSTDLIB_API size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx, * List of parameters that can be set using ZSTDMT_setMTCtxParameter() */ typedef enum { ZSTDMT_p_jobSize, /* Each job is compressed in parallel. By default, this value is dynamically determined depending on compression parameters. Can be set explicitly here. */ - ZSTDMT_p_overlapSectionLog /* Each job may reload a part of previous job to enhance compressionr ratio; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window */ + ZSTDMT_p_overlapSectionLog /* Each job may reload a part of previous job to enhance compressionr ratio; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window. This is a "sticky" parameter : its value will be re-used on next compression job */ } ZSTDMT_parameter; /* ZSTDMT_setMTCtxParameter() : diff --git a/lib/decompress/zstd_decompress.c b/lib/decompress/zstd_decompress.c index ab6c2edc..a573cc19 100644 --- a/lib/decompress/zstd_decompress.c +++ b/lib/decompress/zstd_decompress.c @@ -1759,7 +1759,7 @@ static int ZSTD_isSkipFrame(ZSTD_DCtx* dctx) { return dctx->stage == ZSTDds_skip * or an error code, which can be tested using ZSTD_isError() */ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize) { - DEBUGLOG(5, "ZSTD_decompressContinue"); + DEBUGLOG(5, "ZSTD_decompressContinue (srcSize:%u)", (U32)srcSize); /* Sanity check */ if (srcSize != dctx->expected) return ERROR(srcSize_wrong); /* not allowed */ if (dstCapacity) ZSTD_checkContinuity(dctx, dst); @@ -1820,12 +1820,12 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c case ZSTDds_decompressLastBlock: case ZSTDds_decompressBlock: - DEBUGLOG(5, "case ZSTDds_decompressBlock"); + DEBUGLOG(5, "ZSTD_decompressContinue: case ZSTDds_decompressBlock"); { size_t rSize; switch(dctx->bType) { case bt_compressed: - DEBUGLOG(5, "case bt_compressed"); + DEBUGLOG(5, "ZSTD_decompressContinue: case bt_compressed"); rSize = ZSTD_decompressBlock_internal(dctx, dst, dstCapacity, src, srcSize, /* frame */ 1); break; case bt_raw : @@ -1839,12 +1839,12 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c return ERROR(corruption_detected); } if (ZSTD_isError(rSize)) return rSize; - DEBUGLOG(5, "decoded size from block : %u", (U32)rSize); + DEBUGLOG(5, "ZSTD_decompressContinue: decoded size from block : %u", (U32)rSize); dctx->decodedSize += rSize; if (dctx->fParams.checksumFlag) XXH64_update(&dctx->xxhState, dst, rSize); if (dctx->stage == ZSTDds_decompressLastBlock) { /* end of frame */ - DEBUGLOG(4, "decoded size from frame : %u", (U32)dctx->decodedSize); + DEBUGLOG(4, "ZSTD_decompressContinue: decoded size from frame : %u", (U32)dctx->decodedSize); if (dctx->fParams.frameContentSize != ZSTD_CONTENTSIZE_UNKNOWN) { if (dctx->decodedSize != dctx->fParams.frameContentSize) { return ERROR(corruption_detected); @@ -1868,7 +1868,7 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c assert(srcSize == 4); /* guaranteed by dctx->expected */ { U32 const h32 = (U32)XXH64_digest(&dctx->xxhState); U32 const check32 = MEM_readLE32(src); - DEBUGLOG(4, "checksum : calculated %08X :: %08X read", h32, check32); + DEBUGLOG(4, "ZSTD_decompressContinue: checksum : calculated %08X :: %08X read", h32, check32); if (check32 != h32) return ERROR(checksum_wrong); dctx->expected = 0; dctx->stage = ZSTDds_getFrameHeaderSize; diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 5590c9af..5cd1ea0f 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -1243,6 +1243,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp maxTestSize = FUZ_randomLength(&lseed, oldTestLog+2); if (maxTestSize >= srcBufferSize) maxTestSize = srcBufferSize-1; { int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1; + DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel); CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) ); } } else { @@ -1301,9 +1302,12 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp if ((FUZ_rand(&lseed) & 15) == 0) { size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog); size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize); + size_t const previousPos = outBuff.pos; outBuff.size = outBuff.pos + adjustedDstSize; DISPLAYLEVEL(5, "Flushing into dst buffer of size %u \n", (U32)adjustedDstSize); CHECK_Z( ZSTDMT_flushStream(zc, &outBuff) ); + assert(outBuff.pos >= previousPos); + DISPLAYLEVEL(6, "%u bytes flushed by ZSTDMT_flushStream \n", (U32)(outBuff.pos-previousPos)); } } /* final frame epilogue */ @@ -1311,10 +1315,13 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp while (remainingToFlush) { size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog); size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize); + size_t const previousPos = outBuff.pos; outBuff.size = outBuff.pos + adjustedDstSize; DISPLAYLEVEL(5, "Ending into dst buffer of size %u \n", (U32)adjustedDstSize); remainingToFlush = ZSTDMT_endStream(zc, &outBuff); CHECK (ZSTD_isError(remainingToFlush), "ZSTDMT_endStream error : %s", ZSTD_getErrorName(remainingToFlush)); + assert(outBuff.pos >= previousPos); + DISPLAYLEVEL(6, "%u bytes flushed by ZSTDMT_endStream \n", (U32)(outBuff.pos-previousPos)); DISPLAYLEVEL(5, "endStream : remainingToFlush : %u \n", (U32)remainingToFlush); } } crcOrig = XXH64_digest(&xxhState); From 6025465e42dfce6d3312b14a80e7614bc52b362c Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Tue, 16 Jan 2018 15:34:41 -0800 Subject: [PATCH 3/9] ZSTDMT : minor CCtx memory optimization can be useful when a compression job only has small amount of data to compress. --- lib/compress/zstdmt_compress.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 47b8923a..0dadda28 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -346,7 +346,7 @@ void ZSTDMT_compressChunk(void* jobDescription) assert(job->firstChunk); /* only allowed for first job */ if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } } else { /* srcStart points at reloaded section */ - U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : ZSTD_CONTENTSIZE_UNKNOWN; + U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : job->srcSize; ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */ { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk); if (ZSTD_isError(forceWindowError)) { @@ -355,7 +355,7 @@ void ZSTDMT_compressChunk(void* jobDescription) } } { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, job->srcStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ - NULL, + NULL, /*cdict*/ jobParams, pledgedSrcSize); if (ZSTD_isError(initError)) { job->cSize = initError; @@ -404,8 +404,8 @@ void ZSTDMT_compressChunk(void* jobDescription) if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } /* stats */ job->cSize += cSize; - job->readSize = job->srcSize; } + job->readSize = job->srcSize; } #endif From 1dba98d563d81fb42b4ac575e70eaf52654bbf8e Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Tue, 16 Jan 2018 16:15:47 -0800 Subject: [PATCH 4/9] introduced parameter ZSTD_p_nonBlockingMode This new parameter makes it possible to call streaming ZSTDMT with a single thread set which is non blocking. It makes it possible for the main thread to do other tasks in parallel while the worker thread does compression. Typically, for zstd cli, it means it can do I/O stuff. Applied within fileio.c, this patch provides non-negligible gains during compression. Tested on my laptop, with enwik9 (1000000000 bytes) : time zstd -f enwik9 With traditional single-thread blocking mode : real 0m9.557s user 0m8.861s sys 0m0.538s With new single-worker non blocking mode : real 0m7.938s user 0m8.049s sys 0m0.514s => 20% faster --- lib/compress/zstd_compress.c | 29 ++++++++++++++++++--------- lib/compress/zstd_compress_internal.h | 1 + lib/compress/zstdmt_compress.c | 2 +- lib/compress/zstdmt_compress.h | 2 +- lib/zstd.h | 10 +++++++++ programs/fileio.c | 1 + 6 files changed, 34 insertions(+), 11 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index d2703f23..fbae3c3f 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -281,9 +281,8 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v } return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); + case ZSTD_p_nonBlockingMode: case ZSTD_p_jobSize: - return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); - case ZSTD_p_overlapSizeLog: return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); @@ -407,11 +406,18 @@ size_t ZSTD_CCtxParam_setParameter( return ZSTDMT_CCtxParam_setNbThreads(CCtxParams, value); #endif + case ZSTD_p_nonBlockingMode : +#ifndef ZSTD_MULTITHREAD + return ERROR(parameter_unsupported); +#else + CCtxParams->nonBlockingMode = (value>0); + return CCtxParams->nonBlockingMode; +#endif + case ZSTD_p_jobSize : #ifndef ZSTD_MULTITHREAD return ERROR(parameter_unsupported); #else - if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported); return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_jobSize, value); #endif @@ -419,7 +425,6 @@ size_t ZSTD_CCtxParam_setParameter( #ifndef ZSTD_MULTITHREAD return ERROR(parameter_unsupported); #else - if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported); return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_overlapSectionLog, value); #endif @@ -3007,12 +3012,17 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1, 0 /*dictSize*/); #ifdef ZSTD_MULTITHREAD - if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) + if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) { params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */ - if (params.nbThreads > 1) { + params.nonBlockingMode = 0; + } + if ((params.nbThreads > 1) | (params.nonBlockingMode == 1)) { if (cctx->mtctx == NULL || (params.nbThreads != ZSTDMT_getNbThreads(cctx->mtctx))) { - DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u (previous: %u)", - params.nbThreads, (unsigned)ZSTDMT_getNbThreads(cctx->mtctx)); + DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u", + params.nbThreads); + if (cctx->mtctx != NULL) + DEBUGLOG(4, "ZSTD_compress_generic: previous nbThreads was %u", + ZSTDMT_getNbThreads(cctx->mtctx)); ZSTDMT_freeCCtx(cctx->mtctx); cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbThreads, cctx->customMem); if (cctx->mtctx == NULL) return ERROR(memory_allocation); @@ -3024,6 +3034,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) ); cctx->streamStage = zcss_load; cctx->appliedParams.nbThreads = params.nbThreads; + cctx->appliedParams.nonBlockingMode = params.nonBlockingMode; } else #endif { CHECK_F( ZSTD_resetCStream_internal( @@ -3036,7 +3047,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, /* compression stage */ #ifdef ZSTD_MULTITHREAD - if (cctx->appliedParams.nbThreads > 1) { + if ((cctx->appliedParams.nbThreads > 1) | (cctx->appliedParams.nonBlockingMode==1)) { size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); if ( ZSTD_isError(flushMin) || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ diff --git a/lib/compress/zstd_compress_internal.h b/lib/compress/zstd_compress_internal.h index 7be20d4c..eb18cbc5 100644 --- a/lib/compress/zstd_compress_internal.h +++ b/lib/compress/zstd_compress_internal.h @@ -150,6 +150,7 @@ struct ZSTD_CCtx_params_s { /* Multithreading: used to pass parameters to mtctx */ U32 nbThreads; + int nonBlockingMode; /* will trigger ZSTDMT even with nbThreads==1 */ unsigned jobSize; unsigned overlapSizeLog; diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 0dadda28..0c28eb78 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -497,7 +497,7 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread /* ZSTDMT_getNbThreads(): * @return nb threads currently active in mtctx. * mtctx must be valid */ -size_t ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx) +unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx) { assert(mtctx != NULL); return mtctx->params.nbThreads; diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index b6e68684..7716ea68 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -120,7 +120,7 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread /* ZSTDMT_getNbThreads(): * @return nb threads currently active in mtctx. * mtctx must be valid */ -size_t ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx); +unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx); /*! ZSTDMT_initCStream_internal() : * Private use only. Init streaming operation. diff --git a/lib/zstd.h b/lib/zstd.h index a84490d9..6ac132a6 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -972,10 +972,20 @@ typedef enum { ZSTD_p_dictIDFlag, /* When applicable, dictionary's ID is written into frame header (default:1) */ /* multi-threading parameters */ + /* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD). + * They return an error otherwise. */ ZSTD_p_nbThreads=400, /* Select how many threads a compression job can spawn (default:1) * More threads improve speed, but also increase memory usage. * Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled. * Special: value 0 means "do not change nbThreads" */ + ZSTD_p_nonBlockingMode, /* Single thread mode is by default "blocking" : + * it finishes its job as much as possible, and only then gives back control to caller. + * In contrast, multi-thread is by default "non-blocking" : + * it takes some input, flush some output if available, and immediately gives back control to caller. + * Compression work is performed in parallel, within worker threads. + * (note : a strong exception to this rule is when first job is called with ZSTD_e_end : it becomes blocking) + * Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected. + * It allows the caller to do other tasks while the worker thread compresses in parallel. */ ZSTD_p_jobSize, /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode. * Each compression job is completed in parallel, so indirectly controls the nb of active threads. * 0 means default, which is dynamically determined based on compression parameters. diff --git a/programs/fileio.c b/programs/fileio.c index 3ae2d405..ea84853c 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -457,6 +457,7 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, /* multi-threading */ DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbThreads); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbThreads, g_nbThreads) ); + CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nonBlockingMode, 1) ); /* dictionary */ CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* just for dictionary loading, for compression parameters adaptation */ CHECK( ZSTD_CCtx_loadDictionary(ress.cctx, dictBuffer, dictBuffSize) ); From 10c213761ac8714bfa9b9e6cf70648ec458308e8 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Tue, 16 Jan 2018 17:28:11 -0800 Subject: [PATCH 5/9] cli: fix for no-MT mode when cli is compiled without MT support, invoking ZSTD_p_nonBlockingMode result in an error code. This patch only sets ZSTD_p_nonBlockingMode when ZSTD_MULTITHREAD is set, meaning there is MT support. The error code could also be intentionnally ignored (there is no side effect). --- programs/fileio.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/programs/fileio.c b/programs/fileio.c index ea84853c..58b77a81 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -457,7 +457,9 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, /* multi-threading */ DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbThreads); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbThreads, g_nbThreads) ); +#ifdef ZSTD_MULTITHREAD CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nonBlockingMode, 1) ); +#endif /* dictionary */ CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* just for dictionary loading, for compression parameters adaptation */ CHECK( ZSTD_CCtx_loadDictionary(ress.cctx, dictBuffer, dictBuffSize) ); From 3e1e57db2775c49595b483e22589eb2e20015b14 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Tue, 16 Jan 2018 17:35:00 -0800 Subject: [PATCH 6/9] fix fileio progression status update The compression % is no longer correct, since it's no longer possible to make direct correlation between nb bytes read and nb bytes written due to large internal buffer inside CCtx (exacerbated with --long). The current "fix" is to no longer display the %. A more complex solution will have to count exactly how much data has been consumed and compressed internally, within CCtx buffers. --- programs/fileio.c | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/programs/fileio.c b/programs/fileio.c index 58b77a81..326f0e67 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -809,21 +809,11 @@ static int FIO_compressFilename_internal(cRess_t ress, compressedfilesize += outBuff.pos; } } - if (g_nbThreads > 1) { - if (fileSize == UTIL_FILESIZE_UNKNOWN) - DISPLAYUPDATE(2, "\rRead : %u MB", (U32)(readsize>>20)) - else - DISPLAYUPDATE(2, "\rRead : %u / %u MB", - (U32)(readsize>>20), (U32)(fileSize>>20)); + if (fileSize == UTIL_FILESIZE_UNKNOWN) { + DISPLAYUPDATE(2, "\rRead : %u MB", (U32)(readsize>>20)); } else { - if (fileSize == UTIL_FILESIZE_UNKNOWN) - DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%%", - (U32)(readsize>>20), - (double)compressedfilesize/readsize*100) - else - DISPLAYUPDATE(2, "\rRead : %u / %u MB ==> %.2f%%", - (U32)(readsize>>20), (U32)(fileSize>>20), - (double)compressedfilesize/readsize*100); + DISPLAYUPDATE(2, "\rRead : %u / %u MB", + (U32)(readsize>>20), (U32)(fileSize>>20)); } } while (directive != ZSTD_e_end); From cb57c107ff12b1f39adbb25ff14db56112a68cd1 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Wed, 17 Jan 2018 11:39:07 -0800 Subject: [PATCH 7/9] zstdmt: minor variable renaming, for clarity --- lib/compress/zstd_compress.c | 1 + lib/compress/zstdmt_compress.c | 58 +++++++++++++++++----------------- 2 files changed, 30 insertions(+), 29 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index fbae3c3f..5cc02bcc 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -2579,6 +2579,7 @@ ZSTD_CDict* ZSTD_initStaticCDict(void* workspace, size_t workspaceSize, } ZSTD_compressionParameters ZSTD_getCParamsFromCDict(const ZSTD_CDict* cdict) { + assert(cdict != NULL); return cdict->refContext->appliedParams.cParams; } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 0c28eb78..c65f3ea1 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -442,8 +442,8 @@ struct ZSTDMT_CCtx_s { ZSTD_CCtx_params params; size_t targetSectionSize; size_t inBuffSize; - size_t dictSize; - size_t targetDictSize; + size_t prefixSize; + size_t targetPrefixSize; inBuff_t inBuff; XXH64_state_t xxhState; unsigned singleBlockingThread; @@ -841,13 +841,13 @@ size_t ZSTDMT_initCStream_internal( if (zcs->singleBlockingThread) { ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params); - DEBUGLOG(4, "single thread mode"); + DEBUGLOG(4, "ZSTDMT_initCStream_internal: switch to single blocking thread mode"); assert(singleThreadParams.nbThreads == 0); return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0], dict, dictSize, cdict, singleThreadParams, pledgedSrcSize); } - DEBUGLOG(4, "multi-threading mode (%u threads)", params.nbThreads); + DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u threads", params.nbThreads); if (zcs->allJobsCompleted == 0) { /* previous compression not correctly finished */ ZSTDMT_waitForAllJobsCompleted(zcs); @@ -871,17 +871,17 @@ size_t ZSTDMT_initCStream_internal( } assert(params.overlapSizeLog <= 9); - zcs->targetDictSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog)); - DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(zcs->targetDictSize>>10)); + zcs->targetPrefixSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog)); + DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(zcs->targetPrefixSize>>10)); zcs->targetSectionSize = params.jobSize ? params.jobSize : (size_t)1 << (params.cParams.windowLog + 2); if (zcs->targetSectionSize < ZSTDMT_JOBSIZE_MIN) zcs->targetSectionSize = ZSTDMT_JOBSIZE_MIN; - if (zcs->targetSectionSize < zcs->targetDictSize) zcs->targetSectionSize = zcs->targetDictSize; /* job size must be >= overlap size */ + if (zcs->targetSectionSize < zcs->targetPrefixSize) zcs->targetSectionSize = zcs->targetPrefixSize; /* job size must be >= overlap size */ DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(zcs->targetSectionSize>>10), params.jobSize); - zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize; + zcs->inBuffSize = zcs->targetPrefixSize + zcs->targetSectionSize; DEBUGLOG(4, "inBuff Size : %u KB", (U32)(zcs->inBuffSize>>10)); ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) ); zcs->inBuff.buffer = g_nullBuffer; - zcs->dictSize = 0; + zcs->prefixSize = 0; zcs->doneJobID = 0; zcs->nextJobID = 0; zcs->frameEnded = 0; @@ -895,8 +895,8 @@ size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, ZSTD_parameters params, unsigned long long pledgedSrcSize) { - ZSTD_CCtx_params cctxParams = mtctx->params; - DEBUGLOG(5, "ZSTDMT_initCStream_advanced (pledgedSrcSize=%u)", (U32)pledgedSrcSize); + ZSTD_CCtx_params cctxParams = mtctx->params; /* retrieve sticky params */ + DEBUGLOG(4, "ZSTDMT_initCStream_advanced (pledgedSrcSize=%u)", (U32)pledgedSrcSize); cctxParams.cParams = params.cParams; cctxParams.fParams = params.fParams; return ZSTDMT_initCStream_internal(mtctx, dict, dictSize, ZSTD_dm_auto, NULL, @@ -909,9 +909,9 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize) { ZSTD_CCtx_params cctxParams = mtctx->params; + if (cdict==NULL) return ERROR(dictionary_wrong); /* method incompatible with NULL cdict */ cctxParams.cParams = ZSTD_getCParamsFromCDict(cdict); cctxParams.fParams = fParams; - if (cdict==NULL) return ERROR(dictionary_wrong); /* method incompatible with NULL cdict */ return ZSTDMT_initCStream_internal(mtctx, NULL, 0 /*dictSize*/, ZSTD_dm_auto, cdict, cctxParams, pledgedSrcSize); } @@ -932,10 +932,10 @@ size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize) size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { ZSTD_parameters const params = ZSTD_getParams(compressionLevel, ZSTD_CONTENTSIZE_UNKNOWN, 0); - ZSTD_CCtx_params cctxParams = zcs->params; + ZSTD_CCtx_params cctxParams = zcs->params; /* retrieve sticky params */ + DEBUGLOG(4, "ZSTDMT_initCStream (cLevel=%i)", compressionLevel); cctxParams.cParams = params.cParams; cctxParams.fParams = params.fParams; - DEBUGLOG(4, "ZSTDMT_initCStream (cLevel=%i)", compressionLevel); return ZSTDMT_initCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN); } @@ -945,13 +945,13 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", - zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize); + zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize); zcs->jobs[jobID].src = zcs->inBuff.buffer; zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcSize = srcSize; zcs->jobs[jobID].readSize = 0; - zcs->jobs[jobID].prefixSize = zcs->dictSize; - assert(zcs->inBuff.filled >= srcSize + zcs->dictSize); + zcs->jobs[jobID].prefixSize = zcs->prefixSize; + assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize); zcs->jobs[jobID].params = zcs->params; /* do not calculate checksum within sections, but write it in header for first section */ if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; @@ -968,11 +968,11 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; if (zcs->params.fParams.checksumFlag) - XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->dictSize, srcSize); + XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->prefixSize, srcSize); /* get a new buffer for next input */ if (!endFrame) { - size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize); + size_t const newPrefixSize = MIN(srcSize + zcs->prefixSize, zcs->targetPrefixSize); zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool); if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ zcs->jobs[jobID].jobCompleted = 1; @@ -981,15 +981,15 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi ZSTDMT_releaseAllJobResources(zcs); return ERROR(memory_allocation); } - zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize; + zcs->inBuff.filled -= srcSize + zcs->prefixSize - newPrefixSize; memmove(zcs->inBuff.buffer.start, - (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize, + (const char*)zcs->jobs[jobID].srcStart + zcs->prefixSize + srcSize - newPrefixSize, zcs->inBuff.filled); - zcs->dictSize = newDictSize; + zcs->prefixSize = newPrefixSize; } else { /* if (endFrame==1) */ zcs->inBuff.buffer = g_nullBuffer; zcs->inBuff.filled = 0; - zcs->dictSize = 0; + zcs->prefixSize = 0; zcs->frameEnded = 1; if (zcs->nextJobID == 0) { /* single chunk exception : checksum is calculated directly within worker thread */ @@ -1028,17 +1028,17 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi { ZSTDMT_jobDescription job = zcs->jobs[wJobID]; if (!job.jobScanned) { if (ZSTD_isError(job.cSize)) { - DEBUGLOG(5, "job %u : compression error detected : %s", + DEBUGLOG(5, "ZSTDMT_flushNextJob: job %u : compression error detected : %s", zcs->doneJobID, ZSTD_getErrorName(job.cSize)); ZSTDMT_waitForAllJobsCompleted(zcs); ZSTDMT_releaseAllJobResources(zcs); return job.cSize; } - DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag); + DEBUGLOG(5, "ZSTDMT_flushNextJob: zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag); if (zcs->params.fParams.checksumFlag) { if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */ U32 const checksum = (U32)XXH64_digest(&zcs->xxhState); - DEBUGLOG(5, "writing checksum : %08X \n", checksum); + DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum); MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum); job.cSize += 4; zcs->jobs[wJobID].cSize += 4; @@ -1046,7 +1046,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi zcs->jobs[wJobID].jobScanned = 1; } { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); - DEBUGLOG(5, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); + DEBUGLOG(5, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); output->pos += toWrite; job.dstFlushed += toWrite; @@ -1076,7 +1076,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, ZSTD_inBuffer* input, ZSTD_EndDirective endOp) { - size_t const newJobThreshold = mtctx->dictSize + mtctx->targetSectionSize; + size_t const newJobThreshold = mtctx->prefixSize + mtctx->targetSectionSize; unsigned forwardInputProgress = 0; DEBUGLOG(5, "ZSTDMT_compressStream_generic "); assert(output->pos <= output->size); @@ -1160,7 +1160,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned endFrame) { - size_t const srcSize = mtctx->inBuff.filled - mtctx->dictSize; + size_t const srcSize = mtctx->inBuff.filled - mtctx->prefixSize; DEBUGLOG(5, "ZSTDMT_flushStream_internal"); if ( ((srcSize > 0) || (endFrame && !mtctx->frameEnded)) From 58dd7de640e630f19d64a10fbecb52f2628e9750 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Wed, 17 Jan 2018 12:10:15 -0800 Subject: [PATCH 8/9] zstdmt: fixed an endless loop on allocation failure this happened on 32-bits build when requiring a too large input buffer, typically on wlog=29, creating jobs of 2 GB size. also : zstd32 now compiles with multithread support enabled by default (can be disabled with HAVE_THREAD=0) --- lib/compress/zstdmt_compress.c | 21 +++++++++++++++------ programs/Makefile | 2 ++ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index c65f3ea1..fe6ca52c 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -150,7 +150,9 @@ static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const } /** ZSTDMT_getBuffer() : - * assumption : bufPool must be valid */ + * assumption : bufPool must be valid + * @return : a buffer, with start pointer and size + * note: allocation may fail, in this case, start==NULL and size==0 */ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) { size_t const bSize = bufPool->bufferSize; @@ -178,7 +180,11 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) void* const start = ZSTD_malloc(bSize, bufPool->cMem); buffer.start = start; /* note : start can be NULL if malloc fails ! */ buffer.size = (start==NULL) ? 0 : bSize; - DEBUGLOG(5, "ZSTDMT_getBuffer: created buffer of size %u", (U32)bSize); + if (start==NULL) { + DEBUGLOG(5, "ZSTDMT_getBuffer: buffer allocation failure !!"); + } else { + DEBUGLOG(5, "ZSTDMT_getBuffer: created buffer of size %u", (U32)bSize); + } return buffer; } } @@ -1015,7 +1021,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush) { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; - DEBUGLOG(5, "ZSTDMT_flushNextJob"); + DEBUGLOG(5, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush); if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */ ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); while (zcs->jobs[wJobID].jobCompleted==0) { @@ -1112,10 +1118,13 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, /* fill input buffer */ if (input->size > input->pos) { /* support NULL input */ if (mtctx->inBuff.buffer.start == NULL) { - mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); /* note : may fail, in which case, no forward input progress */ + mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); /* note : allocation can fail, in which case, no forward input progress */ mtctx->inBuff.filled = 0; - } - if (mtctx->inBuff.buffer.start) { + if ( (mtctx->inBuff.buffer.start == NULL) /* allocation failure */ + && (mtctx->doneJobID == mtctx->nextJobID) ) { /* and nothing to flush */ + return ERROR(memory_allocation); /* no forward progress possible => output an error */ + } } + if (mtctx->inBuff.buffer.start != NULL) { size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled); DEBUGLOG(5, "inBuff:%08X; inBuffSize=%u; ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad); memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad); diff --git a/programs/Makefile b/programs/Makefile index 51888d3e..be666b4e 100644 --- a/programs/Makefile +++ b/programs/Makefile @@ -157,6 +157,8 @@ endif zstd-release: DEBUGFLAGS := zstd-release: zstd +zstd32 : CPPFLAGS += $(THREAD_CPP) +zstd32 : LDFLAGS += $(THREAD_LD) zstd32 : CPPFLAGS += -DZSTD_LEGACY_SUPPORT=$(ZSTD_LEGACY_SUPPORT) zstd32 : $(ZSTDLIB_FILES) zstdcli.c fileio.c bench.c datagen.c dibio.c ifneq (,$(filter Windows%,$(OS))) From d14cc881b0f039d641cd3a7df33bad897320e654 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Wed, 17 Jan 2018 12:39:58 -0800 Subject: [PATCH 9/9] zstdmt : fixed very large window sizes would create too large buffers, since default job size == window size * 4. This would crash on 32-bit systems. Also : jobSize being a 32-bit unsigned, it cannot be >= 4 GB, so the formula was failing for large window sizes >= 1 GB. Fixed now : max job Size is 2 GB, whatever the window size. --- lib/compress/zstdmt_compress.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index fe6ca52c..b3ccfe39 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -11,6 +11,7 @@ /* ====== Tuning parameters ====== */ #define ZSTDMT_NBTHREADS_MAX 200 +#define ZSTDMT_JOBSIZE_MAX (MEM_32bits() ? (512 MB) : (2 GB)) /* note : limited by `jobSize` type, which is `unsigned` */ #define ZSTDMT_OVERLAPLOG_DEFAULT 6 @@ -843,7 +844,14 @@ size_t ZSTDMT_initCStream_internal( assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!((dict) && (cdict))); /* either dict or cdict, not both */ assert(zcs->cctxPool->totalCCtx == params.nbThreads); - zcs->singleBlockingThread = pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN; /* do not trigger multi-threading when srcSize is too small */ + zcs->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */ + if (params.jobSize == 0) { + if (params.cParams.windowLog >= 29) + params.jobSize = ZSTDMT_JOBSIZE_MAX; + else + params.jobSize = 1 << (params.cParams.windowLog + 2); + } + if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX; if (zcs->singleBlockingThread) { ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params); @@ -879,7 +887,7 @@ size_t ZSTDMT_initCStream_internal( assert(params.overlapSizeLog <= 9); zcs->targetPrefixSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog)); DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(zcs->targetPrefixSize>>10)); - zcs->targetSectionSize = params.jobSize ? params.jobSize : (size_t)1 << (params.cParams.windowLog + 2); + zcs->targetSectionSize = params.jobSize; if (zcs->targetSectionSize < ZSTDMT_JOBSIZE_MIN) zcs->targetSectionSize = ZSTDMT_JOBSIZE_MIN; if (zcs->targetSectionSize < zcs->targetPrefixSize) zcs->targetSectionSize = zcs->targetPrefixSize; /* job size must be >= overlap size */ DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(zcs->targetSectionSize>>10), params.jobSize);