Merge pull request #1416 from terrelln/rsync
- Factor out LDM's hash function for reuse - Add rsyncable mode to zstdmt and expose it via the advanced API - Fix `-B`/`--block-size` to actually set the job size - Add rsyncable tests to `zstreamtest` and `playTests.sh` Tested by: ``` > cat A.100MB B.100MB C.100MB D.100MB E.100MB | zstd --rsyncable -fo src/file.zst /*stdin*\ : 48.22% (524288000 => 252837782 bytes, src/file.zst) > rsync -rc --stats src devbigvm:/data/users/terrelln/rsync-test Total bytes sent: 252868779 total size is 252837782 speedup is 1.00 > echo test > test > cat test A.100MB test B.100MB test C.100MB test D.100MB test E.100MB | zstd --rsyncable -fo src/file.zst /*stdin*\ : 48.23% (524288025 => 252838025 bytes, src/unicorn.tar.zst) > rsync -rc --stats src devbigvm:/data/users/terrelln/rsync-test Total bytes sent: 4605696 total size is 252838025 speedup is 53.60 ``` Close #1155.dev
commit
f15f1bfefb
|
@ -254,6 +254,7 @@ static int ZSTD_isUpdateAuthorized(ZSTD_cParameter param)
|
|||
case ZSTD_p_nbWorkers:
|
||||
case ZSTD_p_jobSize:
|
||||
case ZSTD_p_overlapSizeLog:
|
||||
case ZSTD_p_rsyncable:
|
||||
case ZSTD_p_enableLongDistanceMatching:
|
||||
case ZSTD_p_ldmHashLog:
|
||||
case ZSTD_p_ldmMinMatch:
|
||||
|
@ -315,6 +316,7 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
|
|||
|
||||
case ZSTD_p_jobSize:
|
||||
case ZSTD_p_overlapSizeLog:
|
||||
case ZSTD_p_rsyncable:
|
||||
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
|
||||
|
||||
case ZSTD_p_enableLongDistanceMatching:
|
||||
|
@ -441,6 +443,13 @@ size_t ZSTD_CCtxParam_setParameter(
|
|||
return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_overlapSectionLog, value);
|
||||
#endif
|
||||
|
||||
case ZSTD_p_rsyncable :
|
||||
#ifndef ZSTD_MULTITHREAD
|
||||
return ERROR(parameter_unsupported);
|
||||
#else
|
||||
return ZSTDMT_CCtxParam_setMTCtxParameter(CCtxParams, ZSTDMT_p_rsyncable, value);
|
||||
#endif
|
||||
|
||||
case ZSTD_p_enableLongDistanceMatching :
|
||||
CCtxParams->ldmParams.enableLdm = (value>0);
|
||||
return CCtxParams->ldmParams.enableLdm;
|
||||
|
@ -544,6 +553,13 @@ size_t ZSTD_CCtxParam_getParameter(
|
|||
#else
|
||||
*value = CCtxParams->overlapSizeLog;
|
||||
break;
|
||||
#endif
|
||||
case ZSTD_p_rsyncable :
|
||||
#ifndef ZSTD_MULTITHREAD
|
||||
return ERROR(parameter_unsupported);
|
||||
#else
|
||||
*value = CCtxParams->rsyncable;
|
||||
break;
|
||||
#endif
|
||||
case ZSTD_p_enableLongDistanceMatching :
|
||||
*value = CCtxParams->ldmParams.enableLdm;
|
||||
|
@ -1160,7 +1176,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
|
|||
ZSTD_ldm_adjustParameters(¶ms.ldmParams, ¶ms.cParams);
|
||||
assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog);
|
||||
assert(params.ldmParams.hashEveryLog < 32);
|
||||
zc->ldmState.hashPower = ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength);
|
||||
zc->ldmState.hashPower = ZSTD_rollingHash_primePower(params.ldmParams.minMatchLength);
|
||||
}
|
||||
|
||||
{ size_t const windowSize = MAX(1, (size_t)MIN(((U64)1 << params.cParams.windowLog), pledgedSrcSize));
|
||||
|
|
|
@ -193,6 +193,7 @@ struct ZSTD_CCtx_params_s {
|
|||
unsigned nbWorkers;
|
||||
unsigned jobSize;
|
||||
unsigned overlapSizeLog;
|
||||
unsigned rsyncable;
|
||||
|
||||
/* Long distance matching parameters */
|
||||
ldmParams_t ldmParams;
|
||||
|
@ -492,6 +493,64 @@ MEM_STATIC size_t ZSTD_hashPtr(const void* p, U32 hBits, U32 mls)
|
|||
}
|
||||
}
|
||||
|
||||
/** ZSTD_ipow() :
|
||||
* Return base^exponent.
|
||||
*/
|
||||
static U64 ZSTD_ipow(U64 base, U64 exponent)
|
||||
{
|
||||
U64 power = 1;
|
||||
while (exponent) {
|
||||
if (exponent & 1) power *= base;
|
||||
exponent >>= 1;
|
||||
base *= base;
|
||||
}
|
||||
return power;
|
||||
}
|
||||
|
||||
#define ZSTD_ROLL_HASH_CHAR_OFFSET 10
|
||||
|
||||
/** ZSTD_rollingHash_append() :
|
||||
* Add the buffer to the hash value.
|
||||
*/
|
||||
static U64 ZSTD_rollingHash_append(U64 hash, void const* buf, size_t size)
|
||||
{
|
||||
BYTE const* istart = (BYTE const*)buf;
|
||||
size_t pos;
|
||||
for (pos = 0; pos < size; ++pos) {
|
||||
hash *= prime8bytes;
|
||||
hash += istart[pos] + ZSTD_ROLL_HASH_CHAR_OFFSET;
|
||||
}
|
||||
return hash;
|
||||
}
|
||||
|
||||
/** ZSTD_rollingHash_compute() :
|
||||
* Compute the rolling hash value of the buffer.
|
||||
*/
|
||||
MEM_STATIC U64 ZSTD_rollingHash_compute(void const* buf, size_t size)
|
||||
{
|
||||
return ZSTD_rollingHash_append(0, buf, size);
|
||||
}
|
||||
|
||||
/** ZSTD_rollingHash_primePower() :
|
||||
* Compute the primePower to be passed to ZSTD_rollingHash_rotate() for a hash
|
||||
* over a window of length bytes.
|
||||
*/
|
||||
MEM_STATIC U64 ZSTD_rollingHash_primePower(U32 length)
|
||||
{
|
||||
return ZSTD_ipow(prime8bytes, length - 1);
|
||||
}
|
||||
|
||||
/** ZSTD_rollingHash_rotate() :
|
||||
* Rotate the rolling hash by one byte.
|
||||
*/
|
||||
MEM_STATIC U64 ZSTD_rollingHash_rotate(U64 hash, BYTE toRemove, BYTE toAdd, U64 primePower)
|
||||
{
|
||||
hash -= (toRemove + ZSTD_ROLL_HASH_CHAR_OFFSET) * primePower;
|
||||
hash *= prime8bytes;
|
||||
hash += toAdd + ZSTD_ROLL_HASH_CHAR_OFFSET;
|
||||
return hash;
|
||||
}
|
||||
|
||||
/*-*************************************
|
||||
* Round buffer management
|
||||
***************************************/
|
||||
|
|
|
@ -143,56 +143,6 @@ static void ZSTD_ldm_makeEntryAndInsertByTag(ldmState_t* ldmState,
|
|||
}
|
||||
}
|
||||
|
||||
/** ZSTD_ldm_getRollingHash() :
|
||||
* Get a 64-bit hash using the first len bytes from buf.
|
||||
*
|
||||
* Giving bytes s = s_1, s_2, ... s_k, the hash is defined to be
|
||||
* H(s) = s_1*(a^(k-1)) + s_2*(a^(k-2)) + ... + s_k*(a^0)
|
||||
*
|
||||
* where the constant a is defined to be prime8bytes.
|
||||
*
|
||||
* The implementation adds an offset to each byte, so
|
||||
* H(s) = (s_1 + HASH_CHAR_OFFSET)*(a^(k-1)) + ... */
|
||||
static U64 ZSTD_ldm_getRollingHash(const BYTE* buf, U32 len)
|
||||
{
|
||||
U64 ret = 0;
|
||||
U32 i;
|
||||
for (i = 0; i < len; i++) {
|
||||
ret *= prime8bytes;
|
||||
ret += buf[i] + LDM_HASH_CHAR_OFFSET;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/** ZSTD_ldm_ipow() :
|
||||
* Return base^exp. */
|
||||
static U64 ZSTD_ldm_ipow(U64 base, U64 exp)
|
||||
{
|
||||
U64 ret = 1;
|
||||
while (exp) {
|
||||
if (exp & 1) { ret *= base; }
|
||||
exp >>= 1;
|
||||
base *= base;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
U64 ZSTD_ldm_getHashPower(U32 minMatchLength) {
|
||||
DEBUGLOG(4, "ZSTD_ldm_getHashPower: mml=%u", minMatchLength);
|
||||
assert(minMatchLength >= ZSTD_LDM_MINMATCH_MIN);
|
||||
return ZSTD_ldm_ipow(prime8bytes, minMatchLength - 1);
|
||||
}
|
||||
|
||||
/** ZSTD_ldm_updateHash() :
|
||||
* Updates hash by removing toRemove and adding toAdd. */
|
||||
static U64 ZSTD_ldm_updateHash(U64 hash, BYTE toRemove, BYTE toAdd, U64 hashPower)
|
||||
{
|
||||
hash -= ((toRemove + LDM_HASH_CHAR_OFFSET) * hashPower);
|
||||
hash *= prime8bytes;
|
||||
hash += toAdd + LDM_HASH_CHAR_OFFSET;
|
||||
return hash;
|
||||
}
|
||||
|
||||
/** ZSTD_ldm_countBackwardsMatch() :
|
||||
* Returns the number of bytes that match backwards before pIn and pMatch.
|
||||
*
|
||||
|
@ -261,7 +211,7 @@ static U64 ZSTD_ldm_fillLdmHashTable(ldmState_t* state,
|
|||
const BYTE* cur = lastHashed + 1;
|
||||
|
||||
while (cur < iend) {
|
||||
rollingHash = ZSTD_ldm_updateHash(rollingHash, cur[-1],
|
||||
rollingHash = ZSTD_rollingHash_rotate(rollingHash, cur[-1],
|
||||
cur[ldmParams.minMatchLength-1],
|
||||
state->hashPower);
|
||||
ZSTD_ldm_makeEntryAndInsertByTag(state,
|
||||
|
@ -324,11 +274,11 @@ static size_t ZSTD_ldm_generateSequences_internal(
|
|||
size_t forwardMatchLength = 0, backwardMatchLength = 0;
|
||||
ldmEntry_t* bestEntry = NULL;
|
||||
if (ip != istart) {
|
||||
rollingHash = ZSTD_ldm_updateHash(rollingHash, lastHashed[0],
|
||||
rollingHash = ZSTD_rollingHash_rotate(rollingHash, lastHashed[0],
|
||||
lastHashed[minMatchLength],
|
||||
hashPower);
|
||||
} else {
|
||||
rollingHash = ZSTD_ldm_getRollingHash(ip, minMatchLength);
|
||||
rollingHash = ZSTD_rollingHash_compute(ip, minMatchLength);
|
||||
}
|
||||
lastHashed = ip;
|
||||
|
||||
|
|
|
@ -86,10 +86,6 @@ size_t ZSTD_ldm_getTableSize(ldmParams_t params);
|
|||
*/
|
||||
size_t ZSTD_ldm_getMaxNbSeq(ldmParams_t params, size_t maxChunkSize);
|
||||
|
||||
/** ZSTD_ldm_getTableSize() :
|
||||
* Return prime8bytes^(minMatchLength-1) */
|
||||
U64 ZSTD_ldm_getHashPower(U32 minMatchLength);
|
||||
|
||||
/** ZSTD_ldm_adjustParameters() :
|
||||
* If the params->hashEveryLog is not set, set it to its default value based on
|
||||
* windowLog and params->hashLog.
|
||||
|
|
|
@ -471,7 +471,7 @@ static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool*
|
|||
assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog);
|
||||
assert(params.ldmParams.hashEveryLog < 32);
|
||||
serialState->ldmState.hashPower =
|
||||
ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength);
|
||||
ZSTD_rollingHash_primePower(params.ldmParams.minMatchLength);
|
||||
} else {
|
||||
memset(¶ms.ldmParams, 0, sizeof(params.ldmParams));
|
||||
}
|
||||
|
@ -777,6 +777,14 @@ typedef struct {
|
|||
|
||||
static const roundBuff_t kNullRoundBuff = {NULL, 0, 0};
|
||||
|
||||
#define RSYNC_LENGTH 32
|
||||
|
||||
/* Rolling-hash parameters used by rsyncable mode to locate synchronization points. */
typedef struct {
|
||||
U64 hash;  /* set to 0 by ZSTDMT_initCStream_internal() when rsyncable is enabled */
|
||||
U64 hitMask;  /* (1 << rsyncBits) - 1 : a position is a sync point when (hash & hitMask) == hitMask */
|
||||
U64 primePower;  /* ZSTD_rollingHash_primePower(RSYNC_LENGTH), handed to ZSTD_rollingHash_rotate() */
|
||||
} rsyncState_t;
|
||||
|
||||
struct ZSTDMT_CCtx_s {
|
||||
POOL_ctx* factory;
|
||||
ZSTDMT_jobDescription* jobs;
|
||||
|
@ -790,6 +798,7 @@ struct ZSTDMT_CCtx_s {
|
|||
inBuff_t inBuff;
|
||||
roundBuff_t roundBuff;
|
||||
serialState_t serial;
|
||||
rsyncState_t rsync;
|
||||
unsigned singleBlockingThread;
|
||||
unsigned jobIDMask;
|
||||
unsigned doneJobID;
|
||||
|
@ -988,6 +997,9 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params,
|
|||
DEBUGLOG(4, "ZSTDMT_p_overlapSectionLog : %u", value);
|
||||
params->overlapSizeLog = (value >= 9) ? 9 : value;
|
||||
return value;
|
||||
case ZSTDMT_p_rsyncable :
|
||||
params->rsyncable = (value == 0 ? 0 : 1);
|
||||
return value;
|
||||
default :
|
||||
return ERROR(parameter_unsupported);
|
||||
}
|
||||
|
@ -996,15 +1008,7 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params,
|
|||
size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value)
|
||||
{
|
||||
DEBUGLOG(4, "ZSTDMT_setMTCtxParameter");
|
||||
switch(parameter)
|
||||
{
|
||||
case ZSTDMT_p_jobSize :
|
||||
return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
|
||||
case ZSTDMT_p_overlapSectionLog :
|
||||
return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
|
||||
default :
|
||||
return ERROR(parameter_unsupported);
|
||||
}
|
||||
}
|
||||
|
||||
size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned* value)
|
||||
|
@ -1016,6 +1020,9 @@ size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter,
|
|||
case ZSTDMT_p_overlapSectionLog:
|
||||
*value = mtctx->params.overlapSizeLog;
|
||||
break;
|
||||
case ZSTDMT_p_rsyncable:
|
||||
*value = mtctx->params.rsyncable;
|
||||
break;
|
||||
default:
|
||||
return ERROR(parameter_unsupported);
|
||||
}
|
||||
|
@ -1381,6 +1388,16 @@ size_t ZSTDMT_initCStream_internal(
|
|||
if (mtctx->targetSectionSize == 0) {
|
||||
mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params);
|
||||
}
|
||||
if (params.rsyncable) {
|
||||
/* Aim for the targetSectionSize as the average job size. */
|
||||
U32 const jobSizeMB = (U32)(mtctx->targetSectionSize >> 20);
|
||||
U32 const rsyncBits = ZSTD_highbit32(jobSizeMB) + 20;
|
||||
assert(jobSizeMB >= 1);
|
||||
DEBUGLOG(4, "rsyncLog = %u", rsyncBits);
|
||||
mtctx->rsync.hash = 0;
|
||||
mtctx->rsync.hitMask = (1ULL << rsyncBits) - 1;
|
||||
mtctx->rsync.primePower = ZSTD_rollingHash_primePower(RSYNC_LENGTH);
|
||||
}
|
||||
if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */
|
||||
DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize);
|
||||
DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
|
||||
|
@ -1818,6 +1835,80 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
|
|||
return 1;
|
||||
}
|
||||
|
||||
/* Result of findSynchronizationPoint() : how much input to load and whether to flush afterwards. */
typedef struct {
|
||||
size_t toLoad; /* The number of bytes to load from the input. */
|
||||
int flush; /* Boolean declaring if we must flush because we found a synchronization point. */
|
||||
} syncPoint_t;
|
||||
|
||||
/**
|
||||
* Searches through the input for a synchronization point. If one is found, we
|
||||
* will instruct the caller to flush, and return the number of bytes to load.
|
||||
* Otherwise, we will load as many bytes as possible and instruct the caller
|
||||
* to continue as normal.
|
||||
*/
|
||||
static syncPoint_t findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input) {
|
||||
BYTE const* const istart = (BYTE const*)input.src + input.pos;
|
||||
U64 const primePower = mtctx->rsync.primePower;
|
||||
U64 const hitMask = mtctx->rsync.hitMask;
|
||||
|
||||
syncPoint_t syncPoint;
|
||||
U64 hash;
|
||||
BYTE const* prev;
|
||||
size_t pos;
|
||||
|
||||
syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetSectionSize - mtctx->inBuff.filled);
|
||||
syncPoint.flush = 0;
|
||||
if (!mtctx->params.rsyncable)
|
||||
/* Rsync is disabled. */
|
||||
return syncPoint;
|
||||
if (mtctx->inBuff.filled + syncPoint.toLoad < RSYNC_LENGTH)
|
||||
/* Not enough to compute the hash.
|
||||
* We will miss any synchronization points in this RSYNC_LENGTH byte
|
||||
* window. However, since it depends only in the internal buffers, if the
|
||||
* state is already synchronized, we will remain synchronized.
|
||||
* Additionally, the probability that we miss a synchronization point is
|
||||
* low: RSYNC_LENGTH / targetSectionSize.
|
||||
*/
|
||||
return syncPoint;
|
||||
/* Initialize the loop variables. */
|
||||
if (mtctx->inBuff.filled >= RSYNC_LENGTH) {
|
||||
/* We have enough bytes buffered to initialize the hash.
|
||||
* Start scanning at the beginning of the input.
|
||||
*/
|
||||
pos = 0;
|
||||
prev = (BYTE const*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled - RSYNC_LENGTH;
|
||||
hash = ZSTD_rollingHash_compute(prev, RSYNC_LENGTH);
|
||||
} else {
|
||||
/* We don't have enough bytes buffered to initialize the hash, but
|
||||
* we know we have at least RSYNC_LENGTH bytes total.
|
||||
* Start scanning after the first RSYNC_LENGTH bytes less the bytes
|
||||
* already buffered.
|
||||
*/
|
||||
pos = RSYNC_LENGTH - mtctx->inBuff.filled;
|
||||
prev = (BYTE const*)mtctx->inBuff.buffer.start - pos;
|
||||
hash = ZSTD_rollingHash_compute(mtctx->inBuff.buffer.start, mtctx->inBuff.filled);
|
||||
hash = ZSTD_rollingHash_append(hash, istart, pos);
|
||||
}
|
||||
/* Starting with the hash of the previous RSYNC_LENGTH bytes, roll
|
||||
* through the input. If we hit a synchronization point, then cut the
|
||||
* job off, and tell the compressor to flush the job. Otherwise, load
|
||||
* all the bytes and continue as normal.
|
||||
* If we go too long without a synchronization point (targetSectionSize)
|
||||
* then a block will be emitted anyways, but this is okay, since if we
|
||||
* are already synchronized we will remain synchronized.
|
||||
*/
|
||||
for (; pos < syncPoint.toLoad; ++pos) {
|
||||
BYTE const toRemove = pos < RSYNC_LENGTH ? prev[pos] : istart[pos - RSYNC_LENGTH];
|
||||
/* if (pos >= RSYNC_LENGTH) assert(ZSTD_rollingHash_compute(istart + pos - RSYNC_LENGTH, RSYNC_LENGTH) == hash); */
|
||||
hash = ZSTD_rollingHash_rotate(hash, toRemove, istart[pos], primePower);
|
||||
if ((hash & hitMask) == hitMask) {
|
||||
syncPoint.toLoad = pos + 1;
|
||||
syncPoint.flush = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return syncPoint;
|
||||
}
|
||||
|
||||
/** ZSTDMT_compressStream_generic() :
|
||||
* internal use only - exposed to be invoked from zstd_compress.c
|
||||
|
@ -1844,7 +1935,8 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
|||
}
|
||||
|
||||
/* single-pass shortcut (note : synchronous-mode) */
|
||||
if ( (mtctx->nextJobID == 0) /* just started */
|
||||
if ( (!mtctx->params.rsyncable) /* rsyncable mode is disabled */
|
||||
&& (mtctx->nextJobID == 0) /* just started */
|
||||
&& (mtctx->inBuff.filled == 0) /* nothing buffered */
|
||||
&& (!mtctx->jobReady) /* no job already created */
|
||||
&& (endOp == ZSTD_e_end) /* end order */
|
||||
|
@ -1876,14 +1968,17 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
|||
DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start);
|
||||
}
|
||||
if (mtctx->inBuff.buffer.start != NULL) {
|
||||
size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled);
|
||||
syncPoint_t const syncPoint = findSynchronizationPoint(mtctx, *input);
|
||||
if (syncPoint.flush && endOp == ZSTD_e_continue) {
|
||||
endOp = ZSTD_e_flush;
|
||||
}
|
||||
assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize);
|
||||
DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",
|
||||
(U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize);
|
||||
memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
|
||||
input->pos += toLoad;
|
||||
mtctx->inBuff.filled += toLoad;
|
||||
forwardInputProgress = toLoad>0;
|
||||
(U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize);
|
||||
memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, syncPoint.toLoad);
|
||||
input->pos += syncPoint.toLoad;
|
||||
mtctx->inBuff.filled += syncPoint.toLoad;
|
||||
forwardInputProgress = syncPoint.toLoad>0;
|
||||
}
|
||||
if ((input->pos < input->size) && (endOp == ZSTD_e_end))
|
||||
endOp = ZSTD_e_flush; /* can't end now : not all input consumed */
|
||||
|
|
|
@ -85,7 +85,8 @@ ZSTDLIB_API size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
|
|||
* List of parameters that can be set using ZSTDMT_setMTCtxParameter() */
|
||||
typedef enum {
|
||||
ZSTDMT_p_jobSize, /* Each job is compressed in parallel. By default, this value is dynamically determined depending on compression parameters. Can be set explicitly here. */
|
||||
ZSTDMT_p_overlapSectionLog /* Each job may reload a part of previous job to enhance compression ratio; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window. This is a "sticky" parameter : its value will be re-used on next compression job */
|
||||
ZSTDMT_p_overlapSectionLog, /* Each job may reload a part of previous job to enhance compression ratio; 0 == no overlap, 6(default) == use 1/8th of window, >=9 == use full window. This is a "sticky" parameter : its value will be re-used on next compression job */
|
||||
ZSTDMT_p_rsyncable /* Enables rsyncable mode. ZSTDMT_CCtxParam_setMTCtxParameter() stores 0 for a value of 0 and 1 for any other value. */
|
||||
} ZSTDMT_parameter;
|
||||
|
||||
/* ZSTDMT_setMTCtxParameter() :
|
||||
|
|
21
lib/zstd.h
21
lib/zstd.h
|
@ -1149,6 +1149,27 @@ typedef enum {
|
|||
* enum. See the comments on that enum for an
|
||||
* explanation of the feature.
|
||||
*/
|
||||
ZSTD_p_rsyncable, /* Enables rsyncable mode, which makes compressed
|
||||
* files more rsync friendly by adding periodic
|
||||
* synchronization points to the compressed data.
|
||||
* The target average block size is
|
||||
* ZSTD_p_jobSize / 2. You can modify the job size
|
||||
* to increase or decrease the granularity of the
|
||||
* synchronization point. Once the jobSize is
|
||||
* smaller than the window size, you will start to
|
||||
* see degraded compression ratio.
|
||||
* NOTE: This only works when multithreading is
|
||||
* enabled.
|
||||
* NOTE: You probably don't want to use this with
|
||||
* long range mode, since that will decrease the
|
||||
* effectiveness of the synchronization points,
|
||||
* but your mileage may vary.
|
||||
* NOTE: Rsyncable mode will limit the maximum
|
||||
* compression speed to approximately 400 MB/s.
|
||||
* If your compression level is already running
|
||||
* significantly slower than that (< 200 MB/s),
|
||||
* the speed won't be significantly impacted.
|
||||
*/
|
||||
} ZSTD_cParameter;
|
||||
|
||||
|
||||
|
|
|
@ -307,6 +307,12 @@ void FIO_setAdaptiveMode(unsigned adapt) {
|
|||
EXM_THROW(1, "Adaptive mode is not compatible with single thread mode \n");
|
||||
g_adaptiveMode = adapt;
|
||||
}
|
||||
static U32 g_rsyncable = 0;
|
||||
void FIO_setRsyncable(unsigned rsyncable) {
|
||||
if ((rsyncable>0) && (g_nbWorkers==0))
|
||||
EXM_THROW(1, "Rsyncable mode is not compatible with single thread mode \n");
|
||||
g_rsyncable = rsyncable;
|
||||
}
|
||||
static int g_minAdaptLevel = -50; /* initializing this value requires a constant, so ZSTD_minCLevel() doesn't work */
|
||||
void FIO_setAdaptMin(int minCLevel)
|
||||
{
|
||||
|
@ -550,6 +556,7 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
|
|||
#ifdef ZSTD_MULTITHREAD
|
||||
DISPLAYLEVEL(5,"set nb workers = %u \n", g_nbWorkers);
|
||||
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbWorkers, g_nbWorkers) );
|
||||
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_jobSize, g_blockSize) );
|
||||
if ( (g_overlapLog == FIO_OVERLAP_LOG_NOTSET)
|
||||
&& (cLevel == ZSTD_maxCLevel()) )
|
||||
g_overlapLog = 9; /* full overlap */
|
||||
|
@ -557,6 +564,7 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
|
|||
DISPLAYLEVEL(3,"set overlapLog = %u \n", g_overlapLog);
|
||||
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_overlapSizeLog, g_overlapLog) );
|
||||
}
|
||||
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_rsyncable, g_rsyncable) );
|
||||
#endif
|
||||
/* dictionary */
|
||||
CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* set the value temporarily for dictionary loading, to adapt compression parameters */
|
||||
|
|
|
@ -65,6 +65,7 @@ void FIO_setNotificationLevel(unsigned level);
|
|||
void FIO_setOverlapLog(unsigned overlapLog);
|
||||
void FIO_setRemoveSrcFile(unsigned flag);
|
||||
void FIO_setSparseWrite(unsigned sparse); /**< 0: no sparse; 1: disable on stdout; 2: always enabled */
|
||||
void FIO_setRsyncable(unsigned rsyncable);
|
||||
|
||||
|
||||
/*-*************************************
|
||||
|
|
|
@ -144,6 +144,14 @@ the last one takes effect.
|
|||
Due to the chaotic nature of dynamic adaptation, compressed result is not reproducible.
|
||||
_note_ : at the time of this writing, `--adapt` can remain stuck at low speed
|
||||
when combined with multiple worker threads (>=2).
|
||||
* `--rsyncable` :
|
||||
`zstd` will periodically synchronize the compression state to make the
|
||||
compressed file more rsync-friendly. There is a negligible impact to
|
||||
compression ratio, and the faster compression levels will see a small
|
||||
compression speed hit.
|
||||
This feature does not work with `--single-thread`. You probably don't want
|
||||
to use it with long range mode, since it will decrease the effectiveness of
|
||||
the synchronization points, but your mileage may vary.
|
||||
* `-D file`:
|
||||
use `file` as Dictionary to compress or decompress FILE(s)
|
||||
* `--no-dictID`:
|
||||
|
|
|
@ -143,6 +143,7 @@ static int usage_advanced(const char* programName)
|
|||
#ifdef ZSTD_MULTITHREAD
|
||||
DISPLAY( " -T# : spawns # compression threads (default: 1, 0==# cores) \n");
|
||||
DISPLAY( " -B# : select size of each job (default: 0==automatic) \n");
|
||||
DISPLAY( " --rsyncable : compress using a rsync-friendly method (-B sets block size) \n");
|
||||
#endif
|
||||
DISPLAY( "--no-dictID : don't write dictID into header (dictionary compression)\n");
|
||||
DISPLAY( "--[no-]check : integrity check (default: enabled) \n");
|
||||
|
@ -475,6 +476,7 @@ int main(int argCount, const char* argv[])
|
|||
adapt = 0,
|
||||
adaptMin = MINCLEVEL,
|
||||
adaptMax = MAXCLEVEL,
|
||||
rsyncable = 0,
|
||||
nextArgumentIsOutFileName = 0,
|
||||
nextArgumentIsMaxDict = 0,
|
||||
nextArgumentIsDictID = 0,
|
||||
|
@ -607,6 +609,7 @@ int main(int argCount, const char* argv[])
|
|||
#ifdef ZSTD_LZ4COMPRESS
|
||||
if (!strcmp(argument, "--format=lz4")) { suffix = LZ4_EXTENSION; FIO_setCompressionType(FIO_lz4Compression); continue; }
|
||||
#endif
|
||||
if (!strcmp(argument, "--rsyncable")) { rsyncable = 1; continue; }
|
||||
|
||||
/* long commands with arguments */
|
||||
#ifndef ZSTD_NODICT
|
||||
|
@ -1052,6 +1055,7 @@ int main(int argCount, const char* argv[])
|
|||
FIO_setAdaptiveMode(adapt);
|
||||
FIO_setAdaptMin(adaptMin);
|
||||
FIO_setAdaptMax(adaptMax);
|
||||
FIO_setRsyncable(rsyncable);
|
||||
if (adaptMin > cLevel) cLevel = adaptMin;
|
||||
if (adaptMax < cLevel) cLevel = adaptMax;
|
||||
|
||||
|
@ -1060,7 +1064,7 @@ int main(int argCount, const char* argv[])
|
|||
else
|
||||
operationResult = FIO_compressMultipleFilenames(filenameTable, filenameIdx, outFileName, suffix, dictFileName, cLevel, compressionParams);
|
||||
#else
|
||||
(void)suffix; (void)adapt; (void)ultra; (void)cLevel; (void)ldmFlag; /* not used when ZSTD_NOCOMPRESS set */
|
||||
(void)suffix; (void)adapt; (void)rsyncable; (void)ultra; (void)cLevel; (void)ldmFlag; /* not used when ZSTD_NOCOMPRESS set */
|
||||
DISPLAY("Compression not supported \n");
|
||||
#endif
|
||||
} else { /* decompression or test */
|
||||
|
|
|
@ -836,6 +836,12 @@ $ECHO "===> test: --adapt must fail on incoherent bounds "
|
|||
./datagen > tmp
|
||||
$ZSTD -f -vv --adapt=min=10,max=9 tmp && die "--adapt must fail on incoherent bounds"
|
||||
|
||||
$ECHO "\n===> rsyncable mode "
|
||||
roundTripTest -g10M " --rsyncable"
|
||||
roundTripTest -g10M " --rsyncable -B100K"
|
||||
$ECHO "===> test: --rsyncable must fail with --single-thread"
|
||||
$ZSTD -f -vv --rsyncable --single-thread tmp && die "--rsyncable must fail with --single-thread"
|
||||
|
||||
|
||||
if [ "$1" != "--test-large-data" ]; then
|
||||
$ECHO "Skipping large data tests"
|
||||
|
|
|
@ -1915,6 +1915,8 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest,
|
|||
CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_jobSize, (U32)FUZ_rLogLength(&lseed, jobLog), opaqueAPI) );
|
||||
}
|
||||
}
|
||||
/* Enable rsyncable mode 1 in 4 times. */
|
||||
setCCtxParameter(zc, cctxParams, ZSTD_p_rsyncable, (FUZ_rand(&lseed) % 4 == 0), opaqueAPI);
|
||||
|
||||
if (FUZ_rand(&lseed) & 1) CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_forceMaxWindow, FUZ_rand(&lseed) & 1, opaqueAPI) );
|
||||
|
||||
|
|
Loading…
Reference in New Issue