Merge pull request #2414 from terrelln/mt-progress
[lib] Ensure that multithreaded compression always makes some progress
This commit is contained in:
commit
c238db046f
@ -4441,26 +4441,42 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx,
|
|||||||
/* compression stage */
|
/* compression stage */
|
||||||
#ifdef ZSTD_MULTITHREAD
|
#ifdef ZSTD_MULTITHREAD
|
||||||
if (cctx->appliedParams.nbWorkers > 0) {
|
if (cctx->appliedParams.nbWorkers > 0) {
|
||||||
int const forceMaxProgress = (endOp == ZSTD_e_flush || endOp == ZSTD_e_end);
|
|
||||||
size_t flushMin;
|
size_t flushMin;
|
||||||
assert(forceMaxProgress || endOp == ZSTD_e_continue /* Protection for a new flush type */);
|
|
||||||
if (cctx->cParamsChanged) {
|
if (cctx->cParamsChanged) {
|
||||||
ZSTDMT_updateCParams_whileCompressing(cctx->mtctx, &cctx->requestedParams);
|
ZSTDMT_updateCParams_whileCompressing(cctx->mtctx, &cctx->requestedParams);
|
||||||
cctx->cParamsChanged = 0;
|
cctx->cParamsChanged = 0;
|
||||||
}
|
}
|
||||||
do {
|
for (;;) {
|
||||||
|
size_t const ipos = input->pos;
|
||||||
|
size_t const opos = output->pos;
|
||||||
flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
|
flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
|
||||||
if ( ZSTD_isError(flushMin)
|
if ( ZSTD_isError(flushMin)
|
||||||
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
|
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
|
||||||
ZSTD_CCtx_reset(cctx, ZSTD_reset_session_only);
|
ZSTD_CCtx_reset(cctx, ZSTD_reset_session_only);
|
||||||
}
|
}
|
||||||
FORWARD_IF_ERROR(flushMin, "ZSTDMT_compressStream_generic failed");
|
FORWARD_IF_ERROR(flushMin, "ZSTDMT_compressStream_generic failed");
|
||||||
} while (forceMaxProgress && flushMin != 0 && output->pos < output->size);
|
|
||||||
|
if (endOp == ZSTD_e_continue) {
|
||||||
|
/* We only require some progress with ZSTD_e_continue, not maximal progress.
|
||||||
|
* We're done if we've consumed or produced any bytes, or either buffer is
|
||||||
|
* full.
|
||||||
|
*/
|
||||||
|
if (input->pos != ipos || output->pos != opos || input->pos == input->size || output->pos == output->size)
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
assert(endOp == ZSTD_e_flush || endOp == ZSTD_e_end);
|
||||||
|
/* We require maximal progress. We're done when the flush is complete or the
|
||||||
|
* output buffer is full.
|
||||||
|
*/
|
||||||
|
if (flushMin == 0 || output->pos == output->size)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
DEBUGLOG(5, "completed ZSTD_compressStream2 delegating to ZSTDMT_compressStream_generic");
|
DEBUGLOG(5, "completed ZSTD_compressStream2 delegating to ZSTDMT_compressStream_generic");
|
||||||
/* Either we don't require maximum forward progress, we've finished the
|
/* Either we don't require maximum forward progress, we've finished the
|
||||||
* flush, or we are out of output space.
|
* flush, or we are out of output space.
|
||||||
*/
|
*/
|
||||||
assert(!forceMaxProgress || flushMin == 0 || output->pos == output->size);
|
assert(endOp == ZSTD_e_continue || flushMin == 0 || output->pos == output->size);
|
||||||
ZSTD_setBufferExpectations(cctx, output, input);
|
ZSTD_setBufferExpectations(cctx, output, input);
|
||||||
return flushMin;
|
return flushMin;
|
||||||
}
|
}
|
||||||
|
@ -2299,6 +2299,7 @@ static int fuzzerTests_newAPI(U32 seed, int nbTests, int startTest,
|
|||||||
/* Ensure maximal forward progress for determinism */
|
/* Ensure maximal forward progress for determinism */
|
||||||
forwardProgress = (inBuff.pos != ipos) || (outBuff.pos != opos);
|
forwardProgress = (inBuff.pos != ipos) || (outBuff.pos != opos);
|
||||||
} while (forwardProgress);
|
} while (forwardProgress);
|
||||||
|
assert(inBuff.pos == inBuff.size);
|
||||||
|
|
||||||
XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos);
|
XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos);
|
||||||
memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos);
|
memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user