zstdmt : finally vanquished an elusive and rare race condition

This commit is contained in:
Yann Collet 2018-01-19 17:35:08 -08:00
parent 940634a610
commit 3ad7d4951c
2 changed files with 97 additions and 88 deletions

View File

@ -1003,9 +1003,11 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
{
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
unsigned const limitID = zcs->doneJobID & zcs->jobIDMask;
if ((zcs->doneJobID < zcs->nextJobID) & (jobID == limitID))
return 0; /* new job would overwrite unflushed older job */
if (zcs->nextJobID > zcs->doneJobID + zcs->jobIDMask) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: will not create new job : table is full");
assert((zcs->nextJobID & zcs->jobIDMask) == (zcs->doneJobID & zcs->jobIDMask));
return 0;
}
if (!zcs->jobReady) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
@ -1079,76 +1081,78 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
}
/*! ZSTDMT_flushNextJob() :
* `output` : will be updated with amount of data flushed .
/*! ZSTDMT_flushProduced() :
* `output` : `pos` will be updated with amount of data flushed.
* `blockToFlush` : if >0, the function will block and wait if there is no data available to flush.
* @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
{
unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
DEBUGLOG(5, "ZSTDMT_flushNextJob (blocking:%u)", blockToFlush);
if (zcs->doneJobID == zcs->nextJobID) {
DEBUGLOG(5, "ZSTDMT_flushNextJob: doneJobID(%u)==(%u)nextJobID : nothing to flush !",
zcs->doneJobID, zcs->nextJobID)
return 0; /* all flushed ! */
}
DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush);
assert(output->size >= output->pos);
ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
if (zcs->jobs[wJobID].jobCompleted==1) break;
DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush but more to come */
}
if (blockToFlush && (zcs->doneJobID < zcs->nextJobID)) {
while (zcs->jobs[wJobID].dstFlushed == zcs->jobs[wJobID].cSize) {
if (zcs->jobs[wJobID].jobCompleted==1) break;
DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
zcs->doneJobID, (U32)zcs->jobs[wJobID].dstFlushed);
ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush but more to come */
} }
/* some output is available to be flushed */
{ ZSTDMT_jobDescription job = zcs->jobs[wJobID];
ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
if (ZSTD_isError(job.cSize)) {
DEBUGLOG(5, "ZSTDMT_flushNextJob: job %u : compression error detected : %s",
DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
zcs->doneJobID, ZSTD_getErrorName(job.cSize));
ZSTDMT_waitForAllJobsCompleted(zcs);
ZSTDMT_releaseAllJobResources(zcs);
return job.cSize;
}
/* add frame checksum if necessary */
/* add frame checksum if necessary (can only happen once) */
if ( job.jobCompleted
&& job.frameChecksumNeeded ) {
U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
DEBUGLOG(5, "ZSTDMT_flushNextJob: writing checksum : %08X \n", checksum);
DEBUGLOG(5, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);
MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
job.cSize += 4;
zcs->jobs[wJobID].cSize += 4;
zcs->jobs[wJobID].frameChecksumNeeded = 0;
}
assert(job.cSize >= job.dstFlushed);
{ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
DEBUGLOG(5, "ZSTDMT_flushNextJob: Flushing %u bytes from job %u (completion:%.1f%%)",
if (job.dstBuff.start != NULL) { /* one buffer present : some job is ongoing */
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%.1f%%)",
(U32)toWrite, zcs->doneJobID, (double)job.consumed / job.srcSize * 100);
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
}
if ( job.jobCompleted
&& (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */
DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
zcs->doneJobID, (U32)job.dstFlushed);
ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
zcs->jobs[wJobID].dstBuff = g_nullBuffer;
zcs->jobs[wJobID].jobCompleted = 0;
zcs->consumed += job.srcSize;
zcs->produced += job.cSize;
zcs->doneJobID++;
} else {
zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
}
if ( job.jobCompleted
&& (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => move to next one */
DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
zcs->doneJobID, (U32)job.dstFlushed);
ZSTDMT_releaseBuffer(zcs->bufPool, job.dstBuff);
zcs->jobs[wJobID].dstBuff = g_nullBuffer;
zcs->jobs[wJobID].jobCompleted = 0;
zcs->consumed += job.srcSize;
zcs->produced += job.cSize;
zcs->doneJobID++;
} else {
zcs->jobs[wJobID].dstFlushed = job.dstFlushed; /* remember how much was flushed for next attempt */
} }
/* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some more buffer to flush */
if (zcs->jobReady) return 1; /* some more work to do ! */
zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */
return 0; /* everything flushed */
} }
if (job.srcSize > job.consumed) return 1; /* current job not completely compressed */
}
if (zcs->doneJobID < zcs->nextJobID) return 1; /* some more jobs to flush */
if (zcs->jobReady) return 1; /* at least one more job to do ! */
if (zcs->inBuff.filled > 0) return 1; /* input not empty */
zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */
return 0; /* everything flushed */
}
/** ZSTDMT_compressStream_generic() :
@ -1220,7 +1224,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
}
/* check for potential compressed data ready to be flushed */
{ size_t const remainingToFlush = ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */
{ size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */
if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not flush yet */
return remainingToFlush;
}
@ -1244,12 +1248,13 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* ou
if ( mtctx->jobReady /* one job ready for a worker to pick up */
|| (srcSize > 0) /* still some data within input buffer */
|| (endFrame && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */
DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job");
DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)",
(U32)srcSize, endFrame);
CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
}
/* check if there is any data available to flush */
return ZSTDMT_flushNextJob(mtctx, output, 1 /* blockToFlush */);
return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */);
}

