diff --git a/doc/zstd_manual.html b/doc/zstd_manual.html index a47b7142..0fa5c5d7 100644 --- a/doc/zstd_manual.html +++ b/doc/zstd_manual.html @@ -687,6 +687,8 @@ size_t ZSTD_insertBlock(ZSTD_DCtx* dctx, const void* blockStart, size_t blockSiz for a new compression operation. `maxChunkSize` indicates the size at which to automatically start a new seekable frame. `maxChunkSize == 0` implies the default maximum size. + `checksumFlag` indicates whether or not the seek table should include chunk + checksums on the uncompressed data for verification. @return : a size hint for input to provide for compression, or an error code checkable with ZSTD_isError() @@ -714,7 +716,7 @@ size_t ZSTD_insertBlock(ZSTD_DCtx* dctx, const void* blockStart, size_t blockSiz
ZSTD_seekable_CStream* ZSTD_seekable_createCStream(void); size_t ZSTD_seekable_freeCStream(ZSTD_seekable_CStream* zcs);
size_t ZSTD_seekable_initCStream(ZSTD_seekable_CStream* zcs, int compressionLevel, unsigned maxChunkSize); +Seekable compression functions
size_t ZSTD_seekable_initCStream(ZSTD_seekable_CStream* zcs, int compressionLevel, int checksumFlag, unsigned maxChunkSize); size_t ZSTD_seekable_compressStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input); size_t ZSTD_seekable_endChunk(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output); size_t ZSTD_seekable_endStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output); diff --git a/examples/seekable_compression.c b/examples/seekable_compression.c index 5ab36c3c..f4bceb10 100644 --- a/examples/seekable_compression.c +++ b/examples/seekable_compression.c @@ -68,7 +68,7 @@ static void compressFile_orDie(const char* fname, const char* outName, int cLeve ZSTD_seekable_CStream* const cstream = ZSTD_seekable_createCStream(); if (cstream==NULL) { fprintf(stderr, "ZSTD_seekable_createCStream() error \n"); exit(10); } - size_t const initResult = ZSTD_seekable_initCStream(cstream, cLevel, chunkSize); + size_t const initResult = ZSTD_seekable_initCStream(cstream, cLevel, 1, chunkSize); if (ZSTD_isError(initResult)) { fprintf(stderr, "ZSTD_seekable_initCStream() error : %s \n", ZSTD_getErrorName(initResult)); exit(11); } size_t read, toRead = buffInSize; @@ -77,15 +77,16 @@ static void compressFile_orDie(const char* fname, const char* outName, int cLeve while (input.pos < input.size) { ZSTD_outBuffer output = { buffOut, buffOutSize, 0 }; toRead = ZSTD_seekable_compressStream(cstream, &output , &input); /* toRead is guaranteed to be <= ZSTD_CStreamInSize() */ - if (ZSTD_isError(toRead)) { fprintf(stderr, "ZSTD_compressStream() error : %s \n", ZSTD_getErrorName(toRead)); exit(12); } + if (ZSTD_isError(toRead)) { fprintf(stderr, "ZSTD_seekable_compressStream() error : %s \n", ZSTD_getErrorName(toRead)); exit(12); } if (toRead > buffInSize) toRead = buffInSize; /* Safely handle case when `buffInSize` is manually changed to a value < ZSTD_CStreamInSize()*/ fwrite_orDie(buffOut, output.pos, fout); } } - ZSTD_outBuffer output = { buffOut, buffOutSize, 0 }; while (1) { + ZSTD_outBuffer output = { buffOut, buffOutSize, 0 }; size_t const remainingToFlush = ZSTD_seekable_endStream(cstream, &output); /* close stream */ + if (ZSTD_isError(remainingToFlush)) { fprintf(stderr, "ZSTD_seekable_endStream() error : %s \n", ZSTD_getErrorName(remainingToFlush)); exit(13); } fwrite_orDie(buffOut, output.pos, fout); if (!remainingToFlush) break; } @@ -123,4 +124,6 @@ int main(int argc, const char** argv) { const char* const outFileName = createOutFilename_orDie(inFileName); compressFile_orDie(inFileName, outFileName, 5, chunkSize); } + + return 0; } diff --git a/examples/seekable_decompression.c b/examples/seekable_decompression.c index 97563c3d..641e0429 100644 --- a/examples/seekable_decompression.c +++ b/examples/seekable_decompression.c @@ -143,6 +143,7 @@ static void decompressFile_orDie(const char* fname, unsigned startOffset, unsign toRead = result; } fwrite_orDie(buffOut, output.pos, fout); + if (toRead > buffInSize) toRead = buffInSize; } } while (result > 0); @@ -171,5 +172,6 @@ int main(int argc, const char** argv) unsigned const endOffset = (unsigned) atoi(argv[3]); decompressFile_orDie(inFilename, startOffset, endOffset); } + return 0; } diff --git a/lib/compress/zstdseek_compress.c b/lib/compress/zstdseek_compress.c index 284724e5..f9e108af 100644 --- a/lib/compress/zstdseek_compress.c +++ b/lib/compress/zstdseek_compress.c @@ -39,6 +39,8 @@ struct ZSTD_seekable_CStream_s { U32 maxChunkSize; int checksumFlag; + + int writingSeekTable; }; ZSTD_seekable_CStream* ZSTD_seekable_createCStream() @@ -52,6 +54,7 @@ ZSTD_seekable_CStream* ZSTD_seekable_createCStream() zcs->cstream = ZSTD_createCStream(); if (zcs->cstream == NULL) goto failed1; + /* allocate some initial space */ { size_t const CHUNKLOG_STARTING_CAPACITY = 16; zcs->chunklog.entries = malloc(sizeof(chunklogEntry_t) * CHUNKLOG_STARTING_CAPACITY); @@ -70,7 +73,7 @@ failed1: size_t ZSTD_seekable_freeCStream(ZSTD_seekable_CStream* zcs) { - if (zcs == NULL) return 0; /* support free on NULL */ + if (zcs == NULL) return 0; /* support free on null */ ZSTD_freeCStream(zcs->cstream); free(zcs->chunklog.entries); free(zcs); @@ -80,12 +83,14 @@ size_t ZSTD_seekable_freeCStream(ZSTD_seekable_CStream* zcs) size_t ZSTD_seekable_initCStream(ZSTD_seekable_CStream* zcs, int compressionLevel, + int checksumFlag, U32 maxChunkSize) { zcs->chunklog.size = 0; zcs->chunkCSize = 0; zcs->chunkDSize = 0; + /* make sure maxChunkSize has a reasonable value */ if (maxChunkSize > ZSTD_SEEKABLE_MAX_CHUNK_DECOMPRESSED_SIZE) { return ERROR(compressionParameter_unsupported); } @@ -94,11 +99,13 @@ size_t ZSTD_seekable_initCStream(ZSTD_seekable_CStream* zcs, ? maxChunkSize : ZSTD_SEEKABLE_MAX_CHUNK_DECOMPRESSED_SIZE; - zcs->checksumFlag = 0; + zcs->checksumFlag = checksumFlag; if (zcs->checksumFlag) { XXH64_reset(&zcs->xxhState, 0); } + zcs->writingSeekTable = 0; + return ZSTD_initCStream(zcs->cstream, compressionLevel); } @@ -114,10 +121,13 @@ static size_t ZSTD_seekable_logChunk(ZSTD_seekable_CStream* zcs) }; if (zcs->checksumFlag) zcs->chunklog.entries[zcs->chunklog.size].checksum = + /* take lower 32 bits of digest */ XXH64_digest(&zcs->xxhState) & 0xFFFFFFFFU; zcs->chunklog.size++; + /* grow the buffer if required */ if (zcs->chunklog.size == zcs->chunklog.capacity) { + /* exponential size increase for constant amortized runtime */ size_t const newCapacity = zcs->chunklog.capacity * 2; chunklogEntry_t* const newEntries = realloc(zcs->chunklog.entries, sizeof(chunklogEntry_t) * newCapacity); @@ -134,6 +144,7 @@ static size_t ZSTD_seekable_logChunk(ZSTD_seekable_CStream* zcs) size_t ZSTD_seekable_endChunk(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output) { size_t const prevOutPos = output->pos; + /* end the frame */ size_t ret = ZSTD_endStream(zcs->cstream, output); zcs->chunkCSize += output->pos - prevOutPos; @@ -165,6 +176,7 @@ size_t ZSTD_seekable_compressStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* inLen = MIN(inLen, (size_t)(zcs->maxChunkSize - zcs->chunkDSize)); + /* if we haven't finished flushing the last chunk, don't start writing a new one */ if (inLen > 0) { ZSTD_inBuffer inTmp = { inBase, inLen, 0 }; size_t const prevOutPos = output->pos; @@ -184,8 +196,12 @@ size_t ZSTD_seekable_compressStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* } if (zcs->maxChunkSize == zcs->chunkDSize) { + /* log the chunk and start over */ size_t const ret = ZSTD_seekable_endChunk(zcs, output); if (ZSTD_isError(ret)) return ret; + + /* get the client ready for the next chunk */ + return (size_t)zcs->maxChunkSize; } return (size_t)(zcs->maxChunkSize - zcs->chunkDSize); @@ -204,62 +220,78 @@ static size_t ZSTD_seekable_seekTableSize(ZSTD_seekable_CStream* zcs) static size_t ZSTD_seekable_writeSeekTable(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output) { BYTE* op = (BYTE*) output->dst; + BYTE tmp[4]; /* so that we can work with buffers too small to write a whole word to */ /* repurpose * zcs->chunkDSize: the current index in the table and - * zcs->chunkCSize: the amount of the table written so far */ + * zcs->chunkCSize: the amount of the table written so far + * + * This function is written this way so that if it has to return early + * because of a small buffer, it can keep going where it left off. + */ size_t const sizePerChunk = 8 + (zcs->checksumFlag?4:0); - size_t const seekTableLen = ZSD_seekable_seekTableSize(zcs); + size_t const seekTableLen = ZSTD_seekable_seekTableSize(zcs); - if (zcs->chunkCSize == 0) { - if (output->size - output->pos < 4) return seekTableLen - zcs->chunkCSize; - MEM_writeLE32(op + output->pos, ZSTD_MAGIC_SKIPPABLE_START); - output->pos += 4; - zcs->chunkCSize += 4; - } - if (zcs->chunkCSize == 4) { - if (output->size - output->pos < 4) return seekTableLen - zcs->chunkCSize; - MEM_writeLE32(op + output->pos, seekTableLen - ZSTD_skippableHeaderSize); - output->pos += 4; - zcs->chunkCSize += 4; - } +#define st_write32(x, o) \ + do { \ + if (zcs->chunkCSize < (o) + 4) { \ + size_t const lenWrite = MIN(output->size - output->pos, \ + (o) + 4 - zcs->chunkCSize); \ + MEM_writeLE32(tmp, (x)); \ + memcpy(op + output->pos, tmp + (zcs->chunkCSize - (o)), lenWrite); \ + zcs->chunkCSize += lenWrite; \ + output->pos += lenWrite; \ + if (lenWrite < 4) return seekTableLen - zcs->chunkCSize; \ + } \ + } while (0) + + st_write32(ZSTD_MAGIC_SKIPPABLE_START, 0); + st_write32(seekTableLen - ZSTD_skippableHeaderSize, 4); while (zcs->chunkDSize < zcs->chunklog.size) { - if (output->size - output->pos < sizePerChunk) return seekTableLen - zcs->chunkCSize; - MEM_writeLE32(op + output->pos + 0, zcs->chunklog.entries[zcs->chunkDSize].cSize); - MEM_writeLE32(op + output->pos + 4, zcs->chunklog.entries[zcs->chunkDSize].dSize); + st_write32(zcs->chunklog.entries[zcs->chunkDSize].cSize, + ZSTD_skippableHeaderSize + sizePerChunk * zcs->chunkDSize); + st_write32(zcs->chunklog.entries[zcs->chunkDSize].dSize, + ZSTD_skippableHeaderSize + sizePerChunk * zcs->chunkDSize + 4); if (zcs->checksumFlag) { - MEM_writeLE32(op + output->pos + 8, zcs->chunklog.entries[zcs->chunkDSize].checksum); + st_write32(zcs->chunklog.entries[zcs->chunkDSize].checksum, + ZSTD_skippableHeaderSize + sizePerChunk * zcs->chunkDSize + 8); } - output->pos += sizePerChunk; - zcs->chunkCSize += sizePerChunk; + zcs->chunkDSize++; } - if (output->size - output->pos < ZSTD_seekTableFooterSize) return seekTableLen - zcs->chunkCSize; - MEM_writeLE32(op + output->pos, zcs->chunklog.size); - { BYTE sfd = 0; + st_write32(zcs->chunklog.size, seekTableLen - ZSTD_seekTableFooterSize); + + if (output->size - output->pos < 1) return seekTableLen - zcs->chunkCSize; + if (zcs->chunkCSize < seekTableLen - 4) { + BYTE sfd = 0; sfd |= (zcs->checksumFlag) << 7; - op[output->pos + 4] = sfd; + op[output->pos] = sfd; + output->pos++; + zcs->chunkCSize++; } - MEM_writeLE32(op + output->pos + 5, ZSTD_SEEKABLE_MAGICNUMBER); - output->pos += ZSTD_seekTableFooterSize; - zcs->chunkCSize += ZSTD_seekTableFooterSize; + st_write32(ZSTD_SEEKABLE_MAGICNUMBER, seekTableLen - 4); if (zcs->chunkCSize != seekTableLen) return ERROR(GENERIC); return 0; + +#undef st_write32 } size_t ZSTD_seekable_endStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output) { - if (zcs->chunkDSize) { + if (!zcs->writingSeekTable && zcs->chunkDSize) { const size_t endChunk = ZSTD_seekable_endChunk(zcs, output); + if (ZSTD_isError(endChunk)) return endChunk; /* return an accurate size hint */ - if (endChunk) return endChunk + ZSTD_seekable_seekTableLen(zcs); + if (endChunk) return endChunk + ZSTD_seekable_seekTableSize(zcs); } + zcs->writingSeekTable = 1; + return ZSTD_seekable_writeSeekTable(zcs, output); } diff --git a/lib/decompress/zstdseek_decompress.c b/lib/decompress/zstdseek_decompress.c index e76b4c61..5e303f26 100644 --- a/lib/decompress/zstdseek_decompress.c +++ b/lib/decompress/zstdseek_decompress.c @@ -28,6 +28,10 @@ typedef struct { int checksumFlag; } seekTable_t; +/** ZSTD_seekable_offsetToChunk() : + * Performs a binary search to find the last chunk with a decompressed offset + * <= pos + * @return : the chunk's index */ static U32 ZSTD_seekable_offsetToChunk(const seekTable_t* table, U64 pos) { U32 lo = 0; @@ -44,8 +48,9 @@ static U32 ZSTD_seekable_offsetToChunk(const seekTable_t* table, U64 pos) return lo; } +/* Stream decompressor state machine stages */ enum ZSTD_seekable_DStream_stage { - zsds_init, + zsds_init = 0, zsds_seek, zsds_decompress, zsds_done, @@ -75,6 +80,7 @@ ZSTD_seekable_DStream* ZSTD_seekable_createDStream(void) if (zds == NULL) return NULL; + /* also initializes stage to zsds_init */ memset(zds, 0, sizeof(*zds)); zds->dstream = ZSTD_createDStream(); @@ -88,7 +94,7 @@ ZSTD_seekable_DStream* ZSTD_seekable_createDStream(void) size_t ZSTD_seekable_freeDStream(ZSTD_seekable_DStream* zds) { - if (zds == NULL) return 0; + if (zds == NULL) return 0; /* support free on null */ ZSTD_freeDStream(zds->dstream); free(zds->seekTable.entries); free(zds); @@ -105,6 +111,7 @@ size_t ZSTD_seekable_loadSeekTable(ZSTD_seekable_DStream* zds, const void* src, U32 sizePerEntry; + /* footer is fixed size */ if (srcSize < ZSTD_seekTableFooterSize) return ZSTD_seekTableFooterSize; @@ -112,15 +119,13 @@ size_t ZSTD_seekable_loadSeekTable(ZSTD_seekable_DStream* zds, const void* src, return ERROR(prefix_unknown); } - { - BYTE const sfd = ip[-5]; + { BYTE const sfd = ip[-5]; checksumFlag = sfd >> 7; - - numChunks = MEM_readLE32(ip-9); - - sizePerEntry = 8 + (checksumFlag?4:0); } + numChunks = MEM_readLE32(ip-9); + sizePerEntry = 8 + (checksumFlag?4:0); + { U32 const tableSize = sizePerEntry * numChunks; U32 const frameSize = tableSize + ZSTD_seekTableFooterSize + ZSTD_skippableHeaderSize; @@ -151,6 +156,7 @@ size_t ZSTD_seekable_loadSeekTable(ZSTD_seekable_DStream* zds, const void* src, return ERROR(memory_allocation); } + /* compute cumulative positions */ for (idx = 0, pos = 0; idx < numChunks; idx++) { entries[idx].cOffset = cOffset; entries[idx].dOffset = dOffset; @@ -175,7 +181,8 @@ size_t ZSTD_seekable_loadSeekTable(ZSTD_seekable_DStream* zds, const void* src, size_t ZSTD_seekable_initDStream(ZSTD_seekable_DStream* zds, U64 rangeStart, U64 rangeEnd) { - /* restrict range to the end of the file, and not before the range start */ + /* restrict range to the end of the file, of non-negative size */ + rangeStart = MIN(rangeStart, zds->seekTable.entries[zds->seekTable.tableLen].dOffset); rangeEnd = MIN(rangeEnd, zds->seekTable.entries[zds->seekTable.tableLen].dOffset); rangeEnd = MAX(rangeEnd, rangeStart); @@ -192,6 +199,8 @@ size_t ZSTD_seekable_initDStream(ZSTD_seekable_DStream* zds, U64 rangeStart, U64 XXH64_reset(&zds->xxhState, 0); } + if (rangeStart == rangeEnd) zds->stage = zsds_done; + { const size_t ret = ZSTD_initDStream(zds->dstream); if (ZSTD_isError(ret)) return ret; } return 0; @@ -222,7 +231,7 @@ size_t ZSTD_seekable_decompressStream(ZSTD_seekable_DStream* zds, ZSTD_outBuffer while (1) { switch (zds->stage) { case zsds_init: - return ERROR(init_missing); + return ERROR(init_missing); /* ZSTD_seekable_initDStream should be called first */ case zsds_decompress: { BYTE* const outBase = (BYTE*)output->dst + output->pos; size_t const outLen = output->size - output->pos; @@ -248,7 +257,7 @@ size_t ZSTD_seekable_decompressStream(ZSTD_seekable_DStream* zds, ZSTD_outBuffer zds->compressedOffset += input->pos - prevInputPos; zds->decompressedOffset += outTmp.pos; - if (zds->seekTable.checksumFlag) { + if (jt->checksumFlag) { XXH64_update(&zds->xxhState, outTmp.dst, outTmp.pos); } @@ -256,7 +265,7 @@ size_t ZSTD_seekable_decompressStream(ZSTD_seekable_DStream* zds, ZSTD_outBuffer /* need more input */ return MIN( ZSTD_DStreamInSize(), - (size_t)(zds->seekTable.entries[zds->curChunk + 1] + (size_t)(jt->entries[zds->curChunk + 1] .cOffset - zds->compressedOffset)); } @@ -285,11 +294,11 @@ size_t ZSTD_seekable_decompressStream(ZSTD_seekable_DStream* zds, ZSTD_outBuffer output->pos += outTmp.pos; - if (zds->seekTable.checksumFlag) { + if (jt->checksumFlag) { XXH64_update(&zds->xxhState, outTmp.dst, outTmp.pos); if (ret == 0) { /* verify the checksum */ - U32 const digest = XXH64_digest(&zds->xxhState); + U32 const digest = XXH64_digest(&zds->xxhState) & 0xFFFFFFFFU; if (digest != jt->entries[zds->curChunk].checksum) { return ERROR(checksum_wrong); } @@ -306,17 +315,21 @@ size_t ZSTD_seekable_decompressStream(ZSTD_seekable_DStream* zds, ZSTD_outBuffer if (ret == 0) { /* frame is done */ + /* make sure this lines up with the expected frame border */ + if (zds->decompressedOffset != + jt->entries[zds->curChunk + 1].dOffset || + zds->compressedOffset != + jt->entries[zds->curChunk + 1].cOffset) + return ERROR(corruption_detected); ZSTD_resetDStream(zds->dstream); zds->stage = zsds_seek; break; } /* need more input */ - return MIN( - ZSTD_DStreamInSize(), - (size_t)(zds->seekTable.entries[zds->curChunk + 1] - .cOffset - - zds->compressedOffset)); + return MIN(ZSTD_DStreamInSize(), (size_t)( + jt->entries[zds->curChunk + 1].cOffset - + zds->compressedOffset)); } } case zsds_seek: { @@ -338,6 +351,7 @@ size_t ZSTD_seekable_decompressStream(ZSTD_seekable_DStream* zds, ZSTD_outBuffer zds->nextSeek = jt->entries[targetChunk].cOffset; zds->decompressedOffset = jt->entries[targetChunk].dOffset; + /* signal to user that a seek is required */ return ERROR(needSeek); } case zsds_done: diff --git a/lib/zstd.h b/lib/zstd.h index 5bbd1b74..748a9827 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -807,6 +807,8 @@ typedef struct ZSTD_seekable_DStream_s ZSTD_seekable_DStream; * for a new compression operation. * `maxChunkSize` indicates the size at which to automatically start a new * seekable frame. `maxChunkSize == 0` implies the default maximum size. +* `checksumFlag` indicates whether or not the seek table should include chunk +* checksums on the uncompressed data for verification. * @return : a size hint for input to provide for compression, or an error code * checkable with ZSTD_isError() * @@ -836,7 +838,7 @@ ZSTDLIB_API ZSTD_seekable_CStream* ZSTD_seekable_createCStream(void); ZSTDLIB_API size_t ZSTD_seekable_freeCStream(ZSTD_seekable_CStream* zcs); /*===== Seekable compression functions =====*/ -ZSTDLIB_API size_t ZSTD_seekable_initCStream(ZSTD_seekable_CStream* zcs, int compressionLevel, unsigned maxChunkSize); +ZSTDLIB_API size_t ZSTD_seekable_initCStream(ZSTD_seekable_CStream* zcs, int compressionLevel, int checksumFlag, unsigned maxChunkSize); ZSTDLIB_API size_t ZSTD_seekable_compressStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input); ZSTDLIB_API size_t ZSTD_seekable_endChunk(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output); ZSTDLIB_API size_t ZSTD_seekable_endStream(ZSTD_seekable_CStream* zcs, ZSTD_outBuffer* output);