Merge branch 'dev' into fix44168

This commit is contained in:
Yann Collet 2022-01-31 17:31:55 -08:00
commit cdee6a7dbd
51 changed files with 2063 additions and 316 deletions

View File

@ -102,7 +102,7 @@ matrix:
travis_retry curl -o ~/ninja.zip -L 'https://github.com/ninja-build/ninja/releases/download/v1.9.0/ninja-linux.zip' &&
unzip ~/ninja.zip -d ~/.local/bin
- |
travis_retry curl -o ~/get-pip.py -L 'https://bootstrap.pypa.io/get-pip.py' &&
travis_retry curl -o ~/get-pip.py -L 'https://bootstrap.pypa.io/pip/3.6/get-pip.py' &&
python3 ~/get-pip.py --user &&
pip3 install --user meson
script:

View File

@ -353,7 +353,7 @@ size_t FSE_buildDTable_raw (FSE_DTable* dt, unsigned nbBits);
size_t FSE_buildDTable_rle (FSE_DTable* dt, unsigned char symbolValue);
/**< build a fake FSE_DTable, designed to always generate the same symbolValue */
#define FSE_DECOMPRESS_WKSP_SIZE_U32(maxTableLog, maxSymbolValue) (FSE_DTABLE_SIZE_U32(maxTableLog) + FSE_BUILD_DTABLE_WKSP_SIZE_U32(maxTableLog, maxSymbolValue) + (FSE_MAX_SYMBOL_VALUE + 1) / 2 + 1)
#define FSE_DECOMPRESS_WKSP_SIZE_U32(maxTableLog, maxSymbolValue) (FSE_DTABLE_SIZE_U32(maxTableLog) + 1 + FSE_BUILD_DTABLE_WKSP_SIZE_U32(maxTableLog, maxSymbolValue) + (FSE_MAX_SYMBOL_VALUE + 1) / 2 + 1)
#define FSE_DECOMPRESS_WKSP_SIZE(maxTableLog, maxSymbolValue) (FSE_DECOMPRESS_WKSP_SIZE_U32(maxTableLog, maxSymbolValue) * sizeof(unsigned))
size_t FSE_decompress_wksp(void* dst, size_t dstCapacity, const void* cSrc, size_t cSrcSize, unsigned maxLog, void* workSpace, size_t wkspSize);
/**< same as FSE_decompress(), using an externally allocated `workSpace` produced with `FSE_DECOMPRESS_WKSP_SIZE_U32(maxLog, maxSymbolValue)` */

View File

@ -342,7 +342,8 @@ FORCE_INLINE_TEMPLATE size_t FSE_decompress_wksp_body(
}
if (FSE_DECOMPRESS_WKSP_SIZE(tableLog, maxSymbolValue) > wkspSize) return ERROR(tableLog_tooLarge);
workSpace = wksp->dtable + FSE_DTABLE_SIZE_U32(tableLog);
assert(sizeof(*wksp) + FSE_DTABLE_SIZE(tableLog) <= wkspSize);
workSpace = (BYTE*)workSpace + sizeof(*wksp) + FSE_DTABLE_SIZE(tableLog);
wkspSize -= sizeof(*wksp) + FSE_DTABLE_SIZE(tableLog);
CHECK_F( FSE_buildDTable_internal(wksp->dtable, wksp->ncount, maxSymbolValue, tableLog, workSpace, wkspSize) );

View File

@ -164,6 +164,7 @@ Advanced arguments :
--filelist FILE : read list of files to operate upon from FILE
--output-dir-flat DIR : processed files are stored into DIR
--output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure
--[no-]asyncio : use asynchronous IO (default: enabled)
--[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled). If specified with -d, decompressor will ignore/validate checksums in compressed frame (default: validate).
-- : All arguments after "--" are treated as files

View File

