added code for waiitng for all jobs to finish

This commit is contained in:
Paul Cruz 2017-07-03 19:24:22 -07:00
parent dd447bb9a7
commit 95ea54b4cf
2 changed files with 27 additions and 10 deletions

Binary file not shown.

View File

@ -35,10 +35,13 @@ typedef struct {
unsigned numJobs; unsigned numJobs;
unsigned nextJobID; unsigned nextJobID;
unsigned threadError; unsigned threadError;
unsigned allJobsCompleted;
pthread_mutex_t jobCompleted_mutex; pthread_mutex_t jobCompleted_mutex;
pthread_cond_t jobCompleted_cond; pthread_cond_t jobCompleted_cond;
pthread_mutex_t jobReady_mutex; pthread_mutex_t jobReady_mutex;
pthread_cond_t jobReady_cond; pthread_cond_t jobReady_cond;
pthread_mutex_t allJobsCompleted_mutex;
pthread_cond_t allJobsCompleted_cond;
jobDescription* jobs; jobDescription* jobs;
FILE* dstFile; FILE* dstFile;
} adaptCCtx; } adaptCCtx;
@ -57,10 +60,13 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
pthread_cond_init(&ctx->jobCompleted_cond, NULL); pthread_cond_init(&ctx->jobCompleted_cond, NULL);
pthread_mutex_init(&ctx->jobReady_mutex, NULL); pthread_mutex_init(&ctx->jobReady_mutex, NULL);
pthread_cond_init(&ctx->jobReady_cond, NULL); pthread_cond_init(&ctx->jobReady_cond, NULL);
pthread_mutex_init(&ctx->allJobsCompleted_mutex, NULL);
pthread_cond_init(&ctx->allJobsCompleted_cond, NULL);
ctx->numJobs = numJobs; ctx->numJobs = numJobs;
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
ctx->nextJobID = 0; ctx->nextJobID = 0;
ctx->threadError = 0; ctx->threadError = 0;
ctx->allJobsCompleted = 0;
if (!ctx->jobs) { if (!ctx->jobs) {
DISPLAY("Error: could not allocate space for jobs during context creation\n"); DISPLAY("Error: could not allocate space for jobs during context creation\n");
return NULL; return NULL;
@ -90,7 +96,12 @@ static void freeCompressionJobs(adaptCCtx* ctx)
static int freeCCtx(adaptCCtx* ctx) static int freeCCtx(adaptCCtx* ctx)
{ {
/* TODO: wait until jobs finish */ pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
while (ctx->allJobsCompleted == 0) {
pthread_cond_wait(&ctx->allJobsCompleted_cond, &ctx->allJobsCompleted_mutex);
}
pthread_mutex_unlock(&ctx->allJobsCompleted_mutex);
{
int const completedMutexError = pthread_mutex_destroy(&ctx->jobCompleted_mutex); int const completedMutexError = pthread_mutex_destroy(&ctx->jobCompleted_mutex);
int const completedCondError = pthread_cond_destroy(&ctx->jobCompleted_cond); int const completedCondError = pthread_cond_destroy(&ctx->jobCompleted_cond);
int const readyMutexError = pthread_mutex_destroy(&ctx->jobReady_mutex); int const readyMutexError = pthread_mutex_destroy(&ctx->jobReady_mutex);
@ -99,6 +110,7 @@ static int freeCCtx(adaptCCtx* ctx)
freeCompressionJobs(ctx); freeCompressionJobs(ctx);
free(ctx->jobs); free(ctx->jobs);
return completedMutexError | completedCondError | readyMutexError | readyCondError | fileError; return completedMutexError | completedCondError | readyMutexError | readyCondError | fileError;
}
} }
static void* compressionThread(void* arg) static void* compressionThread(void* arg)
@ -164,6 +176,10 @@ static void* outputThread(void* arg)
currJob++; currJob++;
if (currJob >= ctx->numJobs || ctx->threadError) { if (currJob >= ctx->numJobs || ctx->threadError) {
/* finished with all jobs */ /* finished with all jobs */
pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
ctx->allJobsCompleted = 1;
pthread_cond_signal(&ctx->allJobsCompleted_cond);
pthread_mutex_unlock(&ctx->allJobsCompleted_mutex);
break; break;
} }
} }
@ -267,6 +283,7 @@ int main(int argCount, const char* argv[])
goto cleanup; goto cleanup;
} }
} }
/* create compression thread */ /* create compression thread */
{ {
pthread_t compression; pthread_t compression;
@ -297,7 +314,7 @@ int main(int argCount, const char* argv[])
} }
if (feof(srcFile)) break; if (feof(srcFile)) break;
} }
DISPLAY("cleanup\n");
cleanup: cleanup:
/* file compression completed */ /* file compression completed */
ret |= (srcFile != NULL) ? fclose(srcFile) : 0; ret |= (srcFile != NULL) ? fclose(srcFile) : 0;