fix : ZSTDMT_compressStream_generic() can accept NULL input

also : converge implementations towards new version of ZSTDMT_compressStream_generic()
This commit is contained in:
cyan4973 2017-07-01 06:59:24 -07:00
parent 58bd0e70fc
commit 1bafe393e4

View File

@ -50,7 +50,7 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond); }
}
#define MUTEX_WAIT_TIME_DLEVEL 5
#define MUTEX_WAIT_TIME_DLEVEL 6
#define PTHREAD_MUTEX_LOCK(mutex) { \
if (ZSTD_DEBUG>=MUTEX_WAIT_TIME_DLEVEL) { \
unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
@ -878,7 +878,8 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
}
/* fill input buffer */
{ size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
if (input->src) { /* support NULL input */
size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuffSize - mtctx->inBuff.filled);
memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad);
input->pos += toLoad;
mtctx->inBuff.filled += toLoad;
@ -911,30 +912,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
size_t const newJobThreshold = zcs->dictSize + zcs->targetSectionSize + zcs->marginSize;
if (zcs->frameEnded) {
/* current frame being ended. Only flush is allowed. Or start new job with init */
DEBUGLOG(5, "ZSTDMT_compressStream: zcs::frameEnded==1");
return ERROR(stage_wrong);
}
if (zcs->nbThreads==1) {
return ZSTD_compressStream(zcs->cctxPool->cctx[0], output, input);
}
/* fill input buffer */
{ size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, (const char*)input->src + input->pos, toLoad);
input->pos += toLoad;
zcs->inBuff.filled += toLoad;
}
if ( (zcs->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */
&& (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */
CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0 /* endFrame */) );
}
/* check for data to flush */
CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize) /* blockToFlush */) ); /* block if it wasn't possible to create new job due to saturation */
CHECK_F( ZSTDMT_compressStream_generic(zcs, output, input, ZSTD_e_continue) );
/* recommended next input size : fill current input buffer */
return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */
@ -970,29 +948,3 @@ size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
return ZSTD_endStream(zcs->cctxPool->cctx[0], output);
return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */);
}
size_t ZSTDMT_compressStream_generic2(ZSTDMT_CCtx* mtctx,
ZSTD_outBuffer* output,
ZSTD_inBuffer* input,
ZSTD_EndDirective endOp)
{
DEBUGLOG(5, "ZSTDMT_compressStream_generic");
DEBUGLOG(5, "in: pos:%u / size:%u ; endOp=%u",
(U32)input->pos, (U32)input->size, (U32)endOp);
if (input->pos < input->size) /* some input to consume */
CHECK_F(ZSTDMT_compressStream(mtctx, output, input));
if (input->pos < input->size) /* input not consumed : do not flush yet */
endOp = ZSTD_e_continue;
switch(endOp)
{
case ZSTD_e_flush:
return ZSTDMT_flushStream(mtctx, output);
case ZSTD_e_end:
DEBUGLOG(5, "endOp:%u; calling ZSTDMT_endStream", (U32)endOp);
return ZSTDMT_endStream(mtctx, output);
case ZSTD_e_continue:
return 1;
default:
return ERROR(GENERIC); /* invalid endDirective */
}
}