Merge pull request #1000 from facebook/progressiveFlush

Progressive flush
This commit is contained in:
Yann Collet 2018-01-30 22:49:47 -08:00 committed by GitHub
commit 823a28a1f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 709 additions and 531 deletions

View File

@ -926,7 +926,7 @@ size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* prefix, size_t
</p></pre><BR> </p></pre><BR>
<pre><b>typedef enum { <pre><b>typedef enum {
ZSTD_e_continue=0, </b>/* collect more data, encoder transparently decides when to output result, for optimal conditions */<b> ZSTD_e_continue=0, </b>/* collect more data, encoder decides when to output compressed result, for optimal conditions */<b>
ZSTD_e_flush, </b>/* flush any data provided so far - frame will continue, future data can still reference previous data for better compression */<b> ZSTD_e_flush, </b>/* flush any data provided so far - frame will continue, future data can still reference previous data for better compression */<b>
ZSTD_e_end </b>/* flush any remaining data and close current frame. Any additional data starts a new frame. */<b> ZSTD_e_end </b>/* flush any remaining data and close current frame. Any additional data starts a new frame. */<b>
} ZSTD_EndDirective; } ZSTD_EndDirective;
@ -945,10 +945,11 @@ size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* prefix, size_t
and then immediately returns, just indicating that there is some data remaining to be flushed. and then immediately returns, just indicating that there is some data remaining to be flushed.
The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte. The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte.
- Exception : in multi-threading mode, if the first call requests a ZSTD_e_end directive, it is blocking : it will complete compression before giving back control to caller. - Exception : in multi-threading mode, if the first call requests a ZSTD_e_end directive, it is blocking : it will complete compression before giving back control to caller.
- @return provides the minimum amount of data remaining to be flushed from internal buffers - @return provides a minimum amount of data remaining to be flushed from internal buffers
or an error code, which can be tested using ZSTD_isError(). or an error code, which can be tested using ZSTD_isError().
if @return != 0, flush is not fully completed, there is still some data left within internal buffers. if @return != 0, flush is not fully completed, there is still some data left within internal buffers.
This is useful to determine if a ZSTD_e_flush or ZSTD_e_end directive is completed. This is useful for ZSTD_e_flush, since in this case more flushes are necessary to empty all buffers.
For ZSTD_e_end, @return == 0 when internal buffers are fully flushed and frame is completed.
- after a ZSTD_e_end directive, if internal buffer is not fully flushed (@return != 0), - after a ZSTD_e_end directive, if internal buffer is not fully flushed (@return != 0),
only ZSTD_e_end or ZSTD_e_flush operations are allowed. only ZSTD_e_end or ZSTD_e_flush operations are allowed.
Before starting a new compression job, or changing compression parameters, Before starting a new compression job, or changing compression parameters,

View File

@ -12,6 +12,7 @@
/* ====== Dependencies ======= */ /* ====== Dependencies ======= */
#include <stddef.h> /* size_t */ #include <stddef.h> /* size_t */
#include "pool.h" #include "pool.h"
#include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */
/* ====== Compiler specifics ====== */ /* ====== Compiler specifics ====== */
#if defined(_MSC_VER) #if defined(_MSC_VER)
@ -193,32 +194,54 @@ static int isQueueFull(POOL_ctx const* ctx) {
} }
} }
void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
POOL_ctx* const ctx = (POOL_ctx*)ctxVoid;
if (!ctx) { return; }
ZSTD_pthread_mutex_lock(&ctx->queueMutex); static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
{ POOL_job const job = {function, opaque}; {
POOL_job const job = {function, opaque};
assert(ctx != NULL);
if (ctx->shutdown) return;
/* Wait until there is space in the queue for the new job */ ctx->queueEmpty = 0;
while (isQueueFull(ctx) && !ctx->shutdown) { ctx->queue[ctx->queueTail] = job;
ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
}
/* The queue is still going => there is space */
if (!ctx->shutdown) {
ctx->queueEmpty = 0;
ctx->queue[ctx->queueTail] = job;
ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
}
}
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
ZSTD_pthread_cond_signal(&ctx->queuePopCond); ZSTD_pthread_cond_signal(&ctx->queuePopCond);
} }
#else /* ZSTD_MULTITHREAD not defined */ void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
/* No multi-threading support */ {
assert(ctx != NULL);
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
/* Wait until there is space in the queue for the new job */
while (isQueueFull(ctx) && (!ctx->shutdown)) {
ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
}
POOL_add_internal(ctx, function, opaque);
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}
/* We don't need any data, but if it is empty malloc() might return NULL. */
int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
{
assert(ctx != NULL);
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
if (isQueueFull(ctx)) {
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
return 0;
}
POOL_add_internal(ctx, function, opaque);
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
return 1;
}
#else /* ZSTD_MULTITHREAD not defined */
/* ========================== */
/* No multi-threading support */
/* ========================== */
/* We don't need any data, but if it is empty, malloc() might return NULL. */
struct POOL_ctx_s { struct POOL_ctx_s {
int dummy; int dummy;
}; };
@ -240,11 +263,17 @@ void POOL_free(POOL_ctx* ctx) {
(void)ctx; (void)ctx;
} }
void POOL_add(void* ctx, POOL_function function, void* opaque) { void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
(void)ctx; (void)ctx;
function(opaque); function(opaque);
} }
int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
(void)ctx;
function(opaque);
return 1;
}
size_t POOL_sizeof(POOL_ctx* ctx) { size_t POOL_sizeof(POOL_ctx* ctx) {
if (ctx==NULL) return 0; /* supports sizeof NULL */ if (ctx==NULL) return 0; /* supports sizeof NULL */
assert(ctx == &g_ctx); assert(ctx == &g_ctx);

View File

@ -17,7 +17,8 @@ extern "C" {
#include <stddef.h> /* size_t */ #include <stddef.h> /* size_t */
#include "zstd_internal.h" /* ZSTD_customMem */ #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_customMem */
#include "zstd.h"
typedef struct POOL_ctx_s POOL_ctx; typedef struct POOL_ctx_s POOL_ctx;
@ -27,35 +28,43 @@ typedef struct POOL_ctx_s POOL_ctx;
* The maximum number of queued jobs before blocking is `queueSize`. * The maximum number of queued jobs before blocking is `queueSize`.
* @return : POOL_ctx pointer on success, else NULL. * @return : POOL_ctx pointer on success, else NULL.
*/ */
POOL_ctx *POOL_create(size_t numThreads, size_t queueSize); POOL_ctx* POOL_create(size_t numThreads, size_t queueSize);
POOL_ctx *POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem); POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem);
/*! POOL_free() : /*! POOL_free() :
Free a thread pool returned by POOL_create(). Free a thread pool returned by POOL_create().
*/ */
void POOL_free(POOL_ctx *ctx); void POOL_free(POOL_ctx* ctx);
/*! POOL_sizeof() : /*! POOL_sizeof() :
return memory usage of pool returned by POOL_create(). return memory usage of pool returned by POOL_create().
*/ */
size_t POOL_sizeof(POOL_ctx *ctx); size_t POOL_sizeof(POOL_ctx* ctx);
/*! POOL_function : /*! POOL_function :
The function type that can be added to a thread pool. The function type that can be added to a thread pool.
*/ */
typedef void (*POOL_function)(void *); typedef void (*POOL_function)(void*);
/*! POOL_add_function : /*! POOL_add_function :
The function type for a generic thread pool add function. The function type for a generic thread pool add function.
*/ */
typedef void (*POOL_add_function)(void *, POOL_function, void *); typedef void (*POOL_add_function)(void*, POOL_function, void*);
/*! POOL_add() : /*! POOL_add() :
Add the job `function(opaque)` to the thread pool. Add the job `function(opaque)` to the thread pool. `ctx` must be valid.
Possibly blocks until there is room in the queue. Possibly blocks until there is room in the queue.
Note : The function may be executed asynchronously, so `opaque` must live until the function has been completed. Note : The function may be executed asynchronously, so `opaque` must live until the function has been completed.
*/ */
void POOL_add(void *ctx, POOL_function function, void *opaque); void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque);
/*! POOL_tryAdd() :
Add the job `function(opaque)` to the thread pool if a worker is available.
return immediately otherwise.
@return : 1 if successful, 0 if not.
*/
int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque);
#if defined (__cplusplus) #if defined (__cplusplus)

