From 1dba98d563d81fb42b4ac575e70eaf52654bbf8e Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Tue, 16 Jan 2018 16:15:47 -0800 Subject: [PATCH] introduced parameter ZSTD_p_nonBlockingMode This new parameter makes it possible to run streaming ZSTDMT with a single worker thread in non-blocking mode. It lets the main thread do other tasks in parallel while the worker thread does compression. Typically, for the zstd CLI, it means the main thread can perform I/O in the meantime. Applied within fileio.c, this patch provides non-negligible gains during compression. Tested on my laptop, with enwik9 (1000000000 bytes) : time zstd -f enwik9 With traditional single-thread blocking mode : real 0m9.557s user 0m8.861s sys 0m0.538s With new single-worker non-blocking mode : real 0m7.938s user 0m8.049s sys 0m0.514s => 20% faster --- lib/compress/zstd_compress.c | 29 ++++++++++++++++++--------- lib/compress/zstd_compress_internal.h | 1 + lib/compress/zstdmt_compress.c | 2 +- lib/compress/zstdmt_compress.h | 2 +- lib/zstd.h | 10 +++++++++ programs/fileio.c | 1 + 6 files changed, 34 insertions(+), 11 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index d2703f23..fbae3c3f 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -281,9 +281,8 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v } return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); + case ZSTD_p_nonBlockingMode: case ZSTD_p_jobSize: - return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); - case ZSTD_p_overlapSizeLog: return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); @@ -407,11 +406,18 @@ size_t ZSTD_CCtxParam_setParameter( return ZSTDMT_CCtxParam_setNbThreads(CCtxParams, value); #endif + case ZSTD_p_nonBlockingMode : +#ifndef ZSTD_MULTITHREAD + return ERROR(parameter_unsupported); +#else + CCtxParams->nonBlockingMode = (value>0); + return CCtxParams->nonBlockingMode; +#endif + 
case ZSTD_p_jobSize : #ifndef ZSTD_MULTITHREAD return ERROR(parameter_unsupported); #else - if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported); return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_jobSize, value); #endif @@ -419,7 +425,6 @@ size_t ZSTD_CCtxParam_setParameter( #ifndef ZSTD_MULTITHREAD return ERROR(parameter_unsupported); #else - if (CCtxParams->nbThreads <= 1) return ERROR(parameter_unsupported); return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_overlapSectionLog, value); #endif @@ -3007,12 +3012,17 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1, 0 /*dictSize*/); #ifdef ZSTD_MULTITHREAD - if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) + if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) { params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */ - if (params.nbThreads > 1) { + params.nonBlockingMode = 0; + } + if ((params.nbThreads > 1) | (params.nonBlockingMode == 1)) { if (cctx->mtctx == NULL || (params.nbThreads != ZSTDMT_getNbThreads(cctx->mtctx))) { - DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u (previous: %u)", - params.nbThreads, (unsigned)ZSTDMT_getNbThreads(cctx->mtctx)); + DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u", + params.nbThreads); + if (cctx->mtctx != NULL) + DEBUGLOG(4, "ZSTD_compress_generic: previous nbThreads was %u", + ZSTDMT_getNbThreads(cctx->mtctx)); ZSTDMT_freeCCtx(cctx->mtctx); cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbThreads, cctx->customMem); if (cctx->mtctx == NULL) return ERROR(memory_allocation); @@ -3024,6 +3034,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) ); cctx->streamStage = zcss_load; cctx->appliedParams.nbThreads = params.nbThreads; + cctx->appliedParams.nonBlockingMode = params.nonBlockingMode; } else #endif { CHECK_F( ZSTD_resetCStream_internal( 
@@ -3036,7 +3047,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, /* compression stage */ #ifdef ZSTD_MULTITHREAD - if (cctx->appliedParams.nbThreads > 1) { + if ((cctx->appliedParams.nbThreads > 1) | (cctx->appliedParams.nonBlockingMode==1)) { size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); if ( ZSTD_isError(flushMin) || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ diff --git a/lib/compress/zstd_compress_internal.h b/lib/compress/zstd_compress_internal.h index 7be20d4c..eb18cbc5 100644 --- a/lib/compress/zstd_compress_internal.h +++ b/lib/compress/zstd_compress_internal.h @@ -150,6 +150,7 @@ struct ZSTD_CCtx_params_s { /* Multithreading: used to pass parameters to mtctx */ U32 nbThreads; + int nonBlockingMode; /* will trigger ZSTDMT even with nbThreads==1 */ unsigned jobSize; unsigned overlapSizeLog; diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 0dadda28..0c28eb78 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -497,7 +497,7 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread /* ZSTDMT_getNbThreads(): * @return nb threads currently active in mtctx. * mtctx must be valid */ -size_t ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx) +unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx) { assert(mtctx != NULL); return mtctx->params.nbThreads; diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index b6e68684..7716ea68 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -120,7 +120,7 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread /* ZSTDMT_getNbThreads(): * @return nb threads currently active in mtctx. * mtctx must be valid */ -size_t ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx); +unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx); /*! ZSTDMT_initCStream_internal() : * Private use only. 
Init streaming operation. diff --git a/lib/zstd.h b/lib/zstd.h index a84490d9..6ac132a6 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -972,10 +972,20 @@ typedef enum { ZSTD_p_dictIDFlag, /* When applicable, dictionary's ID is written into frame header (default:1) */ /* multi-threading parameters */ + /* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD). + * They return an error otherwise. */ ZSTD_p_nbThreads=400, /* Select how many threads a compression job can spawn (default:1) * More threads improve speed, but also increase memory usage. * Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled. * Special: value 0 means "do not change nbThreads" */ + ZSTD_p_nonBlockingMode, /* Single thread mode is by default "blocking" : + * it finishes its job as much as possible, and only then gives back control to caller. + * In contrast, multi-thread is by default "non-blocking" : + * it takes some input, flushes some output if available, and immediately gives back control to caller. + * Compression work is performed in parallel, within worker threads. + * (note : a strong exception to this rule is when the first job is called with ZSTD_e_end : it becomes blocking) + * Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected. + * It allows the caller to do other tasks while the worker thread compresses in parallel. */ ZSTD_p_jobSize, /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode. * Each compression job is completed in parallel, so indirectly controls the nb of active threads. * 0 means default, which is dynamically determined based on compression parameters. 
diff --git a/programs/fileio.c b/programs/fileio.c index 3ae2d405..ea84853c 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -457,6 +457,7 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, /* multi-threading */ DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbThreads); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbThreads, g_nbThreads) ); + CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nonBlockingMode, 1) ); /* dictionary */ CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* just for dictionary loading, for compression parameters adaptation */ CHECK( ZSTD_CCtx_loadDictionary(ress.cctx, dictBuffer, dictBuffSize) );