Merge pull request #2339 from terrelln/zstdmt-stability

Fix zstdmt stability issues and clean up the zstdmt code
dev
Nick Terrell 2020-10-27 19:43:13 -07:00 committed by GitHub
commit 599ff58e08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 167 additions and 814 deletions

View File

@ -817,7 +817,6 @@ struct ZSTDMT_CCtx_s {
roundBuff_t roundBuff;
serialState_t serial;
rsyncState_t rsync;
unsigned singleBlockingThread;
unsigned jobIDMask;
unsigned doneJobID;
unsigned nextJobID;
@ -883,7 +882,7 @@ static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) {
/* ZSTDMT_CCtxParam_setNbWorkers():
* Internal use only */
size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)
static size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)
{
return ZSTD_CCtxParams_setParameter(params, ZSTD_c_nbWorkers, (int)nbWorkers);
}
@ -942,11 +941,6 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem,
#endif
}
ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers)
{
return ZSTDMT_createCCtx_advanced(nbWorkers, ZSTD_defaultCMem, NULL);
}
/* ZSTDMT_releaseAllJobResources() :
* note : ensure all workers are killed first ! */
@ -1018,65 +1012,6 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
+ mtctx->roundBuff.capacity;
}
/* Internal only */
size_t
ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params,
ZSTDMT_parameter parameter,
int value)
{
DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter");
switch(parameter)
{
case ZSTDMT_p_jobSize :
DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter : set jobSize to %i", value);
return ZSTD_CCtxParams_setParameter(params, ZSTD_c_jobSize, value);
case ZSTDMT_p_overlapLog :
DEBUGLOG(4, "ZSTDMT_p_overlapLog : %i", value);
return ZSTD_CCtxParams_setParameter(params, ZSTD_c_overlapLog, value);
case ZSTDMT_p_rsyncable :
DEBUGLOG(4, "ZSTD_p_rsyncable : %i", value);
return ZSTD_CCtxParams_setParameter(params, ZSTD_c_rsyncable, value);
default :
return ERROR(parameter_unsupported);
}
}
size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int value)
{
DEBUGLOG(4, "ZSTDMT_setMTCtxParameter");
return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
}
size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int* value)
{
switch (parameter) {
case ZSTDMT_p_jobSize:
return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_jobSize, value);
case ZSTDMT_p_overlapLog:
return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_overlapLog, value);
case ZSTDMT_p_rsyncable:
return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_rsyncable, value);
default:
return ERROR(parameter_unsupported);
}
}
/* Sets parameters relevant to the compression job,
* initializing others to default values. */
static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(const ZSTD_CCtx_params* params)
{
ZSTD_CCtx_params jobParams = *params;
/* Clear parameters related to multithreading */
jobParams.forceWindow = 0;
jobParams.nbWorkers = 0;
jobParams.jobSize = 0;
jobParams.overlapLog = 0;
jobParams.rsyncable = 0;
ZSTD_memset(&jobParams.ldmParams, 0, sizeof(ldmParams_t));
ZSTD_memset(&jobParams.customMem, 0, sizeof(ZSTD_customMem));
return jobParams;
}
/* ZSTDMT_resize() :
* @return : error code if fails, 0 on success */
@ -1247,174 +1182,6 @@ static size_t ZSTDMT_computeOverlapSize(const ZSTD_CCtx_params* params)
return (ovLog==0) ? 0 : (size_t)1 << ovLog;
}
static unsigned
ZSTDMT_computeNbJobs(const ZSTD_CCtx_params* params, size_t srcSize, unsigned nbWorkers)
{
assert(nbWorkers>0);
{ size_t const jobSizeTarget = (size_t)1 << ZSTDMT_computeTargetJobLog(params);
size_t const jobMaxSize = jobSizeTarget << 2;
size_t const passSizeMax = jobMaxSize * nbWorkers;
unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1;
unsigned const nbJobsLarge = multiplier * nbWorkers;
unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1;
unsigned const nbJobsSmall = MIN(nbJobsMax, nbWorkers);
return (multiplier>1) ? nbJobsLarge : nbJobsSmall;
} }
/* ZSTDMT_compress_advanced_internal() :
* This is a blocking function : it will only give back control to caller after finishing its compression job.
*/
static size_t
ZSTDMT_compress_advanced_internal(
ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
const ZSTD_CDict* cdict,
ZSTD_CCtx_params params)
{
ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(&params);
size_t const overlapSize = ZSTDMT_computeOverlapSize(&params);
unsigned const nbJobs = ZSTDMT_computeNbJobs(&params, srcSize, params.nbWorkers);
size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs;
size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize; /* avoid too small last block */
const char* const srcStart = (const char*)src;
size_t remainingSrcSize = srcSize;
unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize)); /* presumes avgJobSize >= 256 KB, which should be the case */
size_t frameStartPos = 0, dstBufferPos = 0;
assert(jobParams.nbWorkers == 0);
assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);
params.jobSize = (U32)avgJobSize;
DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ",
nbJobs, (U32)proposedJobSize, (U32)avgJobSize);
if ((nbJobs==1) | (params.nbWorkers<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */
ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: fallback to single-thread mode");
if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams);
return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, &jobParams);
}
assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) );
/* LDM doesn't even try to load the dictionary in single-ingestion mode */
if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize, NULL, 0, ZSTD_dct_auto))
return ERROR(memory_allocation);
FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbJobs) , ""); /* only expands if necessary */
{ unsigned u;
for (u=0; u<nbJobs; u++) {
size_t const jobSize = MIN(remainingSrcSize, avgJobSize);
size_t const dstBufferCapacity = ZSTD_compressBound(jobSize);
buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity };
buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer;
size_t dictSize = u ? overlapSize : 0;
mtctx->jobs[u].prefix.start = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].prefix.size = dictSize;
mtctx->jobs[u].src.start = srcStart + frameStartPos;
mtctx->jobs[u].src.size = jobSize; assert(jobSize > 0); /* avoid job.src.size == 0 */
mtctx->jobs[u].consumed = 0;
mtctx->jobs[u].cSize = 0;
mtctx->jobs[u].cdict = (u==0) ? cdict : NULL;
mtctx->jobs[u].fullFrameSize = srcSize;
mtctx->jobs[u].params = jobParams;
/* do not calculate checksum within sections, but write it in header for first section */
mtctx->jobs[u].dstBuff = dstBuffer;
mtctx->jobs[u].cctxPool = mtctx->cctxPool;
mtctx->jobs[u].bufPool = mtctx->bufPool;
mtctx->jobs[u].seqPool = mtctx->seqPool;
mtctx->jobs[u].serial = &mtctx->serial;
mtctx->jobs[u].jobID = u;
mtctx->jobs[u].firstJob = (u==0);
mtctx->jobs[u].lastJob = (u==nbJobs-1);
DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)jobSize);
DEBUG_PRINTHEX(6, mtctx->jobs[u].prefix.start, 12);
POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[u]);
frameStartPos += jobSize;
dstBufferPos += dstBufferCapacity;
remainingSrcSize -= jobSize;
} }
/* collect result */
{ size_t error = 0, dstPos = 0;
unsigned jobID;
for (jobID=0; jobID<nbJobs; jobID++) {
DEBUGLOG(5, "waiting for job %u ", jobID);
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) {
DEBUGLOG(5, "waiting for jobCompleted signal from job %u", jobID);
ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
}
ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
DEBUGLOG(5, "ready to write job %u ", jobID);
{ size_t const cSize = mtctx->jobs[jobID].cSize;
if (ZSTD_isError(cSize)) error = cSize;
if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall);
if (jobID) { /* note : job 0 is written directly at dst, which is correct position */
if (!error)
ZSTD_memmove((char*)dst + dstPos, mtctx->jobs[jobID].dstBuff.start, cSize); /* may overlap when job compressed within dst */
if (jobID >= compressWithinDst) { /* job compressed into its own buffer, which must be released */
DEBUGLOG(5, "releasing buffer %u>=%u", jobID, compressWithinDst);
ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
} }
mtctx->jobs[jobID].dstBuff = g_nullBuffer;
mtctx->jobs[jobID].cSize = 0;
dstPos += cSize ;
}
} /* for (jobID=0; jobID<nbJobs; jobID++) */
DEBUGLOG(4, "checksumFlag : %u ", params.fParams.checksumFlag);
if (params.fParams.checksumFlag) {
U32 const checksum = (U32)XXH64_digest(&mtctx->serial.xxhState);
if (dstPos + 4 > dstCapacity) {
error = ERROR(dstSize_tooSmall);
} else {
DEBUGLOG(4, "writing checksum : %08X \n", checksum);
MEM_writeLE32((char*)dst + dstPos, checksum);
dstPos += 4;
} }
if (!error) DEBUGLOG(4, "compressed size : %u ", (U32)dstPos);
return error ? error : dstPos;
}
}
size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
const ZSTD_CDict* cdict,
ZSTD_parameters params,
int overlapLog)
{
ZSTD_CCtx_params cctxParams = mtctx->params;
cctxParams.cParams = params.cParams;
cctxParams.fParams = params.fParams;
assert(ZSTD_OVERLAPLOG_MIN <= overlapLog && overlapLog <= ZSTD_OVERLAPLOG_MAX);
cctxParams.overlapLog = overlapLog;
return ZSTDMT_compress_advanced_internal(mtctx,
dst, dstCapacity,
src, srcSize,
cdict, cctxParams);
}
size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
int compressionLevel)
{
ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0);
int const overlapLog = ZSTDMT_overlapLog_default(params.cParams.strategy);
params.fParams.contentSizeFlag = 1;
return ZSTDMT_compress_advanced(mtctx, dst, dstCapacity, src, srcSize, NULL, params, overlapLog);
}
/* ====================================== */
/* ======= Streaming API ======= */
/* ====================================== */
@ -1439,16 +1206,6 @@ size_t ZSTDMT_initCStream_internal(
if (params.jobSize != 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN;
if (params.jobSize > (size_t)ZSTDMT_JOBSIZE_MAX) params.jobSize = (size_t)ZSTDMT_JOBSIZE_MAX;
mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */
if (mtctx->singleBlockingThread) {
ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(&params);
DEBUGLOG(5, "ZSTDMT_initCStream_internal: switch to single blocking thread mode");
assert(singleThreadParams.nbWorkers == 0);
return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0],
dict, dictSize, cdict,
&singleThreadParams, pledgedSrcSize);
}
DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers);
if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */
@ -1537,53 +1294,6 @@ size_t ZSTDMT_initCStream_internal(
return 0;
}
size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
const void* dict, size_t dictSize,
ZSTD_parameters params,
unsigned long long pledgedSrcSize)
{
ZSTD_CCtx_params cctxParams = mtctx->params; /* retrieve sticky params */
DEBUGLOG(4, "ZSTDMT_initCStream_advanced (pledgedSrcSize=%u)", (U32)pledgedSrcSize);
cctxParams.cParams = params.cParams;
cctxParams.fParams = params.fParams;
return ZSTDMT_initCStream_internal(mtctx, dict, dictSize, ZSTD_dct_auto, NULL,
cctxParams, pledgedSrcSize);
}
size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
const ZSTD_CDict* cdict,
ZSTD_frameParameters fParams,
unsigned long long pledgedSrcSize)
{
ZSTD_CCtx_params cctxParams = mtctx->params;
if (cdict==NULL) return ERROR(dictionary_wrong); /* method incompatible with NULL cdict */
cctxParams.cParams = ZSTD_getCParamsFromCDict(cdict);
cctxParams.fParams = fParams;
return ZSTDMT_initCStream_internal(mtctx, NULL, 0 /*dictSize*/, ZSTD_dct_auto, cdict,
cctxParams, pledgedSrcSize);
}
/* ZSTDMT_resetCStream() :
* pledgedSrcSize can be zero == unknown (for the time being)
* prefer using ZSTD_CONTENTSIZE_UNKNOWN,
* as `0` might mean "empty" in the future */
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize)
{
if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dct_auto, 0, mtctx->params,
pledgedSrcSize);
}
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) {
ZSTD_parameters const params = ZSTD_getParams(compressionLevel, ZSTD_CONTENTSIZE_UNKNOWN, 0);
ZSTD_CCtx_params cctxParams = mtctx->params; /* retrieve sticky params */
DEBUGLOG(4, "ZSTDMT_initCStream (cLevel=%i)", compressionLevel);
cctxParams.cParams = params.cParams;
cctxParams.fParams = params.fParams;
return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dct_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN);
}
/* ZSTDMT_writeLastEmptyBlock()
* Write a single empty block with an end-of-frame to finish a frame.
@ -1975,6 +1685,16 @@ findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
pos = 0;
prev = (BYTE const*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled - RSYNC_LENGTH;
hash = ZSTD_rollingHash_compute(prev, RSYNC_LENGTH);
if ((hash & hitMask) == hitMask) {
/* We're already at a sync point so don't load any more until
* we're able to flush this sync point.
* This likely happened because the job table was full so we
* couldn't add our job.
*/
syncPoint.toLoad = 0;
syncPoint.flush = 1;
return syncPoint;
}
} else {
/* We don't have enough bytes buffered to initialize the hash, but
* we know we have at least RSYNC_LENGTH bytes total.
@ -2029,34 +1749,11 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
assert(output->pos <= output->size);
assert(input->pos <= input->size);
if (mtctx->singleBlockingThread) { /* delegate to single-thread (synchronous) */
return ZSTD_compressStream2(mtctx->cctxPool->cctx[0], output, input, endOp);
}
if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) {
/* current frame being ended. Only flush/end are allowed */
return ERROR(stage_wrong);
}
/* single-pass shortcut (note : synchronous-mode) */
if ( (!mtctx->params.rsyncable) /* rsyncable mode is disabled */
&& (mtctx->nextJobID == 0) /* just started */
&& (mtctx->inBuff.filled == 0) /* nothing buffered */
&& (!mtctx->jobReady) /* no job already created */
&& (endOp == ZSTD_e_end) /* end order */
&& (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */
size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx,
(char*)output->dst + output->pos, output->size - output->pos,
(const char*)input->src + input->pos, input->size - input->pos,
mtctx->cdict, mtctx->params);
if (ZSTD_isError(cSize)) return cSize;
input->pos = input->size;
output->pos += cSize;
mtctx->allJobsCompleted = 1;
mtctx->frameEnded = 1;
return 0;
}
/* fill input buffer */
if ( (!mtctx->jobReady)
&& (input->size > input->pos) ) { /* support NULL input */
@ -2084,8 +1781,16 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
mtctx->inBuff.filled += syncPoint.toLoad;
forwardInputProgress = syncPoint.toLoad>0;
}
if ((input->pos < input->size) && (endOp == ZSTD_e_end))
endOp = ZSTD_e_flush; /* can't end now : not all input consumed */
}
if ((input->pos < input->size) && (endOp == ZSTD_e_end)) {
/* Can't end yet because the input is not fully consumed.
* We are in one of these cases:
* - mtctx->inBuff is NULL & empty: we couldn't get an input buffer so don't create a new job.
* - We filled the input buffer: flush this job but don't end the frame.
* - We hit a synchronization point: flush this job but don't end the frame.
*/
assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetSectionSize || mtctx->params.rsyncable);
endOp = ZSTD_e_flush;
}
if ( (mtctx->jobReady)
@ -2104,47 +1809,3 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
return remainingToFlush;
}
}
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
FORWARD_IF_ERROR( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) , "");
/* recommended next input size : fill current input buffer */
return mtctx->targetSectionSize - mtctx->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */
}
static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_EndDirective endFrame)
{
size_t const srcSize = mtctx->inBuff.filled;
DEBUGLOG(5, "ZSTDMT_flushStream_internal");
if ( mtctx->jobReady /* one job ready for a worker to pick up */
|| (srcSize > 0) /* still some data within input buffer */
|| ((endFrame==ZSTD_e_end) && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */
DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)",
(U32)srcSize, (U32)endFrame);
FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) , "");
}
/* check if there is any data available to flush */
return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */, endFrame);
}
size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output)
{
DEBUGLOG(5, "ZSTDMT_flushStream");
if (mtctx->singleBlockingThread)
return ZSTD_flushStream(mtctx->cctxPool->cctx[0], output);
return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_flush);
}
size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output)
{
DEBUGLOG(4, "ZSTDMT_endStream");
if (mtctx->singleBlockingThread)
return ZSTD_endStream(mtctx->cctxPool->cctx[0], output);
return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_end);
}

