Merge pull request #1059 from terrelln/mt-ldm

Integrate ldm with zstdmt
This commit is contained in:
Yann Collet 2018-03-20 17:50:20 -07:00 committed by GitHub
commit d1bf609abf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 330 additions and 55 deletions

View File

@ -922,6 +922,7 @@ static size_t ZSTD_continueCCtx(ZSTD_CCtx* cctx, ZSTD_CCtx_params params, U64 pl
cctx->dictID = 0; cctx->dictID = 0;
if (params.ldmParams.enableLdm) if (params.ldmParams.enableLdm)
ZSTD_window_clear(&cctx->ldmState.window); ZSTD_window_clear(&cctx->ldmState.window);
ZSTD_referenceExternalSequences(cctx, NULL, 0);
ZSTD_invalidateMatchState(&cctx->blockState.matchState); ZSTD_invalidateMatchState(&cctx->blockState.matchState);
ZSTD_reset_compressedBlockState(cctx->blockState.prevCBlock); ZSTD_reset_compressedBlockState(cctx->blockState.prevCBlock);
XXH64_reset(&cctx->xxhState, 0); XXH64_reset(&cctx->xxhState, 0);
@ -1108,6 +1109,7 @@ static size_t ZSTD_resetCCtx_internal(ZSTD_CCtx* zc,
ptr = zc->ldmState.bucketOffsets + ldmBucketSize; ptr = zc->ldmState.bucketOffsets + ldmBucketSize;
ZSTD_window_clear(&zc->ldmState.window); ZSTD_window_clear(&zc->ldmState.window);
} }
ZSTD_referenceExternalSequences(zc, NULL, 0);
/* buffers */ /* buffers */
zc->inBuffSize = buffInSize; zc->inBuffSize = buffInSize;
@ -1925,8 +1927,10 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
ZSTD_matchState_t* const ms = &zc->blockState.matchState; ZSTD_matchState_t* const ms = &zc->blockState.matchState;
DEBUGLOG(5, "ZSTD_compressBlock_internal (dstCapacity=%u, dictLimit=%u, nextToUpdate=%u)", DEBUGLOG(5, "ZSTD_compressBlock_internal (dstCapacity=%u, dictLimit=%u, nextToUpdate=%u)",
(U32)dstCapacity, ms->window.dictLimit, ms->nextToUpdate); (U32)dstCapacity, ms->window.dictLimit, ms->nextToUpdate);
if (srcSize < MIN_CBLOCK_SIZE+ZSTD_blockHeaderSize+1) if (srcSize < MIN_CBLOCK_SIZE+ZSTD_blockHeaderSize+1) {
ZSTD_ldm_skipSequences(&zc->externSeqStore, srcSize, zc->appliedParams.cParams.searchLength);
return 0; /* don't even attempt compression below a certain srcSize */ return 0; /* don't even attempt compression below a certain srcSize */
}
ZSTD_resetSeqStore(&(zc->seqStore)); ZSTD_resetSeqStore(&(zc->seqStore));
/* limited update after a very long match */ /* limited update after a very long match */

View File

@ -536,6 +536,34 @@ size_t ZSTD_ldm_generateSequences(
return 0; return 0;
} }
/* Advance `rawSeqStore` past `srcSize` bytes of already-generated sequences.
 * Literals are consumed before the match of each sequence. If skipping lands
 * inside a match and the remaining match drops below `minMatch`, that match
 * is omitted and its leftover length is folded into the next sequence's
 * literals (so no byte is lost). */
