ensure all writes to job->cSize are mutex protected

even when reporting errors,
using a macro for code brevity, as suggested by @terrelln.
Yann Collet 2018-09-21 15:37:30 -07:00
parent 7992942d66
commit bfff4f4809
2 changed files with 24 additions and 31 deletions
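
Context for the change: job->cSize doubles as the job's result slot and its error slot, and the consumer thread polls it while the worker is still running, so every write to it, error paths included, must happen under job->job_mutex. Below is a minimal standalone sketch of that writer-side discipline, using plain pthreads rather than zstd's ZSTD_pthread_* wrappers; all names are illustrative, not zstd API.

    #include <pthread.h>
    #include <stddef.h>

    /* Illustrative job slot: cSize is shared between the worker thread
     * that fills it and the consumer thread that polls it. */
    typedef struct {
        pthread_mutex_t mutex;   /* guards cSize */
        size_t cSize;            /* bytes produced so far, or an error code */
    } job_t;

    /* Writer side: publish a result, or an error, only while holding
     * the mutex, which is what the JOB_ERROR() macro below enforces. */
    static void job_publish(job_t* job, size_t value)
    {
        pthread_mutex_lock(&job->mutex);
        job->cSize = value;
        pthread_mutex_unlock(&job->mutex);
    }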

lib/compress/zstdmt_compress.c

@@ -632,6 +632,13 @@ typedef struct {
     unsigned frameChecksumNeeded;        /* used only by mtctx */
 } ZSTDMT_jobDescription;
 
+#define JOB_ERROR(e) {                          \
+    ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);   \
+    job->cSize = e;                             \
+    ZSTD_pthread_mutex_unlock(&job->job_mutex); \
+    goto _endJob;                               \
+}
+
 /* ZSTDMT_compressionJob() is a POOL_function type */
 void ZSTDMT_compressionJob(void* jobDescription)
 {
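
A note on the macro's shape: JOB_ERROR expands to a bare brace block, which is safe here because every call site in this commit is a plain if (err) JOB_ERROR(e); with no else branch; the stray semicolon after the expanded closing brace would otherwise split an if/else. A hypothetical variant in the conventional do { } while (0) style, shown for illustration only (not what the commit uses):

    /* Hypothetical alternative: do { } while (0) makes the expansion a
     * single statement, so `if (err) JOB_ERROR(e); else ...` still parses. */
    #define JOB_ERROR(e) do {                       \
        ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);   \
        job->cSize = e;                             \
        ZSTD_pthread_mutex_unlock(&job->job_mutex); \
        goto _endJob;                               \
    } while (0)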
@@ -643,22 +650,14 @@ void ZSTDMT_compressionJob(void* jobDescription)
     size_t lastCBlockSize = 0;
 
     /* ressources */
-    if (cctx==NULL) {
-        job->cSize = ERROR(memory_allocation);
-        goto _endJob;
-    }
+    if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation));
     if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
         dstBuff = ZSTDMT_getBuffer(job->bufPool);
-        if (dstBuff.start==NULL) {
-            job->cSize = ERROR(memory_allocation);
-            goto _endJob;
-        }
+        if (dstBuff.start==NULL) JOB_ERROR(ERROR(memory_allocation));
         job->dstBuff = dstBuff;   /* this value can be read in ZSTDMT_flush, when it copies the whole job */
     }
-    if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL) {
-        job->cSize = ERROR(memory_allocation);
-        goto _endJob;
-    }
+    if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL)
+        JOB_ERROR(ERROR(memory_allocation));
 
     /* Don't compute the checksum for chunks, since we compute it externally,
      * but write it in the header.
@@ -672,30 +671,26 @@ void ZSTDMT_compressionJob(void* jobDescription)
     if (job->cdict) {
         size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, jobParams, job->fullFrameSize);
         assert(job->firstJob);   /* only allowed for first job */
-        if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
+        if (ZSTD_isError(initError)) JOB_ERROR(initError);
     } else {   /* srcStart points at reloaded section */
         U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size;
         {   size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob);
-            if (ZSTD_isError(forceWindowError)) {
-                job->cSize = forceWindowError;
-                goto _endJob;
-        }   }
+            if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError);
+        }
         {   size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
                         job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
                         ZSTD_dtlm_fast,
                         NULL, /*cdict*/
                         jobParams, pledgedSrcSize);
-            if (ZSTD_isError(initError)) {
-                job->cSize = initError;
-                goto _endJob;
-        }   }   }
+            if (ZSTD_isError(initError)) JOB_ERROR(initError);
+    }   }
 
     /* Perform serial step as early as possible, but after CCtx initialization */
     ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);
 
     if (!job->firstJob) {   /* flush and overwrite frame header when it's not first job */
         size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);
-        if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; }
+        if (ZSTD_isError(hSize)) JOB_ERROR(hSize);
         DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize);
         ZSTD_invalidateRepCodes(cctx);
     }
@@ -713,12 +708,7 @@ void ZSTDMT_compressionJob(void* jobDescription)
         assert(job->cSize == 0);
         for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) {
             size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize);
-            if (ZSTD_isError(cSize)) {
-                ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
-                job->cSize = cSize;
-                ZSTD_pthread_mutex_unlock(&job->job_mutex);
-                goto _endJob;
-            }
+            if (ZSTD_isError(cSize)) JOB_ERROR(cSize);
             ip += chunkSize;
             op += cSize;   assert(op < oend);
             /* stats */
@@ -739,7 +729,7 @@ void ZSTDMT_compressionJob(void* jobDescription)
             size_t const cSize = (job->lastJob) ?
                  ZSTD_compressEnd     (cctx, op, oend-op, ip, lastBlockSize) :
                  ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
-            if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
+            if (ZSTD_isError(cSize)) JOB_ERROR(cSize);
             lastCBlockSize = cSize;
     }   }
@@ -1657,7 +1647,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
             DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
                         mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
             ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);
-            DEBUGLOG(5, "dstBuffer released")
+            DEBUGLOG(5, "dstBuffer released");
             mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
             mtctx->jobs[wJobID].cSize = 0;   /* ensure this job slot is considered "not started" in future check */
             mtctx->consumed += srcSize;
@@ -1880,7 +1870,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                 /* It is only possible for this operation to fail if there are
                  * still compression jobs ongoing.
                  */
-                DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed")
+                DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed");
                 assert(mtctx->doneJobID != mtctx->nextJobID);
             } else
                 DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start);
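
On the reader side of the same contract, ZSTDMT_flushProduced inspects jobs[wJobID].cSize under the job mutex to decide whether the job failed, produced flushable bytes, or has not started (the cSize = 0 reset above). A hedged sketch of such a poll, pairing with the writer sketch near the top; the isErrorCode() stand-in mimics zstd's convention of encoding errors in the top range of size_t and is not the real ZSTD_isError():

    #include <pthread.h>
    #include <stddef.h>

    /* Stand-in for ZSTD_isError(): error codes live at the top of size_t. */
    static int isErrorCode(size_t code) { return code > (size_t)-100; }

    typedef struct { pthread_mutex_t mutex; size_t cSize; } job_t;

    /* Poll one job slot: -1 = job failed, 0 = nothing produced yet,
     * 1 = *produced bytes are ready to flush. */
    static int job_poll(job_t* job, size_t* produced)
    {
        int ret;
        pthread_mutex_lock(&job->mutex);   /* same mutex the writer takes */
        if (isErrorCode(job->cSize)) {
            ret = -1;
        } else {
            *produced = job->cSize;
            ret = (job->cSize > 0) ? 1 : 0;
        }
        pthread_mutex_unlock(&job->mutex);
        return ret;
    }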

programs/fileio.c

@@ -892,6 +892,9 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
                 assert(zfp.produced >= previous_zfp_update.produced);
                 assert(g_nbWorkers >= 1);
 
+                /* test if output speed is so slow that all buffers are full
+                 * and no further progress is possible
+                 * (neither compression nor adding more input into internal buffers) */
                 if ( (zfp.ingested == previous_zfp_update.ingested)   /* no data read : input buffer full */
                   && (zfp.consumed == previous_zfp_update.consumed)   /* no data compressed : no more buffer to compress OR compression is really slow */
                   && (zfp.nbActiveWorkers == 0)                       /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */
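
The three-way test above is a progress heuristic: if, between two consecutive progression snapshots, no input was ingested, nothing was consumed, and no worker is active, then all internal buffers are full and the pipeline is stalled on output. A minimal sketch of the same comparison; the struct mirrors the ZSTD_frameProgression fields used above, while the function name and the surrounding polling loop are assumptions, not fileio.c code:

    /* Editor's sketch: detect a fully stalled pipeline by comparing two
     * progression snapshots (fields as in ZSTD_frameProgression). */
    typedef struct {
        unsigned long long ingested;    /* input bytes accepted so far */
        unsigned long long consumed;    /* input bytes compressed so far */
        unsigned nbActiveWorkers;       /* workers currently compressing */
    } progress_t;

    static int pipelineStalled(progress_t prev, progress_t now)
    {
        return (now.ingested == prev.ingested)    /* input buffers full */
            && (now.consumed == prev.consumed)    /* no compression progress */
            && (now.nbActiveWorkers == 0);        /* and nobody is working */
    }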