fixed a few access contention

passes thread sanitizer test
This commit is contained in:
Yann Collet 2018-01-17 17:18:19 -08:00
parent 394eec697b
commit aa79c18e3f
2 changed files with 12 additions and 4 deletions

View File

@ -390,15 +390,17 @@ void ZSTDMT_compressChunk(void* jobDescription)
BYTE* oend = op + dstBuff.size; BYTE* oend = op + dstBuff.size;
int blockNb; int blockNb;
DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks); DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
job->cSize = 0; assert(job->cSize == 0);
for (blockNb = 1; blockNb < nbBlocks; blockNb++) { for (blockNb = 1; blockNb < nbBlocks; blockNb++) {
size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX); size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
ip += ZSTD_BLOCKSIZE_MAX; ip += ZSTD_BLOCKSIZE_MAX;
op += cSize; assert(op < oend); op += cSize; assert(op < oend);
/* stats */ /* stats */
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
job->cSize += cSize; job->cSize += cSize;
job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb; job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
} }
/* last block */ /* last block */
if ((nbBlocks > 0) | job->lastChunk /*need to output a "last block" flag*/ ) { if ((nbBlocks > 0) | job->lastChunk /*need to output a "last block" flag*/ ) {
@ -409,9 +411,11 @@ void ZSTDMT_compressChunk(void* jobDescription)
ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
/* stats */ /* stats */
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
job->cSize += cSize; job->cSize += cSize;
}
job->consumed = job->srcSize; job->consumed = job->srcSize;
ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
}
} }
#endif #endif
@ -422,6 +426,7 @@ _endJob:
job->src = g_nullBuffer; job->srcStart = NULL; job->src = g_nullBuffer; job->srcStart = NULL;
/* report */ /* report */
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
job->consumed = job->srcSize;
job->jobCompleted = 1; job->jobCompleted = 1;
job->jobScanned = 0; job->jobScanned = 0;
ZSTD_pthread_cond_signal(job->jobCompleted_cond); ZSTD_pthread_cond_signal(job->jobCompleted_cond);
@ -757,6 +762,8 @@ static size_t ZSTDMT_compress_advanced_internal(
mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize; mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].prefixSize = dictSize; mtctx->jobs[u].prefixSize = dictSize;
mtctx->jobs[u].srcSize = chunkSize; mtctx->jobs[u].srcSize = chunkSize;
mtctx->jobs[u].consumed = 0;
mtctx->jobs[u].cSize = 0;
mtctx->jobs[u].cdict = (u==0) ? cdict : NULL; mtctx->jobs[u].cdict = (u==0) ? cdict : NULL;
mtctx->jobs[u].fullFrameSize = srcSize; mtctx->jobs[u].fullFrameSize = srcSize;
mtctx->jobs[u].params = jobParams; mtctx->jobs[u].params = jobParams;
@ -996,6 +1003,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = srcSize; zcs->jobs[jobID].srcSize = srcSize;
zcs->jobs[jobID].consumed = 0; zcs->jobs[jobID].consumed = 0;
zcs->jobs[jobID].cSize = 0;
zcs->jobs[jobID].prefixSize = zcs->prefixSize; zcs->jobs[jobID].prefixSize = zcs->prefixSize;
assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize); assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize);
zcs->jobs[jobID].params = zcs->params; zcs->jobs[jobID].params = zcs->params;

View File

@ -813,8 +813,8 @@ static int FIO_compressFilename_internal(cRess_t ress,
} }
} }
#if 1 #if 1
if (READY_FOR_UPDATE) if (READY_FOR_UPDATE) {
{ ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%", DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%",
(U32)(zfp.ingested >> 20), (U32)(zfp.ingested >> 20),
(U32)(zfp.consumed >> 20), (U32)(zfp.consumed >> 20),