added progress check for filewriting, put important shared data behind mutex when being read from/written to
This commit is contained in:
parent
a34bc30237
commit
ad66faf16a
@ -88,7 +88,9 @@ typedef struct {
|
|||||||
unsigned allJobsCompleted;
|
unsigned allJobsCompleted;
|
||||||
unsigned adaptParam;
|
unsigned adaptParam;
|
||||||
unsigned compressionCompletionMeasured;
|
unsigned compressionCompletionMeasured;
|
||||||
|
unsigned writeCompletionMeasured;
|
||||||
double compressionCompletion;
|
double compressionCompletion;
|
||||||
|
double writeCompletion;
|
||||||
mutex_t jobCompressed_mutex;
|
mutex_t jobCompressed_mutex;
|
||||||
cond_t jobCompressed_cond;
|
cond_t jobCompressed_cond;
|
||||||
mutex_t jobReady_mutex;
|
mutex_t jobReady_mutex;
|
||||||
@ -97,6 +99,8 @@ typedef struct {
|
|||||||
cond_t allJobsCompleted_cond;
|
cond_t allJobsCompleted_cond;
|
||||||
mutex_t jobWrite_mutex;
|
mutex_t jobWrite_mutex;
|
||||||
cond_t jobWrite_cond;
|
cond_t jobWrite_cond;
|
||||||
|
mutex_t completion_mutex;
|
||||||
|
mutex_t stats_mutex;
|
||||||
size_t lastDictSize;
|
size_t lastDictSize;
|
||||||
inBuff_t input;
|
inBuff_t input;
|
||||||
cStat_t stats;
|
cStat_t stats;
|
||||||
@ -156,6 +160,8 @@ static int freeCCtx(adaptCCtx* ctx)
|
|||||||
error |= destroyCond(&ctx->allJobsCompleted_cond);
|
error |= destroyCond(&ctx->allJobsCompleted_cond);
|
||||||
error |= destroyMutex(&ctx->jobWrite_mutex);
|
error |= destroyMutex(&ctx->jobWrite_mutex);
|
||||||
error |= destroyCond(&ctx->jobWrite_cond);
|
error |= destroyCond(&ctx->jobWrite_cond);
|
||||||
|
error |= destroyMutex(&ctx->completion_mutex);
|
||||||
|
error |= destroyMutex(&ctx->stats_mutex);
|
||||||
error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
|
error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
|
||||||
free(ctx->input.buffer.start);
|
free(ctx->input.buffer.start);
|
||||||
if (ctx->jobs){
|
if (ctx->jobs){
|
||||||
@ -200,6 +206,8 @@ static adaptCCtx* createCCtx(unsigned numJobs)
|
|||||||
pthreadError |= initCond(&ctx->allJobsCompleted_cond);
|
pthreadError |= initCond(&ctx->allJobsCompleted_cond);
|
||||||
pthreadError |= initMutex(&ctx->jobWrite_mutex);
|
pthreadError |= initMutex(&ctx->jobWrite_mutex);
|
||||||
pthreadError |= initCond(&ctx->jobWrite_cond);
|
pthreadError |= initCond(&ctx->jobWrite_cond);
|
||||||
|
pthreadError |= initMutex(&ctx->completion_mutex);
|
||||||
|
pthreadError |= initMutex(&ctx->stats_mutex);
|
||||||
if (pthreadError) return NULL;
|
if (pthreadError) return NULL;
|
||||||
}
|
}
|
||||||
ctx->numJobs = numJobs;
|
ctx->numJobs = numJobs;
|
||||||
@ -315,24 +323,44 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
|
|||||||
}
|
}
|
||||||
else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
|
else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
|
||||||
DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel);
|
DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel);
|
||||||
ctx->compressionLevel++;
|
double completion;
|
||||||
reset = 1;
|
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
|
||||||
|
completion = ctx->writeCompletion;
|
||||||
|
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
|
||||||
|
{
|
||||||
|
unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1;
|
||||||
|
unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1;
|
||||||
|
DEBUG(2, "writeSlow: %u, change: %u\n", writeSlow, change);
|
||||||
|
DEBUG(2, "write completion: %f\n", completion);
|
||||||
|
ctx->compressionLevel += change;
|
||||||
|
reset = 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if (compressSlow && ctx->compressionLevel > 1) {
|
else if (compressSlow && ctx->compressionLevel > 1) {
|
||||||
double const completion = ctx->compressionCompletion;
|
double completion;
|
||||||
unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
|
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
|
||||||
unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
|
completion = ctx->compressionCompletion;
|
||||||
DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
|
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
|
||||||
DEBUG(3, "completion: %f\n", completion);
|
{
|
||||||
ctx->compressionLevel -= change;
|
unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
|
||||||
reset = 1;
|
unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
|
||||||
|
DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
|
||||||
|
DEBUG(3, "completion: %f\n", completion);
|
||||||
|
ctx->compressionLevel -= change;
|
||||||
|
reset = 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (reset) {
|
if (reset) {
|
||||||
ctx->stats.readyCounter = 0;
|
ctx->stats.readyCounter = 0;
|
||||||
ctx->stats.writeCounter = 0;
|
ctx->stats.writeCounter = 0;
|
||||||
ctx->stats.compressedCounter = 0;
|
ctx->stats.compressedCounter = 0;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
|
||||||
ctx->compressionCompletion = 1;
|
ctx->compressionCompletion = 1;
|
||||||
ctx->compressionCompletionMeasured = 0;
|
ctx->compressionCompletionMeasured = 0;
|
||||||
|
ctx->writeCompletion = 1;
|
||||||
|
ctx->writeCompletionMeasured = 0;
|
||||||
|
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -455,12 +483,14 @@ static void* outputThread(void* arg)
|
|||||||
ctx->stats.waitCompressed++;
|
ctx->stats.waitCompressed++;
|
||||||
ctx->stats.compressedCounter++;
|
ctx->stats.compressedCounter++;
|
||||||
reduceCounters(ctx);
|
reduceCounters(ctx);
|
||||||
|
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
|
||||||
if (!ctx->compressionCompletionMeasured) {
|
if (!ctx->compressionCompletionMeasured) {
|
||||||
ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx);
|
ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx);
|
||||||
ctx->compressionCompletionMeasured = 1;
|
ctx->compressionCompletionMeasured = 1;
|
||||||
|
DEBUG(3, "output detected completion: %f\n", ctx->compressionCompletion);
|
||||||
}
|
}
|
||||||
|
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
|
||||||
adaptCompressionLevel(ctx);
|
adaptCompressionLevel(ctx);
|
||||||
DEBUG(3, "output detected completion: %f\n", ctx->compressionCompletion);
|
|
||||||
DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
|
DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
|
||||||
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
|
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
|
||||||
}
|
}
|
||||||
@ -484,6 +514,14 @@ static void* outputThread(void* arg)
|
|||||||
if (ret != writeSize) break;
|
if (ret != writeSize) break;
|
||||||
pos += ret;
|
pos += ret;
|
||||||
remaining -= ret;
|
remaining -= ret;
|
||||||
|
|
||||||
|
/* update completion variable for writing */
|
||||||
|
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
|
||||||
|
if (!ctx->writeCompletionMeasured) {
|
||||||
|
ctx->writeCompletion = 1 - (double)remaining/compressedSize;
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
|
||||||
|
|
||||||
if (remaining == 0) break;
|
if (remaining == 0) break;
|
||||||
}
|
}
|
||||||
if (pos != compressedSize) {
|
if (pos != compressedSize) {
|
||||||
@ -528,12 +566,10 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
|
|||||||
ctx->stats.waitWrite++;
|
ctx->stats.waitWrite++;
|
||||||
ctx->stats.writeCounter++;
|
ctx->stats.writeCounter++;
|
||||||
reduceCounters(ctx);
|
reduceCounters(ctx);
|
||||||
if (!ctx->compressionCompletion) {
|
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
|
||||||
ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx);
|
ctx->writeCompletionMeasured = 1;
|
||||||
ctx->compressionCompletionMeasured = 1;
|
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
|
||||||
}
|
|
||||||
adaptCompressionLevel(ctx);
|
adaptCompressionLevel(ctx);
|
||||||
DEBUG(3, "job creation detected completion %f\n", ctx->compressionCompletion);
|
|
||||||
DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
|
DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
|
||||||
pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
|
pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
|
||||||
}
|
}
|
||||||
@ -552,10 +588,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
|
|||||||
ctx->input.buffer.start = copy;
|
ctx->input.buffer.start = copy;
|
||||||
}
|
}
|
||||||
job->dictSize = ctx->lastDictSize;
|
job->dictSize = ctx->lastDictSize;
|
||||||
pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
|
|
||||||
ctx->jobReadyID++;
|
|
||||||
pthread_cond_signal(&ctx->jobReady_cond.pCond);
|
|
||||||
pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
|
|
||||||
DEBUG(3, "finished job creation %u\n", nextJob);
|
DEBUG(3, "finished job creation %u\n", nextJob);
|
||||||
ctx->nextJobID++;
|
ctx->nextJobID++;
|
||||||
DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize);
|
DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize);
|
||||||
@ -567,6 +600,13 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
|
|||||||
ctx->lastDictSize = srcSize;
|
ctx->lastDictSize = srcSize;
|
||||||
ctx->input.filled = srcSize;
|
ctx->input.filled = srcSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* signal job ready */
|
||||||
|
pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
|
||||||
|
ctx->jobReadyID++;
|
||||||
|
pthread_cond_signal(&ctx->jobReady_cond.pCond);
|
||||||
|
pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user