change how completion is measured in compression thread

dev
Paul Cruz 2017-07-23 10:18:54 -07:00
parent 08d9e42ec6
commit 880f08d104
1 changed files with 32 additions and 16 deletions

View File

@ -25,7 +25,7 @@
#define DEFAULT_DISPLAY_LEVEL 1 #define DEFAULT_DISPLAY_LEVEL 1
#define DEFAULT_COMPRESSION_LEVEL 6 #define DEFAULT_COMPRESSION_LEVEL 6
#define DEFAULT_ADAPT_PARAM 0 #define DEFAULT_ADAPT_PARAM 0
#define MAX_COMPRESSION_LEVEL_CHANGE 3 #define MAX_COMPRESSION_LEVEL_CHANGE 2
static int g_displayLevel = DEFAULT_DISPLAY_LEVEL; static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL; static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
@ -207,6 +207,9 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
ctx->compressWaitCreateCompletion = 1; ctx->compressWaitCreateCompletion = 1;
ctx->compressWaitWriteCompletion = 1; ctx->compressWaitWriteCompletion = 1;
ctx->writeWaitCompressionCompletion = 1; ctx->writeWaitCompressionCompletion = 1;
ctx->createCompletion = 1;
ctx->writeCompletion = 1;
ctx->compressionCompletion = 1;
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
@ -387,16 +390,34 @@ static void* compressionThread(void* arg)
unsigned const currJobIndex = currJob % ctx->numJobs; unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[currJobIndex]; jobDescription* job = &ctx->jobs[currJobIndex];
DEBUG(3, "compressionThread(): waiting on job ready\n"); DEBUG(3, "compressionThread(): waiting on job ready\n");
{
/* check if compression thread will have to wait */
unsigned willWaitForCreate = 0;
unsigned willWaitForWrite = 0;
pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
if (currJob + 1 > ctx->jobReadyID) willWaitForCreate = 1;
pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1;
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
if (willWaitForCreate || willWaitForWrite) {
ctx->compressWaitCreateCompletion = ctx->createCompletion;
ctx->compressWaitWriteCompletion = ctx->writeCompletion;
DEBUG(2, "compression will wait for create or write\n");
DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion);
DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion);
}
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
}
/* wait until job is ready */ /* wait until job is ready */
pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) { while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
/* compression thread is waiting on creation thread, take measurement */
ctx->compressWaitCreateCompletion = ctx->createCompletion;
DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion);
DEBUG(3, "create completion: %f\n", ctx->createCompletion);
DEBUG(2, "compression thread waiting for ready: %u, compressWaitCreateCompletion %f\n", currJob, ctx->compressWaitCreateCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex); pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
} }
pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
@ -404,11 +425,6 @@ static void* compressionThread(void* arg)
/* wait until job previously in this space is written */ /* wait until job previously in this space is written */
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) { while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
/* compression thread is waiting on writer thread, take measurement */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressWaitWriteCompletion = ctx->writeCompletion;
DEBUG(2, "compression thread waiting for write: %u, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitWriteCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex); pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
} }
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
@ -488,7 +504,7 @@ static void* compressionThread(void* arg)
/* update completion */ /* update completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 1 - (double)remaining/job->src.size; ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
DEBUG(2, "compression completion %f\n", ctx->compressionCompletion); DEBUG(2, "compression completion %u %f\n", currJob, ctx->compressionCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
} }
} while (remaining != 0); } while (remaining != 0);
@ -579,7 +595,7 @@ static void* outputThread(void* arg)
/* update completion variable for writing */ /* update completion variable for writing */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->writeCompletion = 1 - (double)remaining/compressedSize; ctx->writeCompletion = 1 - (double)remaining/compressedSize;
DEBUG(2, "write completion %f\n", ctx->writeCompletion); DEBUG(2, "write completion %u %f\n", currJob, ctx->writeCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
if (remaining == 0) break; if (remaining == 0) break;
@ -721,7 +737,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
remaining -= ret; remaining -= ret;
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE); ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
DEBUG(2, "create completion: %f\n", ctx->createCompletion); DEBUG(2, "create completion %u %f\n", currJob, ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
} }
if (remaining != 0 && !feof(srcFile)) { if (remaining != 0 && !feof(srcFile)) {