commit
48bed91606
@ -273,6 +273,7 @@ struct ZSTDMT_CCtx_s {
|
||||
pthread_mutex_t jobCompleted_mutex;
|
||||
pthread_cond_t jobCompleted_cond;
|
||||
size_t targetSectionSize;
|
||||
size_t marginSize;
|
||||
size_t inBuffSize;
|
||||
size_t dictSize;
|
||||
size_t targetDictSize;
|
||||
@ -285,7 +286,7 @@ struct ZSTDMT_CCtx_s {
|
||||
unsigned nextJobID;
|
||||
unsigned frameEnded;
|
||||
unsigned allJobsCompleted;
|
||||
unsigned overlapWrLog;
|
||||
unsigned overlapRLog;
|
||||
unsigned long long frameContentSize;
|
||||
size_t sectionSize;
|
||||
ZSTD_CDict* cdict;
|
||||
@ -308,7 +309,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
|
||||
cctx->jobIDMask = nbJobs - 1;
|
||||
cctx->allJobsCompleted = 1;
|
||||
cctx->sectionSize = 0;
|
||||
cctx->overlapWrLog = 3;
|
||||
cctx->overlapRLog = 3;
|
||||
cctx->factory = POOL_create(nbThreads, 1);
|
||||
cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
|
||||
cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
|
||||
@ -368,8 +369,9 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter,
|
||||
case ZSTDMT_p_sectionSize :
|
||||
mtctx->sectionSize = value;
|
||||
return 0;
|
||||
case ZSTDMT_p_overlapSectionRLog :
|
||||
mtctx->overlapWrLog = value;
|
||||
case ZSTDMT_p_overlapSectionLog :
|
||||
DEBUGLOG(4, "ZSTDMT_p_overlapSectionLog : %u", value);
|
||||
mtctx->overlapRLog = (value >= 9) ? 0 : 9 - value;
|
||||
return 0;
|
||||
default :
|
||||
return ERROR(compressionParameter_unsupported);
|
||||
@ -512,10 +514,15 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
|
||||
if (zcs->cdict == NULL) return ERROR(memory_allocation);
|
||||
} }
|
||||
zcs->frameContentSize = pledgedSrcSize;
|
||||
zcs->targetDictSize = (zcs->overlapRLog>=9) ? 0 : (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapRLog);
|
||||
DEBUGLOG(4, "overlapRLog : %u ", zcs->overlapRLog);
|
||||
DEBUGLOG(3, "overlap Size : %u KB", (U32)(zcs->targetDictSize>>10));
|
||||
zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2);
|
||||
zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize);
|
||||
zcs->targetDictSize = zcs->overlapWrLog < 10 ? (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapWrLog) : 0;
|
||||
zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog) /* margin */ + zcs->targetDictSize;
|
||||
zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize);
|
||||
DEBUGLOG(3, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10));
|
||||
zcs->marginSize = zcs->targetSectionSize >> 2;
|
||||
zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize + zcs->marginSize;
|
||||
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
|
||||
if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
|
||||
zcs->inBuff.filled = 0;
|
||||
@ -680,6 +687,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
|
||||
|
||||
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
|
||||
{
|
||||
size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize;
|
||||
if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */
|
||||
if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input);
|
||||
|
||||
@ -690,7 +698,7 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
|
||||
zcs->inBuff.filled += toLoad;
|
||||
}
|
||||
|
||||
if ( (zcs->inBuff.filled == zcs->inBuffSize) /* filled enough : let's compress */
|
||||
if ( (zcs->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */
|
||||
&& (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */
|
||||
CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0) );
|
||||
}
|
||||
|
@ -60,7 +60,7 @@ ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, const void* d
|
||||
* List of parameters that can be set using ZSTDMT_setMTCtxParameter() */
|
||||
typedef enum {
|
||||
ZSTDMT_p_sectionSize, /* size of input "section". Each section is compressed in parallel. 0 means default, which is dynamically determined within compression functions */
|
||||
ZSTDMT_p_overlapSectionRLog /* reverse log of overlapped section; 0 == use a complete window, 3(default) == use 1/8th of window, values >=10 means no overlap */
|
||||
ZSTDMT_p_overlapSectionLog /* Log of overlapped section; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window */
|
||||
} ZSDTMT_parameter;
|
||||
|
||||
/* ZSTDMT_setMTCtxParameter() :
|
||||
|
@ -78,14 +78,14 @@
|
||||
* Macros
|
||||
***************************************/
|
||||
#define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
|
||||
#define DISPLAYLEVEL(l, ...) if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); }
|
||||
#define DISPLAYLEVEL(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
|
||||
static U32 g_displayLevel = 2; /* 0 : no display; 1: errors; 2 : + result + interaction + warnings; 3 : + progression; 4 : + information */
|
||||
void FIO_setNotificationLevel(unsigned level) { g_displayLevel=level; }
|
||||
|
||||
#define DISPLAYUPDATE(l, ...) if (g_displayLevel>=l) { \
|
||||
#define DISPLAYUPDATE(l, ...) { if (g_displayLevel>=l) { \
|
||||
if ((clock() - g_time > refreshRate) || (g_displayLevel>=4)) \
|
||||
{ g_time = clock(); DISPLAY(__VA_ARGS__); \
|
||||
if (g_displayLevel>=4) fflush(stdout); } }
|
||||
if (g_displayLevel>=4) fflush(stdout); } } }
|
||||
static const clock_t refreshRate = CLOCKS_PER_SEC * 15 / 100;
|
||||
static clock_t g_time = 0;
|
||||
|
||||
@ -124,6 +124,13 @@ void FIO_setBlockSize(unsigned blockSize) {
|
||||
#endif
|
||||
g_blockSize = blockSize;
|
||||
}
|
||||
#define FIO_OVERLAP_LOG_NOTSET 9999
|
||||
static U32 g_overlapLog = FIO_OVERLAP_LOG_NOTSET;
|
||||
void FIO_setOverlapLog(unsigned overlapLog){
|
||||
if (overlapLog && g_nbThreads==1)
|
||||
DISPLAYLEVEL(2, "Setting overlapLog is useless in single-thread mode \n");
|
||||
g_overlapLog = overlapLog;
|
||||
}
|
||||
|
||||
|
||||
/*-*************************************
|
||||
@ -272,8 +279,10 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
|
||||
#ifdef ZSTD_MULTITHREAD
|
||||
ress.cctx = ZSTDMT_createCCtx(g_nbThreads);
|
||||
if (ress.cctx == NULL) EXM_THROW(30, "zstd: allocation error : can't create ZSTD_CStream");
|
||||
if (cLevel==ZSTD_maxCLevel())
|
||||
ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_overlapSectionRLog, 0); /* use complete window for overlap */
|
||||
if ((cLevel==ZSTD_maxCLevel()) && (g_overlapLog==FIO_OVERLAP_LOG_NOTSET))
|
||||
ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_overlapSectionLog, 9); /* use complete window for overlap */
|
||||
if (g_overlapLog != FIO_OVERLAP_LOG_NOTSET)
|
||||
ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_overlapSectionLog, g_overlapLog);
|
||||
#else
|
||||
ress.cctx = ZSTD_createCStream();
|
||||
if (ress.cctx == NULL) EXM_THROW(30, "zstd: allocation error : can't create ZSTD_CStream");
|
||||
@ -355,7 +364,6 @@ static int FIO_compressFilename_internal(cRess_t ress,
|
||||
size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile);
|
||||
if (inSize==0) break;
|
||||
readsize += inSize;
|
||||
DISPLAYUPDATE(2, "\rRead : %u MB ", (U32)(readsize>>20));
|
||||
|
||||
{ ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
|
||||
while (inBuff.pos != inBuff.size) { /* note : is there any possibility of endless loop ? for example, if outBuff is not large enough ? */
|
||||
@ -373,7 +381,13 @@ static int FIO_compressFilename_internal(cRess_t ress,
|
||||
if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName);
|
||||
compressedfilesize += outBuff.pos;
|
||||
} } }
|
||||
DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ", (U32)(readsize>>20), (double)compressedfilesize/readsize*100);
|
||||
#ifdef ZSTD_MULTITHREAD
|
||||
if (!fileSize) DISPLAYUPDATE(2, "\rRead : %u MB", (U32)(readsize>>20))
|
||||
else DISPLAYUPDATE(2, "\rRead : %u / %u MB", (U32)(readsize>>20), (U32)(fileSize>>20));
|
||||
#else
|
||||
if (!fileSize) DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%%", (U32)(readsize>>20), (double)compressedfilesize/readsize*100)
|
||||
else DISPLAYUPDATE(2, "\rRead : %u / %u MB ==> %.2f%%", (U32)(readsize>>20), (U32)(fileSize>>20), (double)compressedfilesize/readsize*100);
|
||||
#endif
|
||||
}
|
||||
|
||||
/* End of Frame */
|
||||
|
@ -43,6 +43,7 @@ void FIO_setRemoveSrcFile(unsigned flag);
|
||||
void FIO_setMemLimit(unsigned memLimit);
|
||||
void FIO_setNbThreads(unsigned nbThreads);
|
||||
void FIO_setBlockSize(unsigned blockSize);
|
||||
void FIO_setOverlapLog(unsigned overlapLog);
|
||||
|
||||
|
||||
/*-*************************************
|
||||
|
@ -63,6 +63,8 @@ static const char* g_defaultDictName = "dictionary";
|
||||
static const unsigned g_defaultMaxDictSize = 110 KB;
|
||||
static const int g_defaultDictCLevel = 3;
|
||||
static const unsigned g_defaultSelectivityLevel = 9;
|
||||
#define OVERLAP_LOG_DEFAULT 9999
|
||||
static U32 g_overlapLog = OVERLAP_LOG_DEFAULT;
|
||||
|
||||
|
||||
/*-************************************
|
||||
@ -186,7 +188,7 @@ static unsigned readU32FromChar(const char** stringPtr)
|
||||
}
|
||||
|
||||
/** longCommandWArg() :
|
||||
* check is *stringPtr is the same as longCommand.
|
||||
* check if *stringPtr is the same as longCommand.
|
||||
* If yes, @return 1 and advances *stringPtr to the position which immediately follows longCommand.
|
||||
* @return 0 and doesn't modify *stringPtr otherwise.
|
||||
*/
|
||||
@ -220,6 +222,8 @@ static unsigned parseCoverParameters(const char* stringPtr, COVER_params_t *para
|
||||
return 1;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
/** parseCompressionParameters() :
|
||||
* reads compression parameters from *stringPtr (e.g. "--zstd=wlog=23,clog=23,hlog=22,slog=6,slen=3,tlen=48,strat=6") into *params
|
||||
* @return 1 means that compression parameters were correct
|
||||
@ -235,6 +239,7 @@ static unsigned parseCompressionParameters(const char* stringPtr, ZSTD_compressi
|
||||
if (longCommandWArg(&stringPtr, "searchLength=") || longCommandWArg(&stringPtr, "slen=")) { params->searchLength = readU32FromChar(&stringPtr); if (stringPtr[0]==',') { stringPtr++; continue; } else break; }
|
||||
if (longCommandWArg(&stringPtr, "targetLength=") || longCommandWArg(&stringPtr, "tlen=")) { params->targetLength = readU32FromChar(&stringPtr); if (stringPtr[0]==',') { stringPtr++; continue; } else break; }
|
||||
if (longCommandWArg(&stringPtr, "strategy=") || longCommandWArg(&stringPtr, "strat=")) { params->strategy = (ZSTD_strategy)(1 + readU32FromChar(&stringPtr)); if (stringPtr[0]==',') { stringPtr++; continue; } else break; }
|
||||
if (longCommandWArg(&stringPtr, "overlapLog=") || longCommandWArg(&stringPtr, "ovlog=")) { g_overlapLog = readU32FromChar(&stringPtr); if (stringPtr[0]==',') { stringPtr++; continue; } else break; }
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -370,6 +375,7 @@ int main(int argCount, const char* argv[])
|
||||
if (longCommandWArg(&argument, "--memlimit=")) { memLimit = readU32FromChar(&argument); continue; }
|
||||
if (longCommandWArg(&argument, "--memory=")) { memLimit = readU32FromChar(&argument); continue; }
|
||||
if (longCommandWArg(&argument, "--memlimit-decompress=")) { memLimit = readU32FromChar(&argument); continue; }
|
||||
if (longCommandWArg(&argument, "--block-size=")) { blockSize = readU32FromChar(&argument); continue; }
|
||||
if (longCommandWArg(&argument, "--zstd=")) { if (!parseCompressionParameters(argument, &compressionParams)) CLEAN_RETURN(badusage(programName)); continue; }
|
||||
/* fall-through, will trigger bad_usage() later on */
|
||||
}
|
||||
@ -629,6 +635,7 @@ int main(int argCount, const char* argv[])
|
||||
#ifndef ZSTD_NOCOMPRESS
|
||||
FIO_setNbThreads(nbThreads);
|
||||
FIO_setBlockSize((U32)blockSize);
|
||||
if (g_overlapLog!=OVERLAP_LOG_DEFAULT) FIO_setOverlapLog(g_overlapLog);
|
||||
if ((filenameIdx==1) && outFileName)
|
||||
operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams);
|
||||
else
|
||||
|
@ -837,9 +837,11 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
|
||||
DISPLAYLEVEL(5, "Init with windowLog = %u \n", params.cParams.windowLog);
|
||||
params.fParams.checksumFlag = FUZ_rand(&lseed) & 1;
|
||||
params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1;
|
||||
{ size_t const initError = ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize);
|
||||
CHECK (ZSTD_isError(initError),"ZSTDMT_initCStream_advanced error : %s", ZSTD_getErrorName(initError));
|
||||
} } }
|
||||
{ size_t const initError = ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize);
|
||||
CHECK (ZSTD_isError(initError),"ZSTDMT_initCStream_advanced error : %s", ZSTD_getErrorName(initError)); }
|
||||
ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12);
|
||||
ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_sectionSize, FUZ_rand(&lseed) % (2*maxTestSize+1));
|
||||
} }
|
||||
|
||||
/* multi-segments compression test */
|
||||
XXH64_reset(&xxhState, 0);
|
||||
|
Loading…
x
Reference in New Issue
Block a user