mtctx->jobs allocate its own memory space

to make ZSTDMT_CCtx_s size predictable
so that it can be included in CCtx
dev
Yann Collet 2017-05-27 00:21:33 -07:00
parent b8136f019a
commit e071159101
1 changed files with 8 additions and 5 deletions

View File

@ -271,6 +271,7 @@ _endJob:
struct ZSTDMT_CCtx_s { struct ZSTDMT_CCtx_s {
POOL_ctx* factory; POOL_ctx* factory;
ZSTDMT_jobDescription* jobs;
ZSTDMT_bufferPool* buffPool; ZSTDMT_bufferPool* buffPool;
ZSTDMT_CCtxPool* cctxPool; ZSTDMT_CCtxPool* cctxPool;
pthread_mutex_t jobCompleted_mutex; pthread_mutex_t jobCompleted_mutex;
@ -294,10 +295,9 @@ struct ZSTDMT_CCtx_s {
size_t sectionSize; size_t sectionSize;
ZSTD_CDict* cdict; ZSTD_CDict* cdict;
ZSTD_CStream* cstream; ZSTD_CStream* cstream;
ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */
}; };
ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads)
{ {
ZSTDMT_CCtx* cctx; ZSTDMT_CCtx* cctx;
U32 const minNbJobs = nbThreads + 2; U32 const minNbJobs = nbThreads + 2;
@ -306,7 +306,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
DEBUGLOG(5, "nbThreads : %u ; minNbJobs : %u ; nbJobsLog2 : %u ; nbJobs : %u \n", DEBUGLOG(5, "nbThreads : %u ; minNbJobs : %u ; nbJobsLog2 : %u ; nbJobs : %u \n",
nbThreads, minNbJobs, nbJobsLog2, nbJobs); nbThreads, minNbJobs, nbJobsLog2, nbJobs);
if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL; if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbJobs*sizeof(ZSTDMT_jobDescription)); cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx));
if (!cctx) return NULL; if (!cctx) return NULL;
cctx->nbThreads = nbThreads; cctx->nbThreads = nbThreads;
cctx->jobIDMask = nbJobs - 1; cctx->jobIDMask = nbJobs - 1;
@ -314,9 +314,10 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
cctx->sectionSize = 0; cctx->sectionSize = 0;
cctx->overlapRLog = 3; cctx->overlapRLog = 3;
cctx->factory = POOL_create(nbThreads, 1); cctx->factory = POOL_create(nbThreads, 1);
cctx->jobs = malloc(nbJobs * sizeof(*cctx->jobs));
cctx->buffPool = ZSTDMT_createBufferPool(nbThreads); cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads); cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
if (!cctx->factory | !cctx->buffPool | !cctx->cctxPool) { /* one object was not created */ if (!cctx->factory | !cctx->jobs | !cctx->buffPool | !cctx->cctxPool) {
ZSTDMT_freeCCtx(cctx); ZSTDMT_freeCCtx(cctx);
return NULL; return NULL;
} }
@ -356,6 +357,7 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
POOL_free(mtctx->factory); POOL_free(mtctx->factory);
if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */ if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */
ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources into pools first */ ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources into pools first */
free(mtctx->jobs);
ZSTDMT_freeCCtxPool(mtctx->cctxPool); ZSTDMT_freeCCtxPool(mtctx->cctxPool);
ZSTD_freeCDict(mtctx->cdict); ZSTD_freeCDict(mtctx->cdict);
ZSTD_freeCStream(mtctx->cstream); ZSTD_freeCStream(mtctx->cstream);
@ -491,7 +493,8 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
/* ======= Streaming API ======= */ /* ======= Streaming API ======= */
/* ====================================== */ /* ====================================== */
static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) { static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
{
while (zcs->doneJobID < zcs->nextJobID) { while (zcs->doneJobID < zcs->nextJobID) {
unsigned const jobID = zcs->doneJobID & zcs->jobIDMask; unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);