diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index d5193d52..74f9dc29 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1080,9 +1080,9 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) { ZSTD_frameProgression fps; DEBUGLOG(6, "ZSTDMT_getFrameProgression"); + fps.ingested = mtctx->consumed + mtctx->inBuff.filled; fps.consumed = mtctx->consumed; fps.produced = mtctx->produced; - fps.ingested = mtctx->consumed + mtctx->inBuff.filled; { unsigned jobNb; unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1); DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)", @@ -1092,8 +1092,8 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex); { size_t const cResult = mtctx->jobs[wJobID].cSize; size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; - fps.consumed += mtctx->jobs[wJobID].consumed; fps.ingested += mtctx->jobs[wJobID].src.size; + fps.consumed += mtctx->jobs[wJobID].consumed; fps.produced += produced; } ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); @@ -1545,6 +1545,8 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS /*! ZSTDMT_flushProduced() : + * flush whatever data has been produced but not yet flushed in current job. + * move to next job if current one is fully flushed. * `output` : `pos` will be updated with amount of data flushed . * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush . * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */ @@ -1593,6 +1595,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u mtctx->jobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */ mtctx->jobs[wJobID].frameChecksumNeeded = 0; } + if (cSize > 0) { /* compression is ongoing or completed */ size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos); DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)", @@ -1606,7 +1609,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u output->pos += toFlush; mtctx->jobs[wJobID].dstFlushed += toFlush; /* can write : this value is only used by mtctx */ - if ( (srcConsumed == srcSize) /* job completed */ + if ( (srcConsumed == srcSize) /* job is completed */ && (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */ DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); diff --git a/programs/fileio.c b/programs/fileio.c index 85367fdf..c1587f8f 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -727,11 +727,6 @@ static unsigned long long FIO_compressLz4Frame(cRess_t* ress, #endif -/*! FIO_compressFilename_internal() : - * same as FIO_compressFilename_extRess(), with `ress.desFile` already opened. - * @return : 0 : compression completed correctly, - * 1 : missing or pb opening srcFileName - */ static unsigned long long FIO_compressZstdFrame(const cRess_t* ressPtr, const char* srcFileName, U64 fileSize, @@ -763,7 +758,8 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, directive = ZSTD_e_end; result = 1; - while (inBuff.pos != inBuff.size || (directive == ZSTD_e_end && result != 0)) { + while ((inBuff.pos != inBuff.size) /* input buffer must be entirely ingested */ + || (directive == ZSTD_e_end && result != 0) ) { ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 }; CHECK_V(result, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive)); @@ -786,7 +782,8 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, (U32)(zfp.consumed >> 20), (U32)(zfp.produced >> 20), cShare ); - } else { /* g_displayLevel == 2 */ + } else { + assert(g_displayLevel == 2); DISPLAYLEVEL(2, "\rRead : %u ", (U32)(zfp.consumed >> 20)); if (fileSize != UTIL_FILESIZE_UNKNOWN) DISPLAYLEVEL(2, "/ %u ", (U32)(fileSize >> 20));