diff --git a/contrib/adaptive-compression/v2 b/contrib/adaptive-compression/v2 index 0c0e194e..38034cda 100755 Binary files a/contrib/adaptive-compression/v2 and b/contrib/adaptive-compression/v2 differ diff --git a/contrib/adaptive-compression/v2.c b/contrib/adaptive-compression/v2.c index 5bbb8b9b..eaaecb76 100644 --- a/contrib/adaptive-compression/v2.c +++ b/contrib/adaptive-compression/v2.c @@ -35,10 +35,13 @@ typedef struct { unsigned numJobs; unsigned nextJobID; unsigned threadError; + unsigned allJobsCompleted; pthread_mutex_t jobCompleted_mutex; pthread_cond_t jobCompleted_cond; pthread_mutex_t jobReady_mutex; pthread_cond_t jobReady_cond; + pthread_mutex_t allJobsCompleted_mutex; + pthread_cond_t allJobsCompleted_cond; jobDescription* jobs; FILE* dstFile; } adaptCCtx; @@ -57,10 +60,13 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename) pthread_cond_init(&ctx->jobCompleted_cond, NULL); pthread_mutex_init(&ctx->jobReady_mutex, NULL); pthread_cond_init(&ctx->jobReady_cond, NULL); + pthread_mutex_init(&ctx->allJobsCompleted_mutex, NULL); + pthread_cond_init(&ctx->allJobsCompleted_cond, NULL); ctx->numJobs = numJobs; ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); ctx->nextJobID = 0; ctx->threadError = 0; + ctx->allJobsCompleted = 0; if (!ctx->jobs) { DISPLAY("Error: could not allocate space for jobs during context creation\n"); return NULL; @@ -90,15 +96,21 @@ static void freeCompressionJobs(adaptCCtx* ctx) static int freeCCtx(adaptCCtx* ctx) { - /* TODO: wait until jobs finish */ - int const completedMutexError = pthread_mutex_destroy(&ctx->jobCompleted_mutex); - int const completedCondError = pthread_cond_destroy(&ctx->jobCompleted_cond); - int const readyMutexError = pthread_mutex_destroy(&ctx->jobReady_mutex); - int const readyCondError = pthread_cond_destroy(&ctx->jobReady_cond); - int const fileError = fclose(ctx->dstFile); - freeCompressionJobs(ctx); - free(ctx->jobs); - return completedMutexError | completedCondError | readyMutexError | readyCondError | fileError; + pthread_mutex_lock(&ctx->allJobsCompleted_mutex); + while (ctx->allJobsCompleted == 0) { + pthread_cond_wait(&ctx->allJobsCompleted_cond, &ctx->allJobsCompleted_mutex); + } + pthread_mutex_unlock(&ctx->allJobsCompleted_mutex); + { + int const completedMutexError = pthread_mutex_destroy(&ctx->jobCompleted_mutex); + int const completedCondError = pthread_cond_destroy(&ctx->jobCompleted_cond); + int const readyMutexError = pthread_mutex_destroy(&ctx->jobReady_mutex); + int const readyCondError = pthread_cond_destroy(&ctx->jobReady_cond); + int const fileError = fclose(ctx->dstFile); + freeCompressionJobs(ctx); + free(ctx->jobs); + return completedMutexError | completedCondError | readyMutexError | readyCondError | fileError; + } } static void* compressionThread(void* arg) @@ -164,6 +176,10 @@ static void* outputThread(void* arg) currJob++; if (currJob >= ctx->numJobs || ctx->threadError) { /* finished with all jobs */ + pthread_mutex_lock(&ctx->allJobsCompleted_mutex); + ctx->allJobsCompleted = 1; + pthread_cond_signal(&ctx->allJobsCompleted_cond); + pthread_mutex_unlock(&ctx->allJobsCompleted_mutex); break; } } @@ -267,6 +283,7 @@ int main(int argCount, const char* argv[]) goto cleanup; } } + /* create compression thread */ { pthread_t compression; @@ -297,7 +314,7 @@ int main(int argCount, const char* argv[]) } if (feof(srcFile)) break; } - DISPLAY("cleanup\n"); + cleanup: /* file compression completed */ ret |= (srcFile != NULL) ? fclose(srcFile) : 0;