zstd cli can increase level when input is too slow

dev
Yann Collet 2018-08-09 15:51:30 -07:00
parent 79a35ac20d
commit 2dd76037be
4 changed files with 27 additions and 5 deletions

View File

@ -900,6 +900,7 @@ ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx)
fp.ingested = cctx->consumedSrcSize + buffered; fp.ingested = cctx->consumedSrcSize + buffered;
fp.consumed = cctx->consumedSrcSize; fp.consumed = cctx->consumedSrcSize;
fp.produced = cctx->producedCSize; fp.produced = cctx->producedCSize;
fp.currentJobID = 0;
return fp; return fp;
} } } }

View File

@ -1083,6 +1083,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
fps.ingested = mtctx->consumed + mtctx->inBuff.filled; fps.ingested = mtctx->consumed + mtctx->inBuff.filled;
fps.consumed = mtctx->consumed; fps.consumed = mtctx->consumed;
fps.produced = mtctx->produced; fps.produced = mtctx->produced;
fps.currentJobID = mtctx->nextJobID;
{ unsigned jobNb; { unsigned jobNb;
unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1); unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);
DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)", DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",

View File

@ -735,6 +735,7 @@ typedef struct {
unsigned long long ingested; unsigned long long ingested;
unsigned long long consumed; unsigned long long consumed;
unsigned long long produced; unsigned long long produced;
unsigned currentJobID;
} ZSTD_frameProgression; } ZSTD_frameProgression;
/* ZSTD_getFrameProgression(): /* ZSTD_getFrameProgression():

View File

@ -737,6 +737,9 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
FILE* const dstFile = ress.dstFile; FILE* const dstFile = ress.dstFile;
U64 compressedfilesize = 0; U64 compressedfilesize = 0;
ZSTD_EndDirective directive = ZSTD_e_continue; ZSTD_EndDirective directive = ZSTD_e_continue;
unsigned inputBlocked = 0;
unsigned lastJobID = 0;
DISPLAYLEVEL(6, "compression using zstd format \n"); DISPLAYLEVEL(6, "compression using zstd format \n");
/* init */ /* init */
@ -747,7 +750,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
/* Main compression loop */ /* Main compression loop */
do { do {
size_t result; size_t stillToFlush;
/* Fill input Buffer */ /* Fill input Buffer */
size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile); size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile);
ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 }; ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
@ -757,14 +760,18 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
if ((inSize == 0) || (*readsize == fileSize)) if ((inSize == 0) || (*readsize == fileSize))
directive = ZSTD_e_end; directive = ZSTD_e_end;
result = 1; stillToFlush = 1;
while ((inBuff.pos != inBuff.size) /* input buffer must be entirely ingested */ while ((inBuff.pos != inBuff.size) /* input buffer must be entirely ingested */
|| (directive == ZSTD_e_end && result != 0) ) { || (directive == ZSTD_e_end && stillToFlush != 0) ) {
size_t const oldIPos = inBuff.pos;
ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 }; ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
CHECK_V(result, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive)); CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive));
/* count stats */
if (oldIPos == inBuff.pos) inputBlocked++;
/* Write compressed stream */ /* Write compressed stream */
DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => intput pos(%u)<=(%u)size ; output generated %u bytes \n", DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
(U32)directive, (U32)inBuff.pos, (U32)inBuff.size, (U32)outBuff.pos); (U32)directive, (U32)inBuff.pos, (U32)inBuff.size, (U32)outBuff.pos);
if (outBuff.pos) { if (outBuff.pos) {
size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
@ -775,6 +782,18 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
if (READY_FOR_UPDATE()) { if (READY_FOR_UPDATE()) {
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100; double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
/* check input speed */
if (zfp.currentJobID >= lastJobID+2) {
if (inputBlocked<=1) { /* small tolerance */
DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
compressionLevel++;
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
}
lastJobID = zfp.currentJobID;
inputBlocked = 0;
}
if (g_displayLevel >= 3) { if (g_displayLevel >= 3) {
DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%%", DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%%",
compressionLevel, compressionLevel,