From 2a62f48bf4df42f3ca389ee891ff76da56623802 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Tue, 11 Jul 2017 15:56:40 -0700 Subject: [PATCH] release input buffers from inside worker thread buffers are released sooner, which makes them available faster for next job. => decreases total nb of buffers necessary --- lib/compress/zstdmt_compress.c | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 703d25e3..7cf637f5 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -169,7 +169,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool) static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf) { DEBUGLOG(2, "ZSTDMT_releaseBuffer"); - if (buf.start == NULL) return; /* release on NULL */ + if (buf.start == NULL) return; /* compatible with release on NULL */ pthread_mutex_lock(&bufPool->poolMutex); if (bufPool->nbBuffers < bufPool->totalBuffers) { bufPool->bTable[bufPool->nbBuffers++] = buf; /* store for later re-use */ @@ -271,16 +271,11 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) /* ===== Thread worker ===== */ -typedef struct { - buffer_t buffer; - size_t filled; -} inBuff_t; - typedef struct { buffer_t src; const void* srcStart; - size_t srcSize; size_t dictSize; + size_t srcSize; buffer_t dstBuff; size_t cSize; size_t dstFlushed; @@ -349,6 +344,8 @@ void ZSTDMT_compressChunk(void* jobDescription) _endJob: ZSTDMT_releaseCCtx(job->cctxPool, cctx); + ZSTDMT_releaseBuffer(job->bufPool, job->src); + job->src = g_nullBuffer; job->srcStart = NULL; PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); job->jobCompleted = 1; job->jobScanned = 0; @@ -361,6 +358,11 @@ _endJob: /* ===== Multi-threaded compression ===== */ /* ------------------------------------------ */ +typedef struct { + buffer_t buffer; + size_t filled; +} inBuff_t; + struct ZSTDMT_CCtx_s { POOL_ctx* factory; ZSTDMT_jobDescription* jobs; @@ -513,6 +515,7 @@ static unsigned computeNbChunks(size_t srcSize, unsigned windowLog, unsigned nbT } +/* Note : missing checksum at the end ! */ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize, @@ -555,6 +558,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer; size_t dictSize = u ? overlapSize : 0; + mtctx->jobs[u].src = g_nullBuffer; mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize; mtctx->jobs[u].dictSize = dictSize; mtctx->jobs[u].srcSize = chunkSize; @@ -771,10 +775,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; + if (zcs->params.fParams.checksumFlag) + XXH64_update(&zcs->xxhState, (const char*)zcs->inBuff.buffer.start + zcs->dictSize, srcSize); + /* get a new buffer for next input */ if (!endFrame) { size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize); - DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame); zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool); if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ zcs->jobs[jobID].jobCompleted = 1; @@ -783,18 +789,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi ZSTDMT_releaseAllJobResources(zcs); return ERROR(memory_allocation); } - DEBUGLOG(5, "inBuff currently filled to %u", (U32)zcs->inBuff.filled); zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize; - DEBUGLOG(5, "new job : inBuff filled to %u, with %u dict and %u src", - (U32)zcs->inBuff.filled, (U32)newDictSize, - (U32)(zcs->inBuff.filled - newDictSize)); memmove(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize, zcs->inBuff.filled); - DEBUGLOG(5, "new inBuff pre-filled"); zcs->dictSize = newDictSize; } else { /* if (endFrame==1) */ - DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame); zcs->inBuff.buffer = g_nullBuffer; zcs->inBuff.filled = 0; zcs->dictSize = 0; @@ -842,7 +842,6 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi } DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag); if (zcs->params.fParams.checksumFlag) { - XXH64_update(&zcs->xxhState, (const char*)job.srcStart + job.dictSize, job.srcSize); if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */ U32 const checksum = (U32)XXH64_digest(&zcs->xxhState); DEBUGLOG(5, "writing checksum : %08X \n", checksum); @@ -850,9 +849,6 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi job.cSize += 4; zcs->jobs[wJobID].cSize += 4; } } - ZSTDMT_releaseBuffer(zcs->bufPool, job.src); - zcs->jobs[wJobID].srcStart = NULL; - zcs->jobs[wJobID].src = g_nullBuffer; zcs->jobs[wJobID].jobScanned = 1; } { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);