zstdmt : simplify job creation

job will not be created when not enough room within job Table
dev
Yann Collet 2018-01-19 13:19:59 -08:00
parent dc69623453
commit 940634a610
2 changed files with 16 additions and 19 deletions

View File

@ -1003,6 +1003,10 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
{ {
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
unsigned const limitID = zcs->doneJobID & zcs->jobIDMask;
if ((zcs->doneJobID < zcs->nextJobID) & (jobID == limitID))
return 0; /* new job would overwrite unflushed older job */
if (!zcs->jobReady) { if (!zcs->jobReady) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize); zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize);
@ -1210,27 +1214,15 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
} } } }
if ( (mtctx->jobReady) if ( (mtctx->jobReady)
|| ( (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ || (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */
&& (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) ) { /* avoid overwriting job round buffer */ || ( (endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0) ) ) { /* avoid overwriting job round buffer */
CHECK_F( ZSTDMT_createCompressionJob(mtctx, mtctx->targetSectionSize, 0 /* endFrame */) ); CHECK_F( ZSTDMT_createCompressionJob(mtctx, mtctx->targetSectionSize, 0 /* endFrame */) );
} }
/* check for potential compressed data ready to be flushed */ /* check for potential compressed data ready to be flushed */
{ size_t const remainingToFlush = ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */ { size_t const remainingToFlush = ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */
if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not flush yet */ if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not flush yet */
if (mtctx->jobReady) return remainingToFlush; /* some more input ready to be compressed */ return remainingToFlush;
switch(endOp)
{
case ZSTD_e_flush:
return ZSTDMT_flushStream(mtctx, output);
case ZSTD_e_end:
return ZSTDMT_endStream(mtctx, output);
case ZSTD_e_continue:
return 1;
default:
return ERROR(GENERIC); /* invalid endDirective */
}
} }
} }
@ -1249,8 +1241,9 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* ou
size_t const srcSize = mtctx->inBuff.filled - mtctx->prefixSize; size_t const srcSize = mtctx->inBuff.filled - mtctx->prefixSize;
DEBUGLOG(5, "ZSTDMT_flushStream_internal"); DEBUGLOG(5, "ZSTDMT_flushStream_internal");
if ( (mtctx->jobReady || (srcSize > 0) || (endFrame && !mtctx->frameEnded)) if ( mtctx->jobReady /* one job ready for a worker to pick up */
&& (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) { || (srcSize > 0) /* still some data within input buffer */
|| (endFrame && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */
DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job"); DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job");
CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) ); CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) );
} }

View File

@ -863,6 +863,10 @@ static size_t findDiff(const void* buf1, const void* buf2, size_t max)
for (u=0; u<max; u++) { for (u=0; u<max; u++) {
if (b1[u] != b2[u]) break; if (b1[u] != b2[u]) break;
} }
if (u==max) {
DISPLAY("=> No difference detected within %u bytes \n", (U32)max);
return u;
}
DISPLAY("Error at position %u / %u \n", (U32)u, (U32)max); DISPLAY("Error at position %u / %u \n", (U32)u, (U32)max);
if (u>=3) if (u>=3)
DISPLAY(" %02X %02X %02X ", DISPLAY(" %02X %02X %02X ",
@ -1352,8 +1356,8 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
outBuff.size = outBuff.pos + dstBuffSize; outBuff.size = outBuff.pos + dstBuffSize;
DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize); DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize);
decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff); decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
if (ZSTD_getErrorCode(decompressionResult) == ZSTD_error_corruption_detected) { if (ZSTD_isError(decompressionResult)) {
DISPLAY("ZSTD_decompressStream: checksum error : \n"); DISPLAY("ZSTD_decompressStream error : %s \n", ZSTD_getErrorName(decompressionResult));
findDiff(copyBuffer, dstBuffer, totalTestSize); findDiff(copyBuffer, dstBuffer, totalTestSize);
} }
CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult)); CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));