From cc714f3bd3346fe62530ef0ae3c8179e4ef6423c Mon Sep 17 00:00:00 2001 From: Paul Cruz Date: Wed, 5 Jul 2017 16:54:34 -0700 Subject: [PATCH] added print statements and debuglog --- contrib/adaptive-compression/multi.c | 23 ++++++++++++++++++++++- contrib/adaptive-compression/pipetests.sh | 2 ++ 2 files changed, 24 insertions(+), 1 deletion(-) create mode 100755 contrib/adaptive-compression/pipetests.sh diff --git a/contrib/adaptive-compression/multi.c b/contrib/adaptive-compression/multi.c index 089dfe5a..40045381 100644 --- a/contrib/adaptive-compression/multi.c +++ b/contrib/adaptive-compression/multi.c @@ -1,9 +1,11 @@ #define DISPLAY(...) fprintf(stderr, __VA_ARGS__) +#define DEBUGLOG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } } #define FILE_CHUNK_SIZE 4 << 20 #define MAX_NUM_JOBS 50; #define stdinmark "/*stdin*\\" #define stdoutmark "/*stdout*\\" #define MAX_PATH 256 +#define DEFAULT_DISPLAY_LEVEL 1 typedef unsigned char BYTE; #include /* fprintf */ @@ -12,7 +14,7 @@ typedef unsigned char BYTE; #include /* memset */ #include "zstd.h" - +static int g_displayLevel = DEFAULT_DISPLAY_LEVEL; typedef struct { void* start; @@ -158,11 +160,13 @@ static void* compressionThread(void* arg) for ( ; ; ) { unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* job = &ctx->jobs[currJobIndex]; + // DEBUGLOG(2, "compressionThread(): waiting on job ready\n"); pthread_mutex_lock(job->jobReady_mutex); while(job->jobReady == 0) { pthread_cond_wait(job->jobReady_cond, job->jobReady_mutex); } pthread_mutex_unlock(job->jobReady_mutex); + // DEBUGLOG(2, "compressionThread(): continuing after job ready\n"); /* compress the data */ { size_t const compressedSize = ZSTD_compress(job->dst.start, job->dst.size, job->src.start, job->src.size, job->compressionLevel); @@ -175,11 +179,13 @@ static void* compressionThread(void* arg) } pthread_mutex_lock(job->jobCompleted_mutex); job->jobCompleted = 1; + DEBUGLOG(2, "signaling for job %u\n", currJob); pthread_cond_signal(job->jobCompleted_cond); pthread_mutex_unlock(job->jobCompleted_mutex); currJob++; if (currJob >= ctx->lastJobID || ctx->threadError) { /* finished compressing all jobs */ + DEBUGLOG(2, "all jobs finished compressing\n"); break; } } @@ -194,11 +200,14 @@ static void* outputThread(void* arg) for ( ; ; ) { unsigned const currJobIndex = currJob % ctx->numJobs; jobDescription* job = &ctx->jobs[currJobIndex]; + DEBUGLOG(2, "outputThread(): waiting on job completed\n"); pthread_mutex_lock(job->jobCompleted_mutex); while (job->jobCompleted == 0) { + DEBUGLOG(2, "inside job completed wait loop waiting on %u\n", currJob); pthread_cond_wait(job->jobCompleted_cond, job->jobCompleted_mutex); } pthread_mutex_unlock(job->jobCompleted_mutex); + DEBUGLOG(2, "outputThread(): continuing after job completed\n"); { size_t const compressedSize = job->compressedSize; if (ZSTD_isError(compressedSize)) { @@ -214,12 +223,17 @@ static void* outputThread(void* arg) } } currJob++; + DEBUGLOG(2, "locking job write mutex\n"); pthread_mutex_lock(job->jobWrite_mutex); job->jobWritten = 1; pthread_cond_signal(job->jobWrite_cond); pthread_mutex_unlock(job->jobWrite_mutex); + DEBUGLOG(2, "unlocking job write mutex\n"); + + DEBUGLOG(2, "checking if done: %u/%u\n", currJob, ctx->lastJobID); if (currJob >= ctx->lastJobID || ctx->threadError) { /* finished with all jobs */ + DEBUGLOG(2, "all jobs finished writing\n"); pthread_mutex_lock(&ctx->allJobsCompleted_mutex); ctx->allJobsCompleted = 1; pthread_cond_signal(&ctx->allJobsCompleted_cond); @@ -235,11 +249,13 @@ static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize) unsigned const nextJob = ctx->nextJobID; unsigned const nextJobIndex = nextJob % ctx->numJobs; jobDescription* job = &ctx->jobs[nextJobIndex]; + // DEBUGLOG(2, "createCompressionJob(): wait for job write\n"); pthread_mutex_lock(job->jobWrite_mutex); while (job->jobWritten == 0) { pthread_cond_wait(job->jobWrite_cond, job->jobWrite_mutex); } pthread_mutex_unlock(job->jobWrite_mutex); + // DEBUGLOG(2, "createCompressionJob(): continuing after job write\n"); job->compressionLevel = ctx->compressionLevel; job->src.start = malloc(srcSize); job->src.size = srcSize; @@ -329,6 +345,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst } } if (feof(srcFile)) { + DEBUGLOG(2, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID); ctx->lastJobID = ctx->nextJobID; break; } @@ -384,6 +401,10 @@ int main(int argCount, const char* argv[]) outFilename = argument; continue; } + else if (strlen(argument) > 1 && argument[1] == 'v') { + g_displayLevel++; + continue; + } else { DISPLAY("Error: invalid argument provided\n"); ret = 1; diff --git a/contrib/adaptive-compression/pipetests.sh b/contrib/adaptive-compression/pipetests.sh new file mode 100755 index 00000000..2924cd5a --- /dev/null +++ b/contrib/adaptive-compression/pipetests.sh @@ -0,0 +1,2 @@ +make clean multi +pv -q -L 50m tests/test2048.pdf | ./multi -v -otmp.zst