From b3c9e02bb6ab8807990ddc85a99f9953b9ee3578 Mon Sep 17 00:00:00 2001 From: Paul Cruz Date: Mon, 17 Jul 2017 15:34:58 -0700 Subject: [PATCH] added signal to other threads whenever error occurs --- contrib/adaptive-compression/adapt.c | 44 ++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index 3aec50c0..01a73a66 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -247,13 +247,31 @@ static adaptCCtx* createCCtx(unsigned numJobs) return ctx; } +static void signalErrorToThreads(adaptCCtx* ctx) +{ + ctx->threadError = 1; + pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); + pthread_cond_signal(&ctx->jobReady_cond.pCond); + pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); + pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); + pthread_cond_signal(&ctx->jobCompressed_cond.pCond); + pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); + + pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); + pthread_cond_signal(&ctx->jobWrite_cond.pCond); + pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); + + pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex); + pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond); + pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex); +} static void waitUntilAllJobsCompleted(adaptCCtx* ctx) { if (!ctx) return; pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex); - while (ctx->allJobsCompleted == 0) { + while (ctx->allJobsCompleted == 0 && !ctx->threadError) { pthread_cond_wait(&ctx->allJobsCompleted_cond.pCond, &ctx->allJobsCompleted_mutex.pMutex); } pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex); @@ -327,7 +345,7 @@ static void* compressionThread(void* arg) jobDescription* job = &ctx->jobs[currJobIndex]; DEBUG(3, "compressionThread(): waiting on job ready\n"); pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); - while(currJob + 1 > ctx->jobReadyID) { + while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) { ctx->stats.waitReady++; ctx->stats.readyCounter++; DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob); @@ -351,7 +369,7 @@ static void* compressionThread(void* arg) size_t const windowSizeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceWindow, 1); if (ZSTD_isError(dictModeError) || ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) { DISPLAY("Error: something went wrong while starting compression\n"); - ctx->threadError = 1; + signalErrorToThreads(ctx); return arg; } } @@ -362,7 +380,7 @@ static void* compressionThread(void* arg) if (ZSTD_isError(hSize)) { DISPLAY("Error: something went wrong while continuing compression\n"); job->compressedSize = hSize; - ctx->threadError = 1; + signalErrorToThreads(ctx); return arg; } ZSTD_invalidateRepCodes(ctx->cctx); @@ -372,7 +390,7 @@ static void* compressionThread(void* arg) ZSTD_compressContinue(ctx->cctx, job->dst.start, job->dst.capacity, job->src.start + job->dictSize, job->src.size); if (ZSTD_isError(job->compressedSize)) { DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(job->compressedSize)); - ctx->threadError = 1; + signalErrorToThreads(ctx); return arg; } job->dst.size = job->compressedSize; @@ -422,7 +440,7 @@ static void* outputThread(void* arg) jobDescription* job = &ctx->jobs[currJobIndex]; DEBUG(3, "outputThread(): waiting on job compressed\n"); pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); - while (currJob + 1 > ctx->jobCompressedID) { + while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { ctx->stats.waitCompressed++; ctx->stats.compressedCounter++; if (!ctx->completionMeasured) { @@ -439,14 +457,14 @@ static void* outputThread(void* arg) size_t const compressedSize = job->compressedSize; if (ZSTD_isError(compressedSize)) { DISPLAY("Error: an error occurred during compression\n"); - ctx->threadError = 1; + signalErrorToThreads(ctx); return arg; } { size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile); if (writeSize != compressedSize) { DISPLAY("Error: an error occurred during file write operation\n"); - ctx->threadError = 1; + signalErrorToThreads(ctx); return arg; } } @@ -482,7 +500,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) DEBUG(3, "createCompressionJob(): wait for job write\n"); pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex); DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs); - while (nextJob - ctx->jobWriteID >= ctx->numJobs) { + while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) { ctx->stats.waitWrite++; ctx->stats.writeCounter++; if (!ctx->completionMeasured) { @@ -545,7 +563,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA pthread_t out; if (pthread_create(&out, NULL, &outputThread, otArg)) { DISPLAY("Error: could not create output thread\n"); - ctx->threadError = 1; + signalErrorToThreads(ctx); return 1; } } @@ -555,7 +573,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA pthread_t compression; if (pthread_create(&compression, NULL, &compressionThread, ctx)) { DISPLAY("Error: could not create compression thread\n"); - ctx->threadError = 1; + signalErrorToThreads(ctx); return 1; } } @@ -565,7 +583,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA size_t const readSize = fread(ctx->input.buffer.start + ctx->input.filled, 1, FILE_CHUNK_SIZE, srcFile); if (readSize != FILE_CHUNK_SIZE && !feof(srcFile)) { DISPLAY("Error: problem occurred during read from src file\n"); - ctx->threadError = 1; + signalErrorToThreads(ctx); return 1; } g_streamedSize += readSize; @@ -574,7 +592,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA int const last = feof(srcFile); int const error = createCompressionJob(ctx, readSize, last); if (error != 0) { - ctx->threadError = 1; + signalErrorToThreads(ctx); return error; } }