void ZSTD_ldm_skipSequences(rawSeqStore_t* rawSeqStore, size_t srcSize, U32 const minMatch) {
    size_t bytesLeft = srcSize;
    while (bytesLeft > 0 && rawSeqStore->pos < rawSeqStore->size) {
        rawSeq* const currSeq = &rawSeqStore->seq[rawSeqStore->pos];
        /* Consume literals first. */
        if (bytesLeft <= currSeq->litLength) {
            /* Landed inside (or exactly at the end of) the literals. */
            currSeq->litLength -= (U32)bytesLeft;
            return;
        }
        bytesLeft -= currSeq->litLength;
        currSeq->litLength = 0;
        /* Then consume the match. */
        if (bytesLeft < currSeq->matchLength) {
            /* Landed inside the match: shorten it. */
            currSeq->matchLength -= (U32)bytesLeft;
            if (currSeq->matchLength < minMatch) {
                /* Remaining match too short to emit: drop it and hand its
                 * bytes to the following sequence's literals (if any). */
                if (rawSeqStore->pos + 1 < rawSeqStore->size) {
                    currSeq[1].litLength += currSeq[0].matchLength;
                }
                rawSeqStore->pos++;
            }
            return;
        }
        bytesLeft -= currSeq->matchLength;
        currSeq->matchLength = 0;
        rawSeqStore->pos++;
    }
}
/** /**
* If the sequence length is longer than remaining then the sequence is split * If the sequence length is longer than remaining then the sequence is split
* between this block and the next. * between this block and the next.
@ -546,51 +574,24 @@ size_t ZSTD_ldm_generateSequences(
static rawSeq maybeSplitSequence(rawSeqStore_t* rawSeqStore, static rawSeq maybeSplitSequence(rawSeqStore_t* rawSeqStore,
U32 const remaining, U32 const minMatch) U32 const remaining, U32 const minMatch)
{ {
size_t const pos = rawSeqStore->pos;
rawSeq sequence = rawSeqStore->seq[rawSeqStore->pos]; rawSeq sequence = rawSeqStore->seq[rawSeqStore->pos];
assert(sequence.offset > 0); assert(sequence.offset > 0);
/* Handle partial sequences */ /* Likely: No partial sequence */
if (remaining >= sequence.litLength + sequence.matchLength) {
rawSeqStore->pos++;
return sequence;
}
/* Cut the sequence short (offset == 0 ==> rest is literals). */
if (remaining <= sequence.litLength) { if (remaining <= sequence.litLength) {
/* Split the literals that we have out of the sequence.
* They will become the last literals of this block.
* The next block starts off with the remaining literals.
*/
rawSeqStore->seq[pos].litLength -= remaining;
sequence.offset = 0; sequence.offset = 0;
} else if (remaining < sequence.litLength + sequence.matchLength) { } else if (remaining < sequence.litLength + sequence.matchLength) {
/* Split the match up into two sequences. One in this block, and one sequence.matchLength = remaining - sequence.litLength;
* in the next with no literals. If either match would be shorter
* than searchLength we omit it.
*/
U32 const matchPrefix = remaining - sequence.litLength;
U32 const matchSuffix = sequence.matchLength - matchPrefix;
assert(remaining > sequence.litLength);
assert(matchPrefix < sequence.matchLength);
assert(matchPrefix + matchSuffix == sequence.matchLength);
/* Update the first sequence */
sequence.matchLength = matchPrefix;
/* Update the second sequence */
if (matchSuffix >= minMatch) {
/* Update the second sequence, since the suffix is long enough */
rawSeqStore->seq[pos].litLength = 0;
rawSeqStore->seq[pos].matchLength = matchSuffix;
} else {
/* Omit the second sequence since the match suffix is too short.
* Add to the next sequences literals (if any).
*/
if (pos + 1 < rawSeqStore->size)
rawSeqStore->seq[pos + 1].litLength += matchSuffix;
rawSeqStore->pos++; /* Consume the sequence */
}
if (sequence.matchLength < minMatch) { if (sequence.matchLength < minMatch) {
/* Skip the current sequence if it is too short */
sequence.offset = 0; sequence.offset = 0;
} }
} else {
/* No partial sequence */
rawSeqStore->pos++; /* Consume the sequence */
} }
/* Skip past `remaining` bytes for the future sequences. */
ZSTD_ldm_skipSequences(rawSeqStore, remaining, minMatch);
return sequence; return sequence;
} }

View File

@ -65,6 +65,16 @@ size_t ZSTD_ldm_blockCompress(rawSeqStore_t* rawSeqStore,
ZSTD_compressionParameters const* cParams, void const* src, size_t srcSize, ZSTD_compressionParameters const* cParams, void const* src, size_t srcSize,
int const extDict); int const extDict);
/**
* ZSTD_ldm_skipSequences():
*
* Skip past `srcSize` bytes worth of sequences in `rawSeqStore`.
* Avoids emitting matches less than `minMatch` bytes.
* Must be called for data which is not passed to ZSTD_ldm_blockCompress().
*/
void ZSTD_ldm_skipSequences(rawSeqStore_t* rawSeqStore, size_t srcSize,
U32 const minMatch);
/** ZSTD_ldm_initializeParameters() : /** ZSTD_ldm_initializeParameters() :
* Initialize the long distance matching parameters to their default values. */ * Initialize the long distance matching parameters to their default values. */

View File

@ -27,8 +27,14 @@
#include "pool.h" /* threadpool */ #include "pool.h" /* threadpool */
#include "threading.h" /* mutex */ #include "threading.h" /* mutex */
#include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */ #include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
#include "zstd_ldm.h"
#include "zstdmt_compress.h" #include "zstdmt_compress.h"
/* Guards code to support resizing the SeqPool.
* We will want to resize the SeqPool to save memory in the future.
* Until then, comment the code out since it is unused.
*/
#define ZSTD_RESIZE_SEQPOOL 0
/* ====== Debug ====== */ /* ====== Debug ====== */
#if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2) #if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2)
@ -194,6 +200,32 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
} }
} }
#if ZSTD_RESIZE_SEQPOOL
/** ZSTDMT_resizeBuffer() :
 * assumption : bufPool must be valid
 * @return : a buffer that is at least the buffer pool buffer size.
 * If a reallocation happens, the data in the input buffer is copied
 * and the old allocation is released.
 */
