From 670b1fc547424897aa324736d47a1c79bf61e355 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Mon, 10 Jul 2017 16:30:55 -0700 Subject: [PATCH] optimized memory usage for ZSTDMT_compress() Previously, each job would reserve a CCtx right before being posted. The CCtx would be "part of the job description", and only released when the job is completed (aka flushed). For ZSTDMT_compress(), which creates all jobs first and only joins at the end, that meant one CCtx per job. The nb of jobs used to be == nb of threads, but since the latest modification, which reduces the size of jobs in order to spread the load of difficult areas, it also increases the nb of jobs for large sources / small compression level. This resulted in many more CCtx being created. In this new version, CCtx are reserved within the worker thread. It guarantees there cannot be more CCtx reserved than workers (<= nb threads). Doing so required making the CCtx Pool multi-threading-safe : it can now be called from multiple threads in parallel. --- lib/compress/zstdmt_compress.c | 97 ++++++++++++++++++++-------------- tests/fuzzer.c | 45 ++++++++++------ 2 files changed, 86 insertions(+), 56 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index bad2db9c..a176c3ee 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -126,6 +126,7 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool) * assumption : invocation from main thread only ! 
*/ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize) { + DEBUGLOG(2, "ZSTDMT_getBuffer"); if (pool->nbBuffers) { /* try to use an existing buffer */ buffer_t const buf = pool->bTable[--(pool->nbBuffers)]; size_t const availBufferSize = buf.size; @@ -160,21 +161,23 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf) /* ===== CCtx Pool ===== */ +/* a single cctxPool can be called from multiple threads in parallel */ + typedef struct { + pthread_mutex_t poolMutex; unsigned totalCCtx; unsigned availCCtx; ZSTD_customMem cMem; ZSTD_CCtx* cctx[1]; /* variable size */ } ZSTDMT_CCtxPool; -/* assumption : CCtxPool invocation only from main thread */ - /* note : all CCtx borrowed from the pool should be released back to the pool _before_ freeing the pool */ static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool) { unsigned u; for (u=0; u<pool->totalCCtx; u++) ZSTD_freeCCtx(pool->cctx[u]); /* note : compatible with free on NULL */ + pthread_mutex_destroy(&pool->poolMutex); ZSTD_free(pool, pool->cMem); } @@ -186,6 +189,7 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads, ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc( sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*), cMem); if (!cctxPool) return NULL; + pthread_mutex_init(&cctxPool->poolMutex, NULL); cctxPool->cMem = cMem; cctxPool->totalCCtx = nbThreads; cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ @@ -198,34 +202,47 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads, /* only works during initialization phase, not during compression */ static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool) { - unsigned const nbThreads = cctxPool->totalCCtx; - size_t const poolSize = sizeof(*cctxPool) - + (nbThreads-1)*sizeof(ZSTD_CCtx*); - unsigned u; - size_t totalCCtxSize = 0; - for (u=0; u<nbThreads; u++) - totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]); - - return poolSize + totalCCtxSize; + pthread_mutex_lock(&cctxPool->poolMutex); + { unsigned const nbThreads 
= cctxPool->totalCCtx; + size_t const poolSize = sizeof(*cctxPool) + + (nbThreads-1)*sizeof(ZSTD_CCtx*); + unsigned u; + size_t totalCCtxSize = 0; + for (u=0; u<nbThreads; u++) { + totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]); + } + pthread_mutex_unlock(&cctxPool->poolMutex); + return poolSize + totalCCtxSize; + } } -static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* pool) +static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool) { - if (pool->availCCtx) { - pool->availCCtx--; - return pool->cctx[pool->availCCtx]; - } - return ZSTD_createCCtx_advanced(pool->cMem); /* note : can be NULL, when creation fails ! */ + DEBUGLOG(5, "ZSTDMT_getCCtx"); + pthread_mutex_lock(&cctxPool->poolMutex); + if (cctxPool->availCCtx) { + cctxPool->availCCtx--; + { ZSTD_CCtx* const cctx = cctxPool->cctx[cctxPool->availCCtx]; + pthread_mutex_unlock(&cctxPool->poolMutex); + return cctx; + } } + pthread_mutex_unlock(&cctxPool->poolMutex); + DEBUGLOG(5, "create one more CCtx"); + return ZSTD_createCCtx_advanced(cctxPool->cMem); /* note : can be NULL, when creation fails ! 
*/ } static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) { if (cctx==NULL) return; /* compatibility with release on NULL */ + pthread_mutex_lock(&pool->poolMutex); if (pool->availCCtx < pool->totalCCtx) pool->cctx[pool->availCCtx++] = cctx; - else + else { /* pool overflow : should not happen, since totalCCtx==nbThreads */ + DEBUGLOG(5, "CCtx pool overflow : free cctx"); ZSTD_freeCCtx(cctx); + } + pthread_mutex_unlock(&pool->poolMutex); } @@ -237,7 +254,6 @@ typedef struct { } inBuff_t; typedef struct { - ZSTD_CCtx* cctx; buffer_t src; const void* srcStart; size_t srcSize; @@ -253,6 +269,7 @@ typedef struct { pthread_cond_t* jobCompleted_cond; ZSTD_parameters params; const ZSTD_CDict* cdict; + ZSTDMT_CCtxPool* cctxPool; unsigned long long fullFrameSize; } ZSTDMT_jobDescription; @@ -260,37 +277,45 @@ typedef struct { void ZSTDMT_compressChunk(void* jobDescription) { ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; + ZSTD_CCtx* cctx = ZSTDMT_getCCtx(job->cctxPool); const void* const src = (const char*)job->srcStart + job->dictSize; buffer_t const dstBuff = job->dstBuff; DEBUGLOG(5, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize); + + if (cctx==NULL) { + job->cSize = ERROR(memory_allocation); + goto _endJob; + } + if (job->cdict) { /* should only happen for first segment */ - size_t const initError = ZSTD_compressBegin_usingCDict_advanced(job->cctx, job->cdict, job->params.fParams, job->fullFrameSize); + size_t const initError = ZSTD_compressBegin_usingCDict_advanced(cctx, job->cdict, job->params.fParams, job->fullFrameSize); DEBUGLOG(5, "using CDict"); if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } } else { /* srcStart points at reloaded section */ if (!job->firstChunk) job->params.fParams.contentSizeFlag = 0; /* ensure no srcSize control */ - { size_t const dictModeError = ZSTD_setCCtxParameter(job->cctx, 
ZSTD_p_forceRawDict, 1); /* Force loading dictionary in "content-only" mode (no header analysis) */ - size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize); + { size_t const dictModeError = ZSTD_setCCtxParameter(cctx, ZSTD_p_forceRawDict, 1); /* Force loading dictionary in "content-only" mode (no header analysis) */ + size_t const initError = ZSTD_compressBegin_advanced(cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize); if (ZSTD_isError(initError) || ZSTD_isError(dictModeError)) { job->cSize = initError; goto _endJob; } - ZSTD_setCCtxParameter(job->cctx, ZSTD_p_forceWindow, 1); + ZSTD_setCCtxParameter(cctx, ZSTD_p_forceWindow, 1); } } if (!job->firstChunk) { /* flush and overwrite frame header when it's not first segment */ - size_t const hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, 0); + size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0); if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; } - ZSTD_invalidateRepCodes(job->cctx); + ZSTD_invalidateRepCodes(cctx); } DEBUGLOG(5, "Compressing : "); DEBUG_PRINTHEX(4, job->srcStart, 12); job->cSize = (job->lastChunk) ? 
- ZSTD_compressEnd (job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : - ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize); + ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : + ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize); DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u)", (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk); DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize)); _endJob: + ZSTDMT_releaseCCtx(job->cctxPool, cctx); PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); job->jobCompleted = 1; job->jobScanned = 0; @@ -390,8 +415,6 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) mtctx->jobs[jobID].dstBuff = g_nullBuffer; ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].src); mtctx->jobs[jobID].src = g_nullBuffer; - ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[jobID].cctx); - mtctx->jobs[jobID].cctx = NULL; } memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription)); ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer); @@ -497,10 +520,9 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, size_t const dstBufferCapacity = ZSTD_compressBound(chunkSize); buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity }; buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity); - ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(mtctx->cctxPool); size_t dictSize = u ? 
overlapSize : 0; - if ((cctx==NULL) || (dstBuffer.start==NULL)) { + if (dstBuffer.start==NULL) { mtctx->jobs[u].cSize = ERROR(memory_allocation); /* job result */ mtctx->jobs[u].jobCompleted = 1; nbChunks = u+1; /* only wait and free u jobs, instead of initially expected nbChunks ones */ @@ -516,7 +538,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, /* do not calculate checksum within sections, but write it in header for first section */ if (u!=0) mtctx->jobs[u].params.fParams.checksumFlag = 0; mtctx->jobs[u].dstBuff = dstBuffer; - mtctx->jobs[u].cctx = cctx; + mtctx->jobs[u].cctxPool = mtctx->cctxPool; mtctx->jobs[u].firstChunk = (u==0); mtctx->jobs[u].lastChunk = (u==nbChunks-1); mtctx->jobs[u].jobCompleted = 0; @@ -545,8 +567,6 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx, pthread_mutex_unlock(&mtctx->jobCompleted_mutex); DEBUGLOG(5, "ready to write chunk %u ", chunkID); - ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[chunkID].cctx); - mtctx->jobs[chunkID].cctx = NULL; mtctx->jobs[chunkID].srcStart = NULL; { size_t const cSize = mtctx->jobs[chunkID].cSize; if (ZSTD_isError(cSize)) error = cSize; @@ -703,10 +723,9 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi { size_t const dstBufferCapacity = ZSTD_compressBound(srcSize); buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); - ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; - if ((cctx==NULL) || (dstBuffer.start==NULL)) { + if (dstBuffer.start==NULL) { zcs->jobs[jobID].jobCompleted = 1; zcs->nextJobID++; ZSTDMT_waitForAllJobsCompleted(zcs); @@ -727,7 +746,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? 
zcs->cdict : NULL; zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; zcs->jobs[jobID].dstBuff = dstBuffer; - zcs->jobs[jobID].cctx = cctx; + zcs->jobs[jobID].cctxPool = zcs->cctxPool; zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); zcs->jobs[jobID].lastChunk = endFrame; zcs->jobs[jobID].jobCompleted = 0; @@ -804,8 +823,6 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi ZSTDMT_releaseAllJobResources(zcs); return job.cSize; } - ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); - zcs->jobs[wJobID].cctx = NULL; DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag); if (zcs->params.fParams.checksumFlag) { XXH64_update(&zcs->xxhState, (const char*)job.srcStart + job.dictSize, job.srcSize); @@ -884,7 +901,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, /* fill input buffer */ if ((input->src) && (mtctx->inBuff.buffer.start)) { /* support NULL input */ size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled); - DEBUGLOG(2, "inBuff:%08X; inBuffSize=%u; ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad); + DEBUGLOG(5, "inBuff:%08X; inBuffSize=%u; ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad); memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad); input->pos += toLoad; mtctx->inBuff.filled += toLoad; diff --git a/tests/fuzzer.c b/tests/fuzzer.c index aa1ebd48..667de08c 100644 --- a/tests/fuzzer.c +++ b/tests/fuzzer.c @@ -105,12 +105,13 @@ static unsigned FUZ_highbit32(U32 v32) typedef struct { unsigned long long totalMalloc; + size_t currentMalloc; size_t peakMalloc; unsigned nbMalloc; unsigned nbFree; } mallocCounter_t; -static const mallocCounter_t INIT_MALLOC_COUNTER = { 0, 0, 0, 0 }; +static const mallocCounter_t INIT_MALLOC_COUNTER = { 0, 0, 0, 0, 0 }; static void* FUZ_mallocDebug(void* counter, size_t size) { 
@@ -118,7 +119,9 @@ static void* FUZ_mallocDebug(void* counter, size_t size) void* const ptr = malloc(size); if (ptr==NULL) return NULL; mcPtr->totalMalloc += size; - mcPtr->peakMalloc += size; + mcPtr->currentMalloc += size; + if (mcPtr->currentMalloc > mcPtr->peakMalloc) + mcPtr->peakMalloc = mcPtr->currentMalloc; mcPtr->nbMalloc += 1; return ptr; } @@ -126,9 +129,10 @@ static void* FUZ_mallocDebug(void* counter, size_t size) static void FUZ_freeDebug(void* counter, void* address) { mallocCounter_t* const mcPtr = (mallocCounter_t*)counter; - free(address); + DISPLAYLEVEL(4, "releasing %u KB \n", (U32)(malloc_size(address) >> 10)); mcPtr->nbFree += 1; - mcPtr->peakMalloc -= malloc_size(address); /* OS-X specific */ + mcPtr->currentMalloc -= malloc_size(address); /* OS-X specific */ + free(address); } static void FUZ_displayMallocStats(mallocCounter_t count) @@ -139,6 +143,14 @@ static void FUZ_displayMallocStats(mallocCounter_t count) (U32)(count.totalMalloc >> 10)); } +#define CHECK_Z(f) { \ + size_t const err = f; \ + if (ZSTD_isError(err)) { \ + DISPLAY("Error => %s : %s ", \ + #f, ZSTD_getErrorName(err)); \ + exit(1); \ +} } + static int FUZ_mallocTests(unsigned seed, double compressibility) { size_t const inSize = 64 MB + 16 MB + 4 MB + 1 MB + 256 KB + 64 KB; /* 85.3 MB */ @@ -162,7 +174,7 @@ static int FUZ_mallocTests(unsigned seed, double compressibility) mallocCounter_t malcount = INIT_MALLOC_COUNTER; ZSTD_customMem const cMem = { FUZ_mallocDebug, FUZ_freeDebug, &malcount }; ZSTD_CCtx* const cctx = ZSTD_createCCtx_advanced(cMem); - ZSTD_compressCCtx(cctx, outBuffer, outSize, inBuffer, inSize, compressionLevel); + CHECK_Z( ZSTD_compressCCtx(cctx, outBuffer, outSize, inBuffer, inSize, compressionLevel) ); ZSTD_freeCCtx(cctx); DISPLAYLEVEL(3, "compressCCtx level %i : ", compressionLevel); FUZ_displayMallocStats(malcount); @@ -176,9 +188,9 @@ static int FUZ_mallocTests(unsigned seed, double compressibility) ZSTD_CCtx* const cstream = 
ZSTD_createCStream_advanced(cMem); ZSTD_outBuffer out = { outBuffer, outSize, 0 }; ZSTD_inBuffer in = { inBuffer, inSize, 0 }; - ZSTD_initCStream(cstream, compressionLevel); - ZSTD_compressStream(cstream, &out, &in); - ZSTD_endStream(cstream, &out); + CHECK_Z( ZSTD_initCStream(cstream, compressionLevel) ); + CHECK_Z( ZSTD_compressStream(cstream, &out, &in) ); + CHECK_Z( ZSTD_endStream(cstream, &out) ); ZSTD_freeCStream(cstream); DISPLAYLEVEL(3, "compressStream level %i : ", compressionLevel); FUZ_displayMallocStats(malcount); @@ -194,9 +206,9 @@ static int FUZ_mallocTests(unsigned seed, double compressibility) ZSTD_CCtx* const cctx = ZSTD_createCCtx_advanced(cMem); ZSTD_outBuffer out = { outBuffer, outSize, 0 }; ZSTD_inBuffer in = { inBuffer, inSize, 0 }; - ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel); - ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads); - ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end); + CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel) ); + CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads) ); + while ( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end) ) {} ZSTD_freeCCtx(cctx); DISPLAYLEVEL(3, "compress_generic,-T%u,end level %i : ", nbThreads, compressionLevel); @@ -213,10 +225,10 @@ static int FUZ_mallocTests(unsigned seed, double compressibility) ZSTD_CCtx* const cctx = ZSTD_createCCtx_advanced(cMem); ZSTD_outBuffer out = { outBuffer, outSize, 0 }; ZSTD_inBuffer in = { inBuffer, inSize, 0 }; - ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel); - ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads); - ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_continue); - ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end); + CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel) ); + CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads) ); + CHECK_Z( ZSTD_compress_generic(cctx, &out, 
&in, ZSTD_e_continue) ); + while ( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end) ) {} ZSTD_freeCCtx(cctx); DISPLAYLEVEL(3, "compress_generic,-T%u,continue level %i : ", nbThreads, compressionLevel); @@ -1046,6 +1058,7 @@ static size_t FUZ_randomLength(U32* seed, U32 maxLog) goto _output_error; \ } } +#undef CHECK_Z #define CHECK_Z(f) { \ size_t const err = f; \ if (ZSTD_isError(err)) { \ @@ -1473,7 +1486,7 @@ int main(int argc, const char** argv) if (proba!=FUZ_compressibility_default) DISPLAY("Compressibility : %u%%\n", proba); if (memTestsOnly) { - g_displayLevel=3; + g_displayLevel = MAX(3, g_displayLevel); return FUZ_mallocTests(seed, ((double)proba) / 100); }