diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 5c7a724c..c8703719 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -307,6 +307,10 @@ static ZSTD_CCtx_params ZSTD_assignParamsToCCtxParams( return ERROR(parameter_outOfBound); \ } } +size_t ZSTDMT_CCtxParam_setMTCtxParameter( + ZSTD_CCtx_params* params, ZSDTMT_parameter parameter, unsigned value); +size_t ZSTDMT_initializeCCtxParameters(ZSTD_CCtx_params* params, unsigned nbThreads); + size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned value) { if (cctx->streamStage != zcss_init) return ERROR(stage_wrong); @@ -359,19 +363,20 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v cctx->mtctx = ZSTDMT_createCCtx_advanced(value, cctx->customMem); if (cctx->mtctx == NULL) return ERROR(memory_allocation); } - cctx->requestedParams.nbThreads = value; - return 0; + + /* Need to initialize overlapSizeLog */ + return ZSTDMT_initializeCCtxParameters(&cctx->requestedParams, value); case ZSTD_p_jobSize: if (cctx->requestedParams.nbThreads <= 1) return ERROR(parameter_unsupported); assert(cctx->mtctx != NULL); - return ZSTDMT_setMTCtxParameter(cctx->mtctx, ZSTDMT_p_sectionSize, value); + return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); case ZSTD_p_overlapSizeLog: DEBUGLOG(5, " setting overlap with nbThreads == %u", cctx->requestedParams.nbThreads); if (cctx->requestedParams.nbThreads <= 1) return ERROR(parameter_unsupported); assert(cctx->mtctx != NULL); - return ZSTDMT_setMTCtxParameter(cctx->mtctx, ZSTDMT_p_overlapSectionLog, value); + return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); default: return ERROR(parameter_unsupported); } @@ -471,19 +476,15 @@ size_t ZSTD_CCtxParam_setParameter( #ifndef ZSTD_MULTITHREAD if (value > 1) return ERROR(parameter_unsupported); #endif - /* Do checks when applying params to cctx */ - params->nbThreads = value; - return 0; + 
return ZSTDMT_initializeCCtxParameters(params, value); case ZSTD_p_jobSize : if (params->nbThreads <= 1) return ERROR(parameter_unsupported); - params->jobSize = value; - return 0; + return ZSTDMT_CCtxParam_setMTCtxParameter(params, ZSTDMT_p_sectionSize, value); case ZSTD_p_overlapSizeLog : if (params->nbThreads <= 1) return ERROR(parameter_unsupported); - params->overlapSizeLog = value; - return 0; + return ZSTDMT_CCtxParam_setMTCtxParameter(params, ZSTDMT_p_overlapSectionLog, value); default: return ERROR(parameter_unsupported); } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index cf21177e..7c3bd019 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -404,15 +404,12 @@ struct ZSTDMT_CCtx_s { inBuff_t inBuff; ZSTD_CCtx_params params; XXH64_state_t xxhState; - unsigned nbThreads; unsigned jobIDMask; unsigned doneJobID; unsigned nextJobID; unsigned frameEnded; unsigned allJobsCompleted; - unsigned overlapLog; unsigned long long frameContentSize; - size_t sectionSize; ZSTD_customMem cMem; ZSTD_CDict* cdictLocal; const ZSTD_CDict* cdict; @@ -427,6 +424,15 @@ static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customM nbJobs * sizeof(ZSTDMT_jobDescription), cMem); } +/* Internal only */ +size_t ZSTDMT_initializeCCtxParameters(ZSTD_CCtx_params* params, unsigned nbThreads) +{ + params->nbThreads = nbThreads; + params->overlapSizeLog = ZSTDMT_OVERLAPLOG_DEFAULT; + params->jobSize = 0; + return 0; +} + ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) { ZSTDMT_CCtx* mtctx; @@ -441,11 +447,9 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) mtctx = (ZSTDMT_CCtx*) ZSTD_calloc(sizeof(ZSTDMT_CCtx), cMem); if (!mtctx) return NULL; + ZSTDMT_initializeCCtxParameters(&mtctx->params, nbThreads); mtctx->cMem = cMem; - mtctx->nbThreads = nbThreads; mtctx->allJobsCompleted = 1; - mtctx->sectionSize = 0; - mtctx->overlapLog = 
ZSTDMT_OVERLAPLOG_DEFAULT; mtctx->factory = POOL_create(nbThreads, 1); mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem); mtctx->jobIDMask = nbJobs - 1; @@ -516,22 +520,35 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx) + ZSTD_sizeof_CDict(mtctx->cdictLocal); } -size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value) -{ +/* Internal only */ +size_t ZSTDMT_CCtxParam_setMTCtxParameter( + ZSTD_CCtx_params* params, ZSDTMT_parameter parameter, unsigned value) { switch(parameter) { case ZSTDMT_p_sectionSize : - mtctx->sectionSize = value; + params->jobSize = value; return 0; case ZSTDMT_p_overlapSectionLog : DEBUGLOG(5, "ZSTDMT_p_overlapSectionLog : %u", value); - mtctx->overlapLog = (value >= 9) ? 9 : value; + params->overlapSizeLog = (value >= 9) ? 9 : value; return 0; default : return ERROR(parameter_unsupported); } } +size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value) +{ + switch(parameter) + { + case ZSTDMT_p_sectionSize : + return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value); + case ZSTDMT_p_overlapSectionLog : + return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value); + default : + return ERROR(parameter_unsupported); + } +} /* ------------------------------------------ */ /* ===== Multi-threaded compression ===== */ @@ -553,13 +570,12 @@ static size_t ZSTDMT_compress_advanced_internal( void* dst, size_t dstCapacity, const void* src, size_t srcSize, const ZSTD_CDict* cdict, - ZSTD_CCtx_params const cctxParams, - unsigned overlapLog) + ZSTD_CCtx_params const params) { - ZSTD_CCtx_params const jobParams = ZSTDMT_makeJobCCtxParams(cctxParams); - unsigned const overlapRLog = (overlapLog>9) ? 0 : 9-overlapLog; - size_t const overlapSize = (overlapRLog>=9) ? 
0 : (size_t)1 << (cctxParams.cParams.windowLog - overlapRLog); - unsigned nbChunks = computeNbChunks(srcSize, cctxParams.cParams.windowLog, mtctx->nbThreads); + ZSTD_CCtx_params const jobParams = ZSTDMT_makeJobCCtxParams(params); + unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog; + size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog); + unsigned nbChunks = computeNbChunks(srcSize, params.cParams.windowLog, params.nbThreads); size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks; size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0x7FFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ const char* const srcStart = (const char*)src; @@ -567,6 +583,8 @@ static size_t ZSTDMT_compress_advanced_internal( unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */ size_t frameStartPos = 0, dstBufferPos = 0; XXH64_state_t xxh64; + assert(jobParams.nbThreads == 0); + assert(mtctx->cctxPool->totalCCtx == params.nbThreads); /* cctxPool is accessed through a pointer, as in cctxPool->cctx[0] */ DEBUGLOG(4, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize); if (nbChunks==1) { /* fallback to single-thread mode */ @@ -613,7 +631,7 @@ static size_t ZSTDMT_compress_advanced_internal( mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex; mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond; - if (cctxParams.fParams.checksumFlag) { + if (params.fParams.checksumFlag) { XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize); } @@ -656,8 +674,8 @@ static size_t ZSTDMT_compress_advanced_internal( } } /* for (chunkID=0; chunkID dstCapacity) { error = ERROR(dstSize_tooSmall); @@ -682,10 +700,11 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, ZSTD_CCtx_params cctxParams = mtctx->params; cctxParams.cParams = 
params.cParams; cctxParams.fParams = params.fParams; + cctxParams.overlapSizeLog = overlapLog; return ZSTDMT_compress_advanced_internal(mtctx, dst, dstCapacity, src, srcSize, - cdict, cctxParams, overlapLog); + cdict, cctxParams); } @@ -722,20 +741,22 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) size_t ZSTDMT_initCStream_internal( ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, - const ZSTD_CDict* cdict, ZSTD_CCtx_params cctxParams, + const ZSTD_CDict* cdict, ZSTD_CCtx_params params, unsigned long long pledgedSrcSize) { DEBUGLOG(4, "ZSTDMT_initCStream_internal"); /* params are supposed to be fully validated at this point */ - assert(!ZSTD_isError(ZSTD_checkCParams(cctxParams.cParams))); + assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!((dict) && (cdict))); /* either dict or cdict, not both */ + assert(zcs->cctxPool->totalCCtx == params.nbThreads); /* context is named zcs here; cctxPool is accessed through a pointer */ - if (zcs->nbThreads==1) { - ZSTD_CCtx_params const jobParams = ZSTDMT_makeJobCCtxParams(cctxParams); + if (params.nbThreads==1) { + ZSTD_CCtx_params const singleThreadParams = ZSTDMT_makeJobCCtxParams(params); DEBUGLOG(4, "single thread mode"); + assert(singleThreadParams.nbThreads == 0); return ZSTD_initCStream_internal(zcs->cctxPool->cctx[0], dict, dictSize, cdict, - jobParams, pledgedSrcSize); + singleThreadParams, pledgedSrcSize); } if (zcs->allJobsCompleted == 0) { /* previous compression not correctly finished */ @@ -744,14 +765,14 @@ size_t ZSTDMT_initCStream_internal( zcs->allJobsCompleted = 1; } - zcs->params = cctxParams; + zcs->params = params; zcs->frameContentSize = pledgedSrcSize; if (dict) { DEBUGLOG(4,"cdictLocal: %08X", (U32)(size_t)zcs->cdictLocal); ZSTD_freeCDict(zcs->cdictLocal); zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, 0 /* byRef */, ZSTD_dm_auto, /* note : a loadPrefix becomes an internal CDict */ - cctxParams.cParams, zcs->cMem); + params.cParams, zcs->cMem); zcs->cdict = zcs->cdictLocal; if (zcs->cdictLocal == NULL) return 
ERROR(memory_allocation); } else { @@ -761,10 +782,10 @@ size_t ZSTDMT_initCStream_internal( zcs->cdict = cdict; } - zcs->targetDictSize = (zcs->overlapLog==0) ? 0 : (size_t)1 << (zcs->params.cParams.windowLog - (9 - zcs->overlapLog)); - DEBUGLOG(4, "overlapLog : %u ", zcs->overlapLog); + zcs->targetDictSize = (params.overlapSizeLog==0) ? 0 : (size_t)1 << (params.cParams.windowLog - (9 - params.overlapSizeLog)); + DEBUGLOG(4, "overlapLog : %u ", params.overlapSizeLog); DEBUGLOG(4, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10)); - zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2); + zcs->targetSectionSize = params.jobSize ? params.jobSize : (size_t)1 << (params.cParams.windowLog + 2); zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize); zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize); DEBUGLOG(4, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10)); @@ -776,7 +797,7 @@ size_t ZSTDMT_initCStream_internal( zcs->nextJobID = 0; zcs->frameEnded = 0; zcs->allJobsCompleted = 0; - if (cctxParams.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0); + if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0); return 0; } @@ -798,11 +819,12 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx, ZSTD_frameParameters fParams, unsigned long long pledgedSrcSize) { - ZSTD_CCtx_params params = ZSTD_getCCtxParamsFromCDict(cdict); + ZSTD_CCtx_params cctxParams = mtctx->params; + cctxParams.cParams = ZSTD_getCCtxParamsFromCDict(cdict).cParams; + cctxParams.fParams = fParams; if (cdict==NULL) return ERROR(dictionary_wrong); /* method incompatible with NULL cdict */ - params.fParams = fParams; return ZSTDMT_initCStream_internal(mtctx, NULL, 0 /*dictSize*/, cdict, - params, pledgedSrcSize); + cctxParams, pledgedSrcSize); } @@ -810,7 +832,7 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx, * pledgedSrcSize is optional and can be zero == 
unknown */ size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize) { - if (zcs->nbThreads==1) + if (zcs->params.nbThreads==1) return ZSTD_resetCStream(zcs->cctxPool->cctx[0], pledgedSrcSize); return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0, zcs->params, pledgedSrcSize); @@ -965,7 +987,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, /* current frame being ended. Only flush/end are allowed. Or start new frame with init */ return ERROR(stage_wrong); } - if (mtctx->nbThreads==1) { /* delegate to single-thread (synchronous) */ + if (mtctx->params.nbThreads==1) { /* delegate to single-thread (synchronous) */ return ZSTD_compressStream_generic(mtctx->cctxPool->cctx[0], output, input, endOp); } @@ -977,7 +999,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx, (char*)output->dst + output->pos, output->size - output->pos, (const char*)input->src + input->pos, input->size - input->pos, - mtctx->cdict, mtctx->params, mtctx->overlapLog); + mtctx->cdict, mtctx->params); if (ZSTD_isError(cSize)) return cSize; input->pos = input->size; output->pos += cSize; @@ -1052,7 +1074,7 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) { DEBUGLOG(5, "ZSTDMT_flushStream"); - if (zcs->nbThreads==1) + if (zcs->params.nbThreads==1) return ZSTD_flushStream(zcs->cctxPool->cctx[0], output); return ZSTDMT_flushStream_internal(zcs, output, 0 /* endFrame */); } @@ -1060,7 +1082,7 @@ size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) { DEBUGLOG(4, "ZSTDMT_endStream"); - if (zcs->nbThreads==1) + if (zcs->params.nbThreads==1) return ZSTD_endStream(zcs->cctxPool->cctx[0], output); return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */); } diff --git a/tests/Makefile b/tests/Makefile index 
006059b7..f6389bd9 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -167,7 +167,7 @@ datagen : $(PRGDIR)/datagen.c datagencli.c $(CC) $(FLAGS) $^ -o $@$(EXT) roundTripCrash : $(ZSTD_FILES) roundTripCrash.c - $(CC) $(FLAGS) $^ -o $@$(EXT) + $(CC) $(FLAGS) $(MULTITHREAD) $^ -o $@$(EXT) longmatch : $(ZSTD_FILES) longmatch.c $(CC) $(FLAGS) $^ -o $@$(EXT) diff --git a/tests/roundTripCrash.c b/tests/roundTripCrash.c index fb14fa87..cb0221c5 100644 --- a/tests/roundTripCrash.c +++ b/tests/roundTripCrash.c @@ -93,6 +93,9 @@ static size_t cctxParamRoundTripTest(void* resultBuff, size_t resultBuffCapacity /* Set parameters */ CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_compressionLevel, cLevel) ); + CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_nbThreads, 2) ); + CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_overlapSizeLog, 5) ); + /* Apply parameters */ CHECK_Z( ZSTD_CCtx_applyCCtxParams(cctx, cctxParams) );