taking the maximum of the completion level reads in order to determine which one was waiting more

This commit is contained in:
Paul Cruz 2017-07-21 16:05:01 -07:00
parent 05fe8dd47c
commit 6455ec482c

View File

@ -354,23 +354,29 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
if (1-createWaitCompressionCompletion > threshold && 1-writeWaitCompressionCompletion > threshold) { if (1-createWaitCompressionCompletion > threshold && 1-writeWaitCompressionCompletion > threshold) {
/* both create and write threads waiting on compression */ /* both create and write threads waiting on compression */
/* use writeWaitCompressionCompletion */ /* use writeWaitCompressionCompletion */
unsigned const change = (unsigned)((1-writeWaitCompressionCompletion) * MAX_COMPRESSION_LEVEL_CHANGE); double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion);
unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE);
unsigned const boundChange = MIN(change, ctx->compressionLevel - 1); unsigned const boundChange = MIN(change, ctx->compressionLevel - 1);
ctx->compressionLevel -= boundChange; ctx->compressionLevel -= boundChange;
DEBUG(2, "create and write threads waiting, tried to decrease compression level by %u\n", boundChange);
} }
else if (1-createWaitWriteCompletion > threshold && 1-compressWaitWriteCompletion > threshold) { else if (1-createWaitWriteCompletion > threshold && 1-compressWaitWriteCompletion > threshold) {
/* both create and compression thread waiting on write */ /* both create and compression thread waiting on write */
/* use createWaitWriteCompletion */ /* use createWaitWriteCompletion */
unsigned const change = (unsigned)((1-createWaitWriteCompletion) * MAX_COMPRESSION_LEVEL_CHANGE); double const completion = MAX(createWaitWriteCompletion, compressWaitWriteCompletion);
unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE);
unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel); unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel);
ctx->compressionLevel += boundChange; ctx->compressionLevel += boundChange;
DEBUG(2, "create and compression threads waiting, tried to increase compression level by %u\n", boundChange);
} }
else if (1-writeWaitCreateCompletion > threshold && 1-compressWaitCreateCompletion > threshold) { else if (1-writeWaitCreateCompletion > threshold && 1-compressWaitCreateCompletion > threshold) {
/* both compression and write waiting on create */ /* both compression and write waiting on create */
/* use compressWaitCreateCompletion */ /* use compressWaitCreateCompletion */
unsigned const change = (unsigned)((1-compressWaitCreateCompletion) * MAX_COMPRESSION_LEVEL_CHANGE); double const completion = MAX(writeWaitCreateCompletion, compressWaitCreateCompletion);
unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE);
unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel); unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel);
ctx->compressionLevel += boundChange; ctx->compressionLevel += boundChange;
DEBUG(2, "compression and write threads waiting, tried to increase compression level by %u\n", boundChange);
} }
if (g_forceCompressionLevel) { if (g_forceCompressionLevel) {
@ -404,8 +410,8 @@ static void* compressionThread(void* arg)
ctx->compressWaitWriteCompletion = ctx->writeCompletion; ctx->compressWaitWriteCompletion = ctx->writeCompletion;
DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f : compressWaitWriteCompletion %f\n", ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion); DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f : compressWaitWriteCompletion %f\n", ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion);
DEBUG(3, "create completion: %f\n", ctx->createCompletion); DEBUG(3, "create completion: %f\n", ctx->createCompletion);
DEBUG(2, "compression thread waiting for nextJob: %u, compressWaitCreateCompletion %f, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(2, "waiting on job ready, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex); pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
} }
pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
@ -422,6 +428,7 @@ static void* compressionThread(void* arg)
/* adapt compression level */ /* adapt compression level */
if (currJob) adaptCompressionLevel(ctx); if (currJob) adaptCompressionLevel(ctx);
DEBUG(2, "job %u compressed with level %u\n", currJob, ctx->compressionLevel);
/* compress the data */ /* compress the data */
{ {
size_t const compressionBlockSize = 1 << 17; /* 128 KB */ size_t const compressionBlockSize = 1 << 17; /* 128 KB */
@ -487,7 +494,7 @@ static void* compressionThread(void* arg)
/* update completion */ /* update completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 1 - (double)remaining/job->src.size; ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
DEBUG(3, "compression completion %f\n", ctx->compressionCompletion); DEBUG(2, "compression completion %f\n", ctx->compressionCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
} }
} while (remaining != 0); } while (remaining != 0);
@ -545,8 +552,8 @@ static void* outputThread(void* arg)
ctx->writeWaitCompressionCompletion = ctx->compressionCompletion; ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
ctx->writeWaitCreateCompletion = ctx->createCompletion; ctx->writeWaitCreateCompletion = ctx->createCompletion;
DEBUG(3, "write thread waiting : writeWaitCreateCompletion %f : writeWaitCompressionCompletion %f\n", ctx->writeWaitCreateCompletion, ctx->writeWaitCompressionCompletion); DEBUG(3, "write thread waiting : writeWaitCreateCompletion %f : writeWaitCompressionCompletion %f\n", ctx->writeWaitCreateCompletion, ctx->writeWaitCompressionCompletion);
DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f, writeWaitCreateCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion, ctx->writeWaitCreateCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(2, "waiting on job compressed, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
} }
pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
@ -579,7 +586,7 @@ static void* outputThread(void* arg)
/* update completion variable for writing */ /* update completion variable for writing */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->writeCompletion = 1 - (double)remaining/compressedSize; ctx->writeCompletion = 1 - (double)remaining/compressedSize;
DEBUG(3, "write completion %f\n", ctx->writeCompletion); DEBUG(2, "write completion %f\n", ctx->writeCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
if (remaining == 0) break; if (remaining == 0) break;
@ -630,8 +637,8 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
ctx->createWaitWriteCompletion = ctx->writeCompletion; ctx->createWaitWriteCompletion = ctx->writeCompletion;
DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f : createWaitWriteCompletion %f\n", ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion); DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f : createWaitWriteCompletion %f\n", ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion);
DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion); DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f, createWaitWriteCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(2, "waiting on job Write, nextJob: %u\n", nextJob);
pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex); pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
} }
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
@ -721,7 +728,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
remaining -= ret; remaining -= ret;
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE); ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
DEBUG(3, "create completion: %f\n", ctx->createCompletion); DEBUG(2, "create completion: %f\n", ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
} }
if (remaining != 0 && !feof(srcFile)) { if (remaining != 0 && !feof(srcFile)) {