View File

@ -1824,7 +1824,7 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
void* dst, size_t dstCapacity, void* dst, size_t dstCapacity,
const void* src, size_t srcSize) const void* src, size_t srcSize)
{ {
DEBUGLOG(5, "ZSTD_compressBlock_internal (dstCapacity=%u) (dictLimit=%u, nextToUpdate=%u)", DEBUGLOG(5, "ZSTD_compressBlock_internal (dstCapacity=%u, dictLimit=%u, nextToUpdate=%u)",
(U32)dstCapacity, zc->blockState.matchState.dictLimit, zc->blockState.matchState.nextToUpdate); (U32)dstCapacity, zc->blockState.matchState.dictLimit, zc->blockState.matchState.nextToUpdate);
if (srcSize < MIN_CBLOCK_SIZE+ZSTD_blockHeaderSize+1) if (srcSize < MIN_CBLOCK_SIZE+ZSTD_blockHeaderSize+1)
return 0; /* don't even attempt compression below a certain srcSize */ return 0; /* don't even attempt compression below a certain srcSize */
@ -1837,9 +1837,9 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
if (current > zc->blockState.matchState.nextToUpdate + 384) if (current > zc->blockState.matchState.nextToUpdate + 384)
zc->blockState.matchState.nextToUpdate = current - MIN(192, (U32)(current - zc->blockState.matchState.nextToUpdate - 384)); zc->blockState.matchState.nextToUpdate = current - MIN(192, (U32)(current - zc->blockState.matchState.nextToUpdate - 384));
} }
/* find and store sequences */
{ /* select and store sequences */
U32 const extDict = zc->blockState.matchState.lowLimit < zc->blockState.matchState.dictLimit; { U32 const extDict = zc->blockState.matchState.lowLimit < zc->blockState.matchState.dictLimit;
size_t lastLLSize; size_t lastLLSize;
{ int i; for (i = 0; i < ZSTD_REP_NUM; ++i) zc->blockState.nextCBlock->rep[i] = zc->blockState.prevCBlock->rep[i]; } { int i; for (i = 0; i < ZSTD_REP_NUM; ++i) zc->blockState.nextCBlock->rep[i] = zc->blockState.prevCBlock->rep[i]; }
if (zc->appliedParams.ldmParams.enableLdm) { if (zc->appliedParams.ldmParams.enableLdm) {
@ -1848,26 +1848,20 @@ static size_t ZSTD_compressBlock_internal(ZSTD_CCtx* zc,
U32 rep[ZSTD_REP_NUM], ZSTD_CCtx_params const* params, U32 rep[ZSTD_REP_NUM], ZSTD_CCtx_params const* params,
void const* src, size_t srcSize); void const* src, size_t srcSize);
ZSTD_ldmBlockCompressor const ldmBlockCompressor = extDict ? ZSTD_compressBlock_ldm_extDict : ZSTD_compressBlock_ldm; ZSTD_ldmBlockCompressor const ldmBlockCompressor = extDict ? ZSTD_compressBlock_ldm_extDict : ZSTD_compressBlock_ldm;
lastLLSize = ldmBlockCompressor(&zc->ldmState, &zc->blockState.matchState, &zc->seqStore, zc->blockState.nextCBlock->rep, &zc->appliedParams, src, srcSize); lastLLSize = ldmBlockCompressor(&zc->ldmState, &zc->blockState.matchState, &zc->seqStore, zc->blockState.nextCBlock->rep, &zc->appliedParams, src, srcSize);
} else { } else { /* not long range mode */
ZSTD_blockCompressor const blockCompressor = ZSTD_selectBlockCompressor(zc->appliedParams.cParams.strategy, extDict); ZSTD_blockCompressor const blockCompressor = ZSTD_selectBlockCompressor(zc->appliedParams.cParams.strategy, extDict);
lastLLSize = blockCompressor(&zc->blockState.matchState, &zc->seqStore, zc->blockState.nextCBlock->rep, &zc->appliedParams.cParams, src, srcSize); lastLLSize = blockCompressor(&zc->blockState.matchState, &zc->seqStore, zc->blockState.nextCBlock->rep, &zc->appliedParams.cParams, src, srcSize);
} }
{ { const BYTE* const lastLiterals = (const BYTE*)src + srcSize - lastLLSize;
const BYTE* const anchor = (const BYTE*)src + srcSize - lastLLSize; ZSTD_storeLastLiterals(&zc->seqStore, lastLiterals, lastLLSize);
ZSTD_storeLastLiterals(&zc->seqStore, anchor, lastLLSize); } }
}
} /* encode sequences and literals */
/* encode */ { size_t const cSize = ZSTD_compressSequences(&zc->seqStore, &zc->blockState.prevCBlock->entropy, &zc->blockState.nextCBlock->entropy, &zc->appliedParams.cParams, dst, dstCapacity, srcSize, zc->entropyWorkspace);
{ if (ZSTD_isError(cSize) || cSize == 0) return cSize;
size_t const cSize = ZSTD_compressSequences(&zc->seqStore, &zc->blockState.prevCBlock->entropy, &zc->blockState.nextCBlock->entropy, &zc->appliedParams.cParams, dst, dstCapacity, srcSize, zc->entropyWorkspace);
if (ZSTD_isError(cSize) || cSize == 0)
return cSize;
/* confirm repcodes and entropy tables */ /* confirm repcodes and entropy tables */
{ { ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
ZSTD_compressedBlockState_t* const tmp = zc->blockState.prevCBlock;
zc->blockState.prevCBlock = zc->blockState.nextCBlock; zc->blockState.prevCBlock = zc->blockState.nextCBlock;
zc->blockState.nextCBlock = tmp; zc->blockState.nextCBlock = tmp;
} }
@ -2030,6 +2024,20 @@ static size_t ZSTD_writeFrameHeader(void* dst, size_t dstCapacity,
return pos; return pos;
} }
/* ZSTD_writeLastEmptyBlock() :
* output an empty Block with end-of-frame mark to complete a frame
* @return : size of data written into `dst` (== ZSTD_blockHeaderSize (defined in zstd_internal.h))
* or an error code if `dstCapcity` is too small (<ZSTD_blockHeaderSize)
*/
size_t ZSTD_writeLastEmptyBlock(void* dst, size_t dstCapacity)
{
if (dstCapacity < ZSTD_blockHeaderSize) return ERROR(dstSize_tooSmall);
{ U32 const cBlockHeader24 = 1 /*lastBlock*/ + (((U32)bt_raw)<<1); /* 0 size */
MEM_writeLE24(dst, cBlockHeader24);
return ZSTD_blockHeaderSize;
}
}
static void ZSTD_manageWindowContinuity(ZSTD_matchState_t* ms, void const* src, size_t srcSize) static void ZSTD_manageWindowContinuity(ZSTD_matchState_t* ms, void const* src, size_t srcSize)
{ {
@ -2396,7 +2404,7 @@ static size_t ZSTD_writeEpilogue(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity)
BYTE* op = ostart; BYTE* op = ostart;
size_t fhSize = 0; size_t fhSize = 0;
DEBUGLOG(5, "ZSTD_writeEpilogue"); DEBUGLOG(4, "ZSTD_writeEpilogue");
if (cctx->stage == ZSTDcs_created) return ERROR(stage_wrong); /* init missing */ if (cctx->stage == ZSTDcs_created) return ERROR(stage_wrong); /* init missing */
/* special case : empty frame */ /* special case : empty frame */
@ -2420,6 +2428,7 @@ static size_t ZSTD_writeEpilogue(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity)
if (cctx->appliedParams.fParams.checksumFlag) { if (cctx->appliedParams.fParams.checksumFlag) {
U32 const checksum = (U32) XXH64_digest(&cctx->xxhState); U32 const checksum = (U32) XXH64_digest(&cctx->xxhState);
if (dstCapacity<4) return ERROR(dstSize_tooSmall); if (dstCapacity<4) return ERROR(dstSize_tooSmall);
DEBUGLOG(4, "ZSTD_writeEpilogue: write checksum : %08X", checksum);
MEM_writeLE32(op, checksum); MEM_writeLE32(op, checksum);
op += 4; op += 4;
} }

View File

@ -481,4 +481,13 @@ size_t ZSTD_compress_advanced_internal(ZSTD_CCtx* cctx,
const void* dict,size_t dictSize, const void* dict,size_t dictSize,
ZSTD_CCtx_params params); ZSTD_CCtx_params params);
/* ZSTD_writeLastEmptyBlock() :
* output an empty Block with end-of-frame mark to complete a frame
* @return : size of data written into `dst` (== ZSTD_blockHeaderSize (defined in zstd_internal.h))
* or an error code if `dstCapcity` is too small (<ZSTD_blockHeaderSize)
*/
size_t ZSTD_writeLastEmptyBlock(void* dst, size_t dstCapacity);
#endif /* ZSTD_COMPRESS_H */ #endif /* ZSTD_COMPRESS_H */

File diff suppressed because it is too large Load Diff

View File

@ -97,11 +97,12 @@ ZSTDLIB_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter
/*! ZSTDMT_compressStream_generic() : /*! ZSTDMT_compressStream_generic() :
* Combines ZSTDMT_compressStream() with ZSTDMT_flushStream() or ZSTDMT_endStream() * Combines ZSTDMT_compressStream() with optional ZSTDMT_flushStream() or ZSTDMT_endStream()
* depending on flush directive. * depending on flush directive.
* @return : minimum amount of data still to be flushed * @return : minimum amount of data still to be flushed
* 0 if fully flushed * 0 if fully flushed
* or an error code */ * or an error code
* note : needs to be init using any ZSTD_initCStream*() variant */
ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
ZSTD_outBuffer* output, ZSTD_outBuffer* output,
ZSTD_inBuffer* input, ZSTD_inBuffer* input,

View File

@ -1122,7 +1122,7 @@ ZSTDLIB_API size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* pre
typedef enum { typedef enum {
ZSTD_e_continue=0, /* collect more data, encoder transparently decides when to output result, for optimal conditions */ ZSTD_e_continue=0, /* collect more data, encoder decides when to output compressed result, for optimal conditions */
ZSTD_e_flush, /* flush any data provided so far - frame will continue, future data can still reference previous data for better compression */ ZSTD_e_flush, /* flush any data provided so far - frame will continue, future data can still reference previous data for better compression */
ZSTD_e_end /* flush any remaining data and close current frame. Any additional data starts a new frame. */ ZSTD_e_end /* flush any remaining data and close current frame. Any additional data starts a new frame. */
} ZSTD_EndDirective; } ZSTD_EndDirective;
@ -1138,10 +1138,11 @@ typedef enum {
* and then immediately returns, just indicating that there is some data remaining to be flushed. * and then immediately returns, just indicating that there is some data remaining to be flushed.
* The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte. * The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte.
* - Exception : in multi-threading mode, if the first call requests a ZSTD_e_end directive, it is blocking : it will complete compression before giving back control to caller. * - Exception : in multi-threading mode, if the first call requests a ZSTD_e_end directive, it is blocking : it will complete compression before giving back control to caller.
* - @return provides the minimum amount of data remaining to be flushed from internal buffers * - @return provides a minimum amount of data remaining to be flushed from internal buffers
* or an error code, which can be tested using ZSTD_isError(). * or an error code, which can be tested using ZSTD_isError().
* if @return != 0, flush is not fully completed, there is still some data left within internal buffers. * if @return != 0, flush is not fully completed, there is still some data left within internal buffers.
* This is useful to determine if a ZSTD_e_flush or ZSTD_e_end directive is completed. * This is useful for ZSTD_e_flush, since in this case more flushes are necessary to empty all buffers.
* For ZSTD_e_end, @return == 0 when internal buffers are fully flushed and frame is completed.
* - after a ZSTD_e_end directive, if internal buffer is not fully flushed (@return != 0), * - after a ZSTD_e_end directive, if internal buffer is not fully flushed (@return != 0),
* only ZSTD_e_end or ZSTD_e_flush operations are allowed. * only ZSTD_e_end or ZSTD_e_flush operations are allowed.
* Before starting a new compression job, or changing compression parameters, * Before starting a new compression job, or changing compression parameters,

View File

@ -792,6 +792,7 @@ static int FIO_compressFilename_internal(cRess_t ress,
/* Fill input Buffer */ /* Fill input Buffer */
size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile); size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile);
ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 }; ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
DISPLAYLEVEL(6, "fread %u bytes from source \n", (U32)inSize);
readsize += inSize; readsize += inSize;
if (inSize == 0 || (fileSize != UTIL_FILESIZE_UNKNOWN && readsize == fileSize)) if (inSize == 0 || (fileSize != UTIL_FILESIZE_UNKNOWN && readsize == fileSize))
@ -803,32 +804,23 @@ static int FIO_compressFilename_internal(cRess_t ress,
CHECK_V(result, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive)); CHECK_V(result, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive));
/* Write compressed stream */ /* Write compressed stream */
DISPLAYLEVEL(6, "ZSTD_compress_generic,ZSTD_e_continue: generated %u bytes \n", DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => intput pos(%u)<=(%u)size ; output generated %u bytes \n",
(U32)outBuff.pos); (U32)directive, (U32)inBuff.pos, (U32)inBuff.size, (U32)outBuff.pos);
if (outBuff.pos) { if (outBuff.pos) {
size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
if (sizeCheck!=outBuff.pos) if (sizeCheck!=outBuff.pos)
EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName);
compressedfilesize += outBuff.pos; compressedfilesize += outBuff.pos;
} }
if (READY_FOR_UPDATE()) {
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%",
(U32)(zfp.ingested >> 20),
(U32)(zfp.consumed >> 20),
(U32)(zfp.produced >> 20),
(double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100 );
}
} }
#if 1
if (READY_FOR_UPDATE()) {
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%",
(U32)(zfp.ingested >> 20),
(U32)(zfp.consumed >> 20),
(U32)(zfp.produced >> 20),
(double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100 );
}
#else
if (fileSize == UTIL_FILESIZE_UNKNOWN) {
DISPLAYUPDATE(2, "\rRead : %u MB", (U32)(readsize>>20));
} else {
DISPLAYUPDATE(2, "\rRead : %u / %u MB",
(U32)(readsize>>20), (U32)(fileSize>>20));
}
#endif
} while (directive != ZSTD_e_end); } while (directive != ZSTD_e_end);
finish: finish:

