diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index 01a73a66..62f5ec91 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -25,7 +25,7 @@ #define DEFAULT_DISPLAY_LEVEL 1 #define DEFAULT_COMPRESSION_LEVEL 6 #define DEFAULT_ADAPT_PARAM 1 -#define MAX_COMPRESSION_LEVEL_CHANGE 10 +#define MAX_COMPRESSION_LEVEL_CHANGE 3 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL; static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL; @@ -277,6 +277,15 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx) pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex); } +/* this function normalizes counters when compression level is changing */ +static void reduceCounters(adaptCCtx* ctx) +{ + unsigned const min = MIN(ctx->stats.compressedCounter, MIN(ctx->stats.writeCounter, ctx->stats.readyCounter)); + ctx->stats.writeCounter -= min; + ctx->stats.compressedCounter -= min; + ctx->stats.readyCounter -= min; +} + /* * Compression level is changed depending on which part of the compression process is lagging * Currently, three theads exist for job creation, compression, and file writing respectively. @@ -285,10 +294,10 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx) * compression thread lag => decreased compression level * detecting which thread is lagging is done by keeping track of how many calls each thread makes to pthread_cond_wait */ -static unsigned adaptCompressionLevel(adaptCCtx* ctx) +static void adaptCompressionLevel(adaptCCtx* ctx) { if (g_forceCompressionLevel) { - return g_compressionLevel; + ctx->compressionLevel = g_compressionLevel; } else { unsigned reset = 0; @@ -296,10 +305,11 @@ static unsigned adaptCompressionLevel(adaptCCtx* ctx) unsigned const compressWaiting = ctx->adaptParam < ctx->stats.readyCounter; unsigned const writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter; unsigned const createWaiting = ctx->adaptParam < ctx->stats.writeCounter; - unsigned const writeSlow = ((compressWaiting && createWaiting) || (createWaiting && !writeWaiting)); - unsigned const compressSlow = ((writeWaiting && createWaiting) || (writeWaiting && !compressWaiting)); - unsigned const createSlow = ((compressWaiting && writeWaiting) || (compressWaiting && !createWaiting)); - DEBUG(3, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter); + unsigned const writeSlow = (compressWaiting && createWaiting); + unsigned const compressSlow = (writeWaiting && createWaiting); + unsigned const createSlow = (compressWaiting && writeWaiting); + DEBUG(2, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting); + DEBUG(2, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter); if (allSlow) { reset = 1; } @@ -310,10 +320,10 @@ static unsigned adaptCompressionLevel(adaptCCtx* ctx) } else if (compressSlow && ctx->compressionLevel > 1) { double const completion = ctx->completion; - unsigned const maxChange = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE); + unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1; unsigned const change = MIN(maxChange, ctx->compressionLevel - 1); DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel); - DEBUG(2, "completion: %f\n", completion); + DEBUG(3, "completion: %f\n", completion); ctx->compressionLevel -= change; reset = 1; } @@ -324,7 +334,6 @@ static unsigned adaptCompressionLevel(adaptCCtx* ctx) ctx->completion = 1; ctx->completionMeasured = 0; } - return ctx->compressionLevel; } } @@ -348,6 +357,8 @@ static void* compressionThread(void* arg) while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) { ctx->stats.waitReady++; ctx->stats.readyCounter++; + reduceCounters(ctx); + adaptCompressionLevel(ctx); DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex); } @@ -357,13 +368,13 @@ static void* compressionThread(void* arg) DEBUG(3, "%.*s", (int)job->src.size, (char*)job->src.start); /* compress the data */ { - unsigned const cLevel = adaptCompressionLevel(ctx); + unsigned const cLevel = ctx->compressionLevel; DEBUG(3, "cLevel used: %u\n", cLevel); DEBUG(3, "compression level used: %u\n", cLevel); /* begin compression */ { size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize); - DEBUG(2, "useDictSize: %zu, job->dictSize: %zu\n", useDictSize, job->dictSize); + DEBUG(3, "useDictSize: %zu, job->dictSize: %zu\n", useDictSize, job->dictSize); size_t const dictModeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceRawDict, 1); size_t const initError = ZSTD_compressBegin_usingDict(ctx->cctx, job->src.start + job->dictSize - useDictSize, useDictSize, cLevel); size_t const windowSizeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceWindow, 1); @@ -443,11 +454,13 @@ static void* outputThread(void* arg) while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { ctx->stats.waitCompressed++; ctx->stats.compressedCounter++; + reduceCounters(ctx); if (!ctx->completionMeasured) { ctx->completion = ZSTD_getCompletion(ctx->cctx); ctx->completionMeasured = 1; } - DEBUG(2, "output detected completion: %f\n", ctx->completion); + adaptCompressionLevel(ctx); + DEBUG(3, "output detected completion: %f\n", ctx->completion); DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); } @@ -503,11 +516,13 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) { ctx->stats.waitWrite++; ctx->stats.writeCounter++; + reduceCounters(ctx); if (!ctx->completionMeasured) { ctx->completion = ZSTD_getCompletion(ctx->cctx); ctx->completionMeasured = 1; } - DEBUG(2, "job creation detected completion %f\n", ctx->completion); + adaptCompressionLevel(ctx); + DEBUG(3, "job creation detected completion %f\n", ctx->completion); DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob); pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex); }