new zstdmt version using generic threadpool
parent c6a6417458
commit 3b29dbd9e8
pool.c
@@ -46,8 +46,8 @@ struct POOL_ctx_s {
     Waits for jobs and executes them.
     @returns : NULL on failure else non-null.
 */
-static void *POOL_thread(void *opaque) {
-    POOL_ctx *ctx = (POOL_ctx *)opaque;
+static void* POOL_thread(void* opaque) {
+    POOL_ctx* const ctx = (POOL_ctx*)opaque;
     if (!ctx) { return NULL; }
     for (;;) {
         /* Lock the mutex and wait for a non-empty queue or until shutdown */
@@ -61,7 +61,7 @@ static void *POOL_thread(void *opaque) {
             return opaque;
         }
         /* Pop a job off the queue */
-        { POOL_job job = ctx->queue[ctx->queueHead];
+        { POOL_job const job = ctx->queue[ctx->queueHead];
           ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
           /* Unlock the mutex, signal a pusher, and run the job */
           if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; }
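Note: the `POOL_*` functions above belong to the generic thread pool (`pool.h`) that this commit makes zstdmt build on. As exercised elsewhere in this diff, its surface is just `POOL_create()`, `POOL_add()` and `POOL_free()`. A minimal, self-contained usage sketch follows; the `printJob` worker and its argument array are hypothetical illustrations, not part of the commit:

```c
#include <stdio.h>
#include <pool.h>   /* the generic threadpool, included the same way as in this diff */

/* hypothetical job : matches the POOL_function shape (void (*)(void*)) used by POOL_add */
static void printJob(void* opaque)
{
    int* const jobID = (int*)opaque;
    printf("job %d executed by a pooled thread\n", *jobID);
}

int main(void)
{
    POOL_ctx* const pool = POOL_create(4 /* threads */, 1 /* queue size, as in ZSTDMT_createCCtx below */);
    int ids[8];
    int i;
    if (pool == NULL) return 1;
    for (i = 0; i < 8; i++) {
        ids[i] = i;
        POOL_add(pool, printJob, &ids[i]);   /* blocks while the job queue is full */
    }
    POOL_free(pool);   /* signals shutdown and joins the workers */
    return 0;
}
```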
zstdmt_compress.c
@@ -1,5 +1,6 @@
 #include <stdlib.h>   /* malloc */
-#include <pthread.h>  /* posix only, to be replaced by a more portable version */
+#include <pool.h>     /* threadpool */
+#include <pthread.h>  /* mutex */
 #include "zstd_internal.h"   /* MIN, ERROR */
 #include "zstdmt_compress.h"
 
@@ -43,176 +44,11 @@ if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \
 #define ZSTDMT_NBTHREADS_MAX 128
 #define ZSTDMT_NBSTACKEDFRAMES_MAX (2*ZSTDMT_NBTHREADS_MAX)
 
-typedef struct frameToWrite_s {
-    const void* start;
-    size_t frameSize;
-    unsigned frameID;
-    unsigned isLastFrame;
-} frameToWrite_t;
-
-typedef struct ZSTDMT_dstBuffer_s {
-    ZSTD_outBuffer out;
-    unsigned frameIDToWrite;
-    pthread_mutex_t frameTable_mutex;
-    pthread_mutex_t allFramesWritten_mutex;
-    frameToWrite_t stackedFrame[ZSTDMT_NBSTACKEDFRAMES_MAX];
-    unsigned nbStackedFrames;
-} ZSTDMT_dstBufferManager;
-
-static ZSTDMT_dstBufferManager ZSTDMT_createDstBufferManager(void* dst, size_t dstCapacity)
-{
-    ZSTDMT_dstBufferManager dbm;
-    dbm.out.dst = dst;
-    dbm.out.size = dstCapacity;
-    dbm.out.pos = 0;
-    dbm.frameIDToWrite = 0;
-    pthread_mutex_init(&dbm.frameTable_mutex, NULL);
-    pthread_mutex_t* const allFramesWritten_mutex = &dbm.allFramesWritten_mutex;
-    pthread_mutex_init(allFramesWritten_mutex, NULL);
-    PTHREAD_MUTEX_LOCK(allFramesWritten_mutex);   /* maybe could be merged into init ? */
-    dbm.nbStackedFrames = 0;
-    return dbm;
-}
-
-/* note : can fail if nbStackedFrames > ZSTDMT_NBSTACKEDFRAMES_MAX.
- * note2 : can only be called from a section with frameTable_mutex already locked */
-static void ZSTDMT_stackFrameToWrite(ZSTDMT_dstBufferManager* dstBufferManager, frameToWrite_t frame) {
-    dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames++] = frame;
-}
-
-
 typedef struct buffer_s {
     void* start;
-    size_t bufferSize;
+    size_t size;
 } buffer_t;
 
-static buffer_t ZSTDMT_getDstBuffer(const ZSTDMT_dstBufferManager* dstBufferManager)
-{
-    ZSTD_outBuffer const out = dstBufferManager->out;
-    buffer_t buf;
-    buf.start = (char*)(out.dst) + out.pos;
-    buf.bufferSize = out.size - out.pos;
-    return buf;
-}
-
-/* condition : stackNumber < dstBufferManager->nbStackedFrames.
- * note : there can only be one write at a time, due to frameID condition */
-static size_t ZSTDMT_writeFrame(ZSTDMT_dstBufferManager* dstBufferManager, unsigned stackNumber)
-{
-    ZSTD_outBuffer const out = dstBufferManager->out;
-    size_t const frameSize = dstBufferManager->stackedFrame[stackNumber].frameSize;
-    const void* const frameStart = dstBufferManager->stackedFrame[stackNumber].start;
-    if (out.pos + frameSize > out.size)
-        return ERROR(dstSize_tooSmall);
-    DEBUGLOG(3, "writing frame %u (%u bytes) ", dstBufferManager->stackedFrame[stackNumber].frameID, (U32)frameSize);
-    memcpy((char*)out.dst + out.pos, frameStart, frameSize);
-    dstBufferManager->out.pos += frameSize;
-    dstBufferManager->frameIDToWrite = dstBufferManager->stackedFrame[stackNumber].frameID + 1;
-    return 0;
-}
-
-static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager,
-                                   const void* src, size_t srcSize,
-                                   unsigned frameID, unsigned isLastFrame)
-{
-    unsigned lastFrameWritten = 0;
-
-    /* check if correct frame ordering; stack otherwise */
-    DEBUGLOG(5, "considering writing frame %u ", frameID);
-    PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
-    if (frameID != dstBufferManager->frameIDToWrite) {
-        DEBUGLOG(4, "writing frameID %u : not possible, waiting for %u ", frameID, dstBufferManager->frameIDToWrite);
-        frameToWrite_t const frame = { src, srcSize, frameID, isLastFrame };
-        ZSTDMT_stackFrameToWrite(dstBufferManager, frame);
-        pthread_mutex_unlock(&dstBufferManager->frameTable_mutex);
-        return 0;
-    }
-    pthread_mutex_unlock(&dstBufferManager->frameTable_mutex);
-
-    /* write frame
-     * note : only one write possible due to frameID condition */
-    DEBUGLOG(3, "writing frame %u (%u bytes) ", frameID, (U32)srcSize);
-    ZSTD_outBuffer const out = dstBufferManager->out;
-    if (out.pos + srcSize > out.size)
-        return ERROR(dstSize_tooSmall);
-    if (frameID)   /* frameID==0 compress directly in dst buffer */
-        memcpy((char*)out.dst + out.pos, src, srcSize);
-    dstBufferManager->out.pos += srcSize;
-    dstBufferManager->frameIDToWrite = frameID+1;
-    lastFrameWritten = isLastFrame;
-
-    /* check if more frames are stacked */
-    PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
-    unsigned frameWritten = dstBufferManager->nbStackedFrames>0;
-    while (frameWritten) {
-        unsigned u;
-        frameID++;
-        frameWritten = 0;
-        for (u=0; u<dstBufferManager->nbStackedFrames; u++) {
-            if (dstBufferManager->stackedFrame[u].frameID == frameID) {
-                pthread_mutex_unlock(&dstBufferManager->frameTable_mutex);
-                DEBUGLOG(4, "catch up frame %u ", frameID);
-                { size_t const writeError = ZSTDMT_writeFrame(dstBufferManager, u);
-                  if (ZSTD_isError(writeError)) return writeError; }
-                lastFrameWritten = dstBufferManager->stackedFrame[u].isLastFrame;
-                dstBufferManager->frameIDToWrite = frameID+1;
-                /* remove frame from stack */
-                PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex);
-                dstBufferManager->stackedFrame[u] = dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames-1];
-                dstBufferManager->nbStackedFrames -= 1;
-                frameWritten = dstBufferManager->nbStackedFrames>0;
-                break;
-    }   }   }
-    pthread_mutex_unlock(&dstBufferManager->frameTable_mutex);
-
-    /* end reached : last frame written */
-    if (lastFrameWritten) pthread_mutex_unlock(&dstBufferManager->allFramesWritten_mutex);
-    return 0;
-}
-
-
-typedef struct ZSTDMT_jobDescription_s {
-    const void* src;   /* NULL means : kill thread */
-    size_t srcSize;
-    int compressionLevel;
-    ZSTDMT_dstBufferManager* dstManager;
-    unsigned frameNumber;
-    unsigned isLastFrame;
-} ZSTDMT_jobDescription;
-
-typedef struct ZSTDMT_jobAgency_s {
-    pthread_mutex_t jobAnnounce_mutex;
-    pthread_mutex_t jobApply_mutex;
-    ZSTDMT_jobDescription jobAnnounce;
-} ZSTDMT_jobAgency;
-
-/* ZSTDMT_postjob() :
- * This function is blocking as long as previous posted job is not taken.
- * It could be made non-blocking, with a storage queue.
- * But blocking has benefits : on top of memory savings,
- * the caller will be able to measure delay, allowing dynamic speed throttle (via compression level).
- */
-static void ZSTDMT_postjob(ZSTDMT_jobAgency* jobAgency, ZSTDMT_jobDescription job)
-{
-    DEBUGLOG(5, "starting job posting ");
-    PTHREAD_MUTEX_LOCK(&jobAgency->jobApply_mutex);   /* wait for a thread to take previous job */
-    DEBUGLOG(5, "job posting mutex acquired ");
-    jobAgency->jobAnnounce = job;   /* post job */
-    pthread_mutex_unlock(&jobAgency->jobAnnounce_mutex);   /* announce */
-    DEBUGLOG(5, "job available now ");
-}
-
-static ZSTDMT_jobDescription ZSTDMT_getjob(ZSTDMT_jobAgency* jobAgency)
-{
-    PTHREAD_MUTEX_LOCK(&jobAgency->jobAnnounce_mutex);   /* should check return code */
-    ZSTDMT_jobDescription const job = jobAgency->jobAnnounce;
-    pthread_mutex_unlock(&jobAgency->jobApply_mutex);
-    return job;
-}
-
 #define ZSTDMT_NBBUFFERSPOOLED_MAX ZSTDMT_NBTHREADS_MAX
 typedef struct ZSTDMT_bufferPool_s {
     pthread_mutex_t bufferPool_mutex;
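Worth noting in the deleted block above: `allFramesWritten_mutex` is created already locked and is unlocked only by whichever thread writes the last frame, so the main thread can block on it as a one-shot completion gate. A stripped-down sketch of that pattern follows (names hypothetical); strictly speaking, POSIX leaves unlocking a mutex from a non-owning thread undefined, which is one reason such gates are usually replaced by a condition variable:

```c
#include <pthread.h>

typedef struct {
    pthread_mutex_t done;   /* locked at init, unlocked exactly once when work completes */
} completionGate;

static void gate_init(completionGate* g)
{
    pthread_mutex_init(&g->done, NULL);
    pthread_mutex_lock(&g->done);            /* arm the gate : the next lock attempt blocks */
}

static void gate_signal(completionGate* g)   /* called once, by the finishing worker */
{
    pthread_mutex_unlock(&g->done);
}

static void gate_wait(completionGate* g)     /* called by the main thread; returns after gate_signal() */
{
    pthread_mutex_lock(&g->done);
}
```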
@@ -227,7 +63,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
         pool->nbBuffers--;
         buffer_t const buf = pool->bTable[pool->nbBuffers];
         pthread_mutex_unlock(&pool->bufferPool_mutex);
-        size_t const availBufferSize = buf.bufferSize;
+        size_t const availBufferSize = buf.size;
         if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize))   /* large enough, but not too much */
             return buf;
         free(buf.start);   /* size conditions not respected : create a new buffer */
@@ -235,7 +71,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
     pthread_mutex_unlock(&pool->bufferPool_mutex);
     /* create new buffer */
     buffer_t buf;
-    buf.bufferSize = bSize;
+    buf.size = bSize;
     buf.start = calloc(1, bSize);
     return buf;
 }
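The reuse test in `ZSTDMT_getBuffer()` above keeps a cached buffer only when its size falls in the band `[bSize, 10*bSize]`: big enough for the request, but not so oversized that it hoards memory. A self-contained sketch of just that heuristic, with hypothetical names:

```c
#include <stdlib.h>

typedef struct { void* start; size_t size; } buffer_t;

/* reuse `cached` when bSize <= cached.size <= 10*bSize, else replace it */
static buffer_t pickOrReplace(buffer_t cached, size_t bSize)
{
    if ((cached.size >= bSize) & (cached.size <= 10*bSize))
        return cached;                    /* large enough, but not too much */
    free(cached.start);                   /* size conditions not respected */
    {   buffer_t buf;
        buf.size = bSize;
        buf.start = calloc(1, bSize);     /* may be NULL; caller must check */
        return buf;
    }
}
```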
@@ -255,79 +91,119 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
 
 
 
-struct ZSTDMT_CCtx_s {
-    pthread_t pthread[ZSTDMT_NBTHREADS_MAX];
-    unsigned nbThreads;
-    ZSTDMT_jobAgency jobAgency;
-    ZSTDMT_bufferPool bufferPool;
-};
+typedef struct {
+    ZSTD_CCtx* cctx;
+    const void* srcStart;
+    size_t srcSize;
+    buffer_t dstBuff;
+    int compressionLevel;
+    unsigned frameID;
+    size_t cSize;
+    unsigned jobCompleted;
+    pthread_mutex_t* jobCompleted_mutex;
+} ZSTDMT_jobDescription;
 
-static void* ZSTDMT_compressionThread(void* arg)
-{
-    if (arg==NULL) return NULL;   /* error : should not be possible */
-    ZSTDMT_CCtx* const mtctx = (ZSTDMT_CCtx*) arg;
-    ZSTDMT_jobAgency* const jobAgency = &mtctx->jobAgency;
-    ZSTDMT_bufferPool* const pool = &mtctx->bufferPool;
-    ZSTD_CCtx* const cctx = ZSTD_createCCtx();
-    if (cctx==NULL) return NULL;   /* allocation failure : thread not started */
-    DEBUGLOG(3, "thread %li created ", (long int)pthread_self());
-    for (;;) {
-        ZSTDMT_jobDescription const job = ZSTDMT_getjob(jobAgency);
-        if (job.src == NULL) {
-            DEBUGLOG(4, "thread exit ");
-            ZSTD_freeCCtx(cctx);
-            return NULL;
-        }
-        ZSTDMT_dstBufferManager* dstBufferManager = job.dstManager;
-        size_t const dstBufferCapacity = ZSTD_compressBound(job.srcSize);
-        DEBUGLOG(4, "requesting a dstBuffer for frame %u", job.frameNumber);
-        buffer_t const dstBuffer = job.frameNumber ? ZSTDMT_getBuffer(pool, dstBufferCapacity) : ZSTDMT_getDstBuffer(dstBufferManager);   /* lack params */
-        DEBUGLOG(4, "start compressing frame %u", job.frameNumber);
-        //size_t const cSize = ZSTD_compress(dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel);
-        size_t const cSize = ZSTD_compressCCtx(cctx, dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel);
-        if (ZSTD_isError(cSize)) return (void*)(cSize);   /* error - find a better way */
-        size_t const writeError = ZSTDMT_tryWriteFrame(dstBufferManager, dstBuffer.start, cSize, job.frameNumber, job.isLastFrame);   /* pas clair */
-        if (ZSTD_isError(writeError)) return (void*)writeError;
-        if (job.frameNumber) ZSTDMT_releaseBuffer(pool, dstBuffer);
-    }
-}
+/* ZSTDMT_compressFrame() : POOL_function type */
+void ZSTDMT_compressFrame(void* jobDescription)
+{
+    DEBUGLOG(5, "Entering ZSTDMT_compressFrame() ");
+    ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
+    DEBUGLOG(5, "compressing %u bytes with ZSTD_compressCCtx : ", (unsigned)job->srcSize);
+    job->cSize = ZSTD_compressCCtx(job->cctx, job->dstBuff.start, job->dstBuff.size, job->srcStart, job->srcSize, job->compressionLevel);
+    DEBUGLOG(5, "compressed to %u bytes ", (unsigned)job->cSize);
+    job->jobCompleted = 1;
+    DEBUGLOG(5, "unlocking mutex jobCompleted_mutex");
+    pthread_mutex_unlock(job->jobCompleted_mutex);
+    DEBUGLOG(5, "ZSTDMT_compressFrame completed");
+}
 
+/* note : calls to CCtxPool only from main thread */
+
+typedef struct {
+    unsigned totalCCtx;
+    unsigned availCCtx;
+    ZSTD_CCtx* cctx[1];   /* variable size */
+} ZSTDMT_CCtxPool;
+
+static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads)
+{
+    ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) calloc(1, sizeof(ZSTDMT_CCtxPool) + nbThreads*sizeof(ZSTD_CCtx*));
+    if (!cctxPool) return NULL;
+    { unsigned u;
+      for (u=0; u<nbThreads; u++)
+          cctxPool->cctx[u] = ZSTD_createCCtx();   /* check for NULL result ! */
+    }
+    cctxPool->totalCCtx = cctxPool->availCCtx = nbThreads;
+    return cctxPool;
+}
+
+static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* pool)
+{
+    if (pool->availCCtx) {
+        pool->availCCtx--;
+        return pool->cctx[pool->availCCtx];
+    }
+    /* should not be possible, since totalCCtx==nbThreads */
+    return ZSTD_createCCtx();
+}
+
+static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
+{
+    if (pool->availCCtx < pool->totalCCtx)
+        pool->cctx[pool->availCCtx++] = cctx;
+    else
+        /* should not be possible, since totalCCtx==nbThreads */
+        ZSTD_freeCCtx(cctx);
+}
+
+static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
+{
+    unsigned u;
+    for (u=0; u<pool->totalCCtx; u++)
+        ZSTD_freeCCtx(pool->cctx[u]);
+    free(pool);
+}
+
+
+struct ZSTDMT_CCtx_s {
+    POOL_ctx* factory;
+    ZSTDMT_bufferPool buffPool;
+    ZSTDMT_CCtxPool* cctxPool;
+    unsigned nbThreads;
+    pthread_mutex_t jobCompleted_mutex;
+    ZSTDMT_jobDescription jobs[1];   /* variable size */
+};
 
 ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
 {
     if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
-    ZSTDMT_CCtx* const cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx));
+    ZSTDMT_CCtx* const cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbThreads*sizeof(ZSTDMT_jobDescription));
     if (!cctx) return NULL;
-    /* init jobAgency */
-    pthread_mutex_init(&cctx->jobAgency.jobAnnounce_mutex, NULL);   /* check return value ? */
-    pthread_mutex_init(&cctx->jobAgency.jobApply_mutex, NULL);
-    PTHREAD_MUTEX_LOCK(&cctx->jobAgency.jobAnnounce_mutex);   /* no job at beginning */
-    /* init bufferPool */
-    pthread_mutex_init(&cctx->bufferPool.bufferPool_mutex, NULL);
-    /* start all workers */
     cctx->nbThreads = nbThreads;
-    DEBUGLOG(2, "nbThreads : %u \n", nbThreads);
-    unsigned t;
-    for (t = 0; t < nbThreads; t++) {
-        pthread_create(&cctx->pthread[t], NULL, ZSTDMT_compressionThread, cctx);   /* check return value ? */
-    }
+    cctx->factory = POOL_create(nbThreads, 1);
+    pthread_mutex_init(&cctx->buffPool.bufferPool_mutex, NULL);
+    cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
+    pthread_mutex_init(&cctx->jobCompleted_mutex, NULL);
     return cctx;
 }
 
-size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* cctx)
+size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)   /* incompleted ! */
 {
-    /* free threads */
-    /* free mutex (if necessary) */
+    POOL_free(mtctx->factory);
+    /* free mutexes (if necessary) */
     /* free bufferPool */
-    free(cctx);   /* incompleted ! */
+    ZSTDMT_freeCCtxPool(mtctx->cctxPool);
+    free(mtctx);
     return 0;
 }
 
 
 size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
                            void* dst, size_t dstCapacity,
                            const void* src, size_t srcSize,
                            int compressionLevel)
 {
-    ZSTDMT_jobAgency* jobAgency = &mtctx->jobAgency;
     ZSTD_parameters const params = ZSTD_getParams(compressionLevel, srcSize, 0);
     size_t const frameSizeTarget = (size_t)1 << (params.cParams.windowLog + 2);
     unsigned const nbFramesMax = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */;
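A detail of the hunk above: both `ZSTDMT_CCtxPool` and the new `ZSTDMT_CCtx` end in a one-element array (`cctx[1]`, `jobs[1]`) commented `/* variable size */`. That is the classic C89 trailing-array idiom, where a single over-sized `calloc()` makes room for `nbThreads` entries. A minimal illustration with hypothetical names:

```c
#include <stdlib.h>

typedef struct {
    unsigned count;
    int slot[1];   /* variable size : really `count` entries */
} intTable;

static intTable* intTable_create(unsigned count)
{
    /* sizeof(intTable) already contains one slot, so adding `count` more
     * over-allocates by one entry; the commit code accepts the same slack */
    intTable* const t = (intTable*) calloc(1, sizeof(intTable) + count * sizeof(int));
    if (t != NULL) t->count = count;
    return t;
}
```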
@@ -336,7 +212,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
     size_t remainingSrcSize = srcSize;
     const char* const srcStart = (const char*)src;
     size_t frameStartPos = 0;
-    ZSTDMT_dstBufferManager dbm = ZSTDMT_createDstBufferManager(dst, dstCapacity);
 
     DEBUGLOG(2, "windowLog : %u => frameSizeTarget : %u ", params.cParams.windowLog, (U32)frameSizeTarget);
     DEBUGLOG(2, "nbFrames : %u (size : %u bytes) ", nbFrames, (U32)avgFrameSize);
@@ -344,15 +220,46 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
     { unsigned u;
       for (u=0; u<nbFrames; u++) {
           size_t const frameSize = MIN(remainingSrcSize, avgFrameSize);
+          size_t const dstBufferCapacity = u ? ZSTD_compressBound(frameSize) : dstCapacity;
+          buffer_t const dstBuffer = u ? ZSTDMT_getBuffer(&mtctx->buffPool, dstBufferCapacity) : (buffer_t){ dst, dstCapacity };
+          ZSTD_CCtx* cctx = ZSTDMT_getCCtx(mtctx->cctxPool);
+
+          mtctx->jobs[u].srcStart = srcStart + frameStartPos;
+          mtctx->jobs[u].srcSize = frameSize;
+          mtctx->jobs[u].compressionLevel = compressionLevel;
+          mtctx->jobs[u].dstBuff = dstBuffer;
+          mtctx->jobs[u].cctx = cctx;
+          mtctx->jobs[u].frameID = u;
+          mtctx->jobs[u].jobCompleted = 0;
+          mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
+
           DEBUGLOG(3, "posting job %u (%u bytes)", u, (U32)frameSize);
-          ZSTDMT_jobDescription const job = { srcStart+frameStartPos, frameSize, compressionLevel,
-                                              &dbm, u, u==(nbFrames-1) };
-          ZSTDMT_postjob(jobAgency, job);
+          POOL_add(mtctx->factory, ZSTDMT_compressFrame, &mtctx->jobs[u]);
           frameStartPos += frameSize;
           remainingSrcSize -= frameSize;
     } }
+    /* note : since nbFrames <= nbThreads, all jobs should be running immediately in parallel */
+
+    { unsigned frameID;
+      size_t dstPos = 0;
+      for (frameID=0; frameID<nbFrames; frameID++) {
+          DEBUGLOG(3, "ready to write frame %u ", frameID);
+          while (mtctx->jobs[frameID].jobCompleted==0) {
+              DEBUGLOG(4, "waiting for signal jobCompleted_mutex")
+              pthread_mutex_lock(&mtctx->jobCompleted_mutex);
+          }
+          { size_t const cSize = mtctx->jobs[frameID].cSize;
+            if (ZSTD_isError(cSize)) return cSize;
+            if (dstPos + cSize > dstCapacity) return ERROR(dstSize_tooSmall);
+            if (frameID) memcpy((char*)dst + dstPos, mtctx->jobs[frameID].dstBuff.start, mtctx->jobs[frameID].cSize);
+            dstPos += cSize ;
+          }
+          ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[frameID].cctx);
+          ZSTDMT_releaseBuffer(&mtctx->buffPool, mtctx->jobs[frameID].dstBuff);
+      }
+      DEBUGLOG(4, "compressed size : %u ", (U32)dstPos);
+      return dstPos;
+    }
-
-    PTHREAD_MUTEX_LOCK(&dbm.allFramesWritten_mutex);
-    DEBUGLOG(4, "compressed size : %u ", (U32)dbm.out.pos);
-    return dbm.out.pos;
 }
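For reference, end-to-end use of the API this commit rewires, relying only on signatures visible in the diff (`ZSTDMT_createCCtx()`, `ZSTDMT_compressCCtx()`, `ZSTDMT_freeCCtx()`); the payload and buffer sizes below are illustrative, not from the commit:

```c
#include <stdio.h>
#include "zstd.h"              /* ZSTD_isError */
#include "zstdmt_compress.h"   /* ZSTDMT_* */

int main(void)
{
    const char src[] = "example payload - replace with real data";
    char dst[4096];            /* illustrative capacity */
    ZSTDMT_CCtx* const mtctx = ZSTDMT_createCCtx(4);   /* 4 worker threads */
    if (mtctx == NULL) return 1;
    {   size_t const cSize = ZSTDMT_compressCCtx(mtctx, dst, sizeof(dst),
                                                 src, sizeof(src), 3 /* level */);
        if (ZSTD_isError(cSize)) { ZSTDMT_freeCCtx(mtctx); return 1; }
        printf("compressed %u -> %u bytes\n", (unsigned)sizeof(src), (unsigned)cSize);
    }
    ZSTDMT_freeCCtx(mtctx);
    return 0;
}
```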