diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 303bc1f7..5b253f16 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1003,6 +1003,10 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi { unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; + unsigned const limitID = zcs->doneJobID & zcs->jobIDMask; + if ((zcs->doneJobID < zcs->nextJobID) & (jobID == limitID)) + return 0; /* new job would overwrite unflushed older job */ + if (!zcs->jobReady) { DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", zcs->nextJobID, (U32)srcSize, (U32)zcs->prefixSize); @@ -1210,27 +1214,15 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, } } if ( (mtctx->jobReady) - || ( (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ - && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) ) { /* avoid overwriting job round buffer */ + || (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ + || ( (endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0) ) ) { /* avoid overwriting job round buffer */ CHECK_F( ZSTDMT_createCompressionJob(mtctx, mtctx->targetSectionSize, 0 /* endFrame */) ); } /* check for potential compressed data ready to be flushed */ { size_t const remainingToFlush = ZSTDMT_flushNextJob(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */ if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not flush yet */ - if (mtctx->jobReady) return remainingToFlush; /* some more input ready to be compressed */ - - switch(endOp) - { - case ZSTD_e_flush: - return ZSTDMT_flushStream(mtctx, output); - case ZSTD_e_end: - return ZSTDMT_endStream(mtctx, output); - case ZSTD_e_continue: - return 1; - default: - return ERROR(GENERIC); /* invalid endDirective */ - } + return remainingToFlush; } } @@ -1249,8 +1241,9 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* ou size_t const srcSize = mtctx->inBuff.filled - mtctx->prefixSize; DEBUGLOG(5, "ZSTDMT_flushStream_internal"); - if ( (mtctx->jobReady || (srcSize > 0) || (endFrame && !mtctx->frameEnded)) - && (mtctx->nextJobID <= mtctx->doneJobID + mtctx->jobIDMask) ) { + if ( mtctx->jobReady /* one job ready for a worker to pick up */ + || (srcSize > 0) /* still some data within input buffer */ + || (endFrame && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */ DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job"); CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) ); } diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index b21611ab..6ec14bb8 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -863,6 +863,10 @@ static size_t findDiff(const void* buf1, const void* buf2, size_t max) for (u=0; u No difference detected within %u bytes \n", (U32)max); + return u; + } DISPLAY("Error at position %u / %u \n", (U32)u, (U32)max); if (u>=3) DISPLAY(" %02X %02X %02X ", @@ -1352,8 +1356,8 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp outBuff.size = outBuff.pos + dstBuffSize; DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize); decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff); - if (ZSTD_getErrorCode(decompressionResult) == ZSTD_error_corruption_detected) { - DISPLAY("ZSTD_decompressStream: checksum error : \n"); + if (ZSTD_isError(decompressionResult)) { + DISPLAY("ZSTD_decompressStream error : %s \n", ZSTD_getErrorName(decompressionResult)); findDiff(copyBuffer, dstBuffer, totalTestSize); } CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));