zstdmt : flush() only lock to read shared job members

Other job members are accessed directly.
This avoids a full job copy, which would touch every member,
including a few that are supposed to be used by the worker only,
and would uselessly require additional locks to avoid race conditions.
dev
Yann Collet 2018-01-26 12:15:43 -08:00
parent d2b62b6fa5
commit 79b6e28b0a
1 changed file with 58 additions and 57 deletions

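The access pattern this commit settles on can be illustrated outside of zstd. The sketch below is a minimal, hypothetical rendition (jobDesc_t, snapshotProgress and flushStep are illustrative stand-ins, not the real ZSTDMT structures or functions): the mutex is taken only to copy the SHARED members into locals, after which the flushing thread works on those copies and on members it alone owns.

#include <pthread.h>
#include <stddef.h>

typedef struct {
    size_t consumed;         /* SHARED : written by the worker, read by the flusher */
    size_t cSize;            /* SHARED : written by the worker, read by the flusher */
    size_t srcSize;          /* set before the job starts : read without lock */
    size_t dstFlushed;       /* owned by the flusher : no lock required */
    pthread_mutex_t* mutex;  /* protects the SHARED members only */
} jobDesc_t;

/* hypothetical helper : copy the shared members into locals under lock */
static void snapshotProgress(const jobDesc_t* job, size_t* consumed, size_t* cSize)
{
    pthread_mutex_lock(job->mutex);
    *consumed = job->consumed;
    *cSize    = job->cSize;
    pthread_mutex_unlock(job->mutex);
}

/* hypothetical flush step : works on the local copies plus dstFlushed,
 * which only this thread modifies, so no full job copy and no extra lock */
static size_t flushStep(jobDesc_t* job, size_t dstCapacity)
{
    size_t consumed, cSize;
    snapshotProgress(job, &consumed, &cSize);
    {   size_t const pending = cSize - job->dstFlushed;
        size_t const toWrite = (pending < dstCapacity) ? pending : dstCapacity;
        job->dstFlushed += toWrite;                 /* direct write : flusher-owned */
        if (cSize > job->dstFlushed) return cSize - job->dstFlushed;  /* known remainder */
        if (consumed < job->srcSize) return 1;      /* job not finished : more to come */
        return 0;                                   /* everything flushed */
    }
}

The real ZSTDMT_flushProduced() below follows the same shape: it snapshots cSize and consumed under mtctx_mutex, unlocks, then reads or updates dstFlushed, dstBuff and srcSize directly.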

@@ -309,16 +309,16 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
typedef struct {
size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
ZSTD_pthread_mutex_t* mtctx_mutex; /* Thread-safe - used by mtctx and (all) worker */
ZSTD_pthread_cond_t* mtctx_cond; /* Thread-safe - used by mtctx and (all) worker */
ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) worker */
ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) worker */
buffer_t dstBuff; /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */
size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
ZSTD_pthread_mutex_t* mtctx_mutex; /* Thread-safe - used by mtctx and (all) workers */
ZSTD_pthread_cond_t* mtctx_cond; /* Thread-safe - used by mtctx and (all) workers */
ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */
ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */
buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
buffer_t srcBuff; /* set by mtctx, then released by worker => no barrier */
const void* prefixStart; /* set by mtctx, then read by worker => no barrier */
const void* prefixStart; /* set by mtctx, then read and set0 by worker => no barrier */
size_t prefixSize; /* set by mtctx, then read by worker => no barrier */
size_t srcSize; /* set by mtctx, then read by worker => no barrier */
size_t srcSize; /* set by mtctx, then read by worker & mtctx => no barrier */
unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */
unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */
ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */
@@ -341,15 +341,13 @@ void ZSTDMT_compressChunk(void* jobDescription)
job->cSize = ERROR(memory_allocation);
goto _endJob;
}
if (dstBuff.start == NULL) {
if (dstBuff.start == NULL) { /* streaming job : doesn't provide a dstBuffer */
dstBuff = ZSTDMT_getBuffer(job->bufPool);
if (dstBuff.start==NULL) {
job->cSize = ERROR(memory_allocation);
goto _endJob;
}
ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */
ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
}
/* init */
@@ -1087,6 +1085,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
if ( (srcSize == 0)
&& (mtctx->nextJobID>0)/*single chunk must also write frame header*/ ) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame");
assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */
ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID);
mtctx->nextJobID++;
@@ -1094,12 +1093,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
}
}
DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)",
DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u, jobNb == %u (mod:%u))",
mtctx->nextJobID,
(U32)mtctx->jobs[jobID].srcSize,
mtctx->jobs[jobID].lastChunk,
mtctx->doneJobID,
mtctx->doneJobID & mtctx->jobIDMask);
mtctx->nextJobID,
jobID);
if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[jobID])) {
mtctx->nextJobID++;
mtctx->jobReady = 0;
@@ -1118,15 +1117,17 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
{
unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask;
DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush);
DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u , job %u <= %u)",
blockToFlush, mtctx->doneJobID, mtctx->nextJobID);
assert(output->size >= output->pos);
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
if (blockToFlush && (mtctx->doneJobID < mtctx->nextJobID)) {
if ( blockToFlush
&& (mtctx->doneJobID < mtctx->nextJobID) ) {
assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize);
while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) {
while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) { /* nothing to flush */
if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].srcSize) {
DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond",
DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond, there will be none",
mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].srcSize);
break;
}
@@ -1135,60 +1136,60 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex); /* block when nothing available to flush but more to come */
} }
/* some output is available to be flushed */
{ ZSTDMT_jobDescription job = mtctx->jobs[wJobID];
/* try to flush something */
{ size_t cSize = mtctx->jobs[wJobID].cSize;
size_t const srcConsumed = mtctx->jobs[wJobID].consumed;
ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
if (ZSTD_isError(job.cSize)) {
if (ZSTD_isError(cSize)) {
DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
mtctx->doneJobID, ZSTD_getErrorName(job.cSize));
mtctx->doneJobID, ZSTD_getErrorName(cSize));
ZSTDMT_waitForAllJobsCompleted(mtctx);
ZSTDMT_releaseAllJobResources(mtctx);
return job.cSize;
return cSize;
}
/* add frame checksum if necessary (can only happen once) */
assert(job.consumed <= job.srcSize);
if ( (job.consumed == job.srcSize)
&& job.frameChecksumNeeded ) {
assert(srcConsumed <= mtctx->jobs[wJobID].srcSize);
if ( (srcConsumed == mtctx->jobs[wJobID].srcSize) /* job completed -> worker no longer active */
&& mtctx->jobs[wJobID].frameChecksumNeeded ) {
U32 const checksum = (U32)XXH64_digest(&mtctx->xxhState);
DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);
MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
job.cSize += 4;
mtctx->jobs[wJobID].cSize += 4;
MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum);
cSize += 4;
mtctx->jobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */
mtctx->jobs[wJobID].frameChecksumNeeded = 0;
}
assert(job.cSize >= job.dstFlushed);
if (job.dstBuff.start != NULL) { /* dst buffer present : some work is ongoing or completed */
size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%.1f%%)",
(U32)toWrite, mtctx->doneJobID, (double)job.consumed / (job.srcSize + !job.srcSize /*avoid div0*/) * 100);
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
if (cSize > 0) { /* compression is ongoing or completed */
size_t const toWrite = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos);
DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u)",
(U32)toWrite, mtctx->doneJobID, (U32)srcConsumed, (U32)mtctx->jobs[wJobID].srcSize);
assert(cSize >= mtctx->jobs[wJobID].dstFlushed);
memcpy((char*)output->dst + output->pos, (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, toWrite);
output->pos += toWrite;
job.dstFlushed += toWrite;
mtctx->jobs[wJobID].dstFlushed += toWrite; /* can write : this value is only used by mtctx */
if ( (job.consumed == job.srcSize) /* job completed */
&& (job.dstFlushed == job.cSize) ) { /* output buffer fully flushed => free this job position */
if ( (srcConsumed == mtctx->jobs[wJobID].srcSize) /* job completed */
&& (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */
DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
mtctx->doneJobID, (U32)job.dstFlushed);
assert(job.srcBuff.start == NULL); /* srcBuff supposed already released */
ZSTDMT_releaseBuffer(mtctx->bufPool, job.dstBuff);
mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
assert(mtctx->jobs[wJobID].srcBuff.start == NULL); /* srcBuff supposed already released */
ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);
mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
mtctx->consumed += job.srcSize;
mtctx->produced += job.cSize;
mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */
mtctx->consumed += mtctx->jobs[wJobID].srcSize;
mtctx->produced += cSize;
mtctx->doneJobID++;
} else {
mtctx->jobs[wJobID].dstFlushed = job.dstFlushed; /* remember how much was flushed for next attempt */
} }
/* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
if (job.srcSize > job.consumed) return 1; /* current job not completely compressed */
if (cSize > mtctx->jobs[wJobID].dstFlushed) return (cSize - mtctx->jobs[wJobID].dstFlushed);
if (mtctx->jobs[wJobID].srcSize > srcConsumed) return 1; /* current job not completely compressed */
}
if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs to flush */
if (mtctx->jobReady) return 1; /* one job is ready and queued! */
if (mtctx->inBuff.filled > 0) return 1; /* input not empty */
mtctx->allJobsCompleted = mtctx->frameEnded; /* last frame entirely flushed */
if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */
return 0; /* everything flushed */
if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs ongoing */
if (mtctx->jobReady) return 1; /* one job is ready to push, just not yet in the list */
if (mtctx->inBuff.filled > 0) return 1; /* input is not empty, and still needs to be converted into a job */
mtctx->allJobsCompleted = mtctx->frameEnded; /* all chunks are entirely flushed => if this one is last one, frame is completed */
if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? instead of : are internal buffers fully flushed ? */
return 0; /* internal buffers fully flushed */
}
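As an aside on the checksum branch above: once the worker has consumed the whole source, the flushing thread appends the 4-byte checksum right after the compressed data with MEM_writeLE32(), which stores the value in little-endian byte order regardless of the host. A small self-contained stand-in (writeLE32 and appendChecksum are illustrative names, not zstd functions):

#include <stdint.h>
#include <string.h>

/* store a 32-bit value in little-endian order, whatever the host endianness */
static void writeLE32(void* dst, uint32_t value32)
{
    unsigned char const bytes[4] = {
        (unsigned char)(value32      ),
        (unsigned char)(value32 >>  8),
        (unsigned char)(value32 >> 16),
        (unsigned char)(value32 >> 24)
    };
    memcpy(dst, bytes, sizeof(bytes));
}

/* append the checksum after cSize bytes of compressed data ;
 * returns the new compressed size, now including the 4 checksum bytes */
static size_t appendChecksum(char* dstBuff, size_t cSize, uint32_t checksum)
{
    writeLE32(dstBuff + cSize, checksum);
    return cSize + 4;
}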
@@ -1217,10 +1218,10 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
}
/* single-pass shortcut (note : synchronous-mode) */
if ( (mtctx->nextJobID == 0) /* just started */
&& (mtctx->inBuff.filled == 0) /* nothing buffered */
&& (!mtctx->jobReady) /* no job already created */
&& (endOp == ZSTD_e_end) /* end order */
if ( (mtctx->nextJobID == 0) /* just started */
&& (mtctx->inBuff.filled == 0) /* nothing buffered */
&& (!mtctx->jobReady) /* no job already created */
&& (endOp == ZSTD_e_end) /* end order */
&& (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */
size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx,
(char*)output->dst + output->pos, output->size - output->pos,