From 94c77710a90c982ce22d776fa34ab5f3dc5e0446 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Tue, 6 Mar 2018 19:51:21 -0800 Subject: [PATCH 1/6] Integrate ldm with zstdmt Integrate ldm into zstdmt by running it in serial and in order in the first step of each job, in the same place as the hash gets updated. The input buffer is sized to fit the whole LDM window and 2 full buffers of slack. Input buffers cannot be reused until the LDM step is done with them. After the LDM step is finished, the jobs don't actually have access to the full window, only the overlap. Tested on a few different multi-GB files with and without sanitizers, and with different numbers of threads. --- lib/compress/zstdmt_compress.c | 282 +++++++++++++++++++++++++++++++-- 1 file changed, 266 insertions(+), 16 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 99131260..3225f982 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -27,8 +27,14 @@ #include "pool.h" /* threadpool */ #include "threading.h" /* mutex */ #include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */ +#include "zstd_ldm.h" #include "zstdmt_compress.h" +/* Guards code to support resizing the SeqPool. + * We will want to resize the SeqPool to save memory in the future. + * Until then, comment the code out since it is unused. + */ +#define ZSTD_RESIZE_SEQPOOL 0 /* ====== Debug ====== */ #if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2) @@ -194,6 +200,32 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) } } +#if ZSTD_RESIZE_SEQPOOL +/** ZSTDMT_resizeBuffer() : + * assumption : bufPool must be valid + * @return : a buffer that is at least the buffer pool buffer size. + * If a reallocation happens, the data in the input buffer is copied. + */ +static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer) +{ + size_t const bSize = bufPool->bufferSize; + if (buffer.capacity < bSize) { + void* const start = ZSTD_malloc(bSize, bufPool->cMem); + buffer_t newBuffer; + newBuffer.start = start; + newBuffer.capacity = start == NULL ? 
0 : bSize; + if (start != NULL) { + assert(newBuffer.capacity >= buffer.capacity); + memcpy(newBuffer.start, buffer.start, buffer.capacity); + DEBUGLOG(5, "ZSTDMT_resizeBuffer: created buffer of size %u", (U32)bSize); + return newBuffer; + } + DEBUGLOG(5, "ZSTDMT_resizeBuffer: buffer allocation failure !!"); + } + return buffer; +} +#endif + /* store buffer for later re-use, up to pool capacity */ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) { @@ -214,6 +246,72 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) } +/* ===== Seq Pool Wrapper ====== */ + +static rawSeqStore_t kNullRawSeqStore = {NULL, 0, 0, 0}; + +typedef ZSTDMT_bufferPool ZSTDMT_seqPool; + +static size_t ZSTDMT_sizeof_seqPool(ZSTDMT_seqPool* seqPool) +{ + return ZSTDMT_sizeof_bufferPool(seqPool); +} + +static rawSeqStore_t bufferToSeq(buffer_t buffer) +{ + rawSeqStore_t seq = {NULL, 0, 0, 0}; + seq.seq = (rawSeq*)buffer.start; + seq.capacity = buffer.capacity / sizeof(rawSeq); + return seq; +} + +static buffer_t seqToBuffer(rawSeqStore_t seq) +{ + buffer_t buffer; + buffer.start = seq.seq; + buffer.capacity = seq.capacity * sizeof(rawSeq); + return buffer; +} + +static rawSeqStore_t ZSTDMT_getSeq(ZSTDMT_seqPool* seqPool) +{ + if (seqPool->bufferSize == 0) { + return kNullRawSeqStore; + } + return bufferToSeq(ZSTDMT_getBuffer(seqPool)); +} + +#if ZSTD_RESIZE_SEQPOOL +static rawSeqStore_t ZSTDMT_resizeSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq) +{ + return bufferToSeq(ZSTDMT_resizeBuffer(seqPool, seqToBuffer(seq))); +} +#endif + +static void ZSTDMT_releaseSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq) +{ + ZSTDMT_releaseBuffer(seqPool, seqToBuffer(seq)); +} + +static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq) +{ + ZSTDMT_setBufferSize(seqPool, nbSeq * sizeof(rawSeq)); +} + +static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem) +{ + ZSTDMT_seqPool* seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem); + ZSTDMT_setNbSeq(seqPool, 0); + return seqPool; +} + +static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool) +{ + ZSTDMT_freeBufferPool(seqPool); +} + + + /* ===== CCtx Pool ===== */ /* a single CCtx Pool can be invoked from multiple threads in parallel */ @@ -312,36 +410,95 @@ typedef struct { } range_t; typedef struct { + /* All variables in the struct are protected by mutex. */ ZSTD_pthread_mutex_t mutex; ZSTD_pthread_cond_t cond; ZSTD_CCtx_params params; + ldmState_t ldmState; XXH64_state_t xxhState; unsigned nextJobID; + /* Protects ldmWindow. + * Must be acquired after the main mutex when acquiring both. 
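+ * (Keeping to this one fixed acquisition order everywhere is what rules
+ * out deadlock when a thread needs both locks at once.)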
+ */
+ ZSTD_pthread_mutex_t ldmWindowMutex;
+ ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is updated */
+ ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */
} serialState_t;
-static void ZSTDMT_serialState_reset(serialState_t* serialState, ZSTD_CCtx_params params)
+static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params)
{
+ /* Adjust parameters */
+ if (params.ldmParams.enableLdm) {
+ DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10);
+ params.ldmParams.windowLog = params.cParams.windowLog;
+ ZSTD_ldm_adjustParameters(&params.ldmParams, &params.cParams);
+ assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog);
+ assert(params.ldmParams.hashEveryLog < 32);
+ serialState->ldmState.hashPower =
+ ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength);
+ }
 serialState->nextJobID = 0;
 if (params.fParams.checksumFlag)
 XXH64_reset(&serialState->xxhState, 0);
+ if (params.ldmParams.enableLdm) {
+ ZSTD_customMem cMem = params.customMem;
+ unsigned const hashLog = params.ldmParams.hashLog;
+ size_t const hashSize = ((size_t)1 << hashLog) * sizeof(ldmEntry_t);
+ unsigned const bucketLog =
+ params.ldmParams.hashLog - params.ldmParams.bucketSizeLog;
+ size_t const bucketSize = (size_t)1 << bucketLog;
+ unsigned const prevBucketLog =
+ serialState->params.ldmParams.hashLog -
+ serialState->params.ldmParams.bucketSizeLog;
+ /* Size the seq pool tables */
+ ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, params.jobSize));
+ /* Reset the window */
+ ZSTD_window_clear(&serialState->ldmState.window);
+ serialState->ldmWindow = serialState->ldmState.window;
+ /* Resize tables and output space if necessary. */
+ if (serialState->params.ldmParams.hashLog < hashLog) {
+ ZSTD_free(serialState->ldmState.hashTable, cMem);
+ serialState->ldmState.hashTable = (ldmEntry_t*)ZSTD_malloc(hashSize, cMem);
+ }
+ if (prevBucketLog < bucketLog) {
+ ZSTD_free(serialState->ldmState.bucketOffsets, cMem);
+ serialState->ldmState.bucketOffsets = (BYTE*)ZSTD_malloc(bucketSize, cMem);
+ }
+ if (!serialState->ldmState.hashTable || !serialState->ldmState.bucketOffsets)
+ return 1;
+ /* Zero the tables */
+ memset(serialState->ldmState.hashTable, 0, hashSize);
+ memset(serialState->ldmState.bucketOffsets, 0, bucketSize);
+ }
 serialState->params = params;
+ return 0;
}
static int ZSTDMT_serialState_init(serialState_t* serialState)
{
 int initError = 0;
+ memset(serialState, 0, sizeof(*serialState));
 initError |= ZSTD_pthread_mutex_init(&serialState->mutex, NULL);
 initError |= ZSTD_pthread_cond_init(&serialState->cond, NULL);
+ initError |= ZSTD_pthread_mutex_init(&serialState->ldmWindowMutex, NULL);
+ initError |= ZSTD_pthread_cond_init(&serialState->ldmWindowCond, NULL);
 return initError;
}
static void ZSTDMT_serialState_free(serialState_t* serialState)
{
+ ZSTD_customMem cMem = serialState->params.customMem;
 ZSTD_pthread_mutex_destroy(&serialState->mutex);
 ZSTD_pthread_cond_destroy(&serialState->cond);
+ ZSTD_pthread_mutex_destroy(&serialState->ldmWindowMutex);
+ ZSTD_pthread_cond_destroy(&serialState->ldmWindowCond);
+ ZSTD_free(serialState->ldmState.hashTable, cMem);
+ ZSTD_free(serialState->ldmState.bucketOffsets, cMem);
}
-static void ZSTDMT_serialState_update(serialState_t* serialState, range_t src, unsigned jobID)
+static void ZSTDMT_serialState_update(serialState_t* serialState,
+ ZSTD_CCtx* jobCCtx, rawSeqStore_t seqStore,
+ range_t src, unsigned jobID)
{
 /* Wait for our turn */
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex); @@ -351,6 +508,24 @@ static void ZSTDMT_serialState_update(serialState_t* serialState, range_t src, u /* A future job may error and skip our job */ if (serialState->nextJobID == jobID) { /* It is now our turn, do any processing necessary */ + if (serialState->params.ldmParams.enableLdm) { + size_t error; + assert(seqStore.seq != NULL && seqStore.pos == 0 && + seqStore.size == 0 && seqStore.capacity > 0); + ZSTD_window_update(&serialState->ldmState.window, src.start, src.size); + error = ZSTD_ldm_generateSequences( + &serialState->ldmState, &seqStore, + &serialState->params.ldmParams, src.start, src.size); + /* We provide a large enough buffer to never fail. */ + assert(!ZSTD_isError(error)); (void)error; + /* Update ldmWindow to match the ldmState.window and signal the main + * thread if it is waiting for a buffer. + */ + ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex); + serialState->ldmWindow = serialState->ldmState.window; + ZSTD_pthread_cond_signal(&serialState->ldmWindowCond); + ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex); + } if (serialState->params.fParams.checksumFlag && src.size > 0) XXH64_update(&serialState->xxhState, src.start, src.size); } @@ -358,6 +533,14 @@ static void ZSTDMT_serialState_update(serialState_t* serialState, range_t src, u serialState->nextJobID++; ZSTD_pthread_cond_broadcast(&serialState->cond); ZSTD_pthread_mutex_unlock(&serialState->mutex); + + if (seqStore.size > 0) { + size_t const err = ZSTD_referenceExternalSequences( + jobCCtx, seqStore.seq, seqStore.size); + assert(serialState->params.ldmParams.enableLdm); + assert(!ZSTD_isError(err)); + (void)err; + } } static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState, @@ -369,6 +552,11 @@ static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState, DEBUGLOG(5, "Skipping past job %u because of error", jobID); serialState->nextJobID = jobID + 1; ZSTD_pthread_cond_broadcast(&serialState->cond); + + ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex); + ZSTD_window_clear(&serialState->ldmWindow); + ZSTD_pthread_cond_signal(&serialState->ldmWindowCond); + ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex); } ZSTD_pthread_mutex_unlock(&serialState->mutex); @@ -388,6 +576,7 @@ typedef struct { ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */ ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */ ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */ + ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */ serialState_t* serial; /* Thread-safe - used by mtctx and (all) workers */ buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */ range_t prefix; /* set by mtctx, then read by worker & mtctx => no barrier */ @@ -408,10 +597,15 @@ void ZSTDMT_compressionJob(void* jobDescription) ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */ ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); + rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool); buffer_t dstBuff = job->dstBuff; - /* Don't compute the checksum for chunks, but write it in the header */ + /* Don't compute the checksum for chunks, since we compute it externally, + * but write it in the header. 
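+ * (The serial section already feeds every job's input through XXH64 in
+ * job order, so that is where the frame checksum comes from.)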
+ */ if (job->jobID != 0) jobParams.fParams.checksumFlag = 0; + /* Don't run LDM for the chunks, since we handle it externally */ + jobParams.ldmParams.enableLdm = 0; /* ressources */ if (cctx==NULL) { @@ -448,8 +642,8 @@ void ZSTDMT_compressionJob(void* jobDescription) goto _endJob; } } } - /* Perform serial step as early as possible */ - ZSTDMT_serialState_update(job->serial, job->src, job->jobID); + /* Perform serial step as early as possible, but after CCtx initialization */ + ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID); if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */ size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0); @@ -504,6 +698,7 @@ _endJob: DEBUGLOG(5, "Finished with prefix: %zx", (size_t)job->prefix.start); DEBUGLOG(5, "Finished with source: %zx", (size_t)job->src.start); /* release resources */ + ZSTDMT_releaseSeq(job->seqPool, rawSeqStore); ZSTDMT_releaseCCtx(job->cctxPool, cctx); /* report */ ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); @@ -544,6 +739,7 @@ struct ZSTDMT_CCtx_s { ZSTDMT_jobDescription* jobs; ZSTDMT_bufferPool* bufPool; ZSTDMT_CCtxPool* cctxPool; + ZSTDMT_seqPool* seqPool; ZSTD_CCtx_params params; size_t targetSectionSize; size_t targetPrefixSize; @@ -635,9 +831,10 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem) mtctx->jobIDMask = nbJobs - 1; mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem); mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem); + mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem); initError = ZSTDMT_serialState_init(&mtctx->serial); mtctx->roundBuff = kNullRoundBuff; - if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | initError) { + if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | !mtctx->seqPool | initError) { ZSTDMT_freeCCtx(mtctx); return NULL; } @@ -692,6 +889,7 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); ZSTDMT_freeBufferPool(mtctx->bufPool); ZSTDMT_freeCCtxPool(mtctx->cctxPool); + ZSTDMT_freeSeqPool(mtctx->seqPool); ZSTDMT_serialState_free(&mtctx->serial); ZSTD_freeCDict(mtctx->cdictLocal); if (mtctx->roundBuff.buffer) @@ -708,6 +906,7 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx) + ZSTDMT_sizeof_bufferPool(mtctx->bufPool) + (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription) + ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool) + + ZSTDMT_sizeof_seqPool(mtctx->seqPool) + ZSTD_sizeof_CDict(mtctx->cdictLocal) + mtctx->roundBuff.capacity; } @@ -761,7 +960,6 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) jobParams.compressionLevel = params.compressionLevel; jobParams.disableLiteralCompression = params.disableLiteralCompression; - jobParams.ldmParams = params.ldmParams; return jobParams; } @@ -827,12 +1025,16 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) static size_t ZSTDMT_computeTargetJobLog(ZSTD_CCtx_params const params) { + if (params.ldmParams.enableLdm) + return MAX(21, params.cParams.chainLog + 4); return params.cParams.windowLog + 2; } static size_t ZSTDMT_computeOverlapLog(ZSTD_CCtx_params const params) { unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog; + if (params.ldmParams.enableLdm) + return (MIN(params.cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2) - overlapRLog); return overlapRLog >= 9 ? 
0 : (params.cParams.windowLog - overlapRLog); } @@ -856,7 +1058,7 @@ static size_t ZSTDMT_compress_advanced_internal( void* dst, size_t dstCapacity, const void* src, size_t srcSize, const ZSTD_CDict* cdict, - ZSTD_CCtx_params const params) + ZSTD_CCtx_params params) { ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params); size_t const overlapSize = (size_t)1 << ZSTDMT_computeOverlapLog(params); @@ -870,6 +1072,7 @@ static size_t ZSTDMT_compress_advanced_internal( assert(jobParams.nbWorkers == 0); assert(mtctx->cctxPool->totalCCtx == params.nbWorkers); + params.jobSize = (U32)avgJobSize; DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ", nbJobs, (U32)proposedJobSize, (U32)avgJobSize); @@ -882,7 +1085,8 @@ static size_t ZSTDMT_compress_advanced_internal( assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) ); - ZSTDMT_serialState_reset(&mtctx->serial, params); + if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params)) + return ERROR(memory_allocation); if (nbJobs > mtctx->jobIDMask+1) { /* enlarge job table */ U32 jobsTableSize = nbJobs; @@ -915,6 +1119,7 @@ static size_t ZSTDMT_compress_advanced_internal( mtctx->jobs[u].dstBuff = dstBuffer; mtctx->jobs[u].cctxPool = mtctx->cctxPool; mtctx->jobs[u].bufPool = mtctx->bufPool; + mtctx->jobs[u].seqPool = mtctx->seqPool; mtctx->jobs[u].serial = &mtctx->serial; mtctx->jobs[u].jobID = u; mtctx->jobs[u].firstJob = (u==0); @@ -1023,10 +1228,7 @@ size_t ZSTDMT_initCStream_internal( /* init */ if (params.jobSize == 0) { - if (params.cParams.windowLog >= 29) - params.jobSize = ZSTDMT_JOBSIZE_MAX; - else - params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params); + params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params); } if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX; @@ -1072,11 +1274,15 @@ size_t ZSTDMT_initCStream_internal( DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10)); ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize)); { + /* If ldm is enabled we need windowSize space. */ + size_t const windowSize = mtctx->params.ldmParams.enableLdm ? 
(1U << mtctx->params.cParams.windowLog) : 0; /* Two buffers of slack, plus extra space for the overlap */ size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1); size_t const nbSlackBuffers = MIN(nbWorkers, 2) + (mtctx->targetPrefixSize > 0); - size_t const nbSections = nbWorkers + nbSlackBuffers; - size_t const capacity = mtctx->targetSectionSize * nbSections; + size_t const slackSize = mtctx->targetSectionSize * nbSlackBuffers; + /* Compute the total size, and always have enough slack */ + size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers; + size_t const capacity = MAX(windowSize, sectionsSize) + slackSize; if (mtctx->roundBuff.capacity < capacity) { if (mtctx->roundBuff.buffer) ZSTD_free(mtctx->roundBuff.buffer, mtctx->cMem); @@ -1099,7 +1305,8 @@ size_t ZSTDMT_initCStream_internal( mtctx->allJobsCompleted = 0; mtctx->consumed = 0; mtctx->produced = 0; - ZSTDMT_serialState_reset(&mtctx->serial, params); + if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params)) + return ERROR(memory_allocation); return 0; } @@ -1201,6 +1408,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS mtctx->jobs[jobID].dstBuff = g_nullBuffer; mtctx->jobs[jobID].cctxPool = mtctx->cctxPool; mtctx->jobs[jobID].bufPool = mtctx->bufPool; + mtctx->jobs[jobID].seqPool = mtctx->seqPool; mtctx->jobs[jobID].serial = &mtctx->serial; mtctx->jobs[jobID].jobID = mtctx->nextJobID; mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0); @@ -1387,6 +1595,44 @@ static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range) return bufferStart < rangeEnd && rangeStart < bufferEnd; } +static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window) +{ + range_t extDict; + range_t prefix; + + extDict.start = window.dictBase + window.lowLimit; + extDict.size = window.dictLimit - window.lowLimit; + + prefix.start = window.base + window.dictLimit; + prefix.size = window.nextSrc - (window.base + window.dictLimit); + DEBUGLOG(5, "extDict [0x%zx, 0x%zx)", + (size_t)extDict.start, + (size_t)extDict.start + extDict.size); + DEBUGLOG(5, "prefix [0x%zx, 0x%zx)", + (size_t)prefix.start, + (size_t)prefix.start + prefix.size); + + return ZSTDMT_isOverlapped(buffer, extDict) + || ZSTDMT_isOverlapped(buffer, prefix); +} + +static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer) +{ + if (mtctx->params.ldmParams.enableLdm) { + ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex; + DEBUGLOG(5, "source [0x%zx, 0x%zx)", + (size_t)buffer.start, + (size_t)buffer.start + buffer.capacity); + ZSTD_PTHREAD_MUTEX_LOCK(mutex); + while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) { + DEBUGLOG(6, "Waiting for LDM to finish..."); + ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex); + } + DEBUGLOG(6, "Done waiting for LDM to finish"); + ZSTD_pthread_mutex_unlock(mutex); + } +} + /** * Attempts to set the inBuff to the next section to fill. * If any part of the new section is still in use we give up. 
@@ -1415,6 +1661,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) DEBUGLOG(6, "Waiting for buffer..."); return 0; } + ZSTDMT_waitForLdmComplete(mtctx, buffer); memmove(start, mtctx->inBuff.prefix.start, prefixSize); mtctx->inBuff.prefix.start = start; mtctx->roundBuff.pos = prefixSize; @@ -1427,6 +1674,9 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) return 0; } assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix)); + + ZSTDMT_waitForLdmComplete(mtctx, buffer); + DEBUGLOG(5, "Using prefix range [%zx, %zx)", (size_t)mtctx->inBuff.prefix.start, (size_t)mtctx->inBuff.prefix.start + mtctx->inBuff.prefix.size); From 4b92574feb7be219aa8c89e3b7ae770a145ecbd8 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Mon, 19 Mar 2018 17:54:04 -0700 Subject: [PATCH 2/6] Fix corner cases exposed by zstreamtest --- lib/compress/zstdmt_compress.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 3225f982..9d54da0d 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -456,11 +456,11 @@ static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* ZSTD_window_clear(&serialState->ldmState.window); serialState->ldmWindow = serialState->ldmState.window; /* Resize tables and output space if necessary. */ - if (serialState->params.ldmParams.hashLog < hashLog) { + if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) { ZSTD_free(serialState->ldmState.hashTable, cMem); serialState->ldmState.hashTable = (ldmEntry_t*)ZSTD_malloc(hashSize, cMem); } - if (prevBucketLog < bucketLog) { + if (serialState->ldmState.bucketOffsets == NULL || prevBucketLog < bucketLog) { ZSTD_free(serialState->ldmState.bucketOffsets, cMem); serialState->ldmState.bucketOffsets = (BYTE*)ZSTD_malloc(bucketSize, cMem); } @@ -1591,6 +1591,9 @@ static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range) if (rangeStart == NULL || bufferStart == NULL) return 0; + /* Empty ranges cannot overlap */ + if (bufferStart == bufferEnd || rangeStart == rangeEnd) + return 0; return bufferStart < rangeEnd && rangeStart < bufferEnd; } From 24d9edbdd866be330161359db3badba5a73c3510 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Mon, 19 Mar 2018 18:23:54 -0700 Subject: [PATCH 3/6] Set ldmParams to 0 when disabled --- lib/compress/zstdmt_compress.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 9d54da0d..b0d97c70 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -436,6 +436,8 @@ static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* assert(params.ldmParams.hashEveryLog < 32); serialState->ldmState.hashPower = ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength); + } else { + memset(¶ms.ldmParams, 0, sizeof(params.ldmParams)); } serialState->nextJobID = 0; if (params.fParams.checksumFlag) From d19f803a3b18d46acf565e99214d33c9fbbb13a0 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Mon, 19 Mar 2018 18:56:39 -0700 Subject: [PATCH 4/6] Fix window size for 1 worker + flushing --- lib/compress/zstdmt_compress.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index b0d97c70..975e4149 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1278,11 +1278,16 @@ size_t ZSTDMT_initCStream_internal( { /* If ldm is 
enabled we need windowSize space. */ size_t const windowSize = mtctx->params.ldmParams.enableLdm ? (1U << mtctx->params.cParams.windowLog) : 0; - /* Two buffers of slack, plus extra space for the overlap */ - size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1); - size_t const nbSlackBuffers = MIN(nbWorkers, 2) + (mtctx->targetPrefixSize > 0); + /* Two buffers of slack, plus extra space for the overlap + * This is the minimum slack that LDM works with. One extra because + * flush might waste up to targetSectionSize-1 bytes. Another extra + * for the overlap (if > 0), then one to fill which doesn't overlap + * with the LDM window. + */ + size_t const nbSlackBuffers = 2 + (mtctx->targetPrefixSize > 0); size_t const slackSize = mtctx->targetSectionSize * nbSlackBuffers; /* Compute the total size, and always have enough slack */ + size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1); size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers; size_t const capacity = MAX(windowSize, sectionsSize) + slackSize; if (mtctx->roundBuff.capacity < capacity) { From 136b9e2392995a2e975237f832703362e3c6d523 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Tue, 20 Mar 2018 14:31:43 -0700 Subject: [PATCH 5/6] Fix external sequence corner cases * Clear external sequences when we reset the `ZSTD_CCtx`. * Skip external sequences when a block is too small to compress. --- lib/compress/zstd_compress.c | 6 ++- lib/compress/zstd_ldm.c | 73 ++++++++++++++++++------------------ lib/compress/zstd_ldm.h | 10 +++++ 3 files changed, 52 insertions(+), 37 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index db8d7a8c..642296fb 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -922,6 +922,7 @@ static size_t ZSTD_continueCCtx(ZSTD_CCtx* cctx, ZSTD_CCtx_params params, U64 pl cctx->dictID = 0; if (params.ldmParams.enableLdm) ZSTD_window_clear(&cctx->ldmState.window); + ZSTD_referenceExternalSequences(cctx, NULL, 0); ZSTD_invalidateMatchState(&cctx->blockState.matchState); ZSTD_reset_compressedBlockState(cctx->blockState.prevCBlock); XXH64_reset(&cctx->xxhState, 0); @@ -1108,6 +1109,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc, ptr = zc->ldmState.bucketOffsets + ldmBucketSize; ZSTD_window_clear(&zc->ldmState.window); } + ZSTD_referenceExternalSequences(zc, NULL, 0); /* buffers */ zc->inBuffSize = buffInSize; @@ -1818,8 +1820,10 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc, ZSTD_matchState_t* const ms = &zc->blockState.matchState; DEBUGLOG(5, "ZSTD_compressBlock_internal (dstCapacity=%u, dictLimit=%u, nextToUpdate=%u)", (U32)dstCapacity, ms->window.dictLimit, ms->nextToUpdate); - if (srcSize < MIN_CBLOCK_SIZE+ZSTD_blockHeaderSize+1) + if (srcSize < MIN_CBLOCK_SIZE+ZSTD_blockHeaderSize+1) { + ZSTD_ldm_skipSequences(&zc->externSeqStore, srcSize, zc->appliedParams.cParams.searchLength); return 0; /* don't even attempt compression below a certain srcSize */ + } ZSTD_resetSeqStore(&(zc->seqStore)); /* limited update after a very long match */ diff --git a/lib/compress/zstd_ldm.c b/lib/compress/zstd_ldm.c index 5c9c0d2b..aff9dd2b 100644 --- a/lib/compress/zstd_ldm.c +++ b/lib/compress/zstd_ldm.c @@ -536,6 +536,34 @@ size_t ZSTD_ldm_generateSequences( return 0; } +void ZSTD_ldm_skipSequences(rawSeqStore_t* rawSeqStore, size_t srcSize, U32 const minMatch) { + while (srcSize > 0 && rawSeqStore->pos < rawSeqStore->size) { + rawSeq* seq = rawSeqStore->seq + rawSeqStore->pos; + if (srcSize <= seq->litLength) { + /* Skip past 
srcSize literals */
+ seq->litLength -= srcSize;
+ return;
+ }
+ srcSize -= seq->litLength;
+ seq->litLength = 0;
+ if (srcSize < seq->matchLength) {
+ /* Skip past the first srcSize of the match */
+ seq->matchLength -= srcSize;
+ if (seq->matchLength < minMatch) {
+ /* The match is too short, omit it */
+ if (rawSeqStore->pos + 1 < rawSeqStore->size) {
+ seq[1].litLength += seq[0].matchLength;
+ }
+ rawSeqStore->pos++;
+ }
+ return;
+ }
+ srcSize -= seq->matchLength;
+ seq->matchLength = 0;
+ rawSeqStore->pos++;
+ }
+}
+
 /**
 * If the sequence length is longer than remaining then the sequence is split
 * between this block and the next.
@@ -546,51 +574,24 @@ size_t ZSTD_ldm_generateSequences(
 static rawSeq maybeSplitSequence(rawSeqStore_t* rawSeqStore,
 U32 const remaining, U32 const minMatch)
 {
- size_t const pos = rawSeqStore->pos;
 rawSeq sequence = rawSeqStore->seq[rawSeqStore->pos];
 assert(sequence.offset > 0);
- /* Handle partial sequences */
+ /* Likely: No partial sequence */
+ if (remaining >= sequence.litLength + sequence.matchLength) {
+ rawSeqStore->pos++;
+ return sequence;
+ }
+ /* Cut the sequence short (offset == 0 ==> rest is literals). */
 if (remaining <= sequence.litLength) {
- /* Split the literals that we have out of the sequence.
- * They will become the last literals of this block.
- * The next block starts off with the remaining literals.
- */
- rawSeqStore->seq[pos].litLength -= remaining;
 sequence.offset = 0;
 } else if (remaining < sequence.litLength + sequence.matchLength) {
- /* Split the match up into two sequences. One in this block, and one
- * in the next with no literals. If either match would be shorter
- * than searchLength we omit it.
- */
- U32 const matchPrefix = remaining - sequence.litLength;
- U32 const matchSuffix = sequence.matchLength - matchPrefix;
-
- assert(remaining > sequence.litLength);
- assert(matchPrefix < sequence.matchLength);
- assert(matchPrefix + matchSuffix == sequence.matchLength);
- /* Update the first sequence */
- sequence.matchLength = matchPrefix;
- /* Update the second sequence */
- if (matchSuffix >= minMatch) {
- /* Update the second sequence, since the suffix is long enough */
- rawSeqStore->seq[pos].litLength = 0;
- rawSeqStore->seq[pos].matchLength = matchSuffix;
- } else {
- /* Omit the second sequence since the match suffix is too short.
- * Add to the next sequences literals (if any).
- */
- if (pos + 1 < rawSeqStore->size)
- rawSeqStore->seq[pos + 1].litLength += matchSuffix;
- rawSeqStore->pos++; /* Consume the sequence */
- }
+ sequence.matchLength = remaining - sequence.litLength;
 if (sequence.matchLength < minMatch) {
- /* Skip the current sequence if it is too short */
 sequence.offset = 0;
 }
- } else {
- /* No partial sequence */
- rawSeqStore->pos++; /* Consume the sequence */
 }
+ /* Skip past `remaining` bytes for the future sequences. */
+ ZSTD_ldm_skipSequences(rawSeqStore, remaining, minMatch);
 return sequence;
 }
diff --git a/lib/compress/zstd_ldm.h b/lib/compress/zstd_ldm.h
index 9d2f7c39..84d3723c 100644
--- a/lib/compress/zstd_ldm.h
+++ b/lib/compress/zstd_ldm.h
@@ -65,6 +65,16 @@ size_t ZSTD_ldm_blockCompress(rawSeqStore_t* rawSeqStore,
 ZSTD_compressionParameters const* cParams,
 void const* src, size_t srcSize, int const extDict);
+/**
+ * ZSTD_ldm_skipSequences():
+ *
+ * Skip past `srcSize` bytes worth of sequences in `rawSeqStore`.
+ * Avoids emitting matches shorter than `minMatch` bytes.
+ * Must be called for data that is not passed to ZSTD_ldm_blockCompress().
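+ * (For example, a block too small to attempt compression is emitted raw,
+ * but its bytes must still be skipped here so that the sequence store
+ * stays in sync with the input.)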
+ */ +void ZSTD_ldm_skipSequences(rawSeqStore_t* rawSeqStore, size_t srcSize, + U32 const minMatch); + /** ZSTD_ldm_initializeParameters() : * Initialize the long distance matching parameters to their default values. */ From a3b76a77ef9b9313c063bf6731f4259faef9978b Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Tue, 20 Mar 2018 15:34:40 -0700 Subject: [PATCH 6/6] Quiet appveyor warnings --- lib/compress/zstd_ldm.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/compress/zstd_ldm.c b/lib/compress/zstd_ldm.c index aff9dd2b..1684ac11 100644 --- a/lib/compress/zstd_ldm.c +++ b/lib/compress/zstd_ldm.c @@ -541,14 +541,14 @@ void ZSTD_ldm_skipSequences(rawSeqStore_t* rawSeqStore, size_t srcSize, U32 cons rawSeq* seq = rawSeqStore->seq + rawSeqStore->pos; if (srcSize <= seq->litLength) { /* Skip past srcSize literals */ - seq->litLength -= srcSize; + seq->litLength -= (U32)srcSize; return; } srcSize -= seq->litLength; seq->litLength = 0; if (srcSize < seq->matchLength) { /* Skip past the first srcSize of the match */ - seq->matchLength -= srcSize; + seq->matchLength -= (U32)srcSize; if (seq->matchLength < minMatch) { /* The match is too short, omit it */ if (rawSeqStore->pos + 1 < rawSeqStore->size) {
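
Note on the serial-step ordering that every patch in this series builds on: it can be read in isolation below. This is a minimal standalone sketch assuming POSIX threads; the names (toySerialState, toySerialUpdate) are hypothetical and the serial work is reduced to a printf, while the real implementation is ZSTDMT_serialState_update in patch 1. The shape is the same: wait until nextJobID reaches our jobID, do the in-order work (LDM sequence generation and XXH64 updates in zstdmt), hand the turn on, and wake the other jobs.

/* Toy model of the serial-step turn-taking (illustrative only). */
#include <pthread.h>
#include <stdio.h>

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    unsigned nextJobID;
} toySerialState;

static void toySerialUpdate(toySerialState* s, unsigned jobID)
{
    pthread_mutex_lock(&s->mutex);
    while (s->nextJobID < jobID)         /* wait for our turn */
        pthread_cond_wait(&s->cond, &s->mutex);
    if (s->nextJobID == jobID) {         /* a failed job may have skipped us */
        /* The serial, in-order work goes here: in zstdmt this is the LDM
         * window/sequence update and the XXH64 checksum update. */
        printf("job %u runs its serial step\n", jobID);
    }
    s->nextJobID++;                      /* hand the turn to the next job */
    pthread_cond_broadcast(&s->cond);    /* wake all jobs waiting above */
    pthread_mutex_unlock(&s->mutex);
}

static toySerialState g_state = {
    PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0
};

static void* worker(void* arg)
{
    toySerialUpdate(&g_state, (unsigned)(size_t)arg);
    return NULL;
}

int main(void)
{
    enum { NB_JOBS = 4 };
    pthread_t threads[NB_JOBS];
    int i;
    /* Launch jobs in reverse order; the serial steps still run 0,1,2,3. */
    for (i = NB_JOBS - 1; i >= 0; i--)
        pthread_create(&threads[i], NULL, worker, (void*)(size_t)i);
    for (i = 0; i < NB_JOBS; i++)
        pthread_join(threads[i], NULL);
    return 0;
}

Launching the workers out of order also shows why the condition variable is broadcast rather than signaled, here as in the patched code: every blocked job wakes and re-checks its own predicate, and only the job whose turn it is proceeds.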