measure multiple completion levels during each wait

dev
Paul Cruz 2017-07-21 13:38:24 -07:00
parent 721c6a8b97
commit db109f8fef
1 changed files with 75 additions and 48 deletions

View File

@ -25,7 +25,7 @@
#define DEFAULT_DISPLAY_LEVEL 1 #define DEFAULT_DISPLAY_LEVEL 1
#define DEFAULT_COMPRESSION_LEVEL 6 #define DEFAULT_COMPRESSION_LEVEL 6
#define DEFAULT_ADAPT_PARAM 0 #define DEFAULT_ADAPT_PARAM 0
#define MAX_COMPRESSION_LEVEL_CHANGE 4 #define MAX_COMPRESSION_LEVEL_CHANGE 3
static int g_displayLevel = DEFAULT_DISPLAY_LEVEL; static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL; static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
@ -77,9 +77,12 @@ typedef struct {
unsigned jobWriteID; unsigned jobWriteID;
unsigned allJobsCompleted; unsigned allJobsCompleted;
unsigned adaptParam; unsigned adaptParam;
double compressionCompletionMeasured; double createWaitWriteCompletion;
double writeCompletionMeasured; double createWaitCompressionCompletion;
double createCompletionMeasured; double compressWaitCreateCompletion;
double compressWaitWriteCompletion;
double writeWaitCreateCompletion;
double writeWaitCompressionCompletion;
double compressionCompletion; double compressionCompletion;
double writeCompletion; double writeCompletion;
double createCompletion; double createCompletion;
@ -200,9 +203,14 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
ctx->jobCompressedID = 0; ctx->jobCompressedID = 0;
ctx->jobWriteID = 0; ctx->jobWriteID = 0;
ctx->lastDictSize = 0; ctx->lastDictSize = 0;
ctx->createCompletionMeasured = 1;
ctx->compressionCompletionMeasured = 1;
ctx->writeCompletionMeasured = 1; ctx->createWaitWriteCompletion = 1;
ctx->createWaitCompressionCompletion = 1;
ctx->compressWaitCreateCompletion = 1;
ctx->compressWaitWriteCompletion = 1;
ctx->writeWaitCreateCompletion = 1;
ctx->writeWaitCompressionCompletion = 1;
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
@ -308,45 +316,61 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
*/ */
static void adaptCompressionLevel(adaptCCtx* ctx) static void adaptCompressionLevel(adaptCCtx* ctx)
{ {
double createCompletion, compressionCompletion, writeCompletion; double createWaitWriteCompletion;
double createWaitCompressionCompletion;
double compressWaitCreateCompletion;
double compressWaitWriteCompletion;
double writeWaitCreateCompletion;
double writeWaitCompressionCompletion;
double const threshold = 0.00001; double const threshold = 0.00001;
/* read and reset completion measurements */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
createCompletion = ctx->createCompletionMeasured; DEBUG(2, "rc %f\n", ctx->createWaitCompressionCompletion);
compressionCompletion = ctx->compressionCompletionMeasured; DEBUG(2, "rw %f\n", ctx->createWaitWriteCompletion);
writeCompletion = ctx->writeCompletionMeasured; DEBUG(2, "cr %f\n", ctx->compressWaitCreateCompletion);
DEBUG(2, "cw %f\n", ctx->compressWaitWriteCompletion);
DEBUG(2, "wr %f\n", ctx->writeWaitCreateCompletion);
DEBUG(2, "wc %f\n\n", ctx->writeWaitCompressionCompletion);
createWaitCompressionCompletion = ctx->createWaitCompressionCompletion;
createWaitWriteCompletion = ctx->createWaitWriteCompletion;
compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
writeWaitCreateCompletion = ctx->writeWaitCreateCompletion;
writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion;
ctx->createWaitWriteCompletion = 1;
ctx->createWaitCompressionCompletion = 1;
ctx->compressWaitCreateCompletion = 1;
ctx->compressWaitWriteCompletion = 1;
ctx->writeWaitCreateCompletion = 1;
ctx->writeWaitCompressionCompletion = 1;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(2, "create completion: %f\n", createCompletion); /* adaptation logic */
DEBUG(2, "compression completion: %f\n", compressionCompletion); if (1-createWaitCompressionCompletion > threshold && 1-writeWaitCompressionCompletion > threshold) {
DEBUG(2, "write completion: %f\n", writeCompletion); /* both create and write threads waiting on compression */
/* adapt compression based on bottleneck */ /* use writeWaitCompressionCompletion */
if (1 - createCompletion > threshold) { unsigned const change = (unsigned)((1-writeWaitCompressionCompletion) * MAX_COMPRESSION_LEVEL_CHANGE);
/* job creation was not finished, compression thread waited */
unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - createCompletion * MAX_COMPRESSION_LEVEL_CHANGE;
unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel);
DEBUG(2, "increasing compression level %u by %u\n", ctx->compressionLevel, change);
ctx->compressionLevel += boundChange;
}
else if (1 - writeCompletion > threshold) {
/* write thread was not finished, compression thread waited */
unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - writeCompletion * MAX_COMPRESSION_LEVEL_CHANGE;
unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel);
DEBUG(2, "increasing compression level %u by %u\n", ctx->compressionLevel, change);
ctx->compressionLevel += boundChange;
}
else if (1 - compressionCompletion > threshold) {
/* compression thread was not finished, one of the other two threads waited */
unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - compressionCompletion * MAX_COMPRESSION_LEVEL_CHANGE;
unsigned const boundChange = MIN(change, ctx->compressionLevel - 1); unsigned const boundChange = MIN(change, ctx->compressionLevel - 1);
DEBUG(2, "decreasing compression level %u by %u\n", ctx->compressionLevel, change);
ctx->compressionLevel -= boundChange; ctx->compressionLevel -= boundChange;
} }
/* reset */ else if (1-createWaitWriteCompletion > threshold && 1-compressWaitWriteCompletion > threshold) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex); /* both create and compression thread waiting on write */
ctx->createCompletionMeasured = 1; /* use createWaitWriteCompletion */
ctx->compressionCompletionMeasured = 1; unsigned const change = (unsigned)((1-createWaitWriteCompletion) * MAX_COMPRESSION_LEVEL_CHANGE);
ctx->writeCompletionMeasured = 1; unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); ctx->compressionLevel += boundChange;
}
else if (1-writeWaitCreateCompletion > threshold && 1-compressWaitCreateCompletion > threshold) {
/* both compression and write waiting on create */
/* use compressWaitCreateCompletion */
unsigned const change = (unsigned)((1-compressWaitCreateCompletion) * MAX_COMPRESSION_LEVEL_CHANGE);
unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel);
ctx->compressionLevel += boundChange;
}
if (g_forceCompressionLevel) { if (g_forceCompressionLevel) {
ctx->compressionLevel = g_compressionLevel; ctx->compressionLevel = g_compressionLevel;
@ -375,9 +399,9 @@ static void* compressionThread(void* arg)
while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) { while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
/* compression thread is waiting, take measurements of write completion and read completion */ /* compression thread is waiting, take measurements of write completion and read completion */
ctx->createCompletionMeasured = ctx->createCompletion; ctx->compressWaitCreateCompletion = ctx->createCompletion;
ctx->writeCompletionMeasured = ctx->writeCompletion; ctx->compressWaitWriteCompletion = ctx->writeCompletion;
DEBUG(3, "compression thread waiting : createCompletionMeasured %f : writeCompletionMeasured %f\n", ctx->createCompletionMeasured, ctx->writeCompletionMeasured); DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f : compressWaitWriteCompletion %f\n", ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion);
DEBUG(3, "create completion: %f\n", ctx->createCompletion); DEBUG(3, "create completion: %f\n", ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob); DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
@ -462,7 +486,7 @@ static void* compressionThread(void* arg)
/* update completion */ /* update completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 1 - (double)remaining/job->src.size; ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
DEBUG(3, "update on job %u: compression completion %f\n", currJob, ctx->compressionCompletion); DEBUG(3, "compression completion %f\n", ctx->compressionCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
} }
} while (remaining != 0); } while (remaining != 0);
@ -517,8 +541,9 @@ static void* outputThread(void* arg)
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
/* write thread is waiting, take measurement of compression completion */ /* write thread is waiting, take measurement of compression completion */
ctx->compressionCompletionMeasured = ctx->compressionCompletion; ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
DEBUG(3, "write thread waiting : compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured); ctx->writeWaitCreateCompletion = ctx->createCompletion;
DEBUG(3, "write thread waiting : writeWaitCreateCompletion %f : writeWaitCompressionCompletion %f\n", ctx->writeWaitCreateCompletion, ctx->writeWaitCompressionCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob); DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
@ -541,7 +566,7 @@ static void* outputThread(void* arg)
} }
{ {
// size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile); // size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile);
size_t const blockSize = MAX(compressedSize >> 7, 64 << 10); size_t const blockSize = MAX(compressedSize >> 7, 1 << 10);
size_t pos = 0; size_t pos = 0;
for ( ; ; ) { for ( ; ; ) {
size_t const writeSize = MIN(remaining, blockSize); size_t const writeSize = MIN(remaining, blockSize);
@ -553,6 +578,7 @@ static void* outputThread(void* arg)
/* update completion variable for writing */ /* update completion variable for writing */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->writeCompletion = 1 - (double)remaining/compressedSize; ctx->writeCompletion = 1 - (double)remaining/compressedSize;
DEBUG(3, "write completion %f\n", ctx->writeCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
if (remaining == 0) break; if (remaining == 0) break;
@ -599,8 +625,9 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) { while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
/* creation thread is waiting, take measurement of compression completion */ /* creation thread is waiting, take measurement of compression completion */
ctx->compressionCompletionMeasured = ctx->compressionCompletion; ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
DEBUG(3, "creation thread waiting : compression completion measured : %f\n", ctx->compressionCompletionMeasured); ctx->createWaitWriteCompletion = ctx->writeCompletion;
DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f : createWaitWriteCompletion %f\n", ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion);
DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion); DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob); DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);