error out when --adapt is associated with --single-thread

since they are not compatible
dev
Yann Collet 2018-09-19 14:49:13 -07:00
parent 2f78228f65
commit 89bc309d90
2 changed files with 39 additions and 25 deletions

View File

@ -736,13 +736,14 @@ ZSTDLIB_API size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, const
/*! ZSTD_resetCStream() :
* start a new compression job, using same parameters from previous job.
* This is typically useful to skip dictionary loading stage, since it will re-use it in-place..
* This is typically useful to skip dictionary loading stage, since it will re-use it in-place.
* Note that zcs must be init at least once before using ZSTD_resetCStream().
* If pledgedSrcSize is not known at reset time, use macro ZSTD_CONTENTSIZE_UNKNOWN.
* If pledgedSrcSize > 0, its value must be correct, as it will be written in header, and controlled at the end.
* For the time being, pledgedSrcSize==0 is interpreted as "srcSize unknown" for compatibility with older programs,
* but it will change to mean "empty" in future version, so use macro ZSTD_CONTENTSIZE_UNKNOWN instead.
* @return : 0, or an error code (which can be tested using ZSTD_isError()) */
* @return : 0, or an error code (which can be tested using ZSTD_isError())
*/
ZSTDLIB_API size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize);
@ -755,21 +756,27 @@ typedef struct {
unsigned nbActiveWorkers; /* MT only : nb of workers actively compressing at probe time */
} ZSTD_frameProgression;
/* ZSTD_getFrameProgression():
/* ZSTD_getFrameProgression() :
* tells how much data has been ingested (read from input)
* consumed (input actually compressed) and produced (output) for current frame.
* Therefore, (ingested - consumed) is amount of input data buffered internally, not yet compressed.
* Can report progression inside worker threads (multi-threading and non-blocking mode).
* Note : (ingested - consumed) is amount of input data buffered internally, not yet compressed.
* Aggregates progression inside active worker threads.
*/
ZSTDLIB_API ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx);
/*! ZSTD_toFlushNow()
/*! ZSTD_toFlushNow() :
* Tell how many bytes are ready to be flushed immediately.
* Useful for multithreading scenarios (nbWorkers >= 1).
* Probe the oldest active job (not yet entirely flushed) and check its output buffer.
* If return 0, it means there is no active job, or
* it means oldest job is still active, but everything produced has been flushed so far,
* therefore flushing is limited by speed of oldest job. */
* Probe the oldest active job, defined as oldest job not yet entirely flushed,
* and check its output buffer.
* @return : amount of data stored in oldest job and ready to be flushed immediately.
* if @return == 0, it means either :
* + there is no active job (could be checked with ZSTD_frameProgression()), or
* + oldest job is still actively compressing data,
* but everything it has produced has also been flushed so far,
* therefore flushing speed is currently limited by production speed of oldest job
* irrespective of the speed of concurrent newer jobs.
*/
ZSTDLIB_API size_t ZSTD_toFlushNow(ZSTD_CCtx* cctx);

View File

@ -283,7 +283,11 @@ void FIO_setOverlapLog(unsigned overlapLog){
g_overlapLog = overlapLog;
}
static U32 g_adaptiveMode = 0;
void FIO_setAdaptiveMode(unsigned adapt) { g_adaptiveMode = adapt; }
void FIO_setAdaptiveMode(unsigned adapt) {
if ((adapt>0) && (g_nbWorkers==0))
EXM_THROW(1, "Adaptive mode is not compatible with single thread mode \n");
g_adaptiveMode = adapt;
}
static U32 g_ldmFlag = 0;
void FIO_setLdmFlag(unsigned ldmFlag) {
g_ldmFlag = (ldmFlag>0);
@ -541,7 +545,8 @@ static void FIO_freeCResources(cRess_t ress)
#ifdef ZSTD_GZCOMPRESS
static unsigned long long FIO_compressGzFrame(cRess_t* ress,
static unsigned long long
FIO_compressGzFrame(cRess_t* ress,
const char* srcFileName, U64 const srcFileSize,
int compressionLevel, U64* readsize)
{
@ -623,9 +628,10 @@ static unsigned long long FIO_compressGzFrame(cRess_t* ress,
#ifdef ZSTD_LZMACOMPRESS
static unsigned long long FIO_compressLzmaFrame(cRess_t* ress,
const char* srcFileName, U64 const srcFileSize,
int compressionLevel, U64* readsize, int plain_lzma)
static unsigned long long
FIO_compressLzmaFrame(cRess_t* ress,
const char* srcFileName, U64 const srcFileSize,
int compressionLevel, U64* readsize, int plain_lzma)
{
unsigned long long inFileSize = 0, outFileSize = 0;
lzma_stream strm = LZMA_STREAM_INIT;
@ -698,9 +704,10 @@ static unsigned long long FIO_compressLzmaFrame(cRess_t* ress,
#define LZ4F_max64KB max64KB
#endif
static int FIO_LZ4_GetBlockSize_FromBlockId (int id) { return (1 << (8 + (2 * id))); }
static unsigned long long FIO_compressLz4Frame(cRess_t* ress,
const char* srcFileName, U64 const srcFileSize,
int compressionLevel, U64* readsize)
static unsigned long long
FIO_compressLz4Frame(cRess_t* ress,
const char* srcFileName, U64 const srcFileSize,
int compressionLevel, U64* readsize)
{
const size_t blockSize = FIO_LZ4_GetBlockSize_FromBlockId(LZ4F_max64KB);
unsigned long long inFileSize = 0, outFileSize = 0;
@ -838,7 +845,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
/* count stats */
inputPresented++;
if (oldIPos == inBuff.pos) inputBlocked++;
if (oldIPos == inBuff.pos) inputBlocked++; /* input buffer is full and can't take any more : input speed is faster than consumption rate */
if (!toFlushNow) flushWaiting = 1;
/* Write compressed stream */
@ -846,7 +853,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
(U32)directive, (U32)inBuff.pos, (U32)inBuff.size, (U32)outBuff.pos);
if (outBuff.pos) {
size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
if (sizeCheck!=outBuff.pos)
if (sizeCheck != outBuff.pos)
EXM_THROW(25, "Write error : cannot write compressed block");
compressedfilesize += outBuff.pos;
}
@ -857,24 +864,24 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
/* check output speed */
if (zfp.currentJobID > 1) {
static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 };
if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */
static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 }; /* note : requires fileio to run main thread */
unsigned long long newlyProduced = zfp.produced - cpszfp.produced;
unsigned long long newlyFlushed = zfp.flushed - cpszfp.flushed;
assert(zfp.produced >= cpszfp.produced);
cpszfp = zfp;
assert(g_nbWorkers >= 1);
if ( (zfp.ingested == cpszfp.ingested) /* no data read : input buffer full */
&& (zfp.consumed == cpszfp.consumed) /* no data compressed : no more buffer to compress OR compression is really slow */
&& (zfp.nbActiveWorkers == 0) /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */
&& (zfp.currentJobID > 0) /* first job started : only remaining reason is no more available buffer to start compression */
) {
DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
speedChange = slower;
}
cpszfp = zfp;
if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
&& (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */
) {