introduced parameter ZSTD_p_nonBlockingMode

This new parameter makes it possible to run
streaming ZSTDMT with a single worker thread
in non-blocking mode.

This lets the main thread perform other tasks in parallel
while the worker thread does the compression.
Typically, for the zstd CLI, it means the main thread can perform I/O in the meantime.

Applied within fileio.c, this patch provides non-negligible gains during compression.

Tested on my laptop, with enwik9 (1000000000 bytes) : time zstd -f enwik9

With traditional single-thread blocking mode :
real    0m9.557s
user    0m8.861s
sys     0m0.538s

With new single-worker non blocking mode :
real    0m7.938s
user    0m8.049s
sys     0m0.514s

=> about 20% faster in wall-clock terms (9.557s / 7.938s ≈ 1.20, i.e. ~17% less elapsed time)
This commit is contained in:
Yann Collet 2018-01-16 16:15:47 -08:00
parent 6025465e42
commit 1dba98d563
6 changed files with 34 additions and 11 deletions

View File

@ -281,9 +281,8 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
} }
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_nonBlockingMode:
case ZSTD_p_jobSize: case ZSTD_p_jobSize:
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_overlapSizeLog: case ZSTD_p_overlapSizeLog:
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
@ -407,11 +406,18 @@ size_t ZSTD_CCtxParam_setParameter(
return ZSTDMT_CCtxParam_setNbThreads(CCtxParams, value); return ZSTDMT_CCtxParam_setNbThreads(CCtxParams, value);
#endif #endif
case ZSTD_p_nonBlockingMode :
#ifndef ZSTD_MULTITHREAD
return ERROR(parameter_unsupported);
#else
CCtxParams->nonBlockingMode = (value>0);
return CCtxParams->nonBlockingMode;
#endif
case ZSTD_p_jobSize : case ZSTD_p_jobSize :
#ifndef ZSTD_MULTITHREAD #ifndef ZSTD_MULTITHREAD
return ERROR(parameter_unsupported); return ERROR(parameter_unsupported);
#else #else
if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported);
return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_jobSize, value); return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_jobSize, value);
#endif #endif
@ -419,7 +425,6 @@ size_t ZSTD_CCtxParam_setParameter(
#ifndef ZSTD_MULTITHREAD #ifndef ZSTD_MULTITHREAD
return ERROR(parameter_unsupported); return ERROR(parameter_unsupported);
#else #else
if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported);
return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_overlapSectionLog, value); return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_overlapSectionLog, value);
#endif #endif
@ -3007,12 +3012,17 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1, 0 /*dictSize*/); cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1, 0 /*dictSize*/);
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) {
params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */ params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */
if (params.nbThreads > 1) { params.nonBlockingMode = 0;
}
if ((params.nbThreads > 1) | (params.nonBlockingMode == 1)) {
if (cctx->mtctx == NULL || (params.nbThreads != ZSTDMT_getNbThreads(cctx->mtctx))) { if (cctx->mtctx == NULL || (params.nbThreads != ZSTDMT_getNbThreads(cctx->mtctx))) {
DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u (previous: %u)", DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u",
params.nbThreads, (unsigned)ZSTDMT_getNbThreads(cctx->mtctx)); params.nbThreads);
if (cctx->mtctx != NULL)
DEBUGLOG(4, "ZSTD_compress_generic: previous nbThreads was %u",
ZSTDMT_getNbThreads(cctx->mtctx));
ZSTDMT_freeCCtx(cctx->mtctx); ZSTDMT_freeCCtx(cctx->mtctx);
cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbThreads, cctx->customMem); cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbThreads, cctx->customMem);
if (cctx->mtctx == NULL) return ERROR(memory_allocation); if (cctx->mtctx == NULL) return ERROR(memory_allocation);
@ -3024,6 +3034,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) ); cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) );
cctx->streamStage = zcss_load; cctx->streamStage = zcss_load;
cctx->appliedParams.nbThreads = params.nbThreads; cctx->appliedParams.nbThreads = params.nbThreads;
cctx->appliedParams.nonBlockingMode = params.nonBlockingMode;
} else } else
#endif #endif
{ CHECK_F( ZSTD_resetCStream_internal( { CHECK_F( ZSTD_resetCStream_internal(
@ -3036,7 +3047,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
/* compression stage */ /* compression stage */
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
if (cctx->appliedParams.nbThreads > 1) { if ((cctx->appliedParams.nbThreads > 1) | (cctx->appliedParams.nonBlockingMode==1)) {
size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
if ( ZSTD_isError(flushMin) if ( ZSTD_isError(flushMin)
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */

View File

@ -150,6 +150,7 @@ struct ZSTD_CCtx_params_s {
/* Multithreading: used to pass parameters to mtctx */ /* Multithreading: used to pass parameters to mtctx */
U32 nbThreads; U32 nbThreads;
int nonBlockingMode; /* will trigger ZSTDMT even with nbThreads==1 */
unsigned jobSize; unsigned jobSize;
unsigned overlapSizeLog; unsigned overlapSizeLog;

View File

@ -497,7 +497,7 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread
/* ZSTDMT_getNbThreads(): /* ZSTDMT_getNbThreads():
* @return nb threads currently active in mtctx. * @return nb threads currently active in mtctx.
* mtctx must be valid */ * mtctx must be valid */
size_t ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx) unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx)
{ {
assert(mtctx != NULL); assert(mtctx != NULL);
return mtctx->params.nbThreads; return mtctx->params.nbThreads;

View File

@ -120,7 +120,7 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread
/* ZSTDMT_getNbThreads(): /* ZSTDMT_getNbThreads():
* @return nb threads currently active in mtctx. * @return nb threads currently active in mtctx.
* mtctx must be valid */ * mtctx must be valid */
size_t ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx); unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx);
/*! ZSTDMT_initCStream_internal() : /*! ZSTDMT_initCStream_internal() :
* Private use only. Init streaming operation. * Private use only. Init streaming operation.

View File

@ -972,10 +972,20 @@ typedef enum {
ZSTD_p_dictIDFlag, /* When applicable, dictionary's ID is written into frame header (default:1) */ ZSTD_p_dictIDFlag, /* When applicable, dictionary's ID is written into frame header (default:1) */
/* multi-threading parameters */ /* multi-threading parameters */
/* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD).
* They return an error otherwise. */
ZSTD_p_nbThreads=400, /* Select how many threads a compression job can spawn (default:1) ZSTD_p_nbThreads=400, /* Select how many threads a compression job can spawn (default:1)
* More threads improve speed, but also increase memory usage. * More threads improve speed, but also increase memory usage.
* Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled. * Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled.
* Special: value 0 means "do not change nbThreads" */ * Special: value 0 means "do not change nbThreads" */
ZSTD_p_nonBlockingMode, /* Single thread mode is by default "blocking" :
* it finishes its job as much as possible, and only then gives back control to caller.
* In contrast, multi-thread is by default "non-blocking" :
* it takes some input, flushes some output if available, and immediately gives back control to caller.
* Compression work is performed in parallel, within worker threads.
* (note : a strong exception to this rule is when first job is called with ZSTD_e_end : it becomes blocking)
* Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected.
* It allows the caller to do other tasks while the worker thread compresses in parallel. */
ZSTD_p_jobSize, /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode. ZSTD_p_jobSize, /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode.
* Each compression job is completed in parallel, so indirectly controls the nb of active threads. * Each compression job is completed in parallel, so indirectly controls the nb of active threads.
* 0 means default, which is dynamically determined based on compression parameters. * 0 means default, which is dynamically determined based on compression parameters.

View File

@ -457,6 +457,7 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
/* multi-threading */ /* multi-threading */
DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbThreads); DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbThreads);
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbThreads, g_nbThreads) ); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbThreads, g_nbThreads) );
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nonBlockingMode, 1) );
/* dictionary */ /* dictionary */
CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* just for dictionary loading, for compression parameters adaptation */ CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* just for dictionary loading, for compression parameters adaptation */
CHECK( ZSTD_CCtx_loadDictionary(ress.cctx, dictBuffer, dictBuffSize) ); CHECK( ZSTD_CCtx_loadDictionary(ress.cctx, dictBuffer, dictBuffSize) );