From aa79c18e3f25cb660bfadee9e0b3b57a3a5d2d08 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Wed, 17 Jan 2018 17:18:19 -0800 Subject: [PATCH] fixed a few access contention passes thread sanitizer test --- lib/compress/zstdmt_compress.c | 12 ++++++++++-- programs/fileio.c | 4 ++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 08171f5f..5624b5e3 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -390,15 +390,17 @@ void ZSTDMT_compressChunk(void* jobDescription) BYTE* oend = op + dstBuff.size; int blockNb; DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks); - job->cSize = 0; + assert(job->cSize == 0); for (blockNb = 1; blockNb < nbBlocks; blockNb++) { size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX); if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } ip += ZSTD_BLOCKSIZE_MAX; op += cSize; assert(op < oend); /* stats */ + ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */ job->cSize += cSize; job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb; + ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex); } /* last block */ if ((nbBlocks > 0) | job->lastChunk /*need to output a "last block" flag*/ ) { @@ -409,9 +411,11 @@ void ZSTDMT_compressChunk(void* jobDescription) ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; } /* stats */ + ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */ job->cSize += cSize; + job->consumed = job->srcSize; + ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex); } - job->consumed = job->srcSize; } #endif @@ -422,6 +426,7 @@ _endJob: job->src = g_nullBuffer; job->srcStart = NULL; /* report */ ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); + job->consumed = job->srcSize; job->jobCompleted = 1; job->jobScanned = 0; ZSTD_pthread_cond_signal(job->jobCompleted_cond); @@ -757,6 +762,8 @@ static size_t ZSTDMT_compress_advanced_internal( mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize; mtctx->jobs[u].prefixSize = dictSize; mtctx->jobs[u].srcSize = chunkSize; + mtctx->jobs[u].consumed = 0; + mtctx->jobs[u].cSize = 0; mtctx->jobs[u].cdict = (u==0) ? cdict : NULL; mtctx->jobs[u].fullFrameSize = srcSize; mtctx->jobs[u].params = jobParams; @@ -996,6 +1003,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcSize = srcSize; zcs->jobs[jobID].consumed = 0; + zcs->jobs[jobID].cSize = 0; zcs->jobs[jobID].prefixSize = zcs->prefixSize; assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize); zcs->jobs[jobID].params = zcs->params; diff --git a/programs/fileio.c b/programs/fileio.c index 6a7bb1b0..18552aa3 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -813,8 +813,8 @@ static int FIO_compressFilename_internal(cRess_t ress, } } #if 1 - if (READY_FOR_UPDATE) - { ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); + if (READY_FOR_UPDATE) { + ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%", (U32)(zfp.ingested >> 20), (U32)(zfp.consumed >> 20),