first implementation of bench.c with new API ZSTD_compress_generic()

Does not yet speed-optimize the buffer-to-buffer scenario;
it still defers internally to the streaming implementation.

Also: fixed a long-standing bug in the ZSTDMT streaming API (the one-line memcpy fix in ZSTDMT_compressStream() below: the copy into the internal input buffer ignored input->pos).
Yann Collet 2017-06-19 18:25:35 -07:00
parent 008d44ad66
commit c08e649e95
4 changed files with 110 additions and 66 deletions
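
For context, a minimal sketch of the buffer-to-buffer pattern the new bench path exercises, assuming the v1.3.x advanced API that appears in the diff below (ZSTD_compress_generic(), ZSTD_CCtx_setParameter()); the helper name is illustrative only and setParameter return codes are ignored for brevity:

    #define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_compress_generic() is still experimental at this point */
    #include <zstd.h>

    /* One-shot compression through the streaming-capable entry point.
     * With ZSTD_e_end, a return value of 0 means the frame is fully written;
     * a non-zero value means further calls are needed (e.g. multi-threaded mode). */
    static size_t compressOneShot(ZSTD_CCtx* cctx, int cLevel, unsigned nbThreads,
                                  void* dst, size_t dstCapacity,
                                  const void* src, size_t srcSize)
    {
        ZSTD_outBuffer out = { dst, dstCapacity, 0 };
        ZSTD_inBuffer  in  = { src, srcSize, 0 };
        size_t remaining;
        ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (unsigned)cLevel);
        ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads);
        do {
            remaining = ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end);
            if (ZSTD_isError(remaining)) return remaining;   /* error code */
        } while (remaining != 0);
        return out.pos;   /* compressed size */
    }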

View File

@ -13,7 +13,7 @@
<li><a href="#Chapter3">Simple API</a></li> <li><a href="#Chapter3">Simple API</a></li>
<li><a href="#Chapter4">Explicit memory management</a></li> <li><a href="#Chapter4">Explicit memory management</a></li>
<li><a href="#Chapter5">Simple dictionary API</a></li> <li><a href="#Chapter5">Simple dictionary API</a></li>
<li><a href="#Chapter6">Fast dictionary API</a></li> <li><a href="#Chapter6">Bulk processing dictionary API</a></li>
<li><a href="#Chapter7">Streaming</a></li> <li><a href="#Chapter7">Streaming</a></li>
<li><a href="#Chapter8">Streaming compression - HowTo</a></li> <li><a href="#Chapter8">Streaming compression - HowTo</a></li>
<li><a href="#Chapter9">Streaming decompression - HowTo</a></li> <li><a href="#Chapter9">Streaming decompression - HowTo</a></li>
@ -40,18 +40,18 @@
- a single step (described as Simple API) - a single step (described as Simple API)
- a single step, reusing a context (described as Explicit memory management) - a single step, reusing a context (described as Explicit memory management)
- unbounded multiple steps (described as Streaming compression) - unbounded multiple steps (described as Streaming compression)
The compression ratio achievable on small data can be highly improved using compression with a dictionary in: The compression ratio achievable on small data can be highly improved using a dictionary in:
- a single step (described as Simple dictionary API) - a single step (described as Simple dictionary API)
- a single step, reusing a dictionary (described as Fast dictionary API) - a single step, reusing a dictionary (described as Fast dictionary API)
Advanced experimental functions can be accessed using #define ZSTD_STATIC_LINKING_ONLY before including zstd.h. Advanced experimental functions can be accessed using #define ZSTD_STATIC_LINKING_ONLY before including zstd.h.
These APIs shall never be used with a dynamic library. Advanced experimental APIs shall never be used with a dynamic library.
They are not "stable", their definition may change in the future. Only static linking is allowed. They are not "stable", their definition may change in the future. Only static linking is allowed.
<BR></pre> <BR></pre>
<a name="Chapter2"></a><h2>Version</h2><pre></pre> <a name="Chapter2"></a><h2>Version</h2><pre></pre>
<pre><b>unsigned ZSTD_versionNumber(void); </b>/**< to be used when checking dll version */<b> <pre><b>unsigned ZSTD_versionNumber(void); </b>/**< useful to check dll version */<b>
</b></pre><BR> </b></pre><BR>
<a name="Chapter3"></a><h2>Simple API</h2><pre></pre> <a name="Chapter3"></a><h2>Simple API</h2><pre></pre>
@ -67,7 +67,7 @@
<pre><b>size_t ZSTD_decompress( void* dst, size_t dstCapacity, <pre><b>size_t ZSTD_decompress( void* dst, size_t dstCapacity,
const void* src, size_t compressedSize); const void* src, size_t compressedSize);
</b><p> `compressedSize` : must be the _exact_ size of some number of compressed and/or skippable frames. </b><p> `compressedSize` : must be the _exact_ size of some number of compressed and/or skippable frames.
`dstCapacity` is an upper bound of originalSize. `dstCapacity` is an upper bound of originalSize to regenerate.
If user cannot imply a maximum upper bound, it's better to use streaming mode to decompress data. If user cannot imply a maximum upper bound, it's better to use streaming mode to decompress data.
@return : the number of bytes decompressed into `dst` (<= `dstCapacity`), @return : the number of bytes decompressed into `dst` (<= `dstCapacity`),
or an errorCode if it fails (which can be tested using ZSTD_isError()). or an errorCode if it fails (which can be tested using ZSTD_isError()).
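
As a companion to the note above, a minimal sketch of one-shot decompression when the caller already knows an upper bound of the regenerated size (otherwise the documentation recommends streaming mode); the wrapper name is illustrative only:

    #include <zstd.h>

    /* `srcSize` must be the exact size of the compressed frame(s);
     * `dstCapacity` must be an upper bound of the original content size. */
    static size_t decompressWithKnownBound(void* dst, size_t dstCapacity,
                                           const void* src, size_t srcSize)
    {
        size_t const dSize = ZSTD_decompress(dst, dstCapacity, src, srcSize);
        if (ZSTD_isError(dSize)) {
            /* e.g. dstCapacity too small, or src is not a complete/valid frame */
            return dSize;   /* error code, can be tested with ZSTD_isError() */
        }
        return dSize;       /* number of bytes regenerated into dst */
    }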
@ -140,33 +140,33 @@ size_t ZSTD_freeDCtx(ZSTD_DCtx* dctx);
const void* src, size_t srcSize, const void* src, size_t srcSize,
const void* dict,size_t dictSize, const void* dict,size_t dictSize,
int compressionLevel); int compressionLevel);
</b><p> Compression using a predefined Dictionary (see dictBuilder/zdict.h). </b><p> Compression using a predefined Dictionary (see dictBuilder/zdict.h).
Note : This function loads the dictionary, resulting in significant startup delay. Note : This function loads the dictionary, resulting in significant startup delay.
Note : When `dict == NULL || dictSize < 8` no dictionary is used. Note : When `dict == NULL || dictSize < 8` no dictionary is used.
</p></pre><BR> </p></pre><BR>
<pre><b>size_t ZSTD_decompress_usingDict(ZSTD_DCtx* dctx, <pre><b>size_t ZSTD_decompress_usingDict(ZSTD_DCtx* dctx,
void* dst, size_t dstCapacity, void* dst, size_t dstCapacity,
const void* src, size_t srcSize, const void* src, size_t srcSize,
const void* dict,size_t dictSize); const void* dict,size_t dictSize);
</b><p> Decompression using a predefined Dictionary (see dictBuilder/zdict.h). </b><p> Decompression using a predefined Dictionary (see dictBuilder/zdict.h).
Dictionary must be identical to the one used during compression. Dictionary must be identical to the one used during compression.
Note : This function loads the dictionary, resulting in significant startup delay. Note : This function loads the dictionary, resulting in significant startup delay.
Note : When `dict == NULL || dictSize < 8` no dictionary is used. Note : When `dict == NULL || dictSize < 8` no dictionary is used.
</p></pre><BR> </p></pre><BR>
<a name="Chapter6"></a><h2>Fast dictionary API</h2><pre></pre> <a name="Chapter6"></a><h2>Bulk processing dictionary API</h2><pre></pre>
<pre><b>ZSTD_CDict* ZSTD_createCDict(const void* dictBuffer, size_t dictSize, <pre><b>ZSTD_CDict* ZSTD_createCDict(const void* dictBuffer, size_t dictSize,
int compressionLevel); int compressionLevel);
</b><p> When compressing multiple messages / blocks with the same dictionary, it's recommended to load it just once. </b><p> When compressing multiple messages / blocks with the same dictionary, it's recommended to load it just once.
ZSTD_createCDict() will create a digested dictionary, ready to start future compression operations without startup delay. ZSTD_createCDict() will create a digested dictionary, ready to start future compression operations without startup delay.
ZSTD_CDict can be created once and used by multiple threads concurrently, as its usage is read-only. ZSTD_CDict can be created once and shared by multiple threads concurrently, since its usage is read-only.
`dictBuffer` can be released after ZSTD_CDict creation, as its content is copied within CDict `dictBuffer` can be released after ZSTD_CDict creation, since its content is copied within CDict
</p></pre><BR> </p></pre><BR>
<pre><b>size_t ZSTD_freeCDict(ZSTD_CDict* CDict); <pre><b>size_t ZSTD_freeCDict(ZSTD_CDict* CDict);
</b><p> Function frees memory allocated by ZSTD_createCDict(). </b><p> Function frees memory allocated by ZSTD_createCDict().
</p></pre><BR> </p></pre><BR>
<pre><b>size_t ZSTD_compress_usingCDict(ZSTD_CCtx* cctx, <pre><b>size_t ZSTD_compress_usingCDict(ZSTD_CCtx* cctx,
@ -180,20 +180,20 @@ size_t ZSTD_freeDCtx(ZSTD_DCtx* dctx);
</p></pre><BR> </p></pre><BR>
<pre><b>ZSTD_DDict* ZSTD_createDDict(const void* dictBuffer, size_t dictSize); <pre><b>ZSTD_DDict* ZSTD_createDDict(const void* dictBuffer, size_t dictSize);
</b><p> Create a digested dictionary, ready to start decompression operation without startup delay. </b><p> Create a digested dictionary, ready to start decompression operation without startup delay.
dictBuffer can be released after DDict creation, as its content is copied inside DDict dictBuffer can be released after DDict creation, as its content is copied inside DDict
</p></pre><BR> </p></pre><BR>
<pre><b>size_t ZSTD_freeDDict(ZSTD_DDict* ddict); <pre><b>size_t ZSTD_freeDDict(ZSTD_DDict* ddict);
</b><p> Function frees memory allocated with ZSTD_createDDict() </b><p> Function frees memory allocated with ZSTD_createDDict()
</p></pre><BR> </p></pre><BR>
<pre><b>size_t ZSTD_decompress_usingDDict(ZSTD_DCtx* dctx, <pre><b>size_t ZSTD_decompress_usingDDict(ZSTD_DCtx* dctx,
void* dst, size_t dstCapacity, void* dst, size_t dstCapacity,
const void* src, size_t srcSize, const void* src, size_t srcSize,
const ZSTD_DDict* ddict); const ZSTD_DDict* ddict);
</b><p> Decompression using a digested Dictionary. </b><p> Decompression using a digested Dictionary.
Faster startup than ZSTD_decompress_usingDict(), recommended when same dictionary is used multiple times. Faster startup than ZSTD_decompress_usingDict(), recommended when same dictionary is used multiple times.
</p></pre><BR> </p></pre><BR>
<a name="Chapter7"></a><h2>Streaming</h2><pre></pre> <a name="Chapter7"></a><h2>Streaming</h2><pre></pre>
@ -250,7 +250,7 @@ size_t ZSTD_freeDCtx(ZSTD_DCtx* dctx);
<BR></pre> <BR></pre>
<pre><b>typedef ZSTD_CCtx ZSTD_CStream; </b>/**< CCtx and CStream are effectively same object */<b> <pre><b>typedef ZSTD_CCtx ZSTD_CStream; </b>/**< CCtx and CStream are now effectively same object (>= v1.3.0) */<b>
</b></pre><BR> </b></pre><BR>
<h3>ZSTD_CStream management functions</h3><pre></pre><b><pre>ZSTD_CStream* ZSTD_createCStream(void); <h3>ZSTD_CStream management functions</h3><pre></pre><b><pre>ZSTD_CStream* ZSTD_createCStream(void);
size_t ZSTD_freeCStream(ZSTD_CStream* zcs); size_t ZSTD_freeCStream(ZSTD_CStream* zcs);
@ -285,6 +285,8 @@ size_t ZSTD_endStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output);
<BR></pre> <BR></pre>
<pre><b>typedef ZSTD_DCtx ZSTD_DStream; </b>/**< DCtx and DStream are now effectively same object (>= v1.3.0) */<b>
</b></pre><BR>
<h3>ZSTD_DStream management functions</h3><pre></pre><b><pre>ZSTD_DStream* ZSTD_createDStream(void); <h3>ZSTD_DStream management functions</h3><pre></pre><b><pre>ZSTD_DStream* ZSTD_createDStream(void);
size_t ZSTD_freeDStream(ZSTD_DStream* zds); size_t ZSTD_freeDStream(ZSTD_DStream* zds);
</pre></b><BR> </pre></b><BR>

View File

@ -571,7 +571,7 @@ static U32 ZSTD_equivalentParams(ZSTD_compressionParameters cParams1,
} }
/*! ZSTD_continueCCtx() : /*! ZSTD_continueCCtx() :
reuse CCtx without reset (note : requires no dictionary) */ * reuse CCtx without reset (note : requires no dictionary) */
static size_t ZSTD_continueCCtx(ZSTD_CCtx* cctx, ZSTD_parameters params, U64 pledgedSrcSize) static size_t ZSTD_continueCCtx(ZSTD_CCtx* cctx, ZSTD_parameters params, U64 pledgedSrcSize)
{ {
U32 const end = (U32)(cctx->nextSrc - cctx->base); U32 const end = (U32)(cctx->nextSrc - cctx->base);
@ -3831,6 +3831,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
if (cctx->nbThreads > 1) { if (cctx->nbThreads > 1) {
DEBUGLOG(4, "call ZSTDMT_initCStream_internal");
CHECK_F( ZSTDMT_initCStream_internal(cctx->mtctx, NULL, 0, cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) ); CHECK_F( ZSTDMT_initCStream_internal(cctx->mtctx, NULL, 0, cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) );
cctx->streamStage = zcss_load; cctx->streamStage = zcss_load;
} else } else
@ -3842,7 +3843,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
if (cctx->nbThreads > 1) { if (cctx->nbThreads > 1) {
size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
DEBUGLOG(5, "ZSTDMT result : %u", (U32)flushMin); DEBUGLOG(4, "ZSTDMT_compressStream_generic : %u", (U32)flushMin);
if ( ZSTD_isError(flushMin) if ( ZSTD_isError(flushMin)
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
ZSTD_startNewCompression(cctx); ZSTD_startNewCompression(cctx);

View File

@ -190,7 +190,7 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads,
cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */
cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem); cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem);
if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; } if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; }
DEBUGLOG(3, "cctxPool created, with %u threads", nbThreads); DEBUGLOG(4, "cctxPool created, with %u threads", nbThreads);
return cctxPool; return cctxPool;
} }
@ -261,11 +261,11 @@ void ZSTDMT_compressChunk(void* jobDescription)
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
const void* const src = (const char*)job->srcStart + job->dictSize; const void* const src = (const char*)job->srcStart + job->dictSize;
buffer_t const dstBuff = job->dstBuff; buffer_t const dstBuff = job->dstBuff;
DEBUGLOG(3, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", DEBUGLOG(4, "job (first:%u) (last:%u) : dictSize %u, srcSize %u",
job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize); job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize);
if (job->cdict) { /* should only happen for first segment */ if (job->cdict) { /* should only happen for first segment */
size_t const initError = ZSTD_compressBegin_usingCDict_advanced(job->cctx, job->cdict, job->params.fParams, job->fullFrameSize); size_t const initError = ZSTD_compressBegin_usingCDict_advanced(job->cctx, job->cdict, job->params.fParams, job->fullFrameSize);
DEBUGLOG(3, "using CDict"); DEBUGLOG(5, "using CDict");
if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
} else { /* srcStart points at reloaded section */ } else { /* srcStart points at reloaded section */
if (!job->firstChunk) job->params.fParams.contentSizeFlag = 0; /* ensure no srcSize control */ if (!job->firstChunk) job->params.fParams.contentSizeFlag = 0; /* ensure no srcSize control */
@ -280,12 +280,12 @@ void ZSTDMT_compressChunk(void* jobDescription)
ZSTD_invalidateRepCodes(job->cctx); ZSTD_invalidateRepCodes(job->cctx);
} }
DEBUGLOG(4, "Compressing : "); DEBUGLOG(5, "Compressing : ");
DEBUG_PRINTHEX(4, job->srcStart, 12); DEBUG_PRINTHEX(4, job->srcStart, 12);
job->cSize = (job->lastChunk) ? job->cSize = (job->lastChunk) ?
ZSTD_compressEnd (job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : ZSTD_compressEnd (job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize); ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
DEBUGLOG(3, "compressed %u bytes into %u bytes (first:%u) (last:%u)", DEBUGLOG(4, "compressed %u bytes into %u bytes (first:%u) (last:%u)",
(unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk); (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize)); DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize));
@ -426,7 +426,7 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter,
mtctx->sectionSize = value; mtctx->sectionSize = value;
return 0; return 0;
case ZSTDMT_p_overlapSectionLog : case ZSTDMT_p_overlapSectionLog :
DEBUGLOG(4, "ZSTDMT_p_overlapSectionLog : %u", value); DEBUGLOG(5, "ZSTDMT_p_overlapSectionLog : %u", value);
mtctx->overlapRLog = (value >= 9) ? 0 : 9 - value; mtctx->overlapRLog = (value >= 9) ? 0 : 9 - value;
return 0; return 0;
default : default :
@ -457,8 +457,8 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */ unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */
size_t frameStartPos = 0, dstBufferPos = 0; size_t frameStartPos = 0, dstBufferPos = 0;
DEBUGLOG(3, "windowLog : %2u => chunkTargetSize : %u bytes ", params.cParams.windowLog, (U32)chunkTargetSize); DEBUGLOG(4, "windowLog : %2u => chunkTargetSize : %u bytes ", params.cParams.windowLog, (U32)chunkTargetSize);
DEBUGLOG(3, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize); DEBUGLOG(4, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize);
params.fParams.contentSizeFlag = 1; params.fParams.contentSizeFlag = 1;
if (nbChunks==1) { /* fallback to single-thread mode */ if (nbChunks==1) { /* fallback to single-thread mode */
@ -495,8 +495,8 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex; mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond; mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;
DEBUGLOG(3, "posting job %u (%u bytes)", u, (U32)chunkSize); DEBUGLOG(4, "posting job %u (%u bytes)", u, (U32)chunkSize);
DEBUG_PRINTHEX(3, mtctx->jobs[u].srcStart, 12); DEBUG_PRINTHEX(5, mtctx->jobs[u].srcStart, 12);
POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]); POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);
frameStartPos += chunkSize; frameStartPos += chunkSize;
@ -508,14 +508,14 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
{ unsigned chunkID; { unsigned chunkID;
size_t error = 0, dstPos = 0; size_t error = 0, dstPos = 0;
for (chunkID=0; chunkID<nbChunks; chunkID++) { for (chunkID=0; chunkID<nbChunks; chunkID++) {
DEBUGLOG(3, "waiting for chunk %u ", chunkID); DEBUGLOG(5, "waiting for chunk %u ", chunkID);
PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex); PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
while (mtctx->jobs[chunkID].jobCompleted==0) { while (mtctx->jobs[chunkID].jobCompleted==0) {
DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", chunkID); DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex); pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
} }
pthread_mutex_unlock(&mtctx->jobCompleted_mutex); pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
DEBUGLOG(3, "ready to write chunk %u ", chunkID); DEBUGLOG(5, "ready to write chunk %u ", chunkID);
ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[chunkID].cctx); ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[chunkID].cctx);
mtctx->jobs[chunkID].cctx = NULL; mtctx->jobs[chunkID].cctx = NULL;
@ -533,7 +533,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
dstPos += cSize ; dstPos += cSize ;
} }
} }
if (!error) DEBUGLOG(3, "compressed size : %u ", (U32)dstPos); if (!error) DEBUGLOG(4, "compressed size : %u ", (U32)dstPos);
return error ? error : dstPos; return error ? error : dstPos;
} }
@ -550,7 +550,7 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
unsigned const jobID = zcs->doneJobID & zcs->jobIDMask; unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
while (zcs->jobs[jobID].jobCompleted==0) { while (zcs->jobs[jobID].jobCompleted==0) {
DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */ DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */
pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
} }
pthread_mutex_unlock(&zcs->jobCompleted_mutex); pthread_mutex_unlock(&zcs->jobCompleted_mutex);
@ -593,12 +593,12 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
} }
zcs->targetDictSize = (zcs->overlapRLog>=9) ? 0 : (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapRLog); zcs->targetDictSize = (zcs->overlapRLog>=9) ? 0 : (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapRLog);
DEBUGLOG(4, "overlapRLog : %u ", zcs->overlapRLog); DEBUGLOG(5, "overlapRLog : %u ", zcs->overlapRLog);
DEBUGLOG(3, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10)); DEBUGLOG(5, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10));
zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2); zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2);
zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize); zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize);
zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize); zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize);
DEBUGLOG(3, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10)); DEBUGLOG(5, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10));
zcs->marginSize = zcs->targetSectionSize >> 2; zcs->marginSize = zcs->targetSectionSize >> 2;
zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize + zcs->marginSize; zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize + zcs->marginSize;
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
@ -628,7 +628,8 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
ZSTD_parameters params = ZSTD_getParamsFromCDict(cdict); ZSTD_parameters params = ZSTD_getParamsFromCDict(cdict);
if (cdict==NULL) return ERROR(GENERIC); /* method incompatible with NULL cdict */ if (cdict==NULL) return ERROR(GENERIC); /* method incompatible with NULL cdict */
params.fParams = fParams; params.fParams = fParams;
return ZSTDMT_initCStream_internal(mtctx, NULL, 0, cdict, params, pledgedSrcSize); return ZSTDMT_initCStream_internal(mtctx, NULL, 0 /*dictSize*/, cdict,
params, pledgedSrcSize);
} }
@ -686,6 +687,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
/* get a new buffer for next input */ /* get a new buffer for next input */
if (!endFrame) { if (!endFrame) {
size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize); size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame);
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
zcs->jobs[jobID].jobCompleted = 1; zcs->jobs[jobID].jobCompleted = 1;
@ -694,9 +696,9 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
ZSTDMT_releaseAllJobResources(zcs); ZSTDMT_releaseAllJobResources(zcs);
return ERROR(memory_allocation); return ERROR(memory_allocation);
} }
DEBUGLOG(5, "inBuff filled to %u", (U32)zcs->inBuff.filled); DEBUGLOG(5, "inBuff currently filled to %u", (U32)zcs->inBuff.filled);
zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize; zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize;
DEBUGLOG(5, "new job : filled to %u, with %u dict and %u src", DEBUGLOG(5, "new job : inBuff filled to %u, with %u dict and %u src",
(U32)zcs->inBuff.filled, (U32)newDictSize, (U32)zcs->inBuff.filled, (U32)newDictSize,
(U32)(zcs->inBuff.filled - newDictSize)); (U32)(zcs->inBuff.filled - newDictSize));
memmove(zcs->inBuff.buffer.start, memmove(zcs->inBuff.buffer.start,
@ -705,6 +707,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
DEBUGLOG(5, "new inBuff pre-filled"); DEBUGLOG(5, "new inBuff pre-filled");
zcs->dictSize = newDictSize; zcs->dictSize = newDictSize;
} else { /* if (endFrame==1) */ } else { /* if (endFrame==1) */
DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame);
zcs->inBuff.buffer = g_nullBuffer; zcs->inBuff.buffer = g_nullBuffer;
zcs->inBuff.filled = 0; zcs->inBuff.filled = 0;
zcs->dictSize = 0; zcs->dictSize = 0;
@ -714,7 +717,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
zcs->params.fParams.checksumFlag = 0; zcs->params.fParams.checksumFlag = 0;
} }
DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", DEBUGLOG(4, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
zcs->nextJobID, zcs->nextJobID,
(U32)zcs->jobs[jobID].srcSize, (U32)zcs->jobs[jobID].srcSize,
zcs->jobs[jobID].lastChunk, zcs->jobs[jobID].lastChunk,
@ -757,7 +760,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
XXH64_update(&zcs->xxhState, (const char*)job.srcStart + job.dictSize, job.srcSize); XXH64_update(&zcs->xxhState, (const char*)job.srcStart + job.dictSize, job.srcSize);
if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */ if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */
U32 const checksum = (U32)XXH64_digest(&zcs->xxhState); U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
DEBUGLOG(4, "writing checksum : %08X \n", checksum); DEBUGLOG(5, "writing checksum : %08X \n", checksum);
MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum); MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
job.cSize += 4; job.cSize += 4;
zcs->jobs[wJobID].cSize += 4; zcs->jobs[wJobID].cSize += 4;
@ -792,23 +795,25 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{ {
size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize; size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize;
if (zcs->frameEnded) if (zcs->frameEnded) {
/* current frame being ended. Only flush is allowed. Restart with init */ /* current frame being ended. Only flush is allowed. Or start new job with init */
DEBUGLOG(5, "ZSTDMT_compressStream: zcs::frameEnded==1");
return ERROR(stage_wrong); return ERROR(stage_wrong);
}
if (zcs->nbThreads==1) { if (zcs->nbThreads==1) {
return ZSTD_compressStream(zcs->cctxPool->cctx[0], output, input); return ZSTD_compressStream(zcs->cctxPool->cctx[0], output, input);
} }
/* fill input buffer */ /* fill input buffer */
{ size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled); { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, input->src, toLoad); memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, (const char*)input->src + input->pos, toLoad);
input->pos += toLoad; input->pos += toLoad;
zcs->inBuff.filled += toLoad; zcs->inBuff.filled += toLoad;
} }
if ( (zcs->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ if ( (zcs->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */
&& (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */ && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */
CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0 /* blockToFlush */) ); CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0 /* endFrame */) );
} }
/* check for data to flush */ /* check for data to flush */
@ -824,7 +829,8 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
size_t const srcSize = zcs->inBuff.filled - zcs->dictSize; size_t const srcSize = zcs->inBuff.filled - zcs->dictSize;
if (srcSize) if (srcSize)
DEBUGLOG(5, "flushing : %u bytes left to compress", (U32)srcSize); DEBUGLOG(5, "ZSTDMT_flushStream_internal : %u bytes left to compress",
(U32)srcSize);
if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded)) if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
&& (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
DEBUGLOG(5, "create new job with %u bytes to compress", (U32)srcSize); DEBUGLOG(5, "create new job with %u bytes to compress", (U32)srcSize);
@ -842,6 +848,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{ {
DEBUGLOG(5, "ZSTDMT_flushStream");
if (zcs->nbThreads==1) if (zcs->nbThreads==1)
return ZSTD_flushStream(zcs->cctxPool->cctx[0], output); return ZSTD_flushStream(zcs->cctxPool->cctx[0], output);
return ZSTDMT_flushStream_internal(zcs, output, 0 /* endFrame */); return ZSTDMT_flushStream_internal(zcs, output, 0 /* endFrame */);
@ -849,6 +856,7 @@ size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{ {
DEBUGLOG(5, "ZSTDMT_endStream");
if (zcs->nbThreads==1) if (zcs->nbThreads==1)
return ZSTD_endStream(zcs->cctxPool->cctx[0], output); return ZSTD_endStream(zcs->cctxPool->cctx[0], output);
return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */); return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */);
@ -859,17 +867,21 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
ZSTD_inBuffer* input, ZSTD_inBuffer* input,
ZSTD_EndDirective endOp) ZSTD_EndDirective endOp)
{ {
DEBUGLOG(5, "in: pos:%u / size:%u ; endOp=%u",
(U32)input->pos, (U32)input->size, (U32)endOp);
if (input->pos < input->size) /* exclude final flushes */ if (input->pos < input->size) /* exclude final flushes */
CHECK_F(ZSTDMT_compressStream(mtctx, output, input)); CHECK_F(ZSTDMT_compressStream(mtctx, output, input));
if (input->pos < input->size) endOp = ZSTD_e_continue;
switch(endOp) switch(endOp)
{ {
case ZSTD_e_flush: case ZSTD_e_flush:
return ZSTDMT_flushStream(mtctx, output); return ZSTDMT_flushStream(mtctx, output);
case ZSTD_e_end: case ZSTD_e_end:
DEBUGLOG(5, "endOp:%u; calling ZSTDMT_endStream", (U32)endOp);
return ZSTDMT_endStream(mtctx, output); return ZSTDMT_endStream(mtctx, output);
case ZSTD_e_continue: case ZSTD_e_continue:
return 1; return 1;
default: default:
return ERROR(GENERIC); return ERROR(GENERIC); /* invalid endDirective */
} }
} }
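
The one-line memcpy change above (copying from (const char*)input->src + input->pos instead of input->src) is the long-standing streaming fix mentioned in the commit message: a streaming caller is allowed to re-submit the same ZSTD_inBuffer with pos already advanced. A minimal caller sketch with the stable streaming API, assuming dstCapacity is large enough for the whole frame (helper name is illustrative only):

    #include <zstd.h>

    /* Feeds `src` through the streaming API; the same ZSTD_inBuffer is re-submitted
     * until fully consumed, so in.pos may be > 0 on later calls, which the
     * implementation must honour when loading its internal input buffer. */
    static size_t streamCompress(void* dst, size_t dstCapacity,
                                 const void* src, size_t srcSize, int cLevel)
    {
        ZSTD_CStream* const zcs = ZSTD_createCStream();
        ZSTD_outBuffer out = { dst, dstCapacity, 0 };
        ZSTD_inBuffer  in  = { src, srcSize, 0 };
        size_t ret;
        if (zcs == NULL) return (size_t)-1;               /* allocation failure; flagged by ZSTD_isError() */
        ret = ZSTD_initCStream(zcs, cLevel);
        while (!ZSTD_isError(ret) && (in.pos < in.size))  /* in.pos only moves forward */
            ret = ZSTD_compressStream(zcs, &out, &in);    /* must read from in.src + in.pos */
        if (!ZSTD_isError(ret)) {
            size_t const remaining = ZSTD_endStream(zcs, &out);   /* flush epilogue */
            ret = ZSTD_isError(remaining) ? remaining
                : (remaining != 0)        ? (size_t)-1    /* dst too small for the epilogue */
                : out.pos;                                /* success : compressed size */
        }
        ZSTD_freeCStream(zcs);
        return ret;
    }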

View File

@ -158,7 +158,6 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
const ZSTD_compressionParameters* comprParams) const ZSTD_compressionParameters* comprParams)
{ {
size_t const blockSize = ((g_blockSize>=32 && !g_decodeOnly) ? g_blockSize : srcSize) + (!srcSize) /* avoid div by 0 */ ; size_t const blockSize = ((g_blockSize>=32 && !g_decodeOnly) ? g_blockSize : srcSize) + (!srcSize) /* avoid div by 0 */ ;
size_t const avgSize = MIN(blockSize, (srcSize / nbFiles));
U32 const maxNbBlocks = (U32) ((srcSize + (blockSize-1)) / blockSize) + nbFiles; U32 const maxNbBlocks = (U32) ((srcSize + (blockSize-1)) / blockSize) + nbFiles;
blockParam_t* const blockTable = (blockParam_t*) malloc(maxNbBlocks * sizeof(blockParam_t)); blockParam_t* const blockTable = (blockParam_t*) malloc(maxNbBlocks * sizeof(blockParam_t));
size_t const maxCompressedSize = ZSTD_compressBound(srcSize) + (maxNbBlocks * 1024); /* add some room for safety */ size_t const maxCompressedSize = ZSTD_compressBound(srcSize) + (maxNbBlocks * 1024); /* add some room for safety */
@ -261,42 +260,72 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
UTIL_getTime(&clockStart); UTIL_getTime(&clockStart);
if (!cCompleted) { /* still some time to do compression tests */ if (!cCompleted) { /* still some time to do compression tests */
ZSTD_customMem const cmem = { NULL, NULL, NULL };
U64 const clockLoop = g_nbSeconds ? TIMELOOP_MICROSEC : 1; U64 const clockLoop = g_nbSeconds ? TIMELOOP_MICROSEC : 1;
U32 nbLoops = 0; U32 nbLoops = 0;
ZSTD_CDict* cdict = NULL;
#ifdef ZSTD_NEWAPI
ZSTD_CCtx_setParameter(ctx, ZSTD_p_nbThreads, g_nbThreads);
ZSTD_CCtx_setParameter(ctx, ZSTD_p_compressionLevel, cLevel);
ZSTD_CCtx_setParameter(ctx, ZSTD_p_windowLog, comprParams->windowLog);
ZSTD_CCtx_setParameter(ctx, ZSTD_p_chainLog, comprParams->chainLog);
ZSTD_CCtx_setParameter(ctx, ZSTD_p_searchLog, comprParams->searchLog);
ZSTD_CCtx_setParameter(ctx, ZSTD_p_minMatch, comprParams->searchLength);
ZSTD_CCtx_setParameter(ctx, ZSTD_p_targetLength, comprParams->targetLength);
ZSTD_CCtx_setParameter(ctx, ZSTD_p_compressionStrategy, comprParams->strategy);
ZSTD_CCtx_loadDictionary(ctx, dictBuffer, dictBufferSize);
#else
size_t const avgSize = MIN(blockSize, (srcSize / nbFiles));
ZSTD_parameters zparams = ZSTD_getParams(cLevel, avgSize, dictBufferSize); ZSTD_parameters zparams = ZSTD_getParams(cLevel, avgSize, dictBufferSize);
ZSTD_CDict* cdict; ZSTD_customMem const cmem = { NULL, NULL, NULL };
if (comprParams->windowLog) zparams.cParams.windowLog = comprParams->windowLog; if (comprParams->windowLog) zparams.cParams.windowLog = comprParams->windowLog;
if (comprParams->chainLog) zparams.cParams.chainLog = comprParams->chainLog; if (comprParams->chainLog) zparams.cParams.chainLog = comprParams->chainLog;
if (comprParams->hashLog) zparams.cParams.hashLog = comprParams->hashLog; if (comprParams->hashLog) zparams.cParams.hashLog = comprParams->hashLog;
if (comprParams->searchLog) zparams.cParams.searchLog = comprParams->searchLog; if (comprParams->searchLog) zparams.cParams.searchLog = comprParams->searchLog;
if (comprParams->searchLength) zparams.cParams.searchLength = comprParams->searchLength; if (comprParams->searchLength) zparams.cParams.searchLength = comprParams->searchLength;
if (comprParams->targetLength) zparams.cParams.targetLength = comprParams->targetLength; if (comprParams->targetLength) zparams.cParams.targetLength = comprParams->targetLength;
if (comprParams->strategy) zparams.cParams.strategy = (ZSTD_strategy)(comprParams->strategy - 1); if (comprParams->strategy) zparams.cParams.strategy = comprParams->strategy;
cdict = ZSTD_createCDict_advanced(dictBuffer, dictBufferSize, 1, zparams.cParams, cmem); cdict = ZSTD_createCDict_advanced(dictBuffer, dictBufferSize, 1, zparams.cParams, cmem);
if (cdict==NULL) EXM_THROW(1, "ZSTD_createCDict_advanced() allocation failure"); if (cdict==NULL) EXM_THROW(1, "ZSTD_createCDict_advanced() allocation failure");
#endif
do { do {
U32 blockNb; U32 blockNb;
size_t rSize;
for (blockNb=0; blockNb<nbBlocks; blockNb++) { for (blockNb=0; blockNb<nbBlocks; blockNb++) {
size_t rSize;
#ifdef ZSTD_NEWAPI
ZSTD_outBuffer out = { blockTable[blockNb].cPtr, blockTable[blockNb].cRoom, 0 };
ZSTD_inBuffer in = { blockTable[blockNb].srcPtr, blockTable[blockNb].srcSize, 0 };
size_t cError = 1;
while (cError) {
cError = ZSTD_compress_generic(ctx,
&out, &in, ZSTD_e_end);
if (ZSTD_isError(cError))
EXM_THROW(1, "ZSTD_compress_generic() error : %s",
ZSTD_getErrorName(cError));
}
rSize = out.pos;
#else /* ! ZSTD_NEWAPI */
if (dictBufferSize) { if (dictBufferSize) {
rSize = ZSTD_compress_usingCDict(ctx, rSize = ZSTD_compress_usingCDict(ctx,
blockTable[blockNb].cPtr, blockTable[blockNb].cRoom, blockTable[blockNb].cPtr, blockTable[blockNb].cRoom,
blockTable[blockNb].srcPtr,blockTable[blockNb].srcSize, blockTable[blockNb].srcPtr,blockTable[blockNb].srcSize,
cdict); cdict);
} else { } else {
#ifdef ZSTD_MULTITHREAD /* note : limitation : MT single-pass does not support compression with dictionary */ # ifdef ZSTD_MULTITHREAD /* note : limitation : MT single-pass does not support compression with dictionary */
rSize = ZSTDMT_compressCCtx(mtctx, rSize = ZSTDMT_compressCCtx(mtctx,
blockTable[blockNb].cPtr, blockTable[blockNb].cRoom, blockTable[blockNb].cPtr, blockTable[blockNb].cRoom,
blockTable[blockNb].srcPtr,blockTable[blockNb].srcSize, blockTable[blockNb].srcPtr,blockTable[blockNb].srcSize,
cLevel); cLevel);
#else # else
rSize = ZSTD_compress_advanced (ctx, rSize = ZSTD_compress_advanced (ctx,
blockTable[blockNb].cPtr, blockTable[blockNb].cRoom, blockTable[blockNb].cPtr, blockTable[blockNb].cRoom,
blockTable[blockNb].srcPtr,blockTable[blockNb].srcSize, NULL, 0, zparams); blockTable[blockNb].srcPtr,blockTable[blockNb].srcSize,
#endif NULL, 0, zparams);
# endif
} }
if (ZSTD_isError(rSize)) EXM_THROW(1, "ZSTD_compress_usingCDict() failed : %s", ZSTD_getErrorName(rSize)); if (ZSTD_isError(rSize))
EXM_THROW(1, "ZSTD_compress_usingCDict() failed : %s",
ZSTD_getErrorName(rSize));
#endif /* ZSTD_NEWAPI */
blockTable[blockNb].cSize = rSize; blockTable[blockNb].cSize = rSize;
} }
nbLoops++; nbLoops++;
@ -382,7 +411,7 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
} }
pos = (U32)(u - bacc); pos = (U32)(u - bacc);
bNb = pos / (128 KB); bNb = pos / (128 KB);
DISPLAY("(block %u, sub %u, pos %u) \n", segNb, bNb, pos); DISPLAY("(sample %u, block %u, pos %u) \n", segNb, bNb, pos);
if (u>5) { if (u>5) {
int n; int n;
for (n=-5; n<0; n++) DISPLAY("%02X ", ((const BYTE*)srcBuffer)[u+n]); for (n=-5; n<0; n++) DISPLAY("%02X ", ((const BYTE*)srcBuffer)[u+n]);