added mutex for compression level to avoid data race

This commit is contained in:
Paul Cruz 2017-08-02 10:27:33 -07:00
parent 69ef22c0ac
commit 8be7bba08c

View File

@ -59,7 +59,6 @@ typedef struct {
typedef struct { typedef struct {
buffer_t src; buffer_t src;
buffer_t dst; buffer_t dst;
unsigned compressionLevel;
unsigned jobID; unsigned jobID;
unsigned lastJobPlusOne; unsigned lastJobPlusOne;
size_t compressedSize; size_t compressedSize;
@ -78,7 +77,6 @@ typedef struct {
typedef struct { typedef struct {
unsigned compressionLevel; unsigned compressionLevel;
unsigned numActiveThreads;
unsigned numJobs; unsigned numJobs;
unsigned nextJobID; unsigned nextJobID;
unsigned threadError; unsigned threadError;
@ -141,6 +139,7 @@ typedef struct {
mutex_t compressionCompletion_mutex; mutex_t compressionCompletion_mutex;
mutex_t createCompletion_mutex; mutex_t createCompletion_mutex;
mutex_t writeCompletion_mutex; mutex_t writeCompletion_mutex;
mutex_t compressionLevel_mutex;
size_t lastDictSize; size_t lastDictSize;
inBuff_t input; inBuff_t input;
jobDescription* jobs; jobDescription* jobs;
@ -202,6 +201,7 @@ static int freeCCtx(adaptCCtx* ctx)
error |= destroyMutex(&ctx->compressionCompletion_mutex); error |= destroyMutex(&ctx->compressionCompletion_mutex);
error |= destroyMutex(&ctx->createCompletion_mutex); error |= destroyMutex(&ctx->createCompletion_mutex);
error |= destroyMutex(&ctx->writeCompletion_mutex); error |= destroyMutex(&ctx->writeCompletion_mutex);
error |= destroyMutex(&ctx->compressionLevel_mutex);
error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx)); error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
free(ctx->input.buffer.start); free(ctx->input.buffer.start);
if (ctx->jobs){ if (ctx->jobs){
@ -243,6 +243,7 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
pthreadError |= initMutex(&ctx->compressionCompletion_mutex); pthreadError |= initMutex(&ctx->compressionCompletion_mutex);
pthreadError |= initMutex(&ctx->createCompletion_mutex); pthreadError |= initMutex(&ctx->createCompletion_mutex);
pthreadError |= initMutex(&ctx->writeCompletion_mutex); pthreadError |= initMutex(&ctx->writeCompletion_mutex);
pthreadError |= initMutex(&ctx->compressionLevel_mutex);
if (pthreadError) return pthreadError; if (pthreadError) return pthreadError;
} }
ctx->numJobs = numJobs; ctx->numJobs = numJobs;
@ -384,16 +385,22 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
double compressWaitWriteCompletion; double compressWaitWriteCompletion;
double writeWaitCompressionCompletion; double writeWaitCompressionCompletion;
double const threshold = 0.00001; double const threshold = 0.00001;
unsigned const prevCompressionLevel = ctx->compressionLevel; unsigned prevCompressionLevel;
pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
prevCompressionLevel = ctx->compressionLevel;
pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
if (g_forceCompressionLevel) { if (g_forceCompressionLevel) {
pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
ctx->compressionLevel = g_compressionLevel; ctx->compressionLevel = g_compressionLevel;
pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
return; return;
} }
DEBUG(2, "adapting compression level %u\n", ctx->compressionLevel); DEBUG(2, "adapting compression level %u\n", prevCompressionLevel);
/* read and reset completion measurements */ /* read and reset completion measurements */
pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
@ -414,7 +421,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter); DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter);
assert(g_minCLevel <= ctx->compressionLevel && g_maxCLevel >= ctx->compressionLevel); assert(g_minCLevel <= prevCompressionLevel && g_maxCLevel >= prevCompressionLevel);
/* adaptation logic */ /* adaptation logic */
if (ctx->cooldown) ctx->cooldown--; if (ctx->cooldown) ctx->cooldown--;
@ -424,14 +431,16 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
/* use whichever one waited less because it was slower */ /* use whichever one waited less because it was slower */
double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion); double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion);
unsigned const change = convertCompletionToChange(completion); unsigned const change = convertCompletionToChange(completion);
unsigned const boundChange = MIN(change, ctx->compressionLevel - g_minCLevel); unsigned const boundChange = MIN(change, prevCompressionLevel - g_minCLevel);
if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) { if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
/* reset convergence counter, might have been a spike */ /* reset convergence counter, might have been a spike */
ctx->convergenceCounter = 0; ctx->convergenceCounter = 0;
DEBUG(2, "convergence counter reset, no change applied\n"); DEBUG(2, "convergence counter reset, no change applied\n");
} }
else if (boundChange != 0) { else if (boundChange != 0) {
pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
ctx->compressionLevel -= boundChange; ctx->compressionLevel -= boundChange;
pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
ctx->cooldown = CLEVEL_DECREASE_COOLDOWN; ctx->cooldown = CLEVEL_DECREASE_COOLDOWN;
ctx->convergenceCounter = 1; ctx->convergenceCounter = 1;
@ -442,14 +451,16 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
/* compress waiting on write */ /* compress waiting on write */
double const completion = MIN(compressWaitWriteCompletion, compressWaitCreateCompletion); double const completion = MIN(compressWaitWriteCompletion, compressWaitCreateCompletion);
unsigned const change = convertCompletionToChange(completion); unsigned const change = convertCompletionToChange(completion);
unsigned const boundChange = MIN(change, g_maxCLevel - ctx->compressionLevel); unsigned const boundChange = MIN(change, g_maxCLevel - prevCompressionLevel);
if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) { if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
/* reset convergence counter, might have been a spike */ /* reset convergence counter, might have been a spike */
ctx->convergenceCounter = 0; ctx->convergenceCounter = 0;
DEBUG(2, "convergence counter reset, no change applied\n"); DEBUG(2, "convergence counter reset, no change applied\n");
} }
else if (boundChange != 0) { else if (boundChange != 0) {
pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
ctx->compressionLevel += boundChange; ctx->compressionLevel += boundChange;
pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
ctx->cooldown = 0; ctx->cooldown = 0;
ctx->convergenceCounter = 1; ctx->convergenceCounter = 1;
@ -458,9 +469,11 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
} }
pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
if (ctx->compressionLevel == prevCompressionLevel) { if (ctx->compressionLevel == prevCompressionLevel) {
ctx->convergenceCounter++; ctx->convergenceCounter++;
} }
pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
} }
static size_t getUseableDictSize(unsigned compressionLevel) static size_t getUseableDictSize(unsigned compressionLevel)
@ -540,15 +553,23 @@ static void* compressionThread(void* arg)
/* adapt compression level */ /* adapt compression level */
if (currJob) adaptCompressionLevel(ctx); if (currJob) adaptCompressionLevel(ctx);
pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
DEBUG(2, "job %u compressed with level %u\n", currJob, ctx->compressionLevel); DEBUG(2, "job %u compressed with level %u\n", currJob, ctx->compressionLevel);
pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
/* compress the data */ /* compress the data */
{ {
size_t const compressionBlockSize = ZSTD_BLOCKSIZE_MAX; /* 128 KB */ size_t const compressionBlockSize = ZSTD_BLOCKSIZE_MAX; /* 128 KB */
unsigned const cLevel = ctx->compressionLevel; unsigned cLevel;
unsigned blockNum = 0; unsigned blockNum = 0;
size_t remaining = job->src.size; size_t remaining = job->src.size;
size_t srcPos = 0; size_t srcPos = 0;
size_t dstPos = 0; size_t dstPos = 0;
pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
cLevel = ctx->compressionLevel;
pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
/* reset compressed size */ /* reset compressed size */
job->compressedSize = 0; job->compressedSize = 0;
DEBUG(2, "calling ZSTD_compressBegin()\n"); DEBUG(2, "calling ZSTD_compressBegin()\n");
@ -712,7 +733,13 @@ static void* outputThread(void* arg)
} }
} }
} }
displayProgress(ctx->compressionLevel, job->lastJobPlusOne == currJob + 1); {
unsigned cLevel;
pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
cLevel = ctx->compressionLevel;
pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
displayProgress(cLevel, job->lastJobPlusOne == currJob + 1);
}
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
ctx->jobWriteID++; ctx->jobWriteID++;
pthread_cond_signal(&ctx->jobWrite_cond.pCond); pthread_cond_signal(&ctx->jobWrite_cond.pCond);
@ -740,7 +767,6 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
jobDescription* const job = &ctx->jobs[nextJobIndex]; jobDescription* const job = &ctx->jobs[nextJobIndex];
job->compressionLevel = ctx->compressionLevel;
job->src.size = srcSize; job->src.size = srcSize;
job->jobID = nextJob; job->jobID = nextJob;
if (last) job->lastJobPlusOne = nextJob + 1; if (last) job->lastJobPlusOne = nextJob + 1;