AsyncIO compression part 1 - refactor of existing asyncio code (#3021)

* Refactored fileio.c:
- Extracted asyncio code to fileio_asyncio.c/.h
- Moved type definitions to fileio_types.h
- Moved common macro definitions needed by both fileio.c and fileio_asyncio.c to fileio_common.h

* Bugfix - rename fileio_asycio to fileio_asyncio

* Added copyrights & license to new files

* CR fixes
dev
Yonatan Komornik 2022-01-24 14:43:02 -08:00 committed by GitHub
parent 87f81d0796
commit 70df5de1b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 736 additions and 498 deletions

View File

@ -384,6 +384,10 @@
RelativePath="..\..\..\programs\fileio.c"
>
</File>
<File
RelativePath="..\..\..\programs\fileio_asyncio.c"
>
</File>
<File
RelativePath="..\..\..\lib\compress\fse_compress.c"
>

View File

@ -62,6 +62,7 @@
<ClCompile Include="..\..\..\programs\datagen.c" />
<ClCompile Include="..\..\..\programs\dibio.c" />
<ClCompile Include="..\..\..\programs\fileio.c" />
<ClCompile Include="..\..\..\programs\fileio_asyncio.c" />
<ClCompile Include="..\..\..\programs\zstdcli.c" />
<ClCompile Include="..\..\..\programs\zstdcli_trace.c" />
</ItemGroup>

View File

@ -32,7 +32,7 @@ if (MSVC)
set(PlatformDependResources ${MSVC_RESOURCE_DIR}/zstd.rc)
endif ()
add_executable(zstd ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c ${PROGRAMS_DIR}/benchfn.c ${PROGRAMS_DIR}/benchzstd.c ${PROGRAMS_DIR}/datagen.c ${PROGRAMS_DIR}/dibio.c ${PROGRAMS_DIR}/zstdcli_trace.c ${PlatformDependResources})
add_executable(zstd ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c ${PROGRAMS_DIR}/fileio_asyncio.c ${PROGRAMS_DIR}/benchfn.c ${PROGRAMS_DIR}/benchzstd.c ${PROGRAMS_DIR}/datagen.c ${PROGRAMS_DIR}/dibio.c ${PROGRAMS_DIR}/zstdcli_trace.c ${PlatformDependResources})
target_link_libraries(zstd ${PROGRAMS_ZSTD_LINK_TARGET})
if (CMAKE_SYSTEM_NAME MATCHES "(Solaris|SunOS)")
target_link_libraries(zstd rt)
@ -75,7 +75,7 @@ if (UNIX)
${CMAKE_CURRENT_BINARY_DIR}/zstdless.1
DESTINATION "${MAN_INSTALL_DIR}")
add_executable(zstd-frugal ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c)
add_executable(zstd-frugal ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c ${PROGRAMS_DIR}/fileio_asyncio.c)
target_link_libraries(zstd-frugal ${PROGRAMS_ZSTD_LINK_TARGET})
set_property(TARGET zstd-frugal APPEND PROPERTY COMPILE_DEFINITIONS "ZSTD_NOBENCH;ZSTD_NODICT;ZSTD_NOTRACE")
endif ()

View File

@ -14,6 +14,7 @@ zstd_programs_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'),
join_paths(zstd_rootdir, 'programs/util.c'),
join_paths(zstd_rootdir, 'programs/timefn.c'),
join_paths(zstd_rootdir, 'programs/fileio.c'),
join_paths(zstd_rootdir, 'programs/fileio_asyncio.c'),
join_paths(zstd_rootdir, 'programs/benchfn.c'),
join_paths(zstd_rootdir, 'programs/benchzstd.c'),
join_paths(zstd_rootdir, 'programs/datagen.c'),
@ -80,6 +81,7 @@ zstd_frugal_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'),
join_paths(zstd_rootdir, 'programs/timefn.c'),
join_paths(zstd_rootdir, 'programs/util.c'),
join_paths(zstd_rootdir, 'programs/fileio.c'),
join_paths(zstd_rootdir, 'programs/fileio_asyncio.c'),
join_paths(zstd_rootdir, 'lib/common/pool.c'),
join_paths(zstd_rootdir, 'lib/common/zstd_common.c'),
join_paths(zstd_rootdir, 'lib/common/error_private.c')]

View File

@ -363,6 +363,10 @@
RelativePath="..\..\..\programs\fileio.c"
>
</File>
<File
RelativePath="..\..\..\programs\fileio_asyncio.c"
>
</File>
<File
RelativePath="..\..\..\lib\compress\fse_compress.c"
>

View File

