From 63b8c985317b38e69b69fa69f4727e8d55ce2cb4 Mon Sep 17 00:00:00 2001 From: Stella Lau Date: Fri, 18 Aug 2017 16:17:24 -0700 Subject: [PATCH] Pass cctx parameters to MTCtx --- lib/compress/zstd_compress.c | 108 ++++++++++++++++++++++++--------- lib/compress/zstdmt_compress.c | 107 +++++++++++++++++++++++--------- lib/compress/zstdmt_compress.h | 9 +++ 3 files changed, 165 insertions(+), 59 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 426636cf..b63519d9 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -213,7 +213,7 @@ static ZSTD_parameters ZSTD_getParamsFromCCtxParams(const ZSTD_CCtx_params cctxP } // TODO: get rid of this function too -static ZSTD_CCtx_params ZSTD_makeCCtxParamsFromParams(ZSTD_parameters params) { +ZSTD_CCtx_params ZSTD_makeCCtxParamsFromParams(ZSTD_parameters params) { ZSTD_CCtx_params cctxParams; memset(&cctxParams, 0, sizeof(ZSTD_CCtx_params)); cctxParams.cParams = params.cParams; @@ -3474,6 +3474,17 @@ static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx, return ZSTD_compress_insertDictionary(cctx, dict, dictSize, params.dictMode); } +size_t ZSTD_compressBegin_advanced_opaque(ZSTD_CCtx* cctx, + const void* dict, size_t dictSize, + ZSTD_CCtx_params params, + unsigned long long pledgedSrcSize) +{ + /* compression parameters verification and optimization */ + CHECK_F( ZSTD_checkCParams(params.cParams) ); + return ZSTD_compressBegin_internal(cctx, dict, dictSize, NULL, + params, pledgedSrcSize, + ZSTDb_not_buffered); +} /*! ZSTD_compressBegin_advanced() : * @return : 0, or an error code */ @@ -3481,15 +3492,13 @@ size_t ZSTD_compressBegin_advanced(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, ZSTD_parameters params, unsigned long long pledgedSrcSize) { - ZSTD_CCtx_params cctxParams = cctx->requestedParams; cctxParams.cParams = params.cParams; cctxParams.fParams = params.fParams; cctxParams.dictMode = ZSTD_dm_auto; - /* compression parameters verification and optimization */ - CHECK_F(ZSTD_checkCParams(params.cParams)); - return ZSTD_compressBegin_internal(cctx, dict, dictSize, NULL, - cctxParams, pledgedSrcSize, ZSTDb_not_buffered); + + return ZSTD_compressBegin_advanced_opaque(cctx, dict, dictSize, cctxParams, + pledgedSrcSize); } @@ -3580,10 +3589,11 @@ static size_t ZSTD_compress_internal (ZSTD_CCtx* cctx, cctxParams.cParams = params.cParams; cctxParams.fParams = params.fParams; cctxParams.dictMode = ZSTD_dm_auto; - - CHECK_F( ZSTD_compressBegin_internal(cctx, dict, dictSize, NULL, - cctxParams, srcSize, ZSTDb_not_buffered) ); - return ZSTD_compressEnd(cctx, dst, dstCapacity, src, srcSize); + return ZSTD_compress_advanced_opaque(cctx, + dst, dstCapacity, + src, srcSize, + dict, dictSize, + cctxParams); } size_t ZSTD_compress_advanced (ZSTD_CCtx* ctx, @@ -3596,6 +3606,18 @@ size_t ZSTD_compress_advanced (ZSTD_CCtx* ctx, return ZSTD_compress_internal(ctx, dst, dstCapacity, src, srcSize, dict, dictSize, params); } +/* Internal */ +size_t ZSTD_compress_advanced_opaque(ZSTD_CCtx* cctx, + void* dst, size_t dstCapacity, + const void* src, size_t srcSize, + const void* dict,size_t dictSize, + ZSTD_CCtx_params params) +{ + CHECK_F( ZSTD_compressBegin_internal(cctx, dict, dictSize, NULL, + params, srcSize, ZSTDb_not_buffered) ); + return ZSTD_compressEnd(cctx, dst, dstCapacity, src, srcSize); +} + size_t ZSTD_compress_usingDict(ZSTD_CCtx* ctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize, const void* dict, size_t dictSize, int compressionLevel) { @@ -3920,14 +3942,13 @@ size_t ZSTD_CStreamOutSize(void) return ZSTD_compressBound(ZSTD_BLOCKSIZE_MAX) + ZSTD_blockHeaderSize + 4 /* 32-bits hash */ ; } -static size_t ZSTD_resetCStream_internal(ZSTD_CStream* zcs, - const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode, - const ZSTD_CDict* cdict, - ZSTD_parameters params, unsigned long long pledgedSrcSize) +static size_t ZSTD_resetCStream_internal_opaque( + ZSTD_CStream* zcs, + const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode, + const ZSTD_CDict* cdict, + ZSTD_CCtx_params params, unsigned long long pledgedSrcSize) { - ZSTD_CCtx_params cctxParams = ZSTD_makeCCtxParamsFromParams(params); - cctxParams.compressionLevel = zcs->requestedParams.compressionLevel; - cctxParams.dictMode = dictMode; + params.dictMode = dictMode; DEBUGLOG(4, "ZSTD_resetCStream_internal"); /* params are supposed to be fully validated at this point */ assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); @@ -3936,7 +3957,7 @@ static size_t ZSTD_resetCStream_internal(ZSTD_CStream* zcs, CHECK_F( ZSTD_compressBegin_internal(zcs, dict, dictSize, cdict, - cctxParams, pledgedSrcSize, + params, pledgedSrcSize, ZSTDb_buffered) ); zcs->inToCompress = 0; @@ -3948,6 +3969,19 @@ static size_t ZSTD_resetCStream_internal(ZSTD_CStream* zcs, return 0; /* ready to go */ } +static size_t ZSTD_resetCStream_internal(ZSTD_CStream* zcs, + const void* dict, size_t dictSize, ZSTD_dictMode_e dictMode, + const ZSTD_CDict* cdict, + ZSTD_parameters params, unsigned long long pledgedSrcSize) +{ + ZSTD_CCtx_params cctxParams = zcs->requestedParams; + cctxParams.cParams = params.cParams; + cctxParams.fParams = params.fParams; + cctxParams.dictMode = dictMode; + return ZSTD_resetCStream_internal_opaque(zcs, dict, dictSize, dictMode, + cdict, cctxParams, pledgedSrcSize); +} + size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize) { ZSTD_parameters params = ZSTD_getParamsFromCCtxParams(zcs->requestedParams); @@ -3959,13 +3993,11 @@ size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize) return ZSTD_resetCStream_internal(zcs, NULL, 0, zcs->requestedParams.dictMode, zcs->cdict, params, pledgedSrcSize); } -/*! ZSTD_initCStream_internal() : - * Note : not static, but hidden (not exposed). Used by zstdmt_compress.c - * Assumption 1 : params are valid - * Assumption 2 : either dict, or cdict, is defined, not both */ -size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs, - const void* dict, size_t dictSize, const ZSTD_CDict* cdict, - ZSTD_parameters params, unsigned long long pledgedSrcSize) +size_t ZSTD_initCStream_internal_opaque(ZSTD_CStream* zcs, + const void* dict, size_t dictSize, + const ZSTD_CDict* cdict, + ZSTD_CCtx_params params, + unsigned long long pledgedSrcSize) { DEBUGLOG(5, "ZSTD_initCStream_internal"); assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); @@ -3993,11 +4025,28 @@ size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs, zcs->cdictLocal = NULL; zcs->cdict = cdict; } - zcs->requestedParams.cParams = params.cParams; - zcs->requestedParams.fParams = params.fParams; - zcs->requestedParams.compressionLevel = ZSTD_CLEVEL_CUSTOM; + zcs->requestedParams = params; - return ZSTD_resetCStream_internal(zcs, NULL, 0, zcs->requestedParams.dictMode, zcs->cdict, params, pledgedSrcSize); + return ZSTD_resetCStream_internal_opaque( + zcs, NULL, 0, zcs->requestedParams.dictMode, zcs->cdict, + params, pledgedSrcSize); +} + + +/*! ZSTD_initCStream_internal() : + * Note : not static, but hidden (not exposed). Used by zstdmt_compress.c + * Assumption 1 : params are valid + * Assumption 2 : either dict, or cdict, is defined, not both */ +size_t ZSTD_initCStream_internal(ZSTD_CStream* zcs, + const void* dict, size_t dictSize, const ZSTD_CDict* cdict, + ZSTD_parameters params, unsigned long long pledgedSrcSize) +{ + ZSTD_CCtx_params cctxParams = zcs->requestedParams; + cctxParams.cParams = params.cParams; + cctxParams.fParams = params.fParams; + cctxParams.compressionLevel = ZSTD_CLEVEL_CUSTOM; + return ZSTD_initCStream_internal_opaque(zcs, dict, dictSize, cdict, + cctxParams, pledgedSrcSize); } /* ZSTD_initCStream_usingCDict_advanced() : @@ -4227,7 +4276,6 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, const ZSTD_CDict* cdict, ZSTD_parameters params, unsigned long long pledgedSrcSize); - size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 234ced9d..2eb714e6 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -186,6 +186,14 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) ZSTD_free(buf.start, bufPool->cMem); } +static void ZSTDMT_zeroCCtxParams(ZSTD_CCtx_params* params) +{ + params->forceWindow = 0; + params->dictMode = (ZSTD_dictMode_e)(0); + params->nbThreads = 0; + params->jobSize = 0; + params->overlapSizeLog = 0; +} /* ===== CCtx Pool ===== */ /* a single CCtx Pool can be invoked from multiple threads in parallel */ @@ -292,7 +300,7 @@ typedef struct { unsigned jobScanned; pthread_mutex_t* jobCompleted_mutex; pthread_cond_t* jobCompleted_cond; - ZSTD_parameters params; + ZSTD_CCtx_params params; const ZSTD_CDict* cdict; ZSTDMT_CCtxPool* cctxPool; ZSTDMT_bufferPool* bufPool; @@ -330,7 +338,7 @@ void ZSTDMT_compressChunk(void* jobDescription) } else { /* srcStart points at reloaded section */ if (!job->firstChunk) job->params.fParams.contentSizeFlag = 0; /* ensure no srcSize control */ { size_t const dictModeError = ZSTD_setCCtxParameter(cctx, ZSTD_p_forceRawDict, 1); /* Force loading dictionary in "content-only" mode (no header analysis) */ - size_t const initError = ZSTD_compressBegin_advanced(cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize); + size_t const initError = ZSTD_compressBegin_advanced_opaque(cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize); if (ZSTD_isError(initError) || ZSTD_isError(dictModeError)) { job->cSize = initError; goto _endJob; } ZSTD_setCCtxParameter(cctx, ZSTD_p_forceWindow, 1); } } @@ -382,7 +390,7 @@ struct ZSTDMT_CCtx_s { size_t dictSize; size_t targetDictSize; inBuff_t inBuff; - ZSTD_parameters params; + ZSTD_CCtx_params params; XXH64_state_t xxhState; unsigned nbThreads; unsigned jobIDMask; @@ -528,17 +536,17 @@ static unsigned computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbT return (multiplier>1) ? nbChunksLarge : nbChunksSmall; } - -size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, - void* dst, size_t dstCapacity, - const void* src, size_t srcSize, - const ZSTD_CDict* cdict, - ZSTD_parameters const params, - unsigned overlapLog) +static size_t ZSTDMT_compress_advanced_opaque( + ZSTDMT_CCtx* mtctx, + void* dst, size_t dstCapacity, + const void* src, size_t srcSize, + const ZSTD_CDict* cdict, + ZSTD_CCtx_params const cctxParams, + unsigned overlapLog) { unsigned const overlapRLog = (overlapLog>9) ? 0 : 9-overlapLog; - size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog); - unsigned nbChunks = computeNbChunks(srcSize, params.cParams.windowLog, mtctx->nbThreads); + size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (cctxParams.cParams.windowLog - overlapRLog); + unsigned nbChunks = computeNbChunks(srcSize, cctxParams.cParams.windowLog, mtctx->nbThreads); size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks; size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ const char* const srcStart = (const char*)src; @@ -546,12 +554,15 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */ size_t frameStartPos = 0, dstBufferPos = 0; XXH64_state_t xxh64; + ZSTD_CCtx_params requestedParams = cctxParams; + ZSTDMT_zeroCCtxParams(&requestedParams); DEBUGLOG(4, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize); if (nbChunks==1) { /* fallback to single-thread mode */ ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; - if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, params.fParams); - return ZSTD_compress_advanced(cctx, dst, dstCapacity, src, srcSize, NULL, 0, params); + + if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, cctxParams.fParams); + return ZSTD_compress_advanced_opaque(cctx, dst, dstCapacity, src, srcSize, NULL, 0, requestedParams); } assert(avgChunkSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), which is required for compressWithinDst */ ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgChunkSize) ); @@ -580,7 +591,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, mtctx->jobs[u].srcSize = chunkSize; mtctx->jobs[u].cdict = mtctx->nextJobID==0 ? cdict : NULL; mtctx->jobs[u].fullFrameSize = srcSize; - mtctx->jobs[u].params = params; + mtctx->jobs[u].params = requestedParams; /* do not calculate checksum within sections, but write it in header for first section */ if (u!=0) mtctx->jobs[u].params.fParams.checksumFlag = 0; mtctx->jobs[u].dstBuff = dstBuffer; @@ -592,7 +603,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex; mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond; - if (params.fParams.checksumFlag) { + if (cctxParams.fParams.checksumFlag) { XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize); } @@ -636,7 +647,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, } /* for (chunkID=0; chunkID dstCapacity) { error = ERROR(dstSize_tooSmall); @@ -649,6 +660,23 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, if (!error) DEBUGLOG(4, "compressed size : %u ", (U32)dstPos); return error ? error : dstPos; } + +} + +size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, + void* dst, size_t dstCapacity, + const void* src, size_t srcSize, + const ZSTD_CDict* cdict, + ZSTD_parameters const params, + unsigned overlapLog) +{ + ZSTD_CCtx_params cctxParams = mtctx->params; + cctxParams.cParams = params.cParams; + cctxParams.fParams = params.fParams; + return ZSTDMT_compress_advanced_opaque(mtctx, + dst, dstCapacity, + src, srcSize, + cdict, cctxParams, overlapLog); } @@ -683,23 +711,28 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) } } - -/** ZSTDMT_initCStream_internal() : - * internal usage only */ -size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, - const void* dict, size_t dictSize, const ZSTD_CDict* cdict, - ZSTD_parameters params, unsigned long long pledgedSrcSize) +size_t ZSTDMT_initCStream_internal_opaque( + ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, + const ZSTD_CDict* cdict, ZSTD_CCtx_params cctxParams, + unsigned long long pledgedSrcSize) { + ZSTD_parameters params; + params.cParams = cctxParams.cParams; + params.fParams = cctxParams.fParams; + DEBUGLOG(4, "ZSTDMT_initCStream_internal"); /* params are supposed to be fully validated at this point */ assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!((dict) && (cdict))); /* either dict or cdict, not both */ + /* TODO: Set stuff to 0 to preserve old semantics. */ + ZSTDMT_zeroCCtxParams(&cctxParams); + if (zcs->nbThreads==1) { DEBUGLOG(4, "single thread mode"); - return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0], - dict, dictSize, cdict, - params, pledgedSrcSize); + return ZSTD_initCStream_internal_opaque(zcs->cctxPool->cctx[0], + dict, dictSize, cdict, + cctxParams, pledgedSrcSize); } if (zcs->allJobsCompleted == 0) { /* previous compression not correctly finished */ @@ -708,7 +741,7 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, zcs->allJobsCompleted = 1; } - zcs->params = params; + zcs->params = cctxParams; zcs->frameContentSize = pledgedSrcSize; if (dict) { DEBUGLOG(4,"cdictLocal: %08X", (U32)(size_t)zcs->cdictLocal); @@ -742,6 +775,21 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, zcs->allJobsCompleted = 0; if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0); return 0; + +} + + +/** ZSTDMT_initCStream_internal() : + * internal usage only */ +size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, + const void* dict, size_t dictSize, const ZSTD_CDict* cdict, + ZSTD_parameters params, unsigned long long pledgedSrcSize) +{ + ZSTD_CCtx_params cctxParams = zcs->params; + cctxParams.cParams = params.cParams; + cctxParams.fParams = params.fParams; + return ZSTDMT_initCStream_internal_opaque(zcs, dict, dictSize, cdict, + cctxParams, pledgedSrcSize); } size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, @@ -772,7 +820,8 @@ size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize) { if (zcs->nbThreads==1) return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize); - return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0, zcs->params, pledgedSrcSize); + return ZSTDMT_initCStream_internal_opaque(zcs, NULL, 0, 0, zcs->params, + pledgedSrcSize); } size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { @@ -930,7 +979,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, && (mtctx->inBuff.filled==0) /* nothing buffered */ && (endOp==ZSTD_e_end) /* end order */ && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough room */ - size_t const cSize = ZSTDMT_compress_advanced(mtctx, + size_t const cSize = ZSTDMT_compress_advanced_opaque(mtctx, (char*)output->dst + output->pos, output->size - output->pos, (const char*)input->src + input->pos, input->size - input->pos, mtctx->cdict, mtctx->params, mtctx->overlapLog); diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 843a240a..0b478b73 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -69,6 +69,15 @@ ZSTDLIB_API size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, const ZSTD_CDict* cdict, ZSTD_parameters const params, unsigned overlapLog); +#if 0 +ZSTDLIB_API size_t ZSTDMT_compress_advanced_opaque( + ZSTDMT_CCtx* mtctx, + void* dst, size_t dstCapacity, + const void* src, size_t srcSize, + const ZSTD_CDict* cdict, + ZSTD_CCtx_params* const params, + unsigned overlapLog); +#endif ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, const void* dict, size_t dictSize, /* dict can be released after init, a local copy is preserved within zcs */