View File

@ -99,7 +99,7 @@ unsigned int FUZ_rand(unsigned int* seedPtr)
if (cond) { \ if (cond) { \
DISPLAY("Error => "); \ DISPLAY("Error => "); \
DISPLAY(__VA_ARGS__); \ DISPLAY(__VA_ARGS__); \
DISPLAY(" (seed %u, test nb %u, line %u) \n", \ DISPLAY(" (seed %u, test nb %u, line %u) \n", \
seed, testNb, __LINE__); \ seed, testNb, __LINE__); \
goto _output_error; \ goto _output_error; \
} } } }
@ -219,6 +219,7 @@ static int basicUnitTests(U32 seed, double compressibility)
size_t cSize; size_t cSize;
int testResult = 0; int testResult = 0;
U32 testNb = 1; U32 testNb = 1;
U32 coreSeed = 0; /* this name to conform with CHECK_Z macro display */
ZSTD_CStream* zc = ZSTD_createCStream(); ZSTD_CStream* zc = ZSTD_createCStream();
ZSTD_DStream* zd = ZSTD_createDStream(); ZSTD_DStream* zd = ZSTD_createDStream();
ZSTDMT_CCtx* mtctx = ZSTDMT_createCCtx(2); ZSTDMT_CCtx* mtctx = ZSTDMT_createCCtx(2);
@ -372,7 +373,7 @@ static int basicUnitTests(U32 seed, double compressibility)
DISPLAYLEVEL(3, "OK (%u bytes) \n", (U32)s); DISPLAYLEVEL(3, "OK (%u bytes) \n", (U32)s);
} }
/* Byte-by-byte decompression test */ /* Decompression by small increment */
DISPLAYLEVEL(3, "test%3i : decompress byte-by-byte : ", testNb++); DISPLAYLEVEL(3, "test%3i : decompress byte-by-byte : ", testNb++);
{ /* skippable frame */ { /* skippable frame */
size_t r = 1; size_t r = 1;
@ -382,8 +383,10 @@ static int basicUnitTests(U32 seed, double compressibility)
inBuff.pos = 0; inBuff.pos = 0;
outBuff.pos = 0; outBuff.pos = 0;
while (r) { /* skippable frame */ while (r) { /* skippable frame */
inBuff.size = inBuff.pos + 1; size_t const inSize = FUZ_rand(&coreSeed) & 15;
outBuff.size = outBuff.pos + 1; size_t const outSize = FUZ_rand(&coreSeed) & 15;
inBuff.size = inBuff.pos + inSize;
outBuff.size = outBuff.pos + outSize;
r = ZSTD_decompressStream(zd, &outBuff, &inBuff); r = ZSTD_decompressStream(zd, &outBuff, &inBuff);
if (ZSTD_isError(r)) goto _output_error; if (ZSTD_isError(r)) goto _output_error;
} }
@ -391,8 +394,10 @@ static int basicUnitTests(U32 seed, double compressibility)
ZSTD_initDStream_usingDict(zd, CNBuffer, dictSize); ZSTD_initDStream_usingDict(zd, CNBuffer, dictSize);
r=1; r=1;
while (r) { while (r) {
inBuff.size = inBuff.pos + 1; size_t const inSize = FUZ_rand(&coreSeed) & 15;
outBuff.size = outBuff.pos + 1; size_t const outSize = FUZ_rand(&coreSeed) & 15;
inBuff.size = inBuff.pos + inSize;
outBuff.size = outBuff.pos + outSize;
r = ZSTD_decompressStream(zd, &outBuff, &inBuff); r = ZSTD_decompressStream(zd, &outBuff, &inBuff);
if (ZSTD_isError(r)) goto _output_error; if (ZSTD_isError(r)) goto _output_error;
} }
@ -694,10 +699,13 @@ static int basicUnitTests(U32 seed, double compressibility)
inBuff.src = CNBuffer; inBuff.src = CNBuffer;
inBuff.size = CNBufferSize; inBuff.size = CNBufferSize;
inBuff.pos = 0; inBuff.pos = 0;
CHECK_Z( ZSTDMT_compressStream_generic(mtctx, &outBuff, &inBuff, ZSTD_e_end) ); { size_t const compressResult = ZSTDMT_compressStream_generic(mtctx, &outBuff, &inBuff, ZSTD_e_end);
if (compressResult != 0) goto _output_error; /* compression must be completed in a single round */
}
if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */ if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */
{ size_t const r = ZSTDMT_endStream(mtctx, &outBuff); { size_t const compressedSize = ZSTD_findFrameCompressedSize(compressedBuffer, outBuff.pos);
if (r != 0) goto _output_error; } /* error, or some data not flushed */ if (compressedSize != outBuff.pos) goto _output_error; /* must be a full valid frame */
}
DISPLAYLEVEL(3, "OK \n"); DISPLAYLEVEL(3, "OK \n");
/* Complex multithreading + dictionary test */ /* Complex multithreading + dictionary test */
@ -863,11 +871,21 @@ static size_t findDiff(const void* buf1, const void* buf2, size_t max)
for (u=0; u<max; u++) { for (u=0; u<max; u++) {
if (b1[u] != b2[u]) break; if (b1[u] != b2[u]) break;
} }
if (u==max) {
DISPLAY("=> No difference detected within %u bytes \n", (U32)max);
return u;
}
DISPLAY("Error at position %u / %u \n", (U32)u, (U32)max); DISPLAY("Error at position %u / %u \n", (U32)u, (U32)max);
DISPLAY(" %02X %02X %02X :%02X: %02X %02X %02X %02X %02X \n", if (u>=3)
b1[u-3], b1[u-2], b1[u-1], b1[u-0], b1[u+1], b1[u+2], b1[u+3], b1[u+4], b1[u+5]); DISPLAY(" %02X %02X %02X ",
DISPLAY(" %02X %02X %02X :%02X: %02X %02X %02X %02X %02X \n", b1[u-3], b1[u-2], b1[u-1]);
b2[u-3], b2[u-2], b2[u-1], b2[u-0], b2[u+1], b2[u+2], b2[u+3], b2[u+4], b2[u+5]); DISPLAY(" :%02X: %02X %02X %02X %02X %02X \n",
b1[u], b1[u+1], b1[u+2], b1[u+3], b1[u+4], b1[u+5]);
if (u>=3)
DISPLAY(" %02X %02X %02X ",
b2[u-3], b2[u-2], b2[u-1]);
DISPLAY(" :%02X: %02X %02X %02X %02X %02X \n",
b2[u], b2[u+1], b2[u+2], b2[u+3], b2[u+4], b2[u+5]);
return u; return u;
} }
@ -948,10 +966,13 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres
size_t maxTestSize; size_t maxTestSize;
/* init */ /* init */
if (nbTests >= testNb) { DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); }
else { DISPLAYUPDATE(2, "\r%6u ", testNb); }
FUZ_rand(&coreSeed); FUZ_rand(&coreSeed);
lseed = coreSeed ^ prime32; lseed = coreSeed ^ prime32;
if (nbTests >= testNb) {
DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests);
} else {
DISPLAYUPDATE(2, "\r%6u ", testNb);
}
/* states full reset (deliberately not synchronized) */ /* states full reset (deliberately not synchronized) */
/* some issues can only happen when reusing states */ /* some issues can only happen when reusing states */
@ -1161,7 +1182,6 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
UTIL_time_t const startClock = UTIL_getTime(); UTIL_time_t const startClock = UTIL_getTime();
const BYTE* dict=NULL; /* can keep same dict on 2 consecutive tests */ const BYTE* dict=NULL; /* can keep same dict on 2 consecutive tests */
size_t dictSize = 0; size_t dictSize = 0;
U32 oldTestLog = 0;
int const cLevelMax = bigTests ? (U32)ZSTD_maxCLevel()-1 : g_cLevelMax_smallTests; int const cLevelMax = bigTests ? (U32)ZSTD_maxCLevel()-1 : g_cLevelMax_smallTests;
U32 const nbThreadsMax = bigTests ? 4 : 2; U32 const nbThreadsMax = bigTests ? 4 : 2;
@ -1183,6 +1203,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
RDG_genBuffer(cNoiseBuffer[4], srcBufferSize, 1.00, 0., coreSeed); /* sparse content */ RDG_genBuffer(cNoiseBuffer[4], srcBufferSize, 1.00, 0., coreSeed); /* sparse content */
memset(copyBuffer, 0x65, copyBufferSize); /* make copyBuffer considered initialized */ memset(copyBuffer, 0x65, copyBufferSize); /* make copyBuffer considered initialized */
ZSTD_initDStream_usingDict(zd, NULL, 0); /* ensure at least one init */ ZSTD_initDStream_usingDict(zd, NULL, 0); /* ensure at least one init */
DISPLAYLEVEL(6, "Creating initial context with %u threads \n", nbThreads);
/* catch up testNb */ /* catch up testNb */
for (testNb=1; testNb < startTest; testNb++) for (testNb=1; testNb < startTest; testNb++)
@ -1195,14 +1216,14 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
size_t totalTestSize, totalGenSize, cSize; size_t totalTestSize, totalGenSize, cSize;
XXH64_state_t xxhState; XXH64_state_t xxhState;
U64 crcOrig; U64 crcOrig;
U32 resetAllowed = 1;
size_t maxTestSize; size_t maxTestSize;
/* init */
if (testNb < nbTests) {
DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests);
} else { DISPLAYUPDATE(2, "\r%6u ", testNb); }
FUZ_rand(&coreSeed); FUZ_rand(&coreSeed);
if (nbTests >= testNb) {
DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests);
} else {
DISPLAYUPDATE(2, "\r%6u ", testNb);
}
lseed = coreSeed ^ prime32; lseed = coreSeed ^ prime32;
/* states full reset (deliberately not synchronized) */ /* states full reset (deliberately not synchronized) */
@ -1213,7 +1234,6 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
ZSTDMT_freeCCtx(zc); ZSTDMT_freeCCtx(zc);
zc = ZSTDMT_createCCtx(nbThreads); zc = ZSTDMT_createCCtx(nbThreads);
CHECK(zc==NULL, "ZSTDMT_createCCtx allocation error") CHECK(zc==NULL, "ZSTDMT_createCCtx allocation error")
resetAllowed=0;
} }
if ((FUZ_rand(&lseed) & 0xFF) == 132) { if ((FUZ_rand(&lseed) & 0xFF) == 132) {
ZSTD_freeDStream(zd); ZSTD_freeDStream(zd);
@ -1238,16 +1258,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
} }
/* compression init */ /* compression init */
if ((FUZ_rand(&lseed)&1) /* at beginning, to keep same nb of rand */ { U32 const testLog = FUZ_rand(&lseed) % maxSrcLog;
&& oldTestLog /* at least one test happened */ && resetAllowed) {
maxTestSize = FUZ_randomLength(&lseed, oldTestLog+2);
if (maxTestSize >= srcBufferSize) maxTestSize = srcBufferSize-1;
{ int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1;
DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel);
CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) );
}
} else {
U32 const testLog = FUZ_rand(&lseed) % maxSrcLog;
U32 const dictLog = FUZ_rand(&lseed) % maxSrcLog; U32 const dictLog = FUZ_rand(&lseed) % maxSrcLog;
int const cLevelCandidate = ( FUZ_rand(&lseed) int const cLevelCandidate = ( FUZ_rand(&lseed)
% (ZSTD_maxCLevel() - (MAX(testLog, dictLog) / 2)) ) % (ZSTD_maxCLevel() - (MAX(testLog, dictLog) / 2)) )
@ -1256,24 +1267,29 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
int const cLevelMin = MAX(cLevelThreadAdjusted, 1); /* no negative cLevel yet */ int const cLevelMin = MAX(cLevelThreadAdjusted, 1); /* no negative cLevel yet */
int const cLevel = MIN(cLevelMin, cLevelMax); int const cLevel = MIN(cLevelMin, cLevelMax);
maxTestSize = FUZ_rLogLength(&lseed, testLog); maxTestSize = FUZ_rLogLength(&lseed, testLog);
oldTestLog = testLog;
/* random dictionary selection */ if (FUZ_rand(&lseed)&1) { /* simple init */
dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_rLogLength(&lseed, dictLog) : 0; int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1;
{ size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize); DISPLAYLEVEL(5, "Init with compression level = %i \n", compressionLevel);
dict = srcBuffer + dictStart; CHECK_Z( ZSTDMT_initCStream(zc, compressionLevel) );
} } else { /* advanced init */
{ U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize; /* random dictionary selection */
ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize); dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_rLogLength(&lseed, dictLog) : 0;
DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n", { size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize);
params.cParams.windowLog, (U32)pledgedSrcSize, (U32)dictSize); dict = srcBuffer + dictStart;
params.fParams.checksumFlag = FUZ_rand(&lseed) & 1; }
params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1; { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? ZSTD_CONTENTSIZE_UNKNOWN : maxTestSize;
params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1; ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize);
DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag); DISPLAYLEVEL(5, "Init with windowLog = %u, pledgedSrcSize = %u, dictSize = %u \n",
CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) ); params.cParams.windowLog, (U32)pledgedSrcSize, (U32)dictSize);
CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) ); /* custome job size */ params.fParams.checksumFlag = FUZ_rand(&lseed) & 1;
CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) ); params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1;
} } params.fParams.contentSizeFlag = FUZ_rand(&lseed) & 1;
DISPLAYLEVEL(5, "checksumFlag : %u \n", params.fParams.checksumFlag);
CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_overlapSectionLog, FUZ_rand(&lseed) % 12) );
CHECK_Z( ZSTDMT_setMTCtxParameter(zc, ZSTDMT_p_jobSize, FUZ_rand(&lseed) % (2*maxTestSize+1)) ); /* custome job size */
CHECK_Z( ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize) );
} } }
/* multi-segments compression test */ /* multi-segments compression test */
XXH64_reset(&xxhState, 0); XXH64_reset(&xxhState, 0);
@ -1326,10 +1342,13 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
} } } }
crcOrig = XXH64_digest(&xxhState); crcOrig = XXH64_digest(&xxhState);
cSize = outBuff.pos; cSize = outBuff.pos;
DISPLAYLEVEL(5, "Frame completed : %u bytes \n", (U32)cSize); DISPLAYLEVEL(5, "Frame completed : %u bytes compressed into %u bytes \n",
(U32)totalTestSize, (U32)cSize);
} }
/* multi - fragments decompression test */ /* multi - fragments decompression test */
assert(totalTestSize < dstBufferSize);
memset(dstBuffer, 170, totalTestSize); /* init dest area */
if (!dictSize /* don't reset if dictionary : could be different */ && (FUZ_rand(&lseed) & 1)) { if (!dictSize /* don't reset if dictionary : could be different */ && (FUZ_rand(&lseed) & 1)) {
CHECK_Z( ZSTD_resetDStream(zd) ); CHECK_Z( ZSTD_resetDStream(zd) );
} else { } else {
@ -1344,10 +1363,16 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize); size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize);
inBuff.size = inBuff.pos + readCSrcSize; inBuff.size = inBuff.pos + readCSrcSize;
outBuff.size = outBuff.pos + dstBuffSize; outBuff.size = outBuff.pos + dstBuffSize;
DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize); DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes into outBuff %u bytes \n",
(U32)readCSrcSize, (U32)dstBuffSize);
decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff); decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
if (ZSTD_isError(decompressionResult)) {
DISPLAY("ZSTD_decompressStream error : %s \n", ZSTD_getErrorName(decompressionResult));
findDiff(copyBuffer, dstBuffer, totalTestSize);
}
CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult)); CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));
DISPLAYLEVEL(6, "inBuff.pos = %u \n", (U32)readCSrcSize); DISPLAYLEVEL(6, "total ingested (inBuff.pos) = %u and produced (outBuff.pos) = %u \n",
(U32)inBuff.pos, (U32)outBuff.pos);
} }
CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize); CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize);
CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize); CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize);
@ -1586,7 +1611,11 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double
} }
/* mess with frame parameters */ /* mess with frame parameters */
if (FUZ_rand(&lseed) & 1) CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_checksumFlag, FUZ_rand(&lseed) & 1, useOpaqueAPI) ); if (FUZ_rand(&lseed) & 1) {
U32 const checksumFlag = FUZ_rand(&lseed) & 1;
DISPLAYLEVEL(5, "t%u: frame checksum : %u \n", testNb, checksumFlag);
CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_checksumFlag, checksumFlag, useOpaqueAPI) );
}
if (FUZ_rand(&lseed) & 1) CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_dictIDFlag, FUZ_rand(&lseed) & 1, useOpaqueAPI) ); if (FUZ_rand(&lseed) & 1) CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_dictIDFlag, FUZ_rand(&lseed) & 1, useOpaqueAPI) );
if (FUZ_rand(&lseed) & 1) CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_contentSizeFlag, FUZ_rand(&lseed) & 1, useOpaqueAPI) ); if (FUZ_rand(&lseed) & 1) CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_contentSizeFlag, FUZ_rand(&lseed) & 1, useOpaqueAPI) );
if (FUZ_rand(&lseed) & 1) { if (FUZ_rand(&lseed) & 1) {
@ -1651,8 +1680,8 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double
outBuff.size = outBuff.pos + dstBuffSize; outBuff.size = outBuff.pos + dstBuffSize;
CHECK_Z( ZSTD_compress_generic(zc, &outBuff, &inBuff, flush) ); CHECK_Z( ZSTD_compress_generic(zc, &outBuff, &inBuff, flush) );
DISPLAYLEVEL(6, "t%u: compress consumed %u bytes (total : %u) \n", DISPLAYLEVEL(6, "t%u: compress consumed %u bytes (total : %u) ; flush: %u (total : %u) \n",
testNb, (U32)inBuff.pos, (U32)(totalTestSize + inBuff.pos)); testNb, (U32)inBuff.pos, (U32)(totalTestSize + inBuff.pos), (U32)flush, (U32)outBuff.pos);
XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos); XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos);
memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos); memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos);
@ -1660,14 +1689,15 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double
} }
/* final frame epilogue */ /* final frame epilogue */
{ size_t remainingToFlush = (size_t)(-1); { size_t remainingToFlush = 1;
while (remainingToFlush) { while (remainingToFlush) {
ZSTD_inBuffer inBuff = { NULL, 0, 0 }; ZSTD_inBuffer inBuff = { NULL, 0, 0 };
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog+1); size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog+1);
size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize); size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize);
outBuff.size = outBuff.pos + adjustedDstSize; outBuff.size = outBuff.pos + adjustedDstSize;
DISPLAYLEVEL(6, "End-flush into dst buffer of size %u \n", (U32)adjustedDstSize); DISPLAYLEVEL(6, "t%u: End-flush into dst buffer of size %u \n", testNb, (U32)adjustedDstSize);
remainingToFlush = ZSTD_compress_generic(zc, &outBuff, &inBuff, ZSTD_e_end); remainingToFlush = ZSTD_compress_generic(zc, &outBuff, &inBuff, ZSTD_e_end);
DISPLAYLEVEL(6, "t%u: Total flushed so far : %u bytes \n", testNb, (U32)outBuff.pos);
CHECK( ZSTD_isError(remainingToFlush), CHECK( ZSTD_isError(remainingToFlush),
"ZSTD_compress_generic w/ ZSTD_e_end error : %s", "ZSTD_compress_generic w/ ZSTD_e_end error : %s",
ZSTD_getErrorName(remainingToFlush) ); ZSTD_getErrorName(remainingToFlush) );
@ -1694,14 +1724,20 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double
size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize); size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize);
inBuff.size = inBuff.pos + readCSrcSize; inBuff.size = inBuff.pos + readCSrcSize;
outBuff.size = outBuff.pos + dstBuffSize; outBuff.size = outBuff.pos + dstBuffSize;
DISPLAYLEVEL(6, "ZSTD_decompressStream input %u bytes (pos:%u/%u)\n", DISPLAYLEVEL(6, "decompression presented %u new bytes (pos:%u/%u)\n",
(U32)readCSrcSize, (U32)inBuff.pos, (U32)cSize); (U32)readCSrcSize, (U32)inBuff.pos, (U32)cSize);
decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff); decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
DISPLAYLEVEL(6, "so far: consumed = %u, produced = %u \n",
(U32)inBuff.pos, (U32)outBuff.pos);
if (ZSTD_isError(decompressionResult)) {
DISPLAY("ZSTD_decompressStream error : %s \n", ZSTD_getErrorName(decompressionResult));
findDiff(copyBuffer, dstBuffer, totalTestSize);
}
CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult)); CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));
DISPLAYLEVEL(6, "inBuff.pos = %u \n", (U32)readCSrcSize); CHECK (inBuff.pos > cSize, "ZSTD_decompressStream consumes too much input : %u > %u ", (U32)inBuff.pos, (U32)cSize);
} }
CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize);
CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize); CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize);
CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize);
{ U64 const crcDest = XXH64(dstBuffer, totalTestSize, 0); { U64 const crcDest = XXH64(dstBuffer, totalTestSize, 0);
if (crcDest!=crcOrig) findDiff(copyBuffer, dstBuffer, totalTestSize); if (crcDest!=crcOrig) findDiff(copyBuffer, dstBuffer, totalTestSize);
CHECK (crcDest!=crcOrig, "decompressed data corrupted"); CHECK (crcDest!=crcOrig, "decompressed data corrupted");