added mutex for stats struct

This commit is contained in:
Paul Cruz 2017-07-18 15:55:58 -07:00
parent ad66faf16a
commit 2c4e4ddc50

View File

@ -288,10 +288,12 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
/* this function normalizes counters when compression level is changing */
static void reduceCounters(adaptCCtx* ctx)
{
pthread_mutex_lock(&ctx->stats_mutex.pMutex);
unsigned const min = MIN(ctx->stats.compressedCounter, MIN(ctx->stats.writeCounter, ctx->stats.readyCounter));
ctx->stats.writeCounter -= min;
ctx->stats.compressedCounter -= min;
ctx->stats.readyCounter -= min;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
}
/*
@ -309,15 +311,22 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
}
else {
unsigned reset = 0;
unsigned const allSlow = ctx->adaptParam < ctx->stats.compressedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter;
unsigned const compressWaiting = ctx->adaptParam < ctx->stats.readyCounter;
unsigned const writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter;
unsigned const createWaiting = ctx->adaptParam < ctx->stats.writeCounter;
unsigned allSlow;
unsigned compressWaiting;
unsigned writeWaiting;
unsigned createWaiting;
pthread_mutex_lock(&ctx->stats_mutex.pMutex);
allSlow = ctx->adaptParam < ctx->stats.compressedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter;
compressWaiting = ctx->adaptParam < ctx->stats.readyCounter;
writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter;
createWaiting = ctx->adaptParam < ctx->stats.writeCounter;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
{
unsigned const writeSlow = (compressWaiting && createWaiting);
unsigned const compressSlow = (writeWaiting && createWaiting);
unsigned const createSlow = (compressWaiting && writeWaiting);
DEBUG(2, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting);
DEBUG(2, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter);
if (allSlow) {
reset = 1;
}
@ -351,9 +360,11 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
}
}
if (reset) {
pthread_mutex_lock(&ctx->stats_mutex.pMutex);
ctx->stats.readyCounter = 0;
ctx->stats.writeCounter = 0;
ctx->stats.compressedCounter = 0;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 1;
@ -363,6 +374,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
}
}
}
}
static size_t getUseableDictSize(unsigned compressionLevel)
@ -383,8 +395,10 @@ static void* compressionThread(void* arg)
DEBUG(3, "compressionThread(): waiting on job ready\n");
pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
pthread_mutex_lock(&ctx->stats_mutex.pMutex);
ctx->stats.waitReady++;
ctx->stats.readyCounter++;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
reduceCounters(ctx);
adaptCompressionLevel(ctx);
DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
@ -480,8 +494,10 @@ static void* outputThread(void* arg)
DEBUG(3, "outputThread(): waiting on job compressed\n");
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
pthread_mutex_lock(&ctx->stats_mutex.pMutex);
ctx->stats.waitCompressed++;
ctx->stats.compressedCounter++;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
if (!ctx->compressionCompletionMeasured) {
@ -563,8 +579,10 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
pthread_mutex_lock(&ctx->stats_mutex.pMutex);
ctx->stats.waitWrite++;
ctx->stats.writeCounter++;
pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
reduceCounters(ctx);
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->writeCompletionMeasured = 1;
@ -719,7 +737,9 @@ static int freeFileCompressionResources(fcResources* fcr)
{
int ret = 0;
waitUntilAllJobsCompleted(fcr->ctx);
pthread_mutex_lock(&fcr->ctx->stats_mutex.pMutex);
if (g_displayStats) printStats(fcr->ctx->stats);
pthread_mutex_unlock(&fcr->ctx->stats_mutex.pMutex);
ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0;
ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0;
if (fcr->otArg) {