diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index e218b637..d14a1cde 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -32,7 +32,7 @@ */ -/* ******************************************************* +/*-******************************************************* * Compiler specifics *********************************************************/ #ifdef _MSC_VER /* Visual Studio */ @@ -55,7 +55,7 @@ #include "mem.h" #define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */ #include "xxhash.h" /* XXH_reset, update, digest */ -#define FSE_STATIC_LINKING_ONLY +#define FSE_STATIC_LINKING_ONLY /* FSE_encodeSymbol */ #include "fse.h" #define HUF_STATIC_LINKING_ONLY #include "huf.h" @@ -2758,6 +2758,274 @@ ZSTDLIB_API size_t ZSTD_compress_usingCDict(ZSTD_CCtx* cctx, +/*-====== Streaming ======-*/ + +typedef enum { zcss_init, zcss_load, zcss_flush, zcss_final } ZSTD_cStreamStage; + +struct ZSTD_CStream_s { + ZSTD_CCtx* zc; + char* inBuff; + size_t inBuffSize; + size_t inToCompress; + size_t inBuffPos; + size_t inBuffTarget; + size_t blockSize; + char* outBuff; + size_t outBuffSize; + size_t outBuffContentSize; + size_t outBuffFlushedSize; + ZSTD_cStreamStage stage; + U32 checksum; + U32 frameEnded; + ZSTD_customMem customMem; +}; /* typedef'd to ZSTD_CStream within "zstd.h" */ + +ZSTD_CStream* ZSTD_createCStream(void) +{ + return ZSTD_createCStream_advanced(defaultCustomMem); +} + +ZSTD_CStream* ZSTD_createCStream_advanced(ZSTD_customMem customMem) +{ + ZSTD_CStream* zcs; + + if (!customMem.customAlloc && !customMem.customFree) + customMem = defaultCustomMem; + + if (!customMem.customAlloc || !customMem.customFree) + return NULL; + + zcs = (ZSTD_CStream*)customMem.customAlloc(customMem.opaque, sizeof(ZSTD_CStream)); + if (zcs==NULL) return NULL; + memset(zcs, 0, sizeof(ZSTD_CStream)); + memcpy(&zcs->customMem, &customMem, sizeof(ZSTD_customMem)); + zcs->zc = ZSTD_createCCtx_advanced(customMem); + if (zcs->zc == NULL) { ZSTD_freeCStream(zcs); return NULL; } + return zcs; +} + +size_t ZSTD_freeCStream(ZSTD_CStream* zcs) +{ + if (zcs==NULL) return 0; /* support free on NULL */ + ZSTD_freeCCtx(zcs->zc); + if (zcs->inBuff) zcs->customMem.customFree(zcs->customMem.opaque, zcs->inBuff); + if (zcs->outBuff) zcs->customMem.customFree(zcs->customMem.opaque, zcs->outBuff); + zcs->customMem.customFree(zcs->customMem.opaque, zcs); + return 0; +} + + +/* Initialization */ + +size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs, + const void* dict, size_t dictSize, + ZSTD_parameters params, unsigned long long pledgedSrcSize) +{ + /* allocate buffers */ + { size_t const neededInBuffSize = (size_t)1 << params.cParams.windowLog; + if (zcs->inBuffSize < neededInBuffSize) { + zcs->inBuffSize = neededInBuffSize; + zcs->customMem.customFree(zcs->customMem.opaque, zcs->inBuff); /* should not be necessary */ + zcs->inBuff = (char*)zcs->customMem.customAlloc(zcs->customMem.opaque, neededInBuffSize); + if (zcs->inBuff == NULL) return ERROR(memory_allocation); + } + zcs->blockSize = MIN(ZSTD_BLOCKSIZE_ABSOLUTEMAX, neededInBuffSize); + } + if (zcs->outBuffSize < ZSTD_compressBound(zcs->blockSize)+1) { + zcs->outBuffSize = ZSTD_compressBound(zcs->blockSize)+1; + zcs->customMem.customFree(zcs->customMem.opaque, zcs->outBuff); /* should not be necessary */ + zcs->outBuff = (char*)zcs->customMem.customAlloc(zcs->customMem.opaque, zcs->outBuffSize); + if (zcs->outBuff == NULL) return ERROR(memory_allocation); + } + + { size_t const errorCode = ZSTD_compressBegin_advanced(zcs->zc, dict, dictSize, params, pledgedSrcSize); + if (ZSTD_isError(errorCode)) return errorCode; } + + zcs->inToCompress = 0; + zcs->inBuffPos = 0; + zcs->inBuffTarget = zcs->blockSize; + zcs->outBuffContentSize = zcs->outBuffFlushedSize = 0; + zcs->stage = zcss_load; + zcs->checksum = params.fParams.checksumFlag > 0; + zcs->frameEnded = 0; + return 0; /* ready to go */ +} + +size_t ZSTD_initCStream_usingDict(ZSTD_CStream* zcs, const void* dict, size_t dictSize, int compressionLevel) +{ + ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, dictSize); + return ZSTD_initCStream_advanced(zcs, dict, dictSize, params, 0); +} + +size_t ZSTD_initCStream(ZSTD_CStream* zcs, int compressionLevel) +{ + return ZSTD_initCStream_usingDict(zcs, NULL, 0, compressionLevel); +} + + +/* Compression */ + +typedef enum { zsf_gather, zsf_flush, zsf_end } ZSTD_flush_e; + +MEM_STATIC size_t ZSTD_limitCopy(void* dst, size_t dstCapacity, const void* src, size_t srcSize) +{ + size_t const length = MIN(dstCapacity, srcSize); + memcpy(dst, src, length); + return length; +} + +static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, + void* dst, size_t* dstCapacityPtr, + const void* src, size_t* srcSizePtr, + ZSTD_flush_e const flush) +{ + U32 someMoreWork = 1; + const char* const istart = (const char*)src; + const char* const iend = istart + *srcSizePtr; + const char* ip = istart; + char* const ostart = (char*)dst; + char* const oend = ostart + *dstCapacityPtr; + char* op = ostart; + + while (someMoreWork) { + switch(zcs->stage) + { + case zcss_init: return ERROR(init_missing); /* call ZBUFF_compressInit() first ! */ + + case zcss_load: + /* complete inBuffer */ + { size_t const toLoad = zcs->inBuffTarget - zcs->inBuffPos; + size_t const loaded = ZSTD_limitCopy(zcs->inBuff + zcs->inBuffPos, toLoad, ip, iend-ip); + zcs->inBuffPos += loaded; + ip += loaded; + if ( (zcs->inBuffPos==zcs->inToCompress) || (!flush && (toLoad != loaded)) ) { + someMoreWork = 0; break; /* not enough input to get a full block : stop there, wait for more */ + } } + /* compress current block (note : this stage cannot be stopped in the middle) */ + { void* cDst; + size_t cSize; + size_t const iSize = zcs->inBuffPos - zcs->inToCompress; + size_t oSize = oend-op; + if (oSize >= ZSTD_compressBound(iSize)) + cDst = op; /* compress directly into output buffer (avoid flush stage) */ + else + cDst = zcs->outBuff, oSize = zcs->outBuffSize; + cSize = (flush == zsf_end) ? + ZSTD_compressEnd(zcs->zc, cDst, oSize, zcs->inBuff + zcs->inToCompress, iSize) : + ZSTD_compressContinue(zcs->zc, cDst, oSize, zcs->inBuff + zcs->inToCompress, iSize); + if (ZSTD_isError(cSize)) return cSize; + if (flush == zsf_end) zcs->frameEnded = 1; + /* prepare next block */ + zcs->inBuffTarget = zcs->inBuffPos + zcs->blockSize; + if (zcs->inBuffTarget > zcs->inBuffSize) + zcs->inBuffPos = 0, zcs->inBuffTarget = zcs->blockSize; /* note : inBuffSize >= blockSize */ + zcs->inToCompress = zcs->inBuffPos; + if (cDst == op) { op += cSize; break; } /* no need to flush */ + zcs->outBuffContentSize = cSize; + zcs->outBuffFlushedSize = 0; + zcs->stage = zcss_flush; /* pass-through to flush stage */ + } + + case zcss_flush: + { size_t const toFlush = zcs->outBuffContentSize - zcs->outBuffFlushedSize; + size_t const flushed = ZSTD_limitCopy(op, oend-op, zcs->outBuff + zcs->outBuffFlushedSize, toFlush); + op += flushed; + zcs->outBuffFlushedSize += flushed; + if (toFlush!=flushed) { someMoreWork = 0; break; } /* dst too small to store flushed data : stop there */ + zcs->outBuffContentSize = zcs->outBuffFlushedSize = 0; + zcs->stage = zcss_load; + break; + } + + case zcss_final: + someMoreWork = 0; /* do nothing */ + break; + + default: + return ERROR(GENERIC); /* impossible */ + } + } + + *srcSizePtr = ip - istart; + *dstCapacityPtr = op - ostart; + if (zcs->frameEnded) return 0; + { size_t hintInSize = zcs->inBuffTarget - zcs->inBuffPos; + if (hintInSize==0) hintInSize = zcs->blockSize; + return hintInSize; + } +} + +size_t ZSTD_compressStream(ZSTD_CStream* zcs, ZSTD_wCursor* output, ZSTD_rCursor* input) +{ + size_t sizeRead = input->size; + size_t sizeWritten = output->size; + size_t const result = ZSTD_compressStream_generic(zcs, output->ptr, &sizeWritten, input->ptr, &sizeRead, zsf_gather); + input->ptr = (const char*)(input->ptr) + sizeRead; + input->size -= sizeRead; + output->ptr = (char*)(output->ptr) + sizeWritten; + output->size -= sizeWritten; + output->nbBytesWritten += sizeWritten; + return result; +} + + +/* Finalize */ + +/*! ZSTD_flushStream() : +* @return : amount of data remaining to flush */ +size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_wCursor* output) +{ + size_t srcSize = 0; + size_t sizeWritten = output->size; + size_t const result = ZSTD_compressStream_generic(zcs, output->ptr, &sizeWritten, &srcSize, &srcSize, zsf_flush); /* use a valid src address instead of NULL */ + output->ptr = (char*)(output->ptr) + sizeWritten; + output->size -= sizeWritten; + output->nbBytesWritten += sizeWritten; + if (ZSTD_isError(result)) return result; + return zcs->outBuffContentSize - zcs->outBuffFlushedSize; /* remaining to flush */ +} + + +size_t ZSTD_endStream(ZSTD_CStream* zcs, ZSTD_wCursor* output) +{ + BYTE* const ostart = (BYTE*)(output->ptr); + BYTE* const oend = ostart + output->size; + BYTE* op = ostart; + + if (zcs->stage != zcss_final) { + /* flush whatever remains */ + size_t srcSize = 0; + size_t sizeWritten = output->size; + size_t const notEnded = ZSTD_compressStream_generic(zcs, ostart, &sizeWritten, &srcSize, &srcSize, zsf_end); /* use a valid src address instead of NULL */ + size_t const remainingToFlush = zcs->outBuffContentSize - zcs->outBuffFlushedSize; + op += sizeWritten; + if (remainingToFlush) { + output->ptr = op; + output->size -= sizeWritten; + output->nbBytesWritten += sizeWritten; + return remainingToFlush + ZSTD_BLOCKHEADERSIZE /* final empty block */ + (zcs->checksum * 4); + } + /* create epilogue */ + zcs->stage = zcss_final; + zcs->outBuffContentSize = !notEnded ? 0 : + ZSTD_compressEnd(zcs->zc, zcs->outBuff, zcs->outBuffSize, NULL, 0); /* write epilogue, including final empty block, into outBuff */ + } + + /* flush epilogue */ + { size_t const toFlush = zcs->outBuffContentSize - zcs->outBuffFlushedSize; + size_t const flushed = ZSTD_limitCopy(op, oend-op, zcs->outBuff + zcs->outBuffFlushedSize, toFlush); + op += flushed; + zcs->outBuffFlushedSize += flushed; + output->ptr = op; + output->size = oend-op; + output->nbBytesWritten += op-ostart; + if (toFlush==flushed) zcs->stage = zcss_init; /* end reached */ + return toFlush - flushed; + } +} + + + /*-===== Pre-defined compression levels =====-*/ #define ZSTD_DEFAULT_CLEVEL 1 diff --git a/lib/zstd.h b/lib/zstd.h index 7bd8a05c..3c563e38 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -323,7 +323,52 @@ ZSTDLIB_API size_t ZSTD_sizeofDCtx(const ZSTD_DCtx* dctx); /* ****************************************************************** -* Buffer-less streaming functions (synchronous mode) +* Streaming +********************************************************************/ + +typedef struct ZSTD_readCursor_s { + const void* ptr; /* position of cursor - update to new position */ + size_t size; /* remaining buffer size to read - update preserves end of buffer */ +} ZSTD_rCursor; + +typedef struct ZSTD_writeCursor_s { + void* ptr; /* position of cursor - update to new position */ + size_t size; /* remaining buffer size to write - update preserves end of buffer */ + size_t nbBytesWritten; /* already written bytes - update adds bytes newly written (accumulator) */ +} ZSTD_wCursor; + + +/*====== compression ======*/ + +typedef struct ZSTD_CStream_s ZSTD_CStream; +ZSTD_CStream* ZSTD_createCStream(void); +size_t ZSTD_freeCStream(ZSTD_CStream* zcs); + +size_t ZSTD_initCStream(ZSTD_CStream* zcs, int compressionLevel); +size_t ZSTD_compressStream(ZSTD_CStream* zcs, ZSTD_wCursor* output, ZSTD_rCursor* input); +size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_wCursor* output); +size_t ZSTD_endStream(ZSTD_CStream* zcs, ZSTD_wCursor* output); + +/* advanced */ +ZSTD_CStream* ZSTD_createCStream_advanced(ZSTD_customMem customMem); +size_t ZSTD_initCStream_usingDict(ZSTD_CStream* zcs, const void* dict, size_t dictSize, int compressionLevel); +size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs, const void* dict, size_t dictSize, + ZSTD_parameters params, unsigned long long pledgedSrcSize); + + +/*====== decompression ======*/ + +typedef struct ZSTD_DStream_s ZSTD_DStream; +ZSTD_DStream* ZSTD_createDStream(void); +size_t ZSTD_freeDStream(ZSTD_DStream* zds); + +size_t ZSTD_initDStream(ZSTD_DStream* zds); +size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_wCursor* output, ZSTD_rCursor* input); + + + +/* ****************************************************************** +* Buffer-less and synchronous inner streaming functions ********************************************************************/ /* This is an advanced API, giving full control over buffer management, for users which need direct control over memory. * But it's also a complex one, with a lot of restrictions (documented below).