reworked adaptCompressionLevel to only account for completion information

dev
Paul Cruz 2017-07-20 16:19:16 -07:00
parent 7ab758a640
commit a19916425d
1 changed files with 119 additions and 148 deletions

View File

@ -29,7 +29,6 @@
static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
static unsigned g_displayStats = 0;
static UTIL_time_t g_startTime;
static size_t g_streamedSize = 0;
static unsigned g_useProgressBar = 0;
@ -47,15 +46,6 @@ typedef struct {
buffer_t buffer;
} inBuff_t;
typedef struct {
unsigned waitCompressed;
unsigned waitReady;
unsigned waitWrite;
unsigned readyCounter;
unsigned compressedCounter;
unsigned writeCounter;
} cStat_t;
typedef struct {
buffer_t src;
buffer_t dst;
@ -102,10 +92,9 @@ typedef struct {
mutex_t jobWrite_mutex;
cond_t jobWrite_cond;
mutex_t completion_mutex;
mutex_t stats_mutex;
mutex_t wait_mutex;
size_t lastDictSize;
inBuff_t input;
cStat_t stats;
jobDescription* jobs;
ZSTD_CCtx* cctx;
} adaptCCtx;
@ -163,7 +152,7 @@ static int freeCCtx(adaptCCtx* ctx)
error |= destroyMutex(&ctx->jobWrite_mutex);
error |= destroyCond(&ctx->jobWrite_cond);
error |= destroyMutex(&ctx->completion_mutex);
error |= destroyMutex(&ctx->stats_mutex);
error |= destroyMutex(&ctx->wait_mutex);
error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
free(ctx->input.buffer.start);
if (ctx->jobs){
@ -203,7 +192,7 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
pthreadError |= initMutex(&ctx->jobWrite_mutex);
pthreadError |= initCond(&ctx->jobWrite_cond);
pthreadError |= initMutex(&ctx->completion_mutex);
pthreadError |= initMutex(&ctx->stats_mutex);
pthreadError |= initMutex(&ctx->wait_mutex);
if (pthreadError) return pthreadError;
}
ctx->numJobs = numJobs;
@ -211,6 +200,10 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
ctx->jobCompressedID = 0;
ctx->jobWriteID = 0;
ctx->lastDictSize = 0;
ctx->createCompletionMeasured = 1;
ctx->compressionCompletionMeasured = 1;
ctx->writeCompletionMeasured = 1;
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
if (!ctx->jobs) {
@ -305,17 +298,6 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
}
/* this function normalizes counters when compression level is changing */
static void reduceCounters(adaptCCtx* ctx)
{
pthread_mutex_lock(&ctx->stats_mutex.pMutex);
unsigned const min = MIN(ctx->stats.compressedCounter, MIN(ctx->stats.writeCounter, ctx->stats.readyCounter));
ctx->stats.writeCounter -= min;
ctx->stats.compressedCounter -= min;
ctx->stats.readyCounter -= min;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
}
/*
* Compression level is changed depending on which part of the compression process is lagging
* Currently, three theads exist for job creation, compression, and file writing respectively.
@ -330,67 +312,42 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
ctx->compressionLevel = g_compressionLevel;
}
else {
unsigned reset = 0;
unsigned allSlow;
unsigned compressWaiting;
unsigned writeWaiting;
unsigned createWaiting;
DEBUG(2, "compression level %u\n", ctx->compressionLevel);
/* check if compression is too slow */
unsigned createChange;
unsigned writeChange;
unsigned compressionChange;
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
createChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->createCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
writeChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->writeCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
compressionChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->compressionCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
DEBUG(2, "createCompletionMeasured %f\n", ctx->createCompletionMeasured);
DEBUG(2, "compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
DEBUG(2, "writeCompletionMeasured %f\n", ctx->writeCompletionMeasured);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
pthread_mutex_lock(&ctx->stats_mutex.pMutex);
allSlow = ctx->adaptParam < ctx->stats.compressedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter;
compressWaiting = ctx->adaptParam < ctx->stats.readyCounter;
writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter;
createWaiting = ctx->adaptParam < ctx->stats.writeCounter;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
DEBUG(2, "createWaiting %u\n", createWaiting);
DEBUG(2, "compressWaiting %u\n", compressWaiting);
DEBUG(2, "writeWaiting %u\n\n", writeWaiting);
{
unsigned const writeSlow = (compressWaiting && createWaiting);
unsigned const compressSlow = (writeWaiting && createWaiting);
unsigned const createSlow = (compressWaiting && writeWaiting);
DEBUG(3, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting);
if (allSlow) {
reset = 1;
unsigned const compressionFastChange = MIN(MIN(createChange, writeChange), ZSTD_maxCLevel() - ctx->compressionLevel);
DEBUG(2, "compressionFastChange %u\n", compressionFastChange);
if (compressionFastChange) {
DEBUG(2, "compression level too low\n");
ctx->compressionLevel += compressionFastChange;
}
else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
DEBUG(2, "increasing compression level %u\n", ctx->compressionLevel);
double completion;
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
completion = writeSlow ? ctx->writeCompletionMeasured : ctx->createCompletionMeasured;
DEBUG(2, "write completion: %f, create completion: %f\n", ctx->writeCompletionMeasured, ctx->createCompletionMeasured);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
{
unsigned const maxChange = MAX_COMPRESSION_LEVEL_CHANGE - (unsigned)(completion*MAX_COMPRESSION_LEVEL_CHANGE);
unsigned const change = MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel);
DEBUG(3, "writeSlow: %u, change: %u\n", writeSlow, change);
DEBUG(3, "write completion: %f\n", completion);
ctx->compressionLevel += change;
reset = 1;
}
}
else if (compressSlow && ctx->compressionLevel > 1) {
double completion;
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
completion = ctx->compressionCompletionMeasured;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
{
unsigned const maxChange = MAX_COMPRESSION_LEVEL_CHANGE - (unsigned)(completion*MAX_COMPRESSION_LEVEL_CHANGE);
unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
DEBUG(2, "decreasing compression level %u\n", ctx->compressionLevel);
DEBUG(2, "completion: %f\n", completion);
ctx->compressionLevel -= change;
reset = 1;
}
}
if (reset) {
pthread_mutex_lock(&ctx->stats_mutex.pMutex);
ctx->stats.readyCounter = 0;
ctx->stats.writeCounter = 0;
ctx->stats.compressedCounter = 0;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
else {
unsigned const compressionSlowChange = MIN(compressionChange, ctx->compressionLevel-1);
DEBUG(2, "compression level too high\n");
ctx->compressionLevel -= compressionSlowChange;
}
}
/* reset */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletionMeasured = 1;
ctx->compressionCompletionMeasured = 1;
ctx->writeCompletionMeasured = 1;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(2, "\n");
}
}
@ -410,21 +367,31 @@ static void* compressionThread(void* arg)
unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[currJobIndex];
DEBUG(3, "compressionThread(): waiting on job ready\n");
/* new job, reset completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
pthread_mutex_lock(&ctx->stats_mutex.pMutex);
ctx->stats.waitReady++;
ctx->stats.readyCounter++;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
/* compression thread is waiting, take measurements of write completion and read completion */
ctx->createCompletionMeasured = ctx->createCompletion;
ctx->writeCompletionMeasured = ctx->writeCompletion;
DEBUG(2, "compression thread waiting : createCompletionMeasured %f : writeCompletionMeasured %f\n", ctx->createCompletionMeasured, ctx->writeCompletionMeasured);
DEBUG(3, "create completion: %f\n", ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
}
pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
/* reset create completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "compressionThread(): continuing after job ready\n");
DEBUG(3, "DICTIONARY ENDED\n");
DEBUG(3, "%.*s", (int)job->src.size, (char*)job->src.start);
@ -497,7 +464,7 @@ static void* compressionThread(void* arg)
/* update completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
DEBUG(2, "update on job %u: compression completion %f\n", currJob, ctx->compressionCompletion);
DEBUG(3, "update on job %u: compression completion %f\n", currJob, ctx->compressionCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
}
} while (remaining != 0);
@ -547,21 +514,29 @@ static void* outputThread(void* arg)
unsigned const currJobIndex = currJob % ctx->numJobs;
jobDescription* job = &ctx->jobs[currJobIndex];
DEBUG(3, "outputThread(): waiting on job compressed\n");
/* new job, reset completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->writeCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
pthread_mutex_lock(&ctx->stats_mutex.pMutex);
ctx->stats.waitCompressed++;
ctx->stats.compressedCounter++;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
/* write thread is waiting, take measurement of compression completion */
ctx->compressionCompletionMeasured = ctx->compressionCompletion;
DEBUG(2, "waited on job %u: compressionCompletion %f\n", currJob, ctx->compressionCompletion);
DEBUG(2, "write thread waiting : compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
}
pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
/* reset compression completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "outputThread(): continuing after job compressed\n");
{
size_t const compressedSize = job->compressedSize;
@ -615,6 +590,7 @@ static void* outputThread(void* arg)
pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
break;
}
}
return arg;
}
@ -628,19 +604,21 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
pthread_mutex_lock(&ctx->stats_mutex.pMutex);
ctx->stats.waitWrite++;
ctx->stats.writeCounter++;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->writeCompletionMeasured = ctx->writeCompletion;
/* creation thread is waiting, take measurement of compression completion */
ctx->compressionCompletionMeasured = ctx->compressionCompletion;
DEBUG(2, "creation thread waiting : compression completion measured : %f\n", ctx->compressionCompletionMeasured);
DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
}
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
/* reset write completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->writeCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "createCompressionJob(): continuing after job write\n");
DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize);
@ -677,14 +655,6 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
return 0;
}
static void printStats(cStat_t stats)
{
DISPLAY("========STATISTICS========\n");
DISPLAY("# times waited on job ready: %u\n", stats.waitReady);
DISPLAY("# times waited on job compressed: %u\n", stats.waitCompressed);
DISPLAY("# times waited on job Write: %u\n\n", stats.waitWrite);
}
static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadArg* otArg)
{
if (!ctx || !srcFile || !otArg) {
@ -710,48 +680,56 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
return 1;
}
}
{
unsigned currJob = 0;
/* creating jobs */
for ( ; ; ) {
size_t pos = 0;
size_t const readBlockSize = 1 << 15;
size_t remaining = FILE_CHUNK_SIZE;
/* creating jobs */
for ( ; ; ) {
size_t pos = 0;
size_t const readBlockSize = 1 << 15;
size_t remaining = FILE_CHUNK_SIZE;
while (remaining != 0 && !feof(srcFile)) {
size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile);
if (ret != readBlockSize && !feof(srcFile)) {
/* error could not read correct number of bytes */
/* new job reset completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
while (remaining != 0 && !feof(srcFile)) {
size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile);
if (ret != readBlockSize && !feof(srcFile)) {
/* error could not read correct number of bytes */
DISPLAY("Error: problem occurred during read from src file\n");
signalErrorToThreads(ctx);
return 1;
}
pos += ret;
remaining -= ret;
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
DEBUG(3, "create completion: %f\n", ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
}
if (remaining != 0 && !feof(srcFile)) {
DISPLAY("Error: problem occurred during read from src file\n");
signalErrorToThreads(ctx);
return 1;
}
pos += ret;
remaining -= ret;
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
DEBUG(3, "create completion: %f\n", ctx->createCompletion);
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
}
if (remaining != 0 && !feof(srcFile)) {
DISPLAY("Error: problem occurred during read from src file\n");
signalErrorToThreads(ctx);
return 1;
}
g_streamedSize += pos;
/* reading was fine, now create the compression job */
{
int const last = feof(srcFile);
int const error = createCompressionJob(ctx, pos, last);
if (error != 0) {
signalErrorToThreads(ctx);
return error;
g_streamedSize += pos;
/* reading was fine, now create the compression job */
{
int const last = feof(srcFile);
int const error = createCompressionJob(ctx, pos, last);
if (error != 0) {
signalErrorToThreads(ctx);
return error;
}
}
currJob++;
if (feof(srcFile)) {
DEBUG(3, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID);
break;
}
}
if (feof(srcFile)) {
DEBUG(3, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID);
break;
}
}
/* success -- created all jobs */
return 0;
}
@ -803,9 +781,6 @@ static int freeFileCompressionResources(fcResources* fcr)
{
int ret = 0;
waitUntilAllJobsCompleted(fcr->ctx);
pthread_mutex_lock(&fcr->ctx->stats_mutex.pMutex);
if (g_displayStats) printStats(fcr->ctx->stats);
pthread_mutex_unlock(&fcr->ctx->stats_mutex.pMutex);
ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0;
ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0;
if (fcr->otArg) {
@ -873,7 +848,6 @@ static void help()
PRINT(" -oFILE : specify the output file name\n");
PRINT(" -v : display debug information\n");
PRINT(" -i# : provide initial compression level\n");
PRINT(" -s : display information stats\n");
PRINT(" -h : display help/information\n");
PRINT(" -f : force the compression level to stay constant\n");
}
@ -913,9 +887,6 @@ int main(int argCount, const char* argv[])
g_compressionLevel = readU32FromChar(&argument);
DEBUG(3, "g_compressionLevel: %u\n", g_compressionLevel);
break;
case 's':
g_displayStats = 1;
break;
case 'h':
help();
goto _main_exit;