ZSTDMT_initCStream() supports restart from invalid state

ZSTDMT_initCStream() will correcly scrub for resources
when it detects that previous compression was not properly finished.
dev
Yann Collet 2017-01-18 15:18:17 -08:00
parent 4885f591b3
commit 3a01c46b26
1 changed files with 14 additions and 4 deletions

View File

@ -228,6 +228,7 @@ struct ZSTDMT_CCtx_s {
unsigned doneJobID;
unsigned nextJobID;
unsigned frameEnded;
unsigned allJobsCompleted;
ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */
};
@ -244,6 +245,7 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
if (!cctx) return NULL;
cctx->nbThreads = nbThreads;
cctx->jobIDMask = nbJobs - 1;
cctx->allJobsCompleted = 1;
cctx->factory = POOL_create(nbThreads, 1);
cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
@ -277,8 +279,8 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
{
if (mtctx==NULL) return 0; /* compatible with free on NULL */
POOL_free(mtctx->factory);
ZSTDMT_releaseAllJobResources(mtctx); /* kill workers first */
ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources first */
if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */
ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources into pools first */
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
pthread_cond_destroy(&mtctx->jobCompleted_cond);
@ -393,6 +395,11 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) {
}
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
if (zcs->allJobsCompleted == 0) { /* previous job not correctly finished */
ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs);
zcs->allJobsCompleted = 1;
}
zcs->params = ZSTD_getParams(compressionLevel, 0, 0);
zcs->targetSectionSize = (size_t)1 << (zcs->params.cParams.windowLog + 2);
zcs->inBuffSize = 5 * (1 << zcs->params.cParams.windowLog);
@ -402,13 +409,14 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
zcs->doneJobID = 0;
zcs->nextJobID = 0;
zcs->frameEnded = 0;
zcs->allJobsCompleted = 0;
return 0;
}
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Finish it and restart a new one */
if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */
/* fill input buffer */
{ size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
@ -573,7 +581,9 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
}
/* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
return (zcs->doneJobID < zcs->nextJobID);
if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some buffer to flush */
zcs->allJobsCompleted = zcs->frameEnded;
return 0;
} }
}