made input buffer an internal part of the compression context

dev
Paul Cruz 2017-07-10 15:37:14 -07:00
parent cc7f8e4d71
commit e410d63d45
1 changed files with 21 additions and 8 deletions

View File

@ -31,6 +31,11 @@ typedef struct {
size_t size;
} buffer_t;
typedef struct {
size_t filled;
buffer_t buffer;
} inBuff_t;
typedef struct {
unsigned waitCompressed;
unsigned waitReady;
@ -68,6 +73,7 @@ typedef struct {
pthread_cond_t allJobsCompleted_cond;
pthread_mutex_t jobWrite_mutex;
pthread_cond_t jobWrite_cond;
inBuff_t input;
cStat_t stats;
jobDescription* jobs;
FILE* dstFile;
@ -97,6 +103,7 @@ static int freeCCtx(adaptCCtx* ctx)
int const jobWriteCondError = pthread_cond_destroy(&ctx->jobWrite_cond);
int const fileCloseError = (ctx->dstFile != NULL && ctx->dstFile != stdout) ? fclose(ctx->dstFile) : 0;
int const cctxError = ZSTD_isError(ZSTD_freeCCtx(ctx->cctx)) ? 1 : 0;
free(ctx->input.buffer.start);
if (ctx->jobs){
freeCompressionJobs(ctx);
free(ctx->jobs);
@ -115,7 +122,7 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
}
memset(ctx, 0, sizeof(adaptCCtx));
ctx->compressionLevel = g_compressionLevel;
pthread_mutex_init(&ctx->jobCompressed_mutex, NULL);
pthread_mutex_init(&ctx->jobCompressed_mutex, NULL);
pthread_cond_init(&ctx->jobCompressed_cond, NULL);
pthread_mutex_init(&ctx->jobReady_mutex, NULL);
pthread_cond_init(&ctx->jobReady_cond, NULL);
@ -134,6 +141,14 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
ctx->allJobsCompleted = 0;
ctx->adaptParam = DEFAULT_ADAPT_PARAM;
ctx->cctx = ZSTD_createCCtx();
ctx->input.filled = 0;
ctx->input.buffer.size = 2 * FILE_CHUNK_SIZE;
ctx->input.buffer.start = malloc(ctx->input.buffer.size);
if (!ctx->input.buffer.start) {
DISPLAY("Error: could not allocate input buffer\n");
freeCCtx(ctx);
return NULL;
}
if (!ctx->cctx) {
DISPLAY("Error: could not allocate ZSTD_CCtx\n");
freeCCtx(ctx);
@ -320,7 +335,7 @@ static void* outputThread(void* arg)
return arg;
}
static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize)
static int createCompressionJob(adaptCCtx* ctx, size_t srcSize)
{
unsigned const nextJob = ctx->nextJobID;
unsigned const nextJobIndex = nextJob % ctx->numJobs;
@ -351,7 +366,7 @@ static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize)
free(job->dst.start);
return 1;
}
memcpy(job->src.start, data, srcSize);
memcpy(job->src.start, ctx->input.buffer.start, srcSize);
pthread_mutex_lock(&ctx->jobReady_mutex);
ctx->jobReadyID++;
pthread_cond_signal(&ctx->jobReady_cond);
@ -371,7 +386,6 @@ static void printStats(cStat_t stats)
static int compressFilename(const char* const srcFilename, const char* const dstFilenameOrNull)
{
BYTE* const src = malloc(FILE_CHUNK_SIZE);
unsigned const stdinUsed = !strcmp(srcFilename, stdinmark);
FILE* const srcFile = stdinUsed ? stdin : fopen(srcFilename, "rb");
const char* const outFilenameIntermediate = (stdinUsed && !dstFilenameOrNull) ? stdoutmark : dstFilenameOrNull;
@ -393,7 +407,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst
}
/* checking for errors */
if (!srcFilename || !outFilename || !src || !srcFile) {
if (!srcFilename || !outFilename || !srcFile) {
DISPLAY("Error: initial variables could not be allocated\n");
ret = 1;
goto cleanup;
@ -428,7 +442,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst
/* creating jobs */
for ( ; ; ) {
size_t const readSize = fread(src, 1, FILE_CHUNK_SIZE, srcFile);
size_t const readSize = fread(ctx->input.buffer.start, 1, FILE_CHUNK_SIZE, srcFile);
if (readSize != FILE_CHUNK_SIZE && !feof(srcFile)) {
DISPLAY("Error: problem occurred during read from src file\n");
ctx->threadError = 1;
@ -438,7 +452,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst
g_streamedSize += readSize;
/* reading was fine, now create the compression job */
{
int const error = createCompressionJob(ctx, src, readSize);
int const error = createCompressionJob(ctx, readSize);
if (error != 0) {
ret = error;
ctx->threadError = 1;
@ -458,7 +472,6 @@ cleanup:
/* file compression completed */
ret |= (srcFile != NULL) ? fclose(srcFile) : 0;
ret |= (ctx != NULL) ? freeCCtx(ctx) : 0;
free(src);
return ret;
}