View File

@ -19,24 +19,12 @@
/* Note : This is an internal API.
* These APIs used to be exposed with ZSTDLIB_API,
* because it used to be the only way to invoke MT compression.
* Now, it's recommended to use ZSTD_compress2 and ZSTD_compressStream2()
* instead.
*
* If you depend on these APIs and can't switch, then define
* ZSTD_LEGACY_MULTITHREADED_API when making the dynamic library.
* However, we may completely remove these functions in a future
* release, so please switch soon.
* Now, you must use ZSTD_compress2 and ZSTD_compressStream2() instead.
*
* This API requires ZSTD_MULTITHREAD to be defined during compilation,
* otherwise ZSTDMT_createCCtx*() will fail.
*/
#ifdef ZSTD_LEGACY_MULTITHREADED_API
# define ZSTDMT_API ZSTDLIB_API
#else
# define ZSTDMT_API
#endif
/* === Dependencies === */
#include "../common/zstd_deps.h" /* size_t */
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_parameters */
@ -54,79 +42,34 @@
#define ZSTDMT_JOBSIZE_MAX (MEM_32bits() ? (512 MB) : (1024 MB))
/* ========================================================
* === Private interface, for use by ZSTD_compress.c ===
* === Not exposed in libzstd. Never invoke directly ===
* ======================================================== */
/* === Memory management === */
typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx;
/* Requires ZSTD_MULTITHREAD to be defined during compilation, otherwise it will return NULL. */
ZSTDMT_API ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers);
/* Requires ZSTD_MULTITHREAD to be defined during compilation, otherwise it will return NULL. */
ZSTDMT_API ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers,
ZSTD_customMem cMem,
ZSTD_threadPool *pool);
ZSTDMT_API size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx);
ZSTDMT_API size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx);
/* === Simple one-pass compression function === */
ZSTDMT_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
int compressionLevel);
ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers,
ZSTD_customMem cMem,
ZSTD_threadPool *pool);
size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx);
size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx);
/* === Streaming functions === */
ZSTDMT_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel);
ZSTDMT_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize); /**< if srcSize is not known at reset time, use ZSTD_CONTENTSIZE_UNKNOWN. Note: for compatibility with older programs, 0 means the same as ZSTD_CONTENTSIZE_UNKNOWN, but it will change in the future to mean "empty" */
ZSTDMT_API size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx);
ZSTDMT_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
ZSTDMT_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
ZSTDMT_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
/* === Advanced functions and parameters === */
ZSTDMT_API size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity,
const void* src, size_t srcSize,
const ZSTD_CDict* cdict,
ZSTD_parameters params,
int overlapLog);
ZSTDMT_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
const void* dict, size_t dictSize, /* dict can be released after init, a local copy is preserved within zcs */
ZSTD_parameters params,
unsigned long long pledgedSrcSize); /* pledgedSrcSize is optional and can be zero == unknown */
ZSTDMT_API size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
const ZSTD_CDict* cdict,
ZSTD_frameParameters fparams,
unsigned long long pledgedSrcSize); /* note : zero means empty */
/* ZSTDMT_parameter :
* List of parameters that can be set using ZSTDMT_setMTCtxParameter() */
typedef enum {
ZSTDMT_p_jobSize, /* Each job is compressed in parallel. By default, this value is dynamically determined depending on compression parameters. Can be set explicitly here. */
ZSTDMT_p_overlapLog, /* Each job may reload a part of previous job to enhance compression ratio; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window. This is a "sticky" parameter : its value will be re-used on next compression job */
ZSTDMT_p_rsyncable /* Enables rsyncable mode. */
} ZSTDMT_parameter;
/* ZSTDMT_setMTCtxParameter() :
* allow setting individual parameters, one at a time, among a list of enums defined in ZSTDMT_parameter.
* The function must be called typically after ZSTD_createCCtx() but __before ZSTDMT_init*() !__
* Parameters not explicitly reset by ZSTDMT_init*() remain the same in consecutive compression sessions.
* @return : 0, or an error code (which can be tested using ZSTD_isError()) */
ZSTDMT_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int value);
/* ZSTDMT_getMTCtxParameter() :
* Query the ZSTDMT_CCtx for a parameter value.
* @return : 0, or an error code (which can be tested using ZSTD_isError()) */
ZSTDMT_API size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int* value);
size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx);
/*! ZSTDMT_initCStream_internal() :
* Private use only. Init streaming operation.
* expects params to be valid.
* must receive dict, or cdict, or none, but not both.
* @return : 0, or an error code */
size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType,
const ZSTD_CDict* cdict,
ZSTD_CCtx_params params, unsigned long long pledgedSrcSize);
/*! ZSTDMT_compressStream_generic() :
* Combines ZSTDMT_compressStream() with optional ZSTDMT_flushStream() or ZSTDMT_endStream()
@ -135,16 +78,10 @@ ZSTDMT_API size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter
* 0 if fully flushed
* or an error code
* note : needs to be init using any ZSTD_initCStream*() variant */
ZSTDMT_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
ZSTD_outBuffer* output,
ZSTD_inBuffer* input,
ZSTD_EndDirective endOp);
/* ========================================================
* === Private interface, for use by ZSTD_compress.c ===
* === Not exposed in libzstd. Never invoke directly ===
* ======================================================== */
size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
ZSTD_outBuffer* output,
ZSTD_inBuffer* input,
ZSTD_EndDirective endOp);
/*! ZSTDMT_toFlushNow()
* Tell how many bytes are ready to be flushed immediately.
@ -154,15 +91,6 @@ ZSTDMT_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
* therefore flushing is limited by speed of oldest job. */
size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx);
/*! ZSTDMT_CCtxParam_setMTCtxParameter()
* like ZSTDMT_setMTCtxParameter(), but into a ZSTD_CCtx_Params */
size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, int value);
/*! ZSTDMT_CCtxParam_setNbWorkers()
* Set nbWorkers, and clamp it.
* Also reset jobSize and overlapLog */
size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers);
/*! ZSTDMT_updateCParams_whileCompressing() :
* Updates only a selected set of compression parameters, to remain compatible with current frame.
* New parameters will be applied to next compression job. */
@ -175,17 +103,6 @@ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_p
ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx);
/*! ZSTDMT_initCStream_internal() :
* Private use only. Init streaming operation.
* expects params to be valid.
* must receive dict, or cdict, or none, but not both.
* @return : 0, or an error code */
size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType,
const ZSTD_CDict* cdict,
ZSTD_CCtx_params params, unsigned long long pledgedSrcSize);
#if defined (__cplusplus)
}
#endif

