slow down faster when output speed is limited
This commit is contained in:
parent
3d7b533f68
commit
9d26cb6a75
@ -737,8 +737,13 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
|
|||||||
FILE* const dstFile = ress.dstFile;
|
FILE* const dstFile = ress.dstFile;
|
||||||
U64 compressedfilesize = 0;
|
U64 compressedfilesize = 0;
|
||||||
ZSTD_EndDirective directive = ZSTD_e_continue;
|
ZSTD_EndDirective directive = ZSTD_e_continue;
|
||||||
|
|
||||||
|
typedef enum { noChange, slower, faster } speedChange_e;
|
||||||
|
speedChange_e speedChange = noChange;
|
||||||
unsigned inputBlocked = 0;
|
unsigned inputBlocked = 0;
|
||||||
unsigned lastJobID = 0;
|
unsigned lastJobID = 0;
|
||||||
|
unsigned long long lastProduced = 0;
|
||||||
|
unsigned long long lastFlushedSize = 0;
|
||||||
|
|
||||||
DISPLAYLEVEL(6, "compression using zstd format \n");
|
DISPLAYLEVEL(6, "compression using zstd format \n");
|
||||||
|
|
||||||
@ -763,6 +768,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
|
|||||||
stillToFlush = 1;
|
stillToFlush = 1;
|
||||||
while ((inBuff.pos != inBuff.size) /* input buffer must be entirely ingested */
|
while ((inBuff.pos != inBuff.size) /* input buffer must be entirely ingested */
|
||||||
|| (directive == ZSTD_e_end && stillToFlush != 0) ) {
|
|| (directive == ZSTD_e_end && stillToFlush != 0) ) {
|
||||||
|
|
||||||
size_t const oldIPos = inBuff.pos;
|
size_t const oldIPos = inBuff.pos;
|
||||||
ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
|
ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
|
||||||
CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive));
|
CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive));
|
||||||
@ -779,21 +785,53 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
|
|||||||
EXM_THROW(25, "Write error : cannot write compressed block");
|
EXM_THROW(25, "Write error : cannot write compressed block");
|
||||||
compressedfilesize += outBuff.pos;
|
compressedfilesize += outBuff.pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* display notification; and adapt compression level */
|
||||||
if (READY_FOR_UPDATE()) {
|
if (READY_FOR_UPDATE()) {
|
||||||
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
|
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
|
||||||
double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
|
double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
|
||||||
|
|
||||||
|
/* check output speed */
|
||||||
|
if (zfp.currentJobID > 0) {
|
||||||
|
unsigned long long newlyProduced = zfp.produced - lastProduced;
|
||||||
|
unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize;
|
||||||
|
assert(zfp.produced >= lastProduced);
|
||||||
|
if (newlyProduced == 0) {
|
||||||
|
DISPLAYLEVEL(6, "no more data compression generation => buffers are full, compression waiting => output (or input) too slow \n")
|
||||||
|
speedChange = slower;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( (newlyProduced > (newlyFlushed * 9 / 8))
|
||||||
|
&& (stillToFlush > ZSTD_BLOCKSIZE_MAX) ) {
|
||||||
|
DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) \n", newlyProduced, newlyFlushed);
|
||||||
|
speedChange = slower;
|
||||||
|
}
|
||||||
|
lastProduced = zfp.produced;
|
||||||
|
lastFlushedSize = compressedfilesize;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* course correct only if there is at least one job completed */
|
||||||
|
if (zfp.currentJobID > lastJobID) {
|
||||||
|
DISPLAYLEVEL(6, "compression level adaptation check \n")
|
||||||
|
|
||||||
/* check input speed */
|
/* check input speed */
|
||||||
if (zfp.currentJobID >= lastJobID+2) {
|
if (zfp.currentJobID > g_nbWorkers+1) { /* warm up period, to fill all workers */
|
||||||
if (inputBlocked <= 1) { /* small tolerance */
|
if (inputBlocked <= 1) { /* small tolerance */
|
||||||
DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
|
DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
|
||||||
compressionLevel++;
|
speedChange = slower;
|
||||||
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
|
|
||||||
}
|
}
|
||||||
lastJobID = zfp.currentJobID;
|
|
||||||
inputBlocked = 0;
|
inputBlocked = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (speedChange == slower) {
|
||||||
|
DISPLAYLEVEL(6, "slower speed , higher compression \n")
|
||||||
|
compressionLevel ++;
|
||||||
|
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
|
||||||
|
speedChange = noChange;
|
||||||
|
}
|
||||||
|
lastJobID = zfp.currentJobID;
|
||||||
|
} /* if (zfp.currentJobID > lastJobID) */
|
||||||
|
|
||||||
if (g_displayLevel >= 3) {
|
if (g_displayLevel >= 3) {
|
||||||
DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%% ",
|
DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%% ",
|
||||||
compressionLevel,
|
compressionLevel,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user