@ -27,10 +27,11 @@
#include <string.h> /* memset */
#include <stdio.h> /* fprintf, fopen, ftello64 */
#include <errno.h> /* errno */
#include <assert.h>
#include "timefn.h" /* UTIL_time_t, UTIL_clockSpanMicro, UTIL_getTime */
#include "../lib/common/debug.h" /* assert */
#include "../lib/common/mem.h" /* read */
#include "../lib/zstd_errors.h"
#include "dibio.h"
@ -193,7 +194,8 @@ static U32 DiB_rand(U32* src)
static void DiB_shuffle(const char** fileNamesTable, unsigned nbFiles) {
U32 seed = 0xFD2FB528;
unsigned i;
assert(nbFiles >= 1);
if (nbFiles == 0)
return;
for (i = nbFiles - 1; i > 0; --i) {
unsigned const j = DiB_rand(&seed) % (i + 1);
const char* const tmp = fileNamesTable[j];
@ -379,7 +381,7 @@ int DiB_trainFromFiles(const char* dictFileName, size_t maxDictSize,
srcBuffer, &loadedSize, sampleSizes, fs.nbSamples, fileNamesTable,
nbFiles, chunkSize, displayLevel);
{ size_t dictSize;
{ size_t dictSize = ZSTD_error_GENERIC;
if (params) {
DiB_fillNoise((char*)srcBuffer + loadedSize, NOISELENGTH); /* guard band, for end of buffer condition */
dictSize = ZDICT_trainFromBuffer_legacy(dictBuffer, maxDictSize,
@ -399,8 +401,7 @@ int DiB_trainFromFiles(const char* dictFileName, size_t maxDictSize,
dictSize = ZDICT_trainFromBuffer_cover(dictBuffer, maxDictSize, srcBuffer,
sampleSizes, nbSamplesLoaded, *coverParams);
}
} else {
assert(fastCoverParams != NULL);
} else if (fastCoverParams != NULL) {
if (optimize) {
dictSize = ZDICT_optimizeTrainFromBuffer_fastCover(dictBuffer, maxDictSize,
srcBuffer, sampleSizes, nbSamplesLoaded,
@ -415,6 +416,8 @@ int DiB_trainFromFiles(const char* dictFileName, size_t maxDictSize,
dictSize = ZDICT_trainFromBuffer_fastCover(dictBuffer, maxDictSize, srcBuffer,
sampleSizes, nbSamplesLoaded, *fastCoverParams);
}
} else {
assert(0 /* Impossible */);
}
if (ZDICT_isError(dictSize)) {
DISPLAYLEVEL(1, "dictionary training failed : %s \n", ZDICT_getErrorName(dictSize)); /* should not happen */

View File

@ -289,7 +289,7 @@ FIO_prefs_t* FIO_createPreferences(void)
ret->literalCompressionMode = ZSTD_ps_auto;
ret->excludeCompressedFiles = 0;
ret->allowBlockDevices = 0;
ret->asyncIO = 0;
ret->asyncIO = AIO_supported();
return ret;
}
@ -848,16 +848,12 @@ static int FIO_removeMultiFilesWarning(FIO_ctx_t* const fCtx, const FIO_prefs_t*
* Compression
************************************************************************/
typedef struct {
FILE* srcFile;
FILE* dstFile;
void* srcBuffer;
size_t srcBufferSize;
void* dstBuffer;
size_t dstBufferSize;
void* dictBuffer;
size_t dictBufferSize;
const char* dictFileName;
ZSTD_CStream* cctx;
WritePoolCtx_t *writeCtx;
ReadPoolCtx_t *readCtx;
} cRess_t;
/** ZSTD_cycleLog() :
@ -906,9 +902,6 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
if (ress.cctx == NULL)
EXM_THROW(30, "allocation error (%s): can't create ZSTD_CCtx",
strerror(errno));
ress.srcBufferSize = ZSTD_CStreamInSize();
ress.srcBuffer = malloc(ress.srcBufferSize);
ress.dstBufferSize = ZSTD_CStreamOutSize();
/* need to update memLimit before calling createDictBuffer
* because of memLimit check inside it */
@ -916,10 +909,10 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
unsigned long long const ssSize = (unsigned long long)prefs->streamSrcSize;
FIO_adjustParamsForPatchFromMode(prefs, &comprParams, UTIL_getFileSize(dictFileName), ssSize > 0 ? ssSize : maxSrcFileSize, cLevel);
}
ress.dstBuffer = malloc(ress.dstBufferSize);
ress.dictBufferSize = FIO_createDictBuffer(&ress.dictBuffer, dictFileName, prefs); /* works with dictFileName==NULL */
if (!ress.srcBuffer || !ress.dstBuffer)
EXM_THROW(31, "allocation error : not enough memory");
ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize());
ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_CStreamInSize());
/* Advanced parameters, including dictionary */
if (dictFileName && (ress.dictBuffer==NULL))
@ -982,9 +975,9 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
static void FIO_freeCResources(const cRess_t* const ress)
{
free(ress->srcBuffer);
free(ress->dstBuffer);
free(ress->dictBuffer);
AIO_WritePool_free(ress->writeCtx);
AIO_ReadPool_free(ress->readCtx);
ZSTD_freeCStream(ress->cctx); /* never fails */
}
@ -997,6 +990,7 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no
{
unsigned long long inFileSize = 0, outFileSize = 0;
z_stream strm;
IOJob_t *writeJob = NULL;
if (compressionLevel > Z_BEST_COMPRESSION)
compressionLevel = Z_BEST_COMPRESSION;
@ -1012,51 +1006,58 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no
EXM_THROW(71, "zstd: %s: deflateInit2 error %d \n", srcFileName, ret);
} }
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
strm.next_in = 0;
strm.avail_in = 0;
strm.next_out = (Bytef*)ress->dstBuffer;
strm.avail_out = (uInt)ress->dstBufferSize;
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
while (1) {
int ret;
if (strm.avail_in == 0) {
size_t const inSize = fread(ress->srcBuffer, 1, ress->srcBufferSize, ress->srcFile);
if (inSize == 0) break;
inFileSize += inSize;
strm.next_in = (z_const unsigned char*)ress->srcBuffer;
strm.avail_in = (uInt)inSize;
AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
if (ress->readCtx->srcBufferLoaded == 0) break;
inFileSize += ress->readCtx->srcBufferLoaded;
strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
}
ret = deflate(&strm, Z_NO_FLUSH);
{
size_t const availBefore = strm.avail_in;
ret = deflate(&strm, Z_NO_FLUSH);
AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
}
if (ret != Z_OK)
EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret);
{ size_t const cSize = ress->dstBufferSize - strm.avail_out;
{ size_t const cSize = writeJob->bufferSize - strm.avail_out;
if (cSize) {
if (fwrite(ress->dstBuffer, 1, cSize, ress->dstFile) != cSize)
EXM_THROW(73, "Write error : cannot write to output file : %s ", strerror(errno));
writeJob->usedBufferSize = cSize;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
outFileSize += cSize;
strm.next_out = (Bytef*)ress->dstBuffer;
strm.avail_out = (uInt)ress->dstBufferSize;
} }
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
} }
if (srcFileSize == UTIL_FILESIZE_UNKNOWN) {
DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ",
(unsigned)(inFileSize>>20),
(double)outFileSize/inFileSize*100)
(unsigned)(inFileSize>>20),
(double)outFileSize/inFileSize*100)
} else {
DISPLAYUPDATE(2, "\rRead : %u / %u MB ==> %.2f%% ",
(unsigned)(inFileSize>>20), (unsigned)(srcFileSize>>20),
(double)outFileSize/inFileSize*100);
} }
(unsigned)(inFileSize>>20), (unsigned)(srcFileSize>>20),
(double)outFileSize/inFileSize*100);
} }
while (1) {
int const ret = deflate(&strm, Z_FINISH);
{ size_t const cSize = ress->dstBufferSize - strm.avail_out;
{ size_t const cSize = writeJob->bufferSize - strm.avail_out;
if (cSize) {
if (fwrite(ress->dstBuffer, 1, cSize, ress->dstFile) != cSize)
EXM_THROW(75, "Write error : %s ", strerror(errno));
writeJob->usedBufferSize = cSize;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
outFileSize += cSize;
strm.next_out = (Bytef*)ress->dstBuffer;
strm.avail_out = (uInt)ress->dstBufferSize;
} }
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
} }
if (ret == Z_STREAM_END) break;
if (ret != Z_BUF_ERROR)
EXM_THROW(77, "zstd: %s: deflate error %d \n", srcFileName, ret);
@ -1067,6 +1068,8 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no
EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret);
} }
*readsize = inFileSize;
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
return outFileSize;
}
#endif
@ -1082,6 +1085,7 @@ FIO_compressLzmaFrame(cRess_t* ress,
lzma_stream strm = LZMA_STREAM_INIT;
lzma_action action = LZMA_RUN;
lzma_ret ret;
IOJob_t *writeJob = NULL;
if (compressionLevel < 0) compressionLevel = 0;
if (compressionLevel > 9) compressionLevel = 9;
@ -1099,31 +1103,37 @@ FIO_compressLzmaFrame(cRess_t* ress,
EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret);
}
writeJob =AIO_WritePool_acquireJob(ress->writeCtx);
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
strm.next_in = 0;
strm.avail_in = 0;
strm.next_out = (BYTE*)ress->dstBuffer;
strm.avail_out = ress->dstBufferSize;
while (1) {
if (strm.avail_in == 0) {
size_t const inSize = fread(ress->srcBuffer, 1, ress->srcBufferSize, ress->srcFile);
if (inSize == 0) action = LZMA_FINISH;
size_t const inSize = AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH;
inFileSize += inSize;
strm.next_in = (BYTE const*)ress->srcBuffer;
strm.avail_in = inSize;
strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
strm.avail_in = ress->readCtx->srcBufferLoaded;
}
{
size_t const availBefore = strm.avail_in;
ret = lzma_code(&strm, action);
AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
}
ret = lzma_code(&strm, action);
if (ret != LZMA_OK && ret != LZMA_STREAM_END)
EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret);
{ size_t const compBytes = ress->dstBufferSize - strm.avail_out;
{ size_t const compBytes = writeJob->bufferSize - strm.avail_out;
if (compBytes) {
if (fwrite(ress->dstBuffer, 1, compBytes, ress->dstFile) != compBytes)
EXM_THROW(85, "Write error : %s", strerror(errno));
writeJob->usedBufferSize = compBytes;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
outFileSize += compBytes;
strm.next_out = (BYTE*)ress->dstBuffer;
strm.avail_out = ress->dstBufferSize;
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = writeJob->bufferSize;
} }
if (srcFileSize == UTIL_FILESIZE_UNKNOWN)
DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%%",
@ -1139,6 +1149,9 @@ FIO_compressLzmaFrame(cRess_t* ress,
lzma_end(&strm);
*readsize = inFileSize;
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
return outFileSize;
}
#endif
@ -1164,15 +1177,18 @@ FIO_compressLz4Frame(cRess_t* ress,
LZ4F_preferences_t prefs;
LZ4F_compressionContext_t ctx;
IOJob_t* writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
LZ4F_errorCode_t const errorCode = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
if (LZ4F_isError(errorCode))
EXM_THROW(31, "zstd: failed to create lz4 compression context");
memset(&prefs, 0, sizeof(prefs));
assert(blockSize <= ress->srcBufferSize);
assert(blockSize <= ress->readCtx->base.jobBufferSize);
prefs.autoFlush = 1;
/* autoflush off to mitigate a bug in lz4<=1.9.3 for compression level 12 */
prefs.autoFlush = 0;
prefs.compressionLevel = compressionLevel;
prefs.frameInfo.blockMode = LZ4F_blockLinked;
prefs.frameInfo.blockSizeID = LZ4F_max64KB;
@ -1180,27 +1196,25 @@ FIO_compressLz4Frame(cRess_t* ress,
#if LZ4_VERSION_NUMBER >= 10600
prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize;
#endif
assert(LZ4F_compressBound(blockSize, &prefs) <= ress->dstBufferSize);
assert(LZ4F_compressBound(blockSize, &prefs) <= writeJob->bufferSize);
{
size_t readSize;
size_t headerSize = LZ4F_compressBegin(ctx, ress->dstBuffer, ress->dstBufferSize, &prefs);
size_t headerSize = LZ4F_compressBegin(ctx, writeJob->buffer, writeJob->bufferSize, &prefs);
if (LZ4F_isError(headerSize))
EXM_THROW(33, "File header generation failed : %s",
LZ4F_getErrorName(headerSize));
if (fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile) != headerSize)
EXM_THROW(34, "Write error : %s (cannot write header)", strerror(errno));
writeJob->usedBufferSize = headerSize;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
outFileSize += headerSize;
/* Read first block */
readSize = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile);
inFileSize += readSize;
inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
/* Main Loop */
while (readSize>0) {
size_t const outSize = LZ4F_compressUpdate(ctx,
ress->dstBuffer, ress->dstBufferSize,
ress->srcBuffer, readSize, NULL);
while (ress->readCtx->srcBufferLoaded) {
size_t inSize = MIN(blockSize, ress->readCtx->srcBufferLoaded);
size_t const outSize = LZ4F_compressUpdate(ctx, writeJob->buffer, writeJob->bufferSize,
ress->readCtx->srcBuffer, inSize, NULL);
if (LZ4F_isError(outSize))
EXM_THROW(35, "zstd: %s: lz4 compression failed : %s",
srcFileName, LZ4F_getErrorName(outSize));
@ -1216,33 +1230,29 @@ FIO_compressLz4Frame(cRess_t* ress,
}
/* Write Block */
{ size_t const sizeCheck = fwrite(ress->dstBuffer, 1, outSize, ress->dstFile);
if (sizeCheck != outSize)
EXM_THROW(36, "Write error : %s", strerror(errno));
}
writeJob->usedBufferSize = outSize;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
/* Read next block */
readSize = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile);
inFileSize += readSize;
AIO_ReadPool_consumeBytes(ress->readCtx, inSize);
inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
}
if (ferror(ress->srcFile)) EXM_THROW(37, "Error reading %s ", srcFileName);
/* End of Stream mark */
headerSize = LZ4F_compressEnd(ctx, ress->dstBuffer, ress->dstBufferSize, NULL);
headerSize = LZ4F_compressEnd(ctx, writeJob->buffer, writeJob->bufferSize, NULL);
if (LZ4F_isError(headerSize))
EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s",
srcFileName, LZ4F_getErrorName(headerSize));
{ size_t const sizeCheck = fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile);
if (sizeCheck != headerSize)
EXM_THROW(39, "Write error : %s (cannot write end of stream)",
strerror(errno));
}
writeJob->usedBufferSize = headerSize;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
outFileSize += headerSize;
}
*readsize = inFileSize;
LZ4F_freeCompressionContext(ctx);
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
return outFileSize;
}
@ -1257,8 +1267,8 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
int compressionLevel, U64* readsize)
{
cRess_t const ress = *ressPtr;
FILE* const srcFile = ress.srcFile;
FILE* const dstFile = ress.dstFile;
IOJob_t *writeJob = AIO_WritePool_acquireJob(ressPtr->writeCtx);
U64 compressedfilesize = 0;
ZSTD_EndDirective directive = ZSTD_e_continue;
U64 pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
@ -1303,12 +1313,12 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
do {
size_t stillToFlush;
/* Fill input Buffer */
size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile);
ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
size_t const inSize = AIO_ReadPool_fillBuffer(ress.readCtx, ZSTD_CStreamInSize());
ZSTD_inBuffer inBuff = { ress.readCtx->srcBuffer, ress.readCtx->srcBufferLoaded, 0 };
DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize);
*readsize += inSize;
if ((inSize == 0) || (*readsize == fileSize))
if ((ress.readCtx->srcBufferLoaded == 0) || (*readsize == fileSize))
directive = ZSTD_e_end;
stillToFlush = 1;
@ -1316,9 +1326,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|| (directive == ZSTD_e_end && stillToFlush != 0) ) {
size_t const oldIPos = inBuff.pos;
ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 };
size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));
AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos);
/* count stats */
inputPresented++;
@ -1327,12 +1338,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
/* Write compressed stream */
DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
(unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
(unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
if (outBuff.pos) {
size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
if (sizeCheck != outBuff.pos)
EXM_THROW(25, "Write error : %s (cannot write compressed block)",
strerror(errno));
writeJob->usedBufferSize = outBuff.pos;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
compressedfilesize += outBuff.pos;
}
@ -1464,14 +1473,14 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
} /* while ((inBuff.pos != inBuff.size) */
} while (directive != ZSTD_e_end);
if (ferror(srcFile)) {
EXM_THROW(26, "Read error : I/O error");
}
if (fileSize != UTIL_FILESIZE_UNKNOWN && *readsize != fileSize) {
EXM_THROW(27, "Read error : Incomplete read : %llu / %llu B",
(unsigned long long)*readsize, (unsigned long long)fileSize);
}
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ressPtr->writeCtx);
return compressedfilesize;
}
@ -1572,7 +1581,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
/*! FIO_compressFilename_dstFile() :
* open dstFileName, or pass-through if ress.dstFile != NULL,
* open dstFileName, or pass-through if ress.file != NULL,
* then start compression with FIO_compressFilename_internal().
* Manages source removal (--rm) and file permissions transfer.
* note : ress.srcFile must be != NULL,
@ -1591,8 +1600,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
int result;
stat_t statbuf;
int transferMTime = 0;
assert(ress.srcFile != NULL);
if (ress.dstFile == NULL) {
FILE *dstFile;
assert(AIO_ReadPool_getFile(ress.readCtx) != NULL);
if (AIO_WritePool_getFile(ress.writeCtx) == NULL) {
int dstFilePermissions = DEFAULT_FILE_PERMISSIONS;
if ( strcmp (srcFileName, stdinmark)
&& strcmp (dstFileName, stdoutmark)
@ -1604,8 +1614,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
closeDstFile = 1;
DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: opening dst: %s \n", dstFileName);
ress.dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
if (ress.dstFile==NULL) return 1; /* could not open dstFileName */
dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
if (dstFile==NULL) return 1; /* could not open dstFileName */
AIO_WritePool_setFile(ress.writeCtx, dstFile);
/* Must only be added after FIO_openDstFile() succeeds.
* Otherwise we may delete the destination file if it already exists,
* and the user presses Ctrl-C when asked if they wish to overwrite.
@ -1616,13 +1627,10 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
result = FIO_compressFilename_internal(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
if (closeDstFile) {
FILE* const dstFile = ress.dstFile;
ress.dstFile = NULL;
clearHandler();
DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: closing dst: %s \n", dstFileName);
if (fclose(dstFile)) { /* error closing dstFile */
if (AIO_WritePool_closeFile(ress.writeCtx)) { /* error closing file */
DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno));
result=1;
}
@ -1668,6 +1676,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
int compressionLevel)
{
int result;
FILE* srcFile;
DISPLAYLEVEL(6, "FIO_compressFilename_srcFile: %s \n", srcFileName);
/* ensure src is not a directory */
@ -1691,13 +1700,13 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
return 0;
}
ress.srcFile = FIO_openSrcFile(prefs, srcFileName);
if (ress.srcFile == NULL) return 1; /* srcFile could not be opened */
srcFile = FIO_openSrcFile(prefs, srcFileName);
if (srcFile == NULL) return 1; /* srcFile could not be opened */
AIO_ReadPool_setFile(ress.readCtx, srcFile);
result = FIO_compressFilename_dstFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
AIO_ReadPool_closeFile(ress.readCtx);
fclose(ress.srcFile);
ress.srcFile = NULL;
if ( prefs->removeSrcFile /* --rm */
&& result == 0 /* success */
&& strcmp(srcFileName, stdinmark) /* exception : don't erase stdin */
@ -1844,23 +1853,24 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
/* init */
assert(outFileName != NULL || suffix != NULL);
if (outFileName != NULL) { /* output into a single destination (stdout typically) */
FILE *dstFile;
if (FIO_removeMultiFilesWarning(fCtx, prefs, outFileName, 1 /* displayLevelCutoff */)) {
FIO_freeCResources(&ress);
return 1;
}
ress.dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
if (ress.dstFile == NULL) { /* could not open outFileName */
dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
if (dstFile == NULL) { /* could not open outFileName */
error = 1;
} else {
AIO_WritePool_setFile(ress.writeCtx, dstFile);
for (; fCtx->currFileIdx < fCtx->nbFilesTotal; ++fCtx->currFileIdx) {
status = FIO_compressFilename_srcFile(fCtx, prefs, ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel);
if (!status) fCtx->nbFilesProcessed++;
error |= status;
}
if (fclose(ress.dstFile))
if (AIO_WritePool_closeFile(ress.writeCtx))
EXM_THROW(29, "Write error (%s) : cannot properly close %s",
strerror(errno), outFileName);
ress.dstFile = NULL;
}
} else {
if (outMirroredRootDirName)
@ -1916,13 +1926,10 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
/* **************************************************************************
* Decompression
***************************************************************************/
typedef struct {
void* srcBuffer;
size_t srcBufferSize;
size_t srcBufferLoaded;
ZSTD_DStream* dctx;
WritePoolCtx_t *writeCtx;
ReadPoolCtx_t *readCtx;
} dRess_t;
static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName)
@ -1940,11 +1947,6 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
CHECK( ZSTD_DCtx_setMaxWindowSize(ress.dctx, prefs->memLimit) );
CHECK( ZSTD_DCtx_setParameter(ress.dctx, ZSTD_d_forceIgnoreChecksum, !prefs->checksumFlag));
ress.srcBufferSize = ZSTD_DStreamInSize();
ress.srcBuffer = malloc(ress.srcBufferSize);
if (!ress.srcBuffer)
EXM_THROW(61, "Allocation error : not enough memory");
/* dictionary */
{ void* dictBuffer;
size_t const dictBufferSize = FIO_createDictBuffer(&dictBuffer, dictFileName, prefs);
@ -1953,6 +1955,7 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
}
ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_DStreamOutSize());
ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_DStreamInSize());
return ress;
}
@ -1960,47 +1963,31 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
static void FIO_freeDResources(dRess_t ress)
{
CHECK( ZSTD_freeDStream(ress.dctx) );
free(ress.srcBuffer);
AIO_WritePool_free(ress.writeCtx);
}
/* FIO_consumeDSrcBuffer:
* Consumes len bytes from srcBuffer's start and moves the remaining data and srcBufferLoaded accordingly. */
static void FIO_consumeDSrcBuffer(dRess_t *ress, size_t len) {
assert(ress->srcBufferLoaded >= len);
ress->srcBufferLoaded -= len;
memmove(ress->srcBuffer, (char *)ress->srcBuffer + len, ress->srcBufferLoaded);
AIO_ReadPool_free(ress.readCtx);
}
/** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode
@return : 0 (no error) */
static int FIO_passThrough(const FIO_prefs_t* const prefs,
FILE* foutput, FILE* finput,
void* buffer, size_t bufferSize,
size_t alreadyLoaded)
static int FIO_passThrough(dRess_t *ress)
{
size_t const blockSize = MIN(64 KB, bufferSize);
size_t readFromInput;
unsigned storedSkips = 0;
size_t const blockSize = MIN(MIN(64 KB, ZSTD_DStreamInSize()), ZSTD_DStreamOutSize());
IOJob_t *writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
/* assumption : ress->srcBufferLoaded bytes already loaded and stored within buffer */
{ size_t const sizeCheck = fwrite(buffer, 1, alreadyLoaded, foutput);
if (sizeCheck != alreadyLoaded) {
DISPLAYLEVEL(1, "Pass-through write error : %s\n", strerror(errno));
return 1;
} }
do {
readFromInput = fread(buffer, 1, blockSize, finput);
storedSkips = AIO_fwriteSparse(foutput, buffer, readFromInput, prefs, storedSkips);
} while (readFromInput == blockSize);
if (ferror(finput)) {
DISPLAYLEVEL(1, "Pass-through read error : %s\n", strerror(errno));
return 1;
while(ress->readCtx->srcBufferLoaded) {
size_t writeSize;
writeSize = MIN(blockSize, ress->readCtx->srcBufferLoaded);
assert(writeSize <= writeJob->bufferSize);
memcpy(writeJob->buffer, ress->readCtx->srcBuffer, writeSize);
writeJob->usedBufferSize = writeSize;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
AIO_ReadPool_consumeBytes(ress->readCtx, writeSize);
AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
}
assert(feof(finput));
AIO_fwriteSparseEnd(prefs, foutput, storedSkips);
assert(ress->readCtx->reachedEof);
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
return 0;
}
@ -2018,7 +2005,7 @@ FIO_zstdErrorHelp(const FIO_prefs_t* const prefs,
return;
/* Try to decode the frame header */
err = ZSTD_getFrameHeader(&header, ress->srcBuffer, ress->srcBufferLoaded);
err = ZSTD_getFrameHeader(&header, ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded);
if (err == 0) {
unsigned long long const windowSize = header.windowSize;
unsigned const windowLog = FIO_highbit64(windowSize) + ((windowSize & (windowSize - 1)) != 0);
@ -2041,7 +2028,7 @@ FIO_zstdErrorHelp(const FIO_prefs_t* const prefs,
*/
#define FIO_ERROR_FRAME_DECODING ((unsigned long long)(-2))
static unsigned long long
FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress,
const FIO_prefs_t* const prefs,
const char* srcFileName,
U64 alreadyDecoded) /* for multi-frames streams */
@ -2057,16 +2044,11 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
ZSTD_DCtx_reset(ress->dctx, ZSTD_reset_session_only);
/* Header loading : ensures ZSTD_getFrameHeader() will succeed */
{ size_t const toDecode = ZSTD_FRAMEHEADERSIZE_MAX;
if (ress->srcBufferLoaded < toDecode) {
size_t const toRead = toDecode - ress->srcBufferLoaded;
void* const startPosition = (char*)ress->srcBuffer + ress->srcBufferLoaded;
ress->srcBufferLoaded += fread(startPosition, 1, toRead, finput);
} }
AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_FRAMEHEADERSIZE_MAX);
/* Main decompression Loop */
while (1) {
ZSTD_inBuffer inBuff = { ress->srcBuffer, ress->srcBufferLoaded, 0 };
ZSTD_inBuffer inBuff = { ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded, 0 };
ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 };
size_t const readSizeHint = ZSTD_decompressStream(ress->dctx, &outBuff, &inBuff);
const int displayLevel = (g_display_prefs.progressSetting == FIO_ps_always) ? 1 : 2;
@ -2088,7 +2070,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
if (srcFileNameSize > 18) {
const char* truncatedSrcFileName = srcFileName + srcFileNameSize - 15;
DISPLAYUPDATE(displayLevel, "\rDecompress: %2u/%2u files. Current: ...%s : %.*f%s... ",
fCtx->currFileIdx+1, fCtx->nbFilesTotal, truncatedSrcFileName, hrs.precision, hrs.value, hrs.suffix);
fCtx->currFileIdx+1, fCtx->nbFilesTotal, truncatedSrcFileName, hrs.precision, hrs.value, hrs.suffix);
} else {
DISPLAYUPDATE(displayLevel, "\rDecompress: %2u/%2u files. Current: %s : %.*f%s... ",
fCtx->currFileIdx+1, fCtx->nbFilesTotal, srcFileName, hrs.precision, hrs.value, hrs.suffix);
@ -2098,23 +2080,21 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
srcFileName, hrs.precision, hrs.value, hrs.suffix);
}
FIO_consumeDSrcBuffer(ress, inBuff.pos);
AIO_ReadPool_consumeBytes(ress->readCtx, inBuff.pos);
if (readSizeHint == 0) break; /* end of frame */
/* Fill input buffer */
{ size_t const toDecode = MIN(readSizeHint, ress->srcBufferSize); /* support large skippable frames */
if (ress->srcBufferLoaded < toDecode) {
size_t const toRead = toDecode - ress->srcBufferLoaded; /* > 0 */
void* const startPosition = (char*)ress->srcBuffer + ress->srcBufferLoaded;
size_t const readSize = fread(startPosition, 1, toRead, finput);
{ size_t const toDecode = MIN(readSizeHint, ZSTD_DStreamInSize()); /* support large skippable frames */
if (ress->readCtx->srcBufferLoaded < toDecode) {
size_t const readSize = AIO_ReadPool_fillBuffer(ress->readCtx, toDecode);
if (readSize==0) {
DISPLAYLEVEL(1, "%s : Read error (39) : premature end \n",
srcFileName);
srcFileName);
AIO_WritePool_releaseIoJob(writeJob);
return FIO_ERROR_FRAME_DECODING;
}
ress->srcBufferLoaded += readSize;
} } }
} } }
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
@ -2125,7 +2105,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
#ifdef ZSTD_GZDECOMPRESS
static unsigned long long
FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
FIO_decompressGzFrame(dRess_t* ress, const char* srcFileName)
{
unsigned long long outFileSize = 0;
z_stream strm;
@ -2145,16 +2125,16 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
strm.avail_in = (uInt)ress->srcBufferLoaded;
strm.next_in = (z_const unsigned char*)ress->srcBuffer;
strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
for ( ; ; ) {
int ret;
if (strm.avail_in == 0) {
ress->srcBufferLoaded = fread(ress->srcBuffer, 1, ress->srcBufferSize, srcFile);
if (ress->srcBufferLoaded == 0) flush = Z_FINISH;
strm.next_in = (z_const unsigned char*)ress->srcBuffer;
strm.avail_in = (uInt)ress->srcBufferLoaded;
AIO_ReadPool_consumeAndRefill(ress->readCtx);
if (ress->readCtx->srcBufferLoaded == 0) flush = Z_FINISH;
strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
}
ret = inflate(&strm, flush);
if (ret == Z_BUF_ERROR) {
@ -2177,7 +2157,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
if (ret == Z_STREAM_END) break;
}
FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
AIO_ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded - strm.avail_in);
if ( (inflateEnd(&strm) != Z_OK) /* release resources ; error detected */
&& (decodingError==0) ) {
@ -2192,7 +2172,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
#ifdef ZSTD_LZMADECOMPRESS
static unsigned long long
FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
FIO_decompressLzmaFrame(dRess_t* ress,
const char* srcFileName, int plain_lzma)
{
unsigned long long outFileSize = 0;
@ -2220,16 +2200,16 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
strm.next_in = (BYTE const*)ress->srcBuffer;
strm.avail_in = ress->srcBufferLoaded;
strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
strm.avail_in = ress->readCtx->srcBufferLoaded;
for ( ; ; ) {
lzma_ret ret;
if (strm.avail_in == 0) {
ress->srcBufferLoaded = fread(ress->srcBuffer, 1, ress->srcBufferSize, srcFile);
if (ress->srcBufferLoaded == 0) action = LZMA_FINISH;
strm.next_in = (BYTE const*)ress->srcBuffer;
strm.avail_in = ress->srcBufferLoaded;
AIO_ReadPool_consumeAndRefill(ress->readCtx);
if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH;
strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
strm.avail_in = ress->readCtx->srcBufferLoaded;
}
ret = lzma_code(&strm, action);
@ -2253,7 +2233,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
if (ret == LZMA_STREAM_END) break;
}
FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
AIO_ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded - strm.avail_in);
lzma_end(&strm);
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
@ -2263,8 +2243,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
#ifdef ZSTD_LZ4DECOMPRESS
static unsigned long long
FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
const char* srcFileName)
FIO_decompressLz4Frame(dRess_t* ress, const char* srcFileName)
{
unsigned long long filesize = 0;
LZ4F_errorCode_t nextToLoad = 4;
@ -2282,34 +2261,27 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
/* Main Loop */
for (;nextToLoad;) {
size_t readSize;
size_t pos = 0;
size_t decodedBytes = writeJob->bufferSize;
int fullBufferDecoded = 0;
/* Read input */
nextToLoad = MIN(nextToLoad, ress->srcBufferSize-ress->srcBufferLoaded);
readSize = fread((char *)ress->srcBuffer + ress->srcBufferLoaded, 1, nextToLoad, srcFile);
if(!readSize && ferror(srcFile)) {
DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName);
decodingError=1;
break;
}
if(!readSize && !ress->srcBufferLoaded) break; /* reached end of file */
ress->srcBufferLoaded += readSize;
AIO_ReadPool_fillBuffer(ress->readCtx, nextToLoad);
if(!ress->readCtx->srcBufferLoaded) break; /* reached end of file */
while ((pos < ress->srcBufferLoaded) || fullBufferDecoded) { /* still to read, or still to flush */
while ((pos < ress->readCtx->srcBufferLoaded) || fullBufferDecoded) { /* still to read, or still to flush */
/* Decode Input (at least partially) */
size_t remaining = ress->srcBufferLoaded - pos;
size_t remaining = ress->readCtx->srcBufferLoaded - pos;
decodedBytes = writeJob->bufferSize;
nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL);
nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->readCtx->srcBuffer)+pos,
&remaining, NULL);
if (LZ4F_isError(nextToLoad)) {
DISPLAYLEVEL(1, "zstd: %s: lz4 decompression error : %s \n",
srcFileName, LZ4F_getErrorName(nextToLoad));
decodingError = 1; nextToLoad = 0; break;
}
pos += remaining;
assert(pos <= ress->srcBufferLoaded);
assert(pos <= ress->readCtx->srcBufferLoaded);
fullBufferDecoded = decodedBytes == writeJob->bufferSize;
/* Write Block */
@ -2324,7 +2296,7 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
if (!nextToLoad) break;
}
FIO_consumeDSrcBuffer(ress, pos);
AIO_ReadPool_consumeBytes(ress->readCtx, pos);
}
if (nextToLoad!=0) {
DISPLAYLEVEL(1, "zstd: %s: unfinished lz4 stream \n", srcFileName);
@ -2348,23 +2320,20 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
* 1 : error
*/
static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
dRess_t ress, FILE* srcFile,
const FIO_prefs_t* const prefs,
const char* dstFileName, const char* srcFileName)
dRess_t ress, const FIO_prefs_t* const prefs,
const char* dstFileName, const char* srcFileName)
{
unsigned readSomething = 0;
unsigned long long filesize = 0;
assert(srcFile != NULL);
/* for each frame */
for ( ; ; ) {
/* check magic number -> version */
size_t const toRead = 4;
const BYTE* const buf = (const BYTE*)ress.srcBuffer;
if (ress.srcBufferLoaded < toRead) /* load up to 4 bytes for header */
ress.srcBufferLoaded += fread((char*)ress.srcBuffer + ress.srcBufferLoaded,
(size_t)1, toRead - ress.srcBufferLoaded, srcFile);
if (ress.srcBufferLoaded==0) {
const BYTE* buf;
AIO_ReadPool_fillBuffer(ress.readCtx, toRead);
buf = (const BYTE*)ress.readCtx->srcBuffer;
if (ress.readCtx->srcBufferLoaded==0) {
if (readSomething==0) { /* srcFile is empty (which is invalid) */
DISPLAYLEVEL(1, "zstd: %s: unexpected end of file \n", srcFileName);
return 1;
@ -2372,17 +2341,17 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
break; /* no more input */
}
readSomething = 1; /* there is at least 1 byte in srcFile */
if (ress.srcBufferLoaded < toRead) {
if (ress.readCtx->srcBufferLoaded < toRead) {
DISPLAYLEVEL(1, "zstd: %s: unknown header \n", srcFileName);
return 1;
}
if (ZSTD_isFrame(buf, ress.srcBufferLoaded)) {
unsigned long long const frameSize = FIO_decompressZstdFrame(fCtx, &ress, srcFile, prefs, srcFileName, filesize);
if (ZSTD_isFrame(buf, ress.readCtx->srcBufferLoaded)) {
unsigned long long const frameSize = FIO_decompressZstdFrame(fCtx, &ress, prefs, srcFileName, filesize);
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
filesize += frameSize;
} else if (buf[0] == 31 && buf[1] == 139) { /* gz magic number */
#ifdef ZSTD_GZDECOMPRESS
unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, srcFileName);
unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFileName);
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
filesize += frameSize;
#else
@ -2392,7 +2361,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
} else if ((buf[0] == 0xFD && buf[1] == 0x37) /* xz magic number */
|| (buf[0] == 0x5D && buf[1] == 0x00)) { /* lzma header (no magic number) */
#ifdef ZSTD_LZMADECOMPRESS
unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, srcFileName, buf[0] != 0xFD);
unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFileName, buf[0] != 0xFD);
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
filesize += frameSize;
#else
@ -2401,7 +2370,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
#endif
} else if (MEM_readLE32(buf) == LZ4_MAGICNUMBER) {
#ifdef ZSTD_LZ4DECOMPRESS
unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, srcFileName);
unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFileName);
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
filesize += frameSize;
#else
@ -2409,10 +2378,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
return 1;
#endif
} else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) { /* pass-through mode */
return FIO_passThrough(prefs,
AIO_WritePool_getFile(ress.writeCtx), srcFile,
ress.srcBuffer, ress.srcBufferSize,
ress.srcBufferLoaded);
return FIO_passThrough(&ress);
} else {
DISPLAYLEVEL(1, "zstd: %s: unsupported format \n", srcFileName);
return 1;
@ -2432,15 +2398,14 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
}
/** FIO_decompressDstFile() :
open `dstFileName`,
or path-through if ress.dstFile is already != 0,
open `dstFileName`, or pass-through if writeCtx's file is already != 0,
then start decompression process (FIO_decompressFrames()).
@return : 0 : OK
1 : operation aborted
*/
static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
FIO_prefs_t* const prefs,
dRess_t ress, FILE* srcFile,
dRess_t ress,
const char* dstFileName, const char* srcFileName)
{
int result;
@ -2472,7 +2437,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
addHandler(dstFileName);
}
result = FIO_decompressFrames(fCtx, ress, srcFile, prefs, dstFileName, srcFileName);
result = FIO_decompressFrames(fCtx, ress, prefs, dstFileName, srcFileName);
if (releaseDstFile) {
clearHandler();
@ -2513,9 +2478,11 @@ static int FIO_decompressSrcFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs
srcFile = FIO_openSrcFile(prefs, srcFileName);
if (srcFile==NULL) return 1;
ress.srcBufferLoaded = 0;
AIO_ReadPool_setFile(ress.readCtx, srcFile);
result = FIO_decompressDstFile(fCtx, prefs, ress, srcFile, dstFileName, srcFileName);
result = FIO_decompressDstFile(fCtx, prefs, ress, dstFileName, srcFileName);
AIO_ReadPool_setFile(ress.readCtx, NULL);
/* Close file */
if (fclose(srcFile)) {

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* Copyright (c) Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
@ -29,7 +29,8 @@
/** AIO_fwriteSparse() :
* @return : storedSkips,
* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
unsigned AIO_fwriteSparse(FILE* file,
static unsigned
AIO_fwriteSparse(FILE* file,
const void* buffer, size_t bufferSize,
const FIO_prefs_t* const prefs,
unsigned storedSkips)
@ -45,7 +46,7 @@ unsigned AIO_fwriteSparse(FILE* file,
if (!prefs->sparseFileSupport) { /* normal write */
size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
if (sizeCheck != bufferSize)
EXM_THROW(70, "Write error : cannot write decoded block : %s",
EXM_THROW(70, "Write error : cannot write block : %s",
strerror(errno));
return 0;
}
@ -77,7 +78,7 @@ unsigned AIO_fwriteSparse(FILE* file,
storedSkips = 0;
/* write the rest */
if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
EXM_THROW(93, "Write error : cannot write decoded block : %s",
EXM_THROW(93, "Write error : cannot write block : %s",
strerror(errno));
}
ptrT += seg0SizeT;
@ -106,7 +107,8 @@ unsigned AIO_fwriteSparse(FILE* file,
return storedSkips;
}
void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
static void
AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{
if (prefs->testMode) assert(storedSkips == 0);
if (storedSkips>0) {
@ -127,17 +129,25 @@ void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned st
* AsyncIO functionality
************************************************************************/
/* AIO_supported:
 * Returns 1 if AsyncIO is supported on the system, 0 otherwise.
 * Async IO relies on the thread pool, so availability follows ZSTD_MULTITHREAD. */
int AIO_supported(void) {
#ifdef ZSTD_MULTITHREAD
    static const int asyncIoAvailable = 1;
#else
    static const int asyncIoAvailable = 0;
#endif
    return asyncIoAvailable;
}
/* ***********************************
* General IoPool implementation
*************************************/
static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
void *buffer;
IOJob_t *job;
job = (IOJob_t*) malloc(sizeof(IOJob_t));
buffer = malloc(bufferSize);
IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t));
void* const buffer = malloc(bufferSize);
if(!job || !buffer)
EXM_THROW(101, "Allocation error : not enough memory");
EXM_THROW(101, "Allocation error : not enough memory");
job->buffer = buffer;
job->bufferSize = bufferSize;
job->usedBufferSize = 0;
@ -151,49 +161,47 @@ static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
/* AIO_IOPool_createThreadPool:
* Creates a thread pool and a mutex for threaded IO pool.
* Displays warning if asyncio is requested but MT isn't available. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t *ctx, const FIO_prefs_t *prefs) {
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
ctx->threadPool = NULL;
if(prefs->asyncIO) {
if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
EXM_THROW(102,"Failed creating write availableJobs mutex");
EXM_THROW(102,"Failed creating write availableJobs mutex");
/* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
* decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
assert(MAX_IO_JOBS >= 2);
ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
if (!ctx->threadPool)
EXM_THROW(104, "Failed creating writer thread pool");
EXM_THROW(104, "Failed creating writer thread pool");
}
}
/* AIO_IOPool_init:
 * Allocates and initializes a new write pool, including its availableJobs. */
static void AIO_IOPool_init(IOPoolCtx_t *ctx, FIO_prefs_t* const prefs, POOL_function poolFunction, size_t bufferSize) {
static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
int i;
AIO_IOPool_createThreadPool(ctx, prefs);
ctx->prefs = prefs;
ctx->poolFunction = poolFunction;
ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 1;
ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
ctx->availableJobsCount = ctx->totalIoJobs;
for(i=0; i < ctx->availableJobsCount; i++) {
ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
}
ctx->jobBufferSize = bufferSize;
ctx->file = NULL;
}
/* AIO_IOPool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
static void AIO_IOPool_releaseIoJob(IOJob_t *job) {
IOPoolCtx_t *ctx = (IOPoolCtx_t *) job->ctx;
if(ctx->threadPool) {
static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
if(ctx->threadPool)
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
assert(ctx->availableJobsCount < MAX_IO_JOBS);
ctx->availableJobs[ctx->availableJobsCount++] = job;
assert(ctx->availableJobsCount < ctx->totalIoJobs);
ctx->availableJobs[ctx->availableJobsCount++] = job;
if(ctx->threadPool)
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
} else {
assert(ctx->availableJobsCount == 0);
ctx->availableJobsCount++;
}
}
/* AIO_IOPool_join:
@ -225,19 +233,15 @@ static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
/* AIO_IOPool_acquireJob:
* Returns an available io job to be used for a future io. */
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) {
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
IOJob_t *job;
assert(ctx->file != NULL || ctx->prefs->testMode);
if(ctx->threadPool) {
if(ctx->threadPool)
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
assert(ctx->availableJobsCount > 0);
job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
assert(ctx->availableJobsCount > 0);
job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
if(ctx->threadPool)
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
} else {
assert(ctx->availableJobsCount == 1);
ctx->availableJobsCount--;
job = (IOJob_t*)ctx->availableJobs[0];
}
job->usedBufferSize = 0;
job->file = ctx->file;
job->offset = 0;
@ -249,22 +253,22 @@ static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) {
* Sets the destination file for future files in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
static void AIO_IOPool_setFile(IOPoolCtx_t *ctx, FILE* file) {
static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
assert(ctx!=NULL);
AIO_IOPool_join(ctx);
assert(ctx->availableJobsCount == ctx->totalIoJobs);
ctx->file = file;
}
static FILE* AIO_IOPool_getFile(IOPoolCtx_t *ctx) {
static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
return ctx->file;
}
/* AIO_IOPool_enqueueJob:
* Enqueues an io job for execution.
* The queued job shouldn't be used directly after queueing it. */
static void AIO_IOPool_enqueueJob(IOJob_t *job) {
IOPoolCtx_t* ctx = (IOPoolCtx_t *)job->ctx;
static void AIO_IOPool_enqueueJob(IOJob_t* job) {
IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
if(ctx->threadPool)
POOL_add(ctx->threadPool, ctx->poolFunction, job);
else
@ -277,7 +281,7 @@ static void AIO_IOPool_enqueueJob(IOJob_t *job) {
/* AIO_WritePool_acquireJob:
* Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx) {
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) {
return AIO_IOPool_acquireJob(&ctx->base);
}
@ -294,7 +298,7 @@ void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
/* AIO_WritePool_sparseWriteEnd:
* Ends sparse writes to the current file.
* Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) {
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
assert(ctx != NULL);
if(ctx->base.threadPool)
POOL_joinJobs(ctx->base.threadPool);
@ -306,28 +310,28 @@ void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) {
* Sets the destination file for future writes in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file) {
void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) {
AIO_IOPool_setFile(&ctx->base, file);
assert(ctx->storedSkips == 0);
}
/* AIO_WritePool_getFile:
* Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx) {
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
return AIO_IOPool_getFile(&ctx->base);
}
/* AIO_WritePool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t *job) {
void AIO_WritePool_releaseIoJob(IOJob_t* job) {
AIO_IOPool_releaseIoJob(job);
}
/* AIO_WritePool_closeFile:
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) {
FILE *dstFile = ctx->base.file;
int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) {
FILE* const dstFile = ctx->base.file;
assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
AIO_WritePool_sparseWriteEnd(ctx);
AIO_IOPool_setFile(&ctx->base, NULL);
@ -337,16 +341,16 @@ int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) {
/* AIO_WritePool_executeWriteJob:
* Executes a write job synchronously. Can be used as a function for a thread pool. */
static void AIO_WritePool_executeWriteJob(void* opaque){
IOJob_t* job = (IOJob_t*) opaque;
WritePoolCtx_t* ctx = (WritePoolCtx_t*) job->ctx;
IOJob_t* const job = (IOJob_t*) opaque;
WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
AIO_IOPool_releaseIoJob(job);
}
/* AIO_WritePool_create:
* Allocates and sets and a new write pool including its included jobs. */
WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize) {
WritePoolCtx_t* ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
ctx->storedSkips = 0;
@ -363,3 +367,256 @@ void AIO_WritePool_free(WritePoolCtx_t* ctx) {
assert(ctx->storedSkips==0);
free(ctx);
}
/* ***********************************
* ReadPool implementation
*************************************/
/* AIO_ReadPool_releaseAllCompletedJobs:
 * Returns every job parked on the completed list back to the IO pool.
 * Note: takes no lock itself; callers must ensure no concurrent access. */
static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
    int idx;
    for (idx = 0; idx < ctx->completedJobsCount; idx++)
        AIO_IOPool_releaseIoJob((IOJob_t*) ctx->completedJobs[idx]);
    ctx->completedJobsCount = 0;
}
/* AIO_ReadPool_addJobToCompleted:
 * Appends a finished read job to the ctx's completed-jobs list.
 * When a thread pool is active this runs on a worker thread, hence the locking;
 * it also signals jobCompletedCond so a consumer blocked in
 * AIO_ReadPool_getNextCompletedJob() can wake up and claim the job. */
static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    if(ctx->base.threadPool)
        ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
    assert(ctx->completedJobsCount < MAX_IO_JOBS);
    ctx->completedJobs[ctx->completedJobsCount++] = job;
    if(ctx->base.threadPool) {
        ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
        ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
    }
}
/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
 * Looks through the completed jobs for a job matching the waitingOnOffset and returns it,
 * if job wasn't found returns NULL.
 * IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    int i;
    /* This implementation goes through all completed jobs and looks for the one matching the next offset.
     * While not strictly needed for a single threaded reader implementation (as in such a case we could expect
     * reads to be completed in order) this implementation was chosen as it better fits other asyncio
     * interfaces (such as io_uring) that do not provide promises regarding order of completion. */
    for (i=0; i<ctx->completedJobsCount; i++) {
        job = (IOJob_t *) ctx->completedJobs[i];
        if (job->offset == ctx->waitingOnOffset) {
            /* swap-remove: completed-list order doesn't matter, so fill the hole with the last entry */
            ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
            return job;
        }
    }
    return NULL;
}
/* AIO_ReadPool_numReadsInFlight:
 * Returns the number of IO read jobs currently in flight
 * (total jobs minus those that are available, completed, or currently held). */
static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
    size_t accountedFor = (size_t)(ctx->base.availableJobsCount + ctx->completedJobsCount);
    if (ctx->currentJobHeld != NULL)
        accountedFor += 1;
    return (size_t)ctx->base.totalIoJobs - accountedFor;
}
/* AIO_ReadPool_getNextCompletedJob:
 * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
 * Returns NULL when no matching job is queued and no reads remain in flight.
 * Would block. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
    IOJob_t *job = NULL;
    if (ctx->base.threadPool)
        ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);

    job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);

    /* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */
    while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
        assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
        ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
        job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
    }

    if(job) {
        assert(job->offset == ctx->waitingOnOffset);
        /* advance so the next call waits for the following contiguous chunk of the file */
        ctx->waitingOnOffset += job->usedBufferSize;
    }

    if (ctx->base.threadPool)
        ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
    return job;
}
/* AIO_ReadPool_executeReadJob:
 * Executes a read job synchronously. Can be used as a function for a thread pool.
 * Reads up to bufferSize bytes into the job's buffer; once EOF has been observed,
 * subsequent jobs complete immediately with 0 bytes read. */
