Merge pull request #1726 from nmagerko/stream-size

Add --stream-size=# option
This commit is contained in:
Nick Terrell 2019-08-22 11:31:15 -07:00 committed by GitHub
commit a505463710
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 39 additions and 14 deletions

View File

@ -304,6 +304,7 @@ struct FIO_prefs_s {
int ldmMinMatch; int ldmMinMatch;
int ldmBucketSizeLog; int ldmBucketSizeLog;
int ldmHashRateLog; int ldmHashRateLog;
size_t streamSrcSize;
size_t targetCBlockSize; size_t targetCBlockSize;
ZSTD_literalCompressionMode_e literalCompressionMode; ZSTD_literalCompressionMode_e literalCompressionMode;
@ -349,6 +350,7 @@ FIO_prefs_t* FIO_createPreferences(void)
ret->ldmMinMatch = 0; ret->ldmMinMatch = 0;
ret->ldmBucketSizeLog = FIO_LDM_PARAM_NOTSET; ret->ldmBucketSizeLog = FIO_LDM_PARAM_NOTSET;
ret->ldmHashRateLog = FIO_LDM_PARAM_NOTSET; ret->ldmHashRateLog = FIO_LDM_PARAM_NOTSET;
ret->streamSrcSize = 0;
ret->targetCBlockSize = 0; ret->targetCBlockSize = 0;
ret->literalCompressionMode = ZSTD_lcm_auto; ret->literalCompressionMode = ZSTD_lcm_auto;
return ret; return ret;
@ -418,6 +420,10 @@ void FIO_setRsyncable(FIO_prefs_t* const prefs, int rsyncable) {
prefs->rsyncable = rsyncable; prefs->rsyncable = rsyncable;
} }
void FIO_setStreamSrcSize(FIO_prefs_t* const prefs, size_t streamSrcSize) {
prefs->streamSrcSize = streamSrcSize;
}
void FIO_setTargetCBlockSize(FIO_prefs_t* const prefs, size_t targetCBlockSize) { void FIO_setTargetCBlockSize(FIO_prefs_t* const prefs, size_t targetCBlockSize) {
prefs->targetCBlockSize = targetCBlockSize; prefs->targetCBlockSize = targetCBlockSize;
} }
@ -633,7 +639,6 @@ typedef struct {
static cRess_t FIO_createCResources(FIO_prefs_t* const prefs, static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
const char* dictFileName, int cLevel, const char* dictFileName, int cLevel,
U64 srcSize,
ZSTD_compressionParameters comprParams) { ZSTD_compressionParameters comprParams) {
cRess_t ress; cRess_t ress;
memset(&ress, 0, sizeof(ress)); memset(&ress, 0, sizeof(ress));
@ -698,10 +703,7 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_rsyncable, prefs->rsyncable) ); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_rsyncable, prefs->rsyncable) );
#endif #endif
/* dictionary */ /* dictionary */
CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* set the value temporarily for dictionary loading, to adapt compression parameters */
CHECK( ZSTD_CCtx_loadDictionary(ress.cctx, dictBuffer, dictBuffSize) ); CHECK( ZSTD_CCtx_loadDictionary(ress.cctx, dictBuffer, dictBuffSize) );
CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, ZSTD_CONTENTSIZE_UNKNOWN) ); /* reset */
free(dictBuffer); free(dictBuffer);
} }
@ -1003,6 +1005,9 @@ FIO_compressZstdFrame(FIO_prefs_t* const prefs,
/* init */ /* init */
if (fileSize != UTIL_FILESIZE_UNKNOWN) { if (fileSize != UTIL_FILESIZE_UNKNOWN) {
CHECK(ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize)); CHECK(ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize));
} else if (prefs->streamSrcSize > 0) {
/* unknown source size; use the declared stream size */
CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, prefs->streamSrcSize) );
} }
(void)srcFileName; (void)srcFileName;
@ -1361,10 +1366,7 @@ int FIO_compressFilename(FIO_prefs_t* const prefs,
const char* dictFileName, int compressionLevel, const char* dictFileName, int compressionLevel,
ZSTD_compressionParameters comprParams) ZSTD_compressionParameters comprParams)
{ {
U64 const fileSize = UTIL_getFileSize(srcFileName); cRess_t const ress = FIO_createCResources(prefs, dictFileName, compressionLevel, comprParams);
U64 const srcSize = (fileSize == UTIL_FILESIZE_UNKNOWN) ? ZSTD_CONTENTSIZE_UNKNOWN : fileSize;
cRess_t const ress = FIO_createCResources(prefs, dictFileName, compressionLevel, srcSize, comprParams);
int const result = FIO_compressFilename_srcFile(prefs, ress, dstFileName, srcFileName, compressionLevel); int const result = FIO_compressFilename_srcFile(prefs, ress, dstFileName, srcFileName, compressionLevel);
@ -1415,10 +1417,7 @@ int FIO_compressMultipleFilenames(FIO_prefs_t* const prefs,
ZSTD_compressionParameters comprParams) ZSTD_compressionParameters comprParams)
{ {
int error = 0; int error = 0;
U64 const firstFileSize = UTIL_getFileSize(inFileNamesTable[0]); cRess_t ress = FIO_createCResources(prefs, dictFileName, compressionLevel, comprParams);
U64 const firstSrcSize = (firstFileSize == UTIL_FILESIZE_UNKNOWN) ? ZSTD_CONTENTSIZE_UNKNOWN : firstFileSize;
U64 const srcSize = (nbFiles != 1) ? ZSTD_CONTENTSIZE_UNKNOWN : firstSrcSize ;
cRess_t ress = FIO_createCResources(prefs, dictFileName, compressionLevel, srcSize, comprParams);
/* init */ /* init */
assert(outFileName != NULL || suffix != NULL); assert(outFileName != NULL || suffix != NULL);

View File

@ -71,6 +71,7 @@ void FIO_setOverlapLog(FIO_prefs_t* const prefs, int overlapLog);
void FIO_setRemoveSrcFile(FIO_prefs_t* const prefs, unsigned flag); void FIO_setRemoveSrcFile(FIO_prefs_t* const prefs, unsigned flag);
void FIO_setSparseWrite(FIO_prefs_t* const prefs, unsigned sparse); /**< 0: no sparse; 1: disable on stdout; 2: always enabled */ void FIO_setSparseWrite(FIO_prefs_t* const prefs, unsigned sparse); /**< 0: no sparse; 1: disable on stdout; 2: always enabled */
void FIO_setRsyncable(FIO_prefs_t* const prefs, int rsyncable); void FIO_setRsyncable(FIO_prefs_t* const prefs, int rsyncable);
void FIO_setStreamSrcSize(FIO_prefs_t* const prefs, size_t streamSrcSize);
void FIO_setTargetCBlockSize(FIO_prefs_t* const prefs, size_t targetCBlockSize); void FIO_setTargetCBlockSize(FIO_prefs_t* const prefs, size_t targetCBlockSize);
void FIO_setLiteralCompressionMode( void FIO_setLiteralCompressionMode(
FIO_prefs_t* const prefs, FIO_prefs_t* const prefs,

View File

@ -144,6 +144,11 @@ the last one takes effect.
Due to the chaotic nature of dynamic adaptation, compressed result is not reproducible. Due to the chaotic nature of dynamic adaptation, compressed result is not reproducible.
_note_ : at the time of this writing, `--adapt` can remain stuck at low speed _note_ : at the time of this writing, `--adapt` can remain stuck at low speed
when combined with multiple worker threads (>=2). when combined with multiple worker threads (>=2).
* `--stream-size=#` :
Sets the pledged source size of input coming from a stream. This value must be exact, as it
will be included in the produced frame header. Incorrect stream sizes will cause an error.
This information will be used to better optimize compression parameters, resulting in
better and potentially faster compression, especially for smaller source sizes.
* `--rsyncable` : * `--rsyncable` :
`zstd` will periodically synchronize the compression state to make the `zstd` will periodically synchronize the compression state to make the
compressed file more rsync-friendly. There is a negligible impact to compressed file more rsync-friendly. There is a negligible impact to

View File

@ -141,6 +141,7 @@ static int usage_advanced(const char* programName)
DISPLAY( "--long[=#]: enable long distance matching with given window log (default: %u)\n", g_defaultMaxWindowLog); DISPLAY( "--long[=#]: enable long distance matching with given window log (default: %u)\n", g_defaultMaxWindowLog);
DISPLAY( "--fast[=#]: switch to ultra fast compression level (default: %u)\n", 1); DISPLAY( "--fast[=#]: switch to ultra fast compression level (default: %u)\n", 1);
DISPLAY( "--adapt : dynamically adapt compression level to I/O conditions \n"); DISPLAY( "--adapt : dynamically adapt compression level to I/O conditions \n");
DISPLAY( "--stream-size=# : optimize compression parameters for streaming input of given number of bytes \n");
DISPLAY( "--target-compressed-block-size=# : make compressed block near targeted size \n"); DISPLAY( "--target-compressed-block-size=# : make compressed block near targeted size \n");
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
DISPLAY( " -T# : spawns # compression threads (default: 1, 0==# cores) \n"); DISPLAY( " -T# : spawns # compression threads (default: 1, 0==# cores) \n");
@ -588,6 +589,7 @@ int main(int argCount, const char* argv[])
const char* suffix = ZSTD_EXTENSION; const char* suffix = ZSTD_EXTENSION;
unsigned maxDictSize = g_defaultMaxDictSize; unsigned maxDictSize = g_defaultMaxDictSize;
unsigned dictID = 0; unsigned dictID = 0;
size_t streamSrcSize = 0;
size_t targetCBlockSize = 0; size_t targetCBlockSize = 0;
int dictCLevel = g_defaultDictCLevel; int dictCLevel = g_defaultDictCLevel;
unsigned dictSelect = g_defaultSelectivityLevel; unsigned dictSelect = g_defaultSelectivityLevel;
@ -745,6 +747,7 @@ int main(int argCount, const char* argv[])
if (longCommandWArg(&argument, "--maxdict=")) { maxDictSize = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--maxdict=")) { maxDictSize = readU32FromChar(&argument); continue; }
if (longCommandWArg(&argument, "--dictID=")) { dictID = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--dictID=")) { dictID = readU32FromChar(&argument); continue; }
if (longCommandWArg(&argument, "--zstd=")) { if (!parseCompressionParameters(argument, &compressionParams)) CLEAN_RETURN(badusage(programName)); continue; } if (longCommandWArg(&argument, "--zstd=")) { if (!parseCompressionParameters(argument, &compressionParams)) CLEAN_RETURN(badusage(programName)); continue; }
if (longCommandWArg(&argument, "--stream-size=")) { streamSrcSize = readU32FromChar(&argument); continue; }
if (longCommandWArg(&argument, "--target-compressed-block-size=")) { targetCBlockSize = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--target-compressed-block-size=")) { targetCBlockSize = readU32FromChar(&argument); continue; }
if (longCommandWArg(&argument, "--long")) { if (longCommandWArg(&argument, "--long")) {
unsigned ldmWindowLog = 0; unsigned ldmWindowLog = 0;
@ -1150,6 +1153,7 @@ int main(int argCount, const char* argv[])
FIO_setAdaptMin(prefs, adaptMin); FIO_setAdaptMin(prefs, adaptMin);
FIO_setAdaptMax(prefs, adaptMax); FIO_setAdaptMax(prefs, adaptMax);
FIO_setRsyncable(prefs, rsyncable); FIO_setRsyncable(prefs, rsyncable);
FIO_setStreamSrcSize(prefs, streamSrcSize);
FIO_setTargetCBlockSize(prefs, targetCBlockSize); FIO_setTargetCBlockSize(prefs, targetCBlockSize);
FIO_setLiteralCompressionMode(prefs, literalCompressionMode); FIO_setLiteralCompressionMode(prefs, literalCompressionMode);
if (adaptMin > cLevel) cLevel = adaptMin; if (adaptMin > cLevel) cLevel = adaptMin;
@ -1160,7 +1164,7 @@ int main(int argCount, const char* argv[])
else else
operationResult = FIO_compressMultipleFilenames(prefs, filenameTable, filenameIdx, outFileName, suffix, dictFileName, cLevel, compressionParams); operationResult = FIO_compressMultipleFilenames(prefs, filenameTable, filenameIdx, outFileName, suffix, dictFileName, cLevel, compressionParams);
#else #else
(void)suffix; (void)adapt; (void)rsyncable; (void)ultra; (void)cLevel; (void)ldmFlag; (void)literalCompressionMode; (void)targetCBlockSize; /* not used when ZSTD_NOCOMPRESS set */ (void)suffix; (void)adapt; (void)rsyncable; (void)ultra; (void)cLevel; (void)ldmFlag; (void)literalCompressionMode; (void)streamSrcSize; (void)targetCBlockSize; /* not used when ZSTD_NOCOMPRESS set */
DISPLAY("Compression not supported \n"); DISPLAY("Compression not supported \n");
#endif #endif
} else { /* decompression or test */ } else { /* decompression or test */

View File

@ -108,7 +108,6 @@ else
fi fi
println "\n===> simple tests " println "\n===> simple tests "
./datagen > tmp ./datagen > tmp
@ -409,6 +408,23 @@ println "compress multiple files including a missing one (notHere) : "
$ZSTD -f tmp1 notHere tmp2 && die "missing file not detected!" $ZSTD -f tmp1 notHere tmp2 && die "missing file not detected!"
println "\n===> stream-size mode"
./datagen -g11000 > tmp
println "test : basic file compression vs sized streaming compression"
file_size=$($ZSTD -14 -f tmp -o tmp.zst && wc -c < tmp.zst)
stream_size=$(cat tmp | $ZSTD -14 --stream-size=11000 | wc -c)
if [ "$stream_size" -gt "$file_size" ]; then
die "hinted compression larger than expected"
fi
println "test : sized streaming compression and decompression"
cat tmp | $ZSTD -14 -f tmp -o --stream-size=11000 tmp.zst
$ZSTD -df tmp.zst -o tmp_decompress
cmp tmp tmp_decompress || die "difference between original and decompressed file"
println "test : incorrect stream size"
cat tmp | $ZSTD -14 -f -o tmp.zst --stream-size=11001 && die "should fail with incorrect stream size"
println "\n===> dictionary tests " println "\n===> dictionary tests "
println "- test with raw dict (content only) " println "- test with raw dict (content only) "