diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index cf232ca7..84e689a2 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -29,7 +29,6 @@ static int g_displayLevel = DEFAULT_DISPLAY_LEVEL; static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL; -static unsigned g_displayStats = 0; static UTIL_time_t g_startTime; static size_t g_streamedSize = 0; static unsigned g_useProgressBar = 0; @@ -47,15 +46,6 @@ typedef struct { buffer_t buffer; } inBuff_t; -typedef struct { - unsigned waitCompressed; - unsigned waitReady; - unsigned waitWrite; - unsigned readyCounter; - unsigned compressedCounter; - unsigned writeCounter; -} cStat_t; - typedef struct { buffer_t src; buffer_t dst; @@ -102,10 +92,9 @@ typedef struct { mutex_t jobWrite_mutex; cond_t jobWrite_cond; mutex_t completion_mutex; - mutex_t stats_mutex; + mutex_t wait_mutex; size_t lastDictSize; inBuff_t input; - cStat_t stats; jobDescription* jobs; ZSTD_CCtx* cctx; } adaptCCtx; @@ -163,7 +152,7 @@ static int freeCCtx(adaptCCtx* ctx) error |= destroyMutex(&ctx->jobWrite_mutex); error |= destroyCond(&ctx->jobWrite_cond); error |= destroyMutex(&ctx->completion_mutex); - error |= destroyMutex(&ctx->stats_mutex); + error |= destroyMutex(&ctx->wait_mutex); error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx)); free(ctx->input.buffer.start); if (ctx->jobs){ @@ -203,7 +192,7 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs) pthreadError |= initMutex(&ctx->jobWrite_mutex); pthreadError |= initCond(&ctx->jobWrite_cond); pthreadError |= initMutex(&ctx->completion_mutex); - pthreadError |= initMutex(&ctx->stats_mutex); + pthreadError |= initMutex(&ctx->wait_mutex); if (pthreadError) return pthreadError; } ctx->numJobs = numJobs; @@ -211,6 +200,10 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs) ctx->jobCompressedID = 0; ctx->jobWriteID = 0; ctx->lastDictSize = 0; + ctx->createCompletionMeasured = 1; + ctx->compressionCompletionMeasured = 1; + ctx->writeCompletionMeasured = 1; + ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); if (!ctx->jobs) { @@ -305,17 +298,6 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx) pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex); } -/* this function normalizes counters when compression level is changing */ -static void reduceCounters(adaptCCtx* ctx) -{ - pthread_mutex_lock(&ctx->stats_mutex.pMutex); - unsigned const min = MIN(ctx->stats.compressedCounter, MIN(ctx->stats.writeCounter, ctx->stats.readyCounter)); - ctx->stats.writeCounter -= min; - ctx->stats.compressedCounter -= min; - ctx->stats.readyCounter -= min; - pthread_mutex_unlock(&ctx->stats_mutex.pMutex); -} - /* * Compression level is changed depending on which part of the compression process is lagging * Currently, three theads exist for job creation, compression, and file writing respectively. @@ -330,67 +312,42 @@ static void adaptCompressionLevel(adaptCCtx* ctx) ctx->compressionLevel = g_compressionLevel; } else { - unsigned reset = 0; - unsigned allSlow; - unsigned compressWaiting; - unsigned writeWaiting; - unsigned createWaiting; + DEBUG(2, "compression level %u\n", ctx->compressionLevel); + /* check if compression is too slow */ + unsigned createChange; + unsigned writeChange; + unsigned compressionChange; + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + createChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->createCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; + writeChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->writeCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; + compressionChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->compressionCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; + DEBUG(2, "createCompletionMeasured %f\n", ctx->createCompletionMeasured); + DEBUG(2, "compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured); + DEBUG(2, "writeCompletionMeasured %f\n", ctx->writeCompletionMeasured); + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - pthread_mutex_lock(&ctx->stats_mutex.pMutex); - allSlow = ctx->adaptParam < ctx->stats.compressedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter; - compressWaiting = ctx->adaptParam < ctx->stats.readyCounter; - writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter; - createWaiting = ctx->adaptParam < ctx->stats.writeCounter; - pthread_mutex_unlock(&ctx->stats_mutex.pMutex); - DEBUG(2, "createWaiting %u\n", createWaiting); - DEBUG(2, "compressWaiting %u\n", compressWaiting); - DEBUG(2, "writeWaiting %u\n\n", writeWaiting); { - unsigned const writeSlow = (compressWaiting && createWaiting); - unsigned const compressSlow = (writeWaiting && createWaiting); - unsigned const createSlow = (compressWaiting && writeWaiting); - DEBUG(3, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting); - if (allSlow) { - reset = 1; + unsigned const compressionFastChange = MIN(MIN(createChange, writeChange), ZSTD_maxCLevel() - ctx->compressionLevel); + DEBUG(2, "compressionFastChange %u\n", compressionFastChange); + + if (compressionFastChange) { + DEBUG(2, "compression level too low\n"); + ctx->compressionLevel += compressionFastChange; } - else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) { - DEBUG(2, "increasing compression level %u\n", ctx->compressionLevel); - double completion; - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - completion = writeSlow ? ctx->writeCompletionMeasured : ctx->createCompletionMeasured; - DEBUG(2, "write completion: %f, create completion: %f\n", ctx->writeCompletionMeasured, ctx->createCompletionMeasured); - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - { - unsigned const maxChange = MAX_COMPRESSION_LEVEL_CHANGE - (unsigned)(completion*MAX_COMPRESSION_LEVEL_CHANGE); - unsigned const change = MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel); - DEBUG(3, "writeSlow: %u, change: %u\n", writeSlow, change); - DEBUG(3, "write completion: %f\n", completion); - ctx->compressionLevel += change; - reset = 1; - } - } - else if (compressSlow && ctx->compressionLevel > 1) { - double completion; - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - completion = ctx->compressionCompletionMeasured; - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - { - unsigned const maxChange = MAX_COMPRESSION_LEVEL_CHANGE - (unsigned)(completion*MAX_COMPRESSION_LEVEL_CHANGE); - unsigned const change = MIN(maxChange, ctx->compressionLevel - 1); - DEBUG(2, "decreasing compression level %u\n", ctx->compressionLevel); - DEBUG(2, "completion: %f\n", completion); - ctx->compressionLevel -= change; - reset = 1; - } - } - if (reset) { - pthread_mutex_lock(&ctx->stats_mutex.pMutex); - ctx->stats.readyCounter = 0; - ctx->stats.writeCounter = 0; - ctx->stats.compressedCounter = 0; - pthread_mutex_unlock(&ctx->stats_mutex.pMutex); + else { + unsigned const compressionSlowChange = MIN(compressionChange, ctx->compressionLevel-1); + DEBUG(2, "compression level too high\n"); + ctx->compressionLevel -= compressionSlowChange; } } + + /* reset */ + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + ctx->createCompletionMeasured = 1; + ctx->compressionCompletionMeasured = 1; + ctx->writeCompletionMeasured = 1; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + DEBUG(2, "\n"); } } @@ -410,21 +367,31 @@ static void* compressionThread(void* arg) unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* job = &ctx->jobs[currJobIndex]; DEBUG(3, "compressionThread(): waiting on job ready\n"); + + /* new job, reset completion */ + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + ctx->compressionCompletion = 0; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) { - pthread_mutex_lock(&ctx->stats_mutex.pMutex); - ctx->stats.waitReady++; - ctx->stats.readyCounter++; - pthread_mutex_unlock(&ctx->stats_mutex.pMutex); - reduceCounters(ctx); pthread_mutex_lock(&ctx->completion_mutex.pMutex); + /* compression thread is waiting, take measurements of write completion and read completion */ ctx->createCompletionMeasured = ctx->createCompletion; + ctx->writeCompletionMeasured = ctx->writeCompletion; + DEBUG(2, "compression thread waiting : createCompletionMeasured %f : writeCompletionMeasured %f\n", ctx->createCompletionMeasured, ctx->writeCompletionMeasured); DEBUG(3, "create completion: %f\n", ctx->createCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); + + /* reset create completion */ + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + ctx->createCompletion = 0; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + DEBUG(3, "compressionThread(): continuing after job ready\n"); DEBUG(3, "DICTIONARY ENDED\n"); DEBUG(3, "%.*s", (int)job->src.size, (char*)job->src.start); @@ -497,7 +464,7 @@ static void* compressionThread(void* arg) /* update completion */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->compressionCompletion = 1 - (double)remaining/job->src.size; - DEBUG(2, "update on job %u: compression completion %f\n", currJob, ctx->compressionCompletion); + DEBUG(3, "update on job %u: compression completion %f\n", currJob, ctx->compressionCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); } } while (remaining != 0); @@ -547,21 +514,29 @@ static void* outputThread(void* arg) unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* job = &ctx->jobs[currJobIndex]; DEBUG(3, "outputThread(): waiting on job compressed\n"); + + /* new job, reset completion */ + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + ctx->writeCompletion = 0; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { - pthread_mutex_lock(&ctx->stats_mutex.pMutex); - ctx->stats.waitCompressed++; - ctx->stats.compressedCounter++; - pthread_mutex_unlock(&ctx->stats_mutex.pMutex); - reduceCounters(ctx); pthread_mutex_lock(&ctx->completion_mutex.pMutex); + /* write thread is waiting, take measurement of compression completion */ ctx->compressionCompletionMeasured = ctx->compressionCompletion; - DEBUG(2, "waited on job %u: compressionCompletion %f\n", currJob, ctx->compressionCompletion); + DEBUG(2, "write thread waiting : compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); + + /* reset compression completion */ + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + ctx->compressionCompletion = 0; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + DEBUG(3, "outputThread(): continuing after job compressed\n"); { size_t const compressedSize = job->compressedSize; @@ -615,6 +590,7 @@ static void* outputThread(void* arg) pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex); break; } + } return arg; } @@ -628,19 +604,21 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs); while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) { - pthread_mutex_lock(&ctx->stats_mutex.pMutex); - ctx->stats.waitWrite++; - ctx->stats.writeCounter++; - pthread_mutex_unlock(&ctx->stats_mutex.pMutex); - reduceCounters(ctx); pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->writeCompletionMeasured = ctx->writeCompletion; + /* creation thread is waiting, take measurement of compression completion */ + ctx->compressionCompletionMeasured = ctx->compressionCompletion; + DEBUG(2, "creation thread waiting : compression completion measured : %f\n", ctx->compressionCompletionMeasured); DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob); pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex); } pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); + + /* reset write completion */ + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + ctx->writeCompletion = 0; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "createCompressionJob(): continuing after job write\n"); DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize); @@ -677,14 +655,6 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) return 0; } -static void printStats(cStat_t stats) -{ - DISPLAY("========STATISTICS========\n"); - DISPLAY("# times waited on job ready: %u\n", stats.waitReady); - DISPLAY("# times waited on job compressed: %u\n", stats.waitCompressed); - DISPLAY("# times waited on job Write: %u\n\n", stats.waitWrite); -} - static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadArg* otArg) { if (!ctx || !srcFile || !otArg) { @@ -710,48 +680,56 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA return 1; } } + { + unsigned currJob = 0; + /* creating jobs */ + for ( ; ; ) { + size_t pos = 0; + size_t const readBlockSize = 1 << 15; + size_t remaining = FILE_CHUNK_SIZE; - /* creating jobs */ - for ( ; ; ) { - size_t pos = 0; - size_t const readBlockSize = 1 << 15; - size_t remaining = FILE_CHUNK_SIZE; - while (remaining != 0 && !feof(srcFile)) { - size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile); - if (ret != readBlockSize && !feof(srcFile)) { - /* error could not read correct number of bytes */ + /* new job reset completion */ + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + ctx->createCompletion = 0; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + + while (remaining != 0 && !feof(srcFile)) { + size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile); + if (ret != readBlockSize && !feof(srcFile)) { + /* error could not read correct number of bytes */ + DISPLAY("Error: problem occurred during read from src file\n"); + signalErrorToThreads(ctx); + return 1; + } + pos += ret; + remaining -= ret; + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE); + DEBUG(3, "create completion: %f\n", ctx->createCompletion); + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + } + if (remaining != 0 && !feof(srcFile)) { DISPLAY("Error: problem occurred during read from src file\n"); signalErrorToThreads(ctx); return 1; } - pos += ret; - remaining -= ret; - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE); - DEBUG(3, "create completion: %f\n", ctx->createCompletion); - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - } - if (remaining != 0 && !feof(srcFile)) { - DISPLAY("Error: problem occurred during read from src file\n"); - signalErrorToThreads(ctx); - return 1; - } - g_streamedSize += pos; - /* reading was fine, now create the compression job */ - { - int const last = feof(srcFile); - int const error = createCompressionJob(ctx, pos, last); - if (error != 0) { - signalErrorToThreads(ctx); - return error; + g_streamedSize += pos; + /* reading was fine, now create the compression job */ + { + int const last = feof(srcFile); + int const error = createCompressionJob(ctx, pos, last); + if (error != 0) { + signalErrorToThreads(ctx); + return error; + } + } + currJob++; + if (feof(srcFile)) { + DEBUG(3, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID); + break; } } - if (feof(srcFile)) { - DEBUG(3, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID); - break; - } } - /* success -- created all jobs */ return 0; } @@ -803,9 +781,6 @@ static int freeFileCompressionResources(fcResources* fcr) { int ret = 0; waitUntilAllJobsCompleted(fcr->ctx); - pthread_mutex_lock(&fcr->ctx->stats_mutex.pMutex); - if (g_displayStats) printStats(fcr->ctx->stats); - pthread_mutex_unlock(&fcr->ctx->stats_mutex.pMutex); ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0; ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0; if (fcr->otArg) { @@ -873,7 +848,6 @@ static void help() PRINT(" -oFILE : specify the output file name\n"); PRINT(" -v : display debug information\n"); PRINT(" -i# : provide initial compression level\n"); - PRINT(" -s : display information stats\n"); PRINT(" -h : display help/information\n"); PRINT(" -f : force the compression level to stay constant\n"); } @@ -913,9 +887,6 @@ int main(int argCount, const char* argv[]) g_compressionLevel = readU32FromChar(&argument); DEBUG(3, "g_compressionLevel: %u\n", g_compressionLevel); break; - case 's': - g_displayStats = 1; - break; case 'h': help(); goto _main_exit;