From 70df5de1b2fc291a58c1b3199dde11b353ccad8f Mon Sep 17 00:00:00 2001 From: Yonatan Komornik <11005061+yoniko@users.noreply.github.com> Date: Mon, 24 Jan 2022 14:43:02 -0800 Subject: [PATCH] AsyncIO compression part 1 - refactor of existing asyncio code (#3021) * Refactored fileio.c: - Extracted asyncio code to fileio_asyncio.c/.h - Moved type definitions to fileio_types.h - Moved common macro definitions needed by both fileio.c and fileio_asyncio.c to fileio_common.h * Bugfix - rename fileio_asycio to fileio_asyncio * Added copyrights & license to new files * CR fixes --- build/VS2008/zstd/zstd.vcproj | 4 + build/VS2010/zstd/zstd.vcxproj | 1 + build/cmake/programs/CMakeLists.txt | 4 +- build/meson/programs/meson.build | 2 + contrib/VS2005/zstd/zstd.vcproj | 4 + programs/Makefile | 8 +- programs/fileio.c | 528 +++------------------------- programs/fileio.h | 8 +- programs/fileio_asyncio.c | 365 +++++++++++++++++++ programs/fileio_asyncio.h | 120 +++++++ programs/fileio_common.h | 117 ++++++ programs/fileio_types.h | 73 ++++ 12 files changed, 736 insertions(+), 498 deletions(-) create mode 100644 programs/fileio_asyncio.c create mode 100644 programs/fileio_asyncio.h create mode 100644 programs/fileio_common.h create mode 100644 programs/fileio_types.h diff --git a/build/VS2008/zstd/zstd.vcproj b/build/VS2008/zstd/zstd.vcproj index c7eec577..91f2bda5 100644 --- a/build/VS2008/zstd/zstd.vcproj +++ b/build/VS2008/zstd/zstd.vcproj @@ -384,6 +384,10 @@ RelativePath="..\..\..\programs\fileio.c" > + + diff --git a/build/VS2010/zstd/zstd.vcxproj b/build/VS2010/zstd/zstd.vcxproj index 46e22f42..8ab239dd 100644 --- a/build/VS2010/zstd/zstd.vcxproj +++ b/build/VS2010/zstd/zstd.vcxproj @@ -62,6 +62,7 @@ + diff --git a/build/cmake/programs/CMakeLists.txt b/build/cmake/programs/CMakeLists.txt index 49003078..28b1e1d1 100644 --- a/build/cmake/programs/CMakeLists.txt +++ b/build/cmake/programs/CMakeLists.txt @@ -32,7 +32,7 @@ if (MSVC) set(PlatformDependResources ${MSVC_RESOURCE_DIR}/zstd.rc) endif () -add_executable(zstd ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c ${PROGRAMS_DIR}/benchfn.c ${PROGRAMS_DIR}/benchzstd.c ${PROGRAMS_DIR}/datagen.c ${PROGRAMS_DIR}/dibio.c ${PROGRAMS_DIR}/zstdcli_trace.c ${PlatformDependResources}) +add_executable(zstd ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c ${PROGRAMS_DIR}/fileio_asyncio.c ${PROGRAMS_DIR}/benchfn.c ${PROGRAMS_DIR}/benchzstd.c ${PROGRAMS_DIR}/datagen.c ${PROGRAMS_DIR}/dibio.c ${PROGRAMS_DIR}/zstdcli_trace.c ${PlatformDependResources}) target_link_libraries(zstd ${PROGRAMS_ZSTD_LINK_TARGET}) if (CMAKE_SYSTEM_NAME MATCHES "(Solaris|SunOS)") target_link_libraries(zstd rt) @@ -75,7 +75,7 @@ if (UNIX) ${CMAKE_CURRENT_BINARY_DIR}/zstdless.1 DESTINATION "${MAN_INSTALL_DIR}") - add_executable(zstd-frugal ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c) + add_executable(zstd-frugal ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c ${PROGRAMS_DIR}/fileio_asyncio.c) target_link_libraries(zstd-frugal ${PROGRAMS_ZSTD_LINK_TARGET}) set_property(TARGET zstd-frugal APPEND PROPERTY COMPILE_DEFINITIONS "ZSTD_NOBENCH;ZSTD_NODICT;ZSTD_NOTRACE") endif () diff --git a/build/meson/programs/meson.build b/build/meson/programs/meson.build index 0ae93fc1..5ccd679a 100644 --- a/build/meson/programs/meson.build +++ b/build/meson/programs/meson.build @@ -14,6 +14,7 @@ zstd_programs_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'), join_paths(zstd_rootdir, 'programs/util.c'), join_paths(zstd_rootdir, 'programs/timefn.c'), join_paths(zstd_rootdir, 'programs/fileio.c'), + join_paths(zstd_rootdir, 'programs/fileio_asyncio.c'), join_paths(zstd_rootdir, 'programs/benchfn.c'), join_paths(zstd_rootdir, 'programs/benchzstd.c'), join_paths(zstd_rootdir, 'programs/datagen.c'), @@ -80,6 +81,7 @@ zstd_frugal_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'), join_paths(zstd_rootdir, 'programs/timefn.c'), join_paths(zstd_rootdir, 'programs/util.c'), join_paths(zstd_rootdir, 'programs/fileio.c'), + join_paths(zstd_rootdir, 'programs/fileio_asyncio.c'), join_paths(zstd_rootdir, 'lib/common/pool.c'), join_paths(zstd_rootdir, 'lib/common/zstd_common.c'), join_paths(zstd_rootdir, 'lib/common/error_private.c')] diff --git a/contrib/VS2005/zstd/zstd.vcproj b/contrib/VS2005/zstd/zstd.vcproj index 78645d18..e37ebee3 100644 --- a/contrib/VS2005/zstd/zstd.vcproj +++ b/contrib/VS2005/zstd/zstd.vcproj @@ -363,6 +363,10 @@ RelativePath="..\..\..\programs\fileio.c" > + + diff --git a/programs/Makefile b/programs/Makefile index f77e1b7f..16763e49 100644 --- a/programs/Makefile +++ b/programs/Makefile @@ -243,17 +243,17 @@ zstd-pgo : ## zstd-small: minimal target, supporting only zstd compression and decompression. no bench. no legacy. no other format. zstd-small: CFLAGS = -Os -s -zstd-frugal zstd-small: $(ZSTDLIB_CORE_SRC) zstdcli.c util.c timefn.c fileio.c +zstd-frugal zstd-small: $(ZSTDLIB_CORE_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asyncio.c $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NOTRACE -UZSTD_LEGACY_SUPPORT -DZSTD_LEGACY_SUPPORT=0 $^ -o $@$(EXT) -zstd-decompress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_DECOMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c +zstd-decompress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_DECOMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asyncio.c $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NOCOMPRESS -DZSTD_NOTRACE -UZSTD_LEGACY_SUPPORT -DZSTD_LEGACY_SUPPORT=0 $^ -o $@$(EXT) -zstd-compress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c +zstd-compress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asyncio.c $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NODECOMPRESS -DZSTD_NOTRACE -UZSTD_LEGACY_SUPPORT -DZSTD_LEGACY_SUPPORT=0 $^ -o $@$(EXT) ## zstd-dictBuilder: executable supporting dictionary creation and compression (only) -zstd-dictBuilder: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) $(ZDICT_SRC) zstdcli.c util.c timefn.c fileio.c dibio.c +zstd-dictBuilder: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) $(ZDICT_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asyncio.c dibio.c $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODECOMPRESS -DZSTD_NOTRACE $^ -o $@$(EXT) zstdmt: zstd diff --git a/programs/fileio.c b/programs/fileio.c index d40ebbc1..2066096d 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -34,16 +34,18 @@ #include /* INT_MAX */ #include #include "timefn.h" /* UTIL_getTime, UTIL_clockSpanMicro */ -#include "../lib/common/pool.h" -#include "../lib/common/threading.h" #if defined (_MSC_VER) # include # include #endif -#include "../lib/common/mem.h" /* U32, U64 */ #include "fileio.h" +#include "fileio_asyncio.h" +#include "fileio_common.h" + +FIO_display_prefs_t g_display_prefs = {2, FIO_ps_auto}; +UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER; #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */ #include "../lib/zstd.h" @@ -84,62 +86,6 @@ #define DEFAULT_FILE_PERMISSIONS (0666) #endif -/*-************************************* -* Macros -***************************************/ -#define KB *(1 <<10) -#define MB *(1 <<20) -#define GB *(1U<<30) -#undef MAX -#define MAX(a,b) ((a)>(b) ? (a) : (b)) - -struct FIO_display_prefs_s { - int displayLevel; /* 0 : no display; 1: errors; 2: + result + interaction + warnings; 3: + progression; 4: + information */ - FIO_progressSetting_e progressSetting; -}; - -static FIO_display_prefs_t g_display_prefs = {2, FIO_ps_auto}; - -#define DISPLAY(...) fprintf(stderr, __VA_ARGS__) -#define DISPLAYOUT(...) fprintf(stdout, __VA_ARGS__) -#define DISPLAYLEVEL(l, ...) { if (g_display_prefs.displayLevel>=l) { DISPLAY(__VA_ARGS__); } } - -static const U64 g_refreshRate = SEC_TO_MICRO / 6; -static UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER; - -#define READY_FOR_UPDATE() ((g_display_prefs.progressSetting != FIO_ps_never) && UTIL_clockSpanMicro(g_displayClock) > g_refreshRate) -#define DELAY_NEXT_UPDATE() { g_displayClock = UTIL_getTime(); } -#define DISPLAYUPDATE(l, ...) { \ - if (g_display_prefs.displayLevel>=l && (g_display_prefs.progressSetting != FIO_ps_never)) { \ - if (READY_FOR_UPDATE() || (g_display_prefs.displayLevel>=4)) { \ - DELAY_NEXT_UPDATE(); \ - DISPLAY(__VA_ARGS__); \ - if (g_display_prefs.displayLevel>=4) fflush(stderr); \ - } } } - -#undef MIN /* in case it would be already defined */ -#define MIN(a,b) ((a) < (b) ? (a) : (b)) - - -#define EXM_THROW(error, ...) \ -{ \ - DISPLAYLEVEL(1, "zstd: "); \ - DISPLAYLEVEL(5, "Error defined at %s, line %i : \n", __FILE__, __LINE__); \ - DISPLAYLEVEL(1, "error %i : ", error); \ - DISPLAYLEVEL(1, __VA_ARGS__); \ - DISPLAYLEVEL(1, " \n"); \ - exit(error); \ -} - -#define CHECK_V(v, f) \ - v = f; \ - if (ZSTD_isError(v)) { \ - DISPLAYLEVEL(5, "%s \n", #f); \ - EXM_THROW(11, "%s", ZSTD_getErrorName(v)); \ - } -#define CHECK(f) { size_t err; CHECK_V(err, f); } - - /*-************************************ * Signal (Ctrl-C trapping) **************************************/ @@ -250,95 +196,6 @@ void FIO_addAbortHandler() #endif } - -/*-************************************************************ -* Avoid fseek()'s 2GiB barrier with MSVC, macOS, *BSD, MinGW -***************************************************************/ -#if defined(_MSC_VER) && _MSC_VER >= 1400 -# define LONG_SEEK _fseeki64 -# define LONG_TELL _ftelli64 -#elif !defined(__64BIT__) && (PLATFORM_POSIX_VERSION >= 200112L) /* No point defining Large file for 64 bit */ -# define LONG_SEEK fseeko -# define LONG_TELL ftello -#elif defined(__MINGW32__) && !defined(__STRICT_ANSI__) && !defined(__NO_MINGW_LFS) && defined(__MSVCRT__) -# define LONG_SEEK fseeko64 -# define LONG_TELL ftello64 -#elif defined(_WIN32) && !defined(__DJGPP__) -# include - static int LONG_SEEK(FILE* file, __int64 offset, int origin) { - LARGE_INTEGER off; - DWORD method; - off.QuadPart = offset; - if (origin == SEEK_END) - method = FILE_END; - else if (origin == SEEK_CUR) - method = FILE_CURRENT; - else - method = FILE_BEGIN; - - if (SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, NULL, method)) - return 0; - else - return -1; - } - static __int64 LONG_TELL(FILE* file) { - LARGE_INTEGER off, newOff; - off.QuadPart = 0; - newOff.QuadPart = 0; - SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, &newOff, FILE_CURRENT); - return newOff.QuadPart; - } -#else -# define LONG_SEEK fseek -# define LONG_TELL ftell -#endif - - -/*-************************************* -* Parameters: FIO_prefs_t -***************************************/ - -/* typedef'd to FIO_prefs_t within fileio.h */ -struct FIO_prefs_s { - - /* Algorithm preferences */ - FIO_compressionType_t compressionType; - U32 sparseFileSupport; /* 0: no sparse allowed; 1: auto (file yes, stdout no); 2: force sparse */ - int dictIDFlag; - int checksumFlag; - int blockSize; - int overlapLog; - U32 adaptiveMode; - U32 useRowMatchFinder; - int rsyncable; - int minAdaptLevel; - int maxAdaptLevel; - int ldmFlag; - int ldmHashLog; - int ldmMinMatch; - int ldmBucketSizeLog; - int ldmHashRateLog; - size_t streamSrcSize; - size_t targetCBlockSize; - int srcSizeHint; - int testMode; - ZSTD_paramSwitch_e literalCompressionMode; - - /* IO preferences */ - U32 removeSrcFile; - U32 overwrite; - U32 asyncIO; - - /* Computation resources preferences */ - unsigned memLimit; - int nbWorkers; - - int excludeCompressedFiles; - int patchFromMode; - int contentSize; - int allowBlockDevices; -}; - /*-************************************* * Parameters: FIO_ctx_t ***************************************/ @@ -563,7 +420,13 @@ void FIO_setContentSize(FIO_prefs_t* const prefs, int value) } void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, unsigned value) { +#ifdef ZSTD_MULTITHREAD prefs->asyncIO = value; +#else + (void) prefs; + (void) value; + DISPLAYLEVEL(2, "Note : asyncio is disabled (lack of multithreading support) \n"); +#endif } /* FIO_ctx_t functions */ @@ -2019,124 +1882,15 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx, /* ************************************************************************** * Decompression ***************************************************************************/ -#define DECOMPRESSION_MAX_WRITE_JOBS (10) - -typedef struct { - /* These struct fields should be set only on creation and not changed afterwards */ - POOL_ctx* writerPool; - int totalWriteJobs; - FIO_prefs_t* prefs; - - /* Controls the file we currently write to, make changes only by using provided utility functions */ - FILE* dstFile; - unsigned storedSkips; - - /* The jobs and availableWriteJobs fields are access by both the main and writer threads and should - * only be mutated after locking the mutex */ - ZSTD_pthread_mutex_t writeJobsMutex; - void* jobs[DECOMPRESSION_MAX_WRITE_JOBS]; - int availableWriteJobs; -} write_pool_ctx_t; - -typedef struct { - /* These fields are automaically set and shouldn't be changed by non WritePool code. */ - write_pool_ctx_t *ctx; - FILE* dstFile; - void *buffer; - size_t bufferSize; - - /* This field should be changed before a job is queued for execution and should contain the number - * of bytes to write from the buffer. */ - size_t usedBufferSize; -} write_job_t; typedef struct { void* srcBuffer; size_t srcBufferSize; size_t srcBufferLoaded; ZSTD_DStream* dctx; - write_pool_ctx_t *writePoolCtx; + WritePoolCtx_t *writeCtx; } dRess_t; -static write_job_t *FIO_createWriteJob(write_pool_ctx_t *ctx) { - void *buffer; - write_job_t *job; - job = (write_job_t*) malloc(sizeof(write_job_t)); - buffer = malloc(ZSTD_DStreamOutSize()); - if(!job || !buffer) - EXM_THROW(101, "Allocation error : not enough memory"); - job->buffer = buffer; - job->bufferSize = ZSTD_DStreamOutSize(); - job->usedBufferSize = 0; - job->dstFile = NULL; - job->ctx = ctx; - return job; -} - -/* WritePool_createThreadPool: - * Creates a thread pool and a mutex for threaded write pool. - * Displays warning if asyncio is requested but MT isn't available. */ -static void WritePool_createThreadPool(write_pool_ctx_t *ctx, const FIO_prefs_t *prefs) { - ctx->writerPool = NULL; - if(prefs->asyncIO) { -#ifdef ZSTD_MULTITHREAD - if (ZSTD_pthread_mutex_init(&ctx->writeJobsMutex, NULL)) - EXM_THROW(102, "Failed creating write jobs mutex"); - /* We want DECOMPRESSION_MAX_WRITE_JOBS-2 queue items because we need to always have 1 free buffer to - * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */ - assert(DECOMPRESSION_MAX_WRITE_JOBS >= 2); - ctx->writerPool = POOL_create(1, DECOMPRESSION_MAX_WRITE_JOBS - 2); - if (!ctx->writerPool) - EXM_THROW(103, "Failed creating writer thread pool"); -#else - DISPLAYLEVEL(2, "Note : asyncio decompression is disabled (lack of multithreading support) \n"); -#endif - } -} - -/* WritePool_create: - * Allocates and sets and a new write pool including its included jobs. */ -static write_pool_ctx_t* WritePool_create(FIO_prefs_t* const prefs) { - write_pool_ctx_t *ctx; - int i; - ctx = (write_pool_ctx_t*) malloc(sizeof(write_pool_ctx_t)); - if(!ctx) - EXM_THROW(100, "Allocation error : not enough memory"); - WritePool_createThreadPool(ctx, prefs); - ctx->prefs = prefs; - ctx->totalWriteJobs = ctx->writerPool ? DECOMPRESSION_MAX_WRITE_JOBS : 1; - ctx->availableWriteJobs = ctx->totalWriteJobs; - for(i=0; i < ctx->availableWriteJobs; i++) { - ctx->jobs[i] = FIO_createWriteJob(ctx); - } - ctx->storedSkips = 0; - ctx->dstFile = NULL; - return ctx; -} - -/* WritePool_free: - * Release a previously allocated write thread pool. Makes sure all takss are done and released. */ -static void WritePool_free(write_pool_ctx_t* ctx) { - int i=0; - if(ctx->writerPool) { - /* Make sure we finish all tasks and then free the resources */ - POOL_joinJobs(ctx->writerPool); - /* Make sure we are not leaking jobs */ - assert(ctx->availableWriteJobs==ctx->totalWriteJobs); - POOL_free(ctx->writerPool); - ZSTD_pthread_mutex_destroy(&ctx->writeJobsMutex); - } - assert(ctx->dstFile==NULL); - assert(ctx->storedSkips==0); - for(i=0; iavailableWriteJobs; i++) { - write_job_t* job = (write_job_t*) ctx->jobs[i]; - free(job->buffer); - free(job); - } - free(ctx); -} - - static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName) { dRess_t ress; @@ -2164,7 +1918,7 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi free(dictBuffer); } - ress.writePoolCtx = WritePool_create(prefs); + ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_DStreamOutSize()); return ress; } @@ -2173,7 +1927,7 @@ static void FIO_freeDResources(dRess_t ress) { CHECK( ZSTD_freeDStream(ress.dctx) ); free(ress.srcBuffer); - WritePool_free(ress.writePoolCtx); + AIO_WritePool_free(ress.writeCtx); } /* FIO_consumeDSrcBuffer: @@ -2184,205 +1938,6 @@ static void FIO_consumeDSrcBuffer(dRess_t *ress, size_t len) { memmove(ress->srcBuffer, (char *)ress->srcBuffer + len, ress->srcBufferLoaded); } -/** FIO_fwriteSparse() : -* @return : storedSkips, -* argument for next call to FIO_fwriteSparse() or FIO_fwriteSparseEnd() */ -static unsigned -FIO_fwriteSparse(FILE* file, - const void* buffer, size_t bufferSize, - const FIO_prefs_t* const prefs, - unsigned storedSkips) -{ - const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */ - size_t bufferSizeT = bufferSize / sizeof(size_t); - const size_t* const bufferTEnd = bufferT + bufferSizeT; - const size_t* ptrT = bufferT; - static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */ - - if (prefs->testMode) return 0; /* do not output anything in test mode */ - - if (!prefs->sparseFileSupport) { /* normal write */ - size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file); - if (sizeCheck != bufferSize) - EXM_THROW(70, "Write error : cannot write decoded block : %s", - strerror(errno)); - return 0; - } - - /* avoid int overflow */ - if (storedSkips > 1 GB) { - if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0) - EXM_THROW(91, "1 GB skip error (sparse file support)"); - storedSkips -= 1 GB; - } - - while (ptrT < bufferTEnd) { - size_t nb0T; - - /* adjust last segment if < 32 KB */ - size_t seg0SizeT = segmentSizeT; - if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT; - bufferSizeT -= seg0SizeT; - - /* count leading zeroes */ - for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ; - storedSkips += (unsigned)(nb0T * sizeof(size_t)); - - if (nb0T != seg0SizeT) { /* not all 0s */ - size_t const nbNon0ST = seg0SizeT - nb0T; - /* skip leading zeros */ - if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) - EXM_THROW(92, "Sparse skip error ; try --no-sparse"); - storedSkips = 0; - /* write the rest */ - if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST) - EXM_THROW(93, "Write error : cannot write decoded block : %s", - strerror(errno)); - } - ptrT += seg0SizeT; - } - - { static size_t const maskT = sizeof(size_t)-1; - if (bufferSize & maskT) { - /* size not multiple of sizeof(size_t) : implies end of block */ - const char* const restStart = (const char*)bufferTEnd; - const char* restPtr = restStart; - const char* const restEnd = (const char*)buffer + bufferSize; - assert(restEnd > restStart && restEnd < restStart + sizeof(size_t)); - for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ; - storedSkips += (unsigned) (restPtr - restStart); - if (restPtr != restEnd) { - /* not all remaining bytes are 0 */ - size_t const restSize = (size_t)(restEnd - restPtr); - if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) - EXM_THROW(92, "Sparse skip error ; try --no-sparse"); - if (fwrite(restPtr, 1, restSize, file) != restSize) - EXM_THROW(95, "Write error : cannot write end of decoded block : %s", - strerror(errno)); - storedSkips = 0; - } } } - - return storedSkips; -} - -static void -FIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips) -{ - if (prefs->testMode) assert(storedSkips == 0); - if (storedSkips>0) { - assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */ - (void)prefs; /* assert can be disabled, in which case prefs becomes unused */ - if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0) - EXM_THROW(69, "Final skip error (sparse file support)"); - /* last zero must be explicitly written, - * so that skipped ones get implicitly translated as zero by FS */ - { const char lastZeroByte[1] = { 0 }; - if (fwrite(lastZeroByte, 1, 1, file) != 1) - EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno)); - } } -} - -/* WritePool_releaseWriteJob: - * Releases an acquired job back to the pool. Doesn't execute the job. */ -static void WritePool_releaseWriteJob(write_job_t *job) { - write_pool_ctx_t *ctx = job->ctx; - if(ctx->writerPool) { - ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex); - assert(ctx->availableWriteJobs < DECOMPRESSION_MAX_WRITE_JOBS); - ctx->jobs[ctx->availableWriteJobs++] = job; - ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex); - } else { - ctx->availableWriteJobs++; - } -} - -/* WritePool_acquireWriteJob: - * Returns an available write job to be used for a future write. */ -static write_job_t* WritePool_acquireWriteJob(write_pool_ctx_t *ctx) { - write_job_t *job; - assert(ctx->dstFile!=NULL || ctx->prefs->testMode); - if(ctx->writerPool) { - ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex); - assert(ctx->availableWriteJobs > 0); - job = (write_job_t*) ctx->jobs[--ctx->availableWriteJobs]; - ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex); - } else { - assert(ctx->availableWriteJobs==1); - ctx->availableWriteJobs--; - job = (write_job_t*)ctx->jobs[0]; - } - job->usedBufferSize = 0; - job->dstFile = ctx->dstFile; - return job; -} - -/* WritePool_executeWriteJob: - * Executes a write job synchronously. Can be used as a function for a thread pool. */ -static void WritePool_executeWriteJob(void* opaque){ - write_job_t* job = (write_job_t*) opaque; - write_pool_ctx_t* ctx = job->ctx; - ctx->storedSkips = FIO_fwriteSparse(job->dstFile, job->buffer, job->usedBufferSize, ctx->prefs, ctx->storedSkips); - WritePool_releaseWriteJob(job); -} - -/* WritePool_queueWriteJob: - * Queues a write job for execution. - * Make sure to set `usedBufferSize` to the wanted length before call. - * The queued job shouldn't be used directly after queueing it. */ -static void WritePool_queueWriteJob(write_job_t *job) { - write_pool_ctx_t* ctx = job->ctx; - if(ctx->writerPool) - POOL_add(ctx->writerPool, WritePool_executeWriteJob, job); - else - WritePool_executeWriteJob(job); -} - -/* WritePool_queueAndReacquireWriteJob: - * Queues a write job for execution and acquires a new one. - * After execution `job`'s pointed value would change to the newly acquired job. - * Make sure to set `usedBufferSize` to the wanted length before call. - * The queued job shouldn't be used directly after queueing it. */ -static void WritePool_queueAndReacquireWriteJob(write_job_t **job) { - WritePool_queueWriteJob(*job); - *job = WritePool_acquireWriteJob((*job)->ctx); -} - -/* WritePool_sparseWriteEnd: - * Ends sparse writes to the current dstFile. - * Blocks on completion of all current write jobs before executing. */ -static void WritePool_sparseWriteEnd(write_pool_ctx_t* ctx) { - assert(ctx != NULL); - if(ctx->writerPool) - POOL_joinJobs(ctx->writerPool); - FIO_fwriteSparseEnd(ctx->prefs, ctx->dstFile, ctx->storedSkips); - ctx->storedSkips = 0; -} - -/* WritePool_setDstFile: - * Sets the destination file for future files in the pool. - * Requires completion of all queues write jobs and release of all otherwise acquired jobs. - * Also requires ending of sparse write if a previous file was used in sparse mode. */ -static void WritePool_setDstFile(write_pool_ctx_t *ctx, FILE* dstFile) { - assert(ctx!=NULL); - /* We can change the dst file only if we have finished writing */ - if(ctx->writerPool) - POOL_joinJobs(ctx->writerPool); - assert(ctx->storedSkips == 0); - assert(ctx->availableWriteJobs == ctx->totalWriteJobs); - ctx->dstFile = dstFile; -} - -/* WritePool_closeDstFile: - * Ends sparse write and closes the writePool's current dstFile and sets the dstFile to NULL. - * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */ -static int WritePool_closeDstFile(write_pool_ctx_t *ctx) { - FILE *dstFile = ctx->dstFile; - assert(dstFile!=NULL || ctx->prefs->testMode!=0); - WritePool_sparseWriteEnd(ctx); - WritePool_setDstFile(ctx, NULL); - return fclose(dstFile); -} - /** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode @return : 0 (no error) */ static int FIO_passThrough(const FIO_prefs_t* const prefs, @@ -2403,7 +1958,7 @@ static int FIO_passThrough(const FIO_prefs_t* const prefs, do { readFromInput = fread(buffer, 1, blockSize, finput); - storedSkips = FIO_fwriteSparse(foutput, buffer, readFromInput, prefs, storedSkips); + storedSkips = AIO_fwriteSparse(foutput, buffer, readFromInput, prefs, storedSkips); } while (readFromInput == blockSize); if (ferror(finput)) { DISPLAYLEVEL(1, "Pass-through read error : %s\n", strerror(errno)); @@ -2411,7 +1966,7 @@ static int FIO_passThrough(const FIO_prefs_t* const prefs, } assert(feof(finput)); - FIO_fwriteSparseEnd(prefs, foutput, storedSkips); + AIO_fwriteSparseEnd(prefs, foutput, storedSkips); return 0; } @@ -2458,7 +2013,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, U64 alreadyDecoded) /* for multi-frames streams */ { U64 frameSize = 0; - write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx); + IOJob_t *writeJob = AIO_WritePool_acquireJob(ress->writeCtx); /* display last 20 characters only */ { size_t const srcFileLength = strlen(srcFileName); @@ -2486,12 +2041,13 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, DISPLAYLEVEL(1, "%s : Decoding error (36) : %s \n", srcFileName, ZSTD_getErrorName(readSizeHint)); FIO_zstdErrorHelp(prefs, ress, readSizeHint, srcFileName); + AIO_WritePool_releaseIoJob(writeJob); return FIO_ERROR_FRAME_DECODING; } /* Write block */ writeJob->usedBufferSize = outBuff.pos; - WritePool_queueAndReacquireWriteJob(&writeJob); + AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); frameSize += outBuff.pos; if (fCtx->nbFilesTotal > 1) { size_t srcFileNameSize = strlen(srcFileName); @@ -2526,8 +2082,8 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, ress->srcBufferLoaded += readSize; } } } - WritePool_releaseWriteJob(writeJob); - WritePool_sparseWriteEnd(ress->writePoolCtx); + AIO_WritePool_releaseIoJob(writeJob); + AIO_WritePool_sparseWriteEnd(ress->writeCtx); return frameSize; } @@ -2541,7 +2097,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName) z_stream strm; int flush = Z_NO_FLUSH; int decodingError = 0; - write_job_t *writeJob = NULL; + IOJob_t *writeJob = NULL; strm.zalloc = Z_NULL; strm.zfree = Z_NULL; @@ -2552,7 +2108,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName) if (inflateInit2(&strm, 15 /* maxWindowLogSize */ + 16 /* gzip only */) != Z_OK) return FIO_ERROR_FRAME_DECODING; - writeJob = WritePool_acquireWriteJob(ress->writePoolCtx); + writeJob = AIO_WritePool_acquireJob(ress->writeCtx); strm.next_out = (Bytef*)writeJob->buffer; strm.avail_out = (uInt)writeJob->bufferSize; strm.avail_in = (uInt)ress->srcBufferLoaded; @@ -2578,7 +2134,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName) { size_t const decompBytes = writeJob->bufferSize - strm.avail_out; if (decompBytes) { writeJob->usedBufferSize = decompBytes; - WritePool_queueAndReacquireWriteJob(&writeJob); + AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); outFileSize += decompBytes; strm.next_out = (Bytef*)writeJob->buffer; strm.avail_out = (uInt)writeJob->bufferSize; @@ -2594,8 +2150,8 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName) DISPLAYLEVEL(1, "zstd: %s: inflateEnd error \n", srcFileName); decodingError = 1; } - WritePool_releaseWriteJob(writeJob); - WritePool_sparseWriteEnd(ress->writePoolCtx); + AIO_WritePool_releaseIoJob(writeJob); + AIO_WritePool_sparseWriteEnd(ress->writeCtx); return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize; } #endif @@ -2610,7 +2166,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, lzma_action action = LZMA_RUN; lzma_ret initRet; int decodingError = 0; - write_job_t *writeJob = NULL; + IOJob_t *writeJob = NULL; strm.next_in = 0; strm.avail_in = 0; @@ -2627,7 +2183,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, return FIO_ERROR_FRAME_DECODING; } - writeJob = WritePool_acquireWriteJob(ress->writePoolCtx); + writeJob = AIO_WritePool_acquireJob(ress->writeCtx); strm.next_out = (Bytef*)writeJob->buffer; strm.avail_out = (uInt)writeJob->bufferSize; strm.next_in = (BYTE const*)ress->srcBuffer; @@ -2655,7 +2211,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, { size_t const decompBytes = writeJob->bufferSize - strm.avail_out; if (decompBytes) { writeJob->usedBufferSize = decompBytes; - WritePool_queueAndReacquireWriteJob(&writeJob); + AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); outFileSize += decompBytes; strm.next_out = (Bytef*)writeJob->buffer; strm.avail_out = writeJob->bufferSize; @@ -2665,8 +2221,8 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in); lzma_end(&strm); - WritePool_releaseWriteJob(writeJob); - WritePool_sparseWriteEnd(ress->writePoolCtx); + AIO_WritePool_releaseIoJob(writeJob); + AIO_WritePool_sparseWriteEnd(ress->writeCtx); return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize; } #endif @@ -2681,13 +2237,15 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, LZ4F_decompressionContext_t dCtx; LZ4F_errorCode_t const errorCode = LZ4F_createDecompressionContext(&dCtx, LZ4F_VERSION); int decodingError = 0; - write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx); + IOJob_t *writeJob = NULL; if (LZ4F_isError(errorCode)) { DISPLAYLEVEL(1, "zstd: failed to create lz4 decompression context \n"); return FIO_ERROR_FRAME_DECODING; } + writeJob = AIO_WritePool_acquireJob(ress->writeCtx); + /* Main Loop */ for (;nextToLoad;) { size_t readSize; @@ -2724,7 +2282,7 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, if (decodedBytes) { UTIL_HumanReadableSize_t hrs; writeJob->usedBufferSize = decodedBytes; - WritePool_queueAndReacquireWriteJob(&writeJob); + AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob); filesize += decodedBytes; hrs = UTIL_makeHumanReadableSize(filesize); DISPLAYUPDATE(2, "\rDecompressed : %.*f%s ", hrs.precision, hrs.value, hrs.suffix); @@ -2740,8 +2298,8 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, } LZ4F_freeDecompressionContext(dCtx); - WritePool_releaseWriteJob(writeJob); - WritePool_sparseWriteEnd(ress->writePoolCtx); + AIO_WritePool_releaseIoJob(writeJob); + AIO_WritePool_sparseWriteEnd(ress->writeCtx); return decodingError ? FIO_ERROR_FRAME_DECODING : filesize; } @@ -2818,7 +2376,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, #endif } else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) { /* pass-through mode */ return FIO_passThrough(prefs, - ress.writePoolCtx->dstFile, srcFile, + AIO_WritePool_getFile(ress.writeCtx), srcFile, ress.srcBuffer, ress.srcBufferSize, ress.srcBufferLoaded); } else { @@ -2856,7 +2414,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, int releaseDstFile = 0; int transferMTime = 0; - if ((ress.writePoolCtx->dstFile == NULL) && (prefs->testMode==0)) { + if ((AIO_WritePool_getFile(ress.writeCtx) == NULL) && (prefs->testMode == 0)) { FILE *dstFile; int dstFilePermissions = DEFAULT_FILE_PERMISSIONS; if ( strcmp(srcFileName, stdinmark) /* special case : don't transfer permissions from stdin */ @@ -2871,7 +2429,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions); if (dstFile==NULL) return 1; - WritePool_setDstFile(ress.writePoolCtx, dstFile); + AIO_WritePool_setFile(ress.writeCtx, dstFile); /* Must only be added after FIO_openDstFile() succeeds. * Otherwise we may delete the destination file if it already exists, @@ -2884,7 +2442,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, if (releaseDstFile) { clearHandler(); - if (WritePool_closeDstFile(ress.writePoolCtx)) { + if (AIO_WritePool_closeFile(ress.writeCtx)) { DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno)); result = 1; } @@ -3100,14 +2658,14 @@ FIO_decompressMultipleFilenames(FIO_ctx_t* const fCtx, if (!prefs->testMode) { FILE* dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS); if (dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName); - WritePool_setDstFile(ress.writePoolCtx, dstFile); + AIO_WritePool_setFile(ress.writeCtx, dstFile); } for (; fCtx->currFileIdx < fCtx->nbFilesTotal; fCtx->currFileIdx++) { status = FIO_decompressSrcFile(fCtx, prefs, ress, outFileName, srcNamesTable[fCtx->currFileIdx]); if (!status) fCtx->nbFilesProcessed++; error |= status; } - if ((!prefs->testMode) && (WritePool_closeDstFile(ress.writePoolCtx))) + if ((!prefs->testMode) && (AIO_WritePool_closeFile(ress.writeCtx))) EXM_THROW(72, "Write error : %s : cannot properly close output file", strerror(errno)); } else { diff --git a/programs/fileio.h b/programs/fileio.h index 398937a6..9d6ebb1a 100644 --- a/programs/fileio.h +++ b/programs/fileio.h @@ -13,6 +13,7 @@ #define FILEIO_H_23981798732 #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_compressionParameters */ +#include "fileio_types.h" #include "../lib/zstd.h" /* ZSTD_* */ #if defined (__cplusplus) @@ -53,10 +54,6 @@ extern "C" { /*-************************************* * Types ***************************************/ -typedef enum { FIO_zstdCompression, FIO_gzipCompression, FIO_xzCompression, FIO_lzmaCompression, FIO_lz4Compression } FIO_compressionType_t; - -typedef struct FIO_prefs_s FIO_prefs_t; - FIO_prefs_t* FIO_createPreferences(void); void FIO_freePreferences(FIO_prefs_t* const prefs); @@ -66,9 +63,6 @@ typedef struct FIO_ctx_s FIO_ctx_t; FIO_ctx_t* FIO_createContext(void); void FIO_freeContext(FIO_ctx_t* const fCtx); -typedef struct FIO_display_prefs_s FIO_display_prefs_t; - -typedef enum { FIO_ps_auto, FIO_ps_never, FIO_ps_always } FIO_progressSetting_e; /*-************************************* * Parameters diff --git a/programs/fileio_asyncio.c b/programs/fileio_asyncio.c new file mode 100644 index 00000000..868720a1 --- /dev/null +++ b/programs/fileio_asyncio.c @@ -0,0 +1,365 @@ +/* + * Copyright (c) Yann Collet, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + * You may select, at your option, one of the above-listed licenses. + */ + +#include "platform.h" +#include /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */ +#include /* malloc, free */ +#include +#include /* errno */ + +#if defined (_MSC_VER) +# include +# include +#endif + +#include "fileio_asyncio.h" +#include "fileio_common.h" + +/* ********************************************************************** + * Sparse write + ************************************************************************/ + +/** AIO_fwriteSparse() : +* @return : storedSkips, +* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */ +unsigned AIO_fwriteSparse(FILE* file, + const void* buffer, size_t bufferSize, + const FIO_prefs_t* const prefs, + unsigned storedSkips) +{ + const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */ + size_t bufferSizeT = bufferSize / sizeof(size_t); + const size_t* const bufferTEnd = bufferT + bufferSizeT; + const size_t* ptrT = bufferT; + static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */ + + if (prefs->testMode) return 0; /* do not output anything in test mode */ + + if (!prefs->sparseFileSupport) { /* normal write */ + size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file); + if (sizeCheck != bufferSize) + EXM_THROW(70, "Write error : cannot write decoded block : %s", + strerror(errno)); + return 0; + } + + /* avoid int overflow */ + if (storedSkips > 1 GB) { + if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0) + EXM_THROW(91, "1 GB skip error (sparse file support)"); + storedSkips -= 1 GB; + } + + while (ptrT < bufferTEnd) { + size_t nb0T; + + /* adjust last segment if < 32 KB */ + size_t seg0SizeT = segmentSizeT; + if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT; + bufferSizeT -= seg0SizeT; + + /* count leading zeroes */ + for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ; + storedSkips += (unsigned)(nb0T * sizeof(size_t)); + + if (nb0T != seg0SizeT) { /* not all 0s */ + size_t const nbNon0ST = seg0SizeT - nb0T; + /* skip leading zeros */ + if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) + EXM_THROW(92, "Sparse skip error ; try --no-sparse"); + storedSkips = 0; + /* write the rest */ + if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST) + EXM_THROW(93, "Write error : cannot write decoded block : %s", + strerror(errno)); + } + ptrT += seg0SizeT; + } + + { static size_t const maskT = sizeof(size_t)-1; + if (bufferSize & maskT) { + /* size not multiple of sizeof(size_t) : implies end of block */ + const char* const restStart = (const char*)bufferTEnd; + const char* restPtr = restStart; + const char* const restEnd = (const char*)buffer + bufferSize; + assert(restEnd > restStart && restEnd < restStart + sizeof(size_t)); + for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ; + storedSkips += (unsigned) (restPtr - restStart); + if (restPtr != restEnd) { + /* not all remaining bytes are 0 */ + size_t const restSize = (size_t)(restEnd - restPtr); + if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) + EXM_THROW(92, "Sparse skip error ; try --no-sparse"); + if (fwrite(restPtr, 1, restSize, file) != restSize) + EXM_THROW(95, "Write error : cannot write end of decoded block : %s", + strerror(errno)); + storedSkips = 0; + } } } + + return storedSkips; +} + +void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips) +{ + if (prefs->testMode) assert(storedSkips == 0); + if (storedSkips>0) { + assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */ + (void)prefs; /* assert can be disabled, in which case prefs becomes unused */ + if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0) + EXM_THROW(69, "Final skip error (sparse file support)"); + /* last zero must be explicitly written, + * so that skipped ones get implicitly translated as zero by FS */ + { const char lastZeroByte[1] = { 0 }; + if (fwrite(lastZeroByte, 1, 1, file) != 1) + EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno)); + } } +} + + +/* ********************************************************************** + * AsyncIO functionality + ************************************************************************/ + +/* *********************************** + * General IoPool implementation + *************************************/ + +static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) { + void *buffer; + IOJob_t *job; + job = (IOJob_t*) malloc(sizeof(IOJob_t)); + buffer = malloc(bufferSize); + if(!job || !buffer) + EXM_THROW(101, "Allocation error : not enough memory"); + job->buffer = buffer; + job->bufferSize = bufferSize; + job->usedBufferSize = 0; + job->file = NULL; + job->ctx = ctx; + job->offset = 0; + return job; +} + + +/* AIO_IOPool_createThreadPool: + * Creates a thread pool and a mutex for threaded IO pool. + * Displays warning if asyncio is requested but MT isn't available. */ +static void AIO_IOPool_createThreadPool(IOPoolCtx_t *ctx, const FIO_prefs_t *prefs) { + ctx->threadPool = NULL; + if(prefs->asyncIO) { + if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL)) + EXM_THROW(102,"Failed creating write availableJobs mutex"); + /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to + * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */ + assert(MAX_IO_JOBS >= 2); + ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2); + if (!ctx->threadPool) + EXM_THROW(104, "Failed creating writer thread pool"); + } +} + +/* AIO_IOPool_init: + * Allocates and sets and a new write pool including its included availableJobs. */ +static void AIO_IOPool_init(IOPoolCtx_t *ctx, FIO_prefs_t* const prefs, POOL_function poolFunction, size_t bufferSize) { + int i; + AIO_IOPool_createThreadPool(ctx, prefs); + ctx->prefs = prefs; + ctx->poolFunction = poolFunction; + ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 1; + ctx->availableJobsCount = ctx->totalIoJobs; + for(i=0; i < ctx->availableJobsCount; i++) { + ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize); + } + ctx->file = NULL; +} + + +/* AIO_IOPool_releaseIoJob: + * Releases an acquired job back to the pool. Doesn't execute the job. */ +static void AIO_IOPool_releaseIoJob(IOJob_t *job) { + IOPoolCtx_t *ctx = (IOPoolCtx_t *) job->ctx; + if(ctx->threadPool) { + ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex); + assert(ctx->availableJobsCount < MAX_IO_JOBS); + ctx->availableJobs[ctx->availableJobsCount++] = job; + ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex); + } else { + assert(ctx->availableJobsCount == 0); + ctx->availableJobsCount++; + } +} + +/* AIO_IOPool_join: + * Waits for all tasks in the pool to finish executing. */ +static void AIO_IOPool_join(IOPoolCtx_t* ctx) { + if(ctx->threadPool) + POOL_joinJobs(ctx->threadPool); +} + +/* AIO_IOPool_free: + * Release a previously allocated write thread pool. Makes sure all takss are done and released. */ +static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) { + int i; + if(ctx->threadPool) { + /* Make sure we finish all tasks and then free the resources */ + AIO_IOPool_join(ctx); + /* Make sure we are not leaking availableJobs */ + assert(ctx->availableJobsCount == ctx->totalIoJobs); + POOL_free(ctx->threadPool); + ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex); + } + assert(ctx->file == NULL); + for(i=0; iavailableJobsCount; i++) { + IOJob_t* job = (IOJob_t*) ctx->availableJobs[i]; + free(job->buffer); + free(job); + } +} + +/* AIO_IOPool_acquireJob: + * Returns an available io job to be used for a future io. */ +static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) { + IOJob_t *job; + assert(ctx->file != NULL || ctx->prefs->testMode); + if(ctx->threadPool) { + ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex); + assert(ctx->availableJobsCount > 0); + job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount]; + ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex); + } else { + assert(ctx->availableJobsCount == 1); + ctx->availableJobsCount--; + job = (IOJob_t*)ctx->availableJobs[0]; + } + job->usedBufferSize = 0; + job->file = ctx->file; + job->offset = 0; + return job; +} + + +/* AIO_IOPool_setFile: + * Sets the destination file for future files in the pool. + * Requires completion of all queues write jobs and release of all otherwise acquired jobs. + * Also requires ending of sparse write if a previous file was used in sparse mode. */ +static void AIO_IOPool_setFile(IOPoolCtx_t *ctx, FILE* file) { + assert(ctx!=NULL); + AIO_IOPool_join(ctx); + assert(ctx->availableJobsCount == ctx->totalIoJobs); + ctx->file = file; +} + +static FILE* AIO_IOPool_getFile(IOPoolCtx_t *ctx) { + return ctx->file; +} + +/* AIO_IOPool_enqueueJob: + * Enqueues an io job for execution. + * The queued job shouldn't be used directly after queueing it. */ +static void AIO_IOPool_enqueueJob(IOJob_t *job) { + IOPoolCtx_t* ctx = (IOPoolCtx_t *)job->ctx; + if(ctx->threadPool) + POOL_add(ctx->threadPool, ctx->poolFunction, job); + else + ctx->poolFunction(job); +} + +/* *********************************** + * WritePool implementation + *************************************/ + +/* AIO_WritePool_acquireJob: + * Returns an available write job to be used for a future write. */ +IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx) { + return AIO_IOPool_acquireJob(&ctx->base); +} + +/* AIO_WritePool_enqueueAndReacquireWriteJob: + * Queues a write job for execution and acquires a new one. + * After execution `job`'s pointed value would change to the newly acquired job. + * Make sure to set `usedBufferSize` to the wanted length before call. + * The queued job shouldn't be used directly after queueing it. */ +void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) { + AIO_IOPool_enqueueJob(*job); + *job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx); +} + +/* AIO_WritePool_sparseWriteEnd: + * Ends sparse writes to the current file. + * Blocks on completion of all current write jobs before executing. */ +void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) { + assert(ctx != NULL); + if(ctx->base.threadPool) + POOL_joinJobs(ctx->base.threadPool); + AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips); + ctx->storedSkips = 0; +} + +/* AIO_WritePool_setFile: + * Sets the destination file for future writes in the pool. + * Requires completion of all queues write jobs and release of all otherwise acquired jobs. + * Also requires ending of sparse write if a previous file was used in sparse mode. */ +void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file) { + AIO_IOPool_setFile(&ctx->base, file); + assert(ctx->storedSkips == 0); +} + +/* AIO_WritePool_getFile: + * Returns the file the writePool is currently set to write to. */ +FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx) { + return AIO_IOPool_getFile(&ctx->base); +} + +/* AIO_WritePool_releaseIoJob: + * Releases an acquired job back to the pool. Doesn't execute the job. */ +void AIO_WritePool_releaseIoJob(IOJob_t *job) { + AIO_IOPool_releaseIoJob(job); +} + +/* AIO_WritePool_closeFile: + * Ends sparse write and closes the writePool's current file and sets the file to NULL. + * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */ +int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) { + FILE *dstFile = ctx->base.file; + assert(dstFile!=NULL || ctx->base.prefs->testMode!=0); + AIO_WritePool_sparseWriteEnd(ctx); + AIO_IOPool_setFile(&ctx->base, NULL); + return fclose(dstFile); +} + +/* AIO_WritePool_executeWriteJob: + * Executes a write job synchronously. Can be used as a function for a thread pool. */ +static void AIO_WritePool_executeWriteJob(void* opaque){ + IOJob_t* job = (IOJob_t*) opaque; + WritePoolCtx_t* ctx = (WritePoolCtx_t*) job->ctx; + ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips); + AIO_IOPool_releaseIoJob(job); +} + +/* AIO_WritePool_create: + * Allocates and sets and a new write pool including its included jobs. */ +WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize) { + WritePoolCtx_t* ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t)); + if(!ctx) EXM_THROW(100, "Allocation error : not enough memory"); + AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize); + ctx->storedSkips = 0; + return ctx; +} + +/* AIO_WritePool_free: + * Frees and releases a writePool and its resources. Closes destination file if needs to. */ +void AIO_WritePool_free(WritePoolCtx_t* ctx) { + /* Make sure we finish all tasks and then free the resources */ + if(AIO_WritePool_getFile(ctx)) + AIO_WritePool_closeFile(ctx); + AIO_IOPool_destroy(&ctx->base); + assert(ctx->storedSkips==0); + free(ctx); +} diff --git a/programs/fileio_asyncio.h b/programs/fileio_asyncio.h new file mode 100644 index 00000000..3e91164c --- /dev/null +++ b/programs/fileio_asyncio.h @@ -0,0 +1,120 @@ +/* + * Copyright (c) Yann Collet, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + * You may select, at your option, one of the above-listed licenses. + */ + +#ifndef ZSTD_FILEIO_ASYNCIO_H +#define ZSTD_FILEIO_ASYNCIO_H + +#if defined (__cplusplus) +extern "C" { +#endif + +#include "../lib/common/mem.h" /* U32, U64 */ +#include "fileio_types.h" +#include "platform.h" +#include "util.h" +#include "../lib/common/pool.h" +#include "../lib/common/threading.h" + +#define MAX_IO_JOBS (10) + +typedef struct { + /* These struct fields should be set only on creation and not changed afterwards */ + POOL_ctx* threadPool; + int totalIoJobs; + FIO_prefs_t* prefs; + POOL_function poolFunction; + + /* Controls the file we currently write to, make changes only by using provided utility functions */ + FILE* file; + + /* The jobs and availableJobsCount fields are accessed by both the main and worker threads and should + * only be mutated after locking the mutex */ + ZSTD_pthread_mutex_t ioJobsMutex; + void* availableJobs[MAX_IO_JOBS]; + int availableJobsCount; +} IOPoolCtx_t; + +typedef struct { + IOPoolCtx_t base; + unsigned storedSkips; +} WritePoolCtx_t; + +typedef struct { + /* These fields are automatically set and shouldn't be changed by non WritePool code. */ + void *ctx; + FILE* file; + void *buffer; + size_t bufferSize; + + /* This field should be changed before a job is queued for execution and should contain the number + * of bytes to write from the buffer. */ + size_t usedBufferSize; + U64 offset; +} IOJob_t; + +/** AIO_fwriteSparse() : +* @return : storedSkips, +* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */ +unsigned AIO_fwriteSparse(FILE* file, + const void* buffer, size_t bufferSize, + const FIO_prefs_t* const prefs, + unsigned storedSkips); + +void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips); + +/* AIO_WritePool_releaseIoJob: + * Releases an acquired job back to the pool. Doesn't execute the job. */ +void AIO_WritePool_releaseIoJob(IOJob_t *job); + +/* AIO_WritePool_acquireJob: + * Returns an available write job to be used for a future write. */ +IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx); + +/* AIO_WritePool_enqueueAndReacquireWriteJob: + * Enqueues a write job for execution and acquires a new one. + * After execution `job`'s pointed value would change to the newly acquired job. + * Make sure to set `usedBufferSize` to the wanted length before call. + * The queued job shouldn't be used directly after queueing it. */ +void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job); + +/* AIO_WritePool_sparseWriteEnd: + * Ends sparse writes to the current file. + * Blocks on completion of all current write jobs before executing. */ +void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx); + +/* AIO_WritePool_setFile: + * Sets the destination file for future writes in the pool. + * Requires completion of all queues write jobs and release of all otherwise acquired jobs. + * Also requires ending of sparse write if a previous file was used in sparse mode. */ +void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file); + +/* AIO_WritePool_getFile: + * Returns the file the writePool is currently set to write to. */ +FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx); + +/* AIO_WritePool_closeFile: + * Ends sparse write and closes the writePool's current file and sets the file to NULL. + * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */ +int AIO_WritePool_closeFile(WritePoolCtx_t *ctx); + +/* AIO_WritePool_create: + * Allocates and sets and a new write pool including its included jobs. + * bufferSize should be set to the maximal buffer we want to write to at a time. */ +WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize); + +/* AIO_WritePool_free: + * Frees and releases a writePool and its resources. Closes destination file. */ +void AIO_WritePool_free(WritePoolCtx_t* ctx); + +#if defined (__cplusplus) +} +#endif + +#endif /* ZSTD_FILEIO_ASYNCIO_H */ diff --git a/programs/fileio_common.h b/programs/fileio_common.h new file mode 100644 index 00000000..d33c19d7 --- /dev/null +++ b/programs/fileio_common.h @@ -0,0 +1,117 @@ +/* + * Copyright (c) Yann Collet, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + * You may select, at your option, one of the above-listed licenses. + */ + +#ifndef ZSTD_FILEIO_COMMON_H +#define ZSTD_FILEIO_COMMON_H + +#if defined (__cplusplus) +extern "C" { +#endif + +#include "../lib/common/mem.h" /* U32, U64 */ +#include "fileio_types.h" +#include "platform.h" +#include "timefn.h" /* UTIL_getTime, UTIL_clockSpanMicro */ + +/*-************************************* +* Macros +***************************************/ +#define KB *(1 <<10) +#define MB *(1 <<20) +#define GB *(1U<<30) +#undef MAX +#define MAX(a,b) ((a)>(b) ? (a) : (b)) + +extern FIO_display_prefs_t g_display_prefs; + +#define DISPLAY(...) fprintf(stderr, __VA_ARGS__) +#define DISPLAYOUT(...) fprintf(stdout, __VA_ARGS__) +#define DISPLAYLEVEL(l, ...) { if (g_display_prefs.displayLevel>=l) { DISPLAY(__VA_ARGS__); } } + +extern UTIL_time_t g_displayClock; + +#define REFRESH_RATE ((U64)(SEC_TO_MICRO / 6)) +#define READY_FOR_UPDATE() ((g_display_prefs.progressSetting != FIO_ps_never) && UTIL_clockSpanMicro(g_displayClock) > REFRESH_RATE) +#define DELAY_NEXT_UPDATE() { g_displayClock = UTIL_getTime(); } +#define DISPLAYUPDATE(l, ...) { \ + if (g_display_prefs.displayLevel>=l && (g_display_prefs.progressSetting != FIO_ps_never)) { \ + if (READY_FOR_UPDATE() || (g_display_prefs.displayLevel>=4)) { \ + DELAY_NEXT_UPDATE(); \ + DISPLAY(__VA_ARGS__); \ + if (g_display_prefs.displayLevel>=4) fflush(stderr); \ + } } } + +#undef MIN /* in case it would be already defined */ +#define MIN(a,b) ((a) < (b) ? (a) : (b)) + + +#define EXM_THROW(error, ...) \ +{ \ + DISPLAYLEVEL(1, "zstd: "); \ + DISPLAYLEVEL(5, "Error defined at %s, line %i : \n", __FILE__, __LINE__); \ + DISPLAYLEVEL(1, "error %i : ", error); \ + DISPLAYLEVEL(1, __VA_ARGS__); \ + DISPLAYLEVEL(1, " \n"); \ + exit(error); \ +} + +#define CHECK_V(v, f) \ + v = f; \ + if (ZSTD_isError(v)) { \ + DISPLAYLEVEL(5, "%s \n", #f); \ + EXM_THROW(11, "%s", ZSTD_getErrorName(v)); \ + } +#define CHECK(f) { size_t err; CHECK_V(err, f); } + + +/* Avoid fseek()'s 2GiB barrier with MSVC, macOS, *BSD, MinGW */ +#if defined(_MSC_VER) && _MSC_VER >= 1400 +# define LONG_SEEK _fseeki64 +# define LONG_TELL _ftelli64 +#elif !defined(__64BIT__) && (PLATFORM_POSIX_VERSION >= 200112L) /* No point defining Large file for 64 bit */ +# define LONG_SEEK fseeko +# define LONG_TELL ftello +#elif defined(__MINGW32__) && !defined(__STRICT_ANSI__) && !defined(__NO_MINGW_LFS) && defined(__MSVCRT__) +# define LONG_SEEK fseeko64 +# define LONG_TELL ftello64 +#elif defined(_WIN32) && !defined(__DJGPP__) +# include + static int LONG_SEEK(FILE* file, __int64 offset, int origin) { + LARGE_INTEGER off; + DWORD method; + off.QuadPart = offset; + if (origin == SEEK_END) + method = FILE_END; + else if (origin == SEEK_CUR) + method = FILE_CURRENT; + else + method = FILE_BEGIN; + + if (SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, NULL, method)) + return 0; + else + return -1; + } + static __int64 LONG_TELL(FILE* file) { + LARGE_INTEGER off, newOff; + off.QuadPart = 0; + newOff.QuadPart = 0; + SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, &newOff, FILE_CURRENT); + return newOff.QuadPart; + } +#else +# define LONG_SEEK fseek +# define LONG_TELL ftell +#endif + +#if defined (__cplusplus) +} +#endif +#endif //ZSTD_FILEIO_COMMON_H diff --git a/programs/fileio_types.h b/programs/fileio_types.h new file mode 100644 index 00000000..1909ab1a --- /dev/null +++ b/programs/fileio_types.h @@ -0,0 +1,73 @@ +/* + * Copyright (c) Yann Collet, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + * You may select, at your option, one of the above-listed licenses. + */ + +#ifndef FILEIO_TYPES_HEADER +#define FILEIO_TYPES_HEADER + +#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_compressionParameters */ +#include "../lib/zstd.h" /* ZSTD_* */ + +/*-************************************* +* Parameters: FIO_prefs_t +***************************************/ + +typedef struct FIO_display_prefs_s FIO_display_prefs_t; + +typedef enum { FIO_ps_auto, FIO_ps_never, FIO_ps_always } FIO_progressSetting_e; + +struct FIO_display_prefs_s { + int displayLevel; /* 0 : no display; 1: errors; 2: + result + interaction + warnings; 3: + progression; 4: + information */ + FIO_progressSetting_e progressSetting; +}; + + +typedef enum { FIO_zstdCompression, FIO_gzipCompression, FIO_xzCompression, FIO_lzmaCompression, FIO_lz4Compression } FIO_compressionType_t; + +typedef struct FIO_prefs_s { + + /* Algorithm preferences */ + FIO_compressionType_t compressionType; + U32 sparseFileSupport; /* 0: no sparse allowed; 1: auto (file yes, stdout no); 2: force sparse */ + int dictIDFlag; + int checksumFlag; + int blockSize; + int overlapLog; + U32 adaptiveMode; + U32 useRowMatchFinder; + int rsyncable; + int minAdaptLevel; + int maxAdaptLevel; + int ldmFlag; + int ldmHashLog; + int ldmMinMatch; + int ldmBucketSizeLog; + int ldmHashRateLog; + size_t streamSrcSize; + size_t targetCBlockSize; + int srcSizeHint; + int testMode; + ZSTD_paramSwitch_e literalCompressionMode; + + /* IO preferences */ + U32 removeSrcFile; + U32 overwrite; + U32 asyncIO; + + /* Computation resources preferences */ + unsigned memLimit; + int nbWorkers; + + int excludeCompressedFiles; + int patchFromMode; + int contentSize; + int allowBlockDevices; +} FIO_prefs_t; + +#endif /* FILEIO_TYPES_HEADER */ \ No newline at end of file