From 880f08d1049ee320aa54673b99c9ec0e57c5e397 Mon Sep 17 00:00:00 2001 From: Paul Cruz Date: Sun, 23 Jul 2017 10:18:54 -0700 Subject: [PATCH] change how completion is measured in compression thread --- contrib/adaptive-compression/adapt.c | 48 ++++++++++++++++++---------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index 68727cbd..24842145 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -25,7 +25,7 @@ #define DEFAULT_DISPLAY_LEVEL 1 #define DEFAULT_COMPRESSION_LEVEL 6 #define DEFAULT_ADAPT_PARAM 0 -#define MAX_COMPRESSION_LEVEL_CHANGE 3 +#define MAX_COMPRESSION_LEVEL_CHANGE 2 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL; static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL; @@ -207,6 +207,9 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs) ctx->compressWaitCreateCompletion = 1; ctx->compressWaitWriteCompletion = 1; ctx->writeWaitCompressionCompletion = 1; + ctx->createCompletion = 1; + ctx->writeCompletion = 1; + ctx->compressionCompletion = 1; ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); @@ -387,16 +390,34 @@ static void* compressionThread(void* arg) unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* job = &ctx->jobs[currJobIndex]; DEBUG(3, "compressionThread(): waiting on job ready\n"); + + { + /* check if compression thread will have to wait */ + unsigned willWaitForCreate = 0; + unsigned willWaitForWrite = 0; + + pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); + if (currJob + 1 > ctx->jobReadyID) willWaitForCreate = 1; + pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); + + pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); + if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1; + pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); + + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + if (willWaitForCreate || willWaitForWrite) { + ctx->compressWaitCreateCompletion = ctx->createCompletion; + ctx->compressWaitWriteCompletion = ctx->writeCompletion; + DEBUG(2, "compression will wait for create or write\n"); + DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion); + DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion); + } + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + } + /* wait until job is ready */ pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) { - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - /* compression thread is waiting on creation thread, take measurement */ - ctx->compressWaitCreateCompletion = ctx->createCompletion; - DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion); - DEBUG(3, "create completion: %f\n", ctx->createCompletion); - DEBUG(2, "compression thread waiting for ready: %u, compressWaitCreateCompletion %f\n", currJob, ctx->compressWaitCreateCompletion); - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); @@ -404,11 +425,6 @@ static void* compressionThread(void* arg) /* wait until job previously in this space is written */ pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) { - /* compression thread is waiting on writer thread, take measurement */ - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->compressWaitWriteCompletion = ctx->writeCompletion; - DEBUG(2, "compression thread waiting for write: %u, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitWriteCompletion); - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); @@ -488,7 +504,7 @@ static void* compressionThread(void* arg) /* update completion */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->compressionCompletion = 1 - (double)remaining/job->src.size; - DEBUG(2, "compression completion %f\n", ctx->compressionCompletion); + DEBUG(2, "compression completion %u %f\n", currJob, ctx->compressionCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); } } while (remaining != 0); @@ -579,7 +595,7 @@ static void* outputThread(void* arg) /* update completion variable for writing */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->writeCompletion = 1 - (double)remaining/compressedSize; - DEBUG(2, "write completion %f\n", ctx->writeCompletion); + DEBUG(2, "write completion %u %f\n", currJob, ctx->writeCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); if (remaining == 0) break; @@ -721,7 +737,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA remaining -= ret; pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE); - DEBUG(2, "create completion: %f\n", ctx->createCompletion); + DEBUG(2, "create completion %u %f\n", currJob, ctx->createCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); } if (remaining != 0 && !feof(srcFile)) {