removed useless measurements
parent
95bef759b3
commit
08d9e42ec6
|
@ -77,11 +77,9 @@ typedef struct {
|
||||||
unsigned jobWriteID;
|
unsigned jobWriteID;
|
||||||
unsigned allJobsCompleted;
|
unsigned allJobsCompleted;
|
||||||
unsigned adaptParam;
|
unsigned adaptParam;
|
||||||
double createWaitWriteCompletion;
|
|
||||||
double createWaitCompressionCompletion;
|
double createWaitCompressionCompletion;
|
||||||
double compressWaitCreateCompletion;
|
double compressWaitCreateCompletion;
|
||||||
double compressWaitWriteCompletion;
|
double compressWaitWriteCompletion;
|
||||||
double writeWaitCreateCompletion;
|
|
||||||
double writeWaitCompressionCompletion;
|
double writeWaitCompressionCompletion;
|
||||||
double compressionCompletion;
|
double compressionCompletion;
|
||||||
double writeCompletion;
|
double writeCompletion;
|
||||||
|
@ -205,11 +203,9 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
|
||||||
ctx->lastDictSize = 0;
|
ctx->lastDictSize = 0;
|
||||||
|
|
||||||
|
|
||||||
ctx->createWaitWriteCompletion = 1;
|
|
||||||
ctx->createWaitCompressionCompletion = 1;
|
ctx->createWaitCompressionCompletion = 1;
|
||||||
ctx->compressWaitCreateCompletion = 1;
|
ctx->compressWaitCreateCompletion = 1;
|
||||||
ctx->compressWaitWriteCompletion = 1;
|
ctx->compressWaitWriteCompletion = 1;
|
||||||
ctx->writeWaitCreateCompletion = 1;
|
|
||||||
ctx->writeWaitCompressionCompletion = 1;
|
ctx->writeWaitCompressionCompletion = 1;
|
||||||
|
|
||||||
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
|
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
|
||||||
|
@ -316,11 +312,9 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
|
||||||
*/
|
*/
|
||||||
static void adaptCompressionLevel(adaptCCtx* ctx)
|
static void adaptCompressionLevel(adaptCCtx* ctx)
|
||||||
{
|
{
|
||||||
double createWaitWriteCompletion;
|
|
||||||
double createWaitCompressionCompletion;
|
double createWaitCompressionCompletion;
|
||||||
double compressWaitCreateCompletion;
|
double compressWaitCreateCompletion;
|
||||||
double compressWaitWriteCompletion;
|
double compressWaitWriteCompletion;
|
||||||
double writeWaitCreateCompletion;
|
|
||||||
double writeWaitCompressionCompletion;
|
double writeWaitCompressionCompletion;
|
||||||
double const threshold = 0.00001;
|
double const threshold = 0.00001;
|
||||||
|
|
||||||
|
@ -328,25 +322,19 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
|
||||||
/* read and reset completion measurements */
|
/* read and reset completion measurements */
|
||||||
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
|
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
|
||||||
DEBUG(2, "rc %f\n", ctx->createWaitCompressionCompletion);
|
DEBUG(2, "rc %f\n", ctx->createWaitCompressionCompletion);
|
||||||
DEBUG(2, "rw %f\n", ctx->createWaitWriteCompletion);
|
|
||||||
DEBUG(2, "cr %f\n", ctx->compressWaitCreateCompletion);
|
DEBUG(2, "cr %f\n", ctx->compressWaitCreateCompletion);
|
||||||
DEBUG(2, "cw %f\n", ctx->compressWaitWriteCompletion);
|
DEBUG(2, "cw %f\n", ctx->compressWaitWriteCompletion);
|
||||||
DEBUG(2, "wr %f\n", ctx->writeWaitCreateCompletion);
|
|
||||||
DEBUG(2, "wc %f\n\n", ctx->writeWaitCompressionCompletion);
|
DEBUG(2, "wc %f\n\n", ctx->writeWaitCompressionCompletion);
|
||||||
|
|
||||||
createWaitCompressionCompletion = ctx->createWaitCompressionCompletion;
|
createWaitCompressionCompletion = ctx->createWaitCompressionCompletion;
|
||||||
createWaitWriteCompletion = ctx->createWaitWriteCompletion;
|
|
||||||
compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
|
compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
|
||||||
compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
|
compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
|
||||||
writeWaitCreateCompletion = ctx->writeWaitCreateCompletion;
|
|
||||||
writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion;
|
writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion;
|
||||||
|
|
||||||
DEBUG(2, "resetting adaptive variables\n");
|
DEBUG(2, "resetting adaptive variables\n");
|
||||||
ctx->createWaitWriteCompletion = 1;
|
|
||||||
ctx->createWaitCompressionCompletion = 1;
|
ctx->createWaitCompressionCompletion = 1;
|
||||||
ctx->compressWaitCreateCompletion = 1;
|
ctx->compressWaitCreateCompletion = 1;
|
||||||
ctx->compressWaitWriteCompletion = 1;
|
ctx->compressWaitWriteCompletion = 1;
|
||||||
ctx->writeWaitCreateCompletion = 1;
|
|
||||||
ctx->writeWaitCompressionCompletion = 1;
|
ctx->writeWaitCompressionCompletion = 1;
|
||||||
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
|
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
|
||||||
|
|
||||||
|
@ -360,19 +348,18 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
|
||||||
ctx->compressionLevel -= boundChange;
|
ctx->compressionLevel -= boundChange;
|
||||||
DEBUG(2, "create and write threads waiting, tried to decrease compression level by %u\n", boundChange);
|
DEBUG(2, "create and write threads waiting, tried to decrease compression level by %u\n", boundChange);
|
||||||
}
|
}
|
||||||
else if (1-createWaitWriteCompletion > threshold && 1-compressWaitWriteCompletion > threshold) {
|
else if (1-compressWaitWriteCompletion > threshold) {
|
||||||
/* both create and compression thread waiting on write */
|
/* both create and compression thread waiting on write */
|
||||||
/* use createWaitWriteCompletion */
|
double const completion = compressWaitWriteCompletion;
|
||||||
double const completion = MAX(createWaitWriteCompletion, compressWaitWriteCompletion);
|
|
||||||
unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE);
|
unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE);
|
||||||
unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel);
|
unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel);
|
||||||
ctx->compressionLevel += boundChange;
|
ctx->compressionLevel += boundChange;
|
||||||
DEBUG(2, "create and compression threads waiting, tried to increase compression level by %u\n", boundChange);
|
DEBUG(2, "create and compression threads waiting, tried to increase compression level by %u\n", boundChange);
|
||||||
}
|
}
|
||||||
else if (1-writeWaitCreateCompletion > threshold && 1-compressWaitCreateCompletion > threshold) {
|
else if (1-compressWaitCreateCompletion > threshold) {
|
||||||
/* both compression and write waiting on create */
|
/* both compression and write waiting on create */
|
||||||
/* use compressWaitCreateCompletion */
|
/* use compressWaitCreateCompletion */
|
||||||
double const completion = MAX(writeWaitCreateCompletion, compressWaitCreateCompletion);
|
double const completion = compressWaitCreateCompletion;
|
||||||
unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE);
|
unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE);
|
||||||
unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel);
|
unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel);
|
||||||
ctx->compressionLevel += boundChange;
|
ctx->compressionLevel += boundChange;
|
||||||
|
@ -417,6 +404,7 @@ static void* compressionThread(void* arg)
|
||||||
/* wait until job previously in this space is written */
|
/* wait until job previously in this space is written */
|
||||||
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
|
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
|
||||||
while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
|
while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
|
||||||
|
/* compression thread is waiting on writer thread, take measurement */
|
||||||
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
|
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
|
||||||
ctx->compressWaitWriteCompletion = ctx->writeCompletion;
|
ctx->compressWaitWriteCompletion = ctx->writeCompletion;
|
||||||
DEBUG(2, "compression thread waiting for write: %u, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitWriteCompletion);
|
DEBUG(2, "compression thread waiting for write: %u, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitWriteCompletion);
|
||||||
|
@ -554,11 +542,10 @@ static void* outputThread(void* arg)
|
||||||
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
|
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
|
||||||
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
|
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
|
||||||
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
|
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
|
||||||
/* write thread is waiting, take measurement of compression completion */
|
/* write thread is waiting on compression thread */
|
||||||
ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
|
ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
|
||||||
ctx->writeWaitCreateCompletion = ctx->createCompletion;
|
DEBUG(3, "write thread waiting : writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion);
|
||||||
DEBUG(3, "write thread waiting : writeWaitCreateCompletion %f : writeWaitCompressionCompletion %f\n", ctx->writeWaitCreateCompletion, ctx->writeWaitCompressionCompletion);
|
DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion);
|
||||||
DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f, writeWaitCreateCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion, ctx->writeWaitCreateCompletion);
|
|
||||||
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
|
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
|
||||||
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
|
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
|
||||||
}
|
}
|
||||||
|
@ -642,10 +629,9 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
|
||||||
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
|
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
|
||||||
/* creation thread is waiting, take measurement of completion */
|
/* creation thread is waiting, take measurement of completion */
|
||||||
ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
|
ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
|
||||||
ctx->createWaitWriteCompletion = ctx->writeCompletion;
|
DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f\n", ctx->createWaitCompressionCompletion);
|
||||||
DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f : createWaitWriteCompletion %f\n", ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion);
|
|
||||||
DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
|
DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
|
||||||
DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f, createWaitWriteCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion);
|
DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion);
|
||||||
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
|
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
|
||||||
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
|
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue