changed how the detection of the last job works

This commit is contained in:
Paul Cruz 2017-07-10 16:27:58 -07:00
parent c36552ef8a
commit 01fc7c4244

View File

@ -50,6 +50,7 @@ typedef struct {
buffer_t dst; buffer_t dst;
unsigned compressionLevel; unsigned compressionLevel;
unsigned jobID; unsigned jobID;
unsigned lastJob;
size_t compressedSize; size_t compressedSize;
} jobDescription; } jobDescription;
@ -57,7 +58,6 @@ typedef struct {
unsigned compressionLevel; unsigned compressionLevel;
unsigned numActiveThreads; unsigned numActiveThreads;
unsigned numJobs; unsigned numJobs;
unsigned lastJobID;
unsigned nextJobID; unsigned nextJobID;
unsigned threadError; unsigned threadError;
unsigned jobReadyID; unsigned jobReadyID;
@ -134,15 +134,15 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
ctx->jobReadyID = 0; ctx->jobReadyID = 0;
ctx->jobCompressedID = 0; ctx->jobCompressedID = 0;
ctx->jobWriteID = 0; ctx->jobWriteID = 0;
ctx->lastJobID = -1; /* intentional underflow */
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
/* allocating buffers for jobs */ /* initializing jobs */
{ {
unsigned jobNum; unsigned jobNum;
for (jobNum=0; jobNum<numJobs; jobNum++) { for (jobNum=0; jobNum<numJobs; jobNum++) {
jobDescription* job = &ctx->jobs[jobNum]; jobDescription* job = &ctx->jobs[jobNum];
job->src.start = malloc(FILE_CHUNK_SIZE); job->src.start = malloc(FILE_CHUNK_SIZE);
job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE)); job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE));
job->lastJob = 0;
if (!job->src.start || !job->dst.start) { if (!job->src.start || !job->dst.start) {
DISPLAY("Could not allocate buffers for jobs\n"); DISPLAY("Could not allocate buffers for jobs\n");
freeCCtx(ctx); freeCCtx(ctx);
@ -265,7 +265,7 @@ static void* compressionThread(void* arg)
pthread_mutex_unlock(&ctx->jobCompressed_mutex); pthread_mutex_unlock(&ctx->jobCompressed_mutex);
DEBUGLOG(2, "finished job compression %u\n", currJob); DEBUGLOG(2, "finished job compression %u\n", currJob);
currJob++; currJob++;
if (currJob >= ctx->lastJobID || ctx->threadError) { if (job->lastJob || ctx->threadError) {
/* finished compressing all jobs */ /* finished compressing all jobs */
DEBUGLOG(2, "all jobs finished compressing\n"); DEBUGLOG(2, "all jobs finished compressing\n");
break; break;
@ -327,7 +327,7 @@ static void* outputThread(void* arg)
} }
DEBUGLOG(2, "finished job write %u\n", currJob); DEBUGLOG(2, "finished job write %u\n", currJob);
currJob++; currJob++;
displayProgress(currJob, ctx->compressionLevel, currJob >= ctx->lastJobID); displayProgress(currJob, ctx->compressionLevel, job->lastJob);
DEBUGLOG(2, "locking job write mutex\n"); DEBUGLOG(2, "locking job write mutex\n");
pthread_mutex_lock(&ctx->jobWrite_mutex); pthread_mutex_lock(&ctx->jobWrite_mutex);
ctx->jobWriteID++; ctx->jobWriteID++;
@ -335,8 +335,7 @@ static void* outputThread(void* arg)
pthread_mutex_unlock(&ctx->jobWrite_mutex); pthread_mutex_unlock(&ctx->jobWrite_mutex);
DEBUGLOG(2, "unlocking job write mutex\n"); DEBUGLOG(2, "unlocking job write mutex\n");
DEBUGLOG(2, "checking if done: %u/%u\n", currJob, ctx->lastJobID); if (job->lastJob || ctx->threadError) {
if (currJob >= ctx->lastJobID || ctx->threadError) {
/* finished with all jobs */ /* finished with all jobs */
DEBUGLOG(2, "all jobs finished writing\n"); DEBUGLOG(2, "all jobs finished writing\n");
pthread_mutex_lock(&ctx->allJobsCompleted_mutex); pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
@ -349,7 +348,7 @@ static void* outputThread(void* arg)
return arg; return arg;
} }
static int createCompressionJob(adaptCCtx* ctx, size_t srcSize) static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
{ {
unsigned const nextJob = ctx->nextJobID; unsigned const nextJob = ctx->nextJobID;
unsigned const nextJobIndex = nextJob % ctx->numJobs; unsigned const nextJobIndex = nextJob % ctx->numJobs;
@ -371,6 +370,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize)
job->src.size = srcSize; job->src.size = srcSize;
job->dst.size = ZSTD_compressBound(srcSize); job->dst.size = ZSTD_compressBound(srcSize);
job->jobID = nextJob; job->jobID = nextJob;
job->lastJob = last;
memcpy(job->src.start, ctx->input.buffer.start, srcSize); memcpy(job->src.start, ctx->input.buffer.start, srcSize);
pthread_mutex_lock(&ctx->jobReady_mutex); pthread_mutex_lock(&ctx->jobReady_mutex);
ctx->jobReadyID++; ctx->jobReadyID++;
@ -457,7 +457,8 @@ static int compressFilename(const char* const srcFilename, const char* const dst
g_streamedSize += readSize; g_streamedSize += readSize;
/* reading was fine, now create the compression job */ /* reading was fine, now create the compression job */
{ {
int const error = createCompressionJob(ctx, readSize); int const last = feof(srcFile);
int const error = createCompressionJob(ctx, readSize, last);
if (error != 0) { if (error != 0) {
ret = error; ret = error;
ctx->threadError = 1; ctx->threadError = 1;
@ -466,7 +467,6 @@ static int compressFilename(const char* const srcFilename, const char* const dst
} }
if (feof(srcFile)) { if (feof(srcFile)) {
DEBUGLOG(2, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID); DEBUGLOG(2, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID);
ctx->lastJobID = ctx->nextJobID;
break; break;
} }
} }