diff --git a/.travis.yml b/.travis.yml index 6bf99f1b..b0489bd6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -92,7 +92,17 @@ matrix: - gcc-aarch64-linux-gnu - libc6-dev-arm64-cross - - env: Ubu=14.04 Cmd='make ppctest && make clean && make ppc64test' + - env: Ubu=14.04 Cmd='make ppctest' + dist: trusty + sudo: required + addons: + apt: + packages: + - qemu-system-ppc + - qemu-user-static + - gcc-powerpc-linux-gnu + + - env: Ubu=14.04 Cmd='make ppc64test' dist: trusty sudo: required addons: @@ -101,7 +111,6 @@ matrix: - qemu-system-ppc - qemu-user-static - gcc-powerpc-linux-gnu - - libc6-dev-armel-cross - env: Ubu=14.04 Cmd='make -C lib all && CFLAGS="-O1 -g" make -C zlibWrapper valgrindTest && make -C tests valgrindTest' os: linux @@ -114,7 +123,7 @@ matrix: - env: Ubu=14.04 Cmd="make gpptest && make clean && make gnu90test && make clean && make c99test && make clean && make gnu99test && make clean - && make clangtest && make clean && make -C contrib/pzstd googletest32 + && make clangtest && make clean && make -C contrib/pzstd googletest32 && make -C contrib/pzstd all32 && make -C contrib/pzstd check && make -C contrib/pzstd clean" os: linux dist: trusty diff --git a/Makefile b/Makefile index 8569ee66..14d1510a 100644 --- a/Makefile +++ b/Makefile @@ -50,6 +50,11 @@ zstd: @$(MAKE) -C $(PRGDIR) $@ cp $(PRGDIR)/zstd$(EXT) . +.PHONY: zstdmt +zstdmt: + @$(MAKE) -C $(PRGDIR) $@ + cp $(PRGDIR)/zstd$(EXT) ./zstdmt$(EXT) + .PHONY: zlibwrapper zlibwrapper: $(MAKE) -C $(ZWRAPDIR) test diff --git a/build/VS2008/zstd/zstd.vcproj b/build/VS2008/zstd/zstd.vcproj index 3bd5ed53..0beb59dd 100644 --- a/build/VS2008/zstd/zstd.vcproj +++ b/build/VS2008/zstd/zstd.vcproj @@ -380,6 +380,14 @@ RelativePath="..\..\..\lib\decompress\huf_decompress.c" > + + + + @@ -432,6 +440,10 @@ RelativePath="..\..\..\programs\zstdcli.c" > + + - - @@ -474,6 +482,14 @@ RelativePath="..\..\..\lib\common\mem.h" > + + + + @@ -490,6 +506,10 @@ RelativePath="..\..\..\lib\zstd.h" > + + @@ -534,6 +554,10 @@ RelativePath="..\..\..\lib\legacy\zstd_v07.h" > + + diff --git a/build/VS2010/zstd/zstd.vcxproj b/build/VS2010/zstd/zstd.vcxproj index 2b30966a..3939c554 100644 --- a/build/VS2010/zstd/zstd.vcxproj +++ b/build/VS2010/zstd/zstd.vcxproj @@ -21,11 +21,14 @@ + + + @@ -46,7 +49,10 @@ + + + @@ -67,6 +73,7 @@ + @@ -138,7 +145,7 @@ false - $(IncludePath);$(SolutionDir)..\..\lib;$(SolutionDir)..\..\lib\legacy;$(SolutionDir)..\..\lib\common;$(SolutionDir)..\..\lib\dictBuilder;$(UniversalCRT_IncludePath); + $(IncludePath);$(SolutionDir)..\..\lib;$(SolutionDir)..\..\lib\compress;$(SolutionDir)..\..\lib\legacy;$(SolutionDir)..\..\lib\common;$(SolutionDir)..\..\lib\dictBuilder;$(UniversalCRT_IncludePath); false $(LibraryPath); @@ -207,6 +214,7 @@ false false MultiThreaded + /DZSTD_MULTITHREAD %(AdditionalOptions) Console @@ -219,4 +227,4 @@ - + \ No newline at end of file diff --git a/build/cmake/lib/CMakeLists.txt b/build/cmake/lib/CMakeLists.txt index 34a639cd..db752784 100644 --- a/build/cmake/lib/CMakeLists.txt +++ b/build/cmake/lib/CMakeLists.txt @@ -1,30 +1,10 @@ # ################################################################ -# zstd - Makefile -# Copyright (C) Yann Collet 2014-2016 -# All rights reserved. -# -# BSD license -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# * Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# * Redistributions in binary form must reproduce the above copyright notice, this -# list of conditions and the following disclaimer in the documentation and/or -# other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR -# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON -# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# * Copyright (c) 2014-present, Yann Collet, Facebook, Inc. +# * All rights reserved. +# * +# * This source code is licensed under the BSD-style license found in the +# * LICENSE file in the root directory of this source tree. An additional grant +# * of patent rights can be found in the PATENTS file in the same directory. # # You can contact the author at : # - zstd homepage : http://www.zstd.net/ @@ -58,13 +38,16 @@ MESSAGE("ZSTD VERSION ${LIBVER_MAJOR}.${LIBVER_MINOR}.${LIBVER_RELEASE}") SET(Sources ${LIBRARY_DIR}/common/entropy_common.c + ${LIBRARY_DIR}/common/fse_decompress.c + ${LIBRARY_DIR}/common/threading.c + ${LIBRARY_DIR}/common/pool.c ${LIBRARY_DIR}/common/zstd_common.c ${LIBRARY_DIR}/common/error_private.c ${LIBRARY_DIR}/common/xxhash.c - ${LIBRARY_DIR}/common/fse_decompress.c ${LIBRARY_DIR}/compress/fse_compress.c ${LIBRARY_DIR}/compress/huf_compress.c ${LIBRARY_DIR}/compress/zstd_compress.c + ${LIBRARY_DIR}/compress/zstdmt_compress.c ${LIBRARY_DIR}/decompress/huf_decompress.c ${LIBRARY_DIR}/decompress/zstd_decompress.c ${LIBRARY_DIR}/dictBuilder/cover.c diff --git a/build/cmake/programs/CMakeLists.txt b/build/cmake/programs/CMakeLists.txt index c2931b09..9b3c3acc 100644 --- a/build/cmake/programs/CMakeLists.txt +++ b/build/cmake/programs/CMakeLists.txt @@ -1,30 +1,10 @@ # ################################################################ -# zstd - Makefile -# Copyright (C) Yann Collet 2014-2016 -# All rights reserved. -# -# BSD license -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# * Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# * Redistributions in binary form must reproduce the above copyright notice, this -# list of conditions and the following disclaimer in the documentation and/or -# other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR -# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON -# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# * Copyright (c) 2015-present, Yann Collet, Facebook, Inc. +# * All rights reserved. +# * +# * This source code is licensed under the BSD-style license found in the +# * LICENSE file in the root directory of this source tree. An additional grant +# * of patent rights can be found in the PATENTS file in the same directory. # # You can contact the author at : # - zstd homepage : http://www.zstd.net/ @@ -40,7 +20,7 @@ SET(ROOT_DIR ../../..) # Define programs directory, where sources and header files are located SET(LIBRARY_DIR ${ROOT_DIR}/lib) SET(PROGRAMS_DIR ${ROOT_DIR}/programs) -INCLUDE_DIRECTORIES(${PROGRAMS_DIR} ${LIBRARY_DIR} ${LIBRARY_DIR}/common ${LIBRARY_DIR}/dictBuilder) +INCLUDE_DIRECTORIES(${PROGRAMS_DIR} ${LIBRARY_DIR} ${LIBRARY_DIR}/common ${LIBRARY_DIR}/compression ${LIBRARY_DIR}/dictBuilder) IF (ZSTD_LEGACY_SUPPORT) SET(PROGRAMS_LEGACY_DIR ${PROGRAMS_DIR}/legacy) diff --git a/lib/Makefile b/lib/Makefile index 01b4183c..c4a5ecb9 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -38,6 +38,8 @@ CPPFLAGS += -I./legacy -DZSTD_LEGACY_SUPPORT=1 ZSTD_FILES+= $(wildcard legacy/*.c) endif +ZSTD_OBJ := $(patsubst %.c,%.o,$(ZSTD_FILES)) + # OS X linker doesn't support -soname, and use different extension # see : https://developer.apple.com/library/mac/documentation/DeveloperTools/Conceptual/DynamicLibraries/100-Articles/DynamicLibraryDesignGuidelines.html ifeq ($(shell uname), Darwin) @@ -62,10 +64,9 @@ default: lib all: lib libzstd.a: ARFLAGS = rcs -libzstd.a: $(ZSTD_FILES) +libzstd.a: $(ZSTD_OBJ) @echo compiling static library - @$(CC) $(FLAGS) -c $^ - @$(AR) $(ARFLAGS) $@ *.o + @$(AR) $(ARFLAGS) $@ $^ $(LIBZSTD): LDFLAGS += -shared -fPIC -fvisibility=hidden $(LIBZSTD): $(ZSTD_FILES) @@ -86,7 +87,7 @@ lib: libzstd.a libzstd clean: @$(RM) core *.o *.a *.gcda *.$(SHARED_EXT) *.$(SHARED_EXT).* libzstd.pc dll/libzstd.dll dll/libzstd.lib - @$(RM) decompress/*.o + @$(RM) common/*.o compress/*.o decompress/*.o dictBuilder/*.o legacy/*.o deprecated/*.o @echo Cleaning library completed #----------------------------------------------------------------------------- diff --git a/lib/common/pool.c b/lib/common/pool.c new file mode 100644 index 00000000..693217f2 --- /dev/null +++ b/lib/common/pool.c @@ -0,0 +1,194 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + + +/* ====== Dependencies ======= */ +#include /* size_t */ +#include /* malloc, calloc, free */ +#include "pool.h" + +/* ====== Compiler specifics ====== */ +#if defined(_MSC_VER) +# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */ +#endif + + +#ifdef ZSTD_MULTITHREAD + +#include /* pthread adaptation */ + +/* A job is a function and an opaque argument */ +typedef struct POOL_job_s { + POOL_function function; + void *opaque; +} POOL_job; + +struct POOL_ctx_s { + /* Keep track of the threads */ + pthread_t *threads; + size_t numThreads; + + /* The queue is a circular buffer */ + POOL_job *queue; + size_t queueHead; + size_t queueTail; + size_t queueSize; + /* The mutex protects the queue */ + pthread_mutex_t queueMutex; + /* Condition variable for pushers to wait on when the queue is full */ + pthread_cond_t queuePushCond; + /* Condition variables for poppers to wait on when the queue is empty */ + pthread_cond_t queuePopCond; + /* Indicates if the queue is shutting down */ + int shutdown; +}; + +/* POOL_thread() : + Work thread for the thread pool. + Waits for jobs and executes them. + @returns : NULL on failure else non-null. +*/ +static void* POOL_thread(void* opaque) { + POOL_ctx* const ctx = (POOL_ctx*)opaque; + if (!ctx) { return NULL; } + for (;;) { + /* Lock the mutex and wait for a non-empty queue or until shutdown */ + pthread_mutex_lock(&ctx->queueMutex); + while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) { + pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); + } + /* empty => shutting down: so stop */ + if (ctx->queueHead == ctx->queueTail) { + pthread_mutex_unlock(&ctx->queueMutex); + return opaque; + } + /* Pop a job off the queue */ + { POOL_job const job = ctx->queue[ctx->queueHead]; + ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; + /* Unlock the mutex, signal a pusher, and run the job */ + pthread_mutex_unlock(&ctx->queueMutex); + pthread_cond_signal(&ctx->queuePushCond); + job.function(job.opaque); + } + } + /* Unreachable */ +} + +POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { + POOL_ctx *ctx; + /* Check the parameters */ + if (!numThreads || !queueSize) { return NULL; } + /* Allocate the context and zero initialize */ + ctx = (POOL_ctx *)calloc(1, sizeof(POOL_ctx)); + if (!ctx) { return NULL; } + /* Initialize the job queue. + * It needs one extra space since one space is wasted to differentiate empty + * and full queues. + */ + ctx->queueSize = queueSize + 1; + ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job)); + ctx->queueHead = 0; + ctx->queueTail = 0; + pthread_mutex_init(&ctx->queueMutex, NULL); + pthread_cond_init(&ctx->queuePushCond, NULL); + pthread_cond_init(&ctx->queuePopCond, NULL); + ctx->shutdown = 0; + /* Allocate space for the thread handles */ + ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t)); + ctx->numThreads = 0; + /* Check for errors */ + if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; } + /* Initialize the threads */ + { size_t i; + for (i = 0; i < numThreads; ++i) { + if (pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) { + ctx->numThreads = i; + POOL_free(ctx); + return NULL; + } } + ctx->numThreads = numThreads; + } + return ctx; +} + +/*! POOL_join() : + Shutdown the queue, wake any sleeping threads, and join all of the threads. +*/ +static void POOL_join(POOL_ctx *ctx) { + /* Shut down the queue */ + pthread_mutex_lock(&ctx->queueMutex); + ctx->shutdown = 1; + pthread_mutex_unlock(&ctx->queueMutex); + /* Wake up sleeping threads */ + pthread_cond_broadcast(&ctx->queuePushCond); + pthread_cond_broadcast(&ctx->queuePopCond); + /* Join all of the threads */ + { size_t i; + for (i = 0; i < ctx->numThreads; ++i) { + pthread_join(ctx->threads[i], NULL); + } } +} + +void POOL_free(POOL_ctx *ctx) { + if (!ctx) { return; } + POOL_join(ctx); + pthread_mutex_destroy(&ctx->queueMutex); + pthread_cond_destroy(&ctx->queuePushCond); + pthread_cond_destroy(&ctx->queuePopCond); + if (ctx->queue) free(ctx->queue); + if (ctx->threads) free(ctx->threads); + free(ctx); +} + +void POOL_add(void *ctxVoid, POOL_function function, void *opaque) { + POOL_ctx *ctx = (POOL_ctx *)ctxVoid; + if (!ctx) { return; } + + pthread_mutex_lock(&ctx->queueMutex); + { POOL_job const job = {function, opaque}; + /* Wait until there is space in the queue for the new job */ + size_t newTail = (ctx->queueTail + 1) % ctx->queueSize; + while (ctx->queueHead == newTail && !ctx->shutdown) { + pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); + newTail = (ctx->queueTail + 1) % ctx->queueSize; + } + /* The queue is still going => there is space */ + if (!ctx->shutdown) { + ctx->queue[ctx->queueTail] = job; + ctx->queueTail = newTail; + } + } + pthread_mutex_unlock(&ctx->queueMutex); + pthread_cond_signal(&ctx->queuePopCond); +} + +#else /* ZSTD_MULTITHREAD not defined */ +/* No multi-threading support */ + +/* We don't need any data, but if it is empty malloc() might return NULL. */ +struct POOL_ctx_s { + int data; +}; + +POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { + (void)numThreads; + (void)queueSize; + return (POOL_ctx *)malloc(sizeof(POOL_ctx)); +} + +void POOL_free(POOL_ctx *ctx) { + if (ctx) free(ctx); +} + +void POOL_add(void *ctx, POOL_function function, void *opaque) { + (void)ctx; + function(opaque); +} + +#endif /* ZSTD_MULTITHREAD */ diff --git a/lib/common/pool.h b/lib/common/pool.h new file mode 100644 index 00000000..c26f543f --- /dev/null +++ b/lib/common/pool.h @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ +#ifndef POOL_H +#define POOL_H + +#include /* size_t */ + +typedef struct POOL_ctx_s POOL_ctx; + +/*! POOL_create() : + Create a thread pool with at most `numThreads` threads. + `numThreads` must be at least 1. + The maximum number of queued jobs before blocking is `queueSize`. + `queueSize` must be at least 1. + @return : The POOL_ctx pointer on success else NULL. +*/ +POOL_ctx *POOL_create(size_t numThreads, size_t queueSize); + +/*! POOL_free() : + Free a thread pool returned by POOL_create(). +*/ +void POOL_free(POOL_ctx *ctx); + +/*! POOL_function : + The function type that can be added to a thread pool. +*/ +typedef void (*POOL_function)(void *); +/*! POOL_add_function : + The function type for a generic thread pool add function. +*/ +typedef void (*POOL_add_function)(void *, POOL_function, void *); + +/*! POOL_add() : + Add the job `function(opaque)` to the thread pool. + Possibly blocks until there is room in the queue. + Note : The function may be executed asynchronously, so `opaque` must live until the function has been completed. +*/ +void POOL_add(void *ctx, POOL_function function, void *opaque); + +#endif diff --git a/lib/common/threading.c b/lib/common/threading.c new file mode 100644 index 00000000..b56e594b --- /dev/null +++ b/lib/common/threading.c @@ -0,0 +1,79 @@ + +/** + * Copyright (c) 2016 Tino Reichardt + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + * You can contact the author at: + * - zstdmt source repository: https://github.com/mcmilk/zstdmt + */ + +/** + * This file will hold wrapper for systems, which do not support pthreads + */ + +/* ====== Compiler specifics ====== */ +#if defined(_MSC_VER) +# pragma warning(disable : 4206) /* disable: C4206: translation unit is empty (when ZSTD_MULTITHREAD is not defined) */ +#endif + + +#if defined(ZSTD_MULTITHREAD) && defined(_WIN32) + +/** + * Windows minimalist Pthread Wrapper, based on : + * http://www.cse.wustl.edu/~schmidt/win32-cv-1.html + */ + + +/* === Dependencies === */ +#include +#include +#include "threading.h" + + +/* === Implementation === */ + +static unsigned __stdcall worker(void *arg) +{ + pthread_t* const thread = (pthread_t*) arg; + thread->arg = thread->start_routine(thread->arg); + return 0; +} + +int pthread_create(pthread_t* thread, const void* unused, + void* (*start_routine) (void*), void* arg) +{ + (void)unused; + thread->arg = arg; + thread->start_routine = start_routine; + thread->handle = (HANDLE) _beginthreadex(NULL, 0, worker, thread, 0, NULL); + + if (!thread->handle) + return errno; + else + return 0; +} + +int _pthread_join(pthread_t * thread, void **value_ptr) +{ + DWORD result; + + if (!thread->handle) return 0; + + result = WaitForSingleObject(thread->handle, INFINITE); + switch (result) { + case WAIT_OBJECT_0: + if (value_ptr) *value_ptr = thread->arg; + return 0; + case WAIT_ABANDONED: + return EINVAL; + default: + return GetLastError(); + } +} + +#endif /* ZSTD_MULTITHREAD */ diff --git a/lib/common/threading.h b/lib/common/threading.h new file mode 100644 index 00000000..74b2ec04 --- /dev/null +++ b/lib/common/threading.h @@ -0,0 +1,104 @@ + +/** + * Copyright (c) 2016 Tino Reichardt + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + * You can contact the author at: + * - zstdmt source repository: https://github.com/mcmilk/zstdmt + */ + +#ifndef THREADING_H_938743 +#define THREADING_H_938743 + +#if defined (__cplusplus) +extern "C" { +#endif + +#if defined(ZSTD_MULTITHREAD) && defined(_WIN32) + +/** + * Windows minimalist Pthread Wrapper, based on : + * http://www.cse.wustl.edu/~schmidt/win32-cv-1.html + */ +#ifdef WINVER +# undef WINVER +#endif +#define WINVER 0x0600 + +#ifdef _WIN32_WINNT +# undef _WIN32_WINNT +#endif +#define _WIN32_WINNT 0x0600 + +#ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +#endif + +#include + +/* mutex */ +#define pthread_mutex_t CRITICAL_SECTION +#define pthread_mutex_init(a,b) InitializeCriticalSection((a)) +#define pthread_mutex_destroy(a) DeleteCriticalSection((a)) +#define pthread_mutex_lock(a) EnterCriticalSection((a)) +#define pthread_mutex_unlock(a) LeaveCriticalSection((a)) + +/* condition variable */ +#define pthread_cond_t CONDITION_VARIABLE +#define pthread_cond_init(a, b) InitializeConditionVariable((a)) +#define pthread_cond_destroy(a) /* No delete */ +#define pthread_cond_wait(a, b) SleepConditionVariableCS((a), (b), INFINITE) +#define pthread_cond_signal(a) WakeConditionVariable((a)) +#define pthread_cond_broadcast(a) WakeAllConditionVariable((a)) + +/* pthread_create() and pthread_join() */ +typedef struct { + HANDLE handle; + void* (*start_routine)(void*); + void* arg; +} pthread_t; + +int pthread_create(pthread_t* thread, const void* unused, + void* (*start_routine) (void*), void* arg); + +#define pthread_join(a, b) _pthread_join(&(a), (b)) +int _pthread_join(pthread_t* thread, void** value_ptr); + +/** + * add here more wrappers as required + */ + + +#elif defined(ZSTD_MULTITHREAD) /* posix assumed ; need a better detection mathod */ +/* === POSIX Systems === */ +# include + +#else /* ZSTD_MULTITHREAD not defined */ +/* No multithreading support */ + +#define pthread_mutex_t int /* #define rather than typedef, as sometimes pthread support is implicit, resulting in duplicated symbols */ +#define pthread_mutex_init(a,b) +#define pthread_mutex_destroy(a) +#define pthread_mutex_lock(a) +#define pthread_mutex_unlock(a) + +#define pthread_cond_t int +#define pthread_cond_init(a,b) +#define pthread_cond_destroy(a) +#define pthread_cond_wait(a,b) +#define pthread_cond_signal(a) +#define pthread_cond_broadcast(a) + +/* do not use pthread_t */ + +#endif /* ZSTD_MULTITHREAD */ + +#if defined (__cplusplus) +} +#endif + +#endif /* THREADING_H_938743 */ diff --git a/lib/common/zstd_internal.h b/lib/common/zstd_internal.h index 96e05775..4b56ce1a 100644 --- a/lib/common/zstd_internal.h +++ b/lib/common/zstd_internal.h @@ -267,4 +267,13 @@ MEM_STATIC U32 ZSTD_highbit32(U32 val) } +/* hidden functions */ + +/* ZSTD_invalidateRepCodes() : + * ensures next compression will not use repcodes from previous block. + * Note : only works with regular variant; + * do not use with extDict variant ! */ +void ZSTD_invalidateRepCodes(ZSTD_CCtx* cctx); + + #endif /* ZSTD_CCOMMON_H_MODULE */ diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index df53db47..b6cf3764 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -60,10 +60,11 @@ struct ZSTD_CCtx_s { U32 nextToUpdate; /* index from which to continue dictionary update */ U32 nextToUpdate3; /* index from which to continue dictionary update */ U32 hashLog3; /* dispatch table : larger == faster, more memory */ - U32 loadedDictEnd; + U32 loadedDictEnd; /* index of end of dictionary */ + U32 forceWindow; /* force back-references to respect limit of 1<customMem), &customMem, sizeof(customMem)); + cctx->customMem = customMem; return cctx; } @@ -118,6 +119,15 @@ size_t ZSTD_sizeof_CCtx(const ZSTD_CCtx* cctx) return sizeof(*cctx) + cctx->workSpaceSize; } +size_t ZSTD_setCCtxParameter(ZSTD_CCtx* cctx, ZSTD_CCtxParameter param, unsigned value) +{ + switch(param) + { + case ZSTD_p_forceWindow : cctx->forceWindow = value>0; cctx->loadedDictEnd = 0; return 0; + default: return ERROR(parameter_unknown); + } +} + const seqStore_t* ZSTD_getSeqStore(const ZSTD_CCtx* ctx) /* hidden interface */ { return &(ctx->seqStore); @@ -317,6 +327,14 @@ static size_t ZSTD_resetCCtx_advanced (ZSTD_CCtx* zc, } } +/* ZSTD_invalidateRepCodes() : + * ensures next compression will not use repcodes from previous block. + * Note : only works with regular variant; + * do not use with extDict variant ! */ +void ZSTD_invalidateRepCodes(ZSTD_CCtx* cctx) { + int i; + for (i=0; irep[i] = 0; +} /*! ZSTD_copyCCtx() : * Duplicate an existing context `srcCCtx` into another one `dstCCtx`. @@ -734,12 +752,19 @@ _check_compressibility: if ((size_t)(op-ostart) >= maxCSize) return 0; } /* confirm repcodes */ - { int i; for (i=0; irep[i] = zc->savedRep[i]; } + { int i; for (i=0; irep[i] = zc->repToConfirm[i]; } return op - ostart; } +#if 0 /* for debug */ +# define STORESEQ_DEBUG +#include /* fprintf */ +U32 g_startDebug = 0; +const BYTE* g_start = NULL; +#endif + /*! ZSTD_storeSeq() : Store a sequence (literal length, literals, offset code and match length code) into seqStore_t. `offsetCode` : distance to match, or 0 == repCode. @@ -747,13 +772,14 @@ _check_compressibility: */ MEM_STATIC void ZSTD_storeSeq(seqStore_t* seqStorePtr, size_t litLength, const void* literals, U32 offsetCode, size_t matchCode) { -#if 0 /* for debug */ - static const BYTE* g_start = NULL; - const U32 pos = (U32)((const BYTE*)literals - g_start); - if (g_start==NULL) g_start = (const BYTE*)literals; - //if ((pos > 1) && (pos < 50000)) - printf("Cpos %6u :%5u literals & match %3u bytes at distance %6u \n", - pos, (U32)litLength, (U32)matchCode+MINMATCH, (U32)offsetCode); +#ifdef STORESEQ_DEBUG + if (g_startDebug) { + const U32 pos = (U32)((const BYTE*)literals - g_start); + if (g_start==NULL) g_start = (const BYTE*)literals; + if ((pos > 1895000) && (pos < 1895300)) + fprintf(stderr, "Cpos %6u :%5u literals & match %3u bytes at distance %6u \n", + pos, (U32)litLength, (U32)matchCode+MINMATCH, (U32)offsetCode); + } #endif /* copy Literals */ ZSTD_wildcopy(seqStorePtr->lit, literals, litLength); @@ -1003,8 +1029,8 @@ void ZSTD_compressBlock_fast_generic(ZSTD_CCtx* cctx, } } } /* save reps for next block */ - cctx->savedRep[0] = offset_1 ? offset_1 : offsetSaved; - cctx->savedRep[1] = offset_2 ? offset_2 : offsetSaved; + cctx->repToConfirm[0] = offset_1 ? offset_1 : offsetSaved; + cctx->repToConfirm[1] = offset_2 ? offset_2 : offsetSaved; /* Last Literals */ { size_t const lastLLSize = iend - anchor; @@ -1118,7 +1144,7 @@ static void ZSTD_compressBlock_fast_extDict_generic(ZSTD_CCtx* ctx, } } } /* save reps for next block */ - ctx->savedRep[0] = offset_1; ctx->savedRep[1] = offset_2; + ctx->repToConfirm[0] = offset_1; ctx->repToConfirm[1] = offset_2; /* Last Literals */ { size_t const lastLLSize = iend - anchor; @@ -1272,8 +1298,8 @@ void ZSTD_compressBlock_doubleFast_generic(ZSTD_CCtx* cctx, } } } /* save reps for next block */ - cctx->savedRep[0] = offset_1 ? offset_1 : offsetSaved; - cctx->savedRep[1] = offset_2 ? offset_2 : offsetSaved; + cctx->repToConfirm[0] = offset_1 ? offset_1 : offsetSaved; + cctx->repToConfirm[1] = offset_2 ? offset_2 : offsetSaved; /* Last Literals */ { size_t const lastLLSize = iend - anchor; @@ -1422,7 +1448,7 @@ static void ZSTD_compressBlock_doubleFast_extDict_generic(ZSTD_CCtx* ctx, } } } /* save reps for next block */ - ctx->savedRep[0] = offset_1; ctx->savedRep[1] = offset_2; + ctx->repToConfirm[0] = offset_1; ctx->repToConfirm[1] = offset_2; /* Last Literals */ { size_t const lastLLSize = iend - anchor; @@ -1954,8 +1980,8 @@ _storeSequence: } } /* Save reps for next block */ - ctx->savedRep[0] = offset_1 ? offset_1 : savedOffset; - ctx->savedRep[1] = offset_2 ? offset_2 : savedOffset; + ctx->repToConfirm[0] = offset_1 ? offset_1 : savedOffset; + ctx->repToConfirm[1] = offset_2 ? offset_2 : savedOffset; /* Last Literals */ { size_t const lastLLSize = iend - anchor; @@ -2149,7 +2175,7 @@ _storeSequence: } } /* Save reps for next block */ - ctx->savedRep[0] = offset_1; ctx->savedRep[1] = offset_2; + ctx->repToConfirm[0] = offset_1; ctx->repToConfirm[1] = offset_2; /* Last Literals */ { size_t const lastLLSize = iend - anchor; @@ -2408,12 +2434,14 @@ static size_t ZSTD_compressContinue_internal (ZSTD_CCtx* cctx, cctx->nextSrc = ip + srcSize; - { size_t const cSize = frame ? + if (srcSize) { + size_t const cSize = frame ? ZSTD_compress_generic (cctx, dst, dstCapacity, src, srcSize, lastFrameChunk) : ZSTD_compressBlock_internal (cctx, dst, dstCapacity, src, srcSize); if (ZSTD_isError(cSize)) return cSize; return cSize + fhSize; - } + } else + return fhSize; } @@ -2449,7 +2477,7 @@ static size_t ZSTD_loadDictionaryContent(ZSTD_CCtx* zc, const void* src, size_t zc->dictBase = zc->base; zc->base += ip - zc->nextSrc; zc->nextToUpdate = zc->dictLimit; - zc->loadedDictEnd = (U32)(iend - zc->base); + zc->loadedDictEnd = zc->forceWindow ? 0 : (U32)(iend - zc->base); zc->nextSrc = iend; if (srcSize <= HASH_READ_SIZE) return 0; @@ -2593,7 +2621,6 @@ static size_t ZSTD_compress_insertDictionary(ZSTD_CCtx* zc, const void* dict, si } } - /*! ZSTD_compressBegin_internal() : * @return : 0, or an error code */ static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx, @@ -2625,9 +2652,9 @@ size_t ZSTD_compressBegin_usingDict(ZSTD_CCtx* cctx, const void* dict, size_t di } -size_t ZSTD_compressBegin(ZSTD_CCtx* zc, int compressionLevel) +size_t ZSTD_compressBegin(ZSTD_CCtx* cctx, int compressionLevel) { - return ZSTD_compressBegin_usingDict(zc, NULL, 0, compressionLevel); + return ZSTD_compressBegin_usingDict(cctx, NULL, 0, compressionLevel); } @@ -2815,7 +2842,7 @@ static ZSTD_parameters ZSTD_getParamsFromCDict(const ZSTD_CDict* cdict) { return ZSTD_getParamsFromCCtx(cdict->refContext); } -size_t ZSTD_compressBegin_usingCDict(ZSTD_CCtx* cctx, const ZSTD_CDict* cdict, U64 pledgedSrcSize) +size_t ZSTD_compressBegin_usingCDict(ZSTD_CCtx* cctx, const ZSTD_CDict* cdict, unsigned long long pledgedSrcSize) { if (cdict->dictContentSize) CHECK_F(ZSTD_copyCCtx(cctx, cdict->refContext, pledgedSrcSize)) else CHECK_F(ZSTD_compressBegin_advanced(cctx, NULL, 0, cdict->refContext->params, pledgedSrcSize)); diff --git a/lib/compress/zstd_opt.h b/lib/compress/zstd_opt.h index f071c4f3..8393e7b4 100644 --- a/lib/compress/zstd_opt.h +++ b/lib/compress/zstd_opt.h @@ -38,7 +38,7 @@ MEM_STATIC void ZSTD_rescaleFreqs(seqStore_t* ssPtr, const BYTE* src, size_t src ssPtr->cachedLiterals = NULL; ssPtr->cachedPrice = ssPtr->cachedLitLength = 0; - ssPtr->staticPrices = 0; + ssPtr->staticPrices = 0; if (ssPtr->litLengthSum == 0) { if (srcSize <= 1024) ssPtr->staticPrices = 1; @@ -56,7 +56,7 @@ MEM_STATIC void ZSTD_rescaleFreqs(seqStore_t* ssPtr, const BYTE* src, size_t src for (u=0; u<=MaxLit; u++) { ssPtr->litFreq[u] = 1 + (ssPtr->litFreq[u]>>ZSTD_FREQ_DIV); - ssPtr->litSum += ssPtr->litFreq[u]; + ssPtr->litSum += ssPtr->litFreq[u]; } for (u=0; u<=MaxLL; u++) ssPtr->litLengthFreq[u] = 1; @@ -634,7 +634,7 @@ _storeSequence: /* cur, last_pos, best_mlen, best_off have to be set */ } } /* for (cur=0; cur < last_pos; ) */ /* Save reps for next block */ - { int i; for (i=0; isavedRep[i] = rep[i]; } + { int i; for (i=0; irepToConfirm[i] = rep[i]; } /* Last Literals */ { size_t const lastLLSize = iend - anchor; @@ -907,7 +907,7 @@ _storeSequence: /* cur, last_pos, best_mlen, best_off have to be set */ } } /* for (cur=0; cur < last_pos; ) */ /* Save reps for next block */ - { int i; for (i=0; isavedRep[i] = rep[i]; } + { int i; for (i=0; irepToConfirm[i] = rep[i]; } /* Last Literals */ { size_t lastLLSize = iend - anchor; diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c new file mode 100644 index 00000000..5f0bf2ab --- /dev/null +++ b/lib/compress/zstdmt_compress.c @@ -0,0 +1,732 @@ +/** + * Copyright (c) 2016-present, Yann Collet, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + + +/* ====== Tuning parameters ====== */ +#define ZSTDMT_NBTHREADS_MAX 128 + + +/* ====== Compiler specifics ====== */ +#if defined(_MSC_VER) +# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */ +#endif + + +/* ====== Dependencies ====== */ +#include /* malloc */ +#include /* memcpy */ +#include /* threadpool */ +#include "threading.h" /* mutex */ +#include "zstd_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */ +#include "zstdmt_compress.h" +#define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */ +#include "xxhash.h" + + +/* ====== Debug ====== */ +#if 0 + +# include +# include +# include + static unsigned g_debugLevel = 3; +# define DEBUGLOGRAW(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); } +# define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __FILE__ ": "); fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); } + +# define DEBUG_PRINTHEX(l,p,n) { \ + unsigned debug_u; \ + for (debug_u=0; debug_u<(n); debug_u++) \ + DEBUGLOGRAW(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \ + DEBUGLOGRAW(l, " \n"); \ +} + +static unsigned long long GetCurrentClockTimeMicroseconds() +{ + static clock_t _ticksPerSecond = 0; + if (_ticksPerSecond <= 0) _ticksPerSecond = sysconf(_SC_CLK_TCK); + + struct tms junk; clock_t newTicks = (clock_t) times(&junk); + return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond); +} + +#define MUTEX_WAIT_TIME_DLEVEL 5 +#define PTHREAD_MUTEX_LOCK(mutex) \ +if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \ + unsigned long long beforeTime = GetCurrentClockTimeMicroseconds(); \ + pthread_mutex_lock(mutex); \ + unsigned long long afterTime = GetCurrentClockTimeMicroseconds(); \ + unsigned long long elapsedTime = (afterTime-beforeTime); \ + if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \ + DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread took %llu microseconds to acquire mutex %s \n", \ + elapsedTime, #mutex); \ + } \ +} else pthread_mutex_lock(mutex); + +#else + +# define DEBUGLOG(l, ...) {} /* disabled */ +# define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m) +# define DEBUG_PRINTHEX(l,p,n) {} + +#endif + + +/* ===== Buffer Pool ===== */ + +typedef struct buffer_s { + void* start; + size_t size; +} buffer_t; + +static const buffer_t g_nullBuffer = { NULL, 0 }; + +typedef struct ZSTDMT_bufferPool_s { + unsigned totalBuffers; + unsigned nbBuffers; + buffer_t bTable[1]; /* variable size */ +} ZSTDMT_bufferPool; + +static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads) +{ + unsigned const maxNbBuffers = 2*nbThreads + 2; + ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)calloc(1, sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t)); + if (bufPool==NULL) return NULL; + bufPool->totalBuffers = maxNbBuffers; + bufPool->nbBuffers = 0; + return bufPool; +} + +static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool) +{ + unsigned u; + if (!bufPool) return; /* compatibility with free on NULL */ + for (u=0; utotalBuffers; u++) + free(bufPool->bTable[u].start); + free(bufPool); +} + +/* assumption : invocation from main thread only ! */ +static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize) +{ + if (pool->nbBuffers) { /* try to use an existing buffer */ + buffer_t const buf = pool->bTable[--(pool->nbBuffers)]; + size_t const availBufferSize = buf.size; + if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) /* large enough, but not too much */ + return buf; + free(buf.start); /* size conditions not respected : scratch this buffer and create a new one */ + } + /* create new buffer */ + { buffer_t buffer; + void* const start = malloc(bSize); + if (start==NULL) bSize = 0; + buffer.start = start; /* note : start can be NULL if malloc fails ! */ + buffer.size = bSize; + return buffer; + } +} + +/* store buffer for later re-use, up to pool capacity */ +static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf) +{ + if (buf.start == NULL) return; /* release on NULL */ + if (pool->nbBuffers < pool->totalBuffers) { + pool->bTable[pool->nbBuffers++] = buf; /* store for later re-use */ + return; + } + /* Reached bufferPool capacity (should not happen) */ + free(buf.start); +} + + +/* ===== CCtx Pool ===== */ + +typedef struct { + unsigned totalCCtx; + unsigned availCCtx; + ZSTD_CCtx* cctx[1]; /* variable size */ +} ZSTDMT_CCtxPool; + +/* assumption : CCtxPool invocation only from main thread */ + +/* note : all CCtx borrowed from the pool should be released back to the pool _before_ freeing the pool */ +static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool) +{ + unsigned u; + for (u=0; utotalCCtx; u++) + ZSTD_freeCCtx(pool->cctx[u]); /* note : compatible with free on NULL */ + free(pool); +} + +/* ZSTDMT_createCCtxPool() : + * implies nbThreads >= 1 , checked by caller ZSTDMT_createCCtx() */ +static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads) +{ + ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) calloc(1, sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*)); + if (!cctxPool) return NULL; + cctxPool->totalCCtx = nbThreads; + cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ + cctxPool->cctx[0] = ZSTD_createCCtx(); + if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; } + DEBUGLOG(1, "cctxPool created, with %u threads", nbThreads); + return cctxPool; +} + +static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* pool) +{ + if (pool->availCCtx) { + pool->availCCtx--; + return pool->cctx[pool->availCCtx]; + } + return ZSTD_createCCtx(); /* note : can be NULL, when creation fails ! */ +} + +static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) +{ + if (cctx==NULL) return; /* compatibility with release on NULL */ + if (pool->availCCtx < pool->totalCCtx) + pool->cctx[pool->availCCtx++] = cctx; + else + /* pool overflow : should not happen, since totalCCtx==nbThreads */ + ZSTD_freeCCtx(cctx); +} + + +/* ===== Thread worker ===== */ + +typedef struct { + buffer_t buffer; + size_t filled; +} inBuff_t; + +typedef struct { + ZSTD_CCtx* cctx; + buffer_t src; + const void* srcStart; + size_t srcSize; + size_t dictSize; + buffer_t dstBuff; + size_t cSize; + size_t dstFlushed; + unsigned firstChunk; + unsigned lastChunk; + unsigned jobCompleted; + unsigned jobScanned; + pthread_mutex_t* jobCompleted_mutex; + pthread_cond_t* jobCompleted_cond; + ZSTD_parameters params; + ZSTD_CDict* cdict; + unsigned long long fullFrameSize; +} ZSTDMT_jobDescription; + +/* ZSTDMT_compressChunk() : POOL_function type */ +void ZSTDMT_compressChunk(void* jobDescription) +{ + ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; + const void* const src = (const char*)job->srcStart + job->dictSize; + buffer_t const dstBuff = job->dstBuff; + DEBUGLOG(3, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize); + if (job->cdict) { + size_t const initError = ZSTD_compressBegin_usingCDict(job->cctx, job->cdict, job->fullFrameSize); + if (job->cdict) DEBUGLOG(3, "using CDict "); + if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } + } else { + size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize); + if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } + ZSTD_setCCtxParameter(job->cctx, ZSTD_p_forceWindow, 1); + } + if (!job->firstChunk) { /* flush frame header */ + size_t const hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, 0); + if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; } + ZSTD_invalidateRepCodes(job->cctx); + } + + DEBUGLOG(4, "Compressing : "); + DEBUG_PRINTHEX(4, job->srcStart, 12); + job->cSize = (job->lastChunk) ? /* last chunk signal */ + ZSTD_compressEnd (job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize) : + ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize); + DEBUGLOG(3, "compressed %u bytes into %u bytes (first:%u) (last:%u)", (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk); + +_endJob: + PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); + job->jobCompleted = 1; + job->jobScanned = 0; + pthread_cond_signal(job->jobCompleted_cond); + pthread_mutex_unlock(job->jobCompleted_mutex); +} + + +/* ------------------------------------------ */ +/* ===== Multi-threaded compression ===== */ +/* ------------------------------------------ */ + +struct ZSTDMT_CCtx_s { + POOL_ctx* factory; + ZSTDMT_bufferPool* buffPool; + ZSTDMT_CCtxPool* cctxPool; + pthread_mutex_t jobCompleted_mutex; + pthread_cond_t jobCompleted_cond; + size_t targetSectionSize; + size_t inBuffSize; + size_t dictSize; + size_t targetDictSize; + inBuff_t inBuff; + ZSTD_parameters params; + XXH64_state_t xxhState; + unsigned nbThreads; + unsigned jobIDMask; + unsigned doneJobID; + unsigned nextJobID; + unsigned frameEnded; + unsigned allJobsCompleted; + unsigned overlapWrLog; + unsigned long long frameContentSize; + size_t sectionSize; + ZSTD_CDict* cdict; + ZSTD_CStream* cstream; + ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */ +}; + +ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) +{ + ZSTDMT_CCtx* cctx; + U32 const minNbJobs = nbThreads + 2; + U32 const nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1; + U32 const nbJobs = 1 << nbJobsLog2; + DEBUGLOG(5, "nbThreads : %u ; minNbJobs : %u ; nbJobsLog2 : %u ; nbJobs : %u \n", + nbThreads, minNbJobs, nbJobsLog2, nbJobs); + if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL; + cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbJobs*sizeof(ZSTDMT_jobDescription)); + if (!cctx) return NULL; + cctx->nbThreads = nbThreads; + cctx->jobIDMask = nbJobs - 1; + cctx->allJobsCompleted = 1; + cctx->sectionSize = 0; + cctx->overlapWrLog = 3; + cctx->factory = POOL_create(nbThreads, 1); + cctx->buffPool = ZSTDMT_createBufferPool(nbThreads); + cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads); + if (!cctx->factory | !cctx->buffPool | !cctx->cctxPool) { /* one object was not created */ + ZSTDMT_freeCCtx(cctx); + return NULL; + } + if (nbThreads==1) { + cctx->cstream = ZSTD_createCStream(); + if (!cctx->cstream) { + ZSTDMT_freeCCtx(cctx); return NULL; + } } + pthread_mutex_init(&cctx->jobCompleted_mutex, NULL); /* Todo : check init function return */ + pthread_cond_init(&cctx->jobCompleted_cond, NULL); + DEBUGLOG(4, "mt_cctx created, for %u threads \n", nbThreads); + return cctx; +} + +/* ZSTDMT_releaseAllJobResources() : + * Ensure all workers are killed first. */ +static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) +{ + unsigned jobID; + for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) { + ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].dstBuff); + mtctx->jobs[jobID].dstBuff = g_nullBuffer; + ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[jobID].src); + mtctx->jobs[jobID].src = g_nullBuffer; + ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[jobID].cctx); + mtctx->jobs[jobID].cctx = NULL; + } + memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription)); + ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer); + mtctx->inBuff.buffer = g_nullBuffer; + mtctx->allJobsCompleted = 1; +} + +size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) +{ + if (mtctx==NULL) return 0; /* compatible with free on NULL */ + POOL_free(mtctx->factory); + if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* stop workers first */ + ZSTDMT_freeBufferPool(mtctx->buffPool); /* release job resources into pools first */ + ZSTDMT_freeCCtxPool(mtctx->cctxPool); + ZSTD_freeCDict(mtctx->cdict); + ZSTD_freeCStream(mtctx->cstream); + pthread_mutex_destroy(&mtctx->jobCompleted_mutex); + pthread_cond_destroy(&mtctx->jobCompleted_cond); + free(mtctx); + return 0; +} + +size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value) +{ + switch(parameter) + { + case ZSTDMT_p_sectionSize : + mtctx->sectionSize = value; + return 0; + case ZSTDMT_p_overlapSectionRLog : + mtctx->overlapWrLog = value; + return 0; + default : + return ERROR(compressionParameter_unsupported); + } +} + + +/* ------------------------------------------ */ +/* ===== Multi-threaded compression ===== */ +/* ------------------------------------------ */ + +size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, + void* dst, size_t dstCapacity, + const void* src, size_t srcSize, + int compressionLevel) +{ + ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0); + size_t const chunkTargetSize = (size_t)1 << (params.cParams.windowLog + 2); + unsigned const nbChunksMax = (unsigned)(srcSize / chunkTargetSize) + (srcSize < chunkTargetSize) /* min 1 */; + unsigned nbChunks = MIN(nbChunksMax, mtctx->nbThreads); + size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks; + size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0xFFFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize; /* avoid too small last block */ + size_t remainingSrcSize = srcSize; + const char* const srcStart = (const char*)src; + size_t frameStartPos = 0; + + DEBUGLOG(3, "windowLog : %2u => chunkTargetSize : %u bytes ", params.cParams.windowLog, (U32)chunkTargetSize); + DEBUGLOG(2, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize); + params.fParams.contentSizeFlag = 1; + + if (nbChunks==1) { /* fallback to single-thread mode */ + ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; + return ZSTD_compressCCtx(cctx, dst, dstCapacity, src, srcSize, compressionLevel); + } + + { unsigned u; + for (u=0; ubuffPool, dstBufferCapacity) : dstAsBuffer; + ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(mtctx->cctxPool); + + if ((cctx==NULL) || (dstBuffer.start==NULL)) { + mtctx->jobs[u].cSize = ERROR(memory_allocation); /* job result */ + mtctx->jobs[u].jobCompleted = 1; + nbChunks = u+1; + break; /* let's wait for previous jobs to complete, but don't start new ones */ + } + + mtctx->jobs[u].srcStart = srcStart + frameStartPos; + mtctx->jobs[u].srcSize = chunkSize; + mtctx->jobs[u].fullFrameSize = srcSize; + mtctx->jobs[u].params = params; + mtctx->jobs[u].dstBuff = dstBuffer; + mtctx->jobs[u].cctx = cctx; + mtctx->jobs[u].firstChunk = (u==0); + mtctx->jobs[u].lastChunk = (u==nbChunks-1); + mtctx->jobs[u].jobCompleted = 0; + mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex; + mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond; + + DEBUGLOG(3, "posting job %u (%u bytes)", u, (U32)chunkSize); + DEBUG_PRINTHEX(3, mtctx->jobs[u].srcStart, 12); + POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]); + + frameStartPos += chunkSize; + remainingSrcSize -= chunkSize; + } } + /* note : since nbChunks <= nbThreads, all jobs should be running immediately in parallel */ + + { unsigned chunkID; + size_t error = 0, dstPos = 0; + for (chunkID=0; chunkIDjobCompleted_mutex); + while (mtctx->jobs[chunkID].jobCompleted==0) { + DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", chunkID); + pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex); + } + pthread_mutex_unlock(&mtctx->jobCompleted_mutex); + DEBUGLOG(3, "ready to write chunk %u ", chunkID); + + ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[chunkID].cctx); + mtctx->jobs[chunkID].cctx = NULL; + mtctx->jobs[chunkID].srcStart = NULL; + { size_t const cSize = mtctx->jobs[chunkID].cSize; + if (ZSTD_isError(cSize)) error = cSize; + if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall); + if (chunkID) { /* note : chunk 0 is already written directly into dst */ + if (!error) memcpy((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize); + ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff); + mtctx->jobs[chunkID].dstBuff = g_nullBuffer; + } + dstPos += cSize ; + } + } + if (!error) DEBUGLOG(3, "compressed size : %u ", (U32)dstPos); + return error ? error : dstPos; + } + +} + + +/* ====================================== */ +/* ======= Streaming API ======= */ +/* ====================================== */ + +static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) { + while (zcs->doneJobID < zcs->nextJobID) { + unsigned const jobID = zcs->doneJobID & zcs->jobIDMask; + PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); + while (zcs->jobs[jobID].jobCompleted==0) { + DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */ + pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); + } + pthread_mutex_unlock(&zcs->jobCompleted_mutex); + zcs->doneJobID++; + } +} + + +static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, + const void* dict, size_t dictSize, unsigned updateDict, + ZSTD_parameters params, unsigned long long pledgedSrcSize) +{ + ZSTD_customMem const cmem = { NULL, NULL, NULL }; + DEBUGLOG(3, "Started new compression, with windowLog : %u", params.cParams.windowLog); + if (zcs->nbThreads==1) return ZSTD_initCStream_advanced(zcs->cstream, dict, dictSize, params, pledgedSrcSize); + if (zcs->allJobsCompleted == 0) { /* previous job not correctly finished */ + ZSTDMT_waitForAllJobsCompleted(zcs); + ZSTDMT_releaseAllJobResources(zcs); + zcs->allJobsCompleted = 1; + } + zcs->params = params; + if (updateDict) { + ZSTD_freeCDict(zcs->cdict); zcs->cdict = NULL; + if (dict && dictSize) { + zcs->cdict = ZSTD_createCDict_advanced(dict, dictSize, 0, params, cmem); + if (zcs->cdict == NULL) return ERROR(memory_allocation); + } } + zcs->frameContentSize = pledgedSrcSize; + zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2); + zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize); + zcs->targetDictSize = zcs->overlapWrLog < 10 ? (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapWrLog) : 0; + zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog) /* margin */ + zcs->targetDictSize; + zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); + if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation); + zcs->inBuff.filled = 0; + zcs->dictSize = 0; + zcs->doneJobID = 0; + zcs->nextJobID = 0; + zcs->frameEnded = 0; + zcs->allJobsCompleted = 0; + if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0); + return 0; +} + +size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, + const void* dict, size_t dictSize, + ZSTD_parameters params, unsigned long long pledgedSrcSize) +{ + return ZSTDMT_initCStream_internal(zcs, dict, dictSize, 1, params, pledgedSrcSize); +} + +/* ZSTDMT_resetCStream() : + * pledgedSrcSize is optional and can be zero == unknown */ +size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize) +{ + if (zcs->nbThreads==1) return ZSTD_resetCStream(zcs->cstream, pledgedSrcSize); + return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0, zcs->params, pledgedSrcSize); +} + +size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { + ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0); + return ZSTDMT_initCStream_internal(zcs, NULL, 0, 1, params, 0); +} + + +static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsigned endFrame) +{ + size_t const dstBufferCapacity = ZSTD_compressBound(srcSize); + buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity); + ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool); + unsigned const jobID = zcs->nextJobID & zcs->jobIDMask; + + if ((cctx==NULL) || (dstBuffer.start==NULL)) { + zcs->jobs[jobID].jobCompleted = 1; + zcs->nextJobID++; + ZSTDMT_waitForAllJobsCompleted(zcs); + ZSTDMT_releaseAllJobResources(zcs); + return ERROR(memory_allocation); + } + + DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ", zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize); + zcs->jobs[jobID].src = zcs->inBuff.buffer; + zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; + zcs->jobs[jobID].srcSize = srcSize; + zcs->jobs[jobID].dictSize = zcs->dictSize; /* note : zcs->inBuff.filled is presumed >= srcSize + dictSize */ + zcs->jobs[jobID].params = zcs->params; + if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0; /* do not calculate checksum within sections, just keep it in header for first section */ + zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL; + zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; + zcs->jobs[jobID].dstBuff = dstBuffer; + zcs->jobs[jobID].cctx = cctx; + zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); + zcs->jobs[jobID].lastChunk = endFrame; + zcs->jobs[jobID].jobCompleted = 0; + zcs->jobs[jobID].dstFlushed = 0; + zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex; + zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond; + + /* get a new buffer for next input */ + if (!endFrame) { + size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize); + zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); + if (zcs->inBuff.buffer.start == NULL) { /* not enough memory to allocate next input buffer */ + zcs->jobs[jobID].jobCompleted = 1; + zcs->nextJobID++; + ZSTDMT_waitForAllJobsCompleted(zcs); + ZSTDMT_releaseAllJobResources(zcs); + return ERROR(memory_allocation); + } + DEBUGLOG(5, "inBuff filled to %u", (U32)zcs->inBuff.filled); + zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize; + DEBUGLOG(5, "new job : filled to %u, with %u dict and %u src", (U32)zcs->inBuff.filled, (U32)newDictSize, (U32)(zcs->inBuff.filled - newDictSize)); + memmove(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize, zcs->inBuff.filled); + DEBUGLOG(5, "new inBuff pre-filled"); + zcs->dictSize = newDictSize; + } else { + zcs->inBuff.buffer = g_nullBuffer; + zcs->inBuff.filled = 0; + zcs->dictSize = 0; + zcs->frameEnded = 1; + if (zcs->nextJobID == 0) + zcs->params.fParams.checksumFlag = 0; /* single chunk : checksum is calculated directly within worker thread */ + } + + DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask); + POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]); /* this call is blocking when thread worker pool is exhausted */ + zcs->nextJobID++; + return 0; +} + + +/* ZSTDMT_flushNextJob() : + * output : will be updated with amount of data flushed . + * blockToFlush : if >0, the function will block and wait if there is no data available to flush . + * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more, or an error code */ +static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush) +{ + unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; + if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */ + PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex); + while (zcs->jobs[wJobID].jobCompleted==0) { + DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID); + if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */ + pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush */ + } + pthread_mutex_unlock(&zcs->jobCompleted_mutex); + /* compression job completed : output can be flushed */ + { ZSTDMT_jobDescription job = zcs->jobs[wJobID]; + if (!job.jobScanned) { + if (ZSTD_isError(job.cSize)) { + DEBUGLOG(5, "compression error detected "); + ZSTDMT_waitForAllJobsCompleted(zcs); + ZSTDMT_releaseAllJobResources(zcs); + return job.cSize; + } + ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx); + zcs->jobs[wJobID].cctx = NULL; + DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag); + if (zcs->params.fParams.checksumFlag) { + XXH64_update(&zcs->xxhState, (const char*)job.srcStart + job.dictSize, job.srcSize); + if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */ + U32 const checksum = (U32)XXH64_digest(&zcs->xxhState); + DEBUGLOG(4, "writing checksum : %08X \n", checksum); + MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum); + job.cSize += 4; + zcs->jobs[wJobID].cSize += 4; + } } + ZSTDMT_releaseBuffer(zcs->buffPool, job.src); + zcs->jobs[wJobID].srcStart = NULL; + zcs->jobs[wJobID].src = g_nullBuffer; + zcs->jobs[wJobID].jobScanned = 1; + } + { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos); + DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID); + memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite); + output->pos += toWrite; + job.dstFlushed += toWrite; + } + if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => move to next one */ + ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff); + zcs->jobs[wJobID].dstBuff = g_nullBuffer; + zcs->jobs[wJobID].jobCompleted = 0; + zcs->doneJobID++; + } else { + zcs->jobs[wJobID].dstFlushed = job.dstFlushed; + } + /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */ + if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed); + if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some buffer to flush */ + zcs->allJobsCompleted = zcs->frameEnded; /* frame completed and entirely flushed */ + return 0; /* everything flushed */ +} } + + +size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) +{ + if (zcs->frameEnded) return ERROR(stage_wrong); /* current frame being ended. Only flush is allowed. Restart with init */ + if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input); + + /* fill input buffer */ + { size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled); + memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, input->src, toLoad); + input->pos += toLoad; + zcs->inBuff.filled += toLoad; + } + + if ( (zcs->inBuff.filled == zcs->inBuffSize) /* filled enough : let's compress */ + && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */ + CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0) ); + } + + /* check for data to flush */ + CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)) ); /* block if it wasn't possible to create new job due to saturation */ + + /* recommended next input size : fill current input buffer */ + return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ +} + + +static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame) +{ + size_t const srcSize = zcs->inBuff.filled - zcs->dictSize; + + if (srcSize) DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize); + if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded)) + && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { + CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) ); + } + + /* check if there is any data available to flush */ + DEBUGLOG(5, "zcs->doneJobID : %u ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID); + return ZSTDMT_flushNextJob(zcs, output, 1); +} + + +size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) +{ + if (zcs->nbThreads==1) return ZSTD_flushStream(zcs->cstream, output); + return ZSTDMT_flushStream_internal(zcs, output, 0); +} + +size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output) +{ + if (zcs->nbThreads==1) return ZSTD_endStream(zcs->cstream, output); + return ZSTDMT_flushStream_internal(zcs, output, 1); +} diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h new file mode 100644 index 00000000..92de52d6 --- /dev/null +++ b/lib/compress/zstdmt_compress.h @@ -0,0 +1,64 @@ +/** + * Copyright (c) 2016-present, Yann Collet, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + + +/* Note : All prototypes defined in this file shall be considered experimental. + * There is no guarantee of API continuity (yet) on any of these prototypes */ + +/* === Dependencies === */ +#include /* size_t */ +#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_parameters */ +#include "zstd.h" /* ZSTD_inBuffer, ZSTD_outBuffer, ZSTDLIB_API */ + + +/* === Simple one-pass functions === */ + +typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx; +ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads); +ZSTDLIB_API size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* cctx); + +ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx, + void* dst, size_t dstCapacity, + const void* src, size_t srcSize, + int compressionLevel); + + +/* === Streaming functions === */ + +ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel); +ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */ + +ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input); + +ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ +ZSTDLIB_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ + + +/* === Advanced functions and parameters === */ + +#ifndef ZSTDMT_SECTION_SIZE_MIN +# define ZSTDMT_SECTION_SIZE_MIN (1U << 20) /* 1 MB - Minimum size of each compression job */ +#endif + +ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, const void* dict, size_t dictSize, /**< dict can be released after init, a local copy is preserved within zcs */ + ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */ + +/* ZSDTMT_parameter : + * List of parameters that can be set using ZSTDMT_setMTCtxParameter() */ +typedef enum { + ZSTDMT_p_sectionSize, /* size of input "section". Each section is compressed in parallel. 0 means default, which is dynamically determined within compression functions */ + ZSTDMT_p_overlapSectionRLog /* reverse log of overlapped section; 0 == use a complete window, 3(default) == use 1/8th of window, values >=10 means no overlap */ +} ZSDTMT_parameter; + +/* ZSTDMT_setMTCtxParameter() : + * allow setting individual parameters, one at a time, among a list of enums defined in ZSTDMT_parameter. + * The function must be called typically after ZSTD_createCCtx(). + * Parameters not explicitly reset by ZSTDMT_init*() remain the same in consecutive compression sessions. + * @return : 0, or an error code (which can be tested using ZSTD_isError()) */ +ZSTDLIB_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value); diff --git a/lib/decompress/zstd_decompress.c b/lib/decompress/zstd_decompress.c index c53f3c3d..9c04503d 100644 --- a/lib/decompress/zstd_decompress.c +++ b/lib/decompress/zstd_decompress.c @@ -1973,7 +1973,7 @@ size_t ZSTD_setDStreamParameter(ZSTD_DStream* zds, switch(paramType) { default : return ERROR(parameter_unknown); - case ZSTDdsp_maxWindowSize : zds->maxWindowSize = paramValue ? paramValue : (U32)(-1); break; + case DStream_p_maxWindowSize : zds->maxWindowSize = paramValue ? paramValue : (U32)(-1); break; } return 0; } diff --git a/lib/zstd.h b/lib/zstd.h index 5e4420b0..f5cbf4b4 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -369,9 +369,9 @@ typedef struct { } ZSTD_compressionParameters; typedef struct { - unsigned contentSizeFlag; /**< 1: content size will be in frame header (if known). */ - unsigned checksumFlag; /**< 1: will generate a 32-bits checksum at end of frame, to be used for error detection by decompressor */ - unsigned noDictIDFlag; /**< 1: no dict ID will be saved into frame header (if dictionary compression) */ + unsigned contentSizeFlag; /**< 1: content size will be in frame header (when known) */ + unsigned checksumFlag; /**< 1: generate a 32-bits checksum at end of frame, for error detection */ + unsigned noDictIDFlag; /**< 1: no dictID will be saved into frame header (if dictionary compression) */ } ZSTD_frameParameters; typedef struct { @@ -401,6 +401,14 @@ ZSTDLIB_API ZSTD_CCtx* ZSTD_createCCtx_advanced(ZSTD_customMem customMem); * Gives the amount of memory used by a given ZSTD_CCtx */ ZSTDLIB_API size_t ZSTD_sizeof_CCtx(const ZSTD_CCtx* cctx); +typedef enum { + ZSTD_p_forceWindow /* Force back-references to remain < windowSize, even when referencing Dictionary content (default:0)*/ +} ZSTD_CCtxParameter; +/*! ZSTD_setCCtxParameter() : + * Set advanced parameters, selected through enum ZSTD_CCtxParameter + * @result : 0, or an error code (which can be tested with ZSTD_isError()) */ +ZSTDLIB_API size_t ZSTD_setCCtxParameter(ZSTD_CCtx* cctx, ZSTD_CCtxParameter param, unsigned value); + /*! ZSTD_createCDict_byReference() : * Create a digested dictionary for compression * Dictionary content is simply referenced, and therefore stays in dictBuffer. @@ -519,7 +527,7 @@ ZSTDLIB_API size_t ZSTD_sizeof_CStream(const ZSTD_CStream* zcs); /*===== Advanced Streaming decompression functions =====*/ -typedef enum { ZSTDdsp_maxWindowSize } ZSTD_DStreamParameter_e; +typedef enum { DStream_p_maxWindowSize } ZSTD_DStreamParameter_e; ZSTDLIB_API ZSTD_DStream* ZSTD_createDStream_advanced(ZSTD_customMem customMem); ZSTDLIB_API size_t ZSTD_initDStream_usingDict(ZSTD_DStream* zds, const void* dict, size_t dictSize); /**< note: a dict will not be used if dict == NULL or dictSize < 8 */ ZSTDLIB_API size_t ZSTD_setDStreamParameter(ZSTD_DStream* zds, ZSTD_DStreamParameter_e paramType, unsigned paramValue); @@ -561,10 +569,10 @@ ZSTDLIB_API size_t ZSTD_sizeof_DStream(const ZSTD_DStream* zds); In which case, it will "discard" the relevant memory section from its history. Finish a frame with ZSTD_compressEnd(), which will write the last block(s) and optional checksum. - It's possible to use a NULL,0 src content, in which case, it will write a final empty block to end the frame, - Without last block mark, frames will be considered unfinished (broken) by decoders. + It's possible to use srcSize==0, in which case, it will write a final empty block to end the frame. + Without last block mark, frames will be considered unfinished (corrupted) by decoders. - You can then reuse `ZSTD_CCtx` (ZSTD_compressBegin()) to compress some new frame. + `ZSTD_CCtx` object can be re-used (ZSTD_compressBegin()) to compress some new frame. */ /*===== Buffer-less streaming compression functions =====*/ @@ -572,6 +580,7 @@ ZSTDLIB_API size_t ZSTD_compressBegin(ZSTD_CCtx* cctx, int compressionLevel); ZSTDLIB_API size_t ZSTD_compressBegin_usingDict(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, int compressionLevel); ZSTDLIB_API size_t ZSTD_compressBegin_advanced(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, ZSTD_parameters params, unsigned long long pledgedSrcSize); ZSTDLIB_API size_t ZSTD_copyCCtx(ZSTD_CCtx* cctx, const ZSTD_CCtx* preparedCCtx, unsigned long long pledgedSrcSize); +ZSTDLIB_API size_t ZSTD_compressBegin_usingCDict(ZSTD_CCtx* cctx, const ZSTD_CDict* cdict, unsigned long long pledgedSrcSize); ZSTDLIB_API size_t ZSTD_compressContinue(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize); ZSTDLIB_API size_t ZSTD_compressEnd(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize); diff --git a/programs/Makefile b/programs/Makefile index a4c149a0..4392939d 100644 --- a/programs/Makefile +++ b/programs/Makefile @@ -1,5 +1,5 @@ # ########################################################################## -# Copyright (c) 2016-present, Yann Collet, Facebook, Inc. +# Copyright (c) 2015-present, Yann Collet, Facebook, Inc. # All rights reserved. # # This Makefile is validated for Linux, macOS, *BSD, Hurd, Solaris, MSYS2 targets @@ -24,7 +24,7 @@ else ALIGN_LOOP = endif -CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/dictBuilder +CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/compress -I$(ZSTDDIR)/dictBuilder CFLAGS ?= -O3 CFLAGS += -Wall -Wextra -Wcast-qual -Wcast-align -Wshadow -Wstrict-aliasing=1 \ -Wswitch-enum -Wdeclaration-after-statement -Wstrict-prototypes -Wundef \ @@ -34,8 +34,8 @@ FLAGS = $(CPPFLAGS) $(CFLAGS) $(LDFLAGS) ZSTDCOMMON_FILES := $(ZSTDDIR)/common/*.c -ZSTDCOMP_FILES := $(ZSTDDIR)/compress/zstd_compress.c $(ZSTDDIR)/compress/fse_compress.c $(ZSTDDIR)/compress/huf_compress.c -ZSTDDECOMP_FILES := $(ZSTDDIR)/decompress/huf_decompress.c +ZSTDCOMP_FILES := $(ZSTDDIR)/compress/*.c +ZSTDDECOMP_FILES := $(ZSTDDIR)/decompress/*.c ZSTD_FILES := $(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES) ZDICT_FILES := $(ZSTDDIR)/dictBuilder/*.c ZSTDDECOMP_O = $(ZSTDDIR)/decompress/zstd_decompress.o @@ -49,6 +49,8 @@ CPPFLAGS += -I$(ZSTDDIR)/legacy ZSTDLEGACY_FILES:= $(ZSTDDIR)/legacy/*.c endif +ZSTDLIB_FILES := $(wildcard $(ZSTD_FILES)) $(wildcard $(ZSTDLEGACY_FILES)) $(wildcard $(ZDICT_FILES)) +ZSTDLIB_OBJ := $(patsubst %.c,%.o,$(ZSTDLIB_FILES)) # Define *.exe as extension for Windows systems ifneq (,$(filter Windows%,$(OS))) @@ -74,8 +76,7 @@ all: zstd $(ZSTDDECOMP_O): CFLAGS += $(ALIGN_LOOP) zstd : CPPFLAGS += -DZSTD_LEGACY_SUPPORT=$(ZSTD_LEGACY_SUPPORT) -zstd : $(ZSTDDECOMP_O) $(ZSTD_FILES) $(ZSTDLEGACY_FILES) $(ZDICT_FILES) \ - zstdcli.c fileio.c bench.c datagen.c dibio.c +zstd : $(ZSTDLIB_OBJ) zstdcli.o fileio.o bench.o datagen.o dibio.o ifneq (,$(filter Windows%,$(OS))) windres/generate_res.bat endif @@ -83,8 +84,7 @@ endif zstd32 : CPPFLAGS += -DZSTD_LEGACY_SUPPORT=$(ZSTD_LEGACY_SUPPORT) -zstd32 : $(ZSTDDIR)/decompress/zstd_decompress.c $(ZSTD_FILES) $(ZSTDLEGACY_FILES) $(ZDICT_FILES) \ - zstdcli.c fileio.c bench.c datagen.c dibio.c +zstd32 : $(ZSTDLIB_FILES) zstdcli.c fileio.c bench.c datagen.c dibio.c ifneq (,$(filter Windows%,$(OS))) windres/generate_res.bat endif @@ -106,32 +106,33 @@ zstd-pgo : clean zstd $(RM) $(ZSTDDECOMP_O) $(MAKE) zstd MOREFLAGS=-fprofile-use -zstd-frugal: $(ZSTDDECOMP_O) $(ZSTD_FILES) zstdcli.c fileio.c +zstd-frugal: $(ZSTD_FILES) zstdcli.c fileio.c $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT $^ -o zstd$(EXT) -zstd-small: clean_decomp_o - ZSTD_LEGACY_SUPPORT=0 CFLAGS="-Os -s" $(MAKE) zstd-frugal +zstd-small: + CFLAGS="-Os -s" $(MAKE) zstd-frugal -zstd-decompress-clean: $(ZSTDDECOMP_O) $(ZSTDCOMMON_FILES) $(ZSTDDECOMP_FILES) zstdcli.c fileio.c - $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NOCOMPRESS $^ -o zstd-decompress$(EXT) - -zstd-decompress: clean_decomp_o - ZSTD_LEGACY_SUPPORT=0 $(MAKE) zstd-decompress-clean +zstd-decompress: $(ZSTDCOMMON_FILES) $(ZSTDDECOMP_FILES) zstdcli.c fileio.c + $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NOCOMPRESS $^ -o $@$(EXT) zstd-compress: $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES) zstdcli.c fileio.c $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NODECOMPRESS $^ -o $@$(EXT) -gzstd: clean_decomp_o +gzstd: @echo "int main(){}" | $(CC) -o have_zlib -x c - -lz && echo found zlib || echo did not found zlib @if [ -s have_zlib ]; then \ echo building gzstd with .gz decompression support \ - && rm have_zlib$(EXT) \ + && $(RM) have_zlib$(EXT) fileio.o \ && CPPFLAGS=-DZSTD_GZDECOMPRESS LDFLAGS="-lz" $(MAKE) zstd; \ else \ echo "WARNING : no zlib, building gzstd with only .zst files support : NO .gz SUPPORT !!!" \ && $(MAKE) zstd; \ fi +zstdmt: CPPFLAGS += -DZSTD_MULTITHREAD +zstdmt: LDFLAGS += -lpthread +zstdmt: zstd + generate_res: windres/generate_res.bat @@ -166,7 +167,7 @@ ifneq (,$(filter $(shell uname),OpenBSD FreeBSD NetBSD DragonFly SunOS)) MANDIR ?= $(PREFIX)/man/man1 else MANDIR ?= $(PREFIX)/share/man/man1 -endif +endif INSTALL_PROGRAM ?= $(INSTALL) -m 755 INSTALL_SCRIPT ?= $(INSTALL) -m 755 diff --git a/programs/bench.c b/programs/bench.c index 9a4732a3..1ca40d6b 100644 --- a/programs/bench.c +++ b/programs/bench.c @@ -9,6 +9,14 @@ +/* ************************************** +* Tuning parameters +****************************************/ +#ifndef BMK_TIMETEST_DEFAULT_S /* default minimum time per test */ +#define BMK_TIMETEST_DEFAULT_S 3 +#endif + + /* ************************************** * Compiler Warnings ****************************************/ @@ -24,7 +32,7 @@ #include "util.h" /* UTIL_getFileSize, UTIL_sleep */ #include /* malloc, free */ #include /* memset */ -#include /* fprintf, fopen, ftello64 */ +#include /* fprintf, fopen */ #include /* clock_t, clock, CLOCKS_PER_SEC */ #include "mem.h" @@ -43,7 +51,6 @@ # define ZSTD_GIT_COMMIT_STRING ZSTD_EXPAND_AND_QUOTE(ZSTD_GIT_COMMIT) #endif -#define NBSECONDS 3 #define TIMELOOP_MICROSEC 1*1000000ULL /* 1 second */ #define ACTIVEPERIOD_MICROSEC 70*1000000ULL /* 70 seconds */ #define COOLPERIOD_SEC 10 @@ -92,8 +99,6 @@ static clock_t g_time = 0; /* ************************************* * Benchmark Parameters ***************************************/ -static U32 g_nbSeconds = NBSECONDS; -static size_t g_blockSize = 0; static int g_additionalParam = 0; static U32 g_decodeOnly = 0; @@ -101,19 +106,30 @@ void BMK_setNotificationLevel(unsigned level) { g_displayLevel=level; } void BMK_setAdditionalParam(int additionalParam) { g_additionalParam=additionalParam; } -void BMK_SetNbSeconds(unsigned nbSeconds) +static U32 g_nbSeconds = BMK_TIMETEST_DEFAULT_S; +void BMK_setNbSeconds(unsigned nbSeconds) { g_nbSeconds = nbSeconds; - DISPLAYLEVEL(3, "- test >= %u seconds per compression / decompression -\n", g_nbSeconds); + DISPLAYLEVEL(3, "- test >= %u seconds per compression / decompression - \n", g_nbSeconds); } -void BMK_SetBlockSize(size_t blockSize) +static size_t g_blockSize = 0; +void BMK_setBlockSize(size_t blockSize) { g_blockSize = blockSize; - DISPLAYLEVEL(2, "using blocks of size %u KB \n", (U32)(blockSize>>10)); + if (g_blockSize) DISPLAYLEVEL(2, "using blocks of size %u KB \n", (U32)(blockSize>>10)); +} + +void BMK_setDecodeOnlyMode(unsigned decodeFlag) { g_decodeOnly = (decodeFlag>0); } + +static U32 g_nbThreads = 1; +void BMK_setNbThreads(unsigned nbThreads) { +#ifndef ZSTD_MULTITHREAD + if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n"); +#endif + g_nbThreads = nbThreads; } -void BMK_setDecodeOnly(unsigned decodeFlag) { g_decodeOnly = (decodeFlag>0); } /* ******************************************************** * Bench functions @@ -132,6 +148,8 @@ typedef struct { #define MIN(a,b) ((a)<(b) ? (a) : (b)) #define MAX(a,b) ((a)>(b) ? (a) : (b)) +#include "compress/zstdmt_compress.h" + static int BMK_benchMem(const void* srcBuffer, size_t srcSize, const char* displayName, int cLevel, const size_t* fileSizes, U32 nbFiles, @@ -145,6 +163,7 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize, size_t const maxCompressedSize = ZSTD_compressBound(srcSize) + (maxNbBlocks * 1024); /* add some room for safety */ void* const compressedBuffer = malloc(maxCompressedSize); void* resultBuffer = malloc(srcSize); + ZSTDMT_CCtx* const mtctx = ZSTDMT_createCCtx(g_nbThreads); ZSTD_CCtx* const ctx = ZSTD_createCCtx(); ZSTD_DCtx* const dctx = ZSTD_createDCtx(); size_t const loadedCompressedSize = srcSize; @@ -212,7 +231,7 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize, /* Bench */ { U64 fastestC = (U64)(-1LL), fastestD = (U64)(-1LL); U64 const crcOrig = g_decodeOnly ? 0 : XXH64(srcBuffer, srcSize, 0); - UTIL_time_t coolTime; + UTIL_time_t coolTime, coolTick; U64 const maxTime = (g_nbSeconds * TIMELOOP_MICROSEC) + 1; U64 totalCTime=0, totalDTime=0; U32 cCompleted=g_decodeOnly, dCompleted=0; @@ -220,25 +239,27 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize, const char* const marks[NB_MARKS] = { " |", " /", " =", "\\" }; U32 markNb = 0; + UTIL_initTimer(&coolTick); UTIL_getTime(&coolTime); DISPLAYLEVEL(2, "\r%79s\r", ""); while (!cCompleted || !dCompleted) { - UTIL_time_t clockStart; /* overheat protection */ - if (UTIL_clockSpanMicro(coolTime, ticksPerSecond) > ACTIVEPERIOD_MICROSEC) { + if (UTIL_clockSpanMicro(coolTime, coolTick) > ACTIVEPERIOD_MICROSEC) { DISPLAYLEVEL(2, "\rcooling down ... \r"); UTIL_sleep(COOLPERIOD_SEC); UTIL_getTime(&coolTime); } if (!g_decodeOnly) { + UTIL_time_t clockTick, clockStart; /* Compression */ DISPLAYLEVEL(2, "%2s-%-17.17s :%10u ->\r", marks[markNb], displayName, (U32)srcSize); if (!cCompleted) memset(compressedBuffer, 0xE5, maxCompressedSize); /* warm up and erase result buffer */ UTIL_sleepMilli(1); /* give processor time to other processes */ UTIL_waitForNextTick(ticksPerSecond); + UTIL_initTimer(&clockTick); UTIL_getTime(&clockStart); if (!cCompleted) { /* still some time to do compression tests */ @@ -265,19 +286,26 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize, blockTable[blockNb].srcPtr,blockTable[blockNb].srcSize, cdict); } else { +#ifdef ZSTD_MULTITHREAD /* note : limitation : MT single-pass does not support compression with dictionary */ + rSize = ZSTDMT_compressCCtx(mtctx, + blockTable[blockNb].cPtr, blockTable[blockNb].cRoom, + blockTable[blockNb].srcPtr,blockTable[blockNb].srcSize, + cLevel); +#else rSize = ZSTD_compress_advanced (ctx, blockTable[blockNb].cPtr, blockTable[blockNb].cRoom, blockTable[blockNb].srcPtr,blockTable[blockNb].srcSize, NULL, 0, zparams); +#endif } if (ZSTD_isError(rSize)) EXM_THROW(1, "ZSTD_compress_usingCDict() failed : %s", ZSTD_getErrorName(rSize)); blockTable[blockNb].cSize = rSize; } nbLoops++; - } while (UTIL_clockSpanMicro(clockStart, ticksPerSecond) < clockLoop); + } while (UTIL_clockSpanMicro(clockStart, clockTick) < clockLoop); ZSTD_freeCDict(cdict); - { U64 const clockSpan = UTIL_clockSpanMicro(clockStart, ticksPerSecond); - if (clockSpan < fastestC*nbLoops) fastestC = clockSpan / nbLoops; - totalCTime += clockSpan; + { U64 const clockSpanMicro = UTIL_clockSpanMicro(clockStart, clockTick); + if (clockSpanMicro < fastestC*nbLoops) fastestC = clockSpanMicro / nbLoops; + totalCTime += clockSpanMicro; cCompleted = (totalCTime >= maxTime); } } @@ -292,20 +320,24 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize, memcpy(compressedBuffer, srcBuffer, loadedCompressedSize); } - (void)fastestD; (void)crcOrig; /* unused when decompression disabled */ -#if 1 +#if 0 /* disable decompression test */ + dCompleted=1; + (void)totalDTime; (void)fastestD; (void)crcOrig; /* unused when decompression disabled */ +#else /* Decompression */ if (!dCompleted) memset(resultBuffer, 0xD6, srcSize); /* warm result buffer */ UTIL_sleepMilli(1); /* give processor time to other processes */ UTIL_waitForNextTick(ticksPerSecond); - UTIL_getTime(&clockStart); if (!dCompleted) { U64 clockLoop = g_nbSeconds ? TIMELOOP_MICROSEC : 1; U32 nbLoops = 0; + UTIL_time_t clockStart, clockTick; ZSTD_DDict* const ddict = ZSTD_createDDict(dictBuffer, dictBufferSize); if (!ddict) EXM_THROW(2, "ZSTD_createDDict() allocation failure"); + UTIL_initTimer(&clockTick); + UTIL_getTime(&clockStart); do { U32 blockNb; for (blockNb=0; blockNb= maxTime); } } @@ -353,6 +385,17 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize, pos = (U32)(u - bacc); bNb = pos / (128 KB); DISPLAY("(block %u, sub %u, pos %u) \n", segNb, bNb, pos); + if (u>5) { + int n; + for (n=-5; n<0; n++) DISPLAY("%02X ", ((const BYTE*)srcBuffer)[u+n]); + DISPLAY(" :%02X: ", ((const BYTE*)srcBuffer)[u]); + for (n=1; n<3; n++) DISPLAY("%02X ", ((const BYTE*)srcBuffer)[u+n]); + DISPLAY(" \n"); + for (n=-5; n<0; n++) DISPLAY("%02X ", ((const BYTE*)resultBuffer)[u+n]); + DISPLAY(" :%02X: ", ((const BYTE*)resultBuffer)[u]); + for (n=1; n<3; n++) DISPLAY("%02X ", ((const BYTE*)resultBuffer)[u+n]); + DISPLAY(" \n"); + } break; } if (u==srcSize-1) { /* should never happen */ @@ -378,6 +421,7 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize, free(blockTable); free(compressedBuffer); free(resultBuffer); + ZSTDMT_freeCCtx(mtctx); ZSTD_freeCCtx(ctx); ZSTD_freeDCtx(dctx); return 0; diff --git a/programs/bench.h b/programs/bench.h index 314f3465..2918c02b 100644 --- a/programs/bench.h +++ b/programs/bench.h @@ -15,14 +15,15 @@ #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_compressionParameters */ #include "zstd.h" /* ZSTD_compressionParameters */ -int BMK_benchFiles(const char** fileNamesTable, unsigned nbFiles,const char* dictFileName, +int BMK_benchFiles(const char** fileNamesTable, unsigned nbFiles,const char* dictFileName, int cLevel, int cLevelLast, ZSTD_compressionParameters* compressionParams); /* Set Parameters */ -void BMK_SetNbSeconds(unsigned nbLoops); -void BMK_SetBlockSize(size_t blockSize); -void BMK_setAdditionalParam(int additionalParam); +void BMK_setNbSeconds(unsigned nbLoops); +void BMK_setBlockSize(size_t blockSize); +void BMK_setNbThreads(unsigned nbThreads); void BMK_setNotificationLevel(unsigned level); -void BMK_setDecodeOnly(unsigned decodeFlag); +void BMK_setAdditionalParam(int additionalParam); +void BMK_setDecodeOnlyMode(unsigned decodeFlag); #endif /* BENCH_H_121279284357 */ diff --git a/programs/fileio.c b/programs/fileio.c index a112cc04..ac7dffb3 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -7,6 +7,7 @@ * of patent rights can be found in the PATENTS file in the same directory. */ + /* ************************************* * Compiler Options ***************************************/ @@ -34,11 +35,14 @@ #include "fileio.h" #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */ #include "zstd.h" -#ifdef ZSTD_GZDECOMPRESS -#include "zlib.h" -#if !defined(z_const) - #define z_const +#ifdef ZSTD_MULTITHREAD +# include "zstdmt_compress.h" #endif +#ifdef ZSTD_GZDECOMPRESS +# include "zlib.h" +# if !defined(z_const) +# define z_const +# endif #endif @@ -103,7 +107,23 @@ static U32 g_removeSrcFile = 0; void FIO_setRemoveSrcFile(unsigned flag) { g_removeSrcFile = (flag>0); } static U32 g_memLimit = 0; void FIO_setMemLimit(unsigned memLimit) { g_memLimit = memLimit; } - +static U32 g_nbThreads = 1; +void FIO_setNbThreads(unsigned nbThreads) { +#ifndef ZSTD_MULTITHREAD + if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n"); +#endif + g_nbThreads = nbThreads; +} +static U32 g_blockSize = 0; +void FIO_setBlockSize(unsigned blockSize) { + if (blockSize && g_nbThreads==1) + DISPLAYLEVEL(2, "Setting block size is useless in single-thread mode \n"); +#ifdef ZSTD_MULTITHREAD + if (blockSize-1 < ZSTDMT_SECTION_SIZE_MIN-1) /* intentional underflow */ + DISPLAYLEVEL(2, "Note : minimum block size is %u KB \n", (ZSTDMT_SECTION_SIZE_MIN>>10)); +#endif + g_blockSize = blockSize; +} /*-************************************* @@ -226,23 +246,34 @@ static size_t FIO_loadFile(void** bufferPtr, const char* fileName) * Compression ************************************************************************/ typedef struct { + FILE* srcFile; + FILE* dstFile; void* srcBuffer; size_t srcBufferSize; void* dstBuffer; size_t dstBufferSize; +#ifdef ZSTD_MULTITHREAD + ZSTDMT_CCtx* cctx; +#else ZSTD_CStream* cctx; - FILE* dstFile; - FILE* srcFile; +#endif } cRess_t; -static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, +static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, U64 srcSize, ZSTD_compressionParameters* comprParams) { cRess_t ress; memset(&ress, 0, sizeof(ress)); +#ifdef ZSTD_MULTITHREAD + ress.cctx = ZSTDMT_createCCtx(g_nbThreads); + if (ress.cctx == NULL) EXM_THROW(30, "zstd: allocation error : can't create ZSTD_CStream"); + if (cLevel==ZSTD_maxCLevel()) + ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_overlapSectionRLog, 0); /* use complete window for overlap */ +#else ress.cctx = ZSTD_createCStream(); if (ress.cctx == NULL) EXM_THROW(30, "zstd: allocation error : can't create ZSTD_CStream"); +#endif ress.srcBufferSize = ZSTD_CStreamInSize(); ress.srcBuffer = malloc(ress.srcBufferSize); ress.dstBufferSize = ZSTD_CStreamOutSize(); @@ -264,8 +295,14 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel, if (comprParams->searchLength) params.cParams.searchLength = comprParams->searchLength; if (comprParams->targetLength) params.cParams.targetLength = comprParams->targetLength; if (comprParams->strategy) params.cParams.strategy = (ZSTD_strategy)(comprParams->strategy - 1); +#ifdef ZSTD_MULTITHREAD + { size_t const errorCode = ZSTDMT_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize); + if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode)); + ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_sectionSize, g_blockSize); +#else { size_t const errorCode = ZSTD_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize); if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode)); +#endif } } free(dictBuffer); } @@ -277,7 +314,11 @@ static void FIO_freeCResources(cRess_t ress) { free(ress.srcBuffer); free(ress.dstBuffer); +#ifdef ZSTD_MULTITHREAD + ZSTDMT_freeCCtx(ress.cctx); +#else ZSTD_freeCStream(ress.cctx); /* never fails */ +#endif } @@ -296,7 +337,11 @@ static int FIO_compressFilename_internal(cRess_t ress, U64 const fileSize = UTIL_getFileSize(srcFileName); /* init */ +#ifdef ZSTD_MULTITHREAD + { size_t const resetError = ZSTDMT_resetCStream(ress.cctx, fileSize); +#else { size_t const resetError = ZSTD_resetCStream(ress.cctx, fileSize); +#endif if (ZSTD_isError(resetError)) EXM_THROW(21, "Error initializing compression : %s", ZSTD_getErrorName(resetError)); } @@ -308,31 +353,39 @@ static int FIO_compressFilename_internal(cRess_t ress, readsize += inSize; DISPLAYUPDATE(2, "\rRead : %u MB ", (U32)(readsize>>20)); - /* Compress using buffered streaming */ { ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 }; - ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 }; - { size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff); - if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result)); } - if (inBuff.pos != inBuff.size) - /* inBuff should be entirely consumed since buffer sizes are recommended ones */ - EXM_THROW(24, "Compression error : input block not fully consumed"); + while (inBuff.pos != inBuff.size) { /* note : is there any possibility of endless loop ? for example, if outBuff is not large enough ? */ + ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 }; +#ifdef ZSTD_MULTITHREAD + size_t const result = ZSTDMT_compressStream(ress.cctx, &outBuff, &inBuff); +#else + size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff); +#endif + if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result)); - /* Write cBlock */ - { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); - if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); } - compressedfilesize += outBuff.pos; - } + /* Write compressed stream */ + if (outBuff.pos) { + size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); + if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); + compressedfilesize += outBuff.pos; + } } } DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ", (U32)(readsize>>20), (double)compressedfilesize/readsize*100); } /* End of Frame */ - { ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 }; - size_t const result = ZSTD_endStream(ress.cctx, &outBuff); - if (result!=0) EXM_THROW(26, "Compression error : cannot create frame end"); - - { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); - if (sizeCheck!=outBuff.pos) EXM_THROW(27, "Write error : cannot write frame end into %s", dstFileName); } - compressedfilesize += outBuff.pos; + { size_t result = 1; + while (result!=0) { /* note : is there any possibility of endless loop ? */ + ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 }; +#ifdef ZSTD_MULTITHREAD + result = ZSTDMT_endStream(ress.cctx, &outBuff); +#else + result = ZSTD_endStream(ress.cctx, &outBuff); +#endif + if (ZSTD_isError(result)) EXM_THROW(26, "Compression error during frame end : %s", ZSTD_getErrorName(result)); + { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); + if (sizeCheck!=outBuff.pos) EXM_THROW(27, "Write error : cannot write frame end into %s", dstFileName); } + compressedfilesize += outBuff.pos; + } } /* Status */ @@ -481,7 +534,7 @@ static dRess_t FIO_createDResources(const char* dictFileName) /* Allocation */ ress.dctx = ZSTD_createDStream(); if (ress.dctx==NULL) EXM_THROW(60, "Can't create ZSTD_DStream"); - ZSTD_setDStreamParameter(ress.dctx, ZSTDdsp_maxWindowSize, g_memLimit); + ZSTD_setDStreamParameter(ress.dctx, DStream_p_maxWindowSize, g_memLimit); ress.srcBufferSize = ZSTD_DStreamInSize(); ress.srcBuffer = malloc(ress.srcBufferSize); ress.dstBufferSize = ZSTD_DStreamOutSize(); @@ -632,7 +685,7 @@ unsigned long long FIO_decompressFrame(dRess_t* ress, if (ZSTD_isError(readSizeHint)) EXM_THROW(36, "Decoding error : %s", ZSTD_getErrorName(readSizeHint)); /* Write block */ - storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, storedSkips); + storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, storedSkips); frameSize += outBuff.pos; DISPLAYUPDATE(2, "\rDecoded : %u MB... ", (U32)((alreadyDecoded+frameSize)>>20) ); diff --git a/programs/fileio.h b/programs/fileio.h index b7165833..11178bcc 100644 --- a/programs/fileio.h +++ b/programs/fileio.h @@ -12,12 +12,13 @@ #define FILEIO_H_23981798732 #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_compressionParameters */ -#include "zstd.h" /* ZSTD_compressionParameters */ +#include "zstd.h" /* ZSTD_* */ #if defined (__cplusplus) extern "C" { #endif + /* ************************************* * Special i/o constants **************************************/ @@ -40,6 +41,8 @@ void FIO_setDictIDFlag(unsigned dictIDFlag); void FIO_setChecksumFlag(unsigned checksumFlag); void FIO_setRemoveSrcFile(unsigned flag); void FIO_setMemLimit(unsigned memLimit); +void FIO_setNbThreads(unsigned nbThreads); +void FIO_setBlockSize(unsigned blockSize); /*-************************************* diff --git a/programs/util.h b/programs/util.h index aaa4b7c1..651027ba 100644 --- a/programs/util.h +++ b/programs/util.h @@ -95,18 +95,26 @@ extern "C" { /*-**************************************** * Time functions ******************************************/ -#if !defined(_WIN32) - typedef clock_t UTIL_time_t; - UTIL_STATIC void UTIL_initTimer(UTIL_time_t* ticksPerSecond) { *ticksPerSecond=0; } - UTIL_STATIC void UTIL_getTime(UTIL_time_t* x) { *x = clock(); } - UTIL_STATIC U64 UTIL_getSpanTimeMicro(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { (void)ticksPerSecond; return 1000000ULL * (clockEnd - clockStart) / CLOCKS_PER_SEC; } - UTIL_STATIC U64 UTIL_getSpanTimeNano(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { (void)ticksPerSecond; return 1000000000ULL * (clockEnd - clockStart) / CLOCKS_PER_SEC; } -#else +#if (PLATFORM_POSIX_VERSION >= 1) +#include +#include /* times */ + typedef U64 UTIL_time_t; + UTIL_STATIC void UTIL_initTimer(UTIL_time_t* ticksPerSecond) { *ticksPerSecond=sysconf(_SC_CLK_TCK); } + UTIL_STATIC void UTIL_getTime(UTIL_time_t* x) { struct tms junk; clock_t newTicks = (clock_t) times(&junk); (void)junk; *x = (UTIL_time_t)newTicks; } + UTIL_STATIC U64 UTIL_getSpanTimeMicro(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { return 1000000ULL * (clockEnd - clockStart) / ticksPerSecond; } + UTIL_STATIC U64 UTIL_getSpanTimeNano(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { return 1000000000ULL * (clockEnd - clockStart) / ticksPerSecond; } +#elif defined(_WIN32) /* Windows */ typedef LARGE_INTEGER UTIL_time_t; UTIL_STATIC void UTIL_initTimer(UTIL_time_t* ticksPerSecond) { if (!QueryPerformanceFrequency(ticksPerSecond)) fprintf(stderr, "ERROR: QueryPerformance not present\n"); } UTIL_STATIC void UTIL_getTime(UTIL_time_t* x) { QueryPerformanceCounter(x); } UTIL_STATIC U64 UTIL_getSpanTimeMicro(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { return 1000000ULL*(clockEnd.QuadPart - clockStart.QuadPart)/ticksPerSecond.QuadPart; } UTIL_STATIC U64 UTIL_getSpanTimeNano(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { return 1000000000ULL*(clockEnd.QuadPart - clockStart.QuadPart)/ticksPerSecond.QuadPart; } +#else /* relies on standard C (note : clock_t measurements can be wrong when using multi-threading) */ + typedef clock_t UTIL_time_t; + UTIL_STATIC void UTIL_initTimer(UTIL_time_t* ticksPerSecond) { *ticksPerSecond=0; } + UTIL_STATIC void UTIL_getTime(UTIL_time_t* x) { *x = clock(); } + UTIL_STATIC U64 UTIL_getSpanTimeMicro(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { (void)ticksPerSecond; return 1000000ULL * (clockEnd - clockStart) / CLOCKS_PER_SEC; } + UTIL_STATIC U64 UTIL_getSpanTimeNano(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { (void)ticksPerSecond; return 1000000000ULL * (clockEnd - clockStart) / CLOCKS_PER_SEC; } #endif diff --git a/programs/zstdcli.c b/programs/zstdcli.c index 54c66a32..64f2c919 100644 --- a/programs/zstdcli.c +++ b/programs/zstdcli.c @@ -20,6 +20,7 @@ #endif + /*-************************************ * Dependencies **************************************/ @@ -110,12 +111,16 @@ static int usage_advanced(const char* programName) DISPLAY( " -q : suppress warnings; specify twice to suppress errors too\n"); DISPLAY( " -c : force write to standard output, even if it is the console\n"); #ifdef UTIL_HAS_CREATEFILELIST - DISPLAY( " -r : operate recursively on directories\n"); + DISPLAY( " -r : operate recursively on directories \n"); #endif #ifndef ZSTD_NOCOMPRESS DISPLAY( "--ultra : enable levels beyond %i, up to %i (requires more memory)\n", ZSTDCLI_CLEVEL_MAX, ZSTD_maxCLevel()); DISPLAY( "--no-dictID : don't write dictID into header (dictionary compression)\n"); - DISPLAY( "--[no-]check : integrity check (default:enabled)\n"); + DISPLAY( "--[no-]check : integrity check (default:enabled) \n"); +#ifdef ZSTD_MULTITHREAD + DISPLAY( " -T# : use # threads for compression (default:1) \n"); + DISPLAY( " -B# : select size of independent sections (default:0==automatic) \n"); +#endif #endif #ifndef ZSTD_NODECOMPRESS DISPLAY( "--test : test compressed file integrity \n"); @@ -256,7 +261,10 @@ int main(int argCount, const char* argv[]) nextArgumentIsDictID=0, nextArgumentsAreFiles=0, ultra=0, - lastCommand = 0; + lastCommand = 0, + nbThreads = 1; + unsigned bench_nbSeconds = 3; /* would be better if this value was synchronized from bench */ + size_t blockSize = 0; zstd_operation_mode operation = zom_compress; ZSTD_compressionParameters compressionParams; int cLevel = ZSTDCLI_CLEVEL_DEFAULT; @@ -393,7 +401,7 @@ int main(int argCount, const char* argv[]) /* Decoding */ case 'd': #ifndef ZSTD_NOBENCH - if (operation==zom_bench) { BMK_setDecodeOnly(1); argument++; break; } /* benchmark decode (hidden option) */ + if (operation==zom_bench) { BMK_setDecodeOnlyMode(1); argument++; break; } /* benchmark decode (hidden option) */ #endif operation=zom_decompress; argument++; break; @@ -437,34 +445,38 @@ int main(int argCount, const char* argv[]) #ifndef ZSTD_NOBENCH /* Benchmark */ - case 'b': operation=zom_bench; argument++; break; + case 'b': + operation=zom_bench; + argument++; + break; /* range bench (benchmark only) */ case 'e': - /* compression Level */ - argument++; - cLevelLast = readU32FromChar(&argument); - break; + /* compression Level */ + argument++; + cLevelLast = readU32FromChar(&argument); + break; /* Modify Nb Iterations (benchmark only) */ case 'i': argument++; - { U32 const iters = readU32FromChar(&argument); - BMK_setNotificationLevel(displayLevel); - BMK_SetNbSeconds(iters); - } + bench_nbSeconds = readU32FromChar(&argument); break; /* cut input into blocks (benchmark only) */ case 'B': argument++; - { size_t const bSize = readU32FromChar(&argument); - BMK_setNotificationLevel(displayLevel); - BMK_SetBlockSize(bSize); - } + blockSize = readU32FromChar(&argument); break; + #endif /* ZSTD_NOBENCH */ + /* nb of threads (hidden option) */ + case 'T': + argument++; + nbThreads = readU32FromChar(&argument); + break; + /* Dictionary Selection level */ case 's': argument++; @@ -553,6 +565,9 @@ int main(int argCount, const char* argv[]) if (operation==zom_bench) { #ifndef ZSTD_NOBENCH BMK_setNotificationLevel(displayLevel); + BMK_setBlockSize(blockSize); + BMK_setNbThreads(nbThreads); + BMK_setNbSeconds(bench_nbSeconds); BMK_benchFiles(filenameTable, filenameIdx, dictFileName, cLevel, cLevelLast, &compressionParams); #endif goto _end; @@ -603,7 +618,7 @@ int main(int argCount, const char* argv[]) } } #endif - /* No warning message in pipe mode (stdin + stdout) or multi-files mode */ + /* No status message in pipe mode (stdin - stdout) or multi-files mode */ if (!strcmp(filenameTable[0], stdinmark) && outFileName && !strcmp(outFileName,stdoutmark) && (displayLevel==2)) displayLevel=1; if ((filenameIdx>1) & (displayLevel==2)) displayLevel=1; @@ -611,6 +626,8 @@ int main(int argCount, const char* argv[]) FIO_setNotificationLevel(displayLevel); if (operation==zom_compress) { #ifndef ZSTD_NOCOMPRESS + FIO_setNbThreads(nbThreads); + FIO_setBlockSize((U32)blockSize); if ((filenameIdx==1) && outFileName) operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams); else diff --git a/tests/.gitignore b/tests/.gitignore index e932ad91..53520238 100644 --- a/tests/.gitignore +++ b/tests/.gitignore @@ -6,6 +6,7 @@ fuzzer32 fuzzer-dll zbufftest zbufftest32 +zbufftest-dll zstreamtest zstreamtest32 zstreamtest-dll @@ -15,6 +16,7 @@ paramgrill32 roundTripCrash longmatch symbols +pool invalidDictionaries # Tmp test directory diff --git a/tests/Makefile b/tests/Makefile index 15fdc77f..937f3b41 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -26,10 +26,10 @@ PYTHON ?= python3 TESTARTEFACT := versionsTest namespaceTest -CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/dictBuilder -I$(ZSTDDIR)/deprecated -I$(PRGDIR) +CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/compress -I$(ZSTDDIR)/dictBuilder -I$(ZSTDDIR)/deprecated -I$(PRGDIR) CFLAGS ?= -O3 -CFLAGS += -Wall -Wextra -Wcast-qual -Wcast-align -Wshadow -Wstrict-aliasing=1 \ - -Wswitch-enum -Wdeclaration-after-statement -Wstrict-prototypes -Wundef +CFLAGS += -g -Wall -Wextra -Wcast-qual -Wcast-align -Wshadow -Wstrict-aliasing=1 \ + -Wswitch-enum -Wdeclaration-after-statement -Wstrict-prototypes -Wundef CFLAGS += $(MOREFLAGS) FLAGS = $(CPPFLAGS) $(CFLAGS) $(LDFLAGS) @@ -46,8 +46,10 @@ ZDICT_FILES := $(ZSTDDIR)/dictBuilder/*.c # Define *.exe as extension for Windows systems ifneq (,$(filter Windows%,$(OS))) EXT =.exe +MULTITHREAD = -DZSTD_MULTITHREAD else EXT = +MULTITHREAD = -pthread -DZSTD_MULTITHREAD endif VOID = /dev/null @@ -122,10 +124,10 @@ zbufftest-dll : $(ZSTDDIR)/common/xxhash.c $(PRGDIR)/datagen.c zbufftest.c $(CC) $(CPPFLAGS) $(CFLAGS) $^ $(LDFLAGS) -o $@$(EXT) zstreamtest : $(ZSTD_FILES) $(ZDICT_FILES) $(PRGDIR)/datagen.c zstreamtest.c - $(CC) $(FLAGS) $^ -o $@$(EXT) + $(CC) $(FLAGS) $(MULTITHREAD) $^ -o $@$(EXT) zstreamtest32 : $(ZSTD_FILES) $(ZDICT_FILES) $(PRGDIR)/datagen.c zstreamtest.c - $(CC) -m32 $(FLAGS) $^ -o $@$(EXT) + $(CC) -m32 $(FLAGS) $(MULTITHREAD) $^ -o $@$(EXT) zstreamtest-dll : LDFLAGS+= -L$(ZSTDDIR) -lzstd zstreamtest-dll : $(ZSTDDIR)/common/xxhash.c $(PRGDIR)/datagen.c zstreamtest.c @@ -156,6 +158,9 @@ else $(CC) $(FLAGS) $^ -o $@$(EXT) -Wl,-rpath=$(ZSTDDIR) $(ZSTDDIR)/libzstd.so endif +pool : pool.c $(ZSTDDIR)/common/pool.c $(ZSTDDIR)/common/threading.c + $(CC) $(FLAGS) $(MULTITHREAD) $^ -o $@$(EXT) + namespaceTest: if $(CC) namespaceTest.c ../lib/common/xxhash.c -o $@ ; then echo compilation should fail; exit 1 ; fi $(RM) $@ @@ -174,7 +179,7 @@ clean: fuzzer-dll$(EXT) zstreamtest-dll$(EXT) zbufftest-dll$(EXT)\ zstreamtest$(EXT) zstreamtest32$(EXT) \ datagen$(EXT) paramgrill$(EXT) roundTripCrash$(EXT) longmatch$(EXT) \ - symbols$(EXT) invalidDictionaries$(EXT) + symbols$(EXT) invalidDictionaries$(EXT) pool$(EXT) @echo Cleaning completed @@ -220,7 +225,7 @@ zstd-playTests: datagen file $(ZSTD) ZSTD="$(QEMU_SYS) $(ZSTD)" ./playTests.sh $(ZSTDRTTEST) -test: test-zstd test-fullbench test-fuzzer test-zstream test-longmatch test-invalidDictionaries +test: test-zstd test-fullbench test-fuzzer test-zstream test-longmatch test-invalidDictionaries test-pool test32: test-zstd32 test-fullbench32 test-fuzzer32 test-zstream32 @@ -286,4 +291,7 @@ test-invalidDictionaries: invalidDictionaries test-symbols: symbols $(QEMU_SYS) ./symbols +test-pool: pool + $(QEMU_SYS) ./pool + endif diff --git a/tests/fuzzer.c b/tests/fuzzer.c index 00cfb057..60546c07 100644 --- a/tests/fuzzer.c +++ b/tests/fuzzer.c @@ -755,6 +755,7 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, U32 const maxD CHECK (ZSTD_isError(errorCode), "ZSTD_copyCCtx error : %s", ZSTD_getErrorName(errorCode)); } } XXH64_reset(&xxhState, 0); + ZSTD_setCCtxParameter(ctx, ZSTD_p_forceWindow, FUZ_rand(&lseed) & 1); { U32 const nbChunks = (FUZ_rand(&lseed) & 127) + 2; U32 n; for (totalTestSize=0, cSize=0, n=0 ; n world.zstd cat hello.zstd world.zstd > helloworld.zstd $ZSTD -dc helloworld.zstd > result.tmp cat result.tmp -sdiff helloworld.tmp result.tmp +$DIFF helloworld.tmp result.tmp $ECHO "frame concatenation without checksum" $ZSTD -c hello.tmp > hello.zstd --no-check $ZSTD -c world.tmp > world.zstd --no-check cat hello.zstd world.zstd > helloworld.zstd $ZSTD -dc helloworld.zstd > result.tmp cat result.tmp -sdiff helloworld.tmp result.tmp +$DIFF helloworld.tmp result.tmp rm ./*.tmp ./*.zstd $ECHO "frame concatenation tests completed" diff --git a/tests/pool.c b/tests/pool.c new file mode 100644 index 00000000..adc5947d --- /dev/null +++ b/tests/pool.c @@ -0,0 +1,70 @@ +#include "pool.h" +#include "threading.h" +#include +#include + +#define ASSERT_TRUE(p) \ + do { \ + if (!(p)) { \ + return 1; \ + } \ + } while (0) +#define ASSERT_FALSE(p) ASSERT_TRUE(!(p)) +#define ASSERT_EQ(lhs, rhs) ASSERT_TRUE((lhs) == (rhs)) + +struct data { + pthread_mutex_t mutex; + unsigned data[16]; + size_t i; +}; + +void fn(void *opaque) { + struct data *data = (struct data *)opaque; + pthread_mutex_lock(&data->mutex); + data->data[data->i] = data->i; + ++data->i; + pthread_mutex_unlock(&data->mutex); +} + +int testOrder(size_t numThreads, size_t queueSize) { + struct data data; + POOL_ctx *ctx = POOL_create(numThreads, queueSize); + ASSERT_TRUE(ctx); + data.i = 0; + pthread_mutex_init(&data.mutex, NULL); + { + size_t i; + for (i = 0; i < 16; ++i) { + POOL_add(ctx, &fn, &data); + } + } + POOL_free(ctx); + ASSERT_EQ(16, data.i); + { + size_t i; + for (i = 0; i < data.i; ++i) { + ASSERT_EQ(i, data.data[i]); + } + } + pthread_mutex_destroy(&data.mutex); + return 0; +} + +int main(int argc, const char **argv) { + size_t numThreads; + for (numThreads = 1; numThreads <= 4; ++numThreads) { + size_t queueSize; + for (queueSize = 1; queueSize <= 2; ++queueSize) { + if (testOrder(numThreads, queueSize)) { + printf("FAIL: testOrder\n"); + return 1; + } + } + } + printf("PASS: testOrder\n"); + (void)argc; + (void)argv; + return (POOL_create(0, 1) || POOL_create(1, 0)) ? printf("FAIL: testInvalid\n"), 1 + : printf("PASS: testInvalid\n"), 0; + return 0; +} diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 4024e5ed..bef8734c 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -29,6 +29,7 @@ #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_maxCLevel, ZSTD_customMem, ZSTD_getDictID_fromFrame */ #include "zstd.h" /* ZSTD_compressBound */ #include "zstd_errors.h" /* ZSTD_error_srcSize_wrong */ +#include "zstdmt_compress.h" #include "zdict.h" /* ZDICT_trainFromBuffer */ #include "datagen.h" /* RDG_genBuffer */ #define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */ @@ -184,7 +185,7 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo cSize = skippableFrameSize + 8; /* Basic compression test */ - DISPLAYLEVEL(4, "test%3i : compress %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH); + DISPLAYLEVEL(3, "test%3i : compress %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH); ZSTD_initCStream_usingDict(zc, CNBuffer, 128 KB, 1); outBuff.dst = (char*)(compressedBuffer)+cSize; outBuff.size = compressedBufferSize; @@ -198,16 +199,16 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo { size_t const r = ZSTD_endStream(zc, &outBuff); if (r != 0) goto _output_error; } /* error, or some data not flushed */ cSize += outBuff.pos; - DISPLAYLEVEL(4, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/COMPRESSIBLE_NOISE_LENGTH*100); + DISPLAYLEVEL(3, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/COMPRESSIBLE_NOISE_LENGTH*100); - DISPLAYLEVEL(4, "test%3i : check CStream size : ", testNb++); + DISPLAYLEVEL(3, "test%3i : check CStream size : ", testNb++); { size_t const s = ZSTD_sizeof_CStream(zc); if (ZSTD_isError(s)) goto _output_error; - DISPLAYLEVEL(4, "OK (%u bytes) \n", (U32)s); + DISPLAYLEVEL(3, "OK (%u bytes) \n", (U32)s); } /* skippable frame test */ - DISPLAYLEVEL(4, "test%3i : decompress skippable frame : ", testNb++); + DISPLAYLEVEL(3, "test%3i : decompress skippable frame : ", testNb++); ZSTD_initDStream_usingDict(zd, CNBuffer, 128 KB); inBuff.src = compressedBuffer; inBuff.size = cSize; @@ -218,45 +219,45 @@ static int basicUnitTests(U32 seed, double compressibility, ZSTD_customMem custo { size_t const r = ZSTD_decompressStream(zd, &outBuff, &inBuff); if (r != 0) goto _output_error; } if (outBuff.pos != 0) goto _output_error; /* skippable frame len is 0 */ - DISPLAYLEVEL(4, "OK \n"); + DISPLAYLEVEL(3, "OK \n"); /* Basic decompression test */ inBuff2 = inBuff; - DISPLAYLEVEL(4, "test%3i : decompress %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH); + DISPLAYLEVEL(3, "test%3i : decompress %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH); ZSTD_initDStream_usingDict(zd, CNBuffer, 128 KB); - { size_t const r = ZSTD_setDStreamParameter(zd, ZSTDdsp_maxWindowSize, 1000000000); /* large limit */ + { size_t const r = ZSTD_setDStreamParameter(zd, DStream_p_maxWindowSize, 1000000000); /* large limit */ if (ZSTD_isError(r)) goto _output_error; } { size_t const remaining = ZSTD_decompressStream(zd, &outBuff, &inBuff); if (remaining != 0) goto _output_error; } /* should reach end of frame == 0; otherwise, some data left, or an error */ if (outBuff.pos != CNBufferSize) goto _output_error; /* should regenerate the same amount */ if (inBuff.pos != inBuff.size) goto _output_error; /* should have read the entire frame */ - DISPLAYLEVEL(4, "OK \n"); + DISPLAYLEVEL(3, "OK \n"); /* Re-use without init */ - DISPLAYLEVEL(4, "test%3i : decompress again without init (re-use previous settings): ", testNb++); + DISPLAYLEVEL(3, "test%3i : decompress again without init (re-use previous settings): ", testNb++); outBuff.pos = 0; { size_t const remaining = ZSTD_decompressStream(zd, &outBuff, &inBuff2); if (remaining != 0) goto _output_error; } /* should reach end of frame == 0; otherwise, some data left, or an error */ if (outBuff.pos != CNBufferSize) goto _output_error; /* should regenerate the same amount */ if (inBuff.pos != inBuff.size) goto _output_error; /* should have read the entire frame */ - DISPLAYLEVEL(4, "OK \n"); + DISPLAYLEVEL(3, "OK \n"); /* check regenerated data is byte exact */ - DISPLAYLEVEL(4, "test%3i : check decompressed result : ", testNb++); + DISPLAYLEVEL(3, "test%3i : check decompressed result : ", testNb++); { size_t i; for (i=0; i 100 bytes */ - DISPLAYLEVEL(4, "OK (%s)\n", ZSTD_getErrorName(r)); } + DISPLAYLEVEL(3, "OK (%s)\n", ZSTD_getErrorName(r)); } _end: @@ -464,6 +465,11 @@ static size_t findDiff(const void* buf1, const void* buf2, size_t max) for (u=0; u= testNb) { DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); } + else { DISPLAYUPDATE(2, "\r%6u ", testNb); } + FUZ_rand(&coreSeed); + lseed = coreSeed ^ prime32; + + /* states full reset (deliberately not synchronized) */ + /* some issues can only happen when reusing states */ + if ((FUZ_rand(&lseed) & 0xFF) == 131) { + U32 const nbThreads = (FUZ_rand(&lseed) % 6) + 1; + ZSTDMT_freeCCtx(zc); + zc = ZSTDMT_createCCtx(nbThreads); + resetAllowed=0; + } + if ((FUZ_rand(&lseed) & 0xFF) == 132) { + ZSTD_freeDStream(zd); + zd = ZSTD_createDStream(); + ZSTD_initDStream_usingDict(zd, NULL, 0); /* ensure at least one init */ + } + + /* srcBuffer selection [0-4] */ + { U32 buffNb = FUZ_rand(&lseed) & 0x7F; + if (buffNb & 7) buffNb=2; /* most common : compressible (P) */ + else { + buffNb >>= 3; + if (buffNb & 7) { + const U32 tnb[2] = { 1, 3 }; /* barely/highly compressible */ + buffNb = tnb[buffNb >> 3]; + } else { + const U32 tnb[2] = { 0, 4 }; /* not compressible / sparse */ + buffNb = tnb[buffNb >> 3]; + } } + srcBuffer = cNoiseBuffer[buffNb]; + } + + /* compression init */ + if ((FUZ_rand(&lseed)&1) /* at beginning, to keep same nb of rand */ + && oldTestLog /* at least one test happened */ && resetAllowed) { + maxTestSize = FUZ_randomLength(&lseed, oldTestLog+2); + if (maxTestSize >= srcBufferSize) maxTestSize = srcBufferSize-1; + { int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1; + size_t const resetError = ZSTDMT_initCStream(zc, compressionLevel); + CHECK(ZSTD_isError(resetError), "ZSTDMT_initCStream error : %s", ZSTD_getErrorName(resetError)); + } + } else { + U32 const testLog = FUZ_rand(&lseed) % maxSrcLog; + U32 const cLevel = (FUZ_rand(&lseed) % (ZSTD_maxCLevel() - (testLog/3))) + 1; + maxTestSize = FUZ_rLogLength(&lseed, testLog); + oldTestLog = testLog; + /* random dictionary selection */ + dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_randomLength(&lseed, maxSampleLog) : 0; + { size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize); + dict = srcBuffer + dictStart; + } + { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? 0 : maxTestSize; + ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize); + DISPLAYLEVEL(5, "Init with windowLog = %u \n", params.cParams.windowLog); + params.fParams.checksumFlag = FUZ_rand(&lseed) & 1; + params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1; + { size_t const initError = ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize); + CHECK (ZSTD_isError(initError),"ZSTDMT_initCStream_advanced error : %s", ZSTD_getErrorName(initError)); + } } } + + /* multi-segments compression test */ + XXH64_reset(&xxhState, 0); + { ZSTD_outBuffer outBuff = { cBuffer, cBufferSize, 0 } ; + U32 n; + for (n=0, cSize=0, totalTestSize=0 ; totalTestSize < maxTestSize ; n++) { + /* compress random chunks into randomly sized dst buffers */ + { size_t const randomSrcSize = FUZ_randomLength(&lseed, maxSampleLog); + size_t const srcSize = MIN (maxTestSize-totalTestSize, randomSrcSize); + size_t const srcStart = FUZ_rand(&lseed) % (srcBufferSize - srcSize); + size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog); + size_t const dstBuffSize = MIN(cBufferSize - cSize, randomDstSize); + ZSTD_inBuffer inBuff = { srcBuffer+srcStart, srcSize, 0 }; + outBuff.size = outBuff.pos + dstBuffSize; + + DISPLAYLEVEL(5, "Sending %u bytes to compress \n", (U32)srcSize); + { size_t const compressionError = ZSTDMT_compressStream(zc, &outBuff, &inBuff); + CHECK (ZSTD_isError(compressionError), "compression error : %s", ZSTD_getErrorName(compressionError)); } + DISPLAYLEVEL(5, "%u bytes read by ZSTDMT_compressStream \n", (U32)inBuff.pos); + + XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos); + memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos); + totalTestSize += inBuff.pos; + } + + /* random flush operation, to mess around */ + if ((FUZ_rand(&lseed) & 15) == 0) { + size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog); + size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize); + outBuff.size = outBuff.pos + adjustedDstSize; + DISPLAYLEVEL(5, "Flushing into dst buffer of size %u \n", (U32)adjustedDstSize); + { size_t const flushError = ZSTDMT_flushStream(zc, &outBuff); + CHECK (ZSTD_isError(flushError), "flush error : %s", ZSTD_getErrorName(flushError)); + } } } + + /* final frame epilogue */ + { size_t remainingToFlush = (size_t)(-1); + while (remainingToFlush) { + size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog); + size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize); + outBuff.size = outBuff.pos + adjustedDstSize; + DISPLAYLEVEL(5, "Ending into dst buffer of size %u \n", (U32)adjustedDstSize); + remainingToFlush = ZSTDMT_endStream(zc, &outBuff); + CHECK (ZSTD_isError(remainingToFlush), "flush error : %s", ZSTD_getErrorName(remainingToFlush)); + DISPLAYLEVEL(5, "endStream : remainingToFlush : %u \n", (U32)remainingToFlush); + } } + DISPLAYLEVEL(5, "Frame completed \n"); + crcOrig = XXH64_digest(&xxhState); + cSize = outBuff.pos; + } + + /* multi - fragments decompression test */ + if (!dictSize /* don't reset if dictionary : could be different */ && (FUZ_rand(&lseed) & 1)) { + CHECK (ZSTD_isError(ZSTD_resetDStream(zd)), "ZSTD_resetDStream failed"); + } else { + ZSTD_initDStream_usingDict(zd, dict, dictSize); + } + { size_t decompressionResult = 1; + ZSTD_inBuffer inBuff = { cBuffer, cSize, 0 }; + ZSTD_outBuffer outBuff= { dstBuffer, dstBufferSize, 0 }; + for (totalGenSize = 0 ; decompressionResult ; ) { + size_t const readCSrcSize = FUZ_randomLength(&lseed, maxSampleLog); + size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog); + size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize); + inBuff.size = inBuff.pos + readCSrcSize; + outBuff.size = inBuff.pos + dstBuffSize; + decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff); + CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult)); + } + CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize); + CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize); + { U64 const crcDest = XXH64(dstBuffer, totalTestSize, 0); + if (crcDest!=crcOrig) findDiff(copyBuffer, dstBuffer, totalTestSize); + CHECK (crcDest!=crcOrig, "decompressed data corrupted"); + } } + + /*===== noisy/erroneous src decompression test =====*/ + + /* add some noise */ + { U32 const nbNoiseChunks = (FUZ_rand(&lseed) & 7) + 2; + U32 nn; for (nn=0; nn='0') && (*argument<='9')) { @@ -783,7 +1037,7 @@ int main(int argc, const char** argv) } break; - case 'T': + case 'T': /* limit tests by time */ argument++; nbTests=0; g_clockTime=0; while ((*argument>='0') && (*argument<='9')) { @@ -796,7 +1050,7 @@ int main(int argc, const char** argv) g_clockTime *= CLOCKS_PER_SEC; break; - case 's': + case 's': /* manually select seed */ argument++; seed=0; seedset=1; @@ -807,7 +1061,7 @@ int main(int argc, const char** argv) } break; - case 't': + case 't': /* select starting test number */ argument++; testNb=0; while ((*argument>='0') && (*argument<='9')) { @@ -851,12 +1105,12 @@ int main(int argc, const char** argv) if (testNb==0) { result = basicUnitTests(0, ((double)proba) / 100, customNULL); /* constant seed for predictability */ if (!result) { - DISPLAYLEVEL(4, "Unit tests using customMem :\n") + DISPLAYLEVEL(3, "Unit tests using customMem :\n") result = basicUnitTests(0, ((double)proba) / 100, customMem); /* use custom memory allocation functions */ } } - if (!result) - result = fuzzerTests(seed, nbTests, testNb, ((double)proba) / 100); + if (!result && !mtOnly) result = fuzzerTests(seed, nbTests, testNb, ((double)proba) / 100); + if (!result) result = fuzzerTests_MT(seed, nbTests, testNb, ((double)proba) / 100); if (mainPause) { int unused;