@ -243,17 +243,17 @@ zstd-pgo :
## zstd-small: minimal target, supporting only zstd compression and decompression. no bench. no legacy. no other format.
zstd-small: CFLAGS = -Os -s
zstd-frugal zstd-small: $(ZSTDLIB_CORE_SRC) zstdcli.c util.c timefn.c fileio.c
zstd-frugal zstd-small: $(ZSTDLIB_CORE_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asyncio.c
$(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NOTRACE -UZSTD_LEGACY_SUPPORT -DZSTD_LEGACY_SUPPORT=0 $^ -o $@$(EXT)
zstd-decompress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_DECOMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c
zstd-decompress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_DECOMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asyncio.c
$(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NOCOMPRESS -DZSTD_NOTRACE -UZSTD_LEGACY_SUPPORT -DZSTD_LEGACY_SUPPORT=0 $^ -o $@$(EXT)
zstd-compress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c
zstd-compress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asyncio.c
$(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NODECOMPRESS -DZSTD_NOTRACE -UZSTD_LEGACY_SUPPORT -DZSTD_LEGACY_SUPPORT=0 $^ -o $@$(EXT)
## zstd-dictBuilder: executable supporting dictionary creation and compression (only)
zstd-dictBuilder: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) $(ZDICT_SRC) zstdcli.c util.c timefn.c fileio.c dibio.c
zstd-dictBuilder: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) $(ZDICT_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asyncio.c dibio.c
$(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODECOMPRESS -DZSTD_NOTRACE $^ -o $@$(EXT)
zstdmt: zstd

View File

@ -34,16 +34,18 @@
#include <limits.h> /* INT_MAX */
#include <signal.h>
#include "timefn.h" /* UTIL_getTime, UTIL_clockSpanMicro */
#include "../lib/common/pool.h"
#include "../lib/common/threading.h"
#if defined (_MSC_VER)
# include <sys/stat.h>
# include <io.h>
#endif
#include "../lib/common/mem.h" /* U32, U64 */
#include "fileio.h"
#include "fileio_asyncio.h"
#include "fileio_common.h"
FIO_display_prefs_t g_display_prefs = {2, FIO_ps_auto};
UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER;
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */
#include "../lib/zstd.h"
@ -84,62 +86,6 @@
#define DEFAULT_FILE_PERMISSIONS (0666)
#endif
/*-*************************************
* Macros
***************************************/
#define KB *(1 <<10)
#define MB *(1 <<20)
#define GB *(1U<<30)
#undef MAX
#define MAX(a,b) ((a)>(b) ? (a) : (b))
struct FIO_display_prefs_s {
int displayLevel; /* 0 : no display; 1: errors; 2: + result + interaction + warnings; 3: + progression; 4: + information */
FIO_progressSetting_e progressSetting;
};
static FIO_display_prefs_t g_display_prefs = {2, FIO_ps_auto};
#define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
#define DISPLAYOUT(...) fprintf(stdout, __VA_ARGS__)
#define DISPLAYLEVEL(l, ...) { if (g_display_prefs.displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
static const U64 g_refreshRate = SEC_TO_MICRO / 6;
static UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER;
#define READY_FOR_UPDATE() ((g_display_prefs.progressSetting != FIO_ps_never) && UTIL_clockSpanMicro(g_displayClock) > g_refreshRate)
#define DELAY_NEXT_UPDATE() { g_displayClock = UTIL_getTime(); }
#define DISPLAYUPDATE(l, ...) { \
if (g_display_prefs.displayLevel>=l && (g_display_prefs.progressSetting != FIO_ps_never)) { \
if (READY_FOR_UPDATE() || (g_display_prefs.displayLevel>=4)) { \
DELAY_NEXT_UPDATE(); \
DISPLAY(__VA_ARGS__); \
if (g_display_prefs.displayLevel>=4) fflush(stderr); \
} } }
#undef MIN /* in case it would be already defined */
#define MIN(a,b) ((a) < (b) ? (a) : (b))
#define EXM_THROW(error, ...) \
{ \
DISPLAYLEVEL(1, "zstd: "); \
DISPLAYLEVEL(5, "Error defined at %s, line %i : \n", __FILE__, __LINE__); \
DISPLAYLEVEL(1, "error %i : ", error); \
DISPLAYLEVEL(1, __VA_ARGS__); \
DISPLAYLEVEL(1, " \n"); \
exit(error); \
}
#define CHECK_V(v, f) \
v = f; \
if (ZSTD_isError(v)) { \
DISPLAYLEVEL(5, "%s \n", #f); \
EXM_THROW(11, "%s", ZSTD_getErrorName(v)); \
}
#define CHECK(f) { size_t err; CHECK_V(err, f); }
/*-************************************
* Signal (Ctrl-C trapping)
**************************************/
@ -250,95 +196,6 @@ void FIO_addAbortHandler()
#endif
}
/*-************************************************************
* Avoid fseek()'s 2GiB barrier with MSVC, macOS, *BSD, MinGW
***************************************************************/
#if defined(_MSC_VER) && _MSC_VER >= 1400
# define LONG_SEEK _fseeki64
# define LONG_TELL _ftelli64
#elif !defined(__64BIT__) && (PLATFORM_POSIX_VERSION >= 200112L) /* No point defining Large file for 64 bit */
# define LONG_SEEK fseeko
# define LONG_TELL ftello
#elif defined(__MINGW32__) && !defined(__STRICT_ANSI__) && !defined(__NO_MINGW_LFS) && defined(__MSVCRT__)
# define LONG_SEEK fseeko64
# define LONG_TELL ftello64
#elif defined(_WIN32) && !defined(__DJGPP__)
# include <windows.h>
static int LONG_SEEK(FILE* file, __int64 offset, int origin) {
LARGE_INTEGER off;
DWORD method;
off.QuadPart = offset;
if (origin == SEEK_END)
method = FILE_END;
else if (origin == SEEK_CUR)
method = FILE_CURRENT;
else
method = FILE_BEGIN;
if (SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, NULL, method))
return 0;
else
return -1;
}
static __int64 LONG_TELL(FILE* file) {
LARGE_INTEGER off, newOff;
off.QuadPart = 0;
newOff.QuadPart = 0;
SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, &newOff, FILE_CURRENT);
return newOff.QuadPart;
}
#else
# define LONG_SEEK fseek
# define LONG_TELL ftell
#endif
/*-*************************************
* Parameters: FIO_prefs_t
***************************************/
/* typedef'd to FIO_prefs_t within fileio.h */
struct FIO_prefs_s {
/* Algorithm preferences */
FIO_compressionType_t compressionType;
U32 sparseFileSupport; /* 0: no sparse allowed; 1: auto (file yes, stdout no); 2: force sparse */
int dictIDFlag;
int checksumFlag;
int blockSize;
int overlapLog;
U32 adaptiveMode;
U32 useRowMatchFinder;
int rsyncable;
int minAdaptLevel;
int maxAdaptLevel;
int ldmFlag;
int ldmHashLog;
int ldmMinMatch;
int ldmBucketSizeLog;
int ldmHashRateLog;
size_t streamSrcSize;
size_t targetCBlockSize;
int srcSizeHint;
int testMode;
ZSTD_paramSwitch_e literalCompressionMode;
/* IO preferences */
U32 removeSrcFile;
U32 overwrite;
U32 asyncIO;
/* Computation resources preferences */
unsigned memLimit;
int nbWorkers;
int excludeCompressedFiles;
int patchFromMode;
int contentSize;
int allowBlockDevices;
};
/*-*************************************
* Parameters: FIO_ctx_t
***************************************/
@ -563,7 +420,13 @@ void FIO_setContentSize(FIO_prefs_t* const prefs, int value)
}
void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, unsigned value) {
#ifdef ZSTD_MULTITHREAD
prefs->asyncIO = value;
#else
(void) prefs;
(void) value;
DISPLAYLEVEL(2, "Note : asyncio is disabled (lack of multithreading support) \n");
#endif
}
/* FIO_ctx_t functions */
@ -2019,124 +1882,15 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
/* **************************************************************************
* Decompression
***************************************************************************/
#define DECOMPRESSION_MAX_WRITE_JOBS (10)
typedef struct {
/* These struct fields should be set only on creation and not changed afterwards */
POOL_ctx* writerPool;
int totalWriteJobs;
FIO_prefs_t* prefs;
/* Controls the file we currently write to, make changes only by using provided utility functions */
FILE* dstFile;
unsigned storedSkips;
/* The jobs and availableWriteJobs fields are access by both the main and writer threads and should
* only be mutated after locking the mutex */
ZSTD_pthread_mutex_t writeJobsMutex;
void* jobs[DECOMPRESSION_MAX_WRITE_JOBS];
int availableWriteJobs;
} write_pool_ctx_t;
typedef struct {
/* These fields are automaically set and shouldn't be changed by non WritePool code. */
write_pool_ctx_t *ctx;
FILE* dstFile;
void *buffer;
size_t bufferSize;
/* This field should be changed before a job is queued for execution and should contain the number
* of bytes to write from the buffer. */
size_t usedBufferSize;
} write_job_t;
typedef struct {
void* srcBuffer;
size_t srcBufferSize;
size_t srcBufferLoaded;
ZSTD_DStream* dctx;
write_pool_ctx_t *writePoolCtx;
WritePoolCtx_t *writeCtx;
} dRess_t;
static write_job_t *FIO_createWriteJob(write_pool_ctx_t *ctx) {
void *buffer;
write_job_t *job;
job = (write_job_t*) malloc(sizeof(write_job_t));
buffer = malloc(ZSTD_DStreamOutSize());
if(!job || !buffer)
EXM_THROW(101, "Allocation error : not enough memory");
job->buffer = buffer;
job->bufferSize = ZSTD_DStreamOutSize();
job->usedBufferSize = 0;
job->dstFile = NULL;
job->ctx = ctx;
return job;
}
/* WritePool_createThreadPool:
* Creates a thread pool and a mutex for threaded write pool.
* Displays warning if asyncio is requested but MT isn't available. */
static void WritePool_createThreadPool(write_pool_ctx_t *ctx, const FIO_prefs_t *prefs) {
ctx->writerPool = NULL;
if(prefs->asyncIO) {
#ifdef ZSTD_MULTITHREAD
if (ZSTD_pthread_mutex_init(&ctx->writeJobsMutex, NULL))
EXM_THROW(102, "Failed creating write jobs mutex");
/* We want DECOMPRESSION_MAX_WRITE_JOBS-2 queue items because we need to always have 1 free buffer to
* decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
assert(DECOMPRESSION_MAX_WRITE_JOBS >= 2);
ctx->writerPool = POOL_create(1, DECOMPRESSION_MAX_WRITE_JOBS - 2);
if (!ctx->writerPool)
EXM_THROW(103, "Failed creating writer thread pool");
#else
DISPLAYLEVEL(2, "Note : asyncio decompression is disabled (lack of multithreading support) \n");
#endif
}
}
/* WritePool_create:
* Allocates and sets and a new write pool including its included jobs. */
static write_pool_ctx_t* WritePool_create(FIO_prefs_t* const prefs) {
write_pool_ctx_t *ctx;
int i;
ctx = (write_pool_ctx_t*) malloc(sizeof(write_pool_ctx_t));
if(!ctx)
EXM_THROW(100, "Allocation error : not enough memory");
WritePool_createThreadPool(ctx, prefs);
ctx->prefs = prefs;
ctx->totalWriteJobs = ctx->writerPool ? DECOMPRESSION_MAX_WRITE_JOBS : 1;
ctx->availableWriteJobs = ctx->totalWriteJobs;
for(i=0; i < ctx->availableWriteJobs; i++) {
ctx->jobs[i] = FIO_createWriteJob(ctx);
}
ctx->storedSkips = 0;
ctx->dstFile = NULL;
return ctx;
}
/* WritePool_free:
* Release a previously allocated write thread pool. Makes sure all takss are done and released. */
static void WritePool_free(write_pool_ctx_t* ctx) {
int i=0;
if(ctx->writerPool) {
/* Make sure we finish all tasks and then free the resources */
POOL_joinJobs(ctx->writerPool);
/* Make sure we are not leaking jobs */
assert(ctx->availableWriteJobs==ctx->totalWriteJobs);
POOL_free(ctx->writerPool);
ZSTD_pthread_mutex_destroy(&ctx->writeJobsMutex);
}
assert(ctx->dstFile==NULL);
assert(ctx->storedSkips==0);
for(i=0; i<ctx->availableWriteJobs; i++) {
write_job_t* job = (write_job_t*) ctx->jobs[i];
free(job->buffer);
free(job);
}
free(ctx);
}
static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName)
{
dRess_t ress;
@ -2164,7 +1918,7 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
free(dictBuffer);
}
ress.writePoolCtx = WritePool_create(prefs);
ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_DStreamOutSize());
return ress;
}
@ -2173,7 +1927,7 @@ static void FIO_freeDResources(dRess_t ress)
{
CHECK( ZSTD_freeDStream(ress.dctx) );
free(ress.srcBuffer);
WritePool_free(ress.writePoolCtx);
AIO_WritePool_free(ress.writeCtx);
}
/* FIO_consumeDSrcBuffer:
@ -2184,205 +1938,6 @@ static void FIO_consumeDSrcBuffer(dRess_t *ress, size_t len) {
memmove(ress->srcBuffer, (char *)ress->srcBuffer + len, ress->srcBufferLoaded);
}
/** FIO_fwriteSparse() :
* @return : storedSkips,
* argument for next call to FIO_fwriteSparse() or FIO_fwriteSparseEnd() */
static unsigned
FIO_fwriteSparse(FILE* file,
const void* buffer, size_t bufferSize,
const FIO_prefs_t* const prefs,
unsigned storedSkips)
{
const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */
size_t bufferSizeT = bufferSize / sizeof(size_t);
const size_t* const bufferTEnd = bufferT + bufferSizeT;
const size_t* ptrT = bufferT;
static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */
if (prefs->testMode) return 0; /* do not output anything in test mode */
if (!prefs->sparseFileSupport) { /* normal write */
size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
if (sizeCheck != bufferSize)
EXM_THROW(70, "Write error : cannot write decoded block : %s",
strerror(errno));
return 0;
}
/* avoid int overflow */
if (storedSkips > 1 GB) {
if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
EXM_THROW(91, "1 GB skip error (sparse file support)");
storedSkips -= 1 GB;
}
while (ptrT < bufferTEnd) {
size_t nb0T;
/* adjust last segment if < 32 KB */
size_t seg0SizeT = segmentSizeT;
if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
bufferSizeT -= seg0SizeT;
/* count leading zeroes */
for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
storedSkips += (unsigned)(nb0T * sizeof(size_t));
if (nb0T != seg0SizeT) { /* not all 0s */
size_t const nbNon0ST = seg0SizeT - nb0T;
/* skip leading zeros */
if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
EXM_THROW(92, "Sparse skip error ; try --no-sparse");
storedSkips = 0;
/* write the rest */
if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
EXM_THROW(93, "Write error : cannot write decoded block : %s",
strerror(errno));
}
ptrT += seg0SizeT;
}
{ static size_t const maskT = sizeof(size_t)-1;
if (bufferSize & maskT) {
/* size not multiple of sizeof(size_t) : implies end of block */
const char* const restStart = (const char*)bufferTEnd;
const char* restPtr = restStart;
const char* const restEnd = (const char*)buffer + bufferSize;
assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
storedSkips += (unsigned) (restPtr - restStart);
if (restPtr != restEnd) {
/* not all remaining bytes are 0 */
size_t const restSize = (size_t)(restEnd - restPtr);
if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
EXM_THROW(92, "Sparse skip error ; try --no-sparse");
if (fwrite(restPtr, 1, restSize, file) != restSize)
EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
strerror(errno));
storedSkips = 0;
} } }
return storedSkips;
}
static void
FIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{
if (prefs->testMode) assert(storedSkips == 0);
if (storedSkips>0) {
assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */
(void)prefs; /* assert can be disabled, in which case prefs becomes unused */
if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
EXM_THROW(69, "Final skip error (sparse file support)");
/* last zero must be explicitly written,
* so that skipped ones get implicitly translated as zero by FS */
{ const char lastZeroByte[1] = { 0 };
if (fwrite(lastZeroByte, 1, 1, file) != 1)
EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
} }
}
/* WritePool_releaseWriteJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
static void WritePool_releaseWriteJob(write_job_t *job) {
write_pool_ctx_t *ctx = job->ctx;
if(ctx->writerPool) {
ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex);
assert(ctx->availableWriteJobs < DECOMPRESSION_MAX_WRITE_JOBS);
ctx->jobs[ctx->availableWriteJobs++] = job;
ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex);
} else {
ctx->availableWriteJobs++;
}
}
/* WritePool_acquireWriteJob:
* Returns an available write job to be used for a future write. */
static write_job_t* WritePool_acquireWriteJob(write_pool_ctx_t *ctx) {
write_job_t *job;
assert(ctx->dstFile!=NULL || ctx->prefs->testMode);
if(ctx->writerPool) {
ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex);
assert(ctx->availableWriteJobs > 0);
job = (write_job_t*) ctx->jobs[--ctx->availableWriteJobs];
ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex);
} else {
assert(ctx->availableWriteJobs==1);
ctx->availableWriteJobs--;
job = (write_job_t*)ctx->jobs[0];
}
job->usedBufferSize = 0;
job->dstFile = ctx->dstFile;
return job;
}
/* WritePool_executeWriteJob:
* Executes a write job synchronously. Can be used as a function for a thread pool. */
static void WritePool_executeWriteJob(void* opaque){
write_job_t* job = (write_job_t*) opaque;
write_pool_ctx_t* ctx = job->ctx;
ctx->storedSkips = FIO_fwriteSparse(job->dstFile, job->buffer, job->usedBufferSize, ctx->prefs, ctx->storedSkips);
WritePool_releaseWriteJob(job);
}
/* WritePool_queueWriteJob:
* Queues a write job for execution.
* Make sure to set `usedBufferSize` to the wanted length before call.
* The queued job shouldn't be used directly after queueing it. */
static void WritePool_queueWriteJob(write_job_t *job) {
write_pool_ctx_t* ctx = job->ctx;
if(ctx->writerPool)
POOL_add(ctx->writerPool, WritePool_executeWriteJob, job);
else
WritePool_executeWriteJob(job);
}
/* WritePool_queueAndReacquireWriteJob:
* Queues a write job for execution and acquires a new one.
* After execution `job`'s pointed value would change to the newly acquired job.
* Make sure to set `usedBufferSize` to the wanted length before call.
* The queued job shouldn't be used directly after queueing it. */
static void WritePool_queueAndReacquireWriteJob(write_job_t **job) {
WritePool_queueWriteJob(*job);
*job = WritePool_acquireWriteJob((*job)->ctx);
}
/* WritePool_sparseWriteEnd:
* Ends sparse writes to the current dstFile.
* Blocks on completion of all current write jobs before executing. */
static void WritePool_sparseWriteEnd(write_pool_ctx_t* ctx) {
assert(ctx != NULL);
if(ctx->writerPool)
POOL_joinJobs(ctx->writerPool);
FIO_fwriteSparseEnd(ctx->prefs, ctx->dstFile, ctx->storedSkips);
ctx->storedSkips = 0;
}
/* WritePool_setDstFile:
* Sets the destination file for future files in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
static void WritePool_setDstFile(write_pool_ctx_t *ctx, FILE* dstFile) {
assert(ctx!=NULL);
/* We can change the dst file only if we have finished writing */
if(ctx->writerPool)
POOL_joinJobs(ctx->writerPool);
assert(ctx->storedSkips == 0);
assert(ctx->availableWriteJobs == ctx->totalWriteJobs);
ctx->dstFile = dstFile;
}
/* WritePool_closeDstFile:
* Ends sparse write and closes the writePool's current dstFile and sets the dstFile to NULL.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
static int WritePool_closeDstFile(write_pool_ctx_t *ctx) {
FILE *dstFile = ctx->dstFile;
assert(dstFile!=NULL || ctx->prefs->testMode!=0);
WritePool_sparseWriteEnd(ctx);
WritePool_setDstFile(ctx, NULL);
return fclose(dstFile);
}
/** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode
@return : 0 (no error) */
static int FIO_passThrough(const FIO_prefs_t* const prefs,
@ -2403,7 +1958,7 @@ static int FIO_passThrough(const FIO_prefs_t* const prefs,
do {
readFromInput = fread(buffer, 1, blockSize, finput);
storedSkips = FIO_fwriteSparse(foutput, buffer, readFromInput, prefs, storedSkips);
storedSkips = AIO_fwriteSparse(foutput, buffer, readFromInput, prefs, storedSkips);
} while (readFromInput == blockSize);
if (ferror(finput)) {
DISPLAYLEVEL(1, "Pass-through read error : %s\n", strerror(errno));
@ -2411,7 +1966,7 @@ static int FIO_passThrough(const FIO_prefs_t* const prefs,
}
assert(feof(finput));
FIO_fwriteSparseEnd(prefs, foutput, storedSkips);
AIO_fwriteSparseEnd(prefs, foutput, storedSkips);
return 0;
}
@ -2458,7 +2013,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
U64 alreadyDecoded) /* for multi-frames streams */
{
U64 frameSize = 0;
write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
IOJob_t *writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
/* display last 20 characters only */
{ size_t const srcFileLength = strlen(srcFileName);
@ -2486,12 +2041,13 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
DISPLAYLEVEL(1, "%s : Decoding error (36) : %s \n",
srcFileName, ZSTD_getErrorName(readSizeHint));
FIO_zstdErrorHelp(prefs, ress, readSizeHint, srcFileName);
AIO_WritePool_releaseIoJob(writeJob);
return FIO_ERROR_FRAME_DECODING;
}
/* Write block */
writeJob->usedBufferSize = outBuff.pos;
WritePool_queueAndReacquireWriteJob(&writeJob);
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
frameSize += outBuff.pos;
if (fCtx->nbFilesTotal > 1) {
size_t srcFileNameSize = strlen(srcFileName);
@ -2526,8 +2082,8 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
ress->srcBufferLoaded += readSize;
} } }
WritePool_releaseWriteJob(writeJob);
WritePool_sparseWriteEnd(ress->writePoolCtx);
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
return frameSize;
}
@ -2541,7 +2097,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
z_stream strm;
int flush = Z_NO_FLUSH;
int decodingError = 0;
write_job_t *writeJob = NULL;
IOJob_t *writeJob = NULL;
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
@ -2552,7 +2108,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
if (inflateInit2(&strm, 15 /* maxWindowLogSize */ + 16 /* gzip only */) != Z_OK)
return FIO_ERROR_FRAME_DECODING;
writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
strm.avail_in = (uInt)ress->srcBufferLoaded;
@ -2578,7 +2134,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
{ size_t const decompBytes = writeJob->bufferSize - strm.avail_out;
if (decompBytes) {
writeJob->usedBufferSize = decompBytes;
WritePool_queueAndReacquireWriteJob(&writeJob);
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
outFileSize += decompBytes;
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
@ -2594,8 +2150,8 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
DISPLAYLEVEL(1, "zstd: %s: inflateEnd error \n", srcFileName);
decodingError = 1;
}
WritePool_releaseWriteJob(writeJob);
WritePool_sparseWriteEnd(ress->writePoolCtx);
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize;
}
#endif
@ -2610,7 +2166,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
lzma_action action = LZMA_RUN;
lzma_ret initRet;
int decodingError = 0;
write_job_t *writeJob = NULL;
IOJob_t *writeJob = NULL;
strm.next_in = 0;
strm.avail_in = 0;
@ -2627,7 +2183,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
return FIO_ERROR_FRAME_DECODING;
}
writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
strm.next_in = (BYTE const*)ress->srcBuffer;
@ -2655,7 +2211,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
{ size_t const decompBytes = writeJob->bufferSize - strm.avail_out;
if (decompBytes) {
writeJob->usedBufferSize = decompBytes;
WritePool_queueAndReacquireWriteJob(&writeJob);
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
outFileSize += decompBytes;
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = writeJob->bufferSize;
@ -2665,8 +2221,8 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
lzma_end(&strm);
WritePool_releaseWriteJob(writeJob);
WritePool_sparseWriteEnd(ress->writePoolCtx);
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize;
}
#endif
@ -2681,13 +2237,15 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
LZ4F_decompressionContext_t dCtx;
LZ4F_errorCode_t const errorCode = LZ4F_createDecompressionContext(&dCtx, LZ4F_VERSION);
int decodingError = 0;
write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
IOJob_t *writeJob = NULL;
if (LZ4F_isError(errorCode)) {
DISPLAYLEVEL(1, "zstd: failed to create lz4 decompression context \n");
return FIO_ERROR_FRAME_DECODING;
}
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
/* Main Loop */
for (;nextToLoad;) {
size_t readSize;
@ -2724,7 +2282,7 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
if (decodedBytes) {
UTIL_HumanReadableSize_t hrs;
writeJob->usedBufferSize = decodedBytes;
WritePool_queueAndReacquireWriteJob(&writeJob);
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
filesize += decodedBytes;
hrs = UTIL_makeHumanReadableSize(filesize);
DISPLAYUPDATE(2, "\rDecompressed : %.*f%s ", hrs.precision, hrs.value, hrs.suffix);
@ -2740,8 +2298,8 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
}
LZ4F_freeDecompressionContext(dCtx);
WritePool_releaseWriteJob(writeJob);
WritePool_sparseWriteEnd(ress->writePoolCtx);
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
return decodingError ? FIO_ERROR_FRAME_DECODING : filesize;
}
@ -2818,7 +2376,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
#endif
} else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) { /* pass-through mode */
return FIO_passThrough(prefs,
ress.writePoolCtx->dstFile, srcFile,
AIO_WritePool_getFile(ress.writeCtx), srcFile,
ress.srcBuffer, ress.srcBufferSize,
ress.srcBufferLoaded);
} else {
@ -2856,7 +2414,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
int releaseDstFile = 0;
int transferMTime = 0;
if ((ress.writePoolCtx->dstFile == NULL) && (prefs->testMode==0)) {
if ((AIO_WritePool_getFile(ress.writeCtx) == NULL) && (prefs->testMode == 0)) {
FILE *dstFile;
int dstFilePermissions = DEFAULT_FILE_PERMISSIONS;
if ( strcmp(srcFileName, stdinmark) /* special case : don't transfer permissions from stdin */
@ -2871,7 +2429,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
if (dstFile==NULL) return 1;
WritePool_setDstFile(ress.writePoolCtx, dstFile);
AIO_WritePool_setFile(ress.writeCtx, dstFile);
/* Must only be added after FIO_openDstFile() succeeds.
* Otherwise we may delete the destination file if it already exists,
@ -2884,7 +2442,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
if (releaseDstFile) {
clearHandler();
if (WritePool_closeDstFile(ress.writePoolCtx)) {
if (AIO_WritePool_closeFile(ress.writeCtx)) {
DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno));
result = 1;
}
@ -3100,14 +2658,14 @@ FIO_decompressMultipleFilenames(FIO_ctx_t* const fCtx,
if (!prefs->testMode) {
FILE* dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
if (dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName);
WritePool_setDstFile(ress.writePoolCtx, dstFile);
AIO_WritePool_setFile(ress.writeCtx, dstFile);
}
for (; fCtx->currFileIdx < fCtx->nbFilesTotal; fCtx->currFileIdx++) {
status = FIO_decompressSrcFile(fCtx, prefs, ress, outFileName, srcNamesTable[fCtx->currFileIdx]);
if (!status) fCtx->nbFilesProcessed++;
error |= status;
}
if ((!prefs->testMode) && (WritePool_closeDstFile(ress.writePoolCtx)))
if ((!prefs->testMode) && (AIO_WritePool_closeFile(ress.writeCtx)))
EXM_THROW(72, "Write error : %s : cannot properly close output file",
strerror(errno));
} else {

View File

@ -13,6 +13,7 @@
#define FILEIO_H_23981798732
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_compressionParameters */
#include "fileio_types.h"
#include "../lib/zstd.h" /* ZSTD_* */
#if defined (__cplusplus)
@ -53,10 +54,6 @@ extern "C" {
/*-*************************************
* Types
***************************************/
typedef enum { FIO_zstdCompression, FIO_gzipCompression, FIO_xzCompression, FIO_lzmaCompression, FIO_lz4Compression } FIO_compressionType_t;
typedef struct FIO_prefs_s FIO_prefs_t;
FIO_prefs_t* FIO_createPreferences(void);
void FIO_freePreferences(FIO_prefs_t* const prefs);
@ -66,9 +63,6 @@ typedef struct FIO_ctx_s FIO_ctx_t;
FIO_ctx_t* FIO_createContext(void);
void FIO_freeContext(FIO_ctx_t* const fCtx);
typedef struct FIO_display_prefs_s FIO_display_prefs_t;
typedef enum { FIO_ps_auto, FIO_ps_never, FIO_ps_always } FIO_progressSetting_e;
/*-*************************************
* Parameters

365
programs/fileio_asyncio.c Normal file
View File

@ -0,0 +1,365 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
* You may select, at your option, one of the above-listed licenses.
*/
#include "platform.h"
#include <stdio.h> /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */
#include <stdlib.h> /* malloc, free */
#include <assert.h>
#include <errno.h> /* errno */
#if defined (_MSC_VER)
# include <sys/stat.h>
# include <io.h>
#endif
#include "fileio_asyncio.h"
#include "fileio_common.h"
/* **********************************************************************
* Sparse write
************************************************************************/
/** AIO_fwriteSparse() :
* @return : storedSkips,
* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
unsigned AIO_fwriteSparse(FILE* file,
const void* buffer, size_t bufferSize,
const FIO_prefs_t* const prefs,
unsigned storedSkips)
{
const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */
size_t bufferSizeT = bufferSize / sizeof(size_t);
const size_t* const bufferTEnd = bufferT + bufferSizeT;
const size_t* ptrT = bufferT;
static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */
if (prefs->testMode) return 0; /* do not output anything in test mode */
if (!prefs->sparseFileSupport) { /* normal write */
size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
if (sizeCheck != bufferSize)
EXM_THROW(70, "Write error : cannot write decoded block : %s",
strerror(errno));
return 0;
}
/* avoid int overflow */
if (storedSkips > 1 GB) {
if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
EXM_THROW(91, "1 GB skip error (sparse file support)");
storedSkips -= 1 GB;
}
while (ptrT < bufferTEnd) {
size_t nb0T;
/* adjust last segment if < 32 KB */
size_t seg0SizeT = segmentSizeT;
if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
bufferSizeT -= seg0SizeT;
/* count leading zeroes */
for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
storedSkips += (unsigned)(nb0T * sizeof(size_t));
if (nb0T != seg0SizeT) { /* not all 0s */
size_t const nbNon0ST = seg0SizeT - nb0T;
/* skip leading zeros */
if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
EXM_THROW(92, "Sparse skip error ; try --no-sparse");
storedSkips = 0;
/* write the rest */
if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
EXM_THROW(93, "Write error : cannot write decoded block : %s",
strerror(errno));
}
ptrT += seg0SizeT;
}
{ static size_t const maskT = sizeof(size_t)-1;
if (bufferSize & maskT) {
/* size not multiple of sizeof(size_t) : implies end of block */
const char* const restStart = (const char*)bufferTEnd;
const char* restPtr = restStart;
const char* const restEnd = (const char*)buffer + bufferSize;
assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
storedSkips += (unsigned) (restPtr - restStart);
if (restPtr != restEnd) {
/* not all remaining bytes are 0 */
size_t const restSize = (size_t)(restEnd - restPtr);
if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
EXM_THROW(92, "Sparse skip error ; try --no-sparse");
if (fwrite(restPtr, 1, restSize, file) != restSize)
EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
strerror(errno));
storedSkips = 0;
} } }
return storedSkips;
}
void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{
if (prefs->testMode) assert(storedSkips == 0);
if (storedSkips>0) {
assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */
(void)prefs; /* assert can be disabled, in which case prefs becomes unused */
if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
EXM_THROW(69, "Final skip error (sparse file support)");
/* last zero must be explicitly written,
* so that skipped ones get implicitly translated as zero by FS */
{ const char lastZeroByte[1] = { 0 };
if (fwrite(lastZeroByte, 1, 1, file) != 1)
EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
} }
}
/* **********************************************************************
* AsyncIO functionality
************************************************************************/
/* ***********************************
* General IoPool implementation
*************************************/
static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
void *buffer;
IOJob_t *job;
job = (IOJob_t*) malloc(sizeof(IOJob_t));
buffer = malloc(bufferSize);
if(!job || !buffer)
EXM_THROW(101, "Allocation error : not enough memory");
job->buffer = buffer;
job->bufferSize = bufferSize;
job->usedBufferSize = 0;
job->file = NULL;
job->ctx = ctx;
job->offset = 0;
return job;
}
/* AIO_IOPool_createThreadPool:
* Creates a thread pool and a mutex for threaded IO pool.
* Displays warning if asyncio is requested but MT isn't available. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t *ctx, const FIO_prefs_t *prefs) {
ctx->threadPool = NULL;
if(prefs->asyncIO) {
if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
EXM_THROW(102,"Failed creating write availableJobs mutex");
/* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
* decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
assert(MAX_IO_JOBS >= 2);
ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
if (!ctx->threadPool)
EXM_THROW(104, "Failed creating writer thread pool");
}
}
/* AIO_IOPool_init:
* Allocates and sets and a new write pool including its included availableJobs. */
static void AIO_IOPool_init(IOPoolCtx_t *ctx, FIO_prefs_t* const prefs, POOL_function poolFunction, size_t bufferSize) {
int i;
AIO_IOPool_createThreadPool(ctx, prefs);
ctx->prefs = prefs;
ctx->poolFunction = poolFunction;
ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 1;
ctx->availableJobsCount = ctx->totalIoJobs;
for(i=0; i < ctx->availableJobsCount; i++) {
ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
}
ctx->file = NULL;
}
/* AIO_IOPool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
static void AIO_IOPool_releaseIoJob(IOJob_t *job) {
IOPoolCtx_t *ctx = (IOPoolCtx_t *) job->ctx;
if(ctx->threadPool) {
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
assert(ctx->availableJobsCount < MAX_IO_JOBS);
ctx->availableJobs[ctx->availableJobsCount++] = job;
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
} else {
assert(ctx->availableJobsCount == 0);
ctx->availableJobsCount++;
}
}
/* AIO_IOPool_join:
* Waits for all tasks in the pool to finish executing. */
static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
if(ctx->threadPool)
POOL_joinJobs(ctx->threadPool);
}
/* AIO_IOPool_free:
* Release a previously allocated write thread pool. Makes sure all takss are done and released. */
static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
int i;
if(ctx->threadPool) {
/* Make sure we finish all tasks and then free the resources */
AIO_IOPool_join(ctx);
/* Make sure we are not leaking availableJobs */
assert(ctx->availableJobsCount == ctx->totalIoJobs);
POOL_free(ctx->threadPool);
ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex);
}
assert(ctx->file == NULL);
for(i=0; i<ctx->availableJobsCount; i++) {
IOJob_t* job = (IOJob_t*) ctx->availableJobs[i];
free(job->buffer);
free(job);
}
}
/* AIO_IOPool_acquireJob:
* Returns an available io job to be used for a future io. */
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) {
IOJob_t *job;
assert(ctx->file != NULL || ctx->prefs->testMode);
if(ctx->threadPool) {
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
assert(ctx->availableJobsCount > 0);
job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
} else {
assert(ctx->availableJobsCount == 1);
ctx->availableJobsCount--;
job = (IOJob_t*)ctx->availableJobs[0];
}
job->usedBufferSize = 0;
job->file = ctx->file;
job->offset = 0;
return job;
}
/* AIO_IOPool_setFile:
* Sets the destination file for future files in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
static void AIO_IOPool_setFile(IOPoolCtx_t *ctx, FILE* file) {
assert(ctx!=NULL);
AIO_IOPool_join(ctx);
assert(ctx->availableJobsCount == ctx->totalIoJobs);
ctx->file = file;
}
static FILE* AIO_IOPool_getFile(IOPoolCtx_t *ctx) {
return ctx->file;
}
/* AIO_IOPool_enqueueJob:
* Enqueues an io job for execution.
* The queued job shouldn't be used directly after queueing it. */
static void AIO_IOPool_enqueueJob(IOJob_t *job) {
IOPoolCtx_t* ctx = (IOPoolCtx_t *)job->ctx;
if(ctx->threadPool)
POOL_add(ctx->threadPool, ctx->poolFunction, job);
else
ctx->poolFunction(job);
}
/* ***********************************
* WritePool implementation
*************************************/
/* AIO_WritePool_acquireJob:
* Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx) {
return AIO_IOPool_acquireJob(&ctx->base);
}
/* AIO_WritePool_enqueueAndReacquireWriteJob:
* Queues a write job for execution and acquires a new one.
* After execution `job`'s pointed value would change to the newly acquired job.
* Make sure to set `usedBufferSize` to the wanted length before call.
* The queued job shouldn't be used directly after queueing it. */
void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
AIO_IOPool_enqueueJob(*job);
*job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx);
}
/* AIO_WritePool_sparseWriteEnd:
* Ends sparse writes to the current file.
* Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) {
assert(ctx != NULL);
if(ctx->base.threadPool)
POOL_joinJobs(ctx->base.threadPool);
AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
ctx->storedSkips = 0;
}
/* AIO_WritePool_setFile:
* Sets the destination file for future writes in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file) {
AIO_IOPool_setFile(&ctx->base, file);
assert(ctx->storedSkips == 0);
}
/* AIO_WritePool_getFile:
* Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx) {
return AIO_IOPool_getFile(&ctx->base);
}
/* AIO_WritePool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t *job) {
AIO_IOPool_releaseIoJob(job);
}
/* AIO_WritePool_closeFile:
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) {
FILE *dstFile = ctx->base.file;
assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
AIO_WritePool_sparseWriteEnd(ctx);
AIO_IOPool_setFile(&ctx->base, NULL);
return fclose(dstFile);
}
/* AIO_WritePool_executeWriteJob:
* Executes a write job synchronously. Can be used as a function for a thread pool. */
static void AIO_WritePool_executeWriteJob(void* opaque){
IOJob_t* job = (IOJob_t*) opaque;
WritePoolCtx_t* ctx = (WritePoolCtx_t*) job->ctx;
ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
AIO_IOPool_releaseIoJob(job);
}
/* AIO_WritePool_create:
* Allocates and sets and a new write pool including its included jobs. */
WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize) {
WritePoolCtx_t* ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
ctx->storedSkips = 0;
return ctx;
}
/* AIO_WritePool_free:
* Frees and releases a writePool and its resources. Closes destination file if needs to. */
void AIO_WritePool_free(WritePoolCtx_t* ctx) {
/* Make sure we finish all tasks and then free the resources */
if(AIO_WritePool_getFile(ctx))
AIO_WritePool_closeFile(ctx);
AIO_IOPool_destroy(&ctx->base);
assert(ctx->storedSkips==0);
free(ctx);
}

