[zstdmt] Fix jobsize bugs (#1205)
[zstdmt] Fix jobsize bugs * `ZSTDMT_serialState_reset()` should use `targetSectionSize`, not `jobSize` when sizing the seqstore. Add an assert that checks that we sized the seqstore using the right job size. * `ZSTDMT_compressionJob()` should check if `rawSeqStore.seq == NULL`. * `ZSTDMT_initCStream_internal()` should not adjust `mtctx->params.jobSize` (clamping to MIN/MAX is okay).
This commit is contained in:
parent
3b53bfe4f3
commit
b426bcc097
@ -459,7 +459,7 @@ typedef struct {
|
|||||||
ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */
|
ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */
|
||||||
} serialState_t;
|
} serialState_t;
|
||||||
|
|
||||||
static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params)
|
static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool* seqPool, ZSTD_CCtx_params params, size_t jobSize)
|
||||||
{
|
{
|
||||||
/* Adjust parameters */
|
/* Adjust parameters */
|
||||||
if (params.ldmParams.enableLdm) {
|
if (params.ldmParams.enableLdm) {
|
||||||
@ -486,7 +486,7 @@ static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool*
|
|||||||
serialState->params.ldmParams.hashLog -
|
serialState->params.ldmParams.hashLog -
|
||||||
serialState->params.ldmParams.bucketSizeLog;
|
serialState->params.ldmParams.bucketSizeLog;
|
||||||
/* Size the seq pool tables */
|
/* Size the seq pool tables */
|
||||||
ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, params.jobSize));
|
ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, jobSize));
|
||||||
/* Reset the window */
|
/* Reset the window */
|
||||||
ZSTD_window_clear(&serialState->ldmState.window);
|
ZSTD_window_clear(&serialState->ldmState.window);
|
||||||
serialState->ldmWindow = serialState->ldmState.window;
|
serialState->ldmWindow = serialState->ldmState.window;
|
||||||
@ -506,6 +506,7 @@ static int ZSTDMT_serialState_reset(serialState_t* serialState, ZSTDMT_seqPool*
|
|||||||
memset(serialState->ldmState.bucketOffsets, 0, bucketSize);
|
memset(serialState->ldmState.bucketOffsets, 0, bucketSize);
|
||||||
}
|
}
|
||||||
serialState->params = params;
|
serialState->params = params;
|
||||||
|
serialState->params.jobSize = (U32)jobSize;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -547,6 +548,7 @@ static void ZSTDMT_serialState_update(serialState_t* serialState,
|
|||||||
size_t error;
|
size_t error;
|
||||||
assert(seqStore.seq != NULL && seqStore.pos == 0 &&
|
assert(seqStore.seq != NULL && seqStore.pos == 0 &&
|
||||||
seqStore.size == 0 && seqStore.capacity > 0);
|
seqStore.size == 0 && seqStore.capacity > 0);
|
||||||
|
assert(src.size <= serialState->params.jobSize);
|
||||||
ZSTD_window_update(&serialState->ldmState.window, src.start, src.size);
|
ZSTD_window_update(&serialState->ldmState.window, src.start, src.size);
|
||||||
error = ZSTD_ldm_generateSequences(
|
error = ZSTD_ldm_generateSequences(
|
||||||
&serialState->ldmState, &seqStore,
|
&serialState->ldmState, &seqStore,
|
||||||
@ -635,13 +637,6 @@ void ZSTDMT_compressionJob(void* jobDescription)
|
|||||||
rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool);
|
rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool);
|
||||||
buffer_t dstBuff = job->dstBuff;
|
buffer_t dstBuff = job->dstBuff;
|
||||||
|
|
||||||
/* Don't compute the checksum for chunks, since we compute it externally,
|
|
||||||
* but write it in the header.
|
|
||||||
*/
|
|
||||||
if (job->jobID != 0) jobParams.fParams.checksumFlag = 0;
|
|
||||||
/* Don't run LDM for the chunks, since we handle it externally */
|
|
||||||
jobParams.ldmParams.enableLdm = 0;
|
|
||||||
|
|
||||||
/* ressources */
|
/* ressources */
|
||||||
if (cctx==NULL) {
|
if (cctx==NULL) {
|
||||||
job->cSize = ERROR(memory_allocation);
|
job->cSize = ERROR(memory_allocation);
|
||||||
@ -655,6 +650,18 @@ void ZSTDMT_compressionJob(void* jobDescription)
|
|||||||
}
|
}
|
||||||
job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */
|
job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */
|
||||||
}
|
}
|
||||||
|
if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL) {
|
||||||
|
job->cSize = ERROR(memory_allocation);
|
||||||
|
goto _endJob;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Don't compute the checksum for chunks, since we compute it externally,
|
||||||
|
* but write it in the header.
|
||||||
|
*/
|
||||||
|
if (job->jobID != 0) jobParams.fParams.checksumFlag = 0;
|
||||||
|
/* Don't run LDM for the chunks, since we handle it externally */
|
||||||
|
jobParams.ldmParams.enableLdm = 0;
|
||||||
|
|
||||||
|
|
||||||
/* init */
|
/* init */
|
||||||
if (job->cdict) {
|
if (job->cdict) {
|
||||||
@ -972,6 +979,8 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params,
|
|||||||
if ( (value > 0) /* value==0 => automatic job size */
|
if ( (value > 0) /* value==0 => automatic job size */
|
||||||
& (value < ZSTDMT_JOBSIZE_MIN) )
|
& (value < ZSTDMT_JOBSIZE_MIN) )
|
||||||
value = ZSTDMT_JOBSIZE_MIN;
|
value = ZSTDMT_JOBSIZE_MIN;
|
||||||
|
if (value > ZSTDMT_JOBSIZE_MAX)
|
||||||
|
value = ZSTDMT_JOBSIZE_MAX;
|
||||||
params->jobSize = value;
|
params->jobSize = value;
|
||||||
return value;
|
return value;
|
||||||
case ZSTDMT_p_overlapSectionLog :
|
case ZSTDMT_p_overlapSectionLog :
|
||||||
@ -998,6 +1007,21 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned* value)
|
||||||
|
{
|
||||||
|
switch (parameter) {
|
||||||
|
case ZSTDMT_p_jobSize:
|
||||||
|
*value = mtctx->params.jobSize;
|
||||||
|
break;
|
||||||
|
case ZSTDMT_p_overlapSectionLog:
|
||||||
|
*value = mtctx->params.overlapSizeLog;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
return ERROR(parameter_unsupported);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/* Sets parameters relevant to the compression job,
|
/* Sets parameters relevant to the compression job,
|
||||||
* initializing others to default values. */
|
* initializing others to default values. */
|
||||||
static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
|
static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
|
||||||
@ -1143,7 +1167,7 @@ static size_t ZSTDMT_compress_advanced_internal(
|
|||||||
|
|
||||||
assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
|
assert(avgJobSize >= 256 KB); /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
|
||||||
ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) );
|
ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) );
|
||||||
if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
|
if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize))
|
||||||
return ERROR(memory_allocation);
|
return ERROR(memory_allocation);
|
||||||
|
|
||||||
CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbJobs) ); /* only expands if necessary */
|
CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbJobs) ); /* only expands if necessary */
|
||||||
@ -1280,9 +1304,7 @@ size_t ZSTDMT_initCStream_internal(
|
|||||||
if (params.nbWorkers != mtctx->params.nbWorkers)
|
if (params.nbWorkers != mtctx->params.nbWorkers)
|
||||||
CHECK_F( ZSTDMT_resize(mtctx, params.nbWorkers) );
|
CHECK_F( ZSTDMT_resize(mtctx, params.nbWorkers) );
|
||||||
|
|
||||||
if (params.jobSize == 0) {
|
if (params.jobSize > 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN;
|
||||||
params.jobSize = 1U << ZSTDMT_computeTargetJobLog(params);
|
|
||||||
}
|
|
||||||
if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
|
if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
|
||||||
|
|
||||||
mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */
|
mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */
|
||||||
@ -1321,7 +1343,9 @@ size_t ZSTDMT_initCStream_internal(
|
|||||||
mtctx->targetPrefixSize = (size_t)1 << ZSTDMT_computeOverlapLog(params);
|
mtctx->targetPrefixSize = (size_t)1 << ZSTDMT_computeOverlapLog(params);
|
||||||
DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10));
|
DEBUGLOG(4, "overlapLog=%u => %u KB", params.overlapSizeLog, (U32)(mtctx->targetPrefixSize>>10));
|
||||||
mtctx->targetSectionSize = params.jobSize;
|
mtctx->targetSectionSize = params.jobSize;
|
||||||
if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN;
|
if (mtctx->targetSectionSize == 0) {
|
||||||
|
mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(params);
|
||||||
|
}
|
||||||
if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */
|
if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */
|
||||||
DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize);
|
DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize);
|
||||||
DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
|
DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
|
||||||
@ -1363,7 +1387,7 @@ size_t ZSTDMT_initCStream_internal(
|
|||||||
mtctx->allJobsCompleted = 0;
|
mtctx->allJobsCompleted = 0;
|
||||||
mtctx->consumed = 0;
|
mtctx->consumed = 0;
|
||||||
mtctx->produced = 0;
|
mtctx->produced = 0;
|
||||||
if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params))
|
if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize))
|
||||||
return ERROR(memory_allocation);
|
return ERROR(memory_allocation);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -95,6 +95,11 @@ typedef enum {
|
|||||||
* @return : 0, or an error code (which can be tested using ZSTD_isError()) */
|
* @return : 0, or an error code (which can be tested using ZSTD_isError()) */
|
||||||
ZSTDLIB_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value);
|
ZSTDLIB_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned value);
|
||||||
|
|
||||||
|
/* ZSTDMT_getMTCtxParameter() :
|
||||||
|
* Query the ZSTDMT_CCtx for a parameter value.
|
||||||
|
* @return : 0, or an error code (which can be tested using ZSTD_isError()) */
|
||||||
|
ZSTDLIB_API size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, unsigned* value);
|
||||||
|
|
||||||
|
|
||||||
/*! ZSTDMT_compressStream_generic() :
|
/*! ZSTDMT_compressStream_generic() :
|
||||||
* Combines ZSTDMT_compressStream() with optional ZSTDMT_flushStream() or ZSTDMT_endStream()
|
* Combines ZSTDMT_compressStream() with optional ZSTDMT_flushStream() or ZSTDMT_endStream()
|
||||||
|
@ -844,7 +844,12 @@ static int basicUnitTests(U32 seed, double compressibility)
|
|||||||
/* Basic multithreading compression test */
|
/* Basic multithreading compression test */
|
||||||
DISPLAYLEVEL(3, "test%3i : compress %u bytes with multiple threads : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
|
DISPLAYLEVEL(3, "test%3i : compress %u bytes with multiple threads : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
|
||||||
{ ZSTD_parameters const params = ZSTD_getParams(1, 0, 0);
|
{ ZSTD_parameters const params = ZSTD_getParams(1, 0, 0);
|
||||||
|
unsigned jobSize;
|
||||||
|
CHECK_Z( ZSTDMT_getMTCtxParameter(mtctx, ZSTDMT_p_jobSize, &jobSize));
|
||||||
|
CHECK(jobSize != 0, "job size non-zero");
|
||||||
CHECK_Z( ZSTDMT_initCStream_advanced(mtctx, CNBuffer, dictSize, params, CNBufferSize) );
|
CHECK_Z( ZSTDMT_initCStream_advanced(mtctx, CNBuffer, dictSize, params, CNBufferSize) );
|
||||||
|
CHECK_Z( ZSTDMT_getMTCtxParameter(mtctx, ZSTDMT_p_jobSize, &jobSize));
|
||||||
|
CHECK(jobSize != 0, "job size non-zero");
|
||||||
}
|
}
|
||||||
outBuff.dst = compressedBuffer;
|
outBuff.dst = compressedBuffer;
|
||||||
outBuff.size = compressedBufferSize;
|
outBuff.size = compressedBufferSize;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user