View File

@ -381,7 +381,6 @@ test-zbuff32: zbufftest32
test-zstream: zstreamtest
$(QEMU_SYS) ./zstreamtest -v $(ZSTREAM_TESTTIME) $(FUZZER_FLAGS)
$(QEMU_SYS) ./zstreamtest --mt -t1 $(ZSTREAM_TESTTIME) $(FUZZER_FLAGS)
$(QEMU_SYS) ./zstreamtest --newapi -t1 $(ZSTREAM_TESTTIME) $(FUZZER_FLAGS)
test-zstream32: zstreamtest32

View File

@ -32,7 +32,6 @@
#include "fse.h"
#include "zstd.h" /* ZSTD_VERSION_STRING */
#include "zstd_errors.h" /* ZSTD_getErrorCode */
#include "zstdmt_compress.h"
#define ZDICT_STATIC_LINKING_ONLY
#include "zdict.h" /* ZDICT_trainFromBuffer */
#include "mem.h"
@ -1385,19 +1384,20 @@ static int basicUnitTests(U32 const seed, double compressibility)
/* ZSTDMT simple MT compression test */
DISPLAYLEVEL(3, "test%3i : create ZSTDMT CCtx : ", testNb++);
{ ZSTDMT_CCtx* const mtctx = ZSTDMT_createCCtx(2);
{ ZSTD_CCtx* const mtctx = ZSTD_createCCtx();
if (mtctx==NULL) {
DISPLAY("mtctx : not enough memory, aborting \n");
testResult = 1;
goto _end;
}
CHECK( ZSTD_CCtx_setParameter(mtctx, ZSTD_c_nbWorkers, 2) );
CHECK( ZSTD_CCtx_setParameter(mtctx, ZSTD_c_compressionLevel, 1) );
DISPLAYLEVEL(3, "OK \n");
DISPLAYLEVEL(3, "test%3u : compress %u bytes with 2 threads : ", testNb++, (unsigned)CNBuffSize);
CHECK_VAR(cSize, ZSTDMT_compressCCtx(mtctx,
CHECK_VAR(cSize, ZSTD_compress2(mtctx,
compressedBuffer, compressedBufferSize,
CNBuffer, CNBuffSize,
1) );
CNBuffer, CNBuffSize) );
DISPLAYLEVEL(3, "OK (%u bytes : %.2f%%)\n", (unsigned)cSize, (double)cSize/CNBuffSize*100);
DISPLAYLEVEL(3, "test%3i : decompressed size test : ", testNb++);
@ -1421,14 +1421,12 @@ static int basicUnitTests(U32 const seed, double compressibility)
DISPLAYLEVEL(3, "OK \n");
DISPLAYLEVEL(3, "test%3i : compress -T2 with checksum : ", testNb++);
{ ZSTD_parameters params = ZSTD_getParams(1, CNBuffSize, 0);
params.fParams.checksumFlag = 1;
params.fParams.contentSizeFlag = 1;
CHECK_VAR(cSize, ZSTDMT_compress_advanced(mtctx,
compressedBuffer, compressedBufferSize,
CNBuffer, CNBuffSize,
NULL, params, 3 /*overlapRLog*/) );
}
CHECK( ZSTD_CCtx_setParameter(mtctx, ZSTD_c_checksumFlag, 1) );
CHECK( ZSTD_CCtx_setParameter(mtctx, ZSTD_c_contentSizeFlag, 1) );
CHECK( ZSTD_CCtx_setParameter(mtctx, ZSTD_c_overlapLog, 3) );
CHECK_VAR(cSize, ZSTD_compress2(mtctx,
compressedBuffer, compressedBufferSize,
CNBuffer, CNBuffSize) );
DISPLAYLEVEL(3, "OK (%u bytes : %.2f%%)\n", (unsigned)cSize, (double)cSize/CNBuffSize*100);
DISPLAYLEVEL(3, "test%3i : decompress %u bytes : ", testNb++, (unsigned)CNBuffSize);
@ -1436,7 +1434,7 @@ static int basicUnitTests(U32 const seed, double compressibility)
if (r != CNBuffSize) goto _output_error; }
DISPLAYLEVEL(3, "OK \n");
ZSTDMT_freeCCtx(mtctx);
ZSTD_freeCCtx(mtctx);
}
DISPLAYLEVEL(3, "test%3u : compress empty string and decompress with small window log : ", testNb++);

