semi working version that stabilizes

This commit is contained in:
Paul Cruz 2017-07-20 18:45:33 -07:00
parent 82e488770c
commit 9259c7afa4

View File

@ -308,11 +308,6 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
*/ */
static void adaptCompressionLevel(adaptCCtx* ctx) static void adaptCompressionLevel(adaptCCtx* ctx)
{ {
if (g_forceCompressionLevel) {
ctx->compressionLevel = g_compressionLevel;
}
else {
DEBUG(2, "compression level %u\n", ctx->compressionLevel);
/* check if compression is too slow */ /* check if compression is too slow */
unsigned createChange; unsigned createChange;
unsigned writeChange; unsigned writeChange;
@ -321,6 +316,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
createChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->createCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; createChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->createCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
writeChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->writeCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; writeChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->writeCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
compressionChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->compressionCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; compressionChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->compressionCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
DEBUG(2, "compression level %u\n", ctx->compressionLevel);
DEBUG(2, "createCompletionMeasured %f\n", ctx->createCompletionMeasured); DEBUG(2, "createCompletionMeasured %f\n", ctx->createCompletionMeasured);
DEBUG(2, "compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured); DEBUG(2, "compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
DEBUG(2, "writeCompletionMeasured %f\n", ctx->writeCompletionMeasured); DEBUG(2, "writeCompletionMeasured %f\n", ctx->writeCompletionMeasured);
@ -328,6 +324,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
{ {
unsigned const compressionFastChange = MIN(MIN(createChange, writeChange), ZSTD_maxCLevel() - ctx->compressionLevel); unsigned const compressionFastChange = MIN(MIN(createChange, writeChange), ZSTD_maxCLevel() - ctx->compressionLevel);
DEBUG(2, "compressionFastChange %u\n", compressionFastChange); DEBUG(2, "compressionFastChange %u\n", compressionFastChange);
if (compressionFastChange) { if (compressionFastChange) {
@ -348,6 +345,9 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
ctx->writeCompletionMeasured = 1; ctx->writeCompletionMeasured = 1;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(2, "\n"); DEBUG(2, "\n");
if (g_forceCompressionLevel) {
ctx->compressionLevel = g_compressionLevel;
} }
} }
@ -368,10 +368,6 @@ static void* compressionThread(void* arg)
jobDescription* job = &ctx->jobs[currJobIndex]; jobDescription* job = &ctx->jobs[currJobIndex];
DEBUG(3, "compressionThread(): waiting on job ready\n"); DEBUG(3, "compressionThread(): waiting on job ready\n");
/* new job, reset completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) { while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
@ -387,9 +383,9 @@ static void* compressionThread(void* arg)
} }
pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
/* reset create completion */ /* reset compression completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletion = 0; ctx->compressionCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "compressionThread(): continuing after job ready\n"); DEBUG(3, "compressionThread(): continuing after job ready\n");
@ -397,7 +393,7 @@ static void* compressionThread(void* arg)
DEBUG(3, "%.*s", (int)job->src.size, (char*)job->src.start); DEBUG(3, "%.*s", (int)job->src.size, (char*)job->src.start);
/* adapt compression level */ /* adapt compression level */
adaptCompressionLevel(ctx); if (currJob) adaptCompressionLevel(ctx);
/* compress the data */ /* compress the data */
{ {
@ -515,11 +511,6 @@ static void* outputThread(void* arg)
jobDescription* job = &ctx->jobs[currJobIndex]; jobDescription* job = &ctx->jobs[currJobIndex];
DEBUG(3, "outputThread(): waiting on job compressed\n"); DEBUG(3, "outputThread(): waiting on job compressed\n");
/* new job, reset completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->writeCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
@ -532,9 +523,9 @@ static void* outputThread(void* arg)
} }
pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
/* reset compression completion */ /* reset write completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 0; ctx->writeCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "outputThread(): continuing after job compressed\n"); DEBUG(3, "outputThread(): continuing after job compressed\n");
@ -615,9 +606,9 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
} }
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
/* reset write completion */ /* reset create completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->writeCompletion = 0; ctx->createCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
DEBUG(3, "createCompressionJob(): continuing after job write\n"); DEBUG(3, "createCompressionJob(): continuing after job write\n");
@ -688,11 +679,6 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
size_t const readBlockSize = 1 << 15; size_t const readBlockSize = 1 << 15;
size_t remaining = FILE_CHUNK_SIZE; size_t remaining = FILE_CHUNK_SIZE;
/* new job reset completion */
pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->createCompletion = 0;
pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
while (remaining != 0 && !feof(srcFile)) { while (remaining != 0 && !feof(srcFile)) {
size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile); size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile);
if (ret != readBlockSize && !feof(srcFile)) { if (ret != readBlockSize && !feof(srcFile)) {