120
programs/fileio_asyncio.h Normal file
View File

@ -0,0 +1,120 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
* You may select, at your option, one of the above-listed licenses.
*/
#ifndef ZSTD_FILEIO_ASYNCIO_H
#define ZSTD_FILEIO_ASYNCIO_H
#if defined (__cplusplus)
extern "C" {
#endif
#include "../lib/common/mem.h" /* U32, U64 */
#include "fileio_types.h"
#include "platform.h"
#include "util.h"
#include "../lib/common/pool.h"
#include "../lib/common/threading.h"
#define MAX_IO_JOBS (10)
typedef struct {
/* These struct fields should be set only on creation and not changed afterwards */
POOL_ctx* threadPool;
int totalIoJobs;
FIO_prefs_t* prefs;
POOL_function poolFunction;
/* Controls the file we currently write to, make changes only by using provided utility functions */
FILE* file;
/* The jobs and availableJobsCount fields are accessed by both the main and worker threads and should
* only be mutated after locking the mutex */
ZSTD_pthread_mutex_t ioJobsMutex;
void* availableJobs[MAX_IO_JOBS];
int availableJobsCount;
} IOPoolCtx_t;
typedef struct {
IOPoolCtx_t base;
unsigned storedSkips;
} WritePoolCtx_t;
typedef struct {
/* These fields are automatically set and shouldn't be changed by non WritePool code. */
void *ctx;
FILE* file;
void *buffer;
size_t bufferSize;
/* This field should be changed before a job is queued for execution and should contain the number
* of bytes to write from the buffer. */
size_t usedBufferSize;
U64 offset;
} IOJob_t;
/** AIO_fwriteSparse() :
* @return : storedSkips,
* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
unsigned AIO_fwriteSparse(FILE* file,
const void* buffer, size_t bufferSize,
const FIO_prefs_t* const prefs,
unsigned storedSkips);
void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips);
/* AIO_WritePool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t *job);
/* AIO_WritePool_acquireJob:
* Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx);
/* AIO_WritePool_enqueueAndReacquireWriteJob:
* Enqueues a write job for execution and acquires a new one.
* After execution `job`'s pointed value would change to the newly acquired job.
* Make sure to set `usedBufferSize` to the wanted length before call.
* The queued job shouldn't be used directly after queueing it. */
void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job);
/* AIO_WritePool_sparseWriteEnd:
* Ends sparse writes to the current file.
* Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx);
/* AIO_WritePool_setFile:
* Sets the destination file for future writes in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);
/* AIO_WritePool_getFile:
* Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx);
/* AIO_WritePool_closeFile:
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);
/* AIO_WritePool_create:
* Allocates and sets and a new write pool including its included jobs.
* bufferSize should be set to the maximal buffer we want to write to at a time. */
WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize);
/* AIO_WritePool_free:
* Frees and releases a writePool and its resources. Closes destination file. */
void AIO_WritePool_free(WritePoolCtx_t* ctx);
#if defined (__cplusplus)
}
#endif
#endif /* ZSTD_FILEIO_ASYNCIO_H */