static void AIO_ReadPool_executeReadJob(void* opaque){
    IOJob_t* const job = (IOJob_t*) opaque;
    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
    if(ctx->reachedEof) {
        /* an earlier job already hit EOF: report an empty read without touching the file */
        job->usedBufferSize = 0;
        AIO_ReadPool_addJobToCompleted(job);
        return;
    }
    job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
    if(job->usedBufferSize < job->bufferSize) {
        /* a short read must be explained by either an error or end of file */
        if(ferror(job->file)) {
            EXM_THROW(37, "Read error");
        } else if(feof(job->file)) {
            ctx->reachedEof = 1;
        } else {
            EXM_THROW(37, "Unexpected short read");
        }
    }
    AIO_ReadPool_addJobToCompleted(job);
}
/* AIO_ReadPool_enqueueRead:
 * Acquires a free IO job, stamps it with the next sequential file offset,
 * and hands it to the pool for (possibly asynchronous) execution. */
static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
    IOJob_t* const readJob = AIO_IOPool_acquireJob(&ctx->base);
    readJob->offset = ctx->nextReadOffset;
    ctx->nextReadOffset += readJob->bufferSize;
    AIO_IOPool_enqueueJob(readJob);
}
/* AIO_ReadPool_startReading:
 * Dispatches a read job for every currently available IO job, pre-filling the read queue.
 * Fix: the previous for-loop compared a rising index against availableJobsCount, but
 * AIO_ReadPool_enqueueRead() (via AIO_IOPool_acquireJob) decrements that count on each
 * iteration, so only about half of the available jobs were actually dispatched. */
