zstdmt : can compress at block granularity
offering perspective of more accurate progression report.
This commit is contained in:
parent
863b2f8db4
commit
58ecf13e02
@ -22,6 +22,7 @@
|
||||
|
||||
/* ====== Dependencies ====== */
|
||||
#include <string.h> /* memcpy, memset */
|
||||
#include <limits.h> /* INT_MAX */
|
||||
#include "pool.h" /* threadpool */
|
||||
#include "threading.h" /* mutex */
|
||||
#include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
|
||||
@ -129,7 +130,7 @@ static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
|
||||
static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
|
||||
{
|
||||
size_t const poolSize = sizeof(*bufPool)
|
||||
+ (bufPool->totalBuffers - 1) * sizeof(buffer_t);
|
||||
+ (bufPool->totalBuffers - 1) * sizeof(buffer_t);
|
||||
unsigned u;
|
||||
size_t totalBufferSize = 0;
|
||||
ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
|
||||
@ -201,20 +202,6 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
|
||||
ZSTD_free(buf.start, bufPool->cMem);
|
||||
}
|
||||
|
||||
/* Sets parameters relevant to the compression job, initializing others to
|
||||
* default values. Notably, nbThreads should probably be zero. */
|
||||
static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params)
|
||||
{
|
||||
ZSTD_CCtx_params jobParams;
|
||||
memset(&jobParams, 0, sizeof(jobParams));
|
||||
|
||||
jobParams.cParams = params.cParams;
|
||||
jobParams.fParams = params.fParams;
|
||||
jobParams.compressionLevel = params.compressionLevel;
|
||||
|
||||
jobParams.ldmParams = params.ldmParams;
|
||||
return jobParams;
|
||||
}
|
||||
|
||||
/* ===== CCtx Pool ===== */
|
||||
/* a single CCtx Pool can be invoked from multiple threads in parallel */
|
||||
@ -305,13 +292,16 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
|
||||
}
|
||||
|
||||
|
||||
/* ===== Thread worker ===== */
|
||||
/* ------------------------------------------ */
|
||||
/* ===== Thread worker ===== */
|
||||
/* ------------------------------------------ */
|
||||
|
||||
typedef struct {
|
||||
buffer_t src;
|
||||
const void* srcStart;
|
||||
size_t prefixSize;
|
||||
size_t srcSize;
|
||||
size_t readSize;
|
||||
buffer_t dstBuff;
|
||||
size_t cSize;
|
||||
size_t dstFlushed;
|
||||
@ -328,21 +318,19 @@ typedef struct {
|
||||
unsigned long long fullFrameSize;
|
||||
} ZSTDMT_jobDescription;
|
||||
|
||||
/* ZSTDMT_compressChunk() : POOL_function type */
|
||||
/* ZSTDMT_compressChunk() is a POOL_function type */
|
||||
void ZSTDMT_compressChunk(void* jobDescription)
|
||||
{
|
||||
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
|
||||
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
|
||||
const void* const src = (const char*)job->srcStart + job->prefixSize;
|
||||
buffer_t dstBuff = job->dstBuff;
|
||||
DEBUGLOG(5, "ZSTDMT_compressChunk: job (first:%u) (last:%u) : prefixSize %u, srcSize %u ",
|
||||
job->firstChunk, job->lastChunk, (U32)job->prefixSize, (U32)job->srcSize);
|
||||
|
||||
/* ressources */
|
||||
if (cctx==NULL) {
|
||||
job->cSize = ERROR(memory_allocation);
|
||||
goto _endJob;
|
||||
}
|
||||
|
||||
if (dstBuff.start == NULL) {
|
||||
dstBuff = ZSTDMT_getBuffer(job->bufPool);
|
||||
if (dstBuff.start==NULL) {
|
||||
@ -350,30 +338,26 @@ void ZSTDMT_compressChunk(void* jobDescription)
|
||||
goto _endJob;
|
||||
}
|
||||
job->dstBuff = dstBuff;
|
||||
DEBUGLOG(5, "ZSTDMT_compressChunk: received dstBuff of size %u", (U32)dstBuff.size);
|
||||
}
|
||||
|
||||
/* init */
|
||||
if (job->cdict) {
|
||||
size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dm_auto, job->cdict, job->params, job->fullFrameSize);
|
||||
DEBUGLOG(4, "ZSTDMT_compressChunk: init using CDict (windowLog=%u)", job->params.cParams.windowLog);
|
||||
assert(job->firstChunk); /* only allowed for first job */
|
||||
if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
|
||||
} else { /* srcStart points at reloaded section */
|
||||
U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : ZSTD_CONTENTSIZE_UNKNOWN;
|
||||
ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */
|
||||
size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk);
|
||||
if (ZSTD_isError(forceWindowError)) {
|
||||
DEBUGLOG(5, "ZSTD_CCtxParam_setParameter error : %s ", ZSTD_getErrorName(forceWindowError));
|
||||
job->cSize = forceWindowError;
|
||||
goto _endJob;
|
||||
}
|
||||
DEBUGLOG(5, "ZSTDMT_compressChunk: invoking ZSTD_compressBegin_advanced_internal with windowLog = %u ", jobParams.cParams.windowLog);
|
||||
{ size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk);
|
||||
if (ZSTD_isError(forceWindowError)) {
|
||||
job->cSize = forceWindowError;
|
||||
goto _endJob;
|
||||
} }
|
||||
{ size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
|
||||
job->srcStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
|
||||
NULL,
|
||||
jobParams, pledgedSrcSize);
|
||||
if (ZSTD_isError(initError)) {
|
||||
DEBUGLOG(5, "ZSTD_compressBegin_advanced_internal error : %s ", ZSTD_getErrorName(initError));
|
||||
job->cSize = initError;
|
||||
goto _endJob;
|
||||
} }
|
||||
@ -384,19 +368,50 @@ void ZSTDMT_compressChunk(void* jobDescription)
|
||||
ZSTD_invalidateRepCodes(cctx);
|
||||
}
|
||||
|
||||
DEBUGLOG(5, "Compressing into dstBuff of size %u", (U32)dstBuff.size);
|
||||
DEBUG_PRINTHEX(6, job->srcStart, 12);
|
||||
/* compress */
|
||||
#if 1
|
||||
job->cSize = (job->lastChunk) ?
|
||||
ZSTD_compressEnd (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
|
||||
ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
|
||||
DEBUGLOG(5, "compressed %u bytes into %u bytes (first:%u) (last:%u) ",
|
||||
(unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
|
||||
DEBUGLOG(5, "dstBuff.size : %u ; => %s ", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize));
|
||||
#else
|
||||
if (sizeof(size_t) > sizeof(int))
|
||||
assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX); /* check overflow */
|
||||
{ int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX);
|
||||
const BYTE* ip = (const BYTE*) src;
|
||||
BYTE* const ostart = (BYTE*)dstBuff.start;
|
||||
BYTE* op = ostart;
|
||||
BYTE* oend = op + dstBuff.size;
|
||||
int blockNb;
|
||||
job->cSize = 0;
|
||||
for (blockNb = 0; blockNb < nbBlocks-1; blockNb++) {
|
||||
size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX);
|
||||
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
|
||||
ip += ZSTD_BLOCKSIZE_MAX;
|
||||
op += cSize; assert(op < oend);
|
||||
/* stats */
|
||||
job->cSize += cSize;
|
||||
job->readSize = ZSTD_BLOCKSIZE_MAX * (blockNb+1);
|
||||
}
|
||||
/* last block */
|
||||
{ size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1);
|
||||
size_t const lastBlockSize = (lastBlockSize1==0) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1;
|
||||
size_t const cSize = (job->lastChunk) ?
|
||||
ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) :
|
||||
ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
|
||||
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
|
||||
/* stats */
|
||||
job->cSize += cSize;
|
||||
job->readSize = ZSTD_BLOCKSIZE_MAX * (blockNb+1);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
_endJob:
|
||||
/* release */
|
||||
ZSTDMT_releaseCCtx(job->cctxPool, cctx);
|
||||
ZSTDMT_releaseBuffer(job->bufPool, job->src);
|
||||
job->src = g_nullBuffer; job->srcStart = NULL;
|
||||
/* report */
|
||||
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
|
||||
job->jobCompleted = 1;
|
||||
job->jobScanned = 0;
|
||||
@ -440,6 +455,21 @@ struct ZSTDMT_CCtx_s {
|
||||
const ZSTD_CDict* cdict;
|
||||
};
|
||||
|
||||
/* Sets parameters relevant to the compression job, initializing others to
|
||||
* default values. Notably, nbThreads should probably be zero. */
|
||||
static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params)
|
||||
{
|
||||
ZSTD_CCtx_params jobParams;
|
||||
memset(&jobParams, 0, sizeof(jobParams));
|
||||
|
||||
jobParams.cParams = params.cParams;
|
||||
jobParams.fParams = params.fParams;
|
||||
jobParams.compressionLevel = params.compressionLevel;
|
||||
|
||||
jobParams.ldmParams = params.ldmParams;
|
||||
return jobParams;
|
||||
}
|
||||
|
||||
static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
|
||||
{
|
||||
U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
|
||||
@ -908,6 +938,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
|
||||
zcs->jobs[jobID].src = zcs->inBuff.buffer;
|
||||
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
|
||||
zcs->jobs[jobID].srcSize = srcSize;
|
||||
zcs->jobs[jobID].readSize = 0;
|
||||
zcs->jobs[jobID].prefixSize = zcs->dictSize;
|
||||
assert(zcs->inBuff.filled >= srcSize + zcs->dictSize);
|
||||
zcs->jobs[jobID].params = zcs->params;
|
||||
|
@ -828,7 +828,7 @@ finish:
|
||||
/* Status */
|
||||
DISPLAYLEVEL(2, "\r%79s\r", "");
|
||||
DISPLAYLEVEL(2,"%-20s :%6.2f%% (%6llu => %6llu bytes, %s) \n", srcFileName,
|
||||
(double)compressedfilesize/(readsize+(!readsize) /* avoid div by zero */ )*100,
|
||||
(double)compressedfilesize / (readsize+(!readsize)/*avoid div by zero*/) * 100,
|
||||
(unsigned long long)readsize, (unsigned long long) compressedfilesize,
|
||||
dstFileName);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user