release input buffers from inside worker thread

Buffers are released sooner, which makes them available faster for the next job.
=> decreases the total number of buffers necessary
Yann Collet 2017-07-11 15:56:40 -07:00
parent 57236184af
commit 2a62f48bf4
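
For illustration, here is a minimal, self-contained sketch of the ownership pattern this commit adopts: the worker thread returns its input buffer to the shared pool as soon as the content has been consumed, instead of leaving the release to the thread that later collects the job. All names below (bufferPool_t, pool_get, pool_release, job_t) are hypothetical stand-ins, not the actual zstdmt types; the real implementation is in the diff that follows.

/* Minimal sketch with hypothetical names, not the zstdmt implementation :
 * illustrates releasing an input buffer from inside the worker thread. */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct { void* start; size_t size; } buffer_t;
static const buffer_t g_nullBuffer = { NULL, 0 };

#define POOL_CAPACITY 4

typedef struct {
    pthread_mutex_t mutex;
    buffer_t table[POOL_CAPACITY];
    unsigned nbBuffers;              /* buffers currently stored for re-use */
} bufferPool_t;

/* Pop a cached buffer if one is available, otherwise allocate a fresh one. */
static buffer_t pool_get(bufferPool_t* pool, size_t size)
{
    buffer_t buf = g_nullBuffer;
    pthread_mutex_lock(&pool->mutex);
    if (pool->nbBuffers) buf = pool->table[--pool->nbBuffers];
    pthread_mutex_unlock(&pool->mutex);
    if (buf.start == NULL) { buf.start = malloc(size); buf.size = size; }
    return buf;
}

/* Store a buffer for later re-use; tolerates g_nullBuffer,
 * like ZSTDMT_releaseBuffer() in the diff below. */
static void pool_release(bufferPool_t* pool, buffer_t buf)
{
    if (buf.start == NULL) return;   /* compatible with release on NULL */
    pthread_mutex_lock(&pool->mutex);
    if (pool->nbBuffers < POOL_CAPACITY) {
        pool->table[pool->nbBuffers++] = buf;   /* store for later re-use */
        buf = g_nullBuffer;
    }
    pthread_mutex_unlock(&pool->mutex);
    free(buf.start);                 /* pool full : discard (no-op if stored) */
}

typedef struct {
    bufferPool_t* pool;
    buffer_t src;                    /* input buffer, owned by the job */
} job_t;

static void* worker(void* arg)
{
    job_t* job = (job_t*)arg;
    /* ... consume (compress) job->src here ... */
    /* Input content is no longer needed : release it from inside the worker,
     * so it becomes available immediately for the next job. */
    pool_release(job->pool, job->src);
    job->src = g_nullBuffer;         /* collecting thread must not release it again */
    return NULL;
}

int main(void)
{
    bufferPool_t pool;
    memset(&pool, 0, sizeof(pool));
    pthread_mutex_init(&pool.mutex, NULL);

    job_t job = { &pool, pool_get(&pool, 64 * 1024) };
    pthread_t t;
    pthread_create(&t, NULL, worker, &job);
    pthread_join(&t, NULL);

    /* The buffer is already back in the pool,
     * without any action from the collecting thread. */
    printf("buffers available for re-use : %u\n", pool.nbBuffers);

    while (pool.nbBuffers) free(pool.table[--pool.nbBuffers].start);
    pthread_mutex_destroy(&pool.mutex);
    return 0;
}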

@@ -169,7 +169,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
 static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
 {
     DEBUGLOG(2, "ZSTDMT_releaseBuffer");
-    if (buf.start == NULL) return;   /* release on NULL */
+    if (buf.start == NULL) return;   /* compatible with release on NULL */
     pthread_mutex_lock(&bufPool->poolMutex);
     if (bufPool->nbBuffers < bufPool->totalBuffers) {
         bufPool->bTable[bufPool->nbBuffers++] = buf;   /* store for later re-use */
@@ -271,16 +271,11 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)

 /* =====   Thread worker   ===== */

-typedef struct {
-    buffer_t buffer;
-    size_t filled;
-} inBuff_t;
-
 typedef struct {
     buffer_t src;
     const void* srcStart;
-    size_t srcSize;
     size_t dictSize;
+    size_t srcSize;
     buffer_t dstBuff;
     size_t cSize;
     size_t dstFlushed;
@@ -349,6 +344,8 @@ void ZSTDMT_compressChunk(void* jobDescription)
 _endJob:
     ZSTDMT_releaseCCtx(job->cctxPool, cctx);
+    ZSTDMT_releaseBuffer(job->bufPool, job->src);
+    job->src = g_nullBuffer; job->srcStart = NULL;
     PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
     job->jobCompleted = 1;
     job->jobScanned = 0;
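Note : the worker clears both job->src and job->srcStart before signaling completion, so the collecting thread cannot release the same buffer a second time; its release attempt sees g_nullBuffer, which the NULL-tolerant check in ZSTDMT_releaseBuffer() (first hunk) turns into a no-op.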
@@ -361,6 +358,11 @@ _endJob:
 /* =====   Multi-threaded compression   ===== */
 /* ------------------------------------------ */

+typedef struct {
+    buffer_t buffer;
+    size_t filled;
+} inBuff_t;
+
 struct ZSTDMT_CCtx_s {
     POOL_ctx* factory;
     ZSTDMT_jobDescription* jobs;
@@ -513,6 +515,7 @@ static unsigned computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbT
 }

+/* Note : missing checksum at the end ! */
 size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
                                 void* dst, size_t dstCapacity,
                                 const void* src, size_t srcSize,
@@ -555,6 +558,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
         buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer;
         size_t dictSize = u ? overlapSize : 0;

+        mtctx->jobs[u].src = g_nullBuffer;
         mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
         mtctx->jobs[u].dictSize = dictSize;
         mtctx->jobs[u].srcSize = chunkSize;
@@ -771,10 +775,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
     zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
     zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;

+    if (zcs->params.fParams.checksumFlag)
+        XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->dictSize, srcSize);
+
     /* get a new buffer for next input */
     if (!endFrame) {
         size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
+        DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame);
         zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
         if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
             zcs->jobs[jobID].jobCompleted = 1;
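Note : the XXH64 update moves here, to job creation time. Once the worker owns and releases job->src, the flush path can no longer safely read job.srcStart after completion, so the input must be hashed while zcs->inBuff.buffer is still guaranteed live (the matching removal is in ZSTDMT_flushNextJob below).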
@@ -783,18 +789,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
             ZSTDMT_releaseAllJobResources(zcs);
             return ERROR(memory_allocation);
         }
-        DEBUGLOG(5, "inBuff currently filled to %u", (U32)zcs->inBuff.filled);
         zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize;
-        DEBUGLOG(5, "new job : inBuff filled to %u, with %u dict and %u src",
-                    (U32)zcs->inBuff.filled, (U32)newDictSize,
-                    (U32)(zcs->inBuff.filled - newDictSize));
         memmove(zcs->inBuff.buffer.start,
                 (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize,
                 zcs->inBuff.filled);
-        DEBUGLOG(5, "new inBuff pre-filled");
         zcs->dictSize = newDictSize;
     } else {   /* if (endFrame==1) */
-        DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame);
         zcs->inBuff.buffer = g_nullBuffer;
         zcs->inBuff.filled = 0;
         zcs->dictSize = 0;
@@ -842,7 +842,6 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
     }
     DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag);
     if (zcs->params.fParams.checksumFlag) {
-        XXH64_update(&zcs->xxhState, (const char*)job.srcStart + job.dictSize, job.srcSize);
         if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) {   /* write checksum at end of last section */
             U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
             DEBUGLOG(5, "writing checksum : %08X \n", checksum);
@@ -850,9 +849,6 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
             job.cSize += 4;
             zcs->jobs[wJobID].cSize += 4;
     }   }
-    ZSTDMT_releaseBuffer(zcs->bufPool, job.src);
-    zcs->jobs[wJobID].srcStart = NULL;
-    zcs->jobs[wJobID].src = g_nullBuffer;
     zcs->jobs[wJobID].jobScanned = 1;
     }
     { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
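Note : with the release now performed in ZSTDMT_compressChunk, the flush path only consumes the job's compressed output; the input buffer's lifetime is managed entirely by the worker thread.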