diff --git a/lib/common/error_private.c b/lib/common/error_private.c index 6bc86da7..2d752cd2 100644 --- a/lib/common/error_private.c +++ b/lib/common/error_private.c @@ -24,7 +24,8 @@ const char* ERR_getErrorString(ERR_enum code) case PREFIX(frameParameter_unsupported): return "Unsupported frame parameter"; case PREFIX(frameParameter_unsupportedBy32bits): return "Frame parameter unsupported in 32-bits mode"; case PREFIX(frameParameter_windowTooLarge): return "Frame requires too much memory for decoding"; - case PREFIX(compressionParameter_unsupported): return "Compression parameter is out of bound"; + case PREFIX(compressionParameter_unsupported): return "Compression parameter is not supported"; + case PREFIX(compressionParameter_outOfBound): return "Compression parameter is out of bound"; case PREFIX(init_missing): return "Context should be init first"; case PREFIX(memory_allocation): return "Allocation error : not enough memory"; case PREFIX(stage_wrong): return "Operation not authorized at current processing stage"; diff --git a/lib/common/zstd_errors.h b/lib/common/zstd_errors.h index e067acf6..19f1597a 100644 --- a/lib/common/zstd_errors.h +++ b/lib/common/zstd_errors.h @@ -35,8 +35,11 @@ extern "C" { #endif /*-**************************************** -* error codes list -******************************************/ + * error codes list + * note : this API is still considered unstable + * it should not be used with a dynamic library + * only static linking is allowed + ******************************************/ typedef enum { ZSTD_error_no_error, ZSTD_error_GENERIC, @@ -47,6 +50,7 @@ typedef enum { ZSTD_error_frameParameter_unsupportedBy32bits, ZSTD_error_frameParameter_windowTooLarge, ZSTD_error_compressionParameter_unsupported, + ZSTD_error_compressionParameter_outOfBound, ZSTD_error_init_missing, ZSTD_error_memory_allocation, ZSTD_error_stage_wrong, @@ -67,7 +71,7 @@ typedef enum { /*! ZSTD_getErrorCode() : convert a `size_t` function result into a `ZSTD_ErrorCode` enum type, - which can be used to compare directly with enum list published into "error_public.h" */ + which can be used to compare with enum list published above */ ZSTDERRORLIB_API ZSTD_ErrorCode ZSTD_getErrorCode(size_t functionResult); ZSTDERRORLIB_API const char* ZSTD_getErrorString(ZSTD_ErrorCode code); diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 1be0e97b..dfaa1854 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -241,7 +241,7 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v { # define CLAMPCHECK(val,min,max) { \ if ((valmax)) { \ - return ERROR(compressionParameter_unsupported); \ + return ERROR(compressionParameter_outOfBound); \ } } if (cctx->streamStage != zcss_init) return ERROR(stage_wrong); @@ -342,19 +342,20 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v if (cctx->staticSize) /* MT not compatible with static alloc */ return ERROR(compressionParameter_unsupported); ZSTDMT_freeCCtx(cctx->mtctx); - cctx->nbThreads = value; + cctx->nbThreads = 1; cctx->mtctx = ZSTDMT_createCCtx(value); if (cctx->mtctx == NULL) return ERROR(memory_allocation); - } - cctx->nbThreads = 1; + cctx->nbThreads = value; + } else + cctx->nbThreads = 1; return 0; - case ZSTDMT_p_jobSize: + case ZSTD_p_jobSize: if (cctx->nbThreads <= 1) return ERROR(compressionParameter_unsupported); assert(cctx->mtctx != NULL); return ZSTDMT_setMTCtxParameter(cctx->mtctx, ZSTDMT_p_sectionSize, value); - case ZSTDMT_p_overlapSizeLog: + case ZSTD_p_overlapSizeLog: if (cctx->nbThreads <= 1) return ERROR(compressionParameter_unsupported); assert(cctx->mtctx != NULL); return ZSTDMT_setMTCtxParameter(cctx->mtctx, ZSTDMT_p_overlapSectionLog, value); @@ -3648,8 +3649,8 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, && (zcs->inBuffPos == zcs->inToCompress) ) { /* empty */ someMoreWork = 0; break; - } - } + } + } /* compress current block (note : this stage cannot be stopped in the middle) */ DEBUGLOG(5, "stream compression stage (flushMode==%u)", flushMode); { void* cDst; @@ -3658,7 +3659,7 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, size_t oSize = oend-op; unsigned const lastBlock = (flushMode == ZSTD_e_end) && (ip==iend); if (oSize >= ZSTD_compressBound(iSize)) - cDst = op; /* compress directly into output buffer (skip flush stage) */ + cDst = op; /* compress into output buffer, to skip flush stage */ else cDst = zcs->outBuff, oSize = zcs->outBuffSize; cSize = lastBlock ? @@ -3667,7 +3668,6 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, ZSTD_compressContinue(zcs, cDst, oSize, zcs->inBuff + zcs->inToCompress, iSize); if (ZSTD_isError(cSize)) return cSize; - DEBUGLOG(5, "cSize = %u (lastBlock:%u)", (U32)cSize, lastBlock); zcs->frameEnded = lastBlock; /* prepare next block */ zcs->inBuffTarget = zcs->inBuffPos + zcs->blockSize; @@ -3681,7 +3681,7 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, if (cDst == op) { /* no need to flush */ op += cSize; if (zcs->frameEnded) { - DEBUGLOG(5, "Frame directly completed"); + DEBUGLOG(5, "Frame completed directly in outBuffer"); someMoreWork = 0; zcs->streamStage = zcss_init; } @@ -3707,7 +3707,7 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, } zcs->outBuffContentSize = zcs->outBuffFlushedSize = 0; if (zcs->frameEnded) { - DEBUGLOG(5, "Frame completed"); + DEBUGLOG(5, "Frame completed on flush"); someMoreWork = 0; zcs->streamStage = zcss_init; break; @@ -3781,15 +3781,19 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, #ifdef ZSTD_MULTITHREAD if (cctx->nbThreads > 1) { + DEBUGLOG(5, "calling ZSTDMT_compressStream_generic(%i,...)", endOp); size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); + DEBUGLOG(5, "ZSTDMT result : %u", (U32)flushMin); if (ZSTD_isError(flushMin)) cctx->streamStage = zcss_init; + if (endOp == ZSTD_e_end && flushMin==0) + cctx->streamStage = zcss_init; /* compression completed */ return flushMin; } #endif - DEBUGLOG(5, "starting ZSTD_compressStream_generic"); + DEBUGLOG(5, "calling ZSTD_compressStream_generic(%i,...)", endOp); CHECK_F( ZSTD_compressStream_generic(cctx, output, input, endOp) ); - DEBUGLOG(5, "completing ZSTD_compress_generic"); + DEBUGLOG(5, "completed ZSTD_compress_generic"); return cctx->outBuffContentSize - cctx->outBuffFlushedSize; /* remaining to flush */ } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 09841282..a4b4f516 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -704,7 +704,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi zcs->inBuff.filled); DEBUGLOG(5, "new inBuff pre-filled"); zcs->dictSize = newDictSize; - } else { + } else { /* if (endFrame==1) */ zcs->inBuff.buffer = g_nullBuffer; zcs->inBuff.filled = 0; zcs->dictSize = 0; @@ -768,7 +768,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi zcs->jobs[wJobID].jobScanned = 1; } { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); - DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); + DEBUGLOG(5, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); output->pos += toWrite; job.dstFlushed += toWrite; @@ -808,11 +808,11 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu if ( (zcs->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */ - CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0) ); + CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0 /* blockToFlush */) ); } /* check for data to flush */ - CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)) ); /* block if it wasn't possible to create new job due to saturation */ + CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize) /* blockToFlush */) ); /* block if it wasn't possible to create new job due to saturation */ /* recommended next input size : fill current input buffer */ return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ @@ -823,16 +823,20 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp { size_t const srcSize = zcs->inBuff.filled - zcs->dictSize; - if (srcSize) DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize); + if (srcSize) + DEBUGLOG(5, "flushing : %u bytes left to compress", (U32)srcSize); if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded)) && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { + DEBUGLOG(5, "create new job with %u bytes to compress", (U32)srcSize); + DEBUGLOG(5, "end order : %u", endFrame); CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) ); + DEBUGLOG(5, "resulting zcs->frameEnded : %u", zcs->frameEnded); } /* check if there is any data available to flush */ DEBUGLOG(5, "zcs->doneJobID : %u ; zcs->nextJobID : %u", - zcs->doneJobID, zcs->nextJobID); - return ZSTDMT_flushNextJob(zcs, output, 1); + zcs->doneJobID, zcs->nextJobID); + return ZSTDMT_flushNextJob(zcs, output, 1 /*blockToFlush */); } @@ -840,14 +844,14 @@ size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) { if (zcs->nbThreads==1) return ZSTD_flushStream(zcs->cctxPool->cctx[0], output); - return ZSTDMT_flushStream_internal(zcs, output, 0); + return ZSTDMT_flushStream_internal(zcs, output, 0 /* endFrame */); } size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) { if (zcs->nbThreads==1) return ZSTD_endStream(zcs->cctxPool->cctx[0], output); - return ZSTDMT_flushStream_internal(zcs, output, 1); + return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */); } size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, @@ -855,7 +859,8 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, ZSTD_inBuffer* input, ZSTD_EndDirective endOp) { - CHECK_F(ZSTDMT_compressStream(mtctx, output, input)); + if (input->pos < input->size) /* exclude final flushes */ + CHECK_F(ZSTDMT_compressStream(mtctx, output, input)); switch(endOp) { case ZSTD_e_flush: diff --git a/lib/zstd.h b/lib/zstd.h index a1954381..1e73b527 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -702,11 +702,11 @@ typedef enum { * More threads improve speed, but also increase memory usage. * Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled. * Special: value 0 means "do not change nbThreads" */ - ZSTDMT_p_jobSize, /* Size of a compression job. Each compression job is completed in parallel. + ZSTD_p_jobSize, /* Size of a compression job. Each compression job is completed in parallel. * 0 means default, which is dynamically determined based on compression parameters. * Job size must be a minimum of overlapSize, or 1 KB, whichever is largest * The minimum size is automatically and transparently enforced */ - ZSTDMT_p_overlapSizeLog, /* Size of previous input reloaded at the beginning of each job. + ZSTD_p_overlapSizeLog, /* Size of previous input reloaded at the beginning of each job. * 0 => no overlap, 6(default) => use 1/8th of windowSize, >=9 => use full windowSize */ /* advanced parameters - may not remain available after API update */