static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer)
{
    size_t const bSize = bufPool->bufferSize;
    if (buffer.capacity < bSize) {
        void* const start = ZSTD_malloc(bSize, bufPool->cMem);
        buffer_t newBuffer;
        newBuffer.start = start;
        newBuffer.capacity = start == NULL ? 0 : bSize;
        if (start != NULL) {
            assert(newBuffer.capacity >= buffer.capacity);
            memcpy(newBuffer.start, buffer.start, buffer.capacity);
            /* Fix: free the replaced allocation, otherwise it leaks once the
             * caller adopts newBuffer. */
            ZSTD_free(buffer.start, bufPool->cMem);
            DEBUGLOG(5, "ZSTDMT_resizeBuffer: created buffer of size %u", (U32)bSize);
            return newBuffer;
        }
        /* On allocation failure, fall through and return the (too small)
         * original buffer unchanged so the caller still owns valid memory. */
        DEBUGLOG(5, "ZSTDMT_resizeBuffer: buffer allocation failure !!");
    }
    return buffer;
}
#endif
/* store buffer for later re-use, up to pool capacity */ /* store buffer for later re-use, up to pool capacity */
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
{ {
@ -214,6 +246,72 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
} }
/* ===== Seq Pool Wrapper ====== */
static rawSeqStore_t kNullRawSeqStore = {NULL, 0, 0, 0};
typedef ZSTDMT_bufferPool ZSTDMT_seqPool;
/* Total memory footprint of the sequence pool.
 * A seq pool is just a buffer pool, so delegate. */
static size_t ZSTDMT_sizeof_seqPool(ZSTDMT_seqPool* seqPool)
{
    size_t const totalSize = ZSTDMT_sizeof_bufferPool(seqPool);
    return totalSize;
}
/* View a raw buffer as an empty rawSeqStore_t whose capacity is however
 * many rawSeq entries fit in the buffer. */
static rawSeqStore_t bufferToSeq(buffer_t buffer)
{
    rawSeqStore_t seqStore = {NULL, 0, 0, 0};
    seqStore.seq = (rawSeq*)buffer.start;
    seqStore.capacity = buffer.capacity / sizeof(rawSeq);
    return seqStore;
}
/* Inverse of bufferToSeq(): recover the raw buffer backing a seq store. */
static buffer_t seqToBuffer(rawSeqStore_t seq)
{
    buffer_t out;
    out.start = seq.seq;
    out.capacity = seq.capacity * sizeof(rawSeq);
    return out;
}
/* Fetch a sequence store from the pool.
 * When the pool is sized for zero sequences (LDM disabled), hand back the
 * shared null store instead of touching the buffer pool. */
static rawSeqStore_t ZSTDMT_getSeq(ZSTDMT_seqPool* seqPool)
{
    if (seqPool->bufferSize > 0)
        return bufferToSeq(ZSTDMT_getBuffer(seqPool));
    return kNullRawSeqStore;
}
#if ZSTD_RESIZE_SEQPOOL
/* Grow `seq` (if needed) to the pool's current per-buffer capacity,
 * preserving its contents. */
static rawSeqStore_t ZSTDMT_resizeSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq)
{
    buffer_t const resized = ZSTDMT_resizeBuffer(seqPool, seqToBuffer(seq));
    return bufferToSeq(resized);
}
#endif
/* Return a sequence store's backing buffer to the pool for re-use. */
static void ZSTDMT_releaseSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq)
{
    buffer_t const backing = seqToBuffer(seq);
    ZSTDMT_releaseBuffer(seqPool, backing);
}
/* Size every pooled buffer to hold exactly `nbSeq` rawSeq entries. */
static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq)
{
    size_t const bytesNeeded = nbSeq * sizeof(rawSeq);
    ZSTDMT_setBufferSize(seqPool, bytesNeeded);
}
/** ZSTDMT_createSeqPool() :
 * Create a pool of rawSeq buffers, one per worker.
 * Buffers start at size 0; call ZSTDMT_setNbSeq() to size them.
 * @return : the pool, or NULL on allocation failure. */
static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
{
    ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
    /* Fix: ZSTDMT_createBufferPool may fail; without this check
     * ZSTDMT_setNbSeq() would dereference NULL. */
    if (seqPool == NULL) return NULL;
    ZSTDMT_setNbSeq(seqPool, 0);
    return seqPool;
}
/* Free the sequence pool. A seq pool is just a buffer pool of rawSeq
 * arrays, so this reduces to freeing the underlying buffer pool.
 * NOTE(review): NULL-safety depends on ZSTDMT_freeBufferPool() accepting
 * NULL — confirm before relying on it. */
