fixed the problem with pipeline tests by changing how jobs move through the threads
This commit is contained in:
parent
cc714f3bd3
commit
6f3ad1b22e
@ -26,15 +26,6 @@ typedef struct {
|
|||||||
buffer_t dst;
|
buffer_t dst;
|
||||||
unsigned compressionLevel;
|
unsigned compressionLevel;
|
||||||
unsigned jobID;
|
unsigned jobID;
|
||||||
unsigned jobCompleted;
|
|
||||||
unsigned jobReady;
|
|
||||||
unsigned jobWritten;
|
|
||||||
pthread_mutex_t* jobCompleted_mutex;
|
|
||||||
pthread_cond_t* jobCompleted_cond;
|
|
||||||
pthread_mutex_t* jobReady_mutex;
|
|
||||||
pthread_cond_t* jobReady_cond;
|
|
||||||
pthread_mutex_t* jobWrite_mutex;
|
|
||||||
pthread_cond_t* jobWrite_cond;
|
|
||||||
size_t compressedSize;
|
size_t compressedSize;
|
||||||
} jobDescription;
|
} jobDescription;
|
||||||
|
|
||||||
@ -45,6 +36,9 @@ typedef struct {
|
|||||||
unsigned lastJobID;
|
unsigned lastJobID;
|
||||||
unsigned nextJobID;
|
unsigned nextJobID;
|
||||||
unsigned threadError;
|
unsigned threadError;
|
||||||
|
unsigned jobReadyID;
|
||||||
|
unsigned jobCompletedID;
|
||||||
|
unsigned jobWrittenID;
|
||||||
unsigned allJobsCompleted;
|
unsigned allJobsCompleted;
|
||||||
pthread_mutex_t jobCompleted_mutex;
|
pthread_mutex_t jobCompleted_mutex;
|
||||||
pthread_cond_t jobCompleted_cond;
|
pthread_cond_t jobCompleted_cond;
|
||||||
@ -107,20 +101,11 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
|
|||||||
pthread_mutex_init(&ctx->jobWrite_mutex, NULL);
|
pthread_mutex_init(&ctx->jobWrite_mutex, NULL);
|
||||||
pthread_cond_init(&ctx->jobWrite_cond, NULL);
|
pthread_cond_init(&ctx->jobWrite_cond, NULL);
|
||||||
ctx->numJobs = numJobs;
|
ctx->numJobs = numJobs;
|
||||||
|
ctx->jobReadyID = 0;
|
||||||
|
ctx->jobCompletedID = 0;
|
||||||
|
ctx->jobWrittenID = 0;
|
||||||
ctx->lastJobID = -1; /* intentional underflow */
|
ctx->lastJobID = -1; /* intentional underflow */
|
||||||
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
|
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
|
||||||
{
|
|
||||||
unsigned u;
|
|
||||||
for (u=0; u<numJobs; u++) {
|
|
||||||
ctx->jobs[u].jobCompleted_mutex = &ctx->jobCompleted_mutex;
|
|
||||||
ctx->jobs[u].jobCompleted_cond = &ctx->jobCompleted_cond;
|
|
||||||
ctx->jobs[u].jobReady_mutex = &ctx->jobReady_mutex;
|
|
||||||
ctx->jobs[u].jobReady_cond = &ctx->jobReady_cond;
|
|
||||||
ctx->jobs[u].jobWrite_mutex = &ctx->jobWrite_mutex;
|
|
||||||
ctx->jobs[u].jobWrite_cond = &ctx->jobWrite_cond;
|
|
||||||
ctx->jobs[u].jobWritten = 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ctx->nextJobID = 0;
|
ctx->nextJobID = 0;
|
||||||
ctx->threadError = 0;
|
ctx->threadError = 0;
|
||||||
ctx->allJobsCompleted = 0;
|
ctx->allJobsCompleted = 0;
|
||||||
@ -161,11 +146,11 @@ static void* compressionThread(void* arg)
|
|||||||
unsigned const currJobIndex = currJob % ctx->numJobs;
|
unsigned const currJobIndex = currJob % ctx->numJobs;
|
||||||
jobDescription* job = &ctx->jobs[currJobIndex];
|
jobDescription* job = &ctx->jobs[currJobIndex];
|
||||||
// DEBUGLOG(2, "compressionThread(): waiting on job ready\n");
|
// DEBUGLOG(2, "compressionThread(): waiting on job ready\n");
|
||||||
pthread_mutex_lock(job->jobReady_mutex);
|
pthread_mutex_lock(&ctx->jobReady_mutex);
|
||||||
while(job->jobReady == 0) {
|
while(currJob + 1 > ctx->jobReadyID) {
|
||||||
pthread_cond_wait(job->jobReady_cond, job->jobReady_mutex);
|
pthread_cond_wait(&ctx->jobReady_cond, &ctx->jobReady_mutex);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(job->jobReady_mutex);
|
pthread_mutex_unlock(&ctx->jobReady_mutex);
|
||||||
// DEBUGLOG(2, "compressionThread(): continuing after job ready\n");
|
// DEBUGLOG(2, "compressionThread(): continuing after job ready\n");
|
||||||
/* compress the data */
|
/* compress the data */
|
||||||
{
|
{
|
||||||
@ -177,11 +162,11 @@ static void* compressionThread(void* arg)
|
|||||||
}
|
}
|
||||||
job->compressedSize = compressedSize;
|
job->compressedSize = compressedSize;
|
||||||
}
|
}
|
||||||
pthread_mutex_lock(job->jobCompleted_mutex);
|
pthread_mutex_lock(&ctx->jobCompleted_mutex);
|
||||||
job->jobCompleted = 1;
|
ctx->jobCompletedID++;
|
||||||
DEBUGLOG(2, "signaling for job %u\n", currJob);
|
DEBUGLOG(2, "signaling for job %u\n", currJob);
|
||||||
pthread_cond_signal(job->jobCompleted_cond);
|
pthread_cond_signal(&ctx->jobCompleted_cond);
|
||||||
pthread_mutex_unlock(job->jobCompleted_mutex);
|
pthread_mutex_unlock(&ctx->jobCompleted_mutex);
|
||||||
currJob++;
|
currJob++;
|
||||||
if (currJob >= ctx->lastJobID || ctx->threadError) {
|
if (currJob >= ctx->lastJobID || ctx->threadError) {
|
||||||
/* finished compressing all jobs */
|
/* finished compressing all jobs */
|
||||||
@ -201,12 +186,12 @@ static void* outputThread(void* arg)
|
|||||||
unsigned const currJobIndex = currJob % ctx->numJobs;
|
unsigned const currJobIndex = currJob % ctx->numJobs;
|
||||||
jobDescription* job = &ctx->jobs[currJobIndex];
|
jobDescription* job = &ctx->jobs[currJobIndex];
|
||||||
DEBUGLOG(2, "outputThread(): waiting on job completed\n");
|
DEBUGLOG(2, "outputThread(): waiting on job completed\n");
|
||||||
pthread_mutex_lock(job->jobCompleted_mutex);
|
pthread_mutex_lock(&ctx->jobCompleted_mutex);
|
||||||
while (job->jobCompleted == 0) {
|
while (currJob + 1 > ctx->jobCompletedID) {
|
||||||
DEBUGLOG(2, "inside job completed wait loop waiting on %u\n", currJob);
|
DEBUGLOG(2, "inside job completed wait loop waiting on %u\n", currJob);
|
||||||
pthread_cond_wait(job->jobCompleted_cond, job->jobCompleted_mutex);
|
pthread_cond_wait(&ctx->jobCompleted_cond, &ctx->jobCompleted_mutex);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(job->jobCompleted_mutex);
|
pthread_mutex_unlock(&ctx->jobCompleted_mutex);
|
||||||
DEBUGLOG(2, "outputThread(): continuing after job completed\n");
|
DEBUGLOG(2, "outputThread(): continuing after job completed\n");
|
||||||
{
|
{
|
||||||
size_t const compressedSize = job->compressedSize;
|
size_t const compressedSize = job->compressedSize;
|
||||||
@ -224,10 +209,10 @@ static void* outputThread(void* arg)
|
|||||||
}
|
}
|
||||||
currJob++;
|
currJob++;
|
||||||
DEBUGLOG(2, "locking job write mutex\n");
|
DEBUGLOG(2, "locking job write mutex\n");
|
||||||
pthread_mutex_lock(job->jobWrite_mutex);
|
pthread_mutex_lock(&ctx->jobWrite_mutex);
|
||||||
job->jobWritten = 1;
|
ctx->jobWrittenID++;
|
||||||
pthread_cond_signal(job->jobWrite_cond);
|
pthread_cond_signal(&ctx->jobWrite_cond);
|
||||||
pthread_mutex_unlock(job->jobWrite_mutex);
|
pthread_mutex_unlock(&ctx->jobWrite_mutex);
|
||||||
DEBUGLOG(2, "unlocking job write mutex\n");
|
DEBUGLOG(2, "unlocking job write mutex\n");
|
||||||
|
|
||||||
DEBUGLOG(2, "checking if done: %u/%u\n", currJob, ctx->lastJobID);
|
DEBUGLOG(2, "checking if done: %u/%u\n", currJob, ctx->lastJobID);
|
||||||
@ -250,23 +235,17 @@ static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize)
|
|||||||
unsigned const nextJobIndex = nextJob % ctx->numJobs;
|
unsigned const nextJobIndex = nextJob % ctx->numJobs;
|
||||||
jobDescription* job = &ctx->jobs[nextJobIndex];
|
jobDescription* job = &ctx->jobs[nextJobIndex];
|
||||||
// DEBUGLOG(2, "createCompressionJob(): wait for job write\n");
|
// DEBUGLOG(2, "createCompressionJob(): wait for job write\n");
|
||||||
pthread_mutex_lock(job->jobWrite_mutex);
|
pthread_mutex_lock(&ctx->jobWrite_mutex);
|
||||||
while (job->jobWritten == 0) {
|
while (nextJob - ctx->jobWrittenID >= ctx->numJobs) {
|
||||||
pthread_cond_wait(job->jobWrite_cond, job->jobWrite_mutex);
|
pthread_cond_wait(&ctx->jobWrite_cond, &ctx->jobWrite_mutex);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(job->jobWrite_mutex);
|
pthread_mutex_unlock(&ctx->jobWrite_mutex);
|
||||||
// DEBUGLOG(2, "createCompressionJob(): continuing after job write\n");
|
// DEBUGLOG(2, "createCompressionJob(): continuing after job write\n");
|
||||||
job->compressionLevel = ctx->compressionLevel;
|
job->compressionLevel = ctx->compressionLevel;
|
||||||
job->src.start = malloc(srcSize);
|
job->src.start = malloc(srcSize);
|
||||||
job->src.size = srcSize;
|
job->src.size = srcSize;
|
||||||
job->dst.size = ZSTD_compressBound(srcSize);
|
job->dst.size = ZSTD_compressBound(srcSize);
|
||||||
job->dst.start = malloc(job->dst.size);
|
job->dst.start = malloc(job->dst.size);
|
||||||
job->jobCompleted = 0;
|
|
||||||
job->jobWritten = 0;
|
|
||||||
job->jobCompleted_cond = &ctx->jobCompleted_cond;
|
|
||||||
job->jobCompleted_mutex = &ctx->jobCompleted_mutex;
|
|
||||||
job->jobReady_cond = &ctx->jobReady_cond;
|
|
||||||
job->jobReady_mutex = &ctx->jobReady_mutex;
|
|
||||||
job->jobID = nextJob;
|
job->jobID = nextJob;
|
||||||
if (!job->src.start || !job->dst.start) {
|
if (!job->src.start || !job->dst.start) {
|
||||||
/* problem occurred, free things then return */
|
/* problem occurred, free things then return */
|
||||||
@ -276,10 +255,10 @@ static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize)
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
memcpy(job->src.start, data, srcSize);
|
memcpy(job->src.start, data, srcSize);
|
||||||
pthread_mutex_lock(job->jobReady_mutex);
|
pthread_mutex_lock(&ctx->jobReady_mutex);
|
||||||
job->jobReady = 1;
|
ctx->jobReadyID++;
|
||||||
pthread_cond_signal(job->jobReady_cond);
|
pthread_cond_signal(&ctx->jobReady_cond);
|
||||||
pthread_mutex_unlock(job->jobReady_mutex);
|
pthread_mutex_unlock(&ctx->jobReady_mutex);
|
||||||
ctx->nextJobID++;
|
ctx->nextJobID++;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user