diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index d2703f23..fbae3c3f 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -281,9 +281,8 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v } return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); + case ZSTD_p_nonBlockingMode: case ZSTD_p_jobSize: - return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); - case ZSTD_p_overlapSizeLog: return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); @@ -407,11 +406,18 @@ size_t ZSTD_CCtxParam_setParameter( return ZSTDMT_CCtxParam_setNbThreads(CCtxParams, value); #endif + case ZSTD_p_nonBlockingMode : +#ifndef ZSTD_MULTITHREAD + return ERROR(parameter_unsupported); +#else + CCtxParams->nonBlockingMode = (value>0); + return CCtxParams->nonBlockingMode; +#endif + case ZSTD_p_jobSize : #ifndef ZSTD_MULTITHREAD return ERROR(parameter_unsupported); #else - if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported); return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_jobSize, value); #endif @@ -419,7 +425,6 @@ size_t ZSTD_CCtxParam_setParameter( #ifndef ZSTD_MULTITHREAD return ERROR(parameter_unsupported); #else - if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported); return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_overlapSectionLog, value); #endif @@ -3007,12 +3012,17 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1, 0 /*dictSize*/); #ifdef ZSTD_MULTITHREAD - if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) + if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) { params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */ - if (params.nbThreads > 1) { + params.nonBlockingMode = 0; + } + if ((params.nbThreads > 1) | (params.nonBlockingMode == 1)) { if (cctx->mtctx == NULL || (params.nbThreads != ZSTDMT_getNbThreads(cctx->mtctx))) { - DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u (previous: %u)", - params.nbThreads, (unsigned)ZSTDMT_getNbThreads(cctx->mtctx)); + DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u", + params.nbThreads); + if (cctx->mtctx != NULL) + DEBUGLOG(4, "ZSTD_compress_generic: previous nbThreads was %u", + ZSTDMT_getNbThreads(cctx->mtctx)); ZSTDMT_freeCCtx(cctx->mtctx); cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbThreads, cctx->customMem); if (cctx->mtctx == NULL) return ERROR(memory_allocation); @@ -3024,6 +3034,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) ); cctx->streamStage = zcss_load; cctx->appliedParams.nbThreads = params.nbThreads; + cctx->appliedParams.nonBlockingMode = params.nonBlockingMode; } else #endif { CHECK_F( ZSTD_resetCStream_internal( @@ -3036,7 +3047,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, /* compression stage */ #ifdef ZSTD_MULTITHREAD - if (cctx->appliedParams.nbThreads > 1) { + if ((cctx->appliedParams.nbThreads > 1) | (cctx->appliedParams.nonBlockingMode==1)) { size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); if ( ZSTD_isError(flushMin) || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ diff --git a/lib/compress/zstd_compress_internal.h b/lib/compress/zstd_compress_internal.h index 7be20d4c..eb18cbc5 100644 --- a/lib/compress/zstd_compress_internal.h +++ b/lib/compress/zstd_compress_internal.h @@ -150,6 +150,7 @@ struct ZSTD_CCtx_params_s { /* Multithreading: used to pass parameters to mtctx */ U32 nbThreads; + int nonBlockingMode; /* will trigger ZSTDMT even with nbThreads==1 */ unsigned jobSize; unsigned overlapSizeLog; diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 0dadda28..0c28eb78 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -497,7 +497,7 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread /* ZSTDMT_getNbThreads(): * @return nb threads currently active in mtctx. * mtctx must be valid */ -size_t ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx) +unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx) { assert(mtctx != NULL); return mtctx->params.nbThreads; diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index b6e68684..7716ea68 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -120,7 +120,7 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread /* ZSTDMT_getNbThreads(): * @return nb threads currently active in mtctx. * mtctx must be valid */ -size_t ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx); +unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx); /*! ZSTDMT_initCStream_internal() : * Private use only. Init streaming operation. diff --git a/lib/zstd.h b/lib/zstd.h index a84490d9..6ac132a6 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -972,10 +972,20 @@ typedef enum { ZSTD_p_dictIDFlag, /* When applicable, dictionary's ID is written into frame header (default:1) */ /* multi-threading parameters */ + /* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD). + * They return an error otherwise. */ ZSTD_p_nbThreads=400, /* Select how many threads a compression job can spawn (default:1) * More threads improve speed, but also increase memory usage. * Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled. * Special: value 0 means "do not change nbThreads" */ + ZSTD_p_nonBlockingMode, /* Single thread mode is by default "blocking" : + * it finishes its job as much as possible, and only then gives back control to caller. + * In contrast, multi-thread is by default "non-blocking" : + * it takes some input, flush some output if available, and immediately gives back control to caller. + * Compression work is performed in parallel, within worker threads. + * (note : a strong exception to this rule is when first job is called with ZSTD_e_end : it becomes blocking) + * Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected. + * It allows the caller to do other tasks while the worker thread compresses in parallel. */ ZSTD_p_jobSize, /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode. * Each compression job is completed in parallel, so indirectly controls the nb of active threads. * 0 means default, which is dynamically determined based on compression parameters. diff --git a/programs/fileio.c b/programs/fileio.c index 3ae2d405..ea84853c 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -457,6 +457,7 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, /* multi-threading */ DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbThreads); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbThreads, g_nbThreads) ); + CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nonBlockingMode, 1) ); /* dictionary */ CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* just for dictionary loading, for compression parameters adaptation */ CHECK( ZSTD_CCtx_loadDictionary(ress.cctx, dictBuffer, dictBuffSize) );