static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)
{
    ZSTDMT_freeBufferPool(seqPool);
}
/* ===== CCtx Pool ===== */ /* ===== CCtx Pool ===== */
/* a single CCtx Pool can be invoked from multiple threads in parallel */ /* a single CCtx Pool can be invoked from multiple threads in parallel */
@ -312,36 +410,97 @@ typedef struct {
} range_t; } range_t;
typedef struct { typedef struct {
/* All variables in the struct are protected by mutex. */
ZSTD_pthread_mutex_t mutex; ZSTD_pthread_mutex_t mutex;
ZSTD_pthread_cond_t cond; ZSTD_pthread_cond_t cond;
ZSTD_CCtx_params params; ZSTD_CCtx_params params;
ldmState_t ldmState;
XXH64_state_t xxhState; XXH64_state_t xxhState;
unsigned nextJobID; unsigned nextJobID;
/* Protects ldmWindow.
* Must be acquired after the main mutex when acquiring both.
*/
ZSTD_pthread_mutex_t ldmWindowMutex;
ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is updated */
ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */
} serialState_t; } serialState_t;
static void ZSTDMT_serialState_reset(serialState_t* serialState, ZSTD_CCtx_params params) static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params)
{ {
/* Adjust parameters */
if (params.ldmParams.enableLdm) {
DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10);
params.ldmParams.windowLog = params.cParams.windowLog;
ZSTD_ldm_adjustParameters(&params.ldmParams, &params.cParams);
assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog);
assert(params.ldmParams.hashEveryLog < 32);
serialState->ldmState.hashPower =
ZSTD_ldm_getHashPower(params.ldmParams.minMatchLength);
} else {
memset(&params.ldmParams, 0, sizeof(params.ldmParams));
}
serialState->nextJobID = 0; serialState->nextJobID = 0;
if (params.fParams.checksumFlag) if (params.fParams.checksumFlag)
XXH64_reset(&serialState->xxhState, 0); XXH64_reset(&serialState->xxhState, 0);
if (params.ldmParams.enableLdm) {
ZSTD_customMem cMem = params.customMem;
unsigned const hashLog = params.ldmParams.hashLog;
size_t const hashSize = ((size_t)1 << hashLog) * sizeof(ldmEntry_t);
unsigned const bucketLog =
params.ldmParams.hashLog - params.ldmParams.bucketSizeLog;
size_t const bucketSize = (size_t)1 << bucketLog;
unsigned const prevBucketLog =
serialState->params.ldmParams.hashLog -
serialState->params.ldmParams.bucketSizeLog;
/* Size the seq pool tables */
ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, params.jobSize));
/* Reset the window */
ZSTD_window_clear(&serialState->ldmState.window);
serialState->ldmWindow = serialState->ldmState.window;
/* Resize tables and output space if necessary. */
if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) {
ZSTD_free(serialState->ldmState.hashTable, cMem);
serialState->ldmState.hashTable = (ldmEntry_t*)ZSTD_malloc(hashSize, cMem);
}
if (serialState->ldmState.bucketOffsets == NULL || prevBucketLog < bucketLog) {
ZSTD_free(serialState->ldmState.bucketOffsets, cMem);
serialState->ldmState.bucketOffsets = (BYTE*)ZSTD_malloc(bucketSize, cMem);
}
if (!serialState->ldmState.hashTable || !serialState->ldmState.bucketOffsets)
return 1;
/* Zero the tables */
memset(serialState->ldmState.hashTable, 0, hashSize);
memset(serialState->ldmState.bucketOffsets, 0, bucketSize);
}
serialState->params = params; serialState->params = params;
return 0;
} }
static int ZSTDMT_serialState_init(serialState_t* serialState) static int ZSTDMT_serialState_init(serialState_t* serialState)
{ {
int initError = 0; int initError = 0;
memset(serialState, 0, sizeof(*serialState));
initError |= ZSTD_pthread_mutex_init(&serialState->mutex, NULL); initError |= ZSTD_pthread_mutex_init(&serialState->mutex, NULL);
initError |= ZSTD_pthread_cond_init(&serialState->cond, NULL); initError |= ZSTD_pthread_cond_init(&serialState->cond, NULL);
initError |= ZSTD_pthread_mutex_init(&serialState->ldmWindowMutex, NULL);
initError |= ZSTD_pthread_cond_init(&serialState->ldmWindowCond, NULL);
return initError; return initError;
} }
static void ZSTDMT_serialState_free(serialState_t* serialState) static void ZSTDMT_serialState_free(serialState_t* serialState)
{ {
ZSTD_customMem cMem = serialState->params.customMem;
ZSTD_pthread_mutex_destroy(&serialState->mutex); ZSTD_pthread_mutex_destroy(&serialState->mutex);
ZSTD_pthread_cond_destroy(&serialState->cond); ZSTD_pthread_cond_destroy(&serialState->cond);
ZSTD_pthread_mutex_destroy(&serialState->ldmWindowMutex);
ZSTD_pthread_cond_destroy(&serialState->ldmWindowCond);
ZSTD_free(serialState->ldmState.hashTable, cMem);
ZSTD_free(serialState->ldmState.bucketOffsets, cMem);
} }
static void ZSTDMT_serialState_update(serialState_t* serialState, range_t src, unsigned jobID) static void ZSTDMT_serialState_update(serialState_t* serialState,
ZSTD_CCtx* jobCCtx, rawSeqStore_t seqStore,
range_t src, unsigned jobID)
{ {
/* Wait for our turn */ /* Wait for our turn */
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex); ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
@ -351,6 +510,24 @@ static void ZSTDMT_serialState_update(serialState_t* serialState, range_t src, u
/* A future job may error and skip our job */ /* A future job may error and skip our job */
if (serialState->nextJobID == jobID) { if (serialState->nextJobID == jobID) {
/* It is now our turn, do any processing necessary */ /* It is now our turn, do any processing necessary */
if (serialState->params.ldmParams.enableLdm) {
size_t error;
assert(seqStore.seq != NULL && seqStore.pos == 0 &&
seqStore.size == 0 && seqStore.capacity > 0);
ZSTD_window_update(&serialState->ldmState.window, src.start, src.size);
error = ZSTD_ldm_generateSequences(
&serialState->ldmState, &seqStore,
&serialState->params.ldmParams, src.start, src.size);
/* We provide a large enough buffer to never fail. */
assert(!ZSTD_isError(error)); (void)error;
/* Update ldmWindow to match the ldmState.window and signal the main
* thread if it is waiting for a buffer.
*/
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex);
serialState->ldmWindow = serialState->ldmState.window;
ZSTD_pthread_cond_signal(&serialState->ldmWindowCond);
ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex);
}
if (serialState->params.fParams.checksumFlag && src.size > 0) if (serialState->params.fParams.checksumFlag && src.size > 0)
XXH64_update(&serialState->xxhState, src.start, src.size); XXH64_update(&serialState->xxhState, src.start, src.size);
} }
@ -358,6 +535,14 @@ static void ZSTDMT_serialState_update(serialState_t* serialState, range_t src, u
serialState->nextJobID++; serialState->nextJobID++;
ZSTD_pthread_cond_broadcast(&serialState->cond); ZSTD_pthread_cond_broadcast(&serialState->cond);
ZSTD_pthread_mutex_unlock(&serialState->mutex); ZSTD_pthread_mutex_unlock(&serialState->mutex);
if (seqStore.size > 0) {
size_t const err = ZSTD_referenceExternalSequences(
jobCCtx, seqStore.seq, seqStore.size);
assert(serialState->params.ldmParams.enableLdm);
assert(!ZSTD_isError(err));
(void)err;
}
} }
static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState, static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState,
@ -369,6 +554,11 @@ static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState,
DEBUGLOG(5, "Skipping past job %u because of error", jobID); DEBUGLOG(5, "Skipping past job %u because of error", jobID);
serialState->nextJobID = jobID + 1; serialState->nextJobID = jobID + 1;
ZSTD_pthread_cond_broadcast(&serialState->cond); ZSTD_pthread_cond_broadcast(&serialState->cond);
ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex);
ZSTD_window_clear(&serialState->ldmWindow);
ZSTD_pthread_cond_signal(&serialState->ldmWindowCond);
ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex);
} }
ZSTD_pthread_mutex_unlock(&serialState->mutex); ZSTD_pthread_mutex_unlock(&serialState->mutex);
@ -388,6 +578,7 @@ typedef struct {
ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */ ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */
ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */ ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */
ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */ ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */
ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */
serialState_t* serial; /* Thread-safe - used by mtctx and (all) workers */ serialState_t* serial; /* Thread-safe - used by mtctx and (all) workers */
buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */ buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
range_t prefix; /* set by mtctx, then read by worker & mtctx => no barrier */ range_t prefix; /* set by mtctx, then read by worker & mtctx => no barrier */
@ -408,10 +599,15 @@ void ZSTDMT_compressionJob(void* jobDescription)
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */ ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool);
buffer_t dstBuff = job->dstBuff; buffer_t dstBuff = job->dstBuff;
/* Don't compute the checksum for chunks, but write it in the header */ /* Don't compute the checksum for chunks, since we compute it externally,
* but write it in the header.
*/
if (job->jobID != 0) jobParams.fParams.checksumFlag = 0; if (job->jobID != 0) jobParams.fParams.checksumFlag = 0;
/* Don't run LDM for the chunks, since we handle it externally */
jobParams.ldmParams.enableLdm = 0;
/* resources */ /* resources */
if (cctx==NULL) { if (cctx==NULL) {
@ -448,8 +644,8 @@ void ZSTDMT_compressionJob(void* jobDescription)
goto _endJob; goto _endJob;
} } } } } }
/* Perform serial step as early as possible */ /* Perform serial step as early as possible, but after CCtx initialization */
ZSTDMT_serialState_update(job->serial, job->src, job->jobID); ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);
if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */ if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */
size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0); size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);
@ -504,6 +700,7 @@ _endJob:
DEBUGLOG(5, "Finished with prefix: %zx", (size_t)job->prefix.start); DEBUGLOG(5, "Finished with prefix: %zx", (size_t)job->prefix.start);
DEBUGLOG(5, "Finished with source: %zx", (size_t)job->src.start); DEBUGLOG(5, "Finished with source: %zx", (size_t)job->src.start);
/* release resources */ /* release resources */
ZSTDMT_releaseSeq(job->seqPool, rawSeqStore);
ZSTDMT_releaseCCtx(job->cctxPool, cctx); ZSTDMT_releaseCCtx(job->cctxPool, cctx);
/* report */ /* report */
ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
@ -544,6 +741,7 @@ struct ZSTDMT_CCtx_s {
ZSTDMT_jobDescription* jobs; ZSTDMT_jobDescription* jobs;
ZSTDMT_bufferPool* bufPool; ZSTDMT_bufferPool* bufPool;
ZSTDMT_CCtxPool* cctxPool; ZSTDMT_CCtxPool* cctxPool;
ZSTDMT_seqPool* seqPool;
ZSTD_CCtx_params params; ZSTD_CCtx_params params;
size_t targetSectionSize; size_t targetSectionSize;
size_t targetPrefixSize; size_t targetPrefixSize;
@ -635,9 +833,10 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem)
mtctx->jobIDMask = nbJobs - 1; mtctx->jobIDMask = nbJobs - 1;
mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem); mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem); mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);
initError = ZSTDMT_serialState_init(&mtctx->serial); initError = ZSTDMT_serialState_init(&mtctx->serial);
mtctx->roundBuff = kNullRoundBuff; mtctx->roundBuff = kNullRoundBuff;
if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | initError) { if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | !mtctx->seqPool | initError) {
ZSTDMT_freeCCtx(mtctx); ZSTDMT_freeCCtx(mtctx);
return NULL; return NULL;
} }
@ -692,6 +891,7 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem); ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
ZSTDMT_freeBufferPool(mtctx->bufPool); ZSTDMT_freeBufferPool(mtctx->bufPool);
ZSTDMT_freeCCtxPool(mtctx->cctxPool); ZSTDMT_freeCCtxPool(mtctx->cctxPool);
ZSTDMT_freeSeqPool(mtctx->seqPool);
ZSTDMT_serialState_free(&mtctx->serial); ZSTDMT_serialState_free(&mtctx->serial);
ZSTD_freeCDict(mtctx->cdictLocal); ZSTD_freeCDict(mtctx->cdictLocal);
if (mtctx->roundBuff.buffer) if (mtctx->roundBuff.buffer)
@ -708,6 +908,7 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
+ ZSTDMT_sizeof_bufferPool(mtctx->bufPool) + ZSTDMT_sizeof_bufferPool(mtctx->bufPool)
+ (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription) + (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription)
+ ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool) + ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool)
+ ZSTDMT_sizeof_seqPool(mtctx->seqPool)
+ ZSTD_sizeof_CDict(mtctx->cdictLocal) + ZSTD_sizeof_CDict(mtctx->cdictLocal)
+ mtctx->roundBuff.capacity; + mtctx->roundBuff.capacity;
} }
@ -761,7 +962,6 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
jobParams.compressionLevel = params.compressionLevel; jobParams.compressionLevel = params.compressionLevel;
jobParams.disableLiteralCompression = params.disableLiteralCompression; jobParams.disableLiteralCompression = params.disableLiteralCompression;
jobParams.ldmParams = params.ldmParams;
return jobParams; return jobParams;
} }
@ -827,12 +1027,16 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
static size_t ZSTDMT_computeTargetJobLog(ZSTD_CCtx_params const params) static size_t ZSTDMT_computeTargetJobLog(ZSTD_CCtx_params const params)
{ {
if (params.ldmParams.enableLdm)
return MAX(21, params.cParams.chainLog + 4);
return params.cParams.windowLog + 2; return params.cParams.windowLog + 2;
} }
static size_t ZSTDMT_computeOverlapLog(ZSTD_CCtx_params const params) static size_t ZSTDMT_computeOverlapLog(ZSTD_CCtx_params const params)
{ {
unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog; unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog;
if (params.ldmParams.enableLdm)
return (MIN(params.cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2) - overlapRLog);
return overlapRLog >= 9 ? 0 : (params.cParams.windowLog - overlapRLog); return overlapRLog >= 9 ? 0 : (params.cParams.windowLog - overlapRLog);
} }
@ -856,7 +1060,7 @@ static size_t ZSTDMT_compress_advanced_internal(
void* dst, size_t dstCapacity, void* dst, size_t dstCapacity,
const void* src, size_t srcSize, const void* src, size_t srcSize,
const ZSTD_CDict* cdict, const ZSTD_CDict* cdict,
ZSTD_CCtx_params const params) ZSTD_CCtx_params params)
{ {
ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params); ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params);
size_t const overlapSize = (size_t)1 << ZSTDMT_computeOverlapLog(params); size_t const overlapSize = (size_t)1 << ZSTDMT_computeOverlapLog(params);
@ -870,6 +1074,7 @@ static size_t ZSTDMT_compress_advanced_internal(
assert(jobParams.nbWorkers == 0); assert(jobParams.nbWorkers == 0);
assert(mtctx->cctxPool->totalCCtx == params.nbWorkers); assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);
params.jobSize = (U32)avgJobSize;
DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ", DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ",
nbJobs, (U32)proposedJobSize, (U32)avgJobSize); nbJobs, (U32)proposedJobSize, (U32)avgJobSize);
@ -882,7 +1087,8 @@ static size_t ZSTDMT_compress_advanced_internal(
assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */ assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) ); ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) );
ZSTDMT_serialState_reset(&mtctx->serial, params); if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
return ERROR(memory_allocation);
if (nbJobs > mtctx->jobIDMask+1) { /* enlarge job table */ if (nbJobs > mtctx->jobIDMask+1) { /* enlarge job table */
U32 jobsTableSize = nbJobs; U32 jobsTableSize = nbJobs;
@ -915,6 +1121,7 @@ static size_t ZSTDMT_compress_advanced_internal(
mtctx->jobs[u].dstBuff = dstBuffer; mtctx->jobs[u].dstBuff = dstBuffer;
mtctx->jobs[u].cctxPool = mtctx->cctxPool; mtctx->jobs[u].cctxPool = mtctx->cctxPool;
mtctx->jobs[u].bufPool = mtctx->bufPool; mtctx->jobs[u].bufPool = mtctx->bufPool;
mtctx->jobs[u].seqPool = mtctx->seqPool;
mtctx->jobs[u].serial = &mtctx->serial; mtctx->jobs[u].serial = &mtctx->serial;
mtctx->jobs[u].jobID = u; mtctx->jobs[u].jobID = u;
mtctx->jobs[u].firstJob = (u==0); mtctx->jobs[u].firstJob = (u==0);
@ -1023,9 +1230,6 @@ size_t ZSTDMT_initCStream_internal(
/* init */ /* init */
if (params.jobSize == 0) { if (params.jobSize == 0) {
if (params.cParams.windowLog >= 29)
params.jobSize = ZSTDMT_JOBSIZE_MAX;
else
params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params); params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params);
} }
if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX; if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
@ -1072,11 +1276,20 @@ size_t ZSTDMT_initCStream_internal(
DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10)); DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize)); ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize));
{ {
/* Two buffers of slack, plus extra space for the overlap */ /* If ldm is enabled we need windowSize space. */
size_t const windowSize = mtctx->params.ldmParams.enableLdm ? (1U << mtctx->params.cParams.windowLog) : 0;
/* Two buffers of slack, plus extra space for the overlap
* This is the minimum slack that LDM works with. One extra because
* flush might waste up to targetSectionSize-1 bytes. Another extra
* for the overlap (if > 0), then one to fill which doesn't overlap
* with the LDM window.
*/
size_t const nbSlackBuffers = 2 + (mtctx->targetPrefixSize > 0);
size_t const slackSize = mtctx->targetSectionSize * nbSlackBuffers;
/* Compute the total size, and always have enough slack */
size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1); size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1);
size_t const nbSlackBuffers = MIN(nbWorkers, 2) + (mtctx->targetPrefixSize > 0); size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers;
size_t const nbSections = nbWorkers + nbSlackBuffers; size_t const capacity = MAX(windowSize, sectionsSize) + slackSize;
size_t const capacity = mtctx->targetSectionSize * nbSections;
if (mtctx->roundBuff.capacity < capacity) { if (mtctx->roundBuff.capacity < capacity) {
if (mtctx->roundBuff.buffer) if (mtctx->roundBuff.buffer)
ZSTD_free(mtctx->roundBuff.buffer, mtctx->cMem); ZSTD_free(mtctx->roundBuff.buffer, mtctx->cMem);
@ -1099,7 +1312,8 @@ size_t ZSTDMT_initCStream_internal(
mtctx->allJobsCompleted = 0; mtctx->allJobsCompleted = 0;
mtctx->consumed = 0; mtctx->consumed = 0;
mtctx->produced = 0; mtctx->produced = 0;
ZSTDMT_serialState_reset(&mtctx->serial, params); if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
return ERROR(memory_allocation);
return 0; return 0;
} }
@ -1201,6 +1415,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
mtctx->jobs[jobID].dstBuff = g_nullBuffer; mtctx->jobs[jobID].dstBuff = g_nullBuffer;
mtctx->jobs[jobID].cctxPool = mtctx->cctxPool; mtctx->jobs[jobID].cctxPool = mtctx->cctxPool;
mtctx->jobs[jobID].bufPool = mtctx->bufPool; mtctx->jobs[jobID].bufPool = mtctx->bufPool;
mtctx->jobs[jobID].seqPool = mtctx->seqPool;
mtctx->jobs[jobID].serial = &mtctx->serial; mtctx->jobs[jobID].serial = &mtctx->serial;
mtctx->jobs[jobID].jobID = mtctx->nextJobID; mtctx->jobs[jobID].jobID = mtctx->nextJobID;
mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0); mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);
@ -1383,10 +1598,51 @@ static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range)
if (rangeStart == NULL || bufferStart == NULL) if (rangeStart == NULL || bufferStart == NULL)
return 0; return 0;
/* Empty ranges cannot overlap */
if (bufferStart == bufferEnd || rangeStart == rangeEnd)
return 0;
return bufferStart < rangeEnd && rangeStart < bufferEnd; return bufferStart < rangeEnd && rangeStart < bufferEnd;
} }
/* Returns non-zero when `buffer` intersects either segment of `window`.
 * The window consists of two ranges: the extDict segment
 * [dictBase+lowLimit, dictBase+dictLimit) and the prefix segment
 * [base+dictLimit, nextSrc). */
