zstdmt : intermediate outBuffer allocated from within worker

reduces total amount of memory needed,
since jobs in queue do not have an outBuffer pre-reserved now
This commit is contained in:
Yann Collet 2017-07-11 14:59:10 -07:00
parent 16261e6951
commit 34b2b95631

View File

@ -286,6 +286,7 @@ typedef struct {
ZSTD_parameters params; ZSTD_parameters params;
const ZSTD_CDict* cdict; const ZSTD_CDict* cdict;
ZSTDMT_CCtxPool* cctxPool; ZSTDMT_CCtxPool* cctxPool;
ZSTDMT_bufferPool* bufPool;
unsigned long long fullFrameSize; unsigned long long fullFrameSize;
} ZSTDMT_jobDescription; } ZSTDMT_jobDescription;
@ -295,7 +296,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
ZSTD_CCtx* cctx = ZSTDMT_getCCtx(job->cctxPool); ZSTD_CCtx* cctx = ZSTDMT_getCCtx(job->cctxPool);
const void* const src = (const char*)job->srcStart + job->dictSize; const void* const src = (const char*)job->srcStart + job->dictSize;
buffer_t const dstBuff = job->dstBuff; buffer_t dstBuff = job->dstBuff;
DEBUGLOG(5, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", DEBUGLOG(5, "job (first:%u) (last:%u) : dictSize %u, srcSize %u",
job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize); job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize);
@ -304,6 +305,16 @@ void ZSTDMT_compressChunk(void* jobDescription)
goto _endJob; goto _endJob;
} }
if (dstBuff.start == NULL) {
size_t const dstCapacity = ZSTD_compressBound(job->srcSize);
dstBuff = ZSTDMT_getBuffer(job->bufPool, dstCapacity);
if (dstBuff.start==NULL) {
job->cSize = ERROR(memory_allocation);
goto _endJob;
}
job->dstBuff = dstBuff;
}
if (job->cdict) { /* should only happen for first segment */ if (job->cdict) { /* should only happen for first segment */
size_t const initError = ZSTD_compressBegin_usingCDict_advanced(cctx, job->cdict, job->params.fParams, job->fullFrameSize); size_t const initError = ZSTD_compressBegin_usingCDict_advanced(cctx, job->cdict, job->params.fParams, job->fullFrameSize);
DEBUGLOG(5, "using CDict"); DEBUGLOG(5, "using CDict");
@ -347,7 +358,7 @@ _endJob:
struct ZSTDMT_CCtx_s { struct ZSTDMT_CCtx_s {
POOL_ctx* factory; POOL_ctx* factory;
ZSTDMT_jobDescription* jobs; ZSTDMT_jobDescription* jobs;
ZSTDMT_bufferPool* buffPool; ZSTDMT_bufferPool* bufPool;
ZSTDMT_CCtxPool* cctxPool; ZSTDMT_CCtxPool* cctxPool;
pthread_mutex_t jobCompleted_mutex; pthread_mutex_t jobCompleted_mutex;
pthread_cond_t jobCompleted_cond; pthread_cond_t jobCompleted_cond;
@ -402,9 +413,9 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
mtctx->factory = POOL_create(nbThreads, 1); mtctx->factory = POOL_create(nbThreads, 1);
mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem); mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem);
mtctx->jobIDMask = nbJobs - 1; mtctx->jobIDMask = nbJobs - 1;
mtctx->buffPool = ZSTDMT_createBufferPool(nbThreads, cMem); mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem);
mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem); mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem);
if (!mtctx->factory | !mtctx->jobs | !mtctx->buffPool | !mtctx->cctxPool) { if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool) {
ZSTDMT_freeCCtx(mtctx); ZSTDMT_freeCCtx(mtctx);
return NULL; return NULL;
} }
@ -426,13 +437,13 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
unsigned jobID; unsigned jobID;
DEBUGLOG(3, "ZSTDMT_releaseAllJobResources"); DEBUGLOG(3, "ZSTDMT_releaseAllJobResources");
for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) { for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].dstBuff); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
mtctx->jobs[jobID].dstBuff = g_nullBuffer; mtctx->jobs[jobID].dstBuff = g_nullBuffer;
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].src); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].src);
mtctx->jobs[jobID].src = g_nullBuffer; mtctx->jobs[jobID].src = g_nullBuffer;
} }
memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription)); memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription));
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->inBuff.buffer);
mtctx->inBuff.buffer = g_nullBuffer; mtctx->inBuff.buffer = g_nullBuffer;
mtctx->allJobsCompleted = 1; mtctx->allJobsCompleted = 1;
} }
@ -442,7 +453,7 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
if (mtctx==NULL) return 0; /* compatible with free on NULL */ if (mtctx==NULL) return 0; /* compatible with free on NULL */
POOL_free(mtctx->factory); POOL_free(mtctx->factory);
if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */ if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */
ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources into pools first */ ZSTDMT_freeBufferPool(mtctx->bufPool); /* release job resources into pools first */
ZSTD_free(mtctx->jobs, mtctx->cMem); ZSTD_free(mtctx->jobs, mtctx->cMem);
ZSTDMT_freeCCtxPool(mtctx->cctxPool); ZSTDMT_freeCCtxPool(mtctx->cctxPool);
ZSTD_freeCDict(mtctx->cdictLocal); ZSTD_freeCDict(mtctx->cdictLocal);
@ -456,11 +467,11 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
{ {
if (mtctx == NULL) return 0; /* supports sizeof NULL */ if (mtctx == NULL) return 0; /* supports sizeof NULL */
return sizeof(*mtctx) return sizeof(*mtctx)
+ POOL_sizeof(mtctx->factory) + POOL_sizeof(mtctx->factory)
+ ZSTDMT_sizeof_bufferPool(mtctx->buffPool) + ZSTDMT_sizeof_bufferPool(mtctx->bufPool)
+ (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription) + (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription)
+ ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool) + ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool)
+ ZSTD_sizeof_CDict(mtctx->cdictLocal); + ZSTD_sizeof_CDict(mtctx->cdictLocal);
} }
size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value) size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value)
@ -534,16 +545,9 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
size_t const chunkSize = MIN(remainingSrcSize, avgChunkSize); size_t const chunkSize = MIN(remainingSrcSize, avgChunkSize);
size_t const dstBufferCapacity = ZSTD_compressBound(chunkSize); size_t const dstBufferCapacity = ZSTD_compressBound(chunkSize);
buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity }; buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity };
buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity); buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer;
size_t dictSize = u ? overlapSize : 0; size_t dictSize = u ? overlapSize : 0;
if (dstBuffer.start==NULL) {
mtctx->jobs[u].cSize = ERROR(memory_allocation); /* job result */
mtctx->jobs[u].jobCompleted = 1;
nbChunks = u+1; /* only wait and free u jobs, instead of initially expected nbChunks ones */
break; /* let's wait for previous jobs to complete, but don't start new ones */
}
mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize; mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].dictSize = dictSize; mtctx->jobs[u].dictSize = dictSize;
mtctx->jobs[u].srcSize = chunkSize; mtctx->jobs[u].srcSize = chunkSize;
@ -554,6 +558,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
if (u!=0) mtctx->jobs[u].params.fParams.checksumFlag = 0; if (u!=0) mtctx->jobs[u].params.fParams.checksumFlag = 0;
mtctx->jobs[u].dstBuff = dstBuffer; mtctx->jobs[u].dstBuff = dstBuffer;
mtctx->jobs[u].cctxPool = mtctx->cctxPool; mtctx->jobs[u].cctxPool = mtctx->cctxPool;
mtctx->jobs[u].bufPool = mtctx->bufPool;
mtctx->jobs[u].firstChunk = (u==0); mtctx->jobs[u].firstChunk = (u==0);
mtctx->jobs[u].lastChunk = (u==nbChunks-1); mtctx->jobs[u].lastChunk = (u==nbChunks-1);
mtctx->jobs[u].jobCompleted = 0; mtctx->jobs[u].jobCompleted = 0;
@ -591,13 +596,13 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); /* may overlap when chunk compressed within dst */ memmove((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); /* may overlap when chunk compressed within dst */
if (chunkID >= compressWithinDst) { /* chunk compressed into its own buffer, which must be released */ if (chunkID >= compressWithinDst) { /* chunk compressed into its own buffer, which must be released */
DEBUGLOG(5, "releasing buffer %u>=%u", chunkID, compressWithinDst); DEBUGLOG(5, "releasing buffer %u>=%u", chunkID, compressWithinDst);
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff); ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[chunkID].dstBuff);
} }
mtctx->jobs[chunkID].dstBuff = g_nullBuffer; mtctx->jobs[chunkID].dstBuff = g_nullBuffer;
} }
dstPos += cSize ; dstPos += cSize ;
} }
} } /* for (chunkID=0; chunkID<nbChunks; chunkID++) */
if (!error) DEBUGLOG(4, "compressed size : %u ", (U32)dstPos); if (!error) DEBUGLOG(4, "compressed size : %u ", (U32)dstPos);
return error ? error : dstPos; return error ? error : dstPos;
} }
@ -696,8 +701,9 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
} }
size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
const void* dict, size_t dictSize, const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize) ZSTD_parameters params,
unsigned long long pledgedSrcSize)
{ {
DEBUGLOG(5, "ZSTDMT_initCStream_advanced"); DEBUGLOG(5, "ZSTDMT_initCStream_advanced");
return ZSTDMT_initCStream_internal(mtctx, dict, dictSize, NULL, params, pledgedSrcSize); return ZSTDMT_initCStream_internal(mtctx, dict, dictSize, NULL, params, pledgedSrcSize);
@ -733,18 +739,8 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsigned endFrame) static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsigned endFrame)
{ {
size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
if (dstBuffer.start==NULL) {
zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++;
ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs);
return ERROR(memory_allocation);
}
DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ", DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ",
zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize); zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize);
zcs->jobs[jobID].src = zcs->inBuff.buffer; zcs->jobs[jobID].src = zcs->inBuff.buffer;
@ -757,8 +753,9 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;
zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
zcs->jobs[jobID].dstBuff = dstBuffer; zcs->jobs[jobID].dstBuff = g_nullBuffer;
zcs->jobs[jobID].cctxPool = zcs->cctxPool; zcs->jobs[jobID].cctxPool = zcs->cctxPool;
zcs->jobs[jobID].bufPool = zcs->bufPool;
zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
zcs->jobs[jobID].lastChunk = endFrame; zcs->jobs[jobID].lastChunk = endFrame;
zcs->jobs[jobID].jobCompleted = 0; zcs->jobs[jobID].jobCompleted = 0;
@ -770,7 +767,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
if (!endFrame) { if (!endFrame) {
size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize); size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame); DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame);
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool, zcs->inBuffSize);
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
zcs->jobs[jobID].jobCompleted = 1; zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++; zcs->nextJobID++;
@ -845,19 +842,19 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
job.cSize += 4; job.cSize += 4;
zcs->jobs[wJobID].cSize += 4; zcs->jobs[wJobID].cSize += 4;
} } } }
ZSTDMT_releaseBuffer(zcs->buffPool, job.src); ZSTDMT_releaseBuffer(zcs->bufPool, job.src);
zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].srcStart = NULL;
zcs->jobs[wJobID].src = g_nullBuffer; zcs->jobs[wJobID].src = g_nullBuffer;
zcs->jobs[wJobID].jobScanned = 1; zcs->jobs[wJobID].jobScanned = 1;
} }
{ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
DEBUGLOG(5, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); DEBUGLOG(2, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite; output->pos += toWrite;
job.dstFlushed += toWrite; job.dstFlushed += toWrite;
} }
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => move to next one */ if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => move to next one */
ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
zcs->jobs[wJobID].dstBuff = g_nullBuffer; zcs->jobs[wJobID].dstBuff = g_nullBuffer;
zcs->jobs[wJobID].jobCompleted = 0; zcs->jobs[wJobID].jobCompleted = 0;
zcs->doneJobID++; zcs->doneJobID++;
@ -904,7 +901,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
if (ZSTD_isError(cSize)) return cSize; if (ZSTD_isError(cSize)) return cSize;
input->pos = input->size; input->pos = input->size;
output->pos += cSize; output->pos += cSize;
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer); /* was allocated in initStream */ ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->inBuff.buffer); /* was allocated in initStream */
mtctx->allJobsCompleted = 1; mtctx->allJobsCompleted = 1;
mtctx->frameEnded = 1; mtctx->frameEnded = 1;
return 0; return 0;
@ -913,7 +910,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
/* fill input buffer */ /* fill input buffer */
if (input->size > input->pos) { /* support NULL input */ if (input->size > input->pos) { /* support NULL input */
if (mtctx->inBuff.buffer.start == NULL) { if (mtctx->inBuff.buffer.start == NULL) {
mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->buffPool, mtctx->inBuffSize); mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool, mtctx->inBuffSize);
if (mtctx->inBuff.buffer.start == NULL) return ERROR(memory_allocation); if (mtctx->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
mtctx->inBuff.filled = 0; mtctx->inBuff.filled = 0;
} }