From e410d63d458142c3930341637f05bcc3c9397e78 Mon Sep 17 00:00:00 2001 From: Paul Cruz Date: Mon, 10 Jul 2017 15:37:14 -0700 Subject: [PATCH] made input buffer an internal part of the compression context --- contrib/adaptive-compression/adapt.c | 29 ++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index 61fb3142..f87bdf8c 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -31,6 +31,11 @@ typedef struct { size_t size; } buffer_t; +typedef struct { + size_t filled; + buffer_t buffer; +} inBuff_t; + typedef struct { unsigned waitCompressed; unsigned waitReady; @@ -68,6 +73,7 @@ typedef struct { pthread_cond_t allJobsCompleted_cond; pthread_mutex_t jobWrite_mutex; pthread_cond_t jobWrite_cond; + inBuff_t input; cStat_t stats; jobDescription* jobs; FILE* dstFile; @@ -97,6 +103,7 @@ static int freeCCtx(adaptCCtx* ctx) int const jobWriteCondError = pthread_cond_destroy(&ctx->jobWrite_cond); int const fileCloseError = (ctx->dstFile != NULL && ctx->dstFile != stdout) ? fclose(ctx->dstFile) : 0; int const cctxError = ZSTD_isError(ZSTD_freeCCtx(ctx->cctx)) ? 1 : 0; + free(ctx->input.buffer.start); if (ctx->jobs){ freeCompressionJobs(ctx); free(ctx->jobs); @@ -115,7 +122,7 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename) } memset(ctx, 0, sizeof(adaptCCtx)); ctx->compressionLevel = g_compressionLevel; - pthread_mutex_init(&ctx->jobCompressed_mutex, NULL); + pthread_mutex_init(&ctx->jobCompressed_mutex, NULL); pthread_cond_init(&ctx->jobCompressed_cond, NULL); pthread_mutex_init(&ctx->jobReady_mutex, NULL); pthread_cond_init(&ctx->jobReady_cond, NULL); @@ -134,6 +141,14 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename) ctx->allJobsCompleted = 0; ctx->adaptParam = DEFAULT_ADAPT_PARAM; ctx->cctx = ZSTD_createCCtx(); + ctx->input.filled = 0; + ctx->input.buffer.size = 2 * FILE_CHUNK_SIZE; + ctx->input.buffer.start = malloc(ctx->input.buffer.size); + if (!ctx->input.buffer.start) { + DISPLAY("Error: could not allocate input buffer\n"); + freeCCtx(ctx); + return NULL; + } if (!ctx->cctx) { DISPLAY("Error: could not allocate ZSTD_CCtx\n"); freeCCtx(ctx); @@ -320,7 +335,7 @@ static void* outputThread(void* arg) return arg; } -static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize) +static int createCompressionJob(adaptCCtx* ctx, size_t srcSize) { unsigned const nextJob = ctx->nextJobID; unsigned const nextJobIndex = nextJob % ctx->numJobs; @@ -351,7 +366,7 @@ static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize) free(job->dst.start); return 1; } - memcpy(job->src.start, data, srcSize); + memcpy(job->src.start, ctx->input.buffer.start, srcSize); pthread_mutex_lock(&ctx->jobReady_mutex); ctx->jobReadyID++; pthread_cond_signal(&ctx->jobReady_cond); @@ -371,7 +386,6 @@ static void printStats(cStat_t stats) static int compressFilename(const char* const srcFilename, const char* const dstFilenameOrNull) { - BYTE* const src = malloc(FILE_CHUNK_SIZE); unsigned const stdinUsed = !strcmp(srcFilename, stdinmark); FILE* const srcFile = stdinUsed ? stdin : fopen(srcFilename, "rb"); const char* const outFilenameIntermediate = (stdinUsed && !dstFilenameOrNull) ? stdoutmark : dstFilenameOrNull; @@ -393,7 +407,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst } /* checking for errors */ - if (!srcFilename || !outFilename || !src || !srcFile) { + if (!srcFilename || !outFilename || !srcFile) { DISPLAY("Error: initial variables could not be allocated\n"); ret = 1; goto cleanup; @@ -428,7 +442,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst /* creating jobs */ for ( ; ; ) { - size_t const readSize = fread(src, 1, FILE_CHUNK_SIZE, srcFile); + size_t const readSize = fread(ctx->input.buffer.start, 1, FILE_CHUNK_SIZE, srcFile); if (readSize != FILE_CHUNK_SIZE && !feof(srcFile)) { DISPLAY("Error: problem occurred during read from src file\n"); ctx->threadError = 1; @@ -438,7 +452,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst g_streamedSize += readSize; /* reading was fine, now create the compression job */ { - int const error = createCompressionJob(ctx, src, readSize); + int const error = createCompressionJob(ctx, readSize); if (error != 0) { ret = error; ctx->threadError = 1; @@ -458,7 +472,6 @@ cleanup: /* file compression completed */ ret |= (srcFile != NULL) ? fclose(srcFile) : 0; ret |= (ctx != NULL) ? freeCCtx(ctx) : 0; - free(src); return ret; }