From 7ab758a640629665ba43f4f8ce5fa38c9260b1ba Mon Sep 17 00:00:00 2001 From: Paul Cruz Date: Thu, 20 Jul 2017 10:53:51 -0700 Subject: [PATCH] changed how completion is actually sampled --- contrib/adaptive-compression/adapt.c | 65 ++++++++++++---------------- 1 file changed, 27 insertions(+), 38 deletions(-) diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index ce92d914..cf232ca7 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -24,8 +24,8 @@ #define MAX_PATH 256 #define DEFAULT_DISPLAY_LEVEL 1 #define DEFAULT_COMPRESSION_LEVEL 6 -#define DEFAULT_ADAPT_PARAM 1 -#define MAX_COMPRESSION_LEVEL_CHANGE 3 +#define DEFAULT_ADAPT_PARAM 0 +#define MAX_COMPRESSION_LEVEL_CHANGE 4 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL; static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL; @@ -87,9 +87,9 @@ typedef struct { unsigned jobWriteID; unsigned allJobsCompleted; unsigned adaptParam; - unsigned compressionCompletionMeasured; - unsigned writeCompletionMeasured; - unsigned createCompletionMeasured; + double compressionCompletionMeasured; + double writeCompletionMeasured; + double createCompletionMeasured; double compressionCompletion; double writeCompletion; double createCompletion; @@ -342,6 +342,9 @@ static void adaptCompressionLevel(adaptCCtx* ctx) writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter; createWaiting = ctx->adaptParam < ctx->stats.writeCounter; pthread_mutex_unlock(&ctx->stats_mutex.pMutex); + DEBUG(2, "createWaiting %u\n", createWaiting); + DEBUG(2, "compressWaiting %u\n", compressWaiting); + DEBUG(2, "writeWaiting %u\n\n", writeWaiting); { unsigned const writeSlow = (compressWaiting && createWaiting); unsigned const compressSlow = (writeWaiting && createWaiting); @@ -351,14 +354,14 @@ static void adaptCompressionLevel(adaptCCtx* ctx) reset = 1; } else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) { - DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel); + DEBUG(2, "increasing compression level %u\n", ctx->compressionLevel); double completion; pthread_mutex_lock(&ctx->completion_mutex.pMutex); - completion = writeSlow ? ctx->writeCompletion : ctx->createCompletion; - DEBUG(3, "write completion: %f, create completion: %f\n", ctx->writeCompletion, ctx->createCompletion); + completion = writeSlow ? ctx->writeCompletionMeasured : ctx->createCompletionMeasured; + DEBUG(2, "write completion: %f, create completion: %f\n", ctx->writeCompletionMeasured, ctx->createCompletionMeasured); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); { - unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE)); + unsigned const maxChange = MAX_COMPRESSION_LEVEL_CHANGE - (unsigned)(completion*MAX_COMPRESSION_LEVEL_CHANGE); unsigned const change = MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel); DEBUG(3, "writeSlow: %u, change: %u\n", writeSlow, change); DEBUG(3, "write completion: %f\n", completion); @@ -369,13 +372,13 @@ static void adaptCompressionLevel(adaptCCtx* ctx) else if (compressSlow && ctx->compressionLevel > 1) { double completion; pthread_mutex_lock(&ctx->completion_mutex.pMutex); - completion = ctx->compressionCompletion; + completion = ctx->compressionCompletionMeasured; pthread_mutex_unlock(&ctx->completion_mutex.pMutex); { - unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE)); + unsigned const maxChange = MAX_COMPRESSION_LEVEL_CHANGE - (unsigned)(completion*MAX_COMPRESSION_LEVEL_CHANGE); unsigned const change = MIN(maxChange, ctx->compressionLevel - 1); - DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel); - DEBUG(3, "completion: %f\n", completion); + DEBUG(2, "decreasing compression level %u\n", ctx->compressionLevel); + DEBUG(2, "completion: %f\n", completion); ctx->compressionLevel -= change; reset = 1; } @@ -386,15 +389,6 @@ static void adaptCompressionLevel(adaptCCtx* ctx) ctx->stats.writeCounter = 0; ctx->stats.compressedCounter = 0; pthread_mutex_unlock(&ctx->stats_mutex.pMutex); - - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->compressionCompletion = 0; - ctx->compressionCompletionMeasured = 0; - ctx->writeCompletion = 0; - ctx->writeCompletionMeasured = 0; - ctx->createCompletion = 0; - ctx->createCompletionMeasured = 0; - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); } } } @@ -424,8 +418,8 @@ static void* compressionThread(void* arg) pthread_mutex_unlock(&ctx->stats_mutex.pMutex); reduceCounters(ctx); pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->createCompletionMeasured = 1; - DEBUG(2, "create completion: %f\n", ctx->createCompletion); + ctx->createCompletionMeasured = ctx->createCompletion; + DEBUG(3, "create completion: %f\n", ctx->createCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex); @@ -502,9 +496,8 @@ static void* compressionThread(void* arg) /* update completion */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); - if (!ctx->compressionCompletionMeasured) { - ctx->compressionCompletion = 1 - (double)remaining/job->src.size; - } + ctx->compressionCompletion = 1 - (double)remaining/job->src.size; + DEBUG(2, "update on job %u: compression completion %f\n", currJob, ctx->compressionCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); } } while (remaining != 0); @@ -562,8 +555,8 @@ static void* outputThread(void* arg) pthread_mutex_unlock(&ctx->stats_mutex.pMutex); reduceCounters(ctx); pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->compressionCompletionMeasured = 1; - DEBUG(2, "compressionCompletion %f\n", ctx->compressionCompletion); + ctx->compressionCompletionMeasured = ctx->compressionCompletion; + DEBUG(2, "waited on job %u: compressionCompletion %f\n", currJob, ctx->compressionCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); @@ -580,7 +573,7 @@ static void* outputThread(void* arg) } { // size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile); - size_t const blockSize = 64 << 10; /* 64 KB */ + size_t const blockSize = compressedSize >> 7; size_t pos = 0; for ( ; ; ) { size_t const writeSize = MIN(remaining, blockSize); @@ -591,9 +584,7 @@ static void* outputThread(void* arg) /* update completion variable for writing */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); - if (!ctx->writeCompletionMeasured) { - ctx->writeCompletion = 1 - (double)remaining/compressedSize; - } + ctx->writeCompletion = 1 - (double)remaining/compressedSize; pthread_mutex_unlock(&ctx->completion_mutex.pMutex); if (remaining == 0) break; @@ -643,8 +634,8 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) pthread_mutex_unlock(&ctx->stats_mutex.pMutex); reduceCounters(ctx); pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->writeCompletionMeasured = 1; - DEBUG(2, "writeCompletion: %f\n", ctx->writeCompletion); + ctx->writeCompletionMeasured = ctx->writeCompletion; + DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob); pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex); @@ -736,9 +727,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA pos += ret; remaining -= ret; pthread_mutex_lock(&ctx->completion_mutex.pMutex); - if (!ctx->createCompletionMeasured) { - ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE); - } + ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE); DEBUG(3, "create completion: %f\n", ctx->createCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); }