static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
    while (ctx->base.availableJobsCount > 0) {
        AIO_ReadPool_enqueueRead(ctx);
    }
}
/* AIO_ReadPool_setFile:
 * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
 * Waits for all current enqueued tasks to complete if a previous file was set.
 * Note: the order below matters — all in-flight jobs must be joined and every
 * completed/held job returned to the pool before the read state is reset. */
void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
    assert(ctx!=NULL);
    AIO_IOPool_join(&ctx->base);
    AIO_ReadPool_releaseAllCompletedJobs(ctx);
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
    }
    AIO_IOPool_setFile(&ctx->base, file);
    /* reset read-progress state for the new file */
    ctx->nextReadOffset = 0;
    ctx->waitingOnOffset = 0;
    ctx->srcBuffer = ctx->coalesceBuffer;
    ctx->srcBufferLoaded = 0;
    ctx->reachedEof = 0;
    if(file != NULL)
        AIO_ReadPool_startReading(ctx);
}
/* AIO_ReadPool_create:
* Allocates and sets and a new readPool including its included jobs.
* bufferSize should be set to the maximal buffer we want to read at a time, will also be used
* as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);
ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
ctx->srcBuffer = ctx->coalesceBuffer;
ctx->srcBufferLoaded = 0;
ctx->completedJobsCount = 0;
ctx->currentJobHeld = NULL;
if(ctx->base.threadPool)
if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
EXM_THROW(103,"Failed creating write jobCompletedCond mutex");
return ctx;
}
/* AIO_ReadPool_free:
 * Frees and releases a readPool and its resources. Closes source file.
 * Closing the file (if one is set) joins all outstanding read jobs first,
 * so destroying the condition variable and base pool afterwards is safe. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
    if(AIO_ReadPool_getFile(ctx))
        AIO_ReadPool_closeFile(ctx);
    if(ctx->base.threadPool)
        ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
    AIO_IOPool_destroy(&ctx->base);
    free(ctx->coalesceBuffer);
    free(ctx);
}
/* AIO_ReadPool_consumeBytes:
 * Marks the first n bytes of srcBuffer as consumed:
 * advances the buffer pointer and shrinks srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
    assert(ctx->srcBufferLoaded >= n);
    ctx->srcBuffer += n;
    ctx->srcBufferLoaded -= n;
}
/* AIO_ReadPool_releaseCurrentHeldAndGetNext:
 * Release the current held job and get the next one, returns NULL if no next job available.
 * Releasing the held job frees a pool slot, so a replacement read is enqueued
 * immediately to keep the read pipeline full. */
static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
    if (ctx->currentJobHeld) {
        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
        ctx->currentJobHeld = NULL;
        AIO_ReadPool_enqueueRead(ctx);
    }
    ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
    return (IOJob_t*) ctx->currentJobHeld;
}
/* AIO_ReadPool_fillBuffer:
 * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
 * Returns if srcBuffer has at least the expected number of bytes loaded or if we've reached the end of the file.
 * Return value is the number of bytes added to the buffer.
 * Note that srcBuffer might have up to 2 times jobBufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
    IOJob_t *job;
    int useCoalesce = 0;
    if(n > ctx->base.jobBufferSize)
        n = ctx->base.jobBufferSize;

    /* We are good, don't read anything */
    if (ctx->srcBufferLoaded >= n)
        return 0;

    /* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
     * and coalesce the remaining bytes with the next job's buffer */
    if (ctx->srcBufferLoaded > 0) {
        useCoalesce = 1;
        memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
        ctx->srcBuffer = ctx->coalesceBuffer;
    }

    /* Read the next chunk */
    job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
    if(!job)
        return 0;
    if(useCoalesce) {
        /* leftover bytes plus one full job buffer never exceed 2x jobBufferSize */
        assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
        memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
        ctx->srcBufferLoaded += job->usedBufferSize;
    }
    else {
        /* no leftovers: expose the job's buffer directly, avoiding a copy */
        ctx->srcBuffer = (U8 *) job->buffer;
        ctx->srcBufferLoaded = job->usedBufferSize;
    }
    return job->usedBufferSize;
}
/* AIO_ReadPool_consumeAndRefill:
 * Consumes the current buffer entirely and refills it with up to jobBufferSize bytes.
 * Returns the number of bytes newly loaded (0 at end of file). */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
    AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
    return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
}
/* AIO_ReadPool_getFile:
 * Returns the current file set for the read pool (NULL if none). */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
    return AIO_IOPool_getFile(&ctx->base);
}
/* AIO_ReadPool_closeFile:
 * Closes the current set file. Waits for all current enqueued tasks to complete and resets state.
 * @return : the fclose() result (0 on success), or 0 if no file was set. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
    FILE* const file = AIO_ReadPool_getFile(ctx);
    AIO_ReadPool_setFile(ctx, NULL);
    if (file == NULL)
        return 0;   /* guard: fclose(NULL) is undefined behavior */
    return fclose(file);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* Copyright (c) Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