View File

@ -31,7 +31,6 @@
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_maxCLevel, ZSTD_customMem, ZSTD_getDictID_fromFrame */
#include "zstd.h" /* ZSTD_compressBound */
#include "zstd_errors.h" /* ZSTD_error_srcSize_wrong */
#include "zstdmt_compress.h"
#include "zdict.h" /* ZDICT_trainFromBuffer */
#include "datagen.h" /* RDG_genBuffer */
#define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */
@ -274,7 +273,7 @@ static int basicUnitTests(U32 seed, double compressibility)
U32 coreSeed = 0; /* this name to conform with CHECK_Z macro display */
ZSTD_CStream* zc = ZSTD_createCStream();
ZSTD_DStream* zd = ZSTD_createDStream();
ZSTDMT_CCtx* mtctx = ZSTDMT_createCCtx(2);
ZSTD_CCtx* mtctx = ZSTD_createCCtx();
ZSTD_inBuffer inBuff, inBuff2;
ZSTD_outBuffer outBuff;
@ -283,12 +282,14 @@ static int basicUnitTests(U32 seed, double compressibility)
unsigned dictID = 0;
/* Create compressible test buffer */
if (!CNBuffer || !compressedBuffer || !decodedBuffer || !zc || !zd) {
if (!CNBuffer || !compressedBuffer || !decodedBuffer || !zc || !zd || !mtctx) {
DISPLAY("Not enough memory, aborting \n");
goto _output_error;
}
RDG_genBuffer(CNBuffer, CNBufferSize, compressibility, 0., seed);
CHECK_Z(ZSTD_CCtx_setParameter(mtctx, ZSTD_c_nbWorkers, 2));
/* Create dictionary */
DISPLAYLEVEL(3, "creating dictionary for unit tests \n");
dictionary = FUZ_createDictionary(CNBuffer, CNBufferSize / 3, 16 KB, 48 KB);
@ -1144,12 +1145,10 @@ static int basicUnitTests(U32 seed, double compressibility)
/* Basic multithreading compression test */
DISPLAYLEVEL(3, "test%3i : compress %u bytes with multiple threads : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
{ ZSTD_parameters const params = ZSTD_getParams(1, 0, 0);
int jobSize;
CHECK_Z( ZSTDMT_getMTCtxParameter(mtctx, ZSTDMT_p_jobSize, &jobSize));
{ int jobSize;
CHECK_Z( ZSTD_CCtx_getParameter(mtctx, ZSTD_c_jobSize, &jobSize));
CHECK(jobSize != 0, "job size non-zero");
CHECK_Z( ZSTDMT_initCStream_advanced(mtctx, CNBuffer, dictSize, params, CNBufferSize) );
CHECK_Z( ZSTDMT_getMTCtxParameter(mtctx, ZSTDMT_p_jobSize, &jobSize));
CHECK_Z( ZSTD_CCtx_getParameter(mtctx, ZSTD_c_jobSize, &jobSize));
CHECK(jobSize != 0, "job size non-zero");
}
outBuff.dst = compressedBuffer;
@ -1158,7 +1157,7 @@ static int basicUnitTests(U32 seed, double compressibility)
inBuff.src = CNBuffer;
inBuff.size = CNBufferSize;
inBuff.pos = 0;
{ size_t const compressResult = ZSTDMT_compressStream_generic(mtctx, &outBuff, &inBuff, ZSTD_e_end);
{ size_t const compressResult = ZSTD_compressStream2(mtctx, &outBuff, &inBuff, ZSTD_e_end);
if (compressResult != 0) goto _output_error; /* compression must be completed in a single round */
}
if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */
@ -1516,7 +1515,7 @@ _end:
FUZ_freeDictionary(dictionary);
ZSTD_freeCStream(zc);
ZSTD_freeDStream(zd);
ZSTDMT_freeCCtx(mtctx);
ZSTD_freeCCtx(mtctx);
free(CNBuffer);
free(compressedBuffer);
free(decodedBuffer);
@ -1826,283 +1825,6 @@ _output_error:
goto _cleanup;
}
/* fuzzing ZSTDMT_* interface */
static int fuzzerTests_MT(U32 seed, int nbTests, int startTest,
double compressibility, int bigTests)
{
const U32 maxSrcLog = bigTests ? 24 : 22;
static const U32 maxSampleLog = 19;
size_t const srcBufferSize = (size_t)1<<maxSrcLog;
BYTE* cNoiseBuffer[5];
size_t const copyBufferSize= srcBufferSize + (1<<maxSampleLog);
BYTE* const copyBuffer = (BYTE*)malloc (copyBufferSize);
size_t const cBufferSize = ZSTD_compressBound(srcBufferSize);
BYTE* const cBuffer = (BYTE*)malloc (cBufferSize);
size_t const dstBufferSize = srcBufferSize;
BYTE* const dstBuffer = (BYTE*)malloc (dstBufferSize);
U32 result = 0;
int testNb = 0;
U32 coreSeed = seed;
int nbThreads = 2;
ZSTDMT_CCtx* zc = ZSTDMT_createCCtx(nbThreads); /* will be reset sometimes */
ZSTD_DStream* zd = ZSTD_createDStream(); /* will be reset sometimes */
ZSTD_DStream* const zd_noise = ZSTD_createDStream();
UTIL_time_t const startClock = UTIL_getTime();
const BYTE* dict=NULL; /* can keep same dict on 2 consecutive tests */
size_t dictSize = 0;
int const cLevelMax = bigTests ? (U32)ZSTD_maxCLevel()-1 : g_cLevelMax_smallTests;
U32 const nbThreadsMax = bigTests ? 4 : 2;
/* allocations */
cNoiseBuffer[0] = (BYTE*)malloc (srcBufferSize);
cNoiseBuffer[1] = (BYTE*)malloc (srcBufferSize);
cNoiseBuffer[2] = (BYTE*)malloc (srcBufferSize);
cNoiseBuffer[3] = (BYTE*)malloc (srcBufferSize);
cNoiseBuffer[4] = (BYTE*)malloc (srcBufferSize);
CHECK (!cNoiseBuffer[0] || !cNoiseBuffer[1] || !cNoiseBuffer[2] || !cNoiseBuffer[3] || !cNoiseBuffer[4] ||
!copyBuffer || !dstBuffer || !cBuffer || !zc || !zd || !zd_noise ,
"Not enough memory, fuzzer tests cancelled");
/* Create initial samples */
RDG_genBuffer(cNoiseBuffer[0], srcBufferSize, 0.00, 0., coreSeed); /* pure noise */
RDG_genBuffer(cNoiseBuffer[1], srcBufferSize, 0.05, 0., coreSeed); /* barely compressible */
RDG_genBuffer(cNoiseBuffer[2], srcBufferSize, compressibility, 0., coreSeed);
RDG_genBuffer(cNoiseBuffer[3], srcBufferSize, 0.95, 0., coreSeed); /* highly compressible */
RDG_genBuffer(cNoiseBuffer[4], srcBufferSize, 1.00, 0., coreSeed); /* sparse content */
memset(copyBuffer, 0x65, copyBufferSize); /* make copyBuffer considered initialized */
ZSTD_initDStream_usingDict(zd, NULL, 0); /* ensure at least one init */
DISPLAYLEVEL(6, "Creating initial context with %i threads \n", nbThreads);
/* catch up testNb */
for (testNb=1; testNb < startTest; testNb++)
FUZ_rand(&coreSeed);
/* test loop */
for ( ; (testNb <= nbTests) || (UTIL_clockSpanMicro(startClock) < g_clockTime) ; testNb++ ) {
U32 lseed;
const BYTE* srcBuffer;
size_t totalTestSize, totalGenSize, cSize;
XXH64_state_t xxhState;
U64 crcOrig;
size_t maxTestSize;
FUZ_rand(&coreSeed);
if (nbTests >= testNb) {
DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests);
} else {
DISPLAYUPDATE(2, "\r%6u ", testNb);
}
lseed = coreSeed ^ prime32;
/* states full reset (deliberately not synchronized) */
/* some issues can only happen when reusing states */
if ((FUZ_rand(&lseed) & 0xFF) == 131) {
nbThreads = (FUZ_rand(&lseed) % nbThreadsMax) + 1;
DISPLAYLEVEL(5, "Creating new context with %u threads \n", nbThreads);
ZSTDMT_freeCCtx(zc);
zc = ZSTDMT_createCCtx(nbThreads);
CHECK(zc==NULL, "ZSTDMT_createCCtx allocation error")
}
if ((FUZ_rand(&lseed) & 0xFF) == 132) {
ZSTD_freeDStream(zd);
zd = ZSTD_createDStream();
CHECK(zd==NULL, "ZSTDMT_createCCtx allocation error")
ZSTD_initDStream_usingDict(zd, NULL, 0); /* ensure at least one init */
}
/* srcBuffer selection [0-4] */
{ U32 buffNb = FUZ_rand(&lseed) & 0x7F;
if (buffNb & 7) buffNb=2; /* most common : compressible (P) */
else {
buffNb >>= 3;
if (buffNb & 7) {
const U32 tnb[2] = { 1, 3 }; /* barely/highly compressible */
buffNb = tnb[buffNb >> 3];
} else {
const U32 tnb[2] = { 0, 4 }; /* not compressible / sparse */
buffNb = tnb[buffNb >> 3];
} }
srcBuffer = cNoiseBuffer[buffNb];
}
/* compression init */
{ U32 const testLog = FUZ_rand(&lseed) % maxSrcLog;
U32 const dictLog = FUZ_rand(&lseed) % maxSrcLog;
int const cLevelCandidate = ( FUZ_rand(&lseed)
% (ZSTD_maxCLevel() - (MAX(testLog, dictLog) / 2)) )
+ 1;
int const cLevelThreadAdjusted = cLevelCandidate - (nbThreads * 2) + 2; /* reduce cLevel when multiple threads to reduce memory consumption */
int const cLevelMin = MAX(cLevelThreadAdjusted, 1); /* no negative cLevel yet */
int const cLevel = MIN(cLevelMin, cLevelMax);
maxTestSize = FUZ_rLogLength(&lseed, testLog);
if (FUZ_rand(&lseed)&1) { /* simple init */
int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1;
DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel);
CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) );
} else { /* advanced init */
/* random dictionary selection */
dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_rLogLength(&lseed, dictLog) : 0;
{ size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize);
dict = srcBuffer + dictStart;
}
{ U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize;
ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize);
DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n",
params.cParams.windowLog, (unsigned)pledgedSrcSize, (unsigned)dictSize);
params.fParams.checksumFlag = FUZ_rand(&lseed) & 1;
params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1;
params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1;
DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag);
CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapLog, FUZ_rand(&lseed) % 12) );
CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) ); /* custom job size */
CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) );
} } }
/* multi-segments compression test */
XXH64_reset(&xxhState, 0);
{ ZSTD_outBuffer outBuff = { cBuffer, cBufferSize, 0 } ;
U32 n;
for (n=0, cSize=0, totalTestSize=0 ; totalTestSize < maxTestSize ; n++) {
/* compress random chunks into randomly sized dst buffers */
{ size_t const randomSrcSize = FUZ_randomLength(&lseed, maxSampleLog);
size_t const srcSize = MIN (maxTestSize-totalTestSize, randomSrcSize);
size_t const srcStart = FUZ_rand(&lseed) % (srcBufferSize - srcSize);
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
size_t const dstBuffSize = MIN(cBufferSize - cSize, randomDstSize);
ZSTD_inBuffer inBuff = { srcBuffer+srcStart, srcSize, 0 };
outBuff.size = outBuff.pos + dstBuffSize;
DISPLAYLEVEL(6, "Sending %u bytes to compress \n", (unsigned)srcSize);
CHECK_Z( ZSTDMT_compressStream(zc, &outBuff, &inBuff) );
DISPLAYLEVEL(6, "%u bytes read by ZSTDMT_compressStream \n", (unsigned)inBuff.pos);
XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos);
memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos);
totalTestSize += inBuff.pos;
}
/* random flush operation, to mess around */
if ((FUZ_rand(&lseed) & 15) == 0) {
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize);
size_t const previousPos = outBuff.pos;
outBuff.size = outBuff.pos + adjustedDstSize;
DISPLAYLEVEL(5, "Flushing into dst buffer of size %u \n", (unsigned)adjustedDstSize);
CHECK_Z( ZSTDMT_flushStream(zc, &outBuff) );
assert(outBuff.pos >= previousPos);
DISPLAYLEVEL(6, "%u bytes flushed by ZSTDMT_flushStream \n", (unsigned)(outBuff.pos-previousPos));
} }
/* final frame epilogue */
{ size_t remainingToFlush = (size_t)(-1);
while (remainingToFlush) {
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize);
size_t const previousPos = outBuff.pos;
outBuff.size = outBuff.pos + adjustedDstSize;
DISPLAYLEVEL(5, "Ending into dst buffer of size %u \n", (unsigned)adjustedDstSize);
remainingToFlush = ZSTDMT_endStream(zc, &outBuff);
CHECK (ZSTD_isError(remainingToFlush), "ZSTDMT_endStream error : %s", ZSTD_getErrorName(remainingToFlush));
assert(outBuff.pos >= previousPos);
DISPLAYLEVEL(6, "%u bytes flushed by ZSTDMT_endStream \n", (unsigned)(outBuff.pos-previousPos));
DISPLAYLEVEL(5, "endStream : remainingToFlush : %u \n", (unsigned)remainingToFlush);
} }
crcOrig = XXH64_digest(&xxhState);
cSize = outBuff.pos;
DISPLAYLEVEL(5, "Frame completed : %u bytes compressed into %u bytes \n",
(unsigned)totalTestSize, (unsigned)cSize);
}
/* multi - fragments decompression test */
assert(totalTestSize < dstBufferSize);
memset(dstBuffer, 170, totalTestSize); /* init dest area */
if (!dictSize /* don't reset if dictionary : could be different */ && (FUZ_rand(&lseed) & 1)) {
CHECK_Z( ZSTD_resetDStream(zd) );
} else {
CHECK_Z( ZSTD_initDStream_usingDict(zd, dict, dictSize) );
}
{ size_t decompressionResult = 1;
ZSTD_inBuffer inBuff = { cBuffer, cSize, 0 };
ZSTD_outBuffer outBuff= { dstBuffer, dstBufferSize, 0 };
for (totalGenSize = 0 ; decompressionResult ; ) {
size_t const readCSrcSize = FUZ_randomLength(&lseed, maxSampleLog);
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize);
inBuff.size = inBuff.pos + readCSrcSize;
outBuff.size = outBuff.pos + dstBuffSize;
DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes into outBuff %u bytes \n",
(unsigned)readCSrcSize, (unsigned)dstBuffSize);
decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
if (ZSTD_isError(decompressionResult)) {
DISPLAY("ZSTD_decompressStream error : %s \n", ZSTD_getErrorName(decompressionResult));
findDiff(copyBuffer, dstBuffer, totalTestSize);
}
CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));
DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u and produced (outBuff.pos) = %u \n",
(unsigned)inBuff.pos, (unsigned)outBuff.pos);
}
CHECK (outBuff.pos != totalTestSize,
"decompressed data : wrong size (%u != %u)",
(unsigned)outBuff.pos, (unsigned)totalTestSize );
CHECK (inBuff.pos != cSize,
"compressed data should be fully read (%u != %u)",
(unsigned)inBuff.pos, (unsigned)cSize );
{ U64 const crcDest = XXH64(dstBuffer, totalTestSize, 0);
if (crcDest!=crcOrig) findDiff(copyBuffer, dstBuffer, totalTestSize);
CHECK (crcDest!=crcOrig, "decompressed data corrupted");
} }
/*===== noisy/erroneous src decompression test =====*/
/* add some noise */
{ U32 const nbNoiseChunks = (FUZ_rand(&lseed) & 7) + 2;
U32 nn; for (nn=0; nn<nbNoiseChunks; nn++) {
size_t const randomNoiseSize = FUZ_randomLength(&lseed, maxSampleLog);
size_t const noiseSize = MIN((cSize/3) , randomNoiseSize);
size_t const noiseStart = FUZ_rand(&lseed) % (srcBufferSize - noiseSize);
size_t const cStart = FUZ_rand(&lseed) % (cSize - noiseSize);
memcpy(cBuffer+cStart, srcBuffer+noiseStart, noiseSize);
} }
/* try decompression on noisy data */
CHECK_Z( ZSTD_initDStream(zd_noise) ); /* note : no dictionary */
{ ZSTD_inBuffer inBuff = { cBuffer, cSize, 0 };
ZSTD_outBuffer outBuff= { dstBuffer, dstBufferSize, 0 };
while (outBuff.pos < dstBufferSize) {
size_t const randomCSrcSize = FUZ_randomLength(&lseed, maxSampleLog);
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
size_t const adjustedDstSize = MIN(dstBufferSize - outBuff.pos, randomDstSize);
size_t const adjustedCSrcSize = MIN(cSize - inBuff.pos, randomCSrcSize);
outBuff.size = outBuff.pos + adjustedDstSize;
inBuff.size = inBuff.pos + adjustedCSrcSize;
{ size_t const decompressError = ZSTD_decompressStream(zd, &outBuff, &inBuff);
if (ZSTD_isError(decompressError)) break; /* error correctly detected */
/* No forward progress possible */
if (outBuff.pos < outBuff.size && inBuff.pos == cSize) break;
} } } }
DISPLAY("\r%u fuzzer tests completed \n", testNb);
_cleanup:
ZSTDMT_freeCCtx(zc);
ZSTD_freeDStream(zd);
ZSTD_freeDStream(zd_noise);
free(cNoiseBuffer[0]);
free(cNoiseBuffer[1]);
free(cNoiseBuffer[2]);
free(cNoiseBuffer[3]);
free(cNoiseBuffer[4]);
free(copyBuffer);
free(cBuffer);
free(dstBuffer);
return result;
_output_error:
result = 1;
goto _cleanup;
}
/** If useOpaqueAPI, sets param in cctxParams.
* Otherwise, sets the param in zc. */
static size_t setCCtxParameter(ZSTD_CCtx* zc, ZSTD_CCtx_params* cctxParams,
@ -2179,6 +1901,8 @@ static int fuzzerTests_newAPI(U32 seed, int nbTests, int startTest,
U32 resetAllowed = 1;
size_t maxTestSize;
ZSTD_parameters savedParams;
int isRefPrefix = 0;
U64 pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
/* init */
if (nbTests >= testNb) { DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); }
@ -2249,8 +1973,8 @@ static int fuzzerTests_newAPI(U32 seed, int nbTests, int startTest,
dict = srcBuffer + dictStart;
if (!dictSize) dict=NULL;
}
{ U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize;
ZSTD_compressionParameters cParams = ZSTD_getCParams(cLevel, pledgedSrcSize, dictSize);
pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize;
{ ZSTD_compressionParameters cParams = ZSTD_getCParams(cLevel, pledgedSrcSize, dictSize);
const U32 windowLogMax = bigTests ? 24 : 20;
const U32 searchLogMax = bigTests ? 15 : 13;
if (dictSize)
@ -2306,6 +2030,8 @@ static int fuzzerTests_newAPI(U32 seed, int nbTests, int startTest,
if (FUZ_rand(&lseed) & 1) {
DISPLAYLEVEL(5, "t%u: pledgedSrcSize : %u \n", testNb, (unsigned)pledgedSrcSize);
CHECK_Z( ZSTD_CCtx_setPledgedSrcSize(zc, pledgedSrcSize) );
} else {
pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
}
/* multi-threading parameters. Only adjust occasionally for small tests. */
@ -2322,7 +2048,11 @@ static int fuzzerTests_newAPI(U32 seed, int nbTests, int startTest,
}
}
/* Enable rsyncable mode 1 in 4 times. */
setCCtxParameter(zc, cctxParams, ZSTD_c_rsyncable, (FUZ_rand(&lseed) % 4 == 0), opaqueAPI);
{
int const rsyncable = (FUZ_rand(&lseed) % 4 == 0);
DISPLAYLEVEL(5, "t%u: rsyncable : %d \n", testNb, rsyncable);
setCCtxParameter(zc, cctxParams, ZSTD_c_rsyncable, rsyncable, opaqueAPI);
}
if (FUZ_rand(&lseed) & 1) CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_c_forceMaxWindow, FUZ_rand(&lseed) & 1, opaqueAPI) );
@ -2339,6 +2069,7 @@ static int fuzzerTests_newAPI(U32 seed, int nbTests, int startTest,
CHECK_Z( ZSTD_CCtx_loadDictionary_byReference(zc, dict, dictSize) );
}
} else {
isRefPrefix = 1;
CHECK_Z( ZSTD_CCtx_refPrefix(zc, dict, dictSize) );
}
} }
@ -2346,45 +2077,96 @@ static int fuzzerTests_newAPI(U32 seed, int nbTests, int startTest,
CHECK_Z(getCCtxParams(zc, &savedParams));
/* multi-segments compression test */
XXH64_reset(&xxhState, 0);
{ ZSTD_outBuffer outBuff = { cBuffer, cBufferSize, 0 } ;
for (cSize=0, totalTestSize=0 ; (totalTestSize < maxTestSize) ; ) {
/* compress random chunks into randomly sized dst buffers */
size_t const randomSrcSize = FUZ_randomLength(&lseed, maxSampleLog);
size_t const srcSize = MIN(maxTestSize-totalTestSize, randomSrcSize);
size_t const srcStart = FUZ_rand(&lseed) % (srcBufferSize - srcSize);
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog+1);
size_t const dstBuffSize = MIN(cBufferSize - cSize, randomDstSize);
ZSTD_EndDirective const flush = (FUZ_rand(&lseed) & 15) ? ZSTD_e_continue : ZSTD_e_flush;
ZSTD_inBuffer inBuff = { srcBuffer+srcStart, srcSize, 0 };
outBuff.size = outBuff.pos + dstBuffSize;
{ int iter;
int const startSeed = lseed;
XXH64_hash_t compressedCrcs[2];
for (iter = 0; iter < 2; ++iter, lseed = startSeed) {
ZSTD_outBuffer outBuff = { cBuffer, cBufferSize, 0 } ;
int const singlePass = (FUZ_rand(&lseed) & 3) == 0;
int nbWorkers;
CHECK_Z( ZSTD_compressStream2(zc, &outBuff, &inBuff, flush) );
DISPLAYLEVEL(6, "t%u: compress consumed %u bytes (total : %u) ; flush: %u (total : %u) \n",
testNb, (unsigned)inBuff.pos, (unsigned)(totalTestSize + inBuff.pos), (unsigned)flush, (unsigned)outBuff.pos);
XXH64_reset(&xxhState, 0);
XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos);
memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos);
totalTestSize += inBuff.pos;
CHECK_Z( ZSTD_CCtx_setPledgedSrcSize(zc, pledgedSrcSize) );
if (isRefPrefix) {
DISPLAYLEVEL(6, "t%u: Reloading prefix\n", testNb);
/* Need to reload the prefix because it gets dropped after one compression */
CHECK_Z( ZSTD_CCtx_refPrefix(zc, dict, dictSize) );
}
/* Adjust number of workers occassionally - result must be deterministic independent of nbWorkers */
CHECK_Z(ZSTD_CCtx_getParameter(zc, ZSTD_c_nbWorkers, &nbWorkers));
if (nbWorkers > 0 && (FUZ_rand(&lseed) & 7) == 0) {
DISPLAYLEVEL(6, "t%u: Modify nbWorkers: %d -> %d \n", testNb, nbWorkers, nbWorkers + iter);
CHECK_Z(ZSTD_CCtx_setParameter(zc, ZSTD_c_nbWorkers, nbWorkers + iter));
}
if (singlePass) {
ZSTD_inBuffer inBuff = { srcBuffer, maxTestSize, 0 };
CHECK_Z(ZSTD_compressStream2(zc, &outBuff, &inBuff, ZSTD_e_end));
DISPLAYLEVEL(6, "t%u: Single pass compression: consumed %u bytes ; produced %u bytes \n",
testNb, (unsigned)inBuff.pos, (unsigned)outBuff.pos);
CHECK(inBuff.pos != inBuff.size, "Input not consumed!");
crcOrig = XXH64(srcBuffer, maxTestSize, 0);
totalTestSize = maxTestSize;
} else {
outBuff.size = 0;
for (totalTestSize=0 ; (totalTestSize < maxTestSize) ; ) {
/* compress random chunks into randomly sized dst buffers */
size_t const randomSrcSize = FUZ_randomLength(&lseed, maxSampleLog);
size_t const srcSize = MIN(maxTestSize-totalTestSize, randomSrcSize);
size_t const srcStart = FUZ_rand(&lseed) % (srcBufferSize - srcSize);
ZSTD_EndDirective const flush = (FUZ_rand(&lseed) & 15) ? ZSTD_e_continue : ZSTD_e_flush;
ZSTD_inBuffer inBuff = { srcBuffer+srcStart, srcSize, 0 };
int forwardProgress;
do {
size_t const ipos = inBuff.pos;
size_t const opos = outBuff.pos;
size_t ret;
if (outBuff.pos == outBuff.size) {
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog+1);
size_t const dstBuffSize = MIN(cBufferSize - outBuff.pos, randomDstSize);
outBuff.size = outBuff.pos + dstBuffSize;
}
CHECK_Z( ret = ZSTD_compressStream2(zc, &outBuff, &inBuff, flush) );
DISPLAYLEVEL(6, "t%u: compress consumed %u bytes (total : %u) ; flush: %u (total : %u) \n",
testNb, (unsigned)inBuff.pos, (unsigned)(totalTestSize + inBuff.pos), (unsigned)flush, (unsigned)outBuff.pos);
/* We've completed the flush */
if (flush == ZSTD_e_flush && ret == 0)
break;
/* Ensure maximal forward progress for determinism */
forwardProgress = (inBuff.pos != ipos) || (outBuff.pos != opos);
} while (forwardProgress);
XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos);
memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos);
totalTestSize += inBuff.pos;
}
/* final frame epilogue */
{ size_t remainingToFlush = 1;
while (remainingToFlush) {
ZSTD_inBuffer inBuff = { NULL, 0, 0 };
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog+1);
size_t const adjustedDstSize = MIN(cBufferSize - outBuff.pos, randomDstSize);
outBuff.size = outBuff.pos + adjustedDstSize;
DISPLAYLEVEL(6, "t%u: End-flush into dst buffer of size %u \n", testNb, (unsigned)adjustedDstSize);
/* ZSTD_e_end guarantees maximal forward progress */
remainingToFlush = ZSTD_compressStream2(zc, &outBuff, &inBuff, ZSTD_e_end);
DISPLAYLEVEL(6, "t%u: Total flushed so far : %u bytes \n", testNb, (unsigned)outBuff.pos);
CHECK( ZSTD_isError(remainingToFlush),
"ZSTD_compressStream2 w/ ZSTD_e_end error : %s",
ZSTD_getErrorName(remainingToFlush) );
} }
crcOrig = XXH64_digest(&xxhState);
}
cSize = outBuff.pos;
compressedCrcs[iter] = XXH64(cBuffer, cSize, 0);
DISPLAYLEVEL(5, "Frame completed : %zu bytes \n", cSize);
}
/* final frame epilogue */
{ size_t remainingToFlush = 1;
while (remainingToFlush) {
ZSTD_inBuffer inBuff = { NULL, 0, 0 };
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog+1);
size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize);
outBuff.size = outBuff.pos + adjustedDstSize;
DISPLAYLEVEL(6, "t%u: End-flush into dst buffer of size %u \n", testNb, (unsigned)adjustedDstSize);
remainingToFlush = ZSTD_compressStream2(zc, &outBuff, &inBuff, ZSTD_e_end);
DISPLAYLEVEL(6, "t%u: Total flushed so far : %u bytes \n", testNb, (unsigned)outBuff.pos);
CHECK( ZSTD_isError(remainingToFlush),
"ZSTD_compressStream2 w/ ZSTD_e_end error : %s",
ZSTD_getErrorName(remainingToFlush) );
} }
crcOrig = XXH64_digest(&xxhState);
cSize = outBuff.pos;
DISPLAYLEVEL(5, "Frame completed : %zu bytes \n", cSize);
CHECK(!(compressedCrcs[0] == compressedCrcs[1]), "Compression is not deterministic!");
}
CHECK(badParameters(zc, savedParams), "CCtx params are wrong");
@ -2496,7 +2278,7 @@ static int FUZ_usage(const char* programName)
return 0;
}
typedef enum { simple_api, mt_api, advanced_api } e_api;
typedef enum { simple_api, advanced_api } e_api;
int main(int argc, const char** argv)
{
@ -2520,7 +2302,6 @@ int main(int argc, const char** argv)
/* Parsing commands. Aggregated commands are allowed */
if (argument[0]=='-') {
if (!strcmp(argument, "--mt")) { selected_api=mt_api; testNb += !testNb; continue; }
if (!strcmp(argument, "--newapi")) { selected_api=advanced_api; testNb += !testNb; continue; }
if (!strcmp(argument, "--no-big-tests")) { bigTests=0; continue; }
@ -2633,9 +2414,6 @@ int main(int argc, const char** argv)
case simple_api :
result = fuzzerTests(seed, nbTests, testNb, ((double)proba) / 100, bigTests);
break;
case mt_api :
result = fuzzerTests_MT(seed, nbTests, testNb, ((double)proba) / 100, bigTests);
break;
case advanced_api :
result = fuzzerTests_newAPI(seed, nbTests, testNb, ((double)proba) / 100, bigTests);
break;