completed ZSTDMT streaming compression

Provides the baseline compression API :
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel);
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);
size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);

Not tested yet
dev
Yann Collet 2017-01-17 15:31:16 -08:00
parent 5b726dbe4d
commit a73c412932
1 changed files with 119 additions and 29 deletions

View File

@ -2,10 +2,11 @@
#include <string.h> /* memcpy */ #include <string.h> /* memcpy */
#include <pool.h> /* threadpool */ #include <pool.h> /* threadpool */
#include "threading.h" /* mutex */ #include "threading.h" /* mutex */
#include "zstd_internal.h" /* MIN, ERROR, ZSTD_* */ #include "zstd_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
#include "zstdmt_compress.h" #include "zstdmt_compress.h"
#if 0 #if 0
# include <stdio.h> # include <stdio.h>
# include <unistd.h> # include <unistd.h>
# include <sys/times.h> # include <sys/times.h>
@ -163,8 +164,14 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
/* ===== Thread worker ===== */ /* ===== Thread worker ===== */
typedef struct {
buffer_t buffer;
size_t filled;
} inBuff_t;
typedef struct { typedef struct {
ZSTD_CCtx* cctx; ZSTD_CCtx* cctx;
buffer_t src;
const void* srcStart; const void* srcStart;
size_t srcSize; size_t srcSize;
buffer_t dstBuff; buffer_t dstBuff;
@ -208,25 +215,41 @@ _endJob:
} }
/* ------------------------------------------ */
/* ===== Multi-threaded compression ===== */ /* ===== Multi-threaded compression ===== */
/* ------------------------------------------ */
struct ZSTDMT_CCtx_s { struct ZSTDMT_CCtx_s {
POOL_ctx* factory; POOL_ctx* factory;
ZSTDMT_bufferPool* buffPool; ZSTDMT_bufferPool* buffPool;
ZSTDMT_CCtxPool* cctxPool; ZSTDMT_CCtxPool* cctxPool;
unsigned nbThreads;
pthread_mutex_t jobCompleted_mutex; pthread_mutex_t jobCompleted_mutex;
pthread_cond_t jobCompleted_cond; pthread_cond_t jobCompleted_cond;
ZSTDMT_jobDescription jobs[1]; /* variable size */ size_t targetSectionSize;
size_t inBuffSize;
inBuff_t inBuff;
ZSTD_parameters params;
unsigned nbThreads;
unsigned jobIDMask;
unsigned doneJobID;
unsigned nextJobID;
unsigned frameEnded;
ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */
}; };
ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
{ {
ZSTDMT_CCtx* cctx; ZSTDMT_CCtx* cctx;
U32 const minNbJobs = nbThreads + 1;
U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1;
U32 const nbJobs = 1 << nbJobsLog2;
DEBUGLOG(4, "nbThreads : %u ; minNbJobs : %u ; nbJobsLog2 : %u ; nbJobs : %u \n",
nbThreads, minNbJobs, nbJobsLog2, nbJobs);
if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL; if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbThreads*sizeof(ZSTDMT_jobDescription)); cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbJobs*sizeof(ZSTDMT_jobDescription));
if (!cctx) return NULL; if (!cctx) return NULL;
cctx->nbThreads = nbThreads; cctx->nbThreads = nbThreads;
cctx->jobIDMask = nbJobs - 1;
cctx->factory = POOL_create(nbThreads, 1); cctx->factory = POOL_create(nbThreads, 1);
cctx->buffPool = ZSTDMT_createBufferPool(nbThreads); cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads); cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
@ -338,46 +361,46 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
/* ======= Streaming API ======= */ /* ======= Streaming API ======= */
/* ====================================== */ /* ====================================== */
#if 0 #if 1
size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
zcs->params = ZSTD_getParams(compressionLevel, 0, 0); zcs->params = ZSTD_getParams(compressionLevel, 0, 0);
zcs->targetSectionSize = 1 << (zcs->params.cParams.windowLog + 2); zcs->targetSectionSize = (size_t)1 << (zcs->params.cParams.windowLog + 2);
zcs->inBuffSize = 5 * (1 << zcs->params.cParams.windowLog); zcs->inBuffSize = 5 * (1 << zcs->params.cParams.windowLog);
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */ zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */
zcs->inBuff.current = 0; zcs->inBuff.filled = 0;
zcs->doneJobID = 0; zcs->doneJobID = 0;
zcs->nextJobID = 0; zcs->nextJobID = 0;
zcs->frameEnded = 0;
return 0; return 0;
} }
typedef struct {
buffer_t buffer;
unsigned current;
} inBuff_t;
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{ {
if (zcs->frameEnded) return ERROR(stage_wrong);
/* fill input buffer */ /* fill input buffer */
{ size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.current); { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.current, input->src, toLoad); memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, input->src, toLoad);
input->pos += toLoad; input->pos += toLoad;
} }
if (zcs->inBuff.current == zcs->inBuffSize) { /* filled enough : let's compress */ if (zcs->inBuff.filled == zcs->inBuffSize) { /* filled enough : let's compress */
size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize); size_t const dstBufferCapacity = ZSTD_compressBound(zcs->targetSectionSize);
buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->targetSectionSize); /* should check for NULL */ buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); /* should check for NULL */
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); /* should check for NULL */ ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); /* should check for NULL */
unsigned const jobID = zcs->nextJobID & zcs->jobIDmask; unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
zcs->jobs[jobID].srcStart = zcs->inBuff.start; zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = zcs->targetSectionSize; zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
zcs->jobs[jobID].fullFrameSize = 0; zcs->jobs[jobID].fullFrameSize = 0;
zcs->jobs[jobID].compressionLevel = zcs->compressionLevel; zcs->jobs[jobID].params = zcs->params;
zcs->jobs[jobID].dstBuff = dstBuffer; zcs->jobs[jobID].dstBuff = dstBuffer;
zcs->jobs[jobID].cctx = cctx; zcs->jobs[jobID].cctx = cctx;
zcs->jobs[jobID].frameID = (jobID>0); zcs->jobs[jobID].firstChunk = (jobID==0);
zcs->jobs[jobID].lastChunk = 0;
zcs->jobs[jobID].jobCompleted = 0; zcs->jobs[jobID].jobCompleted = 0;
zcs->jobs[jobID].dstFlushed = 0; zcs->jobs[jobID].dstFlushed = 0;
zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
@ -385,22 +408,22 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
/* get a new buffer for next input - save remaining into it */ /* get a new buffer for next input - save remaining into it */
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */ zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */
zcs->inBuff.current = zcs->inBuffSize - zcs->targetSectionSize; zcs->inBuff.filled = (U32)(zcs->inBuffSize - zcs->targetSectionSize);
memcpy(zcs->inBuff.buffer.start, (char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.current); memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->targetSectionSize, zcs->inBuff.filled);
DEBUGLOG(3, "posting job %u (%u bytes)", jobID, (U32)zcs->jobs[jobID].srcSize); DEBUGLOG(3, "posting job %u (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize);
POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);
zcs->nextJobID++; zcs->nextJobID++;
} }
/* check if there is any data available to flush */ /* check if there is any data available to flush */
{ unsigned const jobID = zcs->doneJobID & zcs->jobIDmask; { unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
ZSTDMT_jobDescription job = zcs->jobs[jobID]; ZSTDMT_jobDescription job = zcs->jobs[jobID];
if (job.jobCompleted) { /* job completed : output can be flushed */ if (job.jobCompleted) { /* job completed : output can be flushed */
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[jobID].cctx = NULL; /* release cctx for future task */ ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[jobID].cctx = NULL; /* release cctx for future task */
free(job.srcStart); zcs->jobs[jobID].srcStart = NULL; /* note : need a buff_t for release */ ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[jobID].srcStart = NULL; zcs->jobs[jobID].src = (buffer_t) { NULL, 0 };
memcpy((char*)output->dst + output->pos, job.dstBuff.start + job.dstFlushed, toWrite); memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite; output->pos += toWrite;
job.dstFlushed += toWrite; job.dstFlushed += toWrite;
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */ if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */
@ -411,10 +434,77 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
} } } }
/* recommended next input size : fill current input buffer */ /* recommended next input size : fill current input buffer */
return zcs->inBuffSize - zcs->inBuff.current; return zcs->inBuffSize - zcs->inBuff.filled;
}
static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame)
{
size_t const srcSize = zcs->inBuff.filled;
if ((srcSize > 0) || (endFrame && !zcs->frameEnded)) {
size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); /* should check for NULL */
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); /* should check for NULL */
unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
zcs->jobs[jobID].src = zcs->inBuff.buffer;
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = srcSize;
zcs->jobs[jobID].fullFrameSize = 0;
zcs->jobs[jobID].params = zcs->params;
zcs->jobs[jobID].dstBuff = dstBuffer;
zcs->jobs[jobID].cctx = cctx;
zcs->jobs[jobID].firstChunk = (jobID==0);
zcs->jobs[jobID].lastChunk = endFrame;
zcs->jobs[jobID].jobCompleted = 0;
zcs->jobs[jobID].dstFlushed = 0;
zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
/* get a new buffer for next input */
if (!endFrame) {
zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); /* check for NULL ! */
zcs->inBuff.filled = 0;
} else {
zcs->frameEnded = 1;
}
DEBUGLOG(3, "posting job %u (%u bytes)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize);
POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);
zcs->nextJobID++;
}
/* check if there is any data available to flush */
{ unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
ZSTDMT_jobDescription job = zcs->jobs[wJobID];
if (job.jobCompleted) { /* job completed : output can be flushed */
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); zcs->jobs[wJobID].cctx = NULL; /* release cctx for future task */
ZSTDMT_releaseBuffer(zcs->buffPool, job.src); zcs->jobs[wJobID].srcStart = NULL; zcs->jobs[wJobID].src = (buffer_t) { NULL, 0 };
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => next one */
ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); zcs->jobs[wJobID].dstBuff = (buffer_t) { NULL, 0 };
zcs->doneJobID++;
} else {
zcs->jobs[wJobID].dstFlushed = job.dstFlushed;
} }
/* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
return (zcs->doneJobID < zcs->nextJobID);
}
}
size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{
return ZSTDMT_flushStream_internal(zcs, output, 0);
}
size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
{
return ZSTDMT_flushStream_internal(zcs, output, 1);
} }
size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);
size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);
#endif #endif