static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)
{
    range_t extDict;
    range_t prefix;
    extDict.start = window.dictBase + window.lowLimit;
    extDict.size = window.dictLimit - window.lowLimit;
    prefix.start = window.base + window.dictLimit;
    prefix.size = window.nextSrc - (window.base + window.dictLimit);
    DEBUGLOG(5, "extDict [0x%zx, 0x%zx)",
             (size_t)extDict.start,
             (size_t)extDict.start + extDict.size);
    DEBUGLOG(5, "prefix [0x%zx, 0x%zx)",
             (size_t)prefix.start,
             (size_t)prefix.start + prefix.size);
    if (ZSTDMT_isOverlapped(buffer, extDict))
        return 1;
    return ZSTDMT_isOverlapped(buffer, prefix);
}
/* Block the caller until the serial LDM window no longer overlaps `buffer`,
 * i.e. no in-flight LDM work still reads from it. No-op when long-distance
 * matching is disabled. */
static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer)
{
    ZSTD_pthread_mutex_t* ldmMutex;
    if (!mtctx->params.ldmParams.enableLdm)
        return;
    ldmMutex = &mtctx->serial.ldmWindowMutex;
    DEBUGLOG(5, "source [0x%zx, 0x%zx)",
             (size_t)buffer.start,
             (size_t)buffer.start + buffer.capacity);
    ZSTD_PTHREAD_MUTEX_LOCK(ldmMutex);
    /* ldmWindow is updated (and the cond signaled) by the serial step of
     * each worker job; wait until it has moved past this buffer. */
    while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {
        DEBUGLOG(6, "Waiting for LDM to finish...");
        ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, ldmMutex);
    }
    DEBUGLOG(6, "Done waiting for LDM to finish");
    ZSTD_pthread_mutex_unlock(ldmMutex);
}
/** /**
* Attempts to set the inBuff to the next section to fill. * Attempts to set the inBuff to the next section to fill.
* If any part of the new section is still in use we give up. * If any part of the new section is still in use we give up.
@ -1415,6 +1671,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
DEBUGLOG(6, "Waiting for buffer..."); DEBUGLOG(6, "Waiting for buffer...");
return 0; return 0;
} }
ZSTDMT_waitForLdmComplete(mtctx, buffer);
memmove(start, mtctx->inBuff.prefix.start, prefixSize); memmove(start, mtctx->inBuff.prefix.start, prefixSize);
mtctx->inBuff.prefix.start = start; mtctx->inBuff.prefix.start = start;
mtctx->roundBuff.pos = prefixSize; mtctx->roundBuff.pos = prefixSize;
@ -1427,6 +1684,9 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
return 0; return 0;
} }
assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix)); assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix));
ZSTDMT_waitForLdmComplete(mtctx, buffer);
DEBUGLOG(5, "Using prefix range [%zx, %zx)", DEBUGLOG(5, "Using prefix range [%zx, %zx)",
(size_t)mtctx->inBuff.prefix.start, (size_t)mtctx->inBuff.prefix.start,
(size_t)mtctx->inBuff.prefix.start + mtctx->inBuff.prefix.size); (size_t)mtctx->inBuff.prefix.start + mtctx->inBuff.prefix.size);