rename completion variable, split up fwrite operations in order to track progress

dev
Paul Cruz 2017-07-18 13:30:29 -07:00
parent ae47eab2fd
commit 29c36cf051
1 changed files with 26 additions and 15 deletions

View File

@ -87,8 +87,8 @@ typedef struct {
unsigned jobWriteID;
unsigned allJobsCompleted;
unsigned adaptParam;
unsigned completionMeasured;
double completion;
unsigned compressionCompletionMeasured;
double compressionCompletion;
mutex_t jobCompressed_mutex;
cond_t jobCompressed_cond;
mutex_t jobReady_mutex;
@ -319,7 +319,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
reset = 1;
}
else if (compressSlow && ctx->compressionLevel > 1) {
double const completion = ctx->completion;
double const completion = ctx->compressionCompletion;
unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
@ -331,8 +331,8 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
ctx->stats.readyCounter = 0;
ctx->stats.writeCounter = 0;
ctx->stats.compressedCounter = 0;
ctx->completion = 1;
ctx->completionMeasured = 0;
ctx->compressionCompletion = 1;
ctx->compressionCompletionMeasured = 0;
}
}
}
@ -455,12 +455,12 @@ static void* outputThread(void* arg)
ctx->stats.waitCompressed++;
ctx->stats.compressedCounter++;
reduceCounters(ctx);
if (!ctx->completionMeasured) {
ctx->completion = ZSTD_getCompletion(ctx->cctx);
ctx->completionMeasured = 1;
if (!ctx->compressionCompletionMeasured) {
ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx);
ctx->compressionCompletionMeasured = 1;
}
adaptCompressionLevel(ctx);
DEBUG(3, "output detected completion: %f\n", ctx->completion);
DEBUG(3, "output detected completion: %f\n", ctx->compressionCompletion);
DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
}
@ -468,14 +468,25 @@ static void* outputThread(void* arg)
DEBUG(3, "outputThread(): continuing after job compressed\n");
{
size_t const compressedSize = job->compressedSize;
size_t remaining = compressedSize;
if (ZSTD_isError(compressedSize)) {
DISPLAY("Error: an error occurred during compression\n");
signalErrorToThreads(ctx);
return arg;
}
{
size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile);
if (writeSize != compressedSize) {
// size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile);
size_t const blockSize = 4 << 20;
size_t pos = 0;
for ( ; ; ) {
size_t const writeSize = MIN(remaining, blockSize);
size_t const ret = fwrite(job->dst.start + pos, 1, writeSize, dstFile);
if (ret != writeSize) break;
pos += ret;
remaining -= ret;
if (remaining == 0) break;
}
if (pos != compressedSize) {
DISPLAY("Error: an error occurred during file write operation\n");
signalErrorToThreads(ctx);
return arg;
@ -517,12 +528,12 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
ctx->stats.waitWrite++;
ctx->stats.writeCounter++;
reduceCounters(ctx);
if (!ctx->completionMeasured) {
ctx->completion = ZSTD_getCompletion(ctx->cctx);
ctx->completionMeasured = 1;
if (!ctx->compressionCompletion) {
ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx);
ctx->compressionCompletionMeasured = 1;
}
adaptCompressionLevel(ctx);
DEBUG(3, "job creation detected completion %f\n", ctx->completion);
DEBUG(3, "job creation detected completion %f\n", ctx->compressionCompletion);
DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
}