From 15768cabb580fa8fab7a3838861eba307a655b68 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Thu, 16 Nov 2017 15:02:28 -0800 Subject: [PATCH] fixed some complex scenarios Fixed : multithreading to compress some small data with dictionary Fixed : ZSTD_initCStream_usingCDict() Improved streaming memory usage when pledgedSrcSize is known. --- lib/compress/zstd_compress.c | 70 +++++++++++++++++----------------- lib/compress/zstd_opt.c | 2 +- lib/compress/zstdmt_compress.h | 3 ++ tests/zstreamtest.c | 8 +++- 4 files changed, 44 insertions(+), 39 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 0bdcc26a..f0cebe7a 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -249,7 +249,6 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); case ZSTD_p_compressionLevel: - if (value == 0) return 0; /* special value : 0 means "don't change anything" */ if (cctx->cdict) return ERROR(stage_wrong); return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); @@ -260,7 +259,6 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v case ZSTD_p_minMatch: case ZSTD_p_targetLength: case ZSTD_p_compressionStrategy: - if (value == 0) return 0; /* special value : 0 means "don't change anything" */ if (cctx->cdict) return ERROR(stage_wrong); ZSTD_cLevelToCParams(cctx); /* Can optimize if srcSize is known */ return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); @@ -277,8 +275,6 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); case ZSTD_p_nbThreads: - if (value==0) return 0; - DEBUGLOG(5, " setting nbThreads : %u", value); if (value > 1 && cctx->staticSize) { return ERROR(parameter_unsupported); /* MT not compatible with static alloc */ } @@ -288,22 +284,15 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); case ZSTD_p_overlapSizeLog: - DEBUGLOG(5, " setting overlap with nbThreads == %u", cctx->requestedParams.nbThreads); return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); case ZSTD_p_enableLongDistanceMatching: if (cctx->cdict) return ERROR(stage_wrong); - if (value != 0) { - ZSTD_cLevelToCParams(cctx); - } + if (value>0) ZSTD_cLevelToCParams(cctx); return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); case ZSTD_p_ldmHashLog: case ZSTD_p_ldmMinMatch: - if (value == 0) return 0; /* special value : 0 means "don't change anything" */ - if (cctx->cdict) return ERROR(stage_wrong); - return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); - case ZSTD_p_ldmBucketSizeLog: case ZSTD_p_ldmHashEveryLog: if (cctx->cdict) return ERROR(stage_wrong); @@ -779,24 +768,27 @@ static U32 ZSTD_equivalentLdmParams(ldmParams_t ldmParams1, typedef enum { ZSTDb_not_buffered, ZSTDb_buffered } ZSTD_buffered_policy_e; -/* ZSTD_equivalentBuffPolicy() : +/* ZSTD_sufficientBuff() : * check internal buffers exist for streaming if buffPol == ZSTDb_buffered . * Note : they are assumed to be correctly sized if ZSTD_equivalentCParams()==1 */ -static U32 ZSTD_equivalentBuffPolicy(size_t bufferSize, ZSTD_buffered_policy_e buffPol) +static U32 ZSTD_sufficientBuff(size_t bufferSize, ZSTD_buffered_policy_e buffPol, ZSTD_compressionParameters cParams2, U64 pledgedSrcSize) { - if ((buffPol == ZSTDb_buffered) & (!bufferSize)) return 0; - return 1; + size_t const windowSize = MAX(1, (size_t)MIN(((U64)1 << cParams2.windowLog), pledgedSrcSize)); + size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, windowSize); + size_t const neededBufferSize = (buffPol==ZSTDb_buffered) ? windowSize + blockSize : 0; + return (neededBufferSize <= bufferSize); } /** Equivalence for resetCCtx purposes */ static U32 ZSTD_equivalentParams(ZSTD_CCtx_params params1, ZSTD_CCtx_params params2, size_t buffSize1, - ZSTD_buffered_policy_e buffPol2) + ZSTD_buffered_policy_e buffPol2, + U64 pledgedSrcSize) { return ZSTD_equivalentCParams(params1.cParams, params2.cParams) && ZSTD_equivalentLdmParams(params1.ldmParams, params2.ldmParams) && - ZSTD_equivalentBuffPolicy(buffSize1, buffPol2); + ZSTD_sufficientBuff(buffSize1, buffPol2, params2.cParams, pledgedSrcSize); } /*! ZSTD_continueCCtx() : @@ -837,7 +829,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc, assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); if (crp == ZSTDcrp_continue) { - if (ZSTD_equivalentParams(params, zc->appliedParams, zc->inBuffSize, zbuff)) { + if (ZSTD_equivalentParams(params, zc->appliedParams, zc->inBuffSize, zbuff, pledgedSrcSize)) { DEBUGLOG(4, "ZSTD_equivalentParams()==1 -> continue mode"); assert(!(params.ldmParams.enableLdm && params.ldmParams.hashEveryLog == ZSTD_LDM_HASHEVERYLOG_NOTSET)); @@ -858,7 +850,8 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc, ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength); } - { size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << params.cParams.windowLog); + { size_t const windowSize = MAX(1, (size_t)MIN(((U64)1 << params.cParams.windowLog), pledgedSrcSize)); + size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, windowSize); U32 const divider = (params.cParams.searchLength==3) ? 3 : 4; size_t const maxNbSeq = blockSize / divider; size_t const tokenSpace = blockSize + 11*maxNbSeq; @@ -870,7 +863,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc, size_t const h3Size = ((size_t)1) << hashLog3; size_t const tableSpace = (chainSize + hSize + h3Size) * sizeof(U32); size_t const buffOutSize = (zbuff==ZSTDb_buffered) ? ZSTD_compressBound(blockSize)+1 : 0; - size_t const buffInSize = (zbuff==ZSTDb_buffered) ? ((size_t)1 << params.cParams.windowLog) + blockSize : 0; + size_t const buffInSize = (zbuff==ZSTDb_buffered) ? windowSize + blockSize : 0; void* ptr; /* Check if workSpace is large enough, alloc a new one if needed */ @@ -886,10 +879,10 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc, : 0; size_t const neededSpace = entropySpace + optSpace + ldmSpace + tableSpace + tokenSpace + bufferSpace; - DEBUGLOG(4, "Need %uKB workspace, including %uKB for tables", - (U32)(neededSpace>>10), (U32)(tableSpace>>10)); - DEBUGLOG(4, "chainSize: %u - hSize: %u - h3Size: %u", - (U32)chainSize, (U32)hSize, (U32)h3Size); + DEBUGLOG(4, "Need %uKB workspace, including %uKB for tables, and %uKB for buffers", + (U32)(neededSpace>>10), (U32)(tableSpace>>10), (U32)(bufferSpace>>10)); + DEBUGLOG(4, "chainSize: %u - hSize: %u - h3Size: %u - windowSize: %u", + (U32)chainSize, (U32)hSize, (U32)h3Size, (U32)windowSize); if (zc->workSpaceSize < neededSpace) { /* too small : resize */ DEBUGLOG(4, "Need to update workSpaceSize from %uK to %uK", @@ -917,7 +910,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc, zc->consumedSrcSize = 0; if (pledgedSrcSize == ZSTD_CONTENTSIZE_UNKNOWN) zc->appliedParams.fParams.contentSizeFlag = 0; - DEBUGLOG(5, "pledged content size : %u ; flag : %u", + DEBUGLOG(4, "pledged content size : %u ; flag : %u", (U32)pledgedSrcSize, zc->appliedParams.fParams.contentSizeFlag); zc->blockSize = blockSize; @@ -1016,8 +1009,8 @@ void ZSTD_invalidateRepCodes(ZSTD_CCtx* cctx) { /*! ZSTD_copyCCtx_internal() : * Duplicate an existing context `srcCCtx` into another one `dstCCtx`. - * The "context", in this case, refers to the hash and chain tables, entropy - * tables, and dictionary offsets. + * The "context", in this case, refers to the hash and chain tables, + * entropy tables, and dictionary offsets. * Only works during stage ZSTDcs_init (i.e. after creation, but before first call to ZSTD_compressContinue()). * pledgedSrcSize=0 means "empty". * @return : 0, or an error code */ @@ -2079,6 +2072,7 @@ static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx, assert(!((dict) && (cdict))); /* either dict or cdict, not both */ if (cdict && cdict->dictContentSize>0) { + cctx->requestedParams = params; return ZSTD_copyCCtx_internal(cctx, cdict->refContext, params.fParams, pledgedSrcSize, zbuff); @@ -2525,9 +2519,9 @@ size_t ZSTD_CStreamOutSize(void) } static size_t ZSTD_resetCStream_internal(ZSTD_CStream* zcs, - const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode, - const ZSTD_CDict* cdict, - const ZSTD_CCtx_params params, unsigned long long pledgedSrcSize) + const void* const dict, size_t const dictSize, ZSTD_dictMode_e const dictMode, + const ZSTD_CDict* const cdict, + ZSTD_CCtx_params const params, unsigned long long const pledgedSrcSize) { DEBUGLOG(4, "ZSTD_resetCStream_internal"); /* params are supposed to be fully validated at this point */ @@ -2606,8 +2600,9 @@ size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, const ZSTD_CDict* cdict, ZSTD_frameParameters fParams, unsigned long long pledgedSrcSize) -{ /* cannot handle NULL cdict (does not know what to do) */ - if (!cdict) return ERROR(dictionary_wrong); +{ + DEBUGLOG(4, "ZSTD_initCStream_usingCDict_advanced"); + if (!cdict) return ERROR(dictionary_wrong); /* cannot handle NULL cdict (does not know what to do) */ { ZSTD_CCtx_params params = zcs->requestedParams; params.cParams = ZSTD_getCParamsFromCDict(cdict); params.fParams = fParams; @@ -2620,8 +2615,9 @@ size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, /* note : cdict must outlive compression session */ size_t ZSTD_initCStream_usingCDict(ZSTD_CStream* zcs, const ZSTD_CDict* cdict) { - ZSTD_frameParameters const fParams = { 0 /* contentSize */, 0 /* checksum */, 0 /* hideDictID */ }; - return ZSTD_initCStream_usingCDict_advanced(zcs, cdict, fParams, 0); /* note : will check that cdict != NULL */ + ZSTD_frameParameters const fParams = { 0 /* contentSizeFlag */, 0 /* checksum */, 0 /* hideDictID */ }; + DEBUGLOG(4, "ZSTD_initCStream_usingCDict"); + return ZSTD_initCStream_usingCDict_advanced(zcs, cdict, fParams, ZSTD_CONTENTSIZE_UNKNOWN); /* note : will check that cdict != NULL */ } /* ZSTD_initCStream_advanced() : @@ -2853,7 +2849,8 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1, 0 /*dictSize*/); #ifdef ZSTD_MULTITHREAD - if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */ + if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) + params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */ if (params.nbThreads > 1) { if (cctx->mtctx == NULL || cctx->appliedParams.nbThreads != params.nbThreads) { ZSTDMT_freeCCtx(cctx->mtctx); @@ -2874,6 +2871,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, prefixDict.dictMode, cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) ); assert(cctx->streamStage == zcss_load); + assert(cctx->appliedParams.nbThreads <= 1); } } /* compression stage */ diff --git a/lib/compress/zstd_opt.c b/lib/compress/zstd_opt.c index 52486bd3..aebe55f8 100644 --- a/lib/compress/zstd_opt.c +++ b/lib/compress/zstd_opt.c @@ -231,7 +231,7 @@ static U32 ZSTD_insertAndFindFirstIndexHash3 (ZSTD_CCtx* zc, const BYTE* ip) /*-************************************* * Binary Tree search ***************************************/ -//FORCE_INLINE_TEMPLATE +FORCE_INLINE_TEMPLATE U32 ZSTD_insertBtAndGetAllMatches ( ZSTD_CCtx* zc, const BYTE* const ip, const BYTE* const iLimit, const int extDict, diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 598e6daf..269c54b1 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -112,6 +112,9 @@ ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value); +/* ZSTDMT_CCtxParam_setNbThreads() + * Set nbThreads, and clamp it correctly, + * but also reset jobSize and overlapLog */ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThreads); /*! ZSTDMT_initCStream_internal() : diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 384858ed..4cdd6235 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -473,6 +473,7 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo DISPLAYLEVEL(3, "test%3i : digested dictionary : ", testNb++); { ZSTD_CDict* const cdict = ZSTD_createCDict(dictionary.start, dictionary.filled, 1 /*byRef*/ ); size_t const initError = ZSTD_initCStream_usingCDict(zc, cdict); + DISPLAYLEVEL(5, "ZSTD_initCStream_usingCDict result : %u ", (U32)initError); if (ZSTD_isError(initError)) goto _output_error; cSize = 0; outBuff.dst = compressedBuffer; @@ -481,10 +482,13 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo inBuff.src = CNBuffer; inBuff.size = CNBufferSize; inBuff.pos = 0; + DISPLAYLEVEL(5, "- starting ZSTD_compressStream "); CHECK_Z( ZSTD_compressStream(zc, &outBuff, &inBuff) ); if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */ - { size_t const r = ZSTD_endStream(zc, &outBuff); - if (r != 0) goto _output_error; } /* error, or some data not flushed */ + { size_t const r = ZSTD_endStream(zc, &outBuff); + DISPLAYLEVEL(5, "- ZSTD_endStream result : %u ", (U32)r); + if (r != 0) goto _output_error; /* error, or some data not flushed */ + } cSize = outBuff.pos; ZSTD_freeCDict(cdict); DISPLAYLEVEL(3, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/CNBufferSize*100);