117
programs/fileio_common.h Normal file
View File

@ -0,0 +1,117 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
* You may select, at your option, one of the above-listed licenses.
*/
#ifndef ZSTD_FILEIO_COMMON_H
#define ZSTD_FILEIO_COMMON_H
#if defined (__cplusplus)
extern "C" {
#endif
#include "../lib/common/mem.h" /* U32, U64 */
#include "fileio_types.h"
#include "platform.h"
#include "timefn.h" /* UTIL_getTime, UTIL_clockSpanMicro */
/*-*************************************
* Macros
***************************************/
#define KB *(1 <<10)
#define MB *(1 <<20)
#define GB *(1U<<30)
#undef MAX
#define MAX(a,b) ((a)>(b) ? (a) : (b))
extern FIO_display_prefs_t g_display_prefs;
#define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
#define DISPLAYOUT(...) fprintf(stdout, __VA_ARGS__)
#define DISPLAYLEVEL(l, ...) { if (g_display_prefs.displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
extern UTIL_time_t g_displayClock;
#define REFRESH_RATE ((U64)(SEC_TO_MICRO / 6))
#define READY_FOR_UPDATE() ((g_display_prefs.progressSetting != FIO_ps_never) && UTIL_clockSpanMicro(g_displayClock) > REFRESH_RATE)
#define DELAY_NEXT_UPDATE() { g_displayClock = UTIL_getTime(); }
#define DISPLAYUPDATE(l, ...) { \
if (g_display_prefs.displayLevel>=l && (g_display_prefs.progressSetting != FIO_ps_never)) { \
if (READY_FOR_UPDATE() || (g_display_prefs.displayLevel>=4)) { \
DELAY_NEXT_UPDATE(); \
DISPLAY(__VA_ARGS__); \
if (g_display_prefs.displayLevel>=4) fflush(stderr); \
} } }
#undef MIN /* in case it would be already defined */
#define MIN(a,b) ((a) < (b) ? (a) : (b))
#define EXM_THROW(error, ...) \
{ \
DISPLAYLEVEL(1, "zstd: "); \
DISPLAYLEVEL(5, "Error defined at %s, line %i : \n", __FILE__, __LINE__); \
DISPLAYLEVEL(1, "error %i : ", error); \
DISPLAYLEVEL(1, __VA_ARGS__); \
DISPLAYLEVEL(1, " \n"); \
exit(error); \
}
#define CHECK_V(v, f) \
v = f; \
if (ZSTD_isError(v)) { \
DISPLAYLEVEL(5, "%s \n", #f); \
EXM_THROW(11, "%s", ZSTD_getErrorName(v)); \
}
#define CHECK(f) { size_t err; CHECK_V(err, f); }
/* Avoid fseek()'s 2GiB barrier with MSVC, macOS, *BSD, MinGW */
#if defined(_MSC_VER) && _MSC_VER >= 1400
# define LONG_SEEK _fseeki64
# define LONG_TELL _ftelli64
#elif !defined(__64BIT__) && (PLATFORM_POSIX_VERSION >= 200112L) /* No point defining Large file for 64 bit */
# define LONG_SEEK fseeko
# define LONG_TELL ftello
#elif defined(__MINGW32__) && !defined(__STRICT_ANSI__) && !defined(__NO_MINGW_LFS) && defined(__MSVCRT__)
# define LONG_SEEK fseeko64
# define LONG_TELL ftello64
#elif defined(_WIN32) && !defined(__DJGPP__)
# include <windows.h>
static int LONG_SEEK(FILE* file, __int64 offset, int origin) {
LARGE_INTEGER off;
DWORD method;
off.QuadPart = offset;
if (origin == SEEK_END)
method = FILE_END;
else if (origin == SEEK_CUR)
method = FILE_CURRENT;
else
method = FILE_BEGIN;
if (SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, NULL, method))
return 0;
else
return -1;
}
static __int64 LONG_TELL(FILE* file) {
LARGE_INTEGER off, newOff;
off.QuadPart = 0;
newOff.QuadPart = 0;
SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, &newOff, FILE_CURRENT);
return newOff.QuadPart;
}
#else
# define LONG_SEEK fseek
# define LONG_TELL ftell
#endif
#if defined (__cplusplus)
}
#endif
#endif //ZSTD_FILEIO_COMMON_H

