fixed minor reporting discrepancy in MT mode

dev
Yann Collet 2018-09-19 16:30:55 -07:00
parent ca02ebee07
commit 6b07a66aec
1 changed files with 15 additions and 7 deletions

View File

@ -712,7 +712,12 @@ void ZSTDMT_compressionJob(void* jobDescription)
assert(job->cSize == 0);
for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) {
size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
if (ZSTD_isError(cSize)) {
ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
job->cSize = cSize;
ZSTD_pthread_mutex_unlock(&job->job_mutex);
goto _endJob;
}
ip += chunkSize;
op += cSize; assert(op < oend);
/* stats */
@ -725,7 +730,8 @@ void ZSTDMT_compressionJob(void* jobDescription)
ZSTD_pthread_mutex_unlock(&job->job_mutex);
}
/* last block */
assert(chunkSize > 0); assert((chunkSize & (chunkSize - 1)) == 0); /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */
assert(chunkSize > 0);
assert((chunkSize & (chunkSize - 1)) == 0); /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */
if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) {
size_t const lastBlockSize1 = job->src.size & (chunkSize-1);
size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1;
@ -736,6 +742,7 @@ void ZSTDMT_compressionJob(void* jobDescription)
/* stats */
ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
job->cSize += cSize;
job->consumed = job->src.size;
ZSTD_pthread_mutex_unlock(&job->job_mutex);
} }
@ -748,10 +755,7 @@ _endJob:
ZSTDMT_releaseSeq(job->seqPool, rawSeqStore);
ZSTDMT_releaseCCtx(job->cctxPool, cctx);
/* report */
ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
job->consumed = job->src.size;
ZSTD_pthread_cond_signal(&job->job_cond);
ZSTD_pthread_mutex_unlock(&job->job_mutex);
}
@ -1119,15 +1123,19 @@ size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)
assert(jobID <= mtctx->nextJobID);
if (jobID == mtctx->nextJobID) return 0; /* no active job => nothing to flush */
/* look into oldest non-fully-flushed job */
{ unsigned const wJobID = jobID & mtctx->jobIDMask;
ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID];
ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
{ size_t const cResult = jobPtr->cSize;
size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
assert(flushed <= produced);
toFlush = produced - flushed;
if (toFlush==0) assert(jobPtr->consumed < jobPtr->src.size); /* if toFlush==0, doneJobID should still be active: if doneJobID is completed and fully flushed, ZSTDMT_flushProduced() should have already moved to next job */
if (toFlush==0 && (jobPtr->consumed >= jobPtr->src.size)) {
/* doneJobID is not-fully-flushed, but toFlush==0 : doneJobID should be compressing some more data */
assert(jobPtr->consumed < jobPtr->src.size);
}
}
ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
}