added signal to other threads whenever error occurs
This commit is contained in:
parent
6be22f1f84
commit
b3c9e02bb6
@ -247,13 +247,31 @@ static adaptCCtx* createCCtx(unsigned numJobs)
|
|||||||
return ctx;
|
return ctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void signalErrorToThreads(adaptCCtx* ctx)
|
||||||
|
{
|
||||||
|
ctx->threadError = 1;
|
||||||
|
pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
|
||||||
|
pthread_cond_signal(&ctx->jobReady_cond.pCond);
|
||||||
|
pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
|
||||||
|
pthread_cond_signal(&ctx->jobCompressed_cond.pCond);
|
||||||
|
pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
|
||||||
|
pthread_cond_signal(&ctx->jobWrite_cond.pCond);
|
||||||
|
pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
|
||||||
|
pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond);
|
||||||
|
pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
|
||||||
|
}
|
||||||
|
|
||||||
static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
|
static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
|
||||||
{
|
{
|
||||||
if (!ctx) return;
|
if (!ctx) return;
|
||||||
pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
|
pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
|
||||||
while (ctx->allJobsCompleted == 0) {
|
while (ctx->allJobsCompleted == 0 && !ctx->threadError) {
|
||||||
pthread_cond_wait(&ctx->allJobsCompleted_cond.pCond, &ctx->allJobsCompleted_mutex.pMutex);
|
pthread_cond_wait(&ctx->allJobsCompleted_cond.pCond, &ctx->allJobsCompleted_mutex.pMutex);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
|
pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
|
||||||
@ -327,7 +345,7 @@ static void* compressionThread(void* arg)
|
|||||||
jobDescription* job = &ctx->jobs[currJobIndex];
|
jobDescription* job = &ctx->jobs[currJobIndex];
|
||||||
DEBUG(3, "compressionThread(): waiting on job ready\n");
|
DEBUG(3, "compressionThread(): waiting on job ready\n");
|
||||||
pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
|
pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
|
||||||
while(currJob + 1 > ctx->jobReadyID) {
|
while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
|
||||||
ctx->stats.waitReady++;
|
ctx->stats.waitReady++;
|
||||||
ctx->stats.readyCounter++;
|
ctx->stats.readyCounter++;
|
||||||
DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
|
DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
|
||||||
@ -351,7 +369,7 @@ static void* compressionThread(void* arg)
|
|||||||
size_t const windowSizeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceWindow, 1);
|
size_t const windowSizeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceWindow, 1);
|
||||||
if (ZSTD_isError(dictModeError) || ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) {
|
if (ZSTD_isError(dictModeError) || ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) {
|
||||||
DISPLAY("Error: something went wrong while starting compression\n");
|
DISPLAY("Error: something went wrong while starting compression\n");
|
||||||
ctx->threadError = 1;
|
signalErrorToThreads(ctx);
|
||||||
return arg;
|
return arg;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -362,7 +380,7 @@ static void* compressionThread(void* arg)
|
|||||||
if (ZSTD_isError(hSize)) {
|
if (ZSTD_isError(hSize)) {
|
||||||
DISPLAY("Error: something went wrong while continuing compression\n");
|
DISPLAY("Error: something went wrong while continuing compression\n");
|
||||||
job->compressedSize = hSize;
|
job->compressedSize = hSize;
|
||||||
ctx->threadError = 1;
|
signalErrorToThreads(ctx);
|
||||||
return arg;
|
return arg;
|
||||||
}
|
}
|
||||||
ZSTD_invalidateRepCodes(ctx->cctx);
|
ZSTD_invalidateRepCodes(ctx->cctx);
|
||||||
@ -372,7 +390,7 @@ static void* compressionThread(void* arg)
|
|||||||
ZSTD_compressContinue(ctx->cctx, job->dst.start, job->dst.capacity, job->src.start + job->dictSize, job->src.size);
|
ZSTD_compressContinue(ctx->cctx, job->dst.start, job->dst.capacity, job->src.start + job->dictSize, job->src.size);
|
||||||
if (ZSTD_isError(job->compressedSize)) {
|
if (ZSTD_isError(job->compressedSize)) {
|
||||||
DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(job->compressedSize));
|
DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(job->compressedSize));
|
||||||
ctx->threadError = 1;
|
signalErrorToThreads(ctx);
|
||||||
return arg;
|
return arg;
|
||||||
}
|
}
|
||||||
job->dst.size = job->compressedSize;
|
job->dst.size = job->compressedSize;
|
||||||
@ -422,7 +440,7 @@ static void* outputThread(void* arg)
|
|||||||
jobDescription* job = &ctx->jobs[currJobIndex];
|
jobDescription* job = &ctx->jobs[currJobIndex];
|
||||||
DEBUG(3, "outputThread(): waiting on job compressed\n");
|
DEBUG(3, "outputThread(): waiting on job compressed\n");
|
||||||
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
|
pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
|
||||||
while (currJob + 1 > ctx->jobCompressedID) {
|
while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
|
||||||
ctx->stats.waitCompressed++;
|
ctx->stats.waitCompressed++;
|
||||||
ctx->stats.compressedCounter++;
|
ctx->stats.compressedCounter++;
|
||||||
if (!ctx->completionMeasured) {
|
if (!ctx->completionMeasured) {
|
||||||
@ -439,14 +457,14 @@ static void* outputThread(void* arg)
|
|||||||
size_t const compressedSize = job->compressedSize;
|
size_t const compressedSize = job->compressedSize;
|
||||||
if (ZSTD_isError(compressedSize)) {
|
if (ZSTD_isError(compressedSize)) {
|
||||||
DISPLAY("Error: an error occurred during compression\n");
|
DISPLAY("Error: an error occurred during compression\n");
|
||||||
ctx->threadError = 1;
|
signalErrorToThreads(ctx);
|
||||||
return arg;
|
return arg;
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile);
|
size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile);
|
||||||
if (writeSize != compressedSize) {
|
if (writeSize != compressedSize) {
|
||||||
DISPLAY("Error: an error occurred during file write operation\n");
|
DISPLAY("Error: an error occurred during file write operation\n");
|
||||||
ctx->threadError = 1;
|
signalErrorToThreads(ctx);
|
||||||
return arg;
|
return arg;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -482,7 +500,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
|
|||||||
DEBUG(3, "createCompressionJob(): wait for job write\n");
|
DEBUG(3, "createCompressionJob(): wait for job write\n");
|
||||||
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
|
pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
|
||||||
DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
|
DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
|
||||||
while (nextJob - ctx->jobWriteID >= ctx->numJobs) {
|
while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
|
||||||
ctx->stats.waitWrite++;
|
ctx->stats.waitWrite++;
|
||||||
ctx->stats.writeCounter++;
|
ctx->stats.writeCounter++;
|
||||||
if (!ctx->completionMeasured) {
|
if (!ctx->completionMeasured) {
|
||||||
@ -545,7 +563,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
|
|||||||
pthread_t out;
|
pthread_t out;
|
||||||
if (pthread_create(&out, NULL, &outputThread, otArg)) {
|
if (pthread_create(&out, NULL, &outputThread, otArg)) {
|
||||||
DISPLAY("Error: could not create output thread\n");
|
DISPLAY("Error: could not create output thread\n");
|
||||||
ctx->threadError = 1;
|
signalErrorToThreads(ctx);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -555,7 +573,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
|
|||||||
pthread_t compression;
|
pthread_t compression;
|
||||||
if (pthread_create(&compression, NULL, &compressionThread, ctx)) {
|
if (pthread_create(&compression, NULL, &compressionThread, ctx)) {
|
||||||
DISPLAY("Error: could not create compression thread\n");
|
DISPLAY("Error: could not create compression thread\n");
|
||||||
ctx->threadError = 1;
|
signalErrorToThreads(ctx);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -565,7 +583,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
|
|||||||
size_t const readSize = fread(ctx->input.buffer.start + ctx->input.filled, 1, FILE_CHUNK_SIZE, srcFile);
|
size_t const readSize = fread(ctx->input.buffer.start + ctx->input.filled, 1, FILE_CHUNK_SIZE, srcFile);
|
||||||
if (readSize != FILE_CHUNK_SIZE && !feof(srcFile)) {
|
if (readSize != FILE_CHUNK_SIZE && !feof(srcFile)) {
|
||||||
DISPLAY("Error: problem occurred during read from src file\n");
|
DISPLAY("Error: problem occurred during read from src file\n");
|
||||||
ctx->threadError = 1;
|
signalErrorToThreads(ctx);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
g_streamedSize += readSize;
|
g_streamedSize += readSize;
|
||||||
@ -574,7 +592,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
|
|||||||
int const last = feof(srcFile);
|
int const last = feof(srcFile);
|
||||||
int const error = createCompressionJob(ctx, readSize, last);
|
int const error = createCompressionJob(ctx, readSize, last);
|
||||||
if (error != 0) {
|
if (error != 0) {
|
||||||
ctx->threadError = 1;
|
signalErrorToThreads(ctx);
|
||||||
return error;
|
return error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user