73
programs/fileio_types.h Normal file
View File

@ -0,0 +1,73 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
* You may select, at your option, one of the above-listed licenses.
*/
#ifndef FILEIO_TYPES_HEADER
#define FILEIO_TYPES_HEADER
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_compressionParameters */
#include "../lib/zstd.h" /* ZSTD_* */
/*-*************************************
* Parameters: FIO_prefs_t
***************************************/
typedef struct FIO_display_prefs_s FIO_display_prefs_t;
typedef enum { FIO_ps_auto, FIO_ps_never, FIO_ps_always } FIO_progressSetting_e;
struct FIO_display_prefs_s {
int displayLevel; /* 0 : no display; 1: errors; 2: + result + interaction + warnings; 3: + progression; 4: + information */
FIO_progressSetting_e progressSetting;
};
typedef enum { FIO_zstdCompression, FIO_gzipCompression, FIO_xzCompression, FIO_lzmaCompression, FIO_lz4Compression } FIO_compressionType_t;
typedef struct FIO_prefs_s {
/* Algorithm preferences */
FIO_compressionType_t compressionType;
U32 sparseFileSupport; /* 0: no sparse allowed; 1: auto (file yes, stdout no); 2: force sparse */
int dictIDFlag;
int checksumFlag;
int blockSize;
int overlapLog;
U32 adaptiveMode;
U32 useRowMatchFinder;
int rsyncable;
int minAdaptLevel;
int maxAdaptLevel;
int ldmFlag;
int ldmHashLog;
int ldmMinMatch;
int ldmBucketSizeLog;
int ldmHashRateLog;
size_t streamSrcSize;
size_t targetCBlockSize;
int srcSizeHint;
int testMode;
ZSTD_paramSwitch_e literalCompressionMode;
/* IO preferences */
U32 removeSrcFile;
U32 overwrite;
U32 asyncIO;
/* Computation resources preferences */
unsigned memLimit;
int nbWorkers;
int excludeCompressedFiles;
int patchFromMode;
int contentSize;
int allowBlockDevices;
} FIO_prefs_t;
#endif /* FILEIO_TYPES_HEADER */