diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index 5a3b723d..e07333fc 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -354,23 +354,29 @@ static void adaptCompressionLevel(adaptCCtx* ctx) if (1-createWaitCompressionCompletion > threshold && 1-writeWaitCompressionCompletion > threshold) { /* both create and write threads waiting on compression */ /* use writeWaitCompressionCompletion */ - unsigned const change = (unsigned)((1-writeWaitCompressionCompletion) * MAX_COMPRESSION_LEVEL_CHANGE); + double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion); + unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE); unsigned const boundChange = MIN(change, ctx->compressionLevel - 1); ctx->compressionLevel -= boundChange; + DEBUG(2, "create and write threads waiting, tried to decrease compression level by %u\n", boundChange); } else if (1-createWaitWriteCompletion > threshold && 1-compressWaitWriteCompletion > threshold) { /* both create and compression thread waiting on write */ /* use createWaitWriteCompletion */ - unsigned const change = (unsigned)((1-createWaitWriteCompletion) * MAX_COMPRESSION_LEVEL_CHANGE); + double const completion = MAX(createWaitWriteCompletion, compressWaitWriteCompletion); + unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE); unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel); ctx->compressionLevel += boundChange; + DEBUG(2, "create and compression threads waiting, tried to increase compression level by %u\n", boundChange); } else if (1-writeWaitCreateCompletion > threshold && 1-compressWaitCreateCompletion > threshold) { /* both compression and write waiting on create */ /* use compressWaitCreateCompletion */ - unsigned const change = (unsigned)((1-compressWaitCreateCompletion) * MAX_COMPRESSION_LEVEL_CHANGE); + double const completion = MAX(writeWaitCreateCompletion, compressWaitCreateCompletion); + unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE); unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel); ctx->compressionLevel += boundChange; + DEBUG(2, "compression and write threads waiting, tried to increase compression level by %u\n", boundChange); } if (g_forceCompressionLevel) { @@ -404,8 +410,8 @@ static void* compressionThread(void* arg) ctx->compressWaitWriteCompletion = ctx->writeCompletion; DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f : compressWaitWriteCompletion %f\n", ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion); DEBUG(3, "create completion: %f\n", ctx->createCompletion); + DEBUG(2, "compression thread waiting for nextJob: %u, compressWaitCreateCompletion %f, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - DEBUG(2, "waiting on job ready, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); @@ -422,6 +428,7 @@ static void* compressionThread(void* arg) /* adapt compression level */ if (currJob) adaptCompressionLevel(ctx); + DEBUG(2, "job %u compressed with level %u\n", currJob, ctx->compressionLevel); /* compress the data */ { size_t const compressionBlockSize = 1 << 17; /* 128 KB */ @@ -487,7 +494,7 @@ static void* compressionThread(void* arg) /* update completion */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->compressionCompletion = 1 - (double)remaining/job->src.size; - DEBUG(3, "compression completion %f\n", ctx->compressionCompletion); + DEBUG(2, "compression completion %f\n", ctx->compressionCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); } } while (remaining != 0); @@ -545,8 +552,8 @@ static void* outputThread(void* arg) ctx->writeWaitCompressionCompletion = ctx->compressionCompletion; ctx->writeWaitCreateCompletion = ctx->createCompletion; DEBUG(3, "write thread waiting : writeWaitCreateCompletion %f : writeWaitCompressionCompletion %f\n", ctx->writeWaitCreateCompletion, ctx->writeWaitCompressionCompletion); + DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f, writeWaitCreateCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion, ctx->writeWaitCreateCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - DEBUG(2, "waiting on job compressed, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); @@ -579,7 +586,7 @@ static void* outputThread(void* arg) /* update completion variable for writing */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->writeCompletion = 1 - (double)remaining/compressedSize; - DEBUG(3, "write completion %f\n", ctx->writeCompletion); + DEBUG(2, "write completion %f\n", ctx->writeCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); if (remaining == 0) break; @@ -630,8 +637,8 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) ctx->createWaitWriteCompletion = ctx->writeCompletion; DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f : createWaitWriteCompletion %f\n", ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion); DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion); + DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f, createWaitWriteCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - DEBUG(2, "waiting on job Write, nextJob: %u\n", nextJob); pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); @@ -721,7 +728,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA remaining -= ret; pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE); - DEBUG(3, "create completion: %f\n", ctx->createCompletion); + DEBUG(2, "create completion: %f\n", ctx->createCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); } if (remaining != 0 && !feof(srcFile)) {