From dcf609f835b0c0fb2499d13c8fa755d99808341a Mon Sep 17 00:00:00 2001 From: Paul Cruz Date: Wed, 19 Jul 2017 16:36:33 -0700 Subject: [PATCH] make adaptCompressionLevel oscillate less --- contrib/adaptive-compression/adapt.c | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index cc72d155..ce92d914 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -358,8 +358,8 @@ static void adaptCompressionLevel(adaptCCtx* ctx) DEBUG(3, "write completion: %f, create completion: %f\n", ctx->writeCompletion, ctx->createCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); { - unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1; - unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1; + unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE)); + unsigned const change = MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel); DEBUG(3, "writeSlow: %u, change: %u\n", writeSlow, change); DEBUG(3, "write completion: %f\n", completion); ctx->compressionLevel += change; @@ -372,7 +372,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx) completion = ctx->compressionCompletion; pthread_mutex_unlock(&ctx->completion_mutex.pMutex); { - unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1; + unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE)); unsigned const change = MIN(maxChange, ctx->compressionLevel - 1); DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel); DEBUG(3, "completion: %f\n", completion); @@ -388,10 +388,12 @@ static void adaptCompressionLevel(adaptCCtx* ctx) pthread_mutex_unlock(&ctx->stats_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->compressionCompletion = 1; + ctx->compressionCompletion = 0; ctx->compressionCompletionMeasured = 0; - ctx->writeCompletion = 1; + ctx->writeCompletion = 0; ctx->writeCompletionMeasured = 0; + ctx->createCompletion = 0; + ctx->createCompletionMeasured = 0; pthread_mutex_unlock(&ctx->completion_mutex.pMutex); } } @@ -423,6 +425,7 @@ static void* compressionThread(void* arg) reduceCounters(ctx); pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->createCompletionMeasured = 1; + DEBUG(2, "create completion: %f\n", ctx->createCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex); @@ -465,8 +468,8 @@ static void* compressionThread(void* arg) do { size_t const actualBlockSize = MIN(remaining, compressionBlockSize); - DEBUG(2, "remaining: %zu\n", remaining); - DEBUG(2, "actualBlockSize: %zu\n", actualBlockSize); + DEBUG(3, "remaining: %zu\n", remaining); + DEBUG(3, "actualBlockSize: %zu\n", actualBlockSize); /* continue compression */ if (currJob != 0 || blockNum != 0) { /* not first block of first job flush/overwrite the frame header */ @@ -480,9 +483,9 @@ static void* compressionThread(void* arg) ZSTD_invalidateRepCodes(ctx->cctx); } { - DEBUG(2, "write out ending: %d\n", job->lastJob && (remaining == actualBlockSize)); - DEBUG(2, "lastJob %u\n", job->lastJob); - DEBUG(2, "compressionBlockSize %zu\n", compressionBlockSize); + DEBUG(3, "write out ending: %d\n", job->lastJob && (remaining == actualBlockSize)); + DEBUG(3, "lastJob %u\n", job->lastJob); + DEBUG(3, "compressionBlockSize %zu\n", compressionBlockSize); size_t const ret = (job->lastJob && remaining == actualBlockSize) ? ZSTD_compressEnd (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) : ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize); @@ -560,6 +563,7 @@ static void* outputThread(void* arg) reduceCounters(ctx); pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->compressionCompletionMeasured = 1; + DEBUG(2, "compressionCompletion %f\n", ctx->compressionCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); @@ -576,7 +580,7 @@ static void* outputThread(void* arg) } { // size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile); - size_t const blockSize = 4 << 20; + size_t const blockSize = 64 << 10; /* 64 KB */ size_t pos = 0; for ( ; ; ) { size_t const writeSize = MIN(remaining, blockSize); @@ -640,6 +644,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) reduceCounters(ctx); pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->writeCompletionMeasured = 1; + DEBUG(2, "writeCompletion: %f\n", ctx->writeCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob); pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);