Merge pull request #947 from facebook/fix944

Fix #944
This commit is contained in:
Yann Collet 2017-12-14 10:01:52 -08:00 committed by GitHub
commit c005df136f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 196 additions and 103 deletions

View File

@ -410,7 +410,7 @@ size_t ZSTD_CCtxParam_setParameter(
return ERROR(parameter_unsupported); return ERROR(parameter_unsupported);
#else #else
if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported); if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported);
return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_sectionSize, value); return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_jobSize, value);
#endif #endif
case ZSTD_p_overlapSizeLog : case ZSTD_p_overlapSizeLog :
@ -477,7 +477,7 @@ size_t ZSTD_CCtx_setParametersUsingCCtxParams(
ZSTDLIB_API size_t ZSTD_CCtx_setPledgedSrcSize(ZSTD_CCtx* cctx, unsigned long long pledgedSrcSize) ZSTDLIB_API size_t ZSTD_CCtx_setPledgedSrcSize(ZSTD_CCtx* cctx, unsigned long long pledgedSrcSize)
{ {
DEBUGLOG(4, " setting pledgedSrcSize to %u", (U32)pledgedSrcSize); DEBUGLOG(4, "ZSTD_CCtx_setPledgedSrcSize to %u bytes", (U32)pledgedSrcSize);
if (cctx->streamStage != zcss_init) return ERROR(stage_wrong); if (cctx->streamStage != zcss_init) return ERROR(stage_wrong);
cctx->pledgedSrcSizePlusOne = pledgedSrcSize+1; cctx->pledgedSrcSizePlusOne = pledgedSrcSize+1;
return 0; return 0;
@ -489,7 +489,7 @@ size_t ZSTD_CCtx_loadDictionary_advanced(
{ {
if (cctx->streamStage != zcss_init) return ERROR(stage_wrong); if (cctx->streamStage != zcss_init) return ERROR(stage_wrong);
if (cctx->staticSize) return ERROR(memory_allocation); /* no malloc for static CCtx */ if (cctx->staticSize) return ERROR(memory_allocation); /* no malloc for static CCtx */
DEBUGLOG(4, "load dictionary of size %u", (U32)dictSize); DEBUGLOG(4, "ZSTD_CCtx_loadDictionary_advanced (size: %u)", (U32)dictSize);
ZSTD_freeCDict(cctx->cdictLocal); /* in case one already exists */ ZSTD_freeCDict(cctx->cdictLocal); /* in case one already exists */
if (dict==NULL || dictSize==0) { /* no dictionary mode */ if (dict==NULL || dictSize==0) { /* no dictionary mode */
cctx->cdictLocal = NULL; cctx->cdictLocal = NULL;
@ -987,15 +987,16 @@ void ZSTD_invalidateRepCodes(ZSTD_CCtx* cctx) {
/*! ZSTD_copyCCtx_internal() : /*! ZSTD_copyCCtx_internal() :
* Duplicate an existing context `srcCCtx` into another one `dstCCtx`. * Duplicate an existing context `srcCCtx` into another one `dstCCtx`.
* The "context", in this case, refers to the hash and chain tables,
* entropy tables, and dictionary offsets.
* Only works during stage ZSTDcs_init (i.e. after creation, but before first call to ZSTD_compressContinue()). * Only works during stage ZSTDcs_init (i.e. after creation, but before first call to ZSTD_compressContinue()).
* pledgedSrcSize=0 means "empty". * The "context", in this case, refers to the hash and chain tables,
* @return : 0, or an error code */ * entropy tables, and dictionary references.
* `windowLog` value is enforced if != 0, otherwise value is copied from srcCCtx.
* @return : 0, or an error code */
static size_t ZSTD_copyCCtx_internal(ZSTD_CCtx* dstCCtx, static size_t ZSTD_copyCCtx_internal(ZSTD_CCtx* dstCCtx,
const ZSTD_CCtx* srcCCtx, const ZSTD_CCtx* srcCCtx,
unsigned windowLog,
ZSTD_frameParameters fParams, ZSTD_frameParameters fParams,
unsigned long long pledgedSrcSize, U64 pledgedSrcSize,
ZSTD_buffered_policy_e zbuff) ZSTD_buffered_policy_e zbuff)
{ {
DEBUGLOG(5, "ZSTD_copyCCtx_internal"); DEBUGLOG(5, "ZSTD_copyCCtx_internal");
@ -1005,6 +1006,7 @@ static size_t ZSTD_copyCCtx_internal(ZSTD_CCtx* dstCCtx,
{ ZSTD_CCtx_params params = dstCCtx->requestedParams; { ZSTD_CCtx_params params = dstCCtx->requestedParams;
/* Copy only compression parameters related to tables. */ /* Copy only compression parameters related to tables. */
params.cParams = srcCCtx->appliedParams.cParams; params.cParams = srcCCtx->appliedParams.cParams;
if (windowLog) params.cParams.windowLog = windowLog;
params.fParams = fParams; params.fParams = fParams;
ZSTD_resetCCtx_internal(dstCCtx, params, pledgedSrcSize, ZSTD_resetCCtx_internal(dstCCtx, params, pledgedSrcSize,
ZSTDcrp_noMemset, zbuff); ZSTDcrp_noMemset, zbuff);
@ -1056,7 +1058,9 @@ size_t ZSTD_copyCCtx(ZSTD_CCtx* dstCCtx, const ZSTD_CCtx* srcCCtx, unsigned long
if (pledgedSrcSize==0) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; if (pledgedSrcSize==0) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
fParams.contentSizeFlag = (pledgedSrcSize != ZSTD_CONTENTSIZE_UNKNOWN); fParams.contentSizeFlag = (pledgedSrcSize != ZSTD_CONTENTSIZE_UNKNOWN);
return ZSTD_copyCCtx_internal(dstCCtx, srcCCtx, fParams, pledgedSrcSize, zbuff); return ZSTD_copyCCtx_internal(dstCCtx, srcCCtx,
0 /*windowLog from srcCCtx*/, fParams, pledgedSrcSize,
zbuff);
} }
@ -2043,12 +2047,12 @@ static size_t ZSTD_compress_insertDictionary(ZSTD_CCtx* cctx,
/*! ZSTD_compressBegin_internal() : /*! ZSTD_compressBegin_internal() :
* @return : 0, or an error code */ * @return : 0, or an error code */
static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx, size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
const void* dict, size_t dictSize, const void* dict, size_t dictSize,
ZSTD_dictMode_e dictMode, ZSTD_dictMode_e dictMode,
const ZSTD_CDict* cdict, const ZSTD_CDict* cdict,
ZSTD_CCtx_params params, U64 pledgedSrcSize, ZSTD_CCtx_params params, U64 pledgedSrcSize,
ZSTD_buffered_policy_e zbuff) ZSTD_buffered_policy_e zbuff)
{ {
DEBUGLOG(4, "ZSTD_compressBegin_internal"); DEBUGLOG(4, "ZSTD_compressBegin_internal");
/* params are supposed to be fully validated at this point */ /* params are supposed to be fully validated at this point */
@ -2058,7 +2062,7 @@ static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
if (cdict && cdict->dictContentSize>0) { if (cdict && cdict->dictContentSize>0) {
cctx->requestedParams = params; cctx->requestedParams = params;
return ZSTD_copyCCtx_internal(cctx, cdict->refContext, return ZSTD_copyCCtx_internal(cctx, cdict->refContext,
params.fParams, pledgedSrcSize, params.cParams.windowLog, params.fParams, pledgedSrcSize,
zbuff); zbuff);
} }
@ -2067,17 +2071,19 @@ static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
return ZSTD_compress_insertDictionary(cctx, dict, dictSize, dictMode); return ZSTD_compress_insertDictionary(cctx, dict, dictSize, dictMode);
} }
size_t ZSTD_compressBegin_advanced_internal( size_t ZSTD_compressBegin_advanced_internal(ZSTD_CCtx* cctx,
ZSTD_CCtx* cctx,
const void* dict, size_t dictSize, const void* dict, size_t dictSize,
ZSTD_dictMode_e dictMode, ZSTD_dictMode_e dictMode,
const ZSTD_CDict* cdict,
ZSTD_CCtx_params params, ZSTD_CCtx_params params,
unsigned long long pledgedSrcSize) unsigned long long pledgedSrcSize)
{ {
DEBUGLOG(4, "ZSTD_compressBegin_advanced_internal"); DEBUGLOG(4, "ZSTD_compressBegin_advanced_internal");
/* compression parameters verification and optimization */ /* compression parameters verification and optimization */
CHECK_F( ZSTD_checkCParams(params.cParams) ); CHECK_F( ZSTD_checkCParams(params.cParams) );
return ZSTD_compressBegin_internal(cctx, dict, dictSize, dictMode, NULL, return ZSTD_compressBegin_internal(cctx,
dict, dictSize, dictMode,
cdict,
params, pledgedSrcSize, params, pledgedSrcSize,
ZSTDb_not_buffered); ZSTDb_not_buffered);
} }
@ -2090,9 +2096,10 @@ size_t ZSTD_compressBegin_advanced(ZSTD_CCtx* cctx,
{ {
ZSTD_CCtx_params const cctxParams = ZSTD_CCtx_params const cctxParams =
ZSTD_assignParamsToCCtxParams(cctx->requestedParams, params); ZSTD_assignParamsToCCtxParams(cctx->requestedParams, params);
return ZSTD_compressBegin_advanced_internal(cctx, dict, dictSize, ZSTD_dm_auto, return ZSTD_compressBegin_advanced_internal(cctx,
cctxParams, dict, dictSize, ZSTD_dm_auto,
pledgedSrcSize); NULL /*cdict*/,
cctxParams, pledgedSrcSize);
} }
size_t ZSTD_compressBegin_usingDict(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, int compressionLevel) size_t ZSTD_compressBegin_usingDict(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, int compressionLevel)
@ -2279,7 +2286,7 @@ static size_t ZSTD_initCDict_internal(
ZSTD_dictMode_e dictMode, ZSTD_dictMode_e dictMode,
ZSTD_compressionParameters cParams) ZSTD_compressionParameters cParams)
{ {
DEBUGLOG(4, "ZSTD_initCDict_internal, mode %u", (U32)dictMode); DEBUGLOG(3, "ZSTD_initCDict_internal, mode %u", (U32)dictMode);
if ((dictLoadMethod == ZSTD_dlm_byRef) || (!dictBuffer) || (!dictSize)) { if ((dictLoadMethod == ZSTD_dlm_byRef) || (!dictBuffer) || (!dictSize)) {
cdict->dictBuffer = NULL; cdict->dictBuffer = NULL;
cdict->dictContent = dictBuffer; cdict->dictContent = dictBuffer;
@ -2309,7 +2316,7 @@ ZSTD_CDict* ZSTD_createCDict_advanced(const void* dictBuffer, size_t dictSize,
ZSTD_dictMode_e dictMode, ZSTD_dictMode_e dictMode,
ZSTD_compressionParameters cParams, ZSTD_customMem customMem) ZSTD_compressionParameters cParams, ZSTD_customMem customMem)
{ {
DEBUGLOG(4, "ZSTD_createCDict_advanced, mode %u", (U32)dictMode); DEBUGLOG(3, "ZSTD_createCDict_advanced, mode %u", (U32)dictMode);
if (!customMem.customAlloc ^ !customMem.customFree) return NULL; if (!customMem.customAlloc ^ !customMem.customFree) return NULL;
{ ZSTD_CDict* const cdict = (ZSTD_CDict*)ZSTD_malloc(sizeof(ZSTD_CDict), customMem); { ZSTD_CDict* const cdict = (ZSTD_CDict*)ZSTD_malloc(sizeof(ZSTD_CDict), customMem);
@ -2513,10 +2520,10 @@ static size_t ZSTD_resetCStream_internal(ZSTD_CStream* zcs,
assert(!((dict) && (cdict))); /* either dict or cdict, not both */ assert(!((dict) && (cdict))); /* either dict or cdict, not both */
CHECK_F( ZSTD_compressBegin_internal(zcs, CHECK_F( ZSTD_compressBegin_internal(zcs,
dict, dictSize, dictMode, dict, dictSize, dictMode,
cdict, cdict,
params, pledgedSrcSize, params, pledgedSrcSize,
ZSTDb_buffered) ); ZSTDb_buffered) );
zcs->inToCompress = 0; zcs->inToCompress = 0;
zcs->inBuffPos = 0; zcs->inBuffPos = 0;
@ -2540,7 +2547,7 @@ size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize)
} }
/*! ZSTD_initCStream_internal() : /*! ZSTD_initCStream_internal() :
* Note : not static, but hidden (not exposed). Used by zstdmt_compress.c * Note : for lib/compress only. Used by zstdmt_compress.c.
* Assumption 1 : params are valid * Assumption 1 : params are valid
* Assumption 2 : either dict, or cdict, is defined, not both */ * Assumption 2 : either dict, or cdict, is defined, not both */
size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs, size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs,
@ -2552,7 +2559,7 @@ size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs,
assert(!((dict) && (cdict))); /* either dict or cdict, not both */ assert(!((dict) && (cdict))); /* either dict or cdict, not both */
if (dict && dictSize >= 8) { if (dict && dictSize >= 8) {
DEBUGLOG(5, "loading dictionary of size %u", (U32)dictSize); DEBUGLOG(4, "loading dictionary of size %u", (U32)dictSize);
if (zcs->staticSize) { /* static CCtx : never uses malloc */ if (zcs->staticSize) { /* static CCtx : never uses malloc */
/* incompatible with internal cdict creation */ /* incompatible with internal cdict creation */
return ERROR(memory_allocation); return ERROR(memory_allocation);
@ -2565,14 +2572,14 @@ size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs,
if (zcs->cdictLocal == NULL) return ERROR(memory_allocation); if (zcs->cdictLocal == NULL) return ERROR(memory_allocation);
} else { } else {
if (cdict) { if (cdict) {
params.cParams = ZSTD_getCParamsFromCDict(cdict); /* cParams are enforced from cdict */ params.cParams = ZSTD_getCParamsFromCDict(cdict); /* cParams are enforced from cdict; it includes windowLog */
} }
ZSTD_freeCDict(zcs->cdictLocal); ZSTD_freeCDict(zcs->cdictLocal);
zcs->cdictLocal = NULL; zcs->cdictLocal = NULL;
zcs->cdict = cdict; zcs->cdict = cdict;
} }
params.compressionLevel = ZSTD_CLEVEL_CUSTOM; params.compressionLevel = ZSTD_CLEVEL_CUSTOM; /* enforce usage of cParams, instead of a dynamic derivation from cLevel (but does that happen ?) */
zcs->requestedParams = params; zcs->requestedParams = params;
return ZSTD_resetCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, zcs->cdict, params, pledgedSrcSize); return ZSTD_resetCStream_internal(zcs, NULL, 0, ZSTD_dm_auto, zcs->cdict, params, pledgedSrcSize);
@ -2612,10 +2619,9 @@ size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs,
const void* dict, size_t dictSize, const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize) ZSTD_parameters params, unsigned long long pledgedSrcSize)
{ {
ZSTD_CCtx_params const cctxParams = ZSTD_CCtx_params const cctxParams = ZSTD_assignParamsToCCtxParams(zcs->requestedParams, params);
ZSTD_assignParamsToCCtxParams(zcs->requestedParams, params);
DEBUGLOG(4, "ZSTD_initCStream_advanced: pledgedSrcSize=%u, flag=%u", DEBUGLOG(4, "ZSTD_initCStream_advanced: pledgedSrcSize=%u, flag=%u",
(U32)pledgedSrcSize, params.fParams.contentSizeFlag); (U32)pledgedSrcSize, params.fParams.contentSizeFlag);
CHECK_F( ZSTD_checkCParams(params.cParams) ); CHECK_F( ZSTD_checkCParams(params.cParams) );
if ((pledgedSrcSize==0) && (params.fParams.contentSizeFlag==0)) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; /* for compatibility with older programs relying on this behavior. Users should now specify ZSTD_CONTENTSIZE_UNKNOWN. This line will be removed in the future. */ if ((pledgedSrcSize==0) && (params.fParams.contentSizeFlag==0)) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; /* for compatibility with older programs relying on this behavior. Users should now specify ZSTD_CONTENTSIZE_UNKNOWN. This line will be removed in the future. */
return ZSTD_initCStream_internal(zcs, dict, dictSize, NULL /*cdict*/, cctxParams, pledgedSrcSize); return ZSTD_initCStream_internal(zcs, dict, dictSize, NULL /*cdict*/, cctxParams, pledgedSrcSize);
@ -2815,7 +2821,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
ZSTD_inBuffer* input, ZSTD_inBuffer* input,
ZSTD_EndDirective endOp) ZSTD_EndDirective endOp)
{ {
DEBUGLOG(5, "ZSTD_compress_generic"); DEBUGLOG(5, "ZSTD_compress_generic, endOp=%u ", (U32)endOp);
/* check conditions */ /* check conditions */
if (output->pos > output->size) return ERROR(GENERIC); if (output->pos > output->size) return ERROR(GENERIC);
if (input->pos > input->size) return ERROR(GENERIC); if (input->pos > input->size) return ERROR(GENERIC);
@ -2862,7 +2868,6 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
if (cctx->appliedParams.nbThreads > 1) { if (cctx->appliedParams.nbThreads > 1) {
size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
DEBUGLOG(5, "ZSTDMT_compressStream_generic result : %u", (U32)flushMin);
if ( ZSTD_isError(flushMin) if ( ZSTD_isError(flushMin)
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
ZSTD_startNewCompression(cctx); ZSTD_startNewCompression(cctx);

View File

@ -447,6 +447,7 @@ ZSTD_compressionParameters ZSTD_getCParamsFromCDict(const ZSTD_CDict* cdict);
size_t ZSTD_compressBegin_advanced_internal(ZSTD_CCtx* cctx, size_t ZSTD_compressBegin_advanced_internal(ZSTD_CCtx* cctx,
const void* dict, size_t dictSize, const void* dict, size_t dictSize,
ZSTD_dictMode_e dictMode, ZSTD_dictMode_e dictMode,
const ZSTD_CDict* cdict,
ZSTD_CCtx_params params, ZSTD_CCtx_params params,
unsigned long long pledgedSrcSize); unsigned long long pledgedSrcSize);

View File

@ -310,7 +310,7 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
typedef struct { typedef struct {
buffer_t src; buffer_t src;
const void* srcStart; const void* srcStart;
size_t dictSize; size_t prefixSize;
size_t srcSize; size_t srcSize;
buffer_t dstBuff; buffer_t dstBuff;
size_t cSize; size_t cSize;
@ -333,10 +333,10 @@ void ZSTDMT_compressChunk(void* jobDescription)
{ {
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
const void* const src = (const char*)job->srcStart + job->dictSize; const void* const src = (const char*)job->srcStart + job->prefixSize;
buffer_t dstBuff = job->dstBuff; buffer_t dstBuff = job->dstBuff;
DEBUGLOG(5, "ZSTDMT_compressChunk: job (first:%u) (last:%u) : dictSize %u, srcSize %u", DEBUGLOG(5, "ZSTDMT_compressChunk: job (first:%u) (last:%u) : prefixSize %u, srcSize %u ",
job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize); job->firstChunk, job->lastChunk, (U32)job->prefixSize, (U32)job->srcSize);
if (cctx==NULL) { if (cctx==NULL) {
job->cSize = ERROR(memory_allocation); job->cSize = ERROR(memory_allocation);
@ -350,28 +350,35 @@ void ZSTDMT_compressChunk(void* jobDescription)
goto _endJob; goto _endJob;
} }
job->dstBuff = dstBuff; job->dstBuff = dstBuff;
DEBUGLOG(5, "ZSTDMT_compressChunk: allocated dstBuff of size %u", (U32)dstBuff.size); DEBUGLOG(5, "ZSTDMT_compressChunk: received dstBuff of size %u", (U32)dstBuff.size);
} }
if (job->cdict) { if (job->cdict) {
size_t const initError = ZSTD_compressBegin_usingCDict_advanced(cctx, job->cdict, job->params.fParams, job->fullFrameSize); size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dm_auto, job->cdict, job->params, job->fullFrameSize);
DEBUGLOG(4, "ZSTDMT_compressChunk: init using CDict"); DEBUGLOG(4, "ZSTDMT_compressChunk: init using CDict");
assert(job->firstChunk); /* should only happen for first segment */ assert(job->firstChunk); /* only allowed for first job */
if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
} else { /* srcStart points at reloaded section */ } else { /* srcStart points at reloaded section */
U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : ZSTD_CONTENTSIZE_UNKNOWN;
ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */ ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */
size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk); size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk);
U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : ZSTD_CONTENTSIZE_UNKNOWN; if (ZSTD_isError(forceWindowError)) {
/* load dictionary in "content-only" mode (no header analysis) */ DEBUGLOG(5, "ZSTD_CCtxParam_setParameter error : %s ", ZSTD_getErrorName(forceWindowError));
size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, job->srcStart, job->dictSize, ZSTD_dm_rawContent, jobParams, pledgedSrcSize); job->cSize = forceWindowError;
if (ZSTD_isError(initError) || ZSTD_isError(forceWindowError)) {
job->cSize = initError;
goto _endJob; goto _endJob;
} }
/* load dictionary in "content-only" mode (no header analysis) */
{ size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, job->srcStart, job->prefixSize, ZSTD_dm_rawContent, NULL, jobParams, pledgedSrcSize);
DEBUGLOG(5, "ZSTD_compressBegin_advanced_internal called with windowLog = %u ", jobParams.cParams.windowLog);
if (ZSTD_isError(initError)) {
DEBUGLOG(5, "ZSTD_compressBegin_advanced_internal error : %s ", ZSTD_getErrorName(initError));
job->cSize = initError;
goto _endJob;
} }
} }
if (!job->firstChunk) { /* flush and overwrite frame header when it's not first segment */ if (!job->firstChunk) { /* flush and overwrite frame header when it's not first job */
size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0); size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0);
if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; } if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; }
ZSTD_invalidateRepCodes(cctx); ZSTD_invalidateRepCodes(cctx);
} }
@ -380,9 +387,9 @@ void ZSTDMT_compressChunk(void* jobDescription)
job->cSize = (job->lastChunk) ? job->cSize = (job->lastChunk) ?
ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize); ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u)", DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u) ",
(unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk); (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize)); DEBUGLOG(5, "dstBuff.size : %u ; => %s ", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize));
_endJob: _endJob:
ZSTDMT_releaseCCtx(job->cctxPool, cctx); ZSTDMT_releaseCCtx(job->cctxPool, cctx);
@ -558,11 +565,13 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
} }
/* Internal only */ /* Internal only */
size_t ZSTDMT_CCtxParam_setMTCtxParameter( size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params,
ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value) { ZSTDMT_parameter parameter, unsigned value) {
DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter");
switch(parameter) switch(parameter)
{ {
case ZSTDMT_p_sectionSize : case ZSTDMT_p_jobSize :
DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter : set jobSize to %u", value);
if ( (value > 0) /* value==0 => automatic job size */ if ( (value > 0) /* value==0 => automatic job size */
& (value < ZSTDMT_JOBSIZE_MIN) ) & (value < ZSTDMT_JOBSIZE_MIN) )
value = ZSTDMT_JOBSIZE_MIN; value = ZSTDMT_JOBSIZE_MIN;
@ -580,9 +589,10 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(
size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value) size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value)
{ {
DEBUGLOG(4, "ZSTDMT_setMTCtxParameter");
switch(parameter) switch(parameter)
{ {
case ZSTDMT_p_sectionSize : case ZSTDMT_p_jobSize :
return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value); return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
case ZSTDMT_p_overlapSectionLog : case ZSTDMT_p_overlapSectionLog :
return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value); return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
@ -618,7 +628,7 @@ static size_t ZSTDMT_compress_advanced_internal(
size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog); size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog);
unsigned nbChunks = computeNbChunks(srcSize, params.cParams.windowLog, params.nbThreads); unsigned nbChunks = computeNbChunks(srcSize, params.cParams.windowLog, params.nbThreads);
size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks; size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks;
size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ size_t const avgChunkSize = (((proposedChunkSize-1) & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */
const char* const srcStart = (const char*)src; const char* const srcStart = (const char*)src;
size_t remainingSrcSize = srcSize; size_t remainingSrcSize = srcSize;
unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */ unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */
@ -628,7 +638,8 @@ static size_t ZSTDMT_compress_advanced_internal(
assert(mtctx->cctxPool->totalCCtx == params.nbThreads); assert(mtctx->cctxPool->totalCCtx == params.nbThreads);
DEBUGLOG(4, "ZSTDMT_compress_advanced_internal"); DEBUGLOG(4, "ZSTDMT_compress_advanced_internal");
DEBUGLOG(4, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize); DEBUGLOG(4, "nbChunks : %2u (raw chunkSize : %u bytes; fixed chunkSize: %u) ",
nbChunks, (U32)proposedChunkSize, (U32)avgChunkSize);
if (nbChunks==1) { /* fallback to single-thread mode */ if (nbChunks==1) { /* fallback to single-thread mode */
ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams); if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams);
@ -657,7 +668,7 @@ static size_t ZSTDMT_compress_advanced_internal(
mtctx->jobs[u].src = g_nullBuffer; mtctx->jobs[u].src = g_nullBuffer;
mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize; mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].dictSize = dictSize; mtctx->jobs[u].prefixSize = dictSize;
mtctx->jobs[u].srcSize = chunkSize; mtctx->jobs[u].srcSize = chunkSize;
mtctx->jobs[u].cdict = (u==0) ? cdict : NULL; mtctx->jobs[u].cdict = (u==0) ? cdict : NULL;
mtctx->jobs[u].fullFrameSize = srcSize; mtctx->jobs[u].fullFrameSize = srcSize;
@ -815,7 +826,7 @@ size_t ZSTDMT_initCStream_internal(
zcs->targetSectionSize = params.jobSize ? params.jobSize : (size_t)1 << (params.cParams.windowLog + 2); zcs->targetSectionSize = params.jobSize ? params.jobSize : (size_t)1 << (params.cParams.windowLog + 2);
if (zcs->targetSectionSize < ZSTDMT_JOBSIZE_MIN) zcs->targetSectionSize = ZSTDMT_JOBSIZE_MIN; if (zcs->targetSectionSize < ZSTDMT_JOBSIZE_MIN) zcs->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
if (zcs->targetSectionSize < zcs->targetDictSize) zcs->targetSectionSize = zcs->targetDictSize; /* job size must be >= overlap size */ if (zcs->targetSectionSize < zcs->targetDictSize) zcs->targetSectionSize = zcs->targetDictSize; /* job size must be >= overlap size */
DEBUGLOG(4, "Job Size : %u KB", (U32)(zcs->targetSectionSize>>10)); DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(zcs->targetSectionSize>>10), params.jobSize);
zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize; zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize;
DEBUGLOG(4, "inBuff Size : %u KB", (U32)(zcs->inBuffSize>>10)); DEBUGLOG(4, "inBuff Size : %u KB", (U32)(zcs->inBuffSize>>10));
ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) ); ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) );
@ -888,7 +899,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
zcs->jobs[jobID].src = zcs->inBuff.buffer; zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = srcSize; zcs->jobs[jobID].srcSize = srcSize;
zcs->jobs[jobID].dictSize = zcs->dictSize; zcs->jobs[jobID].prefixSize = zcs->dictSize;
assert(zcs->inBuff.filled >= srcSize + zcs->dictSize); assert(zcs->inBuff.filled >= srcSize + zcs->dictSize);
zcs->jobs[jobID].params = zcs->params; zcs->jobs[jobID].params = zcs->params;
/* do not calculate checksum within sections, but write it in header for first section */ /* do not calculate checksum within sections, but write it in header for first section */
@ -953,6 +964,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush) static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
{ {
unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
DEBUGLOG(5, "ZSTDMT_flushNextJob");
if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */ if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */
ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
while (zcs->jobs[wJobID].jobCompleted==0) { while (zcs->jobs[wJobID].jobCompleted==0) {
@ -965,7 +977,8 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
{ ZSTDMT_jobDescription job = zcs->jobs[wJobID]; { ZSTDMT_jobDescription job = zcs->jobs[wJobID];
if (!job.jobScanned) { if (!job.jobScanned) {
if (ZSTD_isError(job.cSize)) { if (ZSTD_isError(job.cSize)) {
DEBUGLOG(5, "compression error detected "); DEBUGLOG(5, "job %u : compression error detected : %s",
zcs->doneJobID, ZSTD_getErrorName(job.cSize));
ZSTDMT_waitForAllJobsCompleted(zcs); ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs); ZSTDMT_releaseAllJobResources(zcs);
return job.cSize; return job.cSize;
@ -1014,7 +1027,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
{ {
size_t const newJobThreshold = mtctx->dictSize + mtctx->targetSectionSize; size_t const newJobThreshold = mtctx->dictSize + mtctx->targetSectionSize;
unsigned forwardInputProgress = 0; unsigned forwardInputProgress = 0;
DEBUGLOG(5, "ZSTDMT_compressStream_generic"); DEBUGLOG(5, "ZSTDMT_compressStream_generic ");
assert(output->pos <= output->size); assert(output->pos <= output->size);
assert(input->pos <= input->size); assert(input->pos <= input->size);
@ -1097,9 +1110,11 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned endFrame) static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned endFrame)
{ {
size_t const srcSize = mtctx->inBuff.filled - mtctx->dictSize; size_t const srcSize = mtctx->inBuff.filled - mtctx->dictSize;
DEBUGLOG(5, "ZSTDMT_flushStream_internal");
if ( ((srcSize > 0) || (endFrame && !mtctx->frameEnded)) if ( ((srcSize > 0) || (endFrame && !mtctx->frameEnded))
&& (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) { && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) {
DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job");
CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) ); CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
} }

View File

@ -84,13 +84,13 @@ ZSTDLIB_API size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
/* ZSTDMT_parameter : /* ZSTDMT_parameter :
* List of parameters that can be set using ZSTDMT_setMTCtxParameter() */ * List of parameters that can be set using ZSTDMT_setMTCtxParameter() */
typedef enum { typedef enum {
ZSTDMT_p_sectionSize, /* size of input "section". Each section is compressed in parallel. 0 means default, which is dynamically determined within compression functions */ ZSTDMT_p_jobSize, /* Each job is compressed in parallel. By default, this value is dynamically determined depending on compression parameters. Can be set explicitly here. */
ZSTDMT_p_overlapSectionLog /* Log of overlapped section; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window */ ZSTDMT_p_overlapSectionLog /* Each job may reload a part of previous job to enhance compressionr ratio; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window */
} ZSTDMT_parameter; } ZSTDMT_parameter;
/* ZSTDMT_setMTCtxParameter() : /* ZSTDMT_setMTCtxParameter() :
* allow setting individual parameters, one at a time, among a list of enums defined in ZSTDMT_parameter. * allow setting individual parameters, one at a time, among a list of enums defined in ZSTDMT_parameter.
* The function must be called typically after ZSTD_createCCtx(). * The function must be called typically after ZSTD_createCCtx() but __before ZSTDMT_init*() !__
* Parameters not explicitly reset by ZSTDMT_init*() remain the same in consecutive compression sessions. * Parameters not explicitly reset by ZSTDMT_init*() remain the same in consecutive compression sessions.
* @return : 0, or an error code (which can be tested using ZSTD_isError()) */ * @return : 0, or an error code (which can be tested using ZSTD_isError()) */
ZSTDLIB_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value); ZSTDLIB_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value);

View File

@ -2451,14 +2451,16 @@ size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_outBuffer* output, ZSTD_inB
return ZSTD_decompressLegacyStream(zds->legacyContext, legacyVersion, output, input); return ZSTD_decompressLegacyStream(zds->legacyContext, legacyVersion, output, input);
} }
#endif #endif
return hSize; /* error */ return hSize; /* error */
} }
if (hSize != 0) { /* need more input */ if (hSize != 0) { /* need more input */
size_t const toLoad = hSize - zds->lhSize; /* if hSize!=0, hSize > zds->lhSize */ size_t const toLoad = hSize - zds->lhSize; /* if hSize!=0, hSize > zds->lhSize */
if (toLoad > (size_t)(iend-ip)) { /* not enough input to load full header */ size_t const remainingInput = (size_t)(iend-ip);
if (iend-ip > 0) { assert(iend >= ip);
memcpy(zds->headerBuffer + zds->lhSize, ip, iend-ip); if (toLoad > remainingInput) { /* not enough input to load full header */
zds->lhSize += iend-ip; if (remainingInput > 0) {
memcpy(zds->headerBuffer + zds->lhSize, ip, remainingInput);
zds->lhSize += remainingInput;
} }
input->pos = input->size; input->pos = input->size;
return (MAX(ZSTD_frameHeaderSize_min, hSize) - zds->lhSize) + ZSTD_blockHeaderSize; /* remaining header bytes + next block header */ return (MAX(ZSTD_frameHeaderSize_min, hSize) - zds->lhSize) + ZSTD_blockHeaderSize; /* remaining header bytes + next block header */
@ -2473,8 +2475,10 @@ size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_outBuffer* output, ZSTD_inB
&& (U64)(size_t)(oend-op) >= zds->fParams.frameContentSize) { && (U64)(size_t)(oend-op) >= zds->fParams.frameContentSize) {
size_t const cSize = ZSTD_findFrameCompressedSize(istart, iend-istart); size_t const cSize = ZSTD_findFrameCompressedSize(istart, iend-istart);
if (cSize <= (size_t)(iend-istart)) { if (cSize <= (size_t)(iend-istart)) {
/* shortcut : using single-pass mode */
size_t const decompressedSize = ZSTD_decompress_usingDDict(zds, op, oend-op, istart, cSize, zds->ddict); size_t const decompressedSize = ZSTD_decompress_usingDDict(zds, op, oend-op, istart, cSize, zds->ddict);
if (ZSTD_isError(decompressedSize)) return decompressedSize; if (ZSTD_isError(decompressedSize)) return decompressedSize;
DEBUGLOG(4, "shortcut to single-pass ZSTD_decompress_usingDDict()")
ip = istart + cSize; ip = istart + cSize;
op += decompressedSize; op += decompressedSize;
zds->expected = 0; zds->expected = 0;
@ -2497,8 +2501,9 @@ size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_outBuffer* output, ZSTD_inB
} }
/* control buffer memory usage */ /* control buffer memory usage */
DEBUGLOG(4, "Control max buffer memory usage (max %u KB)", DEBUGLOG(4, "Control max memory usage (%u KB <= max %u KB)",
(U32)(zds->maxWindowSize >> 10)); (U32)(zds->fParams.windowSize >>10),
(U32)(zds->maxWindowSize >> 10) );
zds->fParams.windowSize = MAX(zds->fParams.windowSize, 1U << ZSTD_WINDOWLOG_ABSOLUTEMIN); zds->fParams.windowSize = MAX(zds->fParams.windowSize, 1U << ZSTD_WINDOWLOG_ABSOLUTEMIN);
if (zds->fParams.windowSize > zds->maxWindowSize) return ERROR(frameParameter_windowTooLarge); if (zds->fParams.windowSize > zds->maxWindowSize) return ERROR(frameParameter_windowTooLarge);

View File

@ -1016,7 +1016,8 @@ typedef enum {
* More threads improve speed, but also increase memory usage. * More threads improve speed, but also increase memory usage.
* Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled. * Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled.
* Special: value 0 means "do not change nbThreads" */ * Special: value 0 means "do not change nbThreads" */
ZSTD_p_jobSize, /* Size of a compression job. Each compression job is completed in parallel. ZSTD_p_jobSize, /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode.
* Each compression job is completed in parallel, so indirectly controls the nb of active threads.
* 0 means default, which is dynamically determined based on compression parameters. * 0 means default, which is dynamically determined based on compression parameters.
* Job size must be a minimum of overlapSize, or 1 KB, whichever is largest * Job size must be a minimum of overlapSize, or 1 KB, whichever is largest
* The minimum size is automatically and transparently enforced */ * The minimum size is automatically and transparently enforced */
@ -1144,13 +1145,19 @@ typedef enum {
* - Compression parameters cannot be changed once compression is started. * - Compression parameters cannot be changed once compression is started.
* - outpot->pos must be <= dstCapacity, input->pos must be <= srcSize * - outpot->pos must be <= dstCapacity, input->pos must be <= srcSize
* - outpot->pos and input->pos will be updated. They are guaranteed to remain below their respective limit. * - outpot->pos and input->pos will be updated. They are guaranteed to remain below their respective limit.
* - @return provides the minimum amount of data still to flush from internal buffers * - In single-thread mode (default), function is blocking : it completed its job before returning to caller.
* - In multi-thread mode, function is non-blocking : it just acquires a copy of input, and distribute job to internal worker threads,
* and then immediately returns, just indicating that there is some data remaining to be flushed.
* The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte.
* - Exception : in multi-threading mode, if the first call requests a ZSTD_e_end directive, it is blocking : it will complete compression before giving back control to caller.
* - @return provides the minimum amount of data remaining to be flushed from internal buffers
* or an error code, which can be tested using ZSTD_isError(). * or an error code, which can be tested using ZSTD_isError().
* if @return != 0, flush is not fully completed, there is some data left within internal buffers. * if @return != 0, flush is not fully completed, there is still some data left within internal buffers.
* - after a ZSTD_e_end directive, if internal buffer is not fully flushed, * This is useful to determine if a ZSTD_e_flush or ZSTD_e_end directive is completed.
* - after a ZSTD_e_end directive, if internal buffer is not fully flushed (@return != 0),
* only ZSTD_e_end or ZSTD_e_flush operations are allowed. * only ZSTD_e_end or ZSTD_e_flush operations are allowed.
* It is necessary to fully flush internal buffers * Before starting a new compression job, or changing compression parameters,
* before starting a new compression job, or changing compression parameters. * it is required to fully flush internal buffers.
*/ */
ZSTDLIB_API size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, ZSTDLIB_API size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
ZSTD_outBuffer* output, ZSTD_outBuffer* output,

View File

@ -76,16 +76,16 @@ allnothread: fullbench fuzzer paramgrill datagen decodecorpus
dll: fuzzer-dll zstreamtest-dll dll: fuzzer-dll zstreamtest-dll
zstd: zstd:
$(MAKE) -C $(PRGDIR) $@ $(MAKE) -C $(PRGDIR) $@ MOREFLAGS+="$(DEBUGFLAGS)"
zstd32: zstd32:
$(MAKE) -C $(PRGDIR) $@ $(MAKE) -C $(PRGDIR) $@ MOREFLAGS+="$(DEBUGFLAGS)"
zstd-nolegacy: zstd-nolegacy:
$(MAKE) -C $(PRGDIR) $@ $(MAKE) -C $(PRGDIR) $@ MOREFLAGS+="$(DEBUGFLAGS)"
gzstd: gzstd:
$(MAKE) -C $(PRGDIR) zstd HAVE_ZLIB=1 $(MAKE) -C $(PRGDIR) zstd HAVE_ZLIB=1 MOREFLAGS="$(DEBUGFLAGS)"
fullbench32: CPPFLAGS += -m32 fullbench32: CPPFLAGS += -m32
fullbench fullbench32 : CPPFLAGS += $(MULTITHREAD_CPP) fullbench fullbench32 : CPPFLAGS += $(MULTITHREAD_CPP)

View File

@ -305,6 +305,11 @@ cp $TESTFILE tmp
$ZSTD -f tmp -D tmpDict $ZSTD -f tmp -D tmpDict
$ZSTD -d tmp.zst -D tmpDict -fo result $ZSTD -d tmp.zst -D tmpDict -fo result
$DIFF $TESTFILE result $DIFF $TESTFILE result
if [ -n "$hasMT" ]
then
$ECHO "- Test dictionary compression with multithreading "
./datagen -g5M | $ZSTD -T2 -D tmpDict | $ZSTD -t -D tmpDict # fails with v1.3.2
fi
$ECHO "- Create second (different) dictionary " $ECHO "- Create second (different) dictionary "
$ZSTD --train *.c ../programs/*.c ../programs/*.h -o tmpDictC $ZSTD --train *.c ../programs/*.c ../programs/*.h -o tmpDictC
$ZSTD -d tmp.zst -D tmpDictC -fo result && die "wrong dictionary not detected!" $ZSTD -d tmp.zst -D tmpDictC -fo result && die "wrong dictionary not detected!"

View File

@ -524,12 +524,12 @@ static int basicUnitTests(U32 seed, double compressibility)
{ ZSTD_DDict* const ddict = ZSTD_createDDict(dictionary.start, dictionary.filled); { ZSTD_DDict* const ddict = ZSTD_createDDict(dictionary.start, dictionary.filled);
size_t const initError = ZSTD_initDStream_usingDDict(zd, ddict); size_t const initError = ZSTD_initDStream_usingDDict(zd, ddict);
if (ZSTD_isError(initError)) goto _output_error; if (ZSTD_isError(initError)) goto _output_error;
inBuff.src = compressedBuffer;
inBuff.size = cSize;
inBuff.pos = 0;
outBuff.dst = decodedBuffer; outBuff.dst = decodedBuffer;
outBuff.size = CNBufferSize; outBuff.size = CNBufferSize;
outBuff.pos = 0; outBuff.pos = 0;
inBuff.src = compressedBuffer;
inBuff.size = cSize;
inBuff.pos = 0;
{ size_t const r = ZSTD_decompressStream(zd, &outBuff, &inBuff); { size_t const r = ZSTD_decompressStream(zd, &outBuff, &inBuff);
if (r != 0) goto _output_error; } /* should reach end of frame == 0; otherwise, some data left, or an error */ if (r != 0) goto _output_error; } /* should reach end of frame == 0; otherwise, some data left, or an error */
if (outBuff.pos != CNBufferSize) goto _output_error; /* should regenerate the same amount */ if (outBuff.pos != CNBufferSize) goto _output_error; /* should regenerate the same amount */
@ -548,12 +548,12 @@ static int basicUnitTests(U32 seed, double compressibility)
DISPLAYLEVEL(3, "test%3i : maxWindowSize < frame requirement : ", testNb++); DISPLAYLEVEL(3, "test%3i : maxWindowSize < frame requirement : ", testNb++);
ZSTD_initDStream_usingDict(zd, CNBuffer, dictSize); ZSTD_initDStream_usingDict(zd, CNBuffer, dictSize);
CHECK_Z( ZSTD_setDStreamParameter(zd, DStream_p_maxWindowSize, 1000) ); /* too small limit */ CHECK_Z( ZSTD_setDStreamParameter(zd, DStream_p_maxWindowSize, 1000) ); /* too small limit */
inBuff.src = compressedBuffer;
inBuff.size = cSize;
inBuff.pos = 0;
outBuff.dst = decodedBuffer; outBuff.dst = decodedBuffer;
outBuff.size = CNBufferSize; outBuff.size = CNBufferSize;
outBuff.pos = 0; outBuff.pos = 0;
inBuff.src = compressedBuffer;
inBuff.size = cSize;
inBuff.pos = 0;
{ size_t const r = ZSTD_decompressStream(zd, &outBuff, &inBuff); { size_t const r = ZSTD_decompressStream(zd, &outBuff, &inBuff);
if (!ZSTD_isError(r)) goto _output_error; /* must fail : frame requires > 100 bytes */ if (!ZSTD_isError(r)) goto _output_error; /* must fail : frame requires > 100 bytes */
DISPLAYLEVEL(3, "OK (%s)\n", ZSTD_getErrorName(r)); } DISPLAYLEVEL(3, "OK (%s)\n", ZSTD_getErrorName(r)); }
@ -642,12 +642,12 @@ static int basicUnitTests(U32 seed, double compressibility)
params.fParams.contentSizeFlag = 1; params.fParams.contentSizeFlag = 1;
CHECK_Z( ZSTD_initCStream_advanced(zc, dictionary.start, dictionary.filled, params, 0 /* pledgedSrcSize==0 means "empty" when params.fParams.contentSizeFlag is set */) ); CHECK_Z( ZSTD_initCStream_advanced(zc, dictionary.start, dictionary.filled, params, 0 /* pledgedSrcSize==0 means "empty" when params.fParams.contentSizeFlag is set */) );
} /* cstream advanced shall write content size = 0 */ } /* cstream advanced shall write content size = 0 */
inBuff.src = CNBuffer;
inBuff.size = 0;
inBuff.pos = 0;
outBuff.dst = compressedBuffer; outBuff.dst = compressedBuffer;
outBuff.size = compressedBufferSize; outBuff.size = compressedBufferSize;
outBuff.pos = 0; outBuff.pos = 0;
inBuff.src = CNBuffer;
inBuff.size = 0;
inBuff.pos = 0;
CHECK_Z( ZSTD_compressStream(zc, &outBuff, &inBuff) ); CHECK_Z( ZSTD_compressStream(zc, &outBuff, &inBuff) );
if (ZSTD_endStream(zc, &outBuff) != 0) goto _output_error; if (ZSTD_endStream(zc, &outBuff) != 0) goto _output_error;
cSize = outBuff.pos; cSize = outBuff.pos;
@ -671,12 +671,12 @@ static int basicUnitTests(U32 seed, double compressibility)
if (ZSTD_findDecompressedSize(compressedBuffer, cSize) != 0) goto _output_error; if (ZSTD_findDecompressedSize(compressedBuffer, cSize) != 0) goto _output_error;
ZSTD_resetCStream(zc, 0); /* resetCStream should treat 0 as unknown */ ZSTD_resetCStream(zc, 0); /* resetCStream should treat 0 as unknown */
inBuff.src = CNBuffer;
inBuff.size = 0;
inBuff.pos = 0;
outBuff.dst = compressedBuffer; outBuff.dst = compressedBuffer;
outBuff.size = compressedBufferSize; outBuff.size = compressedBufferSize;
outBuff.pos = 0; outBuff.pos = 0;
inBuff.src = CNBuffer;
inBuff.size = 0;
inBuff.pos = 0;
CHECK_Z( ZSTD_compressStream(zc, &outBuff, &inBuff) ); CHECK_Z( ZSTD_compressStream(zc, &outBuff, &inBuff) );
if (ZSTD_endStream(zc, &outBuff) != 0) goto _output_error; if (ZSTD_endStream(zc, &outBuff) != 0) goto _output_error;
cSize = outBuff.pos; cSize = outBuff.pos;
@ -688,7 +688,7 @@ static int basicUnitTests(U32 seed, double compressibility)
{ ZSTD_parameters const params = ZSTD_getParams(1, 0, 0); { ZSTD_parameters const params = ZSTD_getParams(1, 0, 0);
CHECK_Z( ZSTDMT_initCStream_advanced(mtctx, CNBuffer, dictSize, params, CNBufferSize) ); CHECK_Z( ZSTDMT_initCStream_advanced(mtctx, CNBuffer, dictSize, params, CNBufferSize) );
} }
outBuff.dst = (char*)(compressedBuffer); outBuff.dst = compressedBuffer;
outBuff.size = compressedBufferSize; outBuff.size = compressedBufferSize;
outBuff.pos = 0; outBuff.pos = 0;
inBuff.src = CNBuffer; inBuff.src = CNBuffer;
@ -700,6 +700,61 @@ static int basicUnitTests(U32 seed, double compressibility)
if (r != 0) goto _output_error; } /* error, or some data not flushed */ if (r != 0) goto _output_error; } /* error, or some data not flushed */
DISPLAYLEVEL(3, "OK \n"); DISPLAYLEVEL(3, "OK \n");
/* Complex multithreading + dictionary test */
{ U32 const nbThreads = 2;
size_t const jobSize = 4 * 1 MB;
size_t const srcSize = jobSize * nbThreads; /* we want each job to have predictable size */
size_t const segLength = 2 KB;
size_t const offset = 600 KB; /* must be larger than window defined in cdict */
size_t const start = jobSize + (offset-1);
const BYTE* const srcToCopy = (const BYTE*)CNBuffer + start;
BYTE* const dst = (BYTE*)CNBuffer + start - offset;
DISPLAYLEVEL(3, "test%3i : compress %u bytes with multiple threads + dictionary : ", testNb++, (U32)srcSize);
CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_compressionLevel, 3) );
CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_nbThreads, 2) );
CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_jobSize, jobSize) );
assert(start > offset);
assert(start + segLength < COMPRESSIBLE_NOISE_LENGTH);
memcpy(dst, srcToCopy, segLength); /* create a long repetition at long distance for job 2 */
outBuff.dst = compressedBuffer;
outBuff.size = compressedBufferSize;
outBuff.pos = 0;
inBuff.src = CNBuffer;
inBuff.size = srcSize; assert(srcSize < COMPRESSIBLE_NOISE_LENGTH);
inBuff.pos = 0;
}
{ ZSTD_compressionParameters const cParams = ZSTD_getCParams(1, 4 KB, dictionary.filled); /* intentionnally lies on estimatedSrcSize, to push cdict into targeting a small window size */
ZSTD_CDict* const cdict = ZSTD_createCDict_advanced(dictionary.start, dictionary.filled, ZSTD_dlm_byRef, ZSTD_dm_fullDict, cParams, ZSTD_defaultCMem);
DISPLAYLEVEL(5, "cParams.windowLog = %u : ", cParams.windowLog);
CHECK_Z( ZSTD_CCtx_refCDict(zc, cdict) );
CHECK_Z( ZSTD_compress_generic(zc, &outBuff, &inBuff, ZSTD_e_end) );
CHECK_Z( ZSTD_CCtx_refCDict(zc, NULL) ); /* do not keep a reference to cdict, as its lifetime ends */
ZSTD_freeCDict(cdict);
}
if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */
cSize = outBuff.pos;
DISPLAYLEVEL(3, "OK \n");
DISPLAYLEVEL(3, "test%3i : decompress large frame created from multiple threads + dictionary : ", testNb++);
{ ZSTD_DStream* const dstream = ZSTD_createDCtx();
ZSTD_frameHeader zfh;
ZSTD_getFrameHeader(&zfh, compressedBuffer, cSize);
DISPLAYLEVEL(5, "frame windowsize = %u : ", (U32)zfh.windowSize);
outBuff.dst = decodedBuffer;
outBuff.size = CNBufferSize;
outBuff.pos = 0;
inBuff.src = compressedBuffer;
inBuff.pos = 0;
CHECK_Z( ZSTD_initDStream_usingDict(dstream, dictionary.start, dictionary.filled) );
inBuff.size = 1; /* avoid shortcut to single-pass mode */
CHECK_Z( ZSTD_decompressStream(dstream, &outBuff, &inBuff) );
inBuff.size = cSize;
CHECK_Z( ZSTD_decompressStream(dstream, &outBuff, &inBuff) );
if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */
ZSTD_freeDStream(dstream);
}
DISPLAYLEVEL(3, "OK \n");
DISPLAYLEVEL(3, "test%3i : check dictionary FSE tables can represent every code : ", testNb++); DISPLAYLEVEL(3, "test%3i : check dictionary FSE tables can represent every code : ", testNb++);
{ unsigned const kMaxWindowLog = 24; { unsigned const kMaxWindowLog = 24;
unsigned value; unsigned value;
@ -1208,15 +1263,15 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
} }
{ U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize; { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize;
ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize); ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize);
DISPLAYLEVEL(5, "Init with windowLog = %u and pledgedSrcSize = %u \n", DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n",
params.cParams.windowLog, (U32)pledgedSrcSize); params.cParams.windowLog, (U32)pledgedSrcSize, (U32)dictSize);
params.fParams.checksumFlag = FUZ_rand(&lseed) & 1; params.fParams.checksumFlag = FUZ_rand(&lseed) & 1;
params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1; params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1;
params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1; params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1;
DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag); DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag);
CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) );
CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) ); CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) );
CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_sectionSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) ); CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) ); /* custome job size */
CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) );
} } } }
/* multi-segments compression test */ /* multi-segments compression test */
@ -1233,9 +1288,9 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
ZSTD_inBuffer inBuff = { srcBuffer+srcStart, srcSize, 0 }; ZSTD_inBuffer inBuff = { srcBuffer+srcStart, srcSize, 0 };
outBuff.size = outBuff.pos + dstBuffSize; outBuff.size = outBuff.pos + dstBuffSize;
DISPLAYLEVEL(5, "Sending %u bytes to compress \n", (U32)srcSize); DISPLAYLEVEL(6, "Sending %u bytes to compress \n", (U32)srcSize);
CHECK_Z( ZSTDMT_compressStream(zc, &outBuff, &inBuff) ); CHECK_Z( ZSTDMT_compressStream(zc, &outBuff, &inBuff) );
DISPLAYLEVEL(5, "%u bytes read by ZSTDMT_compressStream \n", (U32)inBuff.pos); DISPLAYLEVEL(6, "%u bytes read by ZSTDMT_compressStream \n", (U32)inBuff.pos);
XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos); XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos);
memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos); memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos);
@ -1282,10 +1337,10 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize); size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize);
inBuff.size = inBuff.pos + readCSrcSize; inBuff.size = inBuff.pos + readCSrcSize;
outBuff.size = outBuff.pos + dstBuffSize; outBuff.size = outBuff.pos + dstBuffSize;
DISPLAYLEVEL(5, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize); DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize);
decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff); decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult)); CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));
DISPLAYLEVEL(5, "inBuff.pos = %u \n", (U32)readCSrcSize); DISPLAYLEVEL(6, "inBuff.pos = %u \n", (U32)readCSrcSize);
} }
CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize); CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize);
CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize); CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize);