buffer pool : all buffers have same size
to reduce memory fragmentation. They can be used for in or out, interchangeably.
This commit is contained in:
parent
34b2b95631
commit
57236184af
@ -84,6 +84,7 @@ static const buffer_t g_nullBuffer = { NULL, 0 };
|
|||||||
|
|
||||||
typedef struct ZSTDMT_bufferPool_s {
|
typedef struct ZSTDMT_bufferPool_s {
|
||||||
pthread_mutex_t poolMutex;
|
pthread_mutex_t poolMutex;
|
||||||
|
size_t bufferSize;
|
||||||
unsigned totalBuffers;
|
unsigned totalBuffers;
|
||||||
unsigned nbBuffers;
|
unsigned nbBuffers;
|
||||||
ZSTD_customMem cMem;
|
ZSTD_customMem cMem;
|
||||||
@ -97,6 +98,7 @@ static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads, ZSTD_custo
|
|||||||
sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
|
sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
|
||||||
if (bufPool==NULL) return NULL;
|
if (bufPool==NULL) return NULL;
|
||||||
pthread_mutex_init(&bufPool->poolMutex, NULL);
|
pthread_mutex_init(&bufPool->poolMutex, NULL);
|
||||||
|
bufPool->bufferSize = 64 KB;
|
||||||
bufPool->totalBuffers = maxNbBuffers;
|
bufPool->totalBuffers = maxNbBuffers;
|
||||||
bufPool->nbBuffers = 0;
|
bufPool->nbBuffers = 0;
|
||||||
bufPool->cMem = cMem;
|
bufPool->cMem = cMem;
|
||||||
@ -128,10 +130,16 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
|
|||||||
return poolSize + totalBufferSize;
|
return poolSize + totalBufferSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** ZSTDMT_getBuffer() :
|
static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* bufPool, size_t bSize)
|
||||||
* assumption : invocation from main thread only ! */
|
|
||||||
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool, size_t bSize)
|
|
||||||
{
|
{
|
||||||
|
bufPool->bufferSize = bSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** ZSTDMT_getBuffer() :
|
||||||
|
* assumption : bufPool must be valid */
|
||||||
|
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
|
||||||
|
{
|
||||||
|
size_t const bSize = bufPool->bufferSize;
|
||||||
DEBUGLOG(2, "ZSTDMT_getBuffer");
|
DEBUGLOG(2, "ZSTDMT_getBuffer");
|
||||||
pthread_mutex_lock(&bufPool->poolMutex);
|
pthread_mutex_lock(&bufPool->poolMutex);
|
||||||
if (bufPool->nbBuffers) { /* try to use an existing buffer */
|
if (bufPool->nbBuffers) { /* try to use an existing buffer */
|
||||||
@ -151,9 +159,8 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool, size_t bSize)
|
|||||||
DEBUGLOG(2, "create a new buffer");
|
DEBUGLOG(2, "create a new buffer");
|
||||||
{ buffer_t buffer;
|
{ buffer_t buffer;
|
||||||
void* const start = ZSTD_malloc(bSize, bufPool->cMem);
|
void* const start = ZSTD_malloc(bSize, bufPool->cMem);
|
||||||
if (start==NULL) bSize = 0;
|
|
||||||
buffer.start = start; /* note : start can be NULL if malloc fails ! */
|
buffer.start = start; /* note : start can be NULL if malloc fails ! */
|
||||||
buffer.size = bSize;
|
buffer.size = (start==NULL) ? 0 : bSize;
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -306,8 +313,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (dstBuff.start == NULL) {
|
if (dstBuff.start == NULL) {
|
||||||
size_t const dstCapacity = ZSTD_compressBound(job->srcSize);
|
dstBuff = ZSTDMT_getBuffer(job->bufPool);
|
||||||
dstBuff = ZSTDMT_getBuffer(job->bufPool, dstCapacity);
|
|
||||||
if (dstBuff.start==NULL) {
|
if (dstBuff.start==NULL) {
|
||||||
job->cSize = ERROR(memory_allocation);
|
job->cSize = ERROR(memory_allocation);
|
||||||
goto _endJob;
|
goto _endJob;
|
||||||
@ -530,6 +536,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
|
|||||||
return ZSTD_compress_advanced(cctx, dst, dstCapacity, src, srcSize, NULL, 0, params);
|
return ZSTD_compress_advanced(cctx, dst, dstCapacity, src, srcSize, NULL, 0, params);
|
||||||
}
|
}
|
||||||
assert(avgChunkSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), which is useful to avoid allocating extra buffers */
|
assert(avgChunkSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), which is useful to avoid allocating extra buffers */
|
||||||
|
ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgChunkSize) );
|
||||||
|
|
||||||
if (nbChunks > mtctx->jobIDMask+1) { /* enlarge job table */
|
if (nbChunks > mtctx->jobIDMask+1) { /* enlarge job table */
|
||||||
U32 nbJobs = nbChunks;
|
U32 nbJobs = nbChunks;
|
||||||
@ -690,6 +697,7 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
|
|||||||
zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize);
|
zcs->targetSectionSize = MAX(zcs->targetDictSize, zcs->targetSectionSize);
|
||||||
DEBUGLOG(4, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10));
|
DEBUGLOG(4, "Section Size : %u KB", (U32)(zcs->targetSectionSize>>10));
|
||||||
zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize;
|
zcs->inBuffSize = zcs->targetDictSize + zcs->targetSectionSize;
|
||||||
|
ZSTDMT_setBufferSize(zcs->bufPool, MAX(zcs->inBuffSize, ZSTD_compressBound(zcs->targetSectionSize)) );
|
||||||
zcs->inBuff.buffer = g_nullBuffer;
|
zcs->inBuff.buffer = g_nullBuffer;
|
||||||
zcs->dictSize = 0;
|
zcs->dictSize = 0;
|
||||||
zcs->doneJobID = 0;
|
zcs->doneJobID = 0;
|
||||||
@ -767,7 +775,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
|
|||||||
if (!endFrame) {
|
if (!endFrame) {
|
||||||
size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
|
size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
|
||||||
DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame);
|
DEBUGLOG(5, "ZSTDMT_createCompressionJob::endFrame = %u", endFrame);
|
||||||
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool, zcs->inBuffSize);
|
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->bufPool);
|
||||||
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
|
if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */
|
||||||
zcs->jobs[jobID].jobCompleted = 1;
|
zcs->jobs[jobID].jobCompleted = 1;
|
||||||
zcs->nextJobID++;
|
zcs->nextJobID++;
|
||||||
@ -910,7 +918,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
|||||||
/* fill input buffer */
|
/* fill input buffer */
|
||||||
if (input->size > input->pos) { /* support NULL input */
|
if (input->size > input->pos) { /* support NULL input */
|
||||||
if (mtctx->inBuff.buffer.start == NULL) {
|
if (mtctx->inBuff.buffer.start == NULL) {
|
||||||
mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool, mtctx->inBuffSize);
|
mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool);
|
||||||
if (mtctx->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
|
if (mtctx->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
|
||||||
mtctx->inBuff.filled = 0;
|
mtctx->inBuff.filled = 0;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user