diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index eef6c932..7cbf2c99 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -59,7 +59,6 @@ typedef struct { typedef struct { buffer_t src; buffer_t dst; - unsigned compressionLevel; unsigned jobID; unsigned lastJobPlusOne; size_t compressedSize; @@ -78,7 +77,6 @@ typedef struct { typedef struct { unsigned compressionLevel; - unsigned numActiveThreads; unsigned numJobs; unsigned nextJobID; unsigned threadError; @@ -141,6 +139,7 @@ typedef struct { mutex_t compressionCompletion_mutex; mutex_t createCompletion_mutex; mutex_t writeCompletion_mutex; + mutex_t compressionLevel_mutex; size_t lastDictSize; inBuff_t input; jobDescription* jobs; @@ -202,6 +201,7 @@ static int freeCCtx(adaptCCtx* ctx) error |= destroyMutex(&ctx->compressionCompletion_mutex); error |= destroyMutex(&ctx->createCompletion_mutex); error |= destroyMutex(&ctx->writeCompletion_mutex); + error |= destroyMutex(&ctx->compressionLevel_mutex); error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx)); free(ctx->input.buffer.start); if (ctx->jobs){ @@ -243,6 +243,7 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs) pthreadError |= initMutex(&ctx->compressionCompletion_mutex); pthreadError |= initMutex(&ctx->createCompletion_mutex); pthreadError |= initMutex(&ctx->writeCompletion_mutex); + pthreadError |= initMutex(&ctx->compressionLevel_mutex); if (pthreadError) return pthreadError; } ctx->numJobs = numJobs; @@ -384,16 +385,22 @@ static void adaptCompressionLevel(adaptCCtx* ctx) double compressWaitWriteCompletion; double writeWaitCompressionCompletion; double const threshold = 0.00001; - unsigned const prevCompressionLevel = ctx->compressionLevel; + unsigned prevCompressionLevel; + + pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); + prevCompressionLevel = ctx->compressionLevel; + pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); if (g_forceCompressionLevel) { + pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); ctx->compressionLevel = g_compressionLevel; + pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); return; } - DEBUG(2, "adapting compression level %u\n", ctx->compressionLevel); + DEBUG(2, "adapting compression level %u\n", prevCompressionLevel); /* read and reset completion measurements */ pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); @@ -414,7 +421,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx) pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex); DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter); - assert(g_minCLevel <= ctx->compressionLevel && g_maxCLevel >= ctx->compressionLevel); + assert(g_minCLevel <= prevCompressionLevel && g_maxCLevel >= prevCompressionLevel); /* adaptation logic */ if (ctx->cooldown) ctx->cooldown--; @@ -424,14 +431,16 @@ static void adaptCompressionLevel(adaptCCtx* ctx) /* use whichever one waited less because it was slower */ double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion); unsigned const change = convertCompletionToChange(completion); - unsigned const boundChange = MIN(change, ctx->compressionLevel - g_minCLevel); + unsigned const boundChange = MIN(change, prevCompressionLevel - g_minCLevel); if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) { /* reset convergence counter, might have been a spike */ ctx->convergenceCounter = 0; DEBUG(2, "convergence counter reset, no change applied\n"); } else if (boundChange != 0) { + pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); ctx->compressionLevel -= boundChange; + pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); ctx->cooldown = CLEVEL_DECREASE_COOLDOWN; ctx->convergenceCounter = 1; @@ -442,14 +451,16 @@ static void adaptCompressionLevel(adaptCCtx* ctx) /* compress waiting on write */ double const completion = MIN(compressWaitWriteCompletion, compressWaitCreateCompletion); unsigned const change = convertCompletionToChange(completion); - unsigned const boundChange = MIN(change, g_maxCLevel - ctx->compressionLevel); + unsigned const boundChange = MIN(change, g_maxCLevel - prevCompressionLevel); if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) { /* reset convergence counter, might have been a spike */ ctx->convergenceCounter = 0; DEBUG(2, "convergence counter reset, no change applied\n"); } else if (boundChange != 0) { + pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); ctx->compressionLevel += boundChange; + pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); ctx->cooldown = 0; ctx->convergenceCounter = 1; @@ -458,9 +469,11 @@ static void adaptCompressionLevel(adaptCCtx* ctx) } + pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); if (ctx->compressionLevel == prevCompressionLevel) { ctx->convergenceCounter++; } + pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); } static size_t getUseableDictSize(unsigned compressionLevel) @@ -540,15 +553,23 @@ static void* compressionThread(void* arg) /* adapt compression level */ if (currJob) adaptCompressionLevel(ctx); + pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); DEBUG(2, "job %u compressed with level %u\n", currJob, ctx->compressionLevel); + pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); + /* compress the data */ { size_t const compressionBlockSize = ZSTD_BLOCKSIZE_MAX; /* 128 KB */ - unsigned const cLevel = ctx->compressionLevel; + unsigned cLevel; unsigned blockNum = 0; size_t remaining = job->src.size; size_t srcPos = 0; size_t dstPos = 0; + + pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); + cLevel = ctx->compressionLevel; + pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); + /* reset compressed size */ job->compressedSize = 0; DEBUG(2, "calling ZSTD_compressBegin()\n"); @@ -712,7 +733,13 @@ static void* outputThread(void* arg) } } } - displayProgress(ctx->compressionLevel, job->lastJobPlusOne == currJob + 1); + { + unsigned cLevel; + pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex); + cLevel = ctx->compressionLevel; + pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex); + displayProgress(cLevel, job->lastJobPlusOne == currJob + 1); + } pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); ctx->jobWriteID++; pthread_cond_signal(&ctx->jobWrite_cond.pCond); @@ -740,7 +767,6 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) jobDescription* const job = &ctx->jobs[nextJobIndex]; - job->compressionLevel = ctx->compressionLevel; job->src.size = srcSize; job->jobID = nextJob; if (last) job->lastJobPlusOne = nextJob + 1;