View File

@ -99,8 +99,8 @@ unsigned int FUZ_rand(unsigned int* seedPtr)
if (cond) { \
DISPLAY("Error => "); \
DISPLAY(__VA_ARGS__); \
DISPLAY(" (seed %u, test nb %u, line %u) \n", \
seed, testNb, __LINE__); \
DISPLAY(" (seed %u, test nb %u, line %u (sig %08X) \n", \
seed, testNb, __LINE__, coreSeed); \
goto _output_error; \
} }
@ -219,6 +219,7 @@ static int basicUnitTests(U32 seed, double compressibility)
size_t cSize;
int testResult = 0;
U32 testNb = 1;
U32 coreSeed = 0; /* just to conform with CHECK_Z macro display */
ZSTD_CStream* zc = ZSTD_createCStream();
ZSTD_DStream* zd = ZSTD_createDStream();
ZSTDMT_CCtx* mtctx = ZSTDMT_createCCtx(2);
@ -958,10 +959,13 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres
size_t maxTestSize;
/* init */
if (nbTests >= testNb) { DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); }
else { DISPLAYUPDATE(2, "\r%6u ", testNb); }
FUZ_rand(&coreSeed);
lseed = coreSeed ^ prime32;
if (nbTests >= testNb) {
DISPLAYUPDATE(2, "\r%6u/%6u (%08X) ", testNb, nbTests, lseed);
} else {
DISPLAYUPDATE(2, "\r%6u (%08X) ", testNb, lseed);
}
/* states full reset (deliberately not synchronized) */
/* some issues can only happen when reusing states */
@ -1171,7 +1175,6 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
UTIL_time_t const startClock = UTIL_getTime();
const BYTE* dict=NULL; /* can keep same dict on 2 consecutive tests */
size_t dictSize = 0;
U32 oldTestLog = 0;
int const cLevelMax = bigTests ? (U32)ZSTD_maxCLevel()-1 : g_cLevelMax_smallTests;
U32 const nbThreadsMax = bigTests ? 4 : 2;
@ -1193,6 +1196,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
RDG_genBuffer(cNoiseBuffer[4], srcBufferSize, 1.00, 0., coreSeed); /* sparse content */
memset(copyBuffer, 0x65, copyBufferSize); /* make copyBuffer considered initialized */
ZSTD_initDStream_usingDict(zd, NULL, 0); /* ensure at least one init */
DISPLAYLEVEL(6, "Creating initial context with %u threads \n", nbThreads);
/* catch up testNb */
for (testNb=1; testNb < startTest; testNb++)
@ -1205,14 +1209,14 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
size_t totalTestSize, totalGenSize, cSize;
XXH64_state_t xxhState;
U64 crcOrig;
U32 resetAllowed = 1;
size_t maxTestSize;
/* init */
if (testNb < nbTests) {
DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests);
} else { DISPLAYUPDATE(2, "\r%6u ", testNb); }
FUZ_rand(&coreSeed);
if (nbTests >= testNb) {
DISPLAYUPDATE(2, "\r%6u/%6u (%08X) ", testNb, nbTests, coreSeed);
} else {
DISPLAYUPDATE(2, "\r%6u (%08X) ", testNb, coreSeed);
}
lseed = coreSeed ^ prime32;
/* states full reset (deliberately not synchronized) */
@ -1223,7 +1227,6 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
ZSTDMT_freeCCtx(zc);
zc = ZSTDMT_createCCtx(nbThreads);
CHECK(zc==NULL, "ZSTDMT_createCCtx allocation error")
resetAllowed=0;
}
if ((FUZ_rand(&lseed) & 0xFF) == 132) {
ZSTD_freeDStream(zd);
@ -1248,16 +1251,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
}
/* compression init */
if ((FUZ_rand(&lseed)&1) /* at beginning, to keep same nb of rand */
&& oldTestLog /* at least one test happened */ && resetAllowed) {
maxTestSize = FUZ_randomLength(&lseed, oldTestLog+2);
if (maxTestSize >= srcBufferSize) maxTestSize = srcBufferSize-1;
{ int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1;
DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel);
CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) );
}
} else {
U32 const testLog = FUZ_rand(&lseed) % maxSrcLog;
{ U32 const testLog = FUZ_rand(&lseed) % maxSrcLog;
U32 const dictLog = FUZ_rand(&lseed) % maxSrcLog;
int const cLevelCandidate = ( FUZ_rand(&lseed)
% (ZSTD_maxCLevel() - (MAX(testLog, dictLog) / 2)) )
@ -1266,24 +1260,29 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
int const cLevelMin = MAX(cLevelThreadAdjusted, 1); /* no negative cLevel yet */
int const cLevel = MIN(cLevelMin, cLevelMax);
maxTestSize = FUZ_rLogLength(&lseed, testLog);
oldTestLog = testLog;
/* random dictionary selection */
dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_rLogLength(&lseed, dictLog) : 0;
{ size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize);
dict = srcBuffer + dictStart;
}
{ U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize;
ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize);
DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n",
params.cParams.windowLog, (U32)pledgedSrcSize, (U32)dictSize);
params.fParams.checksumFlag = FUZ_rand(&lseed) & 1;
params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1;
params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1;
DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag);
CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) );
CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) ); /* custom job size */
CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) );
} }
if (FUZ_rand(&lseed)&1) { /* simple init */
int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1;
DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel);
CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) );
} else { /* advanced init */
/* random dictionary selection */
dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_rLogLength(&lseed, dictLog) : 0;
{ size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize);
dict = srcBuffer + dictStart;
}
{ U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize;
ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize);
DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n",
params.cParams.windowLog, (U32)pledgedSrcSize, (U32)dictSize);
params.fParams.checksumFlag = FUZ_rand(&lseed) & 1;
params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1;
params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1;
DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag);
CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) );
CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) ); /* custom job size */
CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) );
} } }
/* multi-segments compression test */
XXH64_reset(&xxhState, 0);
@ -1336,10 +1335,13 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
} }
crcOrig = XXH64_digest(&xxhState);
cSize = outBuff.pos;
DISPLAYLEVEL(5, "Frame completed : %u bytes \n", (U32)cSize);
DISPLAYLEVEL(5, "Frame completed : %u bytes compressed into %u bytes \n",
(U32)totalTestSize, (U32)cSize);
}
/* multi - fragments decompression test */
assert(totalTestSize < dstBufferSize);
memset(dstBuffer, 170, totalTestSize); /* init dest area */
if (!dictSize /* don't reset if dictionary : could be different */ && (FUZ_rand(&lseed) & 1)) {
CHECK_Z( ZSTD_resetDStream(zd) );
} else {
@ -1354,14 +1356,16 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize);
inBuff.size = inBuff.pos + readCSrcSize;
outBuff.size = outBuff.pos + dstBuffSize;
DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize);
DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes into outBuff %u bytes \n",
(U32)readCSrcSize, (U32)dstBuffSize);
decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
if (ZSTD_isError(decompressionResult)) {
DISPLAY("ZSTD_decompressStream error : %s \n", ZSTD_getErrorName(decompressionResult));
findDiff(copyBuffer, dstBuffer, totalTestSize);
}
CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));
DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u \n", (U32)inBuff.pos);
DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u and produced (outBuff.pos) = %u \n",
(U32)inBuff.pos, (U32)outBuff.pos);
}
CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize);
CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize);