joined normal streaming API with advanced one
This commit is contained in:
parent
da1f3066a3
commit
6ced8f7c7c
@ -3788,8 +3788,15 @@ size_t ZSTD_initCStream(ZSTD_CStream* zcs, int compressionLevel)
|
||||
|
||||
/*====== Compression ======*/
|
||||
|
||||
MEM_STATIC size_t ZSTD_limitCopy(void* dst, size_t dstCapacity,
|
||||
const void* src, size_t srcSize)
|
||||
static size_t ZSTD_nextInputSizeHint(const ZSTD_CCtx* cctx)
|
||||
{
|
||||
size_t hintInSize = cctx->inBuffTarget - cctx->inBuffPos;
|
||||
if (hintInSize==0) hintInSize = cctx->blockSize;
|
||||
return hintInSize;
|
||||
}
|
||||
|
||||
static size_t ZSTD_limitCopy(void* dst, size_t dstCapacity,
|
||||
const void* src, size_t srcSize)
|
||||
{
|
||||
size_t const length = MIN(dstCapacity, srcSize);
|
||||
if (length) memcpy(dst, src, length);
|
||||
@ -3797,7 +3804,7 @@ MEM_STATIC size_t ZSTD_limitCopy(void* dst, size_t dstCapacity,
|
||||
}
|
||||
|
||||
/** ZSTD_compressStream_generic():
|
||||
* internal function for all *compressStream*() variants and *compress_generic()
|
||||
* internal function for all *compressStream*() variants
|
||||
* non-static, because can be called from zstdmt_compress.c
|
||||
* @return : hint size for next input */
|
||||
size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
|
||||
@ -3937,19 +3944,25 @@ size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
|
||||
input->pos = ip - istart;
|
||||
output->pos = op - ostart;
|
||||
if (zcs->frameEnded) return 0;
|
||||
{ size_t hintInSize = zcs->inBuffTarget - zcs->inBuffPos;
|
||||
if (hintInSize==0) hintInSize = zcs->blockSize;
|
||||
return hintInSize;
|
||||
return ZSTD_nextInputSizeHint(zcs);
|
||||
}
|
||||
|
||||
static size_t ZSTD_nextInputSizeHint_MTorST(const ZSTD_CCtx* cctx)
|
||||
{
|
||||
#ifdef ZSTD_MULTITHREAD
|
||||
if (cctx->appliedParams.nbWorkers >= 1) {
|
||||
assert(cctx->mtctx != NULL);
|
||||
return ZSTDMT_nextInputSizeHint(cctx->mtctx);
|
||||
}
|
||||
#endif
|
||||
return ZSTD_nextInputSizeHint(cctx);
|
||||
|
||||
}
|
||||
|
||||
size_t ZSTD_compressStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
|
||||
{
|
||||
/* check conditions */
|
||||
if (output->pos > output->size) return ERROR(GENERIC);
|
||||
if (input->pos > input->size) return ERROR(GENERIC);
|
||||
|
||||
return ZSTD_compressStream_generic(zcs, output, input, ZSTD_e_continue);
|
||||
CHECK_F( ZSTD_compressStream2(zcs, output, input, ZSTD_e_continue) );
|
||||
return ZSTD_nextInputSizeHint_MTorST(zcs);
|
||||
}
|
||||
|
||||
|
||||
@ -4005,6 +4018,7 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx,
|
||||
assert(cctx->streamStage == zcss_load);
|
||||
assert(cctx->appliedParams.nbWorkers == 0);
|
||||
} }
|
||||
/* end of transparent initialization stage */
|
||||
|
||||
/* compression stage */
|
||||
#ifdef ZSTD_MULTITHREAD
|
||||
@ -4054,6 +4068,10 @@ size_t ZSTD_compress2(ZSTD_CCtx* cctx,
|
||||
ZSTD_e_end);
|
||||
assert(iPos == srcSize);
|
||||
if (ZSTD_isError(result)) return result;
|
||||
if (result != 0) { /* compression not completed, due to lack of output space */
|
||||
assert(oPos == dstCapacity);
|
||||
return ERROR(dstSize_tooSmall);
|
||||
}
|
||||
return oPos;
|
||||
}
|
||||
|
||||
@ -4064,20 +4082,20 @@ size_t ZSTD_compress2(ZSTD_CCtx* cctx,
|
||||
size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output)
|
||||
{
|
||||
ZSTD_inBuffer input = { NULL, 0, 0 };
|
||||
if (output->pos > output->size) return ERROR(GENERIC);
|
||||
CHECK_F( ZSTD_compressStream_generic(zcs, output, &input, ZSTD_e_flush) );
|
||||
return zcs->outBuffContentSize - zcs->outBuffFlushedSize; /* remaining to flush */
|
||||
return ZSTD_compressStream2(zcs, output, &input, ZSTD_e_flush);
|
||||
}
|
||||
|
||||
|
||||
size_t ZSTD_endStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output)
|
||||
{
|
||||
ZSTD_inBuffer input = { NULL, 0, 0 };
|
||||
if (output->pos > output->size) return ERROR(GENERIC);
|
||||
CHECK_F( ZSTD_compressStream_generic(zcs, output, &input, ZSTD_e_end) );
|
||||
size_t const remainingToFlush = ZSTD_compressStream2(zcs, output, &input, ZSTD_e_end);
|
||||
CHECK_F( remainingToFlush );
|
||||
if (zcs->appliedParams.nbWorkers > 0) return remainingToFlush; /* minimal estimation */
|
||||
/* single thread mode : attempt to calculate remaining to flush more precisely */
|
||||
{ size_t const lastBlockSize = zcs->frameEnded ? 0 : ZSTD_BLOCKHEADERSIZE;
|
||||
size_t const checksumSize = zcs->frameEnded ? 0 : zcs->appliedParams.fParams.checksumFlag * 4;
|
||||
size_t const toFlush = zcs->outBuffContentSize - zcs->outBuffFlushedSize + lastBlockSize + checksumSize;
|
||||
size_t const toFlush = remainingToFlush + lastBlockSize + checksumSize;
|
||||
DEBUGLOG(4, "ZSTD_endStream : remaining to flush : %u", (U32)toFlush);
|
||||
return toFlush;
|
||||
}
|
||||
|
@ -1844,7 +1844,9 @@ typedef struct {
|
||||
* Otherwise, we will load as many bytes as possible and instruct the caller
|
||||
* to continue as normal.
|
||||
*/
|
||||
static syncPoint_t findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input) {
|
||||
static syncPoint_t
|
||||
findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
|
||||
{
|
||||
BYTE const* const istart = (BYTE const*)input.src + input.pos;
|
||||
U64 const primePower = mtctx->rsync.primePower;
|
||||
U64 const hitMask = mtctx->rsync.hitMask;
|
||||
@ -1908,6 +1910,13 @@ static syncPoint_t findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuf
|
||||
return syncPoint;
|
||||
}
|
||||
|
||||
size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx)
|
||||
{
|
||||
size_t hintInSize = mtctx->targetSectionSize - mtctx->inBuff.filled;
|
||||
if (hintInSize==0) hintInSize = mtctx->targetSectionSize;
|
||||
return hintInSize;
|
||||
}
|
||||
|
||||
/** ZSTDMT_compressStream_generic() :
|
||||
* internal use only - exposed to be invoked from zstd_compress.c
|
||||
* assumption : output and input are valid (pos <= size)
|
||||
|
@ -60,6 +60,7 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
|
||||
ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel);
|
||||
ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize); /**< if srcSize is not known at reset time, use ZSTD_CONTENTSIZE_UNKNOWN. Note: for compatibility with older programs, 0 means the same as ZSTD_CONTENTSIZE_UNKNOWN, but it will change in the future to mean "empty" */
|
||||
|
||||
ZSTDLIB_API size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx);
|
||||
ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
|
||||
|
||||
ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
|
||||
|
41
lib/zstd.h
41
lib/zstd.h
@ -289,13 +289,17 @@ typedef struct ZSTD_outBuffer_s {
|
||||
* A ZSTD_CStream object is required to track streaming operation.
|
||||
* Use ZSTD_createCStream() and ZSTD_freeCStream() to create/release resources.
|
||||
* ZSTD_CStream objects can be reused multiple times on consecutive compression operations.
|
||||
* It is recommended to re-use ZSTD_CStream in situations where many streaming operations will be achieved consecutively,
|
||||
* since it will play nicer with system's memory, by re-using already allocated memory.
|
||||
* Use one separate ZSTD_CStream per thread for parallel execution.
|
||||
* It is recommended to re-use ZSTD_CStream since it will play nicer with system's memory, by re-using already allocated memory.
|
||||
*
|
||||
* Start a new compression by initializing ZSTD_CStream context.
|
||||
* Use ZSTD_initCStream() to start a new compression operation.
|
||||
* Use variants ZSTD_initCStream_usingDict() or ZSTD_initCStream_usingCDict() for streaming with dictionary (experimental section)
|
||||
* For parallel execution, use one separate ZSTD_CStream per thread.
|
||||
*
|
||||
* note : since v1.3.0, ZSTD_CStream and ZSTD_CCtx are the same thing.
|
||||
*
|
||||
* Parameters are sticky : when starting a new compression on the same context,
|
||||
* it will re-use the same sticky parameters as previous compression session.
|
||||
* It's recommended to initialize the context before every usage.
|
||||
* Use ZSTD_initCStream() to set the parameter to a selected compression level.
|
||||
* Use advanced API (ZSTD_CCtx_setParameter(), etc.) to set more detailed parameters.
|
||||
*
|
||||
* Use ZSTD_compressStream() as many times as necessary to consume input stream.
|
||||
* The function will automatically update both `pos` fields within `input` and `output`.
|
||||
@ -306,10 +310,10 @@ typedef struct ZSTD_outBuffer_s {
|
||||
* If not, the caller must make some room to receive more compressed data,
|
||||
* typically by emptying output buffer, or allocating a new output buffer,
|
||||
* and then present again remaining input data.
|
||||
* @return : a size hint, preferred nb of bytes to use as input for next function call
|
||||
* or an error code, which can be tested using ZSTD_isError().
|
||||
* Note 1 : it's just a hint, to help latency a little, any other value will work fine.
|
||||
* Note 2 : size hint is guaranteed to be <= ZSTD_CStreamInSize()
|
||||
* @return : a size hint, preferred nb of bytes to use as input for next function call
|
||||
* or an error code, which can be tested using ZSTD_isError().
|
||||
* Note 1 : it's just a hint, to help latency a little, any value will work fine.
|
||||
* Note 2 : size hint is guaranteed to be <= ZSTD_CStreamInSize()
|
||||
*
|
||||
* At any moment, it's possible to flush whatever data might remain stuck within internal buffer,
|
||||
* using ZSTD_flushStream(). `output->pos` will be updated.
|
||||
@ -757,21 +761,22 @@ typedef enum {
|
||||
ZSTD_e_flush=1, /* flush any data provided so far,
|
||||
* it creates (at least) one new block, that can be decoded immediately on reception;
|
||||
* frame will continue: any future data can still reference previously compressed data, improving compression. */
|
||||
ZSTD_e_end=2 /* flush any remaining data and close current frame.
|
||||
* any additional data starts a new frame.
|
||||
* each frame is independent (does not reference any content from previous frame). */
|
||||
ZSTD_e_end=2 /* flush any remaining data _and_ close current frame.
|
||||
* note that frame is only closed after compressed data is fully flushed (return value == 0).
|
||||
* After that point, any additional data starts a new frame.
|
||||
* note : each frame is independent (does not reference any content from previous frame). */
|
||||
} ZSTD_EndDirective;
|
||||
|
||||
/*! ZSTD_compressStream2() :
|
||||
* Behave about the same as ZSTD_compressStream, with additional control on end directive.
|
||||
* Behaves about the same as ZSTD_compressStream, with additional control on end directive.
|
||||
* - Compression parameters are pushed into CCtx before starting compression, using ZSTD_CCtx_set*()
|
||||
* - Compression parameters cannot be changed once compression is started (save a list of exceptions in multi-threading mode)
|
||||
* - outpot->pos must be <= dstCapacity, input->pos must be <= srcSize
|
||||
* - outpot->pos and input->pos will be updated. They are guaranteed to remain below their respective limit.
|
||||
* - In single-thread mode (default), function is blocking : it completes its job before returning to caller.
|
||||
* - In multi-thread mode, function is non-blocking : it just acquires a copy of input, and distribute job to internal worker threads,
|
||||
* and then immediately returns, just indicating that there is some data remaining to be flushed.
|
||||
* The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte.
|
||||
* - When nbWorkers==0 (default), function is blocking : it completes its job before returning to caller.
|
||||
* - When nbWorkers>=1, function is non-blocking : it just acquires a copy of input, and distributes jobs to internal worker threads,
|
||||
* and then immediately returns, just indicating that there is some data remaining to be flushed.
|
||||
* The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte.
|
||||
* - Exception : if the first call requests a ZSTD_e_end directive, the function delegates to ZSTD_compress2() which is always blocking.
|
||||
* - @return provides a minimum amount of data remaining to be flushed from internal buffers
|
||||
* or an error code, which can be tested using ZSTD_isError().
|
||||
|
@ -233,11 +233,9 @@ static int FUZ_mallocTests_internal(unsigned seed, double compressibility, unsig
|
||||
mallocCounter_t malcount = INIT_MALLOC_COUNTER;
|
||||
ZSTD_customMem const cMem = { FUZ_mallocDebug, FUZ_freeDebug, &malcount };
|
||||
ZSTD_CCtx* const cctx = ZSTD_createCCtx_advanced(cMem);
|
||||
ZSTD_outBuffer out = { outBuffer, outSize, 0 };
|
||||
ZSTD_inBuffer in = { inBuffer, inSize, 0 };
|
||||
CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, compressionLevel) );
|
||||
CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbWorkers, nbThreads) );
|
||||
while ( ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end) ) {}
|
||||
CHECK_Z( ZSTD_compress2(cctx, outBuffer, outSize, inBuffer, inSize) );
|
||||
ZSTD_freeCCtx(cctx);
|
||||
DISPLAYLEVEL(3, "compress_generic,-T%u,end level %i : ",
|
||||
nbThreads, compressionLevel);
|
||||
@ -1253,12 +1251,12 @@ static int basicUnitTests(U32 seed, double compressibility)
|
||||
if (ZSTD_isError(cSize_1pass)) goto _output_error;
|
||||
|
||||
CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, compressionLevel) );
|
||||
{ ZSTD_inBuffer in = { CNBuffer, srcSize, 0 };
|
||||
ZSTD_outBuffer out = { compressedBuffer, compressedBufferSize, 0 };
|
||||
size_t const compressionResult = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end);
|
||||
DISPLAYLEVEL(5, "simple=%zu vs %zu=advanced : ", cSize_1pass, out.pos);
|
||||
{ size_t const compressionResult = ZSTD_compress2(cctx,
|
||||
compressedBuffer, compressedBufferSize,
|
||||
CNBuffer, srcSize);
|
||||
DISPLAYLEVEL(5, "simple=%zu vs %zu=advanced : ", cSize_1pass, compressionResult);
|
||||
if (ZSTD_isError(compressionResult)) goto _output_error;
|
||||
if (out.pos != cSize_1pass) goto _output_error;
|
||||
if (compressionResult != cSize_1pass) goto _output_error;
|
||||
} }
|
||||
ZSTD_freeCCtx(cctx);
|
||||
}
|
||||
@ -1274,13 +1272,12 @@ static int basicUnitTests(U32 seed, double compressibility)
|
||||
CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, 2) );
|
||||
CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_enableLongDistanceMatching, 1) );
|
||||
CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_windowLog, 18) );
|
||||
{ ZSTD_inBuffer in = { CNBuffer, inputSize, 0 };
|
||||
ZSTD_outBuffer out = { compressedBuffer, ZSTD_compressBound(inputSize), 0 };
|
||||
size_t const result = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end);
|
||||
if (result != 0) goto _output_error;
|
||||
if (in.pos != in.size) goto _output_error;
|
||||
cSize = out.pos;
|
||||
xxh64 = XXH64(out.dst, out.pos, 0);
|
||||
{ size_t const compressedSize = ZSTD_compress2(cctx,
|
||||
compressedBuffer, ZSTD_compressBound(inputSize),
|
||||
CNBuffer, inputSize);
|
||||
CHECK(compressedSize);
|
||||
cSize = compressedSize;
|
||||
xxh64 = XXH64(compressedBuffer, compressedSize, 0);
|
||||
}
|
||||
DISPLAYLEVEL(3, "OK (compress : %u -> %u bytes)\n", (U32)inputSize, (U32)cSize);
|
||||
ZSTD_freeCCtx(cctx);
|
||||
@ -1291,14 +1288,13 @@ static int basicUnitTests(U32 seed, double compressibility)
|
||||
CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_windowLog, 18) );
|
||||
CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_enableLongDistanceMatching, 1) );
|
||||
CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, 2) );
|
||||
{ ZSTD_inBuffer in = { CNBuffer, inputSize, 0 };
|
||||
ZSTD_outBuffer out = { compressedBuffer, ZSTD_compressBound(inputSize), 0 };
|
||||
size_t const result = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end);
|
||||
if (result != 0) goto _output_error;
|
||||
if (in.pos != in.size) goto _output_error;
|
||||
if (out.pos != cSize) goto _output_error; /* must result in same compressed result, hence same size */
|
||||
if (XXH64(out.dst, out.pos, 0) != xxh64) goto _output_error; /* must result in exactly same content, hence same hash */
|
||||
DISPLAYLEVEL(3, "OK (compress : %u -> %u bytes)\n", (U32)inputSize, (U32)out.pos);
|
||||
{ size_t const result = ZSTD_compress2(cctx,
|
||||
compressedBuffer, ZSTD_compressBound(inputSize),
|
||||
CNBuffer, inputSize);
|
||||
CHECK(result);
|
||||
if (result != cSize) goto _output_error; /* must result in same compressed result, hence same size */
|
||||
if (XXH64(compressedBuffer, result, 0) != xxh64) goto _output_error; /* must result in exactly same content, hence same hash */
|
||||
DISPLAYLEVEL(3, "OK (compress : %u -> %u bytes)\n", (U32)inputSize, (U32)result);
|
||||
}
|
||||
ZSTD_freeCCtx(cctx);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user