optimized memory usage for ZSTDMT_compress()

Previously, each job would reserve a CCtx right before being posted.
The CCtx would be "part of the job description",
and only released when the job is completed (aka flushed).
For ZSTDMT_compress(), which creates all jobs first and only joins at the end,
that meant one CCtx per job.
The number of jobs used to be equal to the number of threads,
but the latest modification,
which reduces the size of jobs in order to spread the load of difficult areas,
also increases the number of jobs for large sources / small compression levels.
This resulted in many more CCtx being created.

In this new version, CCtx are reserved within the worker thread.
This guarantees that there cannot be more CCtx reserved than workers (<= nb threads).

Doing so required making the CCtx Pool thread-safe:
it can now be called from multiple threads in parallel.
dev
Yann Collet 2017-07-10 16:30:55 -07:00
parent 3510efb02d
commit 670b1fc547
2 changed files with 86 additions and 56 deletions

View File

@ -126,6 +126,7 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
* assumption : invocation from main thread only ! */ * assumption : invocation from main thread only ! */
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize) static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
{ {
DEBUGLOG(2, "ZSTDMT_getBuffer");
if (pool->nbBuffers) { /* try to use an existing buffer */ if (pool->nbBuffers) { /* try to use an existing buffer */
buffer_t const buf = pool->bTable[--(pool->nbBuffers)]; buffer_t const buf = pool->bTable[--(pool->nbBuffers)];
size_t const availBufferSize = buf.size; size_t const availBufferSize = buf.size;
@ -160,21 +161,23 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
/* ===== CCtx Pool ===== */ /* ===== CCtx Pool ===== */
/* a single cctxPool can be called from multiple threads in parallel */
typedef struct { typedef struct {
pthread_mutex_t poolMutex;
unsigned totalCCtx; unsigned totalCCtx;
unsigned availCCtx; unsigned availCCtx;
ZSTD_customMem cMem; ZSTD_customMem cMem;
ZSTD_CCtx* cctx[1]; /* variable size */ ZSTD_CCtx* cctx[1]; /* variable size */
} ZSTDMT_CCtxPool; } ZSTDMT_CCtxPool;
/* assumption : CCtxPool invocation only from main thread */
/* note : all CCtx borrowed from the pool should be released back to the pool _before_ freeing the pool */ /* note : all CCtx borrowed from the pool should be released back to the pool _before_ freeing the pool */
static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool) static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
{ {
unsigned u; unsigned u;
for (u=0; u<pool->totalCCtx; u++) for (u=0; u<pool->totalCCtx; u++)
ZSTD_freeCCtx(pool->cctx[u]); /* note : compatible with free on NULL */ ZSTD_freeCCtx(pool->cctx[u]); /* note : compatible with free on NULL */
pthread_mutex_destroy(&pool->poolMutex);
ZSTD_free(pool, pool->cMem); ZSTD_free(pool, pool->cMem);
} }
@ -186,6 +189,7 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads,
ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc( ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc(
sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*), cMem); sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*), cMem);
if (!cctxPool) return NULL; if (!cctxPool) return NULL;
pthread_mutex_init(&cctxPool->poolMutex, NULL);
cctxPool->cMem = cMem; cctxPool->cMem = cMem;
cctxPool->totalCCtx = nbThreads; cctxPool->totalCCtx = nbThreads;
cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */
@ -198,34 +202,47 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads,
/* only works during initialization phase, not during compression */ /* only works during initialization phase, not during compression */
static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool) static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
{ {
unsigned const nbThreads = cctxPool->totalCCtx; pthread_mutex_lock(&cctxPool->poolMutex);
size_t const poolSize = sizeof(*cctxPool) { unsigned const nbThreads = cctxPool->totalCCtx;
+ (nbThreads-1)*sizeof(ZSTD_CCtx*); size_t const poolSize = sizeof(*cctxPool)
unsigned u; + (nbThreads-1)*sizeof(ZSTD_CCtx*);
size_t totalCCtxSize = 0; unsigned u;
for (u=0; u<nbThreads; u++) size_t totalCCtxSize = 0;
totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]); for (u=0; u<nbThreads; u++) {
totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]);
return poolSize + totalCCtxSize; }
pthread_mutex_unlock(&cctxPool->poolMutex);
return poolSize + totalCCtxSize;
}
} }
static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* pool) static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
{ {
if (pool->availCCtx) { DEBUGLOG(5, "ZSTDMT_getCCtx");
pool->availCCtx--; pthread_mutex_lock(&cctxPool->poolMutex);
return pool->cctx[pool->availCCtx]; if (cctxPool->availCCtx) {
} cctxPool->availCCtx--;
return ZSTD_createCCtx_advanced(pool->cMem); /* note : can be NULL, when creation fails ! */ { ZSTD_CCtx* const cctx = cctxPool->cctx[cctxPool->availCCtx];
pthread_mutex_unlock(&cctxPool->poolMutex);
return cctx;
} }
pthread_mutex_unlock(&cctxPool->poolMutex);
DEBUGLOG(5, "create one more CCtx");
return ZSTD_createCCtx_advanced(cctxPool->cMem); /* note : can be NULL, when creation fails ! */
} }
static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
{ {
if (cctx==NULL) return; /* compatibility with release on NULL */ if (cctx==NULL) return; /* compatibility with release on NULL */
pthread_mutex_lock(&pool->poolMutex);
if (pool->availCCtx < pool->totalCCtx) if (pool->availCCtx < pool->totalCCtx)
pool->cctx[pool->availCCtx++] = cctx; pool->cctx[pool->availCCtx++] = cctx;
else else {
/* pool overflow : should not happen, since totalCCtx==nbThreads */ /* pool overflow : should not happen, since totalCCtx==nbThreads */
DEBUGLOG(5, "CCtx pool overflow : free cctx");
ZSTD_freeCCtx(cctx); ZSTD_freeCCtx(cctx);
}
pthread_mutex_unlock(&pool->poolMutex);
} }
@ -237,7 +254,6 @@ typedef struct {
} inBuff_t; } inBuff_t;
typedef struct { typedef struct {
ZSTD_CCtx* cctx;
buffer_t src; buffer_t src;
const void* srcStart; const void* srcStart;
size_t srcSize; size_t srcSize;
@ -253,6 +269,7 @@ typedef struct {
pthread_cond_t* jobCompleted_cond; pthread_cond_t* jobCompleted_cond;
ZSTD_parameters params; ZSTD_parameters params;
const ZSTD_CDict* cdict; const ZSTD_CDict* cdict;
ZSTDMT_CCtxPool* cctxPool;
unsigned long long fullFrameSize; unsigned long long fullFrameSize;
} ZSTDMT_jobDescription; } ZSTDMT_jobDescription;
@ -260,37 +277,45 @@ typedef struct {
void ZSTDMT_compressChunk(void* jobDescription) void ZSTDMT_compressChunk(void* jobDescription)
{ {
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
ZSTD_CCtx* cctx = ZSTDMT_getCCtx(job->cctxPool);
const void* const src = (const char*)job->srcStart + job->dictSize; const void* const src = (const char*)job->srcStart + job->dictSize;
buffer_t const dstBuff = job->dstBuff; buffer_t const dstBuff = job->dstBuff;
DEBUGLOG(5, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", DEBUGLOG(5, "job (first:%u) (last:%u) : dictSize %u, srcSize %u",
job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize); job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize);
if (cctx==NULL) {
job->cSize = ERROR(memory_allocation);
goto _endJob;
}
if (job->cdict) { /* should only happen for first segment */ if (job->cdict) { /* should only happen for first segment */
size_t const initError = ZSTD_compressBegin_usingCDict_advanced(job->cctx, job->cdict, job->params.fParams, job->fullFrameSize); size_t const initError = ZSTD_compressBegin_usingCDict_advanced(cctx, job->cdict, job->params.fParams, job->fullFrameSize);
DEBUGLOG(5, "using CDict"); DEBUGLOG(5, "using CDict");
if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
} else { /* srcStart points at reloaded section */ } else { /* srcStart points at reloaded section */
if (!job->firstChunk) job->params.fParams.contentSizeFlag = 0; /* ensure no srcSize control */ if (!job->firstChunk) job->params.fParams.contentSizeFlag = 0; /* ensure no srcSize control */
{ size_t const dictModeError = ZSTD_setCCtxParameter(job->cctx, ZSTD_p_forceRawDict, 1); /* Force loading dictionary in "content-only" mode (no header analysis) */ { size_t const dictModeError = ZSTD_setCCtxParameter(cctx, ZSTD_p_forceRawDict, 1); /* Force loading dictionary in "content-only" mode (no header analysis) */
size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize); size_t const initError = ZSTD_compressBegin_advanced(cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize);
if (ZSTD_isError(initError) || ZSTD_isError(dictModeError)) { job->cSize = initError; goto _endJob; } if (ZSTD_isError(initError) || ZSTD_isError(dictModeError)) { job->cSize = initError; goto _endJob; }
ZSTD_setCCtxParameter(job->cctx, ZSTD_p_forceWindow, 1); ZSTD_setCCtxParameter(cctx, ZSTD_p_forceWindow, 1);
} } } }
if (!job->firstChunk) { /* flush and overwrite frame header when it's not first segment */ if (!job->firstChunk) { /* flush and overwrite frame header when it's not first segment */
size_t const hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, 0); size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, 0);
if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; } if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
ZSTD_invalidateRepCodes(job->cctx); ZSTD_invalidateRepCodes(cctx);
} }
DEBUGLOG(5, "Compressing : "); DEBUGLOG(5, "Compressing : ");
DEBUG_PRINTHEX(4, job->srcStart, 12); DEBUG_PRINTHEX(4, job->srcStart, 12);
job->cSize = (job->lastChunk) ? job->cSize = (job->lastChunk) ?
ZSTD_compressEnd (job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize); ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u)", DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u)",
(unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk); (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize)); DEBUGLOG(5, "dstBuff.size : %u ; => %s", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize));
_endJob: _endJob:
ZSTDMT_releaseCCtx(job->cctxPool, cctx);
PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
job->jobCompleted = 1; job->jobCompleted = 1;
job->jobScanned = 0; job->jobScanned = 0;
@ -390,8 +415,6 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
mtctx->jobs[jobID].dstBuff = g_nullBuffer; mtctx->jobs[jobID].dstBuff = g_nullBuffer;
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].src); ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].src);
mtctx->jobs[jobID].src = g_nullBuffer; mtctx->jobs[jobID].src = g_nullBuffer;
ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[jobID].cctx);
mtctx->jobs[jobID].cctx = NULL;
} }
memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription)); memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription));
ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer); ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer);
@ -497,10 +520,9 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
size_t const dstBufferCapacity = ZSTD_compressBound(chunkSize); size_t const dstBufferCapacity = ZSTD_compressBound(chunkSize);
buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity }; buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity };
buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity); buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity);
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(mtctx->cctxPool);
size_t dictSize = u ? overlapSize : 0; size_t dictSize = u ? overlapSize : 0;
if ((cctx==NULL) || (dstBuffer.start==NULL)) { if (dstBuffer.start==NULL) {
mtctx->jobs[u].cSize = ERROR(memory_allocation); /* job result */ mtctx->jobs[u].cSize = ERROR(memory_allocation); /* job result */
mtctx->jobs[u].jobCompleted = 1; mtctx->jobs[u].jobCompleted = 1;
nbChunks = u+1; /* only wait and free u jobs, instead of initially expected nbChunks ones */ nbChunks = u+1; /* only wait and free u jobs, instead of initially expected nbChunks ones */
@ -516,7 +538,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
/* do not calculate checksum within sections, but write it in header for first section */ /* do not calculate checksum within sections, but write it in header for first section */
if (u!=0) mtctx->jobs[u].params.fParams.checksumFlag = 0; if (u!=0) mtctx->jobs[u].params.fParams.checksumFlag = 0;
mtctx->jobs[u].dstBuff = dstBuffer; mtctx->jobs[u].dstBuff = dstBuffer;
mtctx->jobs[u].cctx = cctx; mtctx->jobs[u].cctxPool = mtctx->cctxPool;
mtctx->jobs[u].firstChunk = (u==0); mtctx->jobs[u].firstChunk = (u==0);
mtctx->jobs[u].lastChunk = (u==nbChunks-1); mtctx->jobs[u].lastChunk = (u==nbChunks-1);
mtctx->jobs[u].jobCompleted = 0; mtctx->jobs[u].jobCompleted = 0;
@ -545,8 +567,6 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
pthread_mutex_unlock(&mtctx->jobCompleted_mutex); pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
DEBUGLOG(5, "ready to write chunk %u ", chunkID); DEBUGLOG(5, "ready to write chunk %u ", chunkID);
ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[chunkID].cctx);
mtctx->jobs[chunkID].cctx = NULL;
mtctx->jobs[chunkID].srcStart = NULL; mtctx->jobs[chunkID].srcStart = NULL;
{ size_t const cSize = mtctx->jobs[chunkID].cSize; { size_t const cSize = mtctx->jobs[chunkID].cSize;
if (ZSTD_isError(cSize)) error = cSize; if (ZSTD_isError(cSize)) error = cSize;
@ -703,10 +723,9 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
{ {
size_t const dstBufferCapacity = ZSTD_compressBound(srcSize); size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
if ((cctx==NULL) || (dstBuffer.start==NULL)) { if (dstBuffer.start==NULL) {
zcs->jobs[jobID].jobCompleted = 1; zcs->jobs[jobID].jobCompleted = 1;
zcs->nextJobID++; zcs->nextJobID++;
ZSTDMT_waitForAllJobsCompleted(zcs); ZSTDMT_waitForAllJobsCompleted(zcs);
@ -727,7 +746,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
zcs->jobs[jobID].dstBuff = dstBuffer; zcs->jobs[jobID].dstBuff = dstBuffer;
zcs->jobs[jobID].cctx = cctx; zcs->jobs[jobID].cctxPool = zcs->cctxPool;
zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
zcs->jobs[jobID].lastChunk = endFrame; zcs->jobs[jobID].lastChunk = endFrame;
zcs->jobs[jobID].jobCompleted = 0; zcs->jobs[jobID].jobCompleted = 0;
@ -804,8 +823,6 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
ZSTDMT_releaseAllJobResources(zcs); ZSTDMT_releaseAllJobResources(zcs);
return job.cSize; return job.cSize;
} }
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
zcs->jobs[wJobID].cctx = NULL;
DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag); DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag);
if (zcs->params.fParams.checksumFlag) { if (zcs->params.fParams.checksumFlag) {
XXH64_update(&zcs->xxhState, (const char*)job.srcStart + job.dictSize, job.srcSize); XXH64_update(&zcs->xxhState, (const char*)job.srcStart + job.dictSize, job.srcSize);
@ -884,7 +901,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
/* fill input buffer */ /* fill input buffer */
if ((input->src) && (mtctx->inBuff.buffer.start)) { /* support NULL input */ if ((input->src) && (mtctx->inBuff.buffer.start)) { /* support NULL input */
size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled); size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
DEBUGLOG(2, "inBuff:%08X; inBuffSize=%u; ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad); DEBUGLOG(5, "inBuff:%08X; inBuffSize=%u; ToCopy=%u", (U32)(size_t)mtctx->inBuff.buffer.start, (U32)mtctx->inBuffSize, (U32)toLoad);
memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad); memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
input->pos += toLoad; input->pos += toLoad;
mtctx->inBuff.filled += toLoad; mtctx->inBuff.filled += toLoad;

View File

@ -105,12 +105,13 @@ static unsigned FUZ_highbit32(U32 v32)
typedef struct { typedef struct {
unsigned long long totalMalloc; unsigned long long totalMalloc;
size_t currentMalloc;
size_t peakMalloc; size_t peakMalloc;
unsigned nbMalloc; unsigned nbMalloc;
unsigned nbFree; unsigned nbFree;
} mallocCounter_t; } mallocCounter_t;
static const mallocCounter_t INIT_MALLOC_COUNTER = { 0, 0, 0, 0 }; static const mallocCounter_t INIT_MALLOC_COUNTER = { 0, 0, 0, 0, 0 };
static void* FUZ_mallocDebug(void* counter, size_t size) static void* FUZ_mallocDebug(void* counter, size_t size)
{ {
@ -118,7 +119,9 @@ static void* FUZ_mallocDebug(void* counter, size_t size)
void* const ptr = malloc(size); void* const ptr = malloc(size);
if (ptr==NULL) return NULL; if (ptr==NULL) return NULL;
mcPtr->totalMalloc += size; mcPtr->totalMalloc += size;
mcPtr->peakMalloc += size; mcPtr->currentMalloc += size;
if (mcPtr->currentMalloc > mcPtr->peakMalloc)
mcPtr->peakMalloc = mcPtr->currentMalloc;
mcPtr->nbMalloc += 1; mcPtr->nbMalloc += 1;
return ptr; return ptr;
} }
@ -126,9 +129,10 @@ static void* FUZ_mallocDebug(void* counter, size_t size)
static void FUZ_freeDebug(void* counter, void* address) static void FUZ_freeDebug(void* counter, void* address)
{ {
mallocCounter_t* const mcPtr = (mallocCounter_t*)counter; mallocCounter_t* const mcPtr = (mallocCounter_t*)counter;
free(address); DISPLAYLEVEL(4, "releasing %u KB \n", (U32)(malloc_size(address) >> 10));
mcPtr->nbFree += 1; mcPtr->nbFree += 1;
mcPtr->peakMalloc -= malloc_size(address); /* OS-X specific */ mcPtr->currentMalloc -= malloc_size(address); /* OS-X specific */
free(address);
} }
static void FUZ_displayMallocStats(mallocCounter_t count) static void FUZ_displayMallocStats(mallocCounter_t count)
@ -139,6 +143,14 @@ static void FUZ_displayMallocStats(mallocCounter_t count)
(U32)(count.totalMalloc >> 10)); (U32)(count.totalMalloc >> 10));
} }
#define CHECK_Z(f) { \
size_t const err = f; \
if (ZSTD_isError(err)) { \
DISPLAY("Error => %s : %s ", \
#f, ZSTD_getErrorName(err)); \
exit(1); \
} }
static int FUZ_mallocTests(unsigned seed, double compressibility) static int FUZ_mallocTests(unsigned seed, double compressibility)
{ {
size_t const inSize = 64 MB + 16 MB + 4 MB + 1 MB + 256 KB + 64 KB; /* 85.3 MB */ size_t const inSize = 64 MB + 16 MB + 4 MB + 1 MB + 256 KB + 64 KB; /* 85.3 MB */
@ -162,7 +174,7 @@ static int FUZ_mallocTests(unsigned seed, double compressibility)
mallocCounter_t malcount = INIT_MALLOC_COUNTER; mallocCounter_t malcount = INIT_MALLOC_COUNTER;
ZSTD_customMem const cMem = { FUZ_mallocDebug, FUZ_freeDebug, &malcount }; ZSTD_customMem const cMem = { FUZ_mallocDebug, FUZ_freeDebug, &malcount };
ZSTD_CCtx* const cctx = ZSTD_createCCtx_advanced(cMem); ZSTD_CCtx* const cctx = ZSTD_createCCtx_advanced(cMem);
ZSTD_compressCCtx(cctx, outBuffer, outSize, inBuffer, inSize, compressionLevel); CHECK_Z( ZSTD_compressCCtx(cctx, outBuffer, outSize, inBuffer, inSize, compressionLevel) );
ZSTD_freeCCtx(cctx); ZSTD_freeCCtx(cctx);
DISPLAYLEVEL(3, "compressCCtx level %i : ", compressionLevel); DISPLAYLEVEL(3, "compressCCtx level %i : ", compressionLevel);
FUZ_displayMallocStats(malcount); FUZ_displayMallocStats(malcount);
@ -176,9 +188,9 @@ static int FUZ_mallocTests(unsigned seed, double compressibility)
ZSTD_CCtx* const cstream = ZSTD_createCStream_advanced(cMem); ZSTD_CCtx* const cstream = ZSTD_createCStream_advanced(cMem);
ZSTD_outBuffer out = { outBuffer, outSize, 0 }; ZSTD_outBuffer out = { outBuffer, outSize, 0 };
ZSTD_inBuffer in = { inBuffer, inSize, 0 }; ZSTD_inBuffer in = { inBuffer, inSize, 0 };
ZSTD_initCStream(cstream, compressionLevel); CHECK_Z( ZSTD_initCStream(cstream, compressionLevel) );
ZSTD_compressStream(cstream, &out, &in); CHECK_Z( ZSTD_compressStream(cstream, &out, &in) );
ZSTD_endStream(cstream, &out); CHECK_Z( ZSTD_endStream(cstream, &out) );
ZSTD_freeCStream(cstream); ZSTD_freeCStream(cstream);
DISPLAYLEVEL(3, "compressStream level %i : ", compressionLevel); DISPLAYLEVEL(3, "compressStream level %i : ", compressionLevel);
FUZ_displayMallocStats(malcount); FUZ_displayMallocStats(malcount);
@ -194,9 +206,9 @@ static int FUZ_mallocTests(unsigned seed, double compressibility)
ZSTD_CCtx* const cctx = ZSTD_createCCtx_advanced(cMem); ZSTD_CCtx* const cctx = ZSTD_createCCtx_advanced(cMem);
ZSTD_outBuffer out = { outBuffer, outSize, 0 }; ZSTD_outBuffer out = { outBuffer, outSize, 0 };
ZSTD_inBuffer in = { inBuffer, inSize, 0 }; ZSTD_inBuffer in = { inBuffer, inSize, 0 };
ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel); CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel) );
ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads); CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads) );
ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end); while ( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end) ) {}
ZSTD_freeCCtx(cctx); ZSTD_freeCCtx(cctx);
DISPLAYLEVEL(3, "compress_generic,-T%u,end level %i : ", DISPLAYLEVEL(3, "compress_generic,-T%u,end level %i : ",
nbThreads, compressionLevel); nbThreads, compressionLevel);
@ -213,10 +225,10 @@ static int FUZ_mallocTests(unsigned seed, double compressibility)
ZSTD_CCtx* const cctx = ZSTD_createCCtx_advanced(cMem); ZSTD_CCtx* const cctx = ZSTD_createCCtx_advanced(cMem);
ZSTD_outBuffer out = { outBuffer, outSize, 0 }; ZSTD_outBuffer out = { outBuffer, outSize, 0 };
ZSTD_inBuffer in = { inBuffer, inSize, 0 }; ZSTD_inBuffer in = { inBuffer, inSize, 0 };
ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel); CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel) );
ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads); CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads) );
ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_continue); CHECK_Z( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_continue) );
ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end); while ( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end) ) {}
ZSTD_freeCCtx(cctx); ZSTD_freeCCtx(cctx);
DISPLAYLEVEL(3, "compress_generic,-T%u,continue level %i : ", DISPLAYLEVEL(3, "compress_generic,-T%u,continue level %i : ",
nbThreads, compressionLevel); nbThreads, compressionLevel);
@ -1046,6 +1058,7 @@ static size_t FUZ_randomLength(U32* seed, U32 maxLog)
goto _output_error; \ goto _output_error; \
} } } }
#undef CHECK_Z
#define CHECK_Z(f) { \ #define CHECK_Z(f) { \
size_t const err = f; \ size_t const err = f; \
if (ZSTD_isError(err)) { \ if (ZSTD_isError(err)) { \
@ -1473,7 +1486,7 @@ int main(int argc, const char** argv)
if (proba!=FUZ_compressibility_default) DISPLAY("Compressibility : %u%%\n", proba); if (proba!=FUZ_compressibility_default) DISPLAY("Compressibility : %u%%\n", proba);
if (memTestsOnly) { if (memTestsOnly) {
g_displayLevel=3; g_displayLevel = MAX(3, g_displayLevel);
return FUZ_mallocTests(seed, ((double)proba) / 100); return FUZ_mallocTests(seed, ((double)proba) / 100);
} }