diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 92e0bf1d..c59511b9 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -922,6 +922,7 @@ static size_t ZSTD_continueCCtx(ZSTD_CCtx* cctx, ZSTD_CCtx_params params, U64 pl cctx->dictID = 0; if (params.ldmParams.enableLdm) ZSTD_window_clear(&cctx->ldmState.window); + ZSTD_referenceExternalSequences(cctx, NULL, 0); ZSTD_invalidateMatchState(&cctx->blockState.matchState); ZSTD_reset_compressedBlockState(cctx->blockState.prevCBlock); XXH64_reset(&cctx->xxhState, 0); @@ -1108,6 +1109,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc, ptr = zc->ldmState.bucketOffsets + ldmBucketSize; ZSTD_window_clear(&zc->ldmState.window); } + ZSTD_referenceExternalSequences(zc, NULL, 0); /* buffers */ zc->inBuffSize = buffInSize; @@ -1925,8 +1927,10 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc, ZSTD_matchState_t* const ms = &zc->blockState.matchState; DEBUGLOG(5, "ZSTD_compressBlock_internal (dstCapacity=%u, dictLimit=%u, nextToUpdate=%u)", (U32)dstCapacity, ms->window.dictLimit, ms->nextToUpdate); - if (srcSize < MIN_CBLOCK_SIZE+ZSTD_blockHeaderSize+1) + if (srcSize < MIN_CBLOCK_SIZE+ZSTD_blockHeaderSize+1) { + ZSTD_ldm_skipSequences(&zc->externSeqStore, srcSize, zc->appliedParams.cParams.searchLength); return 0; /* don't even attempt compression below a certain srcSize */ + } ZSTD_resetSeqStore(&(zc->seqStore)); /* limited update after a very long match */ diff --git a/lib/compress/zstd_ldm.c b/lib/compress/zstd_ldm.c index 5c9c0d2b..1684ac11 100644 --- a/lib/compress/zstd_ldm.c +++ b/lib/compress/zstd_ldm.c @@ -536,6 +536,34 @@ size_t ZSTD_ldm_generateSequences( return 0; } +void ZSTD_ldm_skipSequences(rawSeqStore_t* rawSeqStore, size_t srcSize, U32 const minMatch) { + while (srcSize > 0 && rawSeqStore->pos < rawSeqStore->size) { + rawSeq* seq = rawSeqStore->seq + rawSeqStore->pos; + if (srcSize <= seq->litLength) { + /* Skip past srcSize literals */ + seq->litLength -= (U32)srcSize; + return; + } + srcSize -= seq->litLength; + seq->litLength = 0; + if (srcSize < seq->matchLength) { + /* Skip past the first srcSize of the match */ + seq->matchLength -= (U32)srcSize; + if (seq->matchLength < minMatch) { + /* The match is too short, omit it */ + if (rawSeqStore->pos + 1 < rawSeqStore->size) { + seq[1].litLength += seq[0].matchLength; + } + rawSeqStore->pos++; + } + return; + } + srcSize -= seq->matchLength; + seq->matchLength = 0; + rawSeqStore->pos++; + } +} + /** * If the sequence length is longer than remaining then the sequence is split * between this block and the next. @@ -546,51 +574,24 @@ size_t ZSTD_ldm_generateSequences( static rawSeq maybeSplitSequence(rawSeqStore_t* rawSeqStore, U32 const remaining, U32 const minMatch) { - size_t const pos = rawSeqStore->pos; rawSeq sequence = rawSeqStore->seq[rawSeqStore->pos]; assert(sequence.offset > 0); - /* Handle partial sequences */ + /* Likely: No partial sequence */ + if (remaining >= sequence.litLength + sequence.matchLength) { + rawSeqStore->pos++; + return sequence; + } + /* Cut the sequence short (offset == 0 ==> rest is literals). */ if (remaining <= sequence.litLength) { - /* Split the literals that we have out of the sequence. - * They will become the last literals of this block. - * The next block starts off with the remaining literals. - */ - rawSeqStore->seq[pos].litLength -= remaining; sequence.offset = 0; } else if (remaining < sequence.litLength + sequence.matchLength) { - /* Split the match up into two sequences. One in this block, and one - * in the next with no literals. If either match would be shorter - * than searchLength we omit it. - */ - U32 const matchPrefix = remaining - sequence.litLength; - U32 const matchSuffix = sequence.matchLength - matchPrefix; - - assert(remaining > sequence.litLength); - assert(matchPrefix < sequence.matchLength); - assert(matchPrefix + matchSuffix == sequence.matchLength); - /* Update the first sequence */ - sequence.matchLength = matchPrefix; - /* Update the second sequence */ - if (matchSuffix >= minMatch) { - /* Update the second sequence, since the suffix is long enough */ - rawSeqStore->seq[pos].litLength = 0; - rawSeqStore->seq[pos].matchLength = matchSuffix; - } else { - /* Omit the second sequence since the match suffix is too short. - * Add to the next sequences literals (if any). - */ - if (pos + 1 < rawSeqStore->size) - rawSeqStore->seq[pos + 1].litLength += matchSuffix; - rawSeqStore->pos++; /* Consume the sequence */ - } + sequence.matchLength = remaining - sequence.litLength; if (sequence.matchLength < minMatch) { - /* Skip the current sequence if it is too short */ sequence.offset = 0; } - } else { - /* No partial sequence */ - rawSeqStore->pos++; /* Consume the sequence */ } + /* Skip past `remaining` bytes for the future sequences. */ + ZSTD_ldm_skipSequences(rawSeqStore, remaining, minMatch); return sequence; } diff --git a/lib/compress/zstd_ldm.h b/lib/compress/zstd_ldm.h index 9d2f7c39..84d3723c 100644 --- a/lib/compress/zstd_ldm.h +++ b/lib/compress/zstd_ldm.h @@ -65,6 +65,16 @@ size_t ZSTD_ldm_blockCompress(rawSeqStore_t* rawSeqStore, ZSTD_compressionParameters const* cParams, void const* src, size_t srcSize, int const extDict); +/** + * ZSTD_ldm_skipSequences(): + * + * Skip past `srcSize` bytes worth of sequences in `rawSeqStore`. + * Avoids emitting matches less than `minMatch` bytes. + * Must be called for data with is not passed to ZSTD_ldm_blockCompress(). + */ +void ZSTD_ldm_skipSequences(rawSeqStore_t* rawSeqStore, size_t srcSize, + U32 const minMatch); + /** ZSTD_ldm_initializeParameters() : * Initialize the long distance matching parameters to their default values. */ diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 99131260..975e4149 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -27,8 +27,14 @@ #include "pool.h" /* threadpool */ #include "threading.h" /* mutex */ #include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */ +#include "zstd_ldm.h" #include "zstdmt_compress.h" +/* Guards code to support resizing the SeqPool. + * We will want to resize the SeqPool to save memory in the future. + * Until then, comment the code out since it is unused. + */ +#define ZSTD_RESIZE_SEQPOOL 0 /* ====== Debug ====== */ #if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2) @@ -194,6 +200,32 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) } } +#if ZSTD_RESIZE_SEQPOOL +/** ZSTDMT_resizeBuffer() : + * assumption : bufPool must be valid + * @return : a buffer that is at least the buffer pool buffer size. + * If a reallocation happens, the data in the input buffer is copied. + */ +static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer) +{ + size_t const bSize = bufPool->bufferSize; + if (buffer.capacity < bSize) { + void* const start = ZSTD_malloc(bSize, bufPool->cMem); + buffer_t newBuffer; + newBuffer.start = start; + newBuffer.capacity = start == NULL ? 0 : bSize; + if (start != NULL) { + assert(newBuffer.capacity >= buffer.capacity); + memcpy(newBuffer.start, buffer.start, buffer.capacity); + DEBUGLOG(5, "ZSTDMT_resizeBuffer: created buffer of size %u", (U32)bSize); + return newBuffer; + } + DEBUGLOG(5, "ZSTDMT_resizeBuffer: buffer allocation failure !!"); + } + return buffer; +} +#endif + /* store buffer for later re-use, up to pool capacity */ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) { @@ -214,6 +246,72 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) } +/* ===== Seq Pool Wrapper ====== */ + +static rawSeqStore_t kNullRawSeqStore = {NULL, 0, 0, 0}; + +typedef ZSTDMT_bufferPool ZSTDMT_seqPool; + +static size_t ZSTDMT_sizeof_seqPool(ZSTDMT_seqPool* seqPool) +{ + return ZSTDMT_sizeof_bufferPool(seqPool); +} + +static rawSeqStore_t bufferToSeq(buffer_t buffer) +{ + rawSeqStore_t seq = {NULL, 0, 0, 0}; + seq.seq = (rawSeq*)buffer.start; + seq.capacity = buffer.capacity / sizeof(rawSeq); + return seq; +} + +static buffer_t seqToBuffer(rawSeqStore_t seq) +{ + buffer_t buffer; + buffer.start = seq.seq; + buffer.capacity = seq.capacity * sizeof(rawSeq); + return buffer; +} + +static rawSeqStore_t ZSTDMT_getSeq(ZSTDMT_seqPool* seqPool) +{ + if (seqPool->bufferSize == 0) { + return kNullRawSeqStore; + } + return bufferToSeq(ZSTDMT_getBuffer(seqPool)); +} + +#if ZSTD_RESIZE_SEQPOOL +static rawSeqStore_t ZSTDMT_resizeSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq) +{ + return bufferToSeq(ZSTDMT_resizeBuffer(seqPool, seqToBuffer(seq))); +} +#endif + +static void ZSTDMT_releaseSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq) +{ + ZSTDMT_releaseBuffer(seqPool, seqToBuffer(seq)); +} + +static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq) +{ + ZSTDMT_setBufferSize(seqPool, nbSeq * sizeof(rawSeq)); +} + +static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem) +{ + ZSTDMT_seqPool* seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem); + ZSTDMT_setNbSeq(seqPool, 0); + return seqPool; +} + +static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool) +{ + ZSTDMT_freeBufferPool(seqPool); +} + + + /* ===== CCtx Pool ===== */ /* a single CCtx Pool can be invoked from multiple threads in parallel */ @@ -312,36 +410,97 @@ typedef struct { } range_t; typedef struct { + /* All variables in the struct are protected by mutex. */ ZSTD_pthread_mutex_t mutex; ZSTD_pthread_cond_t cond; ZSTD_CCtx_params params; + ldmState_t ldmState; XXH64_state_t xxhState; unsigned nextJobID; + /* Protects ldmWindow. + * Must be acquired after the main mutex when acquiring both. + */ + ZSTD_pthread_mutex_t ldmWindowMutex; + ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is udpated */ + ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */ } serialState_t; -static void ZSTDMT_serialState_reset(serialState_t* serialState, ZSTD_CCtx_params params) +static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params) { + /* Adjust parameters */ + if (params.ldmParams.enableLdm) { + DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10); + params.ldmParams.windowLog = params.cParams.windowLog; + ZSTD_ldm_adjustParameters(¶ms.ldmParams, ¶ms.cParams); + assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog); + assert(params.ldmParams.hashEveryLog < 32); + serialState->ldmState.hashPower = + ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength); + } else { + memset(¶ms.ldmParams, 0, sizeof(params.ldmParams)); + } serialState->nextJobID = 0; if (params.fParams.checksumFlag) XXH64_reset(&serialState->xxhState, 0); + if (params.ldmParams.enableLdm) { + ZSTD_customMem cMem = params.customMem; + unsigned const hashLog = params.ldmParams.hashLog; + size_t const hashSize = ((size_t)1 << hashLog) * sizeof(ldmEntry_t); + unsigned const bucketLog = + params.ldmParams.hashLog - params.ldmParams.bucketSizeLog; + size_t const bucketSize = (size_t)1 << bucketLog; + unsigned const prevBucketLog = + serialState->params.ldmParams.hashLog - + serialState->params.ldmParams.bucketSizeLog; + /* Size the seq pool tables */ + ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, params.jobSize)); + /* Reset the window */ + ZSTD_window_clear(&serialState->ldmState.window); + serialState->ldmWindow = serialState->ldmState.window; + /* Resize tables and output space if necessary. */ + if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) { + ZSTD_free(serialState->ldmState.hashTable, cMem); + serialState->ldmState.hashTable = (ldmEntry_t*)ZSTD_malloc(hashSize, cMem); + } + if (serialState->ldmState.bucketOffsets == NULL || prevBucketLog < bucketLog) { + ZSTD_free(serialState->ldmState.bucketOffsets, cMem); + serialState->ldmState.bucketOffsets = (BYTE*)ZSTD_malloc(bucketSize, cMem); + } + if (!serialState->ldmState.hashTable || !serialState->ldmState.bucketOffsets) + return 1; + /* Zero the tables */ + memset(serialState->ldmState.hashTable, 0, hashSize); + memset(serialState->ldmState.bucketOffsets, 0, bucketSize); + } serialState->params = params; + return 0; } static int ZSTDMT_serialState_init(serialState_t* serialState) { int initError = 0; + memset(serialState, 0, sizeof(*serialState)); initError |= ZSTD_pthread_mutex_init(&serialState->mutex, NULL); initError |= ZSTD_pthread_cond_init(&serialState->cond, NULL); + initError |= ZSTD_pthread_mutex_init(&serialState->ldmWindowMutex, NULL); + initError |= ZSTD_pthread_cond_init(&serialState->ldmWindowCond, NULL); return initError; } static void ZSTDMT_serialState_free(serialState_t* serialState) { + ZSTD_customMem cMem = serialState->params.customMem; ZSTD_pthread_mutex_destroy(&serialState->mutex); ZSTD_pthread_cond_destroy(&serialState->cond); + ZSTD_pthread_mutex_destroy(&serialState->ldmWindowMutex); + ZSTD_pthread_cond_destroy(&serialState->ldmWindowCond); + ZSTD_free(serialState->ldmState.hashTable, cMem); + ZSTD_free(serialState->ldmState.bucketOffsets, cMem); } -static void ZSTDMT_serialState_update(serialState_t* serialState, range_t src, unsigned jobID) +static void ZSTDMT_serialState_update(serialState_t* serialState, + ZSTD_CCtx* jobCCtx, rawSeqStore_t seqStore, + range_t src, unsigned jobID) { /* Wait for our turn */ ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex); @@ -351,6 +510,24 @@ static void ZSTDMT_serialState_update(serialState_t* serialState, range_t src, u /* A future job may error and skip our job */ if (serialState->nextJobID == jobID) { /* It is now our turn, do any processing necessary */ + if (serialState->params.ldmParams.enableLdm) { + size_t error; + assert(seqStore.seq != NULL && seqStore.pos == 0 && + seqStore.size == 0 && seqStore.capacity > 0); + ZSTD_window_update(&serialState->ldmState.window, src.start, src.size); + error = ZSTD_ldm_generateSequences( + &serialState->ldmState, &seqStore, + &serialState->params.ldmParams, src.start, src.size); + /* We provide a large enough buffer to never fail. */ + assert(!ZSTD_isError(error)); (void)error; + /* Update ldmWindow to match the ldmState.window and signal the main + * thread if it is waiting for a buffer. + */ + ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex); + serialState->ldmWindow = serialState->ldmState.window; + ZSTD_pthread_cond_signal(&serialState->ldmWindowCond); + ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex); + } if (serialState->params.fParams.checksumFlag && src.size > 0) XXH64_update(&serialState->xxhState, src.start, src.size); } @@ -358,6 +535,14 @@ static void ZSTDMT_serialState_update(serialState_t* serialState, range_t src, u serialState->nextJobID++; ZSTD_pthread_cond_broadcast(&serialState->cond); ZSTD_pthread_mutex_unlock(&serialState->mutex); + + if (seqStore.size > 0) { + size_t const err = ZSTD_referenceExternalSequences( + jobCCtx, seqStore.seq, seqStore.size); + assert(serialState->params.ldmParams.enableLdm); + assert(!ZSTD_isError(err)); + (void)err; + } } static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState, @@ -369,6 +554,11 @@ static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState, DEBUGLOG(5, "Skipping past job %u because of error", jobID); serialState->nextJobID = jobID + 1; ZSTD_pthread_cond_broadcast(&serialState->cond); + + ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex); + ZSTD_window_clear(&serialState->ldmWindow); + ZSTD_pthread_cond_signal(&serialState->ldmWindowCond); + ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex); } ZSTD_pthread_mutex_unlock(&serialState->mutex); @@ -388,6 +578,7 @@ typedef struct { ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */ ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */ ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */ + ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */ serialState_t* serial; /* Thread-safe - used by mtctx and (all) workers */ buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */ range_t prefix; /* set by mtctx, then read by worker & mtctx => no barrier */ @@ -408,10 +599,15 @@ void ZSTDMT_compressionJob(void* jobDescription) ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */ ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); + rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool); buffer_t dstBuff = job->dstBuff; - /* Don't compute the checksum for chunks, but write it in the header */ + /* Don't compute the checksum for chunks, since we compute it externally, + * but write it in the header. + */ if (job->jobID != 0) jobParams.fParams.checksumFlag = 0; + /* Don't run LDM for the chunks, since we handle it externally */ + jobParams.ldmParams.enableLdm = 0; /* ressources */ if (cctx==NULL) { @@ -448,8 +644,8 @@ void ZSTDMT_compressionJob(void* jobDescription) goto _endJob; } } } - /* Perform serial step as early as possible */ - ZSTDMT_serialState_update(job->serial, job->src, job->jobID); + /* Perform serial step as early as possible, but after CCtx initialization */ + ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID); if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */ size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0); @@ -504,6 +700,7 @@ _endJob: DEBUGLOG(5, "Finished with prefix: %zx", (size_t)job->prefix.start); DEBUGLOG(5, "Finished with source: %zx", (size_t)job->src.start); /* release resources */ + ZSTDMT_releaseSeq(job->seqPool, rawSeqStore); ZSTDMT_releaseCCtx(job->cctxPool, cctx); /* report */ ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); @@ -544,6 +741,7 @@ struct ZSTDMT_CCtx_s { ZSTDMT_jobDescription* jobs; ZSTDMT_bufferPool* bufPool; ZSTDMT_CCtxPool* cctxPool; + ZSTDMT_seqPool* seqPool; ZSTD_CCtx_params params; size_t targetSectionSize; size_t targetPrefixSize; @@ -635,9 +833,10 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem) mtctx->jobIDMask = nbJobs - 1; mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem); mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem); + mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem); initError = ZSTDMT_serialState_init(&mtctx->serial); mtctx->roundBuff = kNullRoundBuff; - if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | initError) { + if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | !mtctx->seqPool | initError) { ZSTDMT_freeCCtx(mtctx); return NULL; } @@ -692,6 +891,7 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); ZSTDMT_freeBufferPool(mtctx->bufPool); ZSTDMT_freeCCtxPool(mtctx->cctxPool); + ZSTDMT_freeSeqPool(mtctx->seqPool); ZSTDMT_serialState_free(&mtctx->serial); ZSTD_freeCDict(mtctx->cdictLocal); if (mtctx->roundBuff.buffer) @@ -708,6 +908,7 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx) + ZSTDMT_sizeof_bufferPool(mtctx->bufPool) + (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription) + ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool) + + ZSTDMT_sizeof_seqPool(mtctx->seqPool) + ZSTD_sizeof_CDict(mtctx->cdictLocal) + mtctx->roundBuff.capacity; } @@ -761,7 +962,6 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) jobParams.compressionLevel = params.compressionLevel; jobParams.disableLiteralCompression = params.disableLiteralCompression; - jobParams.ldmParams = params.ldmParams; return jobParams; } @@ -827,12 +1027,16 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) static size_t ZSTDMT_computeTargetJobLog(ZSTD_CCtx_params const params) { + if (params.ldmParams.enableLdm) + return MAX(21, params.cParams.chainLog + 4); return params.cParams.windowLog + 2; } static size_t ZSTDMT_computeOverlapLog(ZSTD_CCtx_params const params) { unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog; + if (params.ldmParams.enableLdm) + return (MIN(params.cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2) - overlapRLog); return overlapRLog >= 9 ? 0 : (params.cParams.windowLog - overlapRLog); } @@ -856,7 +1060,7 @@ static size_t ZSTDMT_compress_advanced_internal( void* dst, size_t dstCapacity, const void* src, size_t srcSize, const ZSTD_CDict* cdict, - ZSTD_CCtx_params const params) + ZSTD_CCtx_params params) { ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params); size_t const overlapSize = (size_t)1 << ZSTDMT_computeOverlapLog(params); @@ -870,6 +1074,7 @@ static size_t ZSTDMT_compress_advanced_internal( assert(jobParams.nbWorkers == 0); assert(mtctx->cctxPool->totalCCtx == params.nbWorkers); + params.jobSize = (U32)avgJobSize; DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ", nbJobs, (U32)proposedJobSize, (U32)avgJobSize); @@ -882,7 +1087,8 @@ static size_t ZSTDMT_compress_advanced_internal( assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) ); - ZSTDMT_serialState_reset(&mtctx->serial, params); + if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params)) + return ERROR(memory_allocation); if (nbJobs > mtctx->jobIDMask+1) { /* enlarge job table */ U32 jobsTableSize = nbJobs; @@ -915,6 +1121,7 @@ static size_t ZSTDMT_compress_advanced_internal( mtctx->jobs[u].dstBuff = dstBuffer; mtctx->jobs[u].cctxPool = mtctx->cctxPool; mtctx->jobs[u].bufPool = mtctx->bufPool; + mtctx->jobs[u].seqPool = mtctx->seqPool; mtctx->jobs[u].serial = &mtctx->serial; mtctx->jobs[u].jobID = u; mtctx->jobs[u].firstJob = (u==0); @@ -1023,10 +1230,7 @@ size_t ZSTDMT_initCStream_internal( /* init */ if (params.jobSize == 0) { - if (params.cParams.windowLog >= 29) - params.jobSize = ZSTDMT_JOBSIZE_MAX; - else - params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params); + params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params); } if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX; @@ -1072,11 +1276,20 @@ size_t ZSTDMT_initCStream_internal( DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10)); ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize)); { - /* Two buffers of slack, plus extra space for the overlap */ + /* If ldm is enabled we need windowSize space. */ + size_t const windowSize = mtctx->params.ldmParams.enableLdm ? (1U << mtctx->params.cParams.windowLog) : 0; + /* Two buffers of slack, plus extra space for the overlap + * This is the minimum slack that LDM works with. One extra because + * flush might waste up to targetSectionSize-1 bytes. Another extra + * for the overlap (if > 0), then one to fill which doesn't overlap + * with the LDM window. + */ + size_t const nbSlackBuffers = 2 + (mtctx->targetPrefixSize > 0); + size_t const slackSize = mtctx->targetSectionSize * nbSlackBuffers; + /* Compute the total size, and always have enough slack */ size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1); - size_t const nbSlackBuffers = MIN(nbWorkers, 2) + (mtctx->targetPrefixSize > 0); - size_t const nbSections = nbWorkers + nbSlackBuffers; - size_t const capacity = mtctx->targetSectionSize * nbSections; + size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers; + size_t const capacity = MAX(windowSize, sectionsSize) + slackSize; if (mtctx->roundBuff.capacity < capacity) { if (mtctx->roundBuff.buffer) ZSTD_free(mtctx->roundBuff.buffer, mtctx->cMem); @@ -1099,7 +1312,8 @@ size_t ZSTDMT_initCStream_internal( mtctx->allJobsCompleted = 0; mtctx->consumed = 0; mtctx->produced = 0; - ZSTDMT_serialState_reset(&mtctx->serial, params); + if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params)) + return ERROR(memory_allocation); return 0; } @@ -1201,6 +1415,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS mtctx->jobs[jobID].dstBuff = g_nullBuffer; mtctx->jobs[jobID].cctxPool = mtctx->cctxPool; mtctx->jobs[jobID].bufPool = mtctx->bufPool; + mtctx->jobs[jobID].seqPool = mtctx->seqPool; mtctx->jobs[jobID].serial = &mtctx->serial; mtctx->jobs[jobID].jobID = mtctx->nextJobID; mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0); @@ -1383,10 +1598,51 @@ static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range) if (rangeStart == NULL || bufferStart == NULL) return 0; + /* Empty ranges cannot overlap */ + if (bufferStart == bufferEnd || rangeStart == rangeEnd) + return 0; return bufferStart < rangeEnd && rangeStart < bufferEnd; } +static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window) +{ + range_t extDict; + range_t prefix; + + extDict.start = window.dictBase + window.lowLimit; + extDict.size = window.dictLimit - window.lowLimit; + + prefix.start = window.base + window.dictLimit; + prefix.size = window.nextSrc - (window.base + window.dictLimit); + DEBUGLOG(5, "extDict [0x%zx, 0x%zx)", + (size_t)extDict.start, + (size_t)extDict.start + extDict.size); + DEBUGLOG(5, "prefix [0x%zx, 0x%zx)", + (size_t)prefix.start, + (size_t)prefix.start + prefix.size); + + return ZSTDMT_isOverlapped(buffer, extDict) + || ZSTDMT_isOverlapped(buffer, prefix); +} + +static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer) +{ + if (mtctx->params.ldmParams.enableLdm) { + ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex; + DEBUGLOG(5, "source [0x%zx, 0x%zx)", + (size_t)buffer.start, + (size_t)buffer.start + buffer.capacity); + ZSTD_PTHREAD_MUTEX_LOCK(mutex); + while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) { + DEBUGLOG(6, "Waiting for LDM to finish..."); + ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex); + } + DEBUGLOG(6, "Done waiting for LDM to finish"); + ZSTD_pthread_mutex_unlock(mutex); + } +} + /** * Attempts to set the inBuff to the next section to fill. * If any part of the new section is still in use we give up. @@ -1415,6 +1671,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) DEBUGLOG(6, "Waiting for buffer..."); return 0; } + ZSTDMT_waitForLdmComplete(mtctx, buffer); memmove(start, mtctx->inBuff.prefix.start, prefixSize); mtctx->inBuff.prefix.start = start; mtctx->roundBuff.pos = prefixSize; @@ -1427,6 +1684,9 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) return 0; } assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix)); + + ZSTDMT_waitForLdmComplete(mtctx, buffer); + DEBUGLOG(5, "Using prefix range [%zx, %zx)", (size_t)mtctx->inBuff.prefix.start, (size_t)mtctx->inBuff.prefix.start + mtctx->inBuff.prefix.size);