make adaptCompressionLevel oscillate less

dev
Paul Cruz 2017-07-19 16:36:33 -07:00
parent 2a22c7915e
commit dcf609f835
1 changed files with 16 additions and 11 deletions

View File

@ -358,8 +358,8 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
DEBUG(3, "write completion: %f, create completion: %f\n", ctx->writeCompletion, ctx->createCompletion); DEBUG(3, "write completion: %f, create completion: %f\n", ctx->writeCompletion, ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
{ {
unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1; unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE));
unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1; unsigned const change = MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel);
DEBUG(3, "writeSlow: %u, change: %u\n", writeSlow, change); DEBUG(3, "writeSlow: %u, change: %u\n", writeSlow, change);
DEBUG(3, "write completion: %f\n", completion); DEBUG(3, "write completion: %f\n", completion);
ctx->compressionLevel += change; ctx->compressionLevel += change;
@ -372,7 +372,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
completion = ctx->compressionCompletion; completion = ctx->compressionCompletion;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
{ {
unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1; unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE));
unsigned const change = MIN(maxChange, ctx->compressionLevel - 1); unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel); DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
DEBUG(3, "completion: %f\n", completion); DEBUG(3, "completion: %f\n", completion);
@ -388,10 +388,12 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
pthread_mutex_unlock(&ctx->stats_mutex.pMutex); pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 1; ctx->compressionCompletion = 0;
ctx->compressionCompletionMeasured = 0; ctx->compressionCompletionMeasured = 0;
ctx->writeCompletion = 1; ctx->writeCompletion = 0;
ctx->writeCompletionMeasured = 0; ctx->writeCompletionMeasured = 0;
ctx->createCompletion = 0;
ctx->createCompletionMeasured = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
} }
} }
@ -423,6 +425,7 @@ static void* compressionThread(void* arg)
reduceCounters(ctx); reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletionMeasured = 1; ctx->createCompletionMeasured = 1;
DEBUG(2, "create completion: %f\n", ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob); DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex); pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
@ -465,8 +468,8 @@ static void* compressionThread(void* arg)
do { do {
size_t const actualBlockSize = MIN(remaining, compressionBlockSize); size_t const actualBlockSize = MIN(remaining, compressionBlockSize);
DEBUG(2, "remaining: %zu\n", remaining); DEBUG(3, "remaining: %zu\n", remaining);
DEBUG(2, "actualBlockSize: %zu\n", actualBlockSize); DEBUG(3, "actualBlockSize: %zu\n", actualBlockSize);
/* continue compression */ /* continue compression */
if (currJob != 0 || blockNum != 0) { /* not first block of first job flush/overwrite the frame header */ if (currJob != 0 || blockNum != 0) { /* not first block of first job flush/overwrite the frame header */
@ -480,9 +483,9 @@ static void* compressionThread(void* arg)
ZSTD_invalidateRepCodes(ctx->cctx); ZSTD_invalidateRepCodes(ctx->cctx);
} }
{ {
DEBUG(2, "write out ending: %d\n", job->lastJob && (remaining == actualBlockSize)); DEBUG(3, "write out ending: %d\n", job->lastJob && (remaining == actualBlockSize));
DEBUG(2, "lastJob %u\n", job->lastJob); DEBUG(3, "lastJob %u\n", job->lastJob);
DEBUG(2, "compressionBlockSize %zu\n", compressionBlockSize); DEBUG(3, "compressionBlockSize %zu\n", compressionBlockSize);
size_t const ret = (job->lastJob && remaining == actualBlockSize) ? size_t const ret = (job->lastJob && remaining == actualBlockSize) ?
ZSTD_compressEnd (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) : ZSTD_compressEnd (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) :
ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize); ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize);
@ -560,6 +563,7 @@ static void* outputThread(void* arg)
reduceCounters(ctx); reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletionMeasured = 1; ctx->compressionCompletionMeasured = 1;
DEBUG(2, "compressionCompletion %f\n", ctx->compressionCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob); DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
@ -576,7 +580,7 @@ static void* outputThread(void* arg)
} }
{ {
// size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile); // size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile);
size_t const blockSize = 4 << 20; size_t const blockSize = 64 << 10; /* 64 KB */
size_t pos = 0; size_t pos = 0;
for ( ; ; ) { for ( ; ; ) {
size_t const writeSize = MIN(remaining, blockSize); size_t const writeSize = MIN(remaining, blockSize);
@ -640,6 +644,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
reduceCounters(ctx); reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->writeCompletionMeasured = 1; ctx->writeCompletionMeasured = 1;
DEBUG(2, "writeCompletion: %f\n", ctx->writeCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob); DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex); pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);