@ -28,7 +28,7 @@ typedef struct {
/* These struct fields should be set only on creation and not changed afterwards */
POOL_ctx* threadPool;
int totalIoJobs;
FIO_prefs_t* prefs;
const FIO_prefs_t* prefs;
POOL_function poolFunction;
/* Controls the file we currently write to, make changes only by using provided utility functions */
@ -39,8 +39,36 @@ typedef struct {
ZSTD_pthread_mutex_t ioJobsMutex;
void* availableJobs[MAX_IO_JOBS];
int availableJobsCount;
size_t jobBufferSize;
} IOPoolCtx_t;
/* Context of an asynchronous read pool: an IO pool plus the state needed to
 * hand out file data to consumers in sequential order. */
typedef struct {
    IOPoolCtx_t base;

    /* State regarding the currently read file */
    int reachedEof;            /* set once a read observes feof(); later jobs return 0 bytes */
    U64 nextReadOffset;        /* file offset the next enqueued read job will be stamped with */
    U64 waitingOnOffset;       /* file offset the consumer expects to receive next */

    /* We may hold an IOJob object as needed if we actively expose its buffer. */
    void *currentJobHeld;

    /* Coalesce buffer is used to join two buffers in case where we need to read more bytes than left in
     * the first of them. Shouldn't be accessed from outside of utility functions. */
    U8 *coalesceBuffer;

    /* Read buffer can be used by consumer code, take care when copying this pointer aside as it might
     * change when consuming / refilling buffer. */
    U8 *srcBuffer;
    size_t srcBufferLoaded;

    /* We need to know what tasks completed so we can use their buffers when their time comes.
     * Should only be accessed after locking base.ioJobsMutex . */
    void* completedJobs[MAX_IO_JOBS];
    int completedJobsCount;
    ZSTD_pthread_cond_t jobCompletedCond;
} ReadPoolCtx_t;
typedef struct {
IOPoolCtx_t base;
unsigned storedSkips;
@ -59,15 +87,10 @@ typedef struct {
U64 offset;
} IOJob_t;
/** AIO_fwriteSparse() :
* @return : storedSkips,
* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
unsigned AIO_fwriteSparse(FILE* file,
const void* buffer, size_t bufferSize,
const FIO_prefs_t* const prefs,
unsigned storedSkips);
/* AIO_supported:
* Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
int AIO_supported(void);
void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips);
/* AIO_WritePool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
@ -97,7 +120,7 @@ void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);
/* AIO_WritePool_getFile:
* Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx);
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);
/* AIO_WritePool_closeFile:
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
@ -107,12 +130,50 @@ int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);
/* AIO_WritePool_create:
 * Allocates and sets up a new write pool including its jobs.
* bufferSize should be set to the maximal buffer we want to write to at a time. */
WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize);
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize);
/* AIO_WritePool_free:
* Frees and releases a writePool and its resources. Closes destination file. */
void AIO_WritePool_free(WritePoolCtx_t* ctx);
/* AIO_ReadPool_create:
 * Allocates and sets up a new readPool including its jobs.
* bufferSize should be set to the maximal buffer we want to read at a time, will also be used
* as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);
/* AIO_ReadPool_free:
* Frees and releases a readPool and its resources. Closes source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx);
/* AIO_ReadPool_consumeBytes:
 * Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);
/* AIO_ReadPool_fillBuffer:
 * Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initialized bufferSize).
* Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file.
* Return value is the number of bytes added to the buffer.
* Note that srcBuffer might have up to 2 times bufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n);
/* AIO_ReadPool_consumeAndRefill:
* Consumes the current buffer and refills it with bufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx);
/* AIO_ReadPool_setFile:
* Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
* Waits for all current enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);
/* AIO_ReadPool_getFile:
* Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);
/* AIO_ReadPool_closeFile:
* Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx);
#if defined (__cplusplus)
}
#endif

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* Copyright (c) Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* Copyright (c) Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
@ -70,4 +70,4 @@ typedef struct FIO_prefs_s {
int allowBlockDevices;
} FIO_prefs_t;
#endif /* FILEIO_TYPES_HEADER */
#endif /* FILEIO_TYPES_HEADER */

View File

@ -46,6 +46,7 @@
# include "zstdcli_trace.h"
#endif
#include "../lib/zstd.h" /* ZSTD_VERSION_STRING, ZSTD_minCLevel, ZSTD_maxCLevel */
#include "fileio_asyncio.h"
/*-************************************
@ -179,7 +180,8 @@ static void usage_advanced(const char* programName)
#ifdef UTIL_HAS_MIRRORFILELIST
DISPLAYOUT( "--output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure \n");
#endif
if (AIO_supported())
DISPLAYOUT( "--[no-]asyncio : use asynchronous IO (default: enabled) \n");
#ifndef ZSTD_NOCOMPRESS
DISPLAYOUT( "--[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled)");
@ -242,9 +244,6 @@ static void usage_advanced(const char* programName)
DISPLAYOUT( " -l : print information about zstd compressed files \n");
DISPLAYOUT( "--test : test compressed file integrity \n");
DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n");
#ifdef ZSTD_MULTITHREAD
DISPLAYOUT( "--[no-]asyncio : use threaded asynchronous IO for output (default: disabled) \n");
#endif
# if ZSTD_SPARSE_DEFAULT
DISPLAYOUT( "--[no-]sparse : sparse mode (default: enabled on file, disabled on stdout) \n");
# else
@ -807,9 +806,7 @@ int main(int argCount, const char* argv[])
separateFiles = 0,
setRealTimePrio = 0,
singleThread = 0,
#ifdef ZSTD_MULTITHREAD
defaultLogicalCores = 0,
#endif
showDefaultCParams = 0,
ultra=0,
contentSize=1;
@ -1002,7 +999,6 @@ int main(int argCount, const char* argv[])
if (longCommandWArg(&argument, "--target-compressed-block-size=")) { targetCBlockSize = readSizeTFromChar(&argument); continue; }
if (longCommandWArg(&argument, "--size-hint=")) { srcSizeHint = readSizeTFromChar(&argument); continue; }
if (longCommandWArg(&argument, "--output-dir-flat")) { NEXT_FIELD(outDirName); continue; }
#ifdef ZSTD_MULTITHREAD
if (longCommandWArg(&argument, "--auto-threads")) {
const char* threadDefault = NULL;
NEXT_FIELD(threadDefault);
@ -1010,7 +1006,6 @@ int main(int argCount, const char* argv[])
defaultLogicalCores = 1;
continue;
}
#endif
#ifdef UTIL_HAS_MIRRORFILELIST
if (longCommandWArg(&argument, "--output-dir-mirror")) { NEXT_FIELD(outMirroredDirName); continue; }
#endif
@ -1226,7 +1221,7 @@ int main(int argCount, const char* argv[])
}
}
#else
(void)singleThread; (void)nbWorkers;
(void)singleThread; (void)nbWorkers; (void)defaultLogicalCores;
#endif
g_utilDisplayLevel = g_displayLevel;
@ -1463,6 +1458,7 @@ int main(int argCount, const char* argv[])
FIO_setTargetCBlockSize(prefs, targetCBlockSize);
FIO_setSrcSizeHint(prefs, srcSizeHint);
FIO_setLiteralCompressionMode(prefs, literalCompressionMode);
FIO_setSparseWrite(prefs, 0);
if (adaptMin > cLevel) cLevel = adaptMin;
if (adaptMax < cLevel) cLevel = adaptMax;

View File

@ -297,7 +297,7 @@ check: shortest
fuzztest: test-fuzzer test-zstream test-decodecorpus
.PHONY: test
test: test-zstd test-fullbench test-fuzzer test-zstream test-invalidDictionaries test-legacy test-decodecorpus
test: test-zstd test-fullbench test-fuzzer test-zstream test-invalidDictionaries test-legacy test-decodecorpus test-cli-tests
ifeq ($(QEMU_SYS),)
test: test-pool
endif
@ -322,6 +322,12 @@ test-zstd test-zstd32 test-zstd-nolegacy: datagen
file $(ZSTD)
EXE_PREFIX="$(QEMU_SYS)" ZSTD_BIN="$(ZSTD)" DATAGEN_BIN=./datagen ./playTests.sh $(ZSTDRTTEST)
test-cli-tests: ZSTD = $(PRGDIR)/zstd
test-cli-tests: zstd datagen
file $(ZSTD)
./cli-tests/run.py --exec-prefix="$(QEMU_SYS)" --zstd="$(ZSTD)" --datagen=./datagen
test-fullbench: fullbench datagen
$(QEMU_SYS) ./fullbench -i1
$(QEMU_SYS) ./fullbench -i1 -P0

4
tests/cli-tests/.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
scratch/
!bin/
!datagen
!zstdcat

248
tests/cli-tests/README.md Normal file
View File

@ -0,0 +1,248 @@
# CLI tests
The CLI tests are focused on testing the zstd CLI.
They are intended to be simple tests that the CLI and arguments work as advertised.
They are not intended to test the library, only the code in `programs/`.
The library will get incidental coverage, but if you find yourself trying to trigger a specific condition in the library, this is the wrong tool.
## Test runner usage
The test runner `run.py` will run tests against the in-tree build of `zstd` and `datagen` by default. Which means that `zstd` and `datagen` must be built.
The `zstd` binary used can be passed with `--zstd /path/to/zstd`.
Additionally, to run `zstd` through a tool like `valgrind` or `qemu`, set the `--exec-prefix 'valgrind -q'` flag.
Similarly, the `--datagen`, and `--zstdgrep` flags can be set to specify
the paths to their respective binaries. However, these tools do not use
the `EXEC_PREFIX`.
Each test executes in its own scratch directory under `scratch/test/name`. E.g. `scratch/basic/help.sh/`. Normally these directories are removed after the test executes. However, the `--preserve` flag will preserve these directories after execution, and save the tests exit code, stdout, and stderr in the scratch directory to `exit`, `stderr`, and `stdout` respectively. This can be useful for debugging/editing a test and updating the expected output.
### Running all the tests
By default the test runner `run.py` will run all the tests, and report the results.
Examples:
```
./run.py
./run.py --preserve
./run.py --zstd ../../build/programs/zstd --datagen ../../build/tests/datagen
```
### Running specific tests
A set of test names can be passed to the test runner `run.py` to only execute those tests.
This can be useful for writing or debugging a test, especially with `--preserve`.
The test name can either be the path to the test file, or the test name, which is the path relative to the test directory.
Examples:
```
./run.py basic/help.sh
./run.py --preserve basic/help.sh basic/version.sh
./run.py --preserve --verbose basic/help.sh
```
## Writing a test
Test cases are arbitrary executables, and can be written in any language, but are generally shell scripts.
After the script executes, the exit code, stderr, and stdout are compared against the expectations.
Each test is run in a clean directory that the test can use for intermediate files. This directory will be cleaned up at the end of the test, unless `--preserve` is passed to the test runner. Additionally, the `setup` script can prepare the directory before the test runs.
### Calling zstd, utilities, and environment variables
The `$PATH` for tests is prepended with the `bin/` sub-directory, which contains helper scripts for ease of testing.
The `zstd` binary will call the zstd binary specified by `run.py` with the correct `$EXEC_PREFIX`.
Similarly, `datagen`, `unzstd`, `zstdgrep`, `zstdcat`, etc, are provided.
Helper utilities like `cmp_size`, `println`, and `die` are provided here too. See their scripts for details.
Common shell script libraries are provided under `common/`, with helper variables and functions. They can be sourced with `source "$COMMON/library.sh"`.
Lastly, environment variables are provided for testing, which can be listed when calling `run.py` with `--verbose`.
They are generally used by the helper scripts in `bin/` to coordinate everything.
### Basic test case
When executing your `$TEST` executable, by default the exit code is expected to be `0`. However, you can provide an alternate expected exit code in a `$TEST.exit` file.
When executing your `$TEST` executable, by default the expected stderr and stdout are empty. However, you can override the default by providing one of three files:
* `$TEST.{stdout,stderr}.exact`
* `$TEST.{stdout,stderr}.glob`
* `$TEST.{stdout,stderr}.ignore`
If you provide a `.exact` file, the output is expected to exactly match, byte-for-byte.
If you provide a `.glob` file, the output is expected to match the expected file, where each line is interpreted as a glob syntax. Additionally, a line containing only `...` matches all lines until the next expected line matches.
If you provide a `.ignore` file, the output is ignored.
#### Passing examples
All these examples pass.
Exit 1, and change the expectation to be 1.
```
exit-1.sh
---
#!/bin/sh
exit 1
---
exit-1.sh.exit
---
1
---
```
Check the stdout output exactly matches.
```
echo.sh
---
#!/bin/sh
echo "hello world"
---
echo.sh.stdout.exact
---
hello world
---
```
Check the stderr output using a glob.
```
random.sh
---
#!/bin/sh
head -c 10 < /dev/urandom | xxd >&2
---
random.sh.stderr.glob
---
00000000: * * * * * *
```
Multiple lines can be matched with ...
```
random-num-lines.sh
---
#!/bin/sh
echo hello
seq 0 $RANDOM
echo world
---
random-num-lines.sh.stdout.glob
---
hello
0
...
world
---
```
#### Failing examples
Exit code is expected to be 0, but is 1.
```
exit-1.sh
---
#!/bin/sh
exit 1
---
```
Stdout is expected to be empty, but isn't.
```
echo.sh
---
#!/bin/sh
echo hello world
```
Stderr is expected to be hello but is world.
```
hello.sh
---
#!/bin/sh
echo world >&2
---
hello.sh.stderr.exact
---
hello
---
```
### Setup & teardown scripts
Finally, test writing can be eased with setup and teardown scripts.
Each directory in the test directory is a test-suite consisting of all tests within that directory (but not sub-directories).
This test suite can come with 4 scripts to help test writing:
* `setup_once`
* `teardown_once`
* `setup`
* `teardown`
The `setup_once` and `teardown_once` are run once before and after all the tests in the suite respectively.
They operate in the scratch directory for the test suite, which is the parent directory of each scratch directory for each test case.
They can do work that is shared between tests to improve test efficiency.
For example, the `dictionaries/setup_once` script builds several dictionaries, for use in the `dictionaries` tests.
The `setup` and `teardown` scripts run before and after each test case respectively, in the test case's scratch directory.
These scripts can do work that is shared between test cases to make tests more succinct.
For example, the `dictionaries/setup` script copies the dictionaries built by the `dictionaries/setup_once` script into the test's scratch directory, to make them easier to use, and make sure they aren't accidentally modified.
#### Examples
```
basic/setup
---
#!/bin/sh
# Create some files for testing with
datagen > file
datagen > file0
datagen > file1
---
basic/test.sh
---
#!/bin/sh
zstd file file0 file1
---
dictionaries/setup_once
---
#!/bin/sh
set -e
mkdir files/ dicts/
for i in $(seq 10); do
datagen -g1000 > files/$i
done
zstd --train -r files/ -o dicts/0
---
dictionaries/setup
---
#!/bin/sh
# Runs in the test case's scratch directory.
# The test suite's scratch directory that
# `setup_once` operates in is the parent directory.
cp -r ../files ../dicts .
---
```

10
tests/cli-tests/basic/help.sh Executable file
View File

@ -0,0 +1,10 @@
#!/bin/sh
set -e
println "+ zstd -h"
zstd -h
println "+ zstd -H"
zstd -H
println "+ zstd --help"
zstd --help

View File

@ -0,0 +1,34 @@
+ zstd -h
*** zstd command line interface *-bits v1.*.*, by Yann Collet ***
Usage :
zstd *args* *FILE(s)* *-o file*
FILE : a filename
with no FILE, or when FILE is - , read standard input
Arguments :
-# : # compression level*
-d : decompression
-D DICT: use DICT as Dictionary for compression or decompression
-o file: result stored into `file` (only 1 output file)
-f : disable input and output checks. Allows overwriting existing files,
input from console, output to stdout, operating on links,
block devices, etc.
--rm : remove source file(s) after successful de/compression
-k : preserve source file(s) (default)
-h/-H : display help/long help and exit
Advanced arguments :
-V : display Version number and exit
...
+ zstd -H
...
Arguments :
...
Advanced arguments :
...
+ zstd --help
...
Arguments :
...
Advanced arguments :
...

View File

@ -0,0 +1,6 @@
#!/bin/sh
set -e
zstd -V
zstd --version

View File

@ -0,0 +1,2 @@
*** zstd command line interface *-bits v1.*.*, by Yann Collet ***
*** zstd command line interface *-bits v1.*.*, by Yann Collet ***

44
tests/cli-tests/bin/cmp_size Executable file
View File

@ -0,0 +1,44 @@
#!/bin/sh
# Small utility to compare file sizes without printing them with `set -x`.
# Exit status is that of the requested comparison; an unknown operator is
# now reported as an error instead of silently succeeding.
set -e

usage()
{
	printf "USAGE:\n\t$0 [-eq|-ne|-lt|-le|-gt|-ge] FILE1 FILE2\n"
}

help()
{
	printf "Small utility to compare file sizes without printing them with set -x.\n\n"
	usage
}

case "$1" in
	-h) help; exit 0 ;;
	--help) help; exit 0 ;;
esac

# Quote the operands: unquoted $2/$3 would be field-split and glob-expanded,
# breaking on filenames containing spaces or wildcard characters.
if ! test -f "$2"; then
	printf "FILE1='%b' is not a file\n\n" "$2"
	usage
	exit 1
fi

if ! test -f "$3"; then
	printf "FILE2='%b' is not a file\n\n" "$3"
	usage
	exit 1
fi

size1=$(wc -c < "$2")
size2=$(wc -c < "$3")

case "$1" in
	-eq) [ "$size1" -eq "$size2" ] ;;
	-ne) [ "$size1" -ne "$size2" ] ;;
	-lt) [ "$size1" -lt "$size2" ] ;;
	-le) [ "$size1" -le "$size2" ] ;;
	-gt) [ "$size1" -gt "$size2" ] ;;
	-ge) [ "$size1" -ge "$size2" ] ;;
	*) printf "Unknown comparison operator '%s'\n\n" "$1"; usage; exit 1 ;;
esac

3
tests/cli-tests/bin/datagen Executable file
View File

@ -0,0 +1,3 @@
#!/bin/sh
# Wrapper so tests can invoke `datagen` by name; forwards all arguments to
# the binary selected by the test runner. "$@" (quoted) preserves each
# argument exactly; the unquoted original re-split arguments on whitespace
# and glob-expanded them.
"$DATAGEN_BIN" "$@"

4
tests/cli-tests/bin/die Executable file
View File

@ -0,0 +1,4 @@
#!/bin/sh
println "${*}" 1>&2
exit 1

2
tests/cli-tests/bin/println Executable file
View File

@ -0,0 +1,2 @@
#!/bin/sh
printf '%b\n' "${*}"

1
tests/cli-tests/bin/unzstd Symbolic link
View File

@ -0,0 +1 @@
zstd

7
tests/cli-tests/bin/zstd Executable file
View File

@ -0,0 +1,7 @@
#!/bin/sh
# Wrapper that runs the zstd binary under test, optionally through
# $EXEC_PREFIX (e.g. "qemu" or "valgrind -q"). "$@" (quoted) preserves
# argument boundaries; the unquoted original re-split arguments containing
# spaces or glob characters. $EXEC_PREFIX is intentionally left unquoted so
# a multi-word prefix splits into command + options.
if [ -z "$EXEC_PREFIX" ]; then
	"$ZSTD_BIN" "$@"
else
	$EXEC_PREFIX "$ZSTD_BIN" "$@"
fi

1
tests/cli-tests/bin/zstdcat Symbolic link
View File

@ -0,0 +1 @@
zstd

2
tests/cli-tests/bin/zstdgrep Executable file
View File

@ -0,0 +1,2 @@
#!/bin/sh
# Wrapper forwarding to the zstdgrep under test. "$@" (quoted) preserves
# each argument exactly; unquoted $@ would re-split and glob-expand them.
"$ZSTDGREP_BIN" "$@"

View File

@ -0,0 +1,19 @@
#!/bin/sh
. "$COMMON/platform.sh"
zstd_supports_format()
{
zstd -h | grep > $INTOVOID -- "--format=$1"
}
format_extension()
{
if [ "$1" = "zstd" ]; then
printf "zst"
elif [ "$1" = "gzip" ]; then
printf "gz"
else
printf "$1"
fi
}

View File

@ -0,0 +1,13 @@
. "$COMMON/platform.sh"
MTIME="stat -c %Y"
case "$UNAME" in
Darwin | FreeBSD | OpenBSD | NetBSD) MTIME="stat -f %m" ;;
esac
assertSameMTime() {
MT1=$($MTIME "$1")
MT2=$($MTIME "$2")
echo MTIME $MT1 $MT2
[ "$MT1" = "$MT2" ] || die "mtime on $1 doesn't match mtime on $2 ($MT1 != $MT2)"
}

View File

@ -0,0 +1,18 @@
. "$COMMON/platform.sh"
GET_PERMS="stat -c %a"
case "$UNAME" in
Darwin | FreeBSD | OpenBSD | NetBSD) GET_PERMS="stat -f %Lp" ;;
esac
assertFilePermissions() {
STAT1=$($GET_PERMS "$1")
STAT2=$2
[ "$STAT1" = "$STAT2" ] || die "permissions on $1 don't match expected ($STAT1 != $STAT2)"
}
assertSamePermissions() {
STAT1=$($GET_PERMS "$1")
STAT2=$($GET_PERMS "$2")
[ "$STAT1" = "$STAT2" ] || die "permissions on $1 don't match those on $2 ($STAT1 != $STAT2)"
}

View File

@ -0,0 +1,37 @@
#!/bin/sh
UNAME=$(uname)
isWindows=false
INTOVOID="/dev/null"
case "$UNAME" in
GNU) DEVDEVICE="/dev/random" ;;
*) DEVDEVICE="/dev/zero" ;;
esac
case "$OS" in
Windows*)
isWindows=true
INTOVOID="NUL"
DEVDEVICE="NUL"
;;
esac
case "$UNAME" in
Darwin) MD5SUM="md5 -r" ;;
FreeBSD) MD5SUM="gmd5sum" ;;
NetBSD) MD5SUM="md5 -n" ;;
OpenBSD) MD5SUM="md5" ;;
*) MD5SUM="md5sum" ;;
esac
DIFF="diff"
case "$UNAME" in
SunOS) DIFF="gdiff" ;;
esac
if echo hello | zstd -v -T2 2>&1 > $INTOVOID | grep -q 'multi-threading is disabled'
then
hasMT=""
else
hasMT="true"
fi

View File

@ -0,0 +1,6 @@
#!/bin/sh
set -e
# Test --adapt
zstd -f file --adapt -c | zstd -t

View File

@ -0,0 +1,30 @@
#!/bin/sh
set -e
# Uncomment the set -v line for debugging
# set -v
# Test compression flags and check that they work
zstd file ; zstd -t file.zst
zstd -f file ; zstd -t file.zst
zstd -f -z file ; zstd -t file.zst
zstd -f -k file ; zstd -t file.zst
zstd -f -C file ; zstd -t file.zst
zstd -f --check file ; zstd -t file.zst
zstd -f --no-check file ; zstd -t file.zst
zstd -f -- file ; zstd -t file.zst
# Test output file compression
zstd -o file-out.zst ; zstd -t file-out.zst
zstd -fo file-out.zst; zstd -t file-out.zst
# Test compression to stdout
zstd -c file | zstd -t
zstd --stdout file | zstd -t
println bob | zstd | zstd -t
# Test --rm
cp file file-rm
zstd --rm file-rm; zstd -t file-rm.zst
test ! -f file-rm

View File

@ -0,0 +1,10 @@
#!/bin/sh
set -e
# Test --[no-]compress-literals
zstd file --no-compress-literals -1 -c | zstd -t
zstd file --no-compress-literals -19 -c | zstd -t
zstd file --no-compress-literals --fast=1 -c | zstd -t
zstd file --compress-literals -1 -c | zstd -t
zstd file --compress-literals --fast=1 -c | zstd -t

View File

@ -0,0 +1,16 @@
#!/bin/sh
. "$COMMON/format.sh"
set -e
# Test --format
zstd --format=zstd file -f
zstd -t file.zst
for format in "gzip" "lz4" "xz" "lzma"; do
if zstd_supports_format $format; then
zstd --format=$format file
zstd -t file.$(format_extension $format)
zstd -c --format=$format file | zstd -t --format=$format
fi
done

View File

@ -0,0 +1,64 @@
#!/bin/sh
set -e
set -v
datagen > file
# Compress with various levels and ensure that their sizes are ordered
zstd --fast=10 file -o file-f10.zst
zstd --fast=1 file -o file-f1.zst
zstd -1 file -o file-1.zst
zstd -19 file -o file-19.zst
zstd -22 --ultra file -o file-22.zst
zstd -t file-f10.zst file-f1.zst file-1.zst file-19.zst file-22.zst
cmp_size -ne file-19.zst file-22.zst
cmp_size -lt file-19.zst file-1.zst
cmp_size -lt file-1.zst file-f1.zst
cmp_size -lt file-f1.zst file-f10.zst
# Test default levels
zstd --fast file -f
cmp file.zst file-f1.zst || die "--fast is not level -1"
zstd -0 file -o file-0.zst
zstd -f file
cmp file.zst file-0.zst || die "Level 0 is not the default level"
# Test level clamping
zstd -99 file -o file-99.zst
cmp file-19.zst file-99.zst || die "Level 99 is clamped to 19"
zstd --fast=200000 file -c | zstd -t
zstd -5000000000 -f file && die "Level too large, must fail" ||:
zstd --fast=5000000000 -f file && die "Level too large, must fail" ||:
# Test setting a level through the environment variable
ZSTD_CLEVEL=-10 zstd file -o file-f10-env.zst
ZSTD_CLEVEL=1 zstd file -o file-1-env.zst
ZSTD_CLEVEL=+19 zstd file -o file-19-env.zst
ZSTD_CLEVEL=+99 zstd file -o file-99-env.zst
cmp file-f10.zst file-f10-env.zst || die "Environment variable failed to set level"
cmp file-1.zst file-1-env.zst || die "Environment variable failed to set level"
cmp file-19.zst file-19-env.zst || die "Environment variable failed to set level"
cmp file-99.zst file-99-env.zst || die "Environment variable failed to set level"
# Test invalid environment clevel is the default level
zstd -f file
ZSTD_CLEVEL=- zstd -f file -o file-env.zst ; cmp file.zst file-env.zst
ZSTD_CLEVEL=+ zstd -f file -o file-env.zst ; cmp file.zst file-env.zst
ZSTD_CLEVEL=a zstd -f file -o file-env.zst ; cmp file.zst file-env.zst
ZSTD_CLEVEL=-a zstd -f file -o file-env.zst ; cmp file.zst file-env.zst
ZSTD_CLEVEL=+a zstd -f file -o file-env.zst ; cmp file.zst file-env.zst
ZSTD_CLEVEL=3a7 zstd -f file -o file-env.zst ; cmp file.zst file-env.zst
ZSTD_CLEVEL=5000000000 zstd -f file -o file-env.zst; cmp file.zst file-env.zst
# Test environment clevel is overridden by command line
ZSTD_CLEVEL=10 zstd -f file -1 -o file-1-env.zst
ZSTD_CLEVEL=10 zstd -f file --fast=1 -o file-f1-env.zst
cmp file-1.zst file-1-env.zst || die "Environment variable not overridden"
cmp file-f1.zst file-f1-env.zst || die "Environment variable not overridden"

View File

@ -0,0 +1,71 @@
datagen > file
# Compress with various levels and ensure that their sizes are ordered
zstd --fast=10 file -o file-f10.zst
zstd --fast=1 file -o file-f1.zst
zstd -1 file -o file-1.zst
zstd -19 file -o file-19.zst
zstd -22 --ultra file -o file-22.zst
zstd -t file-f10.zst file-f1.zst file-1.zst file-19.zst file-22.zst
cmp_size -ne file-19.zst file-22.zst
cmp_size -lt file-19.zst file-1.zst
cmp_size -lt file-1.zst file-f1.zst
cmp_size -lt file-f1.zst file-f10.zst
# Test default levels
zstd --fast file -f
cmp file.zst file-f1.zst || die "--fast is not level -1"
zstd -0 file -o file-0.zst
zstd -f file
cmp file.zst file-0.zst || die "Level 0 is not the default level"
# Test level clamping
zstd -99 file -o file-99.zst
Warning : compression level higher than max, reduced to 19
cmp file-19.zst file-99.zst || die "Level 99 is clamped to 19"
zstd --fast=200000 file -c | zstd -t
zstd -5000000000 -f file && die "Level too large, must fail" ||:
error: numeric value overflows 32-bit unsigned int
zstd --fast=5000000000 -f file && die "Level too large, must fail" ||:
error: numeric value overflows 32-bit unsigned int
# Test setting a level through the environment variable
ZSTD_CLEVEL=-10 zstd file -o file-f10-env.zst
ZSTD_CLEVEL=1 zstd file -o file-1-env.zst
ZSTD_CLEVEL=+19 zstd file -o file-19-env.zst
ZSTD_CLEVEL=+99 zstd file -o file-99-env.zst
Warning : compression level higher than max, reduced to 19
cmp file-f10.zst file-f10-env.zst || die "Environment variable failed to set level"
cmp file-1.zst file-1-env.zst || die "Environment variable failed to set level"
cmp file-19.zst file-19-env.zst || die "Environment variable failed to set level"
cmp file-99.zst file-99-env.zst || die "Environment variable failed to set level"
# Test invalid environment clevel is the default level
zstd -f file
ZSTD_CLEVEL=- zstd -f file -o file-env.zst ; cmp file.zst file-env.zst
Ignore environment variable setting ZSTD_CLEVEL=-: not a valid integer value
ZSTD_CLEVEL=+ zstd -f file -o file-env.zst ; cmp file.zst file-env.zst
Ignore environment variable setting ZSTD_CLEVEL=+: not a valid integer value
ZSTD_CLEVEL=a zstd -f file -o file-env.zst ; cmp file.zst file-env.zst
Ignore environment variable setting ZSTD_CLEVEL=a: not a valid integer value
ZSTD_CLEVEL=-a zstd -f file -o file-env.zst ; cmp file.zst file-env.zst
Ignore environment variable setting ZSTD_CLEVEL=-a: not a valid integer value
ZSTD_CLEVEL=+a zstd -f file -o file-env.zst ; cmp file.zst file-env.zst
Ignore environment variable setting ZSTD_CLEVEL=+a: not a valid integer value
ZSTD_CLEVEL=3a7 zstd -f file -o file-env.zst ; cmp file.zst file-env.zst
Ignore environment variable setting ZSTD_CLEVEL=3a7: not a valid integer value
ZSTD_CLEVEL=5000000000 zstd -f file -o file-env.zst; cmp file.zst file-env.zst
Ignore environment variable setting ZSTD_CLEVEL=5000000000: numeric value too large
# Test environment clevel is overridden by command line
ZSTD_CLEVEL=10 zstd -f file -1 -o file-1-env.zst
ZSTD_CLEVEL=10 zstd -f file --fast=1 -o file-f1-env.zst
cmp file-1.zst file-1-env.zst || die "Environment variable not overridden"
cmp file-f1.zst file-f1-env.zst || die "Environment variable not overridden"

View File

@ -0,0 +1,7 @@
#!/bin/sh
set -e
# Test --long
zstd -f file --long ; zstd -t file.zst
zstd -f file --long=20; zstd -t file.zst

View File

@ -0,0 +1,11 @@
#!/bin/sh
set -e
# Test multi-threaded flags
zstd --single-thread file -f ; zstd -t file.zst
zstd -T2 -f file ; zstd -t file.zst
zstd --rsyncable -f file ; zstd -t file.zst
zstd -T0 -f file ; zstd -t file.zst
zstd -T0 --auto-threads=logical -f file ; zstd -t file.zst
zstd -T0 --auto-threads=physical -f file; zstd -t file.zst

View File

@ -0,0 +1,7 @@
#!/bin/sh
set -e
# Test --[no-]row-match-finder
zstd file -7f --row-match-finder
zstd file -7f --no-row-match-finder

View File

@ -0,0 +1,7 @@
#!/bin/sh
set -e
datagen > file
datagen > file0
datagen > file1

View File

@ -0,0 +1,7 @@
#!/bin/sh
set -e
# Test stream size & hint
datagen -g7654 | zstd --stream-size=7654 | zstd -t
datagen -g7654 | zstd --size-hint=7000 | zstd -t

View File

@ -0,0 +1,3 @@
#!/bin/sh
set -v
zstd --train

View File

@ -0,0 +1 @@
14

View File

@ -0,0 +1,5 @@
zstd --train
! Warning : nb of samples too low for proper processing !
! Please provide _one file per sample_.
! Alternatively, split files into fixed-size blocks representative of samples, with -B#
Error 14 : nb of samples too low

View File

@ -0,0 +1,29 @@
#!/bin/sh

. "$COMMON/platform.sh"

set -e

# Disabled reference block. NOTE: the original guard was `if [ false ]`,
# which is ALWAYS true — `test` with a single non-empty string argument
# succeeds — so the block silently executed. `if false` actually skips it,
# matching the expected-output file which only lists the commands below.
if false; then
	for seed in $(seq 100); do
		datagen -g1000 -s$seed > file$seed
	done
	zstd --train -r . -o dict0 -qq
	for seed in $(seq 101 200); do
		datagen -g1000 -s$seed > file$seed
	done
	zstd --train -r . -o dict1 -qq
	[ "$($MD5SUM < dict0)" != "$($MD5SUM < dict1)" ] || die "dictionaries must not match"
	datagen -g1000 -s0 > file0
fi

set -v

zstd files/0 -D dicts/0
zstd -t files/0.zst -D dicts/0
zstd -t files/0.zst -D dicts/1 && die "Must fail" ||:
zstd -t files/0.zst && die "Must fail" ||:

View File

@ -0,0 +1,6 @@
zstd files/0 -D dicts/0
zstd -t files/0.zst -D dicts/0
zstd -t files/0.zst -D dicts/1 && die "Must fail" ||:
files/0.zst : Decoding error (36) : Dictionary mismatch
zstd -t files/0.zst && die "Must fail" ||:
files/0.zst : Decoding error (36) : Dictionary mismatch

View File

@ -0,0 +1,6 @@
#!/bin/sh
set -e
cp -r ../files .
cp -r ../dicts .

View File

@ -0,0 +1,24 @@
#!/bin/sh
set -e
. "$COMMON/platform.sh"
mkdir files/ dicts/
for seed in $(seq 50); do
datagen -g1000 -s$seed > files/$seed
done
zstd --train -r files -o dicts/0 -qq
for seed in $(seq 51 100); do
datagen -g1000 -s$seed > files/$seed
done
zstd --train -r files -o dicts/1 -qq
cmp dicts/0 dicts/1 && die "dictionaries must not match!"
datagen -g1000 > files/0

687
tests/cli-tests/run.py Executable file
View File

@ -0,0 +1,687 @@
#!/usr/bin/env python3
# ################################################################
# Copyright (c) Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under both the BSD-style license (found in the
# LICENSE file in the root directory of this source tree) and the GPLv2 (found
# in the COPYING file in the root directory of this source tree).
# You may select, at your option, one of the above-listed licenses.
# ##########################################################################
import argparse
import contextlib
import copy
import fnmatch
import os
import shutil
import subprocess
import sys
import tempfile
import typing
EXCLUDED_DIRS = {
"bin",
"common",
"scratch",
}
EXCLUDED_BASENAMES = {
"setup",
"setup_once",
"teardown",
"teardown_once",
"README.md",
"run.py",
".gitignore",
}
EXCLUDED_SUFFIXES = [
".exact",
".glob",
".ignore",
".exit",
]
def exclude_dir(dirname: str) -> bool:
    """
    Should files under the directory :dirname: be excluded from the test runner?
    """
    # Membership in the module-level exclusion set is the only criterion.
    return dirname in EXCLUDED_DIRS
def exclude_file(filename: str) -> bool:
    """Should the file :filename: be excluded from the test runner?"""
    # Exact-name exclusions (setup/teardown scripts, README, etc.).
    if filename in EXCLUDED_BASENAMES:
        return True
    # Suffix exclusions (.exact/.glob/.ignore/.exit expectation files).
    return any(filename.endswith(suffix) for suffix in EXCLUDED_SUFFIXES)
def read_file(filename: str) -> bytes:
"""Reads the file :filename: and returns the contents as bytes."""
with open(filename, "rb") as f:
return f.read()
def diff(a: bytes, b: bytes) -> str:
"""Returns a diff between two different byte-strings :a: and :b:."""
assert a != b
with tempfile.NamedTemporaryFile("wb") as fa:
fa.write(a)
fa.flush()
with tempfile.NamedTemporaryFile("wb") as fb:
fb.write(b)
fb.flush()
diff_bytes = subprocess.run(["diff", fa.name, fb.name], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL).stdout
return diff_bytes.decode("utf8")
def pop_line(data: bytes) -> typing.Tuple[typing.Optional[bytes], bytes]:
    """
    Pop the first line from :data: and return a tuple of the first line and
    the remainder. If :data: is empty, returns :(None, data):. Otherwise the
    returned line always ends in a newline, even when it is the final line
    of :data: and :data: itself lacks a trailing newline.
    """
    if not data:
        return (None, data)
    idx = data.find(b"\n")
    # Cut just past the newline, or take everything when no newline exists.
    cut = len(data) if idx < 0 else idx + 1
    head, tail = data[:cut], data[cut:]
    assert head
    # Normalize: guarantee the popped line is newline-terminated.
    if not head.endswith(b"\n"):
        head += b"\n"
    return (head, tail)
def glob_line_matches(actual: bytes, expect: bytes) -> bool:
    """
    Does the `actual` line match the expected glob line `expect`?
    Both lines are stripped of surrounding whitespace before the
    case-sensitive glob comparison.
    """
    pattern = expect.strip()
    candidate = actual.strip()
    return fnmatch.fnmatchcase(candidate, pattern)
def glob_diff(actual: bytes, expect: bytes) -> bytes:
"""
Returns None if the :actual: content matches the expected glob :expect:,
otherwise returns the diff bytes.
"""
diff = b''
actual_line, actual = pop_line(actual)
expect_line, expect = pop_line(expect)
while True:
# Handle end of file conditions - allow extra newlines
while expect_line is None and actual_line == b"\n":
actual_line, actual = pop_line(actual)
while actual_line is None and expect_line == b"\n":
expect_line, expect = pop_line(expect)
if expect_line is None and actual_line is None:
if diff == b'':
return None
return diff
elif expect_line is None:
diff += b"---\n"
while actual_line != None:
diff += b"> "
diff += actual_line
actual_line, actual = pop_line(actual)
return diff
elif actual_line is None:
diff += b"---\n"
while expect_line != None:
diff += b"< "
diff += expect_line
expect_line, expect = pop_line(expect)
return diff
assert expect_line is not None
assert actual_line is not None
if expect_line == b'...\n':
next_expect_line, expect = pop_line(expect)
if next_expect_line is None:
if diff == b'':
return None
return diff
while not glob_line_matches(actual_line, next_expect_line):
actual_line, actual = pop_line(actual)
if actual_line is None:
diff += b"---\n"
diff += b"< "
diff += next_expect_line
return diff
expect_line = next_expect_line
continue
if not glob_line_matches(actual_line, expect_line):
diff += b'---\n'
diff += b'< ' + expect_line
diff += b'> ' + actual_line
actual_line, actual = pop_line(actual)
expect_line, expect = pop_line(expect)
class Options:
    """Bundle of settings controlling how a :TestCase: is executed."""

    def __init__(
        self,
        env: typing.Dict[str, str],
        timeout: typing.Optional[int],
        verbose: bool,
        preserve: bool,
        scratch_dir: str,
        test_dir: str,
    ) -> None:
        # Directory containing the test definitions.
        self.test_dir = test_dir
        # Root under which per-test scratch directories are created.
        self.scratch_dir = scratch_dir
        # Keep scratch dirs and captured output after the run when True.
        self.preserve = preserve
        self.verbose = verbose
        # Per-test timeout; None disables it.
        self.timeout = timeout
        # Environment passed to the spawned test process.
        self.env = env
class TestCase:
    """
    Logic and state related to running a single test case.

    1. Initialize the test case.
    2. Launch the test case with :TestCase.launch():.
       This will start the test execution in a subprocess, but
       not wait for completion. So you could launch multiple test
       cases in parallel. This will now print any test output.
    3. Analyze the results with :TestCase.analyze():. This will
       join the test subprocess, check the results against the
       expectations, and print the results to stdout.

    :TestCase.run(): is also provided which combines the launch & analyze
    steps for single-threaded use-cases.

    All other methods, prefixed with _, are private helper functions.
    """

    def __init__(self, test_filename: str, options: Options) -> None:
        """
        Initialize the :TestCase: for the test located in :test_filename:
        with the given :options:.
        """
        self._opts = options
        self._test_file = test_filename
        # The unique test name is its path relative to the test directory.
        self._test_name = os.path.normpath(
            os.path.relpath(test_filename, start=self._opts.test_dir)
        )
        # Per-check success flags and human-readable messages, keyed by
        # check name (e.g. "check_exit", "check_stdout").
        self._success = {}
        self._message = {}
        self._test_stdin = None
        self._scratch_dir = os.path.abspath(os.path.join(self._opts.scratch_dir, self._test_name))

    @property
    def name(self) -> str:
        """Returns the unique name for the test."""
        return self._test_name

    def launch(self) -> None:
        """
        Launch the test case as a subprocess, but do not block on completion.
        This allows users to run multiple tests in parallel. Results aren't yet
        printed out.
        """
        self._launch_test()

    def analyze(self) -> bool:
        """
        Must be called after :TestCase.launch():. Joins the test subprocess and
        checks the results against expectations. Finally prints the results to
        stdout and returns the success.
        """
        self._join_test()
        self._check_exit()
        self._check_stderr()
        self._check_stdout()
        self._analyze_results()
        return self._succeeded

    def run(self) -> bool:
        """Shorthand for combining both :TestCase.launch(): and :TestCase.analyze():."""
        self.launch()
        return self.analyze()

    def _log(self, *args, **kwargs) -> None:
        """Logs test output."""
        print(file=sys.stdout, *args, **kwargs)

    def _vlog(self, *args, **kwargs) -> None:
        """Logs verbose test output."""
        if self._opts.verbose:
            print(file=sys.stdout, *args, **kwargs)

    def _test_environment(self) -> typing.Dict[str, str]:
        """
        Returns the environment to be used for the
        test subprocess.
        """
        # Copy the current environment and overlay the test-specific
        # variables (ZSTD_BIN, DATAGEN_BIN, ...).
        env = copy.copy(os.environ)
        for k, v in self._opts.env.items():
            self._vlog(f"${k}='{v}'")
            env[k] = v
        # Bug fix: this method previously fell off the end and returned
        # None, so Popen(env=None) silently inherited os.environ and the
        # configured overlay variables were never applied.
        return env

    def _launch_test(self) -> None:
        """Launch the test subprocess, but do not join it."""
        args = [os.path.abspath(self._test_file)]
        stdin_name = f"{self._test_file}.stdin"
        # If a .stdin file exists next to the test, feed it to the
        # subprocess; otherwise connect stdin to the null device.
        if os.path.exists(stdin_name):
            self._test_stdin = open(stdin_name, "rb")
            stdin = self._test_stdin
        else:
            stdin = subprocess.DEVNULL
        cwd = self._scratch_dir
        env = self._test_environment()
        self._test_process = subprocess.Popen(
            args=args,
            stdin=stdin,
            cwd=cwd,
            env=env,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE
        )

    def _join_test(self) -> None:
        """Join the test process and save stderr, stdout, and the exit code."""
        (stdout, stderr) = self._test_process.communicate(timeout=self._opts.timeout)
        self._output = {}
        self._output["stdout"] = stdout
        self._output["stderr"] = stderr
        self._exit_code = self._test_process.returncode
        self._test_process = None
        # Close the .stdin file (if any) now that the process has exited.
        if self._test_stdin is not None:
            self._test_stdin.close()
            self._test_stdin = None

    def _check_output_exact(self, out_name: str, expected: bytes) -> None:
        """
        Check the output named :out_name: for an exact match against the :expected: content.
        Saves the success and message.
        """
        check_name = f"check_{out_name}"
        actual = self._output[out_name]
        if actual == expected:
            self._success[check_name] = True
            self._message[check_name] = f"{out_name} matches!"
        else:
            self._success[check_name] = False
            self._message[check_name] = f"{out_name} does not match!\n> diff expected actual\n{diff(expected, actual)}"

    def _check_output_glob(self, out_name: str, expected: bytes) -> None:
        """
        Check the output named :out_name: for a glob match against the :expected: glob.
        Saves the success and message.
        """
        check_name = f"check_{out_name}"
        actual = self._output[out_name]
        # Renamed from `diff` to avoid shadowing the module-level diff()
        # helper used by _check_output_exact().
        glob_diff_bytes = glob_diff(actual, expected)
        if glob_diff_bytes is None:
            self._success[check_name] = True
            self._message[check_name] = f"{out_name} matches!"
        else:
            utf8_diff = glob_diff_bytes.decode('utf8')
            self._success[check_name] = False
            self._message[check_name] = f"{out_name} does not match!\n> diff expected actual\n{utf8_diff}"

    def _check_output(self, out_name: str) -> None:
        """
        Checks the output named :out_name: for a match against the expectation.
        We check for a .exact, .glob, and a .ignore file. If none are found we
        expect that the output should be empty.

        If :Options.preserve: was set then we save the scratch directory and
        save the stderr, stdout, and exit code to the scratch directory for
        debugging.
        """
        if self._opts.preserve:
            # Save the output to the scratch directory
            actual_name = os.path.join(self._scratch_dir, f"{out_name}")
            with open(actual_name, "wb") as f:
                f.write(self._output[out_name])

        exact_name = f"{self._test_file}.{out_name}.exact"
        glob_name = f"{self._test_file}.{out_name}.glob"
        ignore_name = f"{self._test_file}.{out_name}.ignore"

        # Precedence: .exact beats .glob beats .ignore; no file means the
        # output must be empty.
        if os.path.exists(exact_name):
            return self._check_output_exact(out_name, read_file(exact_name))
        elif os.path.exists(glob_name):
            return self._check_output_glob(out_name, read_file(glob_name))
        elif os.path.exists(ignore_name):
            check_name = f"check_{out_name}"
            self._success[check_name] = True
            self._message[check_name] = f"{out_name} ignored!"
        else:
            return self._check_output_exact(out_name, bytes())

    def _check_stderr(self) -> None:
        """Checks the stderr output against the expectation."""
        self._check_output("stderr")

    def _check_stdout(self) -> None:
        """Checks the stdout output against the expectation."""
        self._check_output("stdout")

    def _check_exit(self) -> None:
        """
        Checks the exit code against expectations. If a .exit file
        exists, we expect that the exit code matches the contents.
        Otherwise we expect the exit code to be zero.

        If :Options.preserve: is set we save the exit code to the
        scratch directory under the filename "exit".
        """
        if self._opts.preserve:
            exit_name = os.path.join(self._scratch_dir, "exit")
            with open(exit_name, "w") as f:
                f.write(str(self._exit_code) + "\n")
        exit_name = f"{self._test_file}.exit"
        if os.path.exists(exit_name):
            exit_code: int = int(read_file(exit_name))
        else:
            exit_code: int = 0
        if exit_code == self._exit_code:
            self._success["check_exit"] = True
            self._message["check_exit"] = "Exit code matches!"
        else:
            self._success["check_exit"] = False
            self._message["check_exit"] = f"Exit code mismatch! Expected {exit_code} but got {self._exit_code}"

    def _analyze_results(self) -> None:
        """
        After all tests have been checked, collect all the successes
        and messages, and print the results to stdout.
        """
        STATUS = {True: "PASS", False: "FAIL"}
        checks = sorted(self._success.keys())
        self._succeeded = all(self._success.values())
        self._log(f"{STATUS[self._succeeded]}: {self._test_name}")

        # Details are printed only for failures, or for everything when
        # verbose mode is on.
        if not self._succeeded or self._opts.verbose:
            for check in checks:
                if self._opts.verbose or not self._success[check]:
                    self._log(f"{STATUS[self._success[check]]}: {self._test_name}.{check}")
                    self._log(self._message[check])

            self._log("----------------------------------------")
class TestSuite:
    """
    Setup & teardown test suite & cases.
    This class is intended to be used as a context manager.

    TODO: Make setup/teardown failure emit messages, not throw exceptions.
    """
    def __init__(self, test_directory: str, options: Options) -> None:
        self._opts = options
        self._test_dir = os.path.abspath(test_directory)
        rel_test_dir = os.path.relpath(test_directory, start=self._opts.test_dir)
        # The suite directory must live inside the configured test directory.
        assert not rel_test_dir.startswith(os.path.sep)
        # Mirror the suite's relative location under the scratch directory.
        self._scratch_dir = os.path.normpath(os.path.join(self._opts.scratch_dir, rel_test_dir))

    def __enter__(self) -> 'TestSuite':
        self._setup_once()
        return self

    def __exit__(self, _exc_type, _exc_value, _traceback) -> None:
        self._teardown_once()

    @contextlib.contextmanager
    def test_case(self, test_basename: str) -> typing.Iterator[TestCase]:
        """
        Context manager for a test case in the test suite.
        Pass the basename of the test relative to the :test_directory:.
        """
        # Only bare filenames are accepted; paths belong to other suites.
        assert os.path.dirname(test_basename) == ""
        try:
            self._setup(test_basename)
            test_filename = os.path.join(self._test_dir, test_basename)
            yield TestCase(test_filename, self._opts)
        finally:
            # Teardown always runs, even if the test body raised.
            self._teardown(test_basename)

    def _remove_scratch_dir(self, dir: str) -> None:
        """Helper to remove a scratch directory with sanity checks"""
        # Guard against ever deleting anything outside the scratch tree.
        assert "scratch" in dir
        assert dir.startswith(self._scratch_dir)
        assert os.path.exists(dir)
        shutil.rmtree(dir)

    def _setup_once(self) -> None:
        # Start from a clean suite-level scratch directory, then run the
        # optional "setup_once" script if the suite provides one.
        if os.path.exists(self._scratch_dir):
            self._remove_scratch_dir(self._scratch_dir)
        os.makedirs(self._scratch_dir)
        setup_script = os.path.join(self._test_dir, "setup_once")
        if os.path.exists(setup_script):
            self._run_script(setup_script, cwd=self._scratch_dir)

    def _teardown_once(self) -> None:
        assert os.path.exists(self._scratch_dir)
        teardown_script = os.path.join(self._test_dir, "teardown_once")
        if os.path.exists(teardown_script):
            self._run_script(teardown_script, cwd=self._scratch_dir)
        # Keep the scratch directory for debugging when --preserve is set.
        if not self._opts.preserve:
            self._remove_scratch_dir(self._scratch_dir)

    def _setup(self, test_basename: str) -> None:
        # Create the per-test scratch directory and run the optional
        # per-test "setup" script inside it.
        test_scratch_dir = os.path.join(self._scratch_dir, test_basename)
        assert not os.path.exists(test_scratch_dir)
        os.makedirs(test_scratch_dir)
        setup_script = os.path.join(self._test_dir, "setup")
        if os.path.exists(setup_script):
            self._run_script(setup_script, cwd=test_scratch_dir)

    def _teardown(self, test_basename: str) -> None:
        test_scratch_dir = os.path.join(self._scratch_dir, test_basename)
        assert os.path.exists(test_scratch_dir)
        teardown_script = os.path.join(self._test_dir, "teardown")
        if os.path.exists(teardown_script):
            self._run_script(teardown_script, cwd=test_scratch_dir)
        if not self._opts.preserve:
            self._remove_scratch_dir(test_scratch_dir)

    def _run_script(self, script: str, cwd: str) -> None:
        # Run a setup/teardown script with the test environment variables
        # overlaid; on failure, surface its output before re-raising.
        env = copy.copy(os.environ)
        for k, v in self._opts.env.items():
            env[k] = v
        try:
            subprocess.run(
                args=[script],
                stdin=subprocess.DEVNULL,
                capture_output=True,
                cwd=cwd,
                env=env,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            print(f"{script} failed with exit code {e.returncode}!")
            print(f"stderr:\n{e.stderr}")
            print(f"stdout:\n{e.stdout}")
            raise
# Maps a suite directory to the list of test-case basenames it contains.
TestSuites = typing.Dict[str, typing.List[str]]

def get_all_tests(options: Options) -> TestSuites:
    """
    Walk the test directory and return every discovered test suite,
    skipping excluded directories and files.
    """
    suites = {}
    for root, dirs, files in os.walk(options.test_dir, topdown=True):
        # Prune excluded directories in place so os.walk never descends
        # into them (requires topdown=True).
        dirs[:] = [d for d in dirs if not exclude_dir(d)]
        cases = [f for f in files if not exclude_file(f)]
        assert root == os.path.normpath(root)
        suites[root] = cases
    return suites
def resolve_listed_tests(
    tests: typing.List[str], options: Options
) -> TestSuites:
    """
    Resolve tests named on the command line into their test suites.

    Each entry may be a filesystem path or a name relative to the test
    directory; an entry that resolves to a missing path raises RuntimeError.
    """
    suites: TestSuites = {}
    for test in tests:
        # Fall back to interpreting the entry relative to the test dir.
        if not os.path.exists(test):
            test = os.path.join(options.test_dir, test)
        if not os.path.exists(test):
            raise RuntimeError(f"Test {test} does not exist!")
        test = os.path.normpath(os.path.abspath(test))
        assert test.startswith(options.test_dir)
        suite_dir, case_name = os.path.split(test)
        suites.setdefault(suite_dir, []).append(case_name)
    return suites
def run_tests(test_suites: TestSuites, options: Options) -> bool:
    """
    Execute every test in :test_suites: under :options:, print results
    to stdout, and return True iff all tests passed.
    """
    results = {}
    for suite_dir, suite_files in test_suites.items():
        with TestSuite(suite_dir, options) as suite:
            # Deduplicate and run in a stable, sorted order.
            for filename in sorted(set(suite_files)):
                with suite.test_case(filename) as case:
                    results[case.name] = case.run()

    failures = [name for name, passed in results.items() if not passed]
    for name in failures:
        print(f"FAIL: {name}")
    if not failures:
        print(f"PASSED all {len(results)} tests!")
        return True
    print(f"FAILED {len(failures)} / {len(results)} tests!")
    return False
if __name__ == "__main__":
    # Well-known repository locations, derived from this script's path.
    CLI_TEST_DIR = os.path.dirname(sys.argv[0])
    REPO_DIR = os.path.join(CLI_TEST_DIR, "..", "..")
    PROGRAMS_DIR = os.path.join(REPO_DIR, "programs")
    TESTS_DIR = os.path.join(REPO_DIR, "tests")
    ZSTD_PATH = os.path.join(PROGRAMS_DIR, "zstd")
    ZSTDGREP_PATH = os.path.join(PROGRAMS_DIR, "zstdgrep")
    DATAGEN_PATH = os.path.join(TESTS_DIR, "datagen")

    parser = argparse.ArgumentParser(
        (
            "Runs the zstd CLI tests. Exits nonzero on failure. Default arguments are\n"
            "generally correct. Pass --preserve to preserve test output for debugging,\n"
            "and --verbose to get verbose test output.\n"
        )
    )
    parser.add_argument(
        "--preserve",
        action="store_true",
        help="Preserve the scratch directory TEST_DIR/scratch/ for debugging purposes."
    )
    parser.add_argument("--verbose", action="store_true", help="Verbose test output.")
    parser.add_argument("--timeout", default=60, type=int, help="Test case timeout in seconds. Set to 0 to disable timeouts.")
    parser.add_argument(
        "--exec-prefix",
        default=None,
        help="Sets the EXEC_PREFIX environment variable. Prefix to invocations of the zstd CLI."
    )
    parser.add_argument(
        "--zstd",
        default=ZSTD_PATH,
        help="Sets the ZSTD_BIN environment variable. Path of the zstd CLI."
    )
    parser.add_argument(
        "--zstdgrep",
        default=ZSTDGREP_PATH,
        help="Sets the ZSTDGREP_BIN environment variable. Path of the zstdgrep CLI."
    )
    parser.add_argument(
        "--datagen",
        default=DATAGEN_PATH,
        help="Sets the DATAGEN_BIN environment variable. Path to the datagen CLI."
    )
    parser.add_argument(
        "--test-dir",
        default=CLI_TEST_DIR,
        help=(
            "Runs the tests under this directory. "
            "Adds TEST_DIR/bin/ to path. "
            "Scratch directory located in TEST_DIR/scratch/."
        )
    )
    parser.add_argument(
        "tests",
        nargs="*",
        help="Run only these test cases. Can either be paths or test names relative to TEST_DIR/"
    )
    args = parser.parse_args()

    # A non-positive timeout means "no timeout" for Popen.communicate().
    if args.timeout <= 0:
        args.timeout = None

    args.test_dir = os.path.normpath(os.path.abspath(args.test_dir))
    bin_dir = os.path.join(args.test_dir, "bin")
    scratch_dir = os.path.join(args.test_dir, "scratch")

    # Environment overlay handed down to every test subprocess.
    env = {}
    if args.exec_prefix is not None:
        env["EXEC_PREFIX"] = args.exec_prefix
    env["ZSTD_BIN"] = os.path.abspath(args.zstd)
    env["DATAGEN_BIN"] = os.path.abspath(args.datagen)
    env["ZSTDGREP_BIN"] = os.path.abspath(args.zstdgrep)
    env["COMMON"] = os.path.abspath(os.path.join(args.test_dir, "common"))
    # Prepend TEST_DIR/bin so tests pick up the helper wrappers first.
    env["PATH"] = os.path.abspath(bin_dir) + ":" + os.getenv("PATH", "")

    opts = Options(
        env=env,
        timeout=args.timeout,
        verbose=args.verbose,
        preserve=args.preserve,
        test_dir=args.test_dir,
        scratch_dir=scratch_dir,
    )

    # With no explicit tests listed, discover and run everything.
    if len(args.tests) == 0:
        tests = get_all_tests(opts)
    else:
        tests = resolve_listed_tests(args.tests, opts)

    success = run_tests(tests, opts)
    if success:
        sys.exit(0)
    else:
        sys.exit(1)

View File

@ -124,7 +124,7 @@ int main(int argc, const char** argv)
DISPLAYLEVEL(3, "Seed = %u \n", (unsigned)seed);
RDG_genStdout(size, (double)probaU32/100, litProba, seed);
DISPLAYLEVEL(1, "\n");
DISPLAYLEVEL(3, "\n");
return 0;
}

View File

@ -260,10 +260,13 @@ zstd -dc - < tmp.zst > $INTOVOID
zstd -d < tmp.zst > $INTOVOID # implicit stdout when stdin is used
zstd -d - < tmp.zst > $INTOVOID
println "test : impose memory limitation (must fail)"
zstd -d -f tmp.zst -M2K -c > $INTOVOID && die "decompression needs more memory than allowed"
zstd -d -f tmp.zst --memlimit=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
zstd -d -f tmp.zst --memory=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
zstd -d -f tmp.zst --memlimit-decompress=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
datagen -g500K > tmplimit
zstd -f tmplimit
zstd -d -f tmplimit.zst -M2K -c > $INTOVOID && die "decompression needs more memory than allowed"
zstd -d -f tmplimit.zst --memlimit=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
zstd -d -f tmplimit.zst --memory=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
zstd -d -f tmplimit.zst --memlimit-decompress=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
rm -f tmplimit tmplimit.zst
println "test : overwrite protection"
zstd -q tmp && die "overwrite check failed!"
println "test : force overwrite"
@ -1596,11 +1599,11 @@ elif [ "$longCSize19wlog23" -gt "$optCSize19wlog23" ]; then
exit 1
fi
println "\n===> zstd asyncio decompression tests "
println "\n===> zstd asyncio tests "
addFrame() {
datagen -g2M -s$2 >> tmp_uncompressed
datagen -g2M -s$2 | zstd --format=$1 >> tmp_compressed.zst
datagen -g2M -s$2 | zstd -1 --format=$1 >> tmp_compressed.zst
}
addTwoFrames() {