Merge remote-tracking branch 'refs/remotes/facebook/dev' into dev11

dev
Przemyslaw Skibinski 2016-10-31 10:43:07 +01:00
commit 33d751999b
32 changed files with 1055 additions and 387 deletions

1
.gitignore vendored
View File

@ -32,3 +32,4 @@ _zstdbench/
*.swp
.DS_Store
googletest/
*.d

View File

@ -22,7 +22,7 @@ matrix:
packages:
- gcc-4.8
- g++-4.8
env: PLATFORM="Ubuntu 12.04 container" CMD="make zlibwrapper && make clean && make -C tests test-zstd_nolegacy && make clean && make clean && make cmaketest && make clean && make -C contrib/pzstd pzstd && make -C contrib/pzstd googletest && make -C contrib/pzstd test && make -C contrib/pzstd clean"
env: PLATFORM="Ubuntu 12.04 container" CMD="make zlibwrapper && make clean && make -C tests test-zstd_nolegacy && make clean && make clean && make cmaketest && make clean && make -C contrib/pzstd googletest && make -C contrib/pzstd all && make -C contrib/pzstd check && make -C contrib/pzstd clean"
- os: linux
sudo: false
env: PLATFORM="Ubuntu 12.04 container" CMD="make usan"
@ -55,6 +55,20 @@ matrix:
packages:
- libc6-dev-i386
- gcc-multilib
- os: linux
sudo: required
install:
- export CXX="g++-6" CC="gcc-6"
- export LDFLAGS="-fuse-ld=gold"
- export TESTFLAGS='--gtest_filter=-*ExtremelyLarge*'
env: PLATFORM="Ubuntu 12.04" CMD='cd contrib/pzstd && make googletest && make tsan && make check && make clean && make asan && make check && make clean && cd ../..'
addons:
apt:
sources:
- ubuntu-toolchain-r-test
packages:
- gcc-6
- g++-6
# Ubuntu 14.04 LTS Server Edition 64 bit
- os: linux
dist: trusty
@ -69,7 +83,7 @@ matrix:
sudo: required
install:
- export CXX="g++-4.8" CC="gcc-4.8"
env: PLATFORM="Ubuntu 14.04" CMD="make gpptest && make clean && make gnu90test && make clean && make c99test && make clean && make gnu99test && make clean && make clangtest && make clean && make -C contrib/pzstd pzstd32 && make -C contrib/pzstd googletest32 && make -C contrib/pzstd test32 && make -C contrib/pzstd clean"
env: PLATFORM="Ubuntu 14.04" CMD="make gpptest && make clean && make gnu90test && make clean && make c99test && make clean && make gnu99test && make clean && make clangtest && make clean && make -C contrib/pzstd googletest32 && make -C contrib/pzstd all32 && make -C contrib/pzstd check && make -C contrib/pzstd clean"
addons:
apt:
packages:

3
NEWS
View File

@ -2,7 +2,10 @@ v1.1.1
New : command -M#, --memory=, --memlimit=, --memlimit-decompress= to limit allowed memory consumption
New : doc/zstd_manual.html, by Przemyslaw Skibinski
Improved : slightly better compression ratio at --ultra levels (>= 20)
Added : ZSTD_initCStream_usingCDict(), ZSTD_initDStream_usingDDict() (experimental section)
Added : example/multiple_streaming_compression
Changed : zstd_errors.h is now part of include installation
Updated man page
Fixed : zstd-small, zstd-compress and zstd-decompress compilation targets
v1.1.0

View File

@ -50,10 +50,9 @@ build_script:
ECHO *** &&
ECHO *** Building pzstd for %PLATFORM% &&
ECHO *** &&
ECHO make -C contrib\pzstd pzstd &&
make -C contrib\pzstd pzstd &&
make -C contrib\pzstd googletest-mingw64 &&
make -C contrib\pzstd test &&
make -C contrib\pzstd all &&
make -C contrib\pzstd check &&
make -C contrib\pzstd clean
)
- if [%COMPILER%]==[gcc] (

72
contrib/pzstd/Logging.h Normal file
View File

@ -0,0 +1,72 @@
/**
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
#pragma once
#include <cstdio>
#include <mutex>
namespace pzstd {
constexpr int ERROR = 1;
constexpr int INFO = 2;
constexpr int DEBUG = 3;
constexpr int VERBOSE = 4;
class Logger {
std::mutex mutex_;
FILE* out_;
const int level_;
using Clock = std::chrono::system_clock;
Clock::time_point lastUpdate_;
std::chrono::milliseconds refreshRate_;
public:
explicit Logger(int level, FILE* out = stderr)
: out_(out), level_(level), lastUpdate_(Clock::now()),
refreshRate_(150) {}
bool logsAt(int level) {
return level <= level_;
}
template <typename... Args>
void operator()(int level, const char *fmt, Args... args) {
if (level > level_) {
return;
}
std::lock_guard<std::mutex> lock(mutex_);
std::fprintf(out_, fmt, args...);
}
template <typename... Args>
void update(int level, const char *fmt, Args... args) {
if (level > level_) {
return;
}
std::lock_guard<std::mutex> lock(mutex_);
auto now = Clock::now();
if (now - lastUpdate_ > refreshRate_) {
lastUpdate_ = now;
std::fprintf(out_, "\r");
std::fprintf(out_, fmt, args...);
}
}
void clear(int level) {
if (level > level_) {
return;
}
std::lock_guard<std::mutex> lock(mutex_);
std::fprintf(out_, "\r%79s\r", "");
}
};
}

View File

@ -7,20 +7,71 @@
# of patent rights can be found in the PATENTS file in the same directory.
# ##########################################################################
# Standard variables for installation
DESTDIR ?=
PREFIX ?= /usr/local
BINDIR := $(DESTDIR)$(PREFIX)/bin
ZSTDDIR = ../../lib
PROGDIR = ../../programs
CPPFLAGS = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I.
CXXFLAGS ?= -O3
CXXFLAGS += -std=c++11
CXXFLAGS += $(MOREFLAGS)
FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
# External program to use to run tests, e.g. qemu or valgrind
TESTPROG ?=
# Flags to pass to the tests
TESTFLAGS ?=
# We use gcc/clang to generate the header dependencies of files
DEPFLAGS = -MMD -MP -MF $*.Td
POSTCOMPILE = mv -f $*.Td $*.d
# CFLAGS, CXXFLAGS, CPPFLAGS, and LDFLAGS are for the users to override
CFLAGS ?= -O3 -Wall -Wextra
CXXFLAGS ?= -O3 -Wall -Wextra -pedantic -std=c++11
CPPFLAGS ?=
LDFLAGS ?=
# Include flags
PZSTD_INC = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I.
GTEST_INC = -isystem googletest/googletest/include
PZSTD_CPPFLAGS = $(PZSTD_INC) $(GTEST_INC)
PZSTD_CCXXFLAGS =
PZSTD_CFLAGS = $(PZSTD_CCXXFLAGS)
PZSTD_CXXFLAGS = $(PZSTD_CCXXFLAGS)
PZSTD_LDFLAGS =
EXTRA_FLAGS =
ALL_CFLAGS = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CFLAGS) $(PZSTD_CFLAGS)
ALL_CXXFLAGS = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CXXFLAGS) $(PZSTD_CXXFLAGS)
ALL_LDFLAGS = $(EXTRA_FLAGS) $(LDFLAGS) $(PZSTD_LDFLAGS)
ZSTDCOMMON_FILES := $(ZSTDDIR)/common/*.c
ZSTDCOMP_FILES := $(ZSTDDIR)/compress/zstd_compress.c $(ZSTDDIR)/compress/fse_compress.c $(ZSTDDIR)/compress/huf_compress.c
ZSTDDECOMP_FILES := $(ZSTDDIR)/decompress/huf_decompress.c
ZSTD_FILES := $(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES)
# gtest libraries need to go before "-lpthread" because they depend on it.
GTEST_LIB = -L googletest/build/googlemock/gtest
LIBS = $(GTEST_LIB) -lpthread
# Compilation commands
LD_COMMAND = $(CXX) $^ $(ALL_LDFLAGS) $(LIBS) -o $@
CC_COMMAND = $(CC) $(DEPFLAGS) $(ALL_CFLAGS) -c $< -o $@
CXX_COMMAND = $(CXX) $(DEPFLAGS) $(ALL_CXXFLAGS) -c $< -o $@
# Get a list of all zstd files so we rebuild the static library when we need to
ZSTDCOMMON_FILES := $(wildcard $(ZSTDDIR)/common/*.c) \
$(wildcard $(ZSTDDIR)/common/*.h)
ZSTDCOMP_FILES := $(wildcard $(ZSTDDIR)/compress/*.c) \
$(wildcard $(ZSTDDIR)/compress/*.h)
ZSTDDECOMP_FILES := $(wildcard $(ZSTDDIR)/decompress/*.c) \
$(wildcard $(ZSTDDIR)/decompress/*.h)
ZSTDPROG_FILES := $(wildcard $(PROGDIR)/*.c) \
$(wildcard $(PROGDIR)/*.h)
ZSTD_FILES := $(wildcard $(ZSTDDIR)/*.h) \
$(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES) \
$(ZSTDPROG_FILES)
# List all the pzstd source files so we can determine their dependencies
PZSTD_SRCS := $(wildcard *.cpp)
PZSTD_TESTS := $(wildcard test/*.cpp)
UTILS_TESTS := $(wildcard utils/test/*.cpp)
ALL_SRCS := $(PZSTD_SRCS) $(PZSTD_TESTS) $(UTILS_TESTS)
# Define *.exe as extension for Windows systems
@ -30,89 +81,169 @@ else
EXT =
endif
.PHONY: default all test clean test32 googletest googletest32
# Standard targets
.PHONY: default
default: all
default: pzstd
.PHONY: check
check:
$(TESTPROG) ./utils/test/BufferTest$(EXT) $(TESTFLAGS)
$(TESTPROG) ./utils/test/RangeTest$(EXT) $(TESTFLAGS)
$(TESTPROG) ./utils/test/ResourcePoolTest$(EXT) $(TESTFLAGS)
$(TESTPROG) ./utils/test/ScopeGuardTest$(EXT) $(TESTFLAGS)
$(TESTPROG) ./utils/test/ThreadPoolTest$(EXT) $(TESTFLAGS)
$(TESTPROG) ./utils/test/WorkQueueTest$(EXT) $(TESTFLAGS)
$(TESTPROG) ./test/OptionsTest$(EXT) $(TESTFLAGS)
$(TESTPROG) ./test/PzstdTest$(EXT) $(TESTFLAGS)
all: pzstd
.PHONY: install
install: PZSTD_CPPFLAGS += -DNDEBUG
install: pzstd$(EXT)
install -d -m 755 $(BINDIR)/
install -m 755 pzstd$(EXT) $(BINDIR)/pzstd$(EXT)
.PHONY: uninstall
uninstall:
$(RM) $(BINDIR)/pzstd$(EXT)
# Targets for many different builds
.PHONY: all
all: PZSTD_CPPFLAGS += -DNDEBUG
all: pzstd$(EXT) tests roundtrip
.PHONY: debug
debug: EXTRA_FLAGS += -g
debug: pzstd$(EXT) tests roundtrip
.PHONY: tsan
tsan: PZSTD_CCXXFLAGS += -fsanitize=thread -fPIC
tsan: PZSTD_LDFLAGS += -fsanitize=thread -pie
tsan: debug
.PHONY: asan
asan: EXTRA_FLAGS += -fsanitize=address
asan: debug
.PHONY: ubsan
ubsan: EXTRA_FLAGS += -fsanitize=undefined
ubsan: debug
.PHONY: all32
all32: EXTRA_FLAGS += -m32
all32: all
.PHONY: debug32
debug32: EXTRA_FLAGS += -m32
debug32: debug
.PHONY: asan32
asan32: EXTRA_FLAGS += -m32
asan32: asan
.PHONY: tsan32
tsan32: EXTRA_FLAGS += -m32
tsan32: tsan
.PHONY: ubsan32
ubsan32: EXTRA_FLAGS += -m32
ubsan32: ubsan
# Run long round trip tests
.PHONY: roundtripcheck
roundtripcheck: roundtrip check
$(TESTPROG) ./test/RoundTripTest$(EXT) $(TESTFLAGS)
# Build the main binary
pzstd$(EXT): main.o Options.o Pzstd.o SkippableFrame.o $(ZSTDDIR)/libzstd.a
$(LD_COMMAND)
# Target that depends on all the tests
.PHONY: tests
tests: EXTRA_FLAGS += -Wno-deprecated-declarations
tests: $(patsubst %,%$(EXT),$(basename $(PZSTD_TESTS) $(UTILS_TESTS)))
# Build the round trip tests
.PHONY: roundtrip
roundtrip: EXTRA_FLAGS += -Wno-deprecated-declarations
roundtrip: test/RoundTripTest$(EXT)
# Use the static library that zstd builds for simplicity and
# so we get the compiler options correct
$(ZSTDDIR)/libzstd.a: $(ZSTD_FILES)
$(MAKE) -C $(ZSTDDIR) libzstd CFLAGS="$(ALL_CFLAGS)" LDFLAGS="$(ALL_LDFLAGS)"
libzstd.a: $(ZSTD_FILES)
$(MAKE) -C $(ZSTDDIR) libzstd
@cp $(ZSTDDIR)/libzstd.a .
# Rules to build the tests
test/RoundTripTest$(EXT): test/RoundTripTest.o $(PROGDIR)/datagen.o Options.o \
Pzstd.o SkippableFrame.o $(ZSTDDIR)/libzstd.a
$(LD_COMMAND)
Pzstd.o: Pzstd.h Pzstd.cpp ErrorHolder.h utils/*.h
$(CXX) $(FLAGS) -c Pzstd.cpp -o $@
test/%Test$(EXT): GTEST_LIB += -lgtest -lgtest_main
test/%Test$(EXT): test/%Test.o $(PROGDIR)/datagen.o Options.o Pzstd.o \
SkippableFrame.o $(ZSTDDIR)/libzstd.a
$(LD_COMMAND)
SkippableFrame.o: SkippableFrame.h SkippableFrame.cpp utils/*.h
$(CXX) $(FLAGS) -c SkippableFrame.cpp -o $@
utils/test/%Test$(EXT): GTEST_LIB += -lgtest -lgtest_main
utils/test/%Test$(EXT): utils/test/%Test.o
$(LD_COMMAND)
Options.o: Options.h Options.cpp
$(CXX) $(FLAGS) -c Options.cpp -o $@
main.o: main.cpp *.h utils/*.h
$(CXX) $(FLAGS) -c main.cpp -o $@
pzstd: Pzstd.o SkippableFrame.o Options.o main.o libzstd.a
$(CXX) $(FLAGS) $^ -o $@$(EXT) -lpthread
libzstd32.a: $(ZSTD_FILES)
$(MAKE) -C $(ZSTDDIR) libzstd MOREFLAGS="-m32"
@cp $(ZSTDDIR)/libzstd.a libzstd32.a
Pzstd32.o: Pzstd.h Pzstd.cpp ErrorHolder.h utils/*.h
$(CXX) -m32 $(FLAGS) -c Pzstd.cpp -o $@
SkippableFrame32.o: SkippableFrame.h SkippableFrame.cpp utils/*.h
$(CXX) -m32 $(FLAGS) -c SkippableFrame.cpp -o $@
Options32.o: Options.h Options.cpp
$(CXX) -m32 $(FLAGS) -c Options.cpp -o $@
main32.o: main.cpp *.h utils/*.h
$(CXX) -m32 $(FLAGS) -c main.cpp -o $@
pzstd32: Pzstd32.o SkippableFrame32.o Options32.o main32.o libzstd32.a
$(CXX) -m32 $(FLAGS) $^ -o $@$(EXT) -lpthread
GTEST_CMAKEFLAGS =
# Install googletest
.PHONY: googletest
googletest: PZSTD_CCXXFLAGS += -fPIC
googletest:
@$(RM) -rf googletest
@git clone https://github.com/google/googletest
@mkdir -p googletest/build
@cd googletest/build && cmake .. && make
@cd googletest/build && cmake $(GTEST_CMAKEFLAGS) -DCMAKE_CXX_FLAGS="$(ALL_CXXFLAGS)" .. && $(MAKE)
googletest32:
@$(RM) -rf googletest
@git clone https://github.com/google/googletest
@mkdir -p googletest/build
@cd googletest/build && cmake .. -DCMAKE_CXX_FLAGS=-m32 && make
googletest-mingw64:
$(RM) -rf googletest
git clone https://github.com/google/googletest
mkdir -p googletest/build
cd googletest/build && cmake -G "MSYS Makefiles" .. && $(MAKE)
test:
$(MAKE) libzstd.a
$(MAKE) pzstd MOREFLAGS="-Wall -Wextra -pedantic -Werror"
$(MAKE) -C utils/test clean
$(MAKE) -C utils/test test MOREFLAGS="-Wall -Wextra -pedantic -Werror"
$(MAKE) -C test clean
$(MAKE) -C test test MOREFLAGS="-Wall -Wextra -pedantic -Werror"
test32:
$(MAKE) libzstd.a MOREFLAGS="-m32"
$(MAKE) pzstd MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror"
$(MAKE) -C utils/test clean
$(MAKE) -C utils/test test MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror"
$(MAKE) -C test clean
$(MAKE) -C test test MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror"
.PHONY: googletest32
googletest32: PZSTD_CCXXFLAGS += -m32
googletest32: googletest
.PHONY: googletest-mingw64
googletest-mingw64: GTEST_CMAKEFLAGS += -G "MSYS Makefiles"
googletest-mingw64: googletest
.PHONY: clean
clean:
$(RM) -f *.o pzstd$(EXT) *.Td *.d
$(RM) -f test/*.o test/*Test$(EXT) test/*.Td test/*.d
$(RM) -f utils/test/*.o utils/test/*Test$(EXT) utils/test/*.Td utils/test/*.d
$(RM) -f $(PROGDIR)/*.o $(PROGDIR)/*.Td $(PROGDIR)/*.d
$(MAKE) -C $(ZSTDDIR) clean
$(MAKE) -C utils/test clean
$(MAKE) -C test clean
@$(RM) -rf libzstd.a *.o pzstd$(EXT) pzstd32$(EXT)
@echo Cleaning completed
# Cancel implicit rules
%.o: %.c
%.o: %.cpp
# Object file rules
%.o: %.c
$(CC_COMMAND)
$(POSTCOMPILE)
$(PROGDIR)/%.o: $(PROGDIR)/%.c
$(CC_COMMAND)
$(POSTCOMPILE)
%.o: %.cpp
$(CXX_COMMAND)
$(POSTCOMPILE)
test/%.o: test/%.cpp
$(CXX_COMMAND)
$(POSTCOMPILE)
utils/test/%.o: utils/test/%.cpp
$(CXX_COMMAND)
$(POSTCOMPILE)
# Dependency file stuff
.PRECIOUS: %.d test/%.d utils/test/%.d
# Include rules that specify header file dependencies
-include $(patsubst %,%.d,$(basename $(ALL_SRCS)))

View File

@ -303,6 +303,12 @@ Options::Status Options::parse(int argc, const char **argv) {
} // while (*options != 0);
} // for (int i = 1; i < argc; ++i);
// Set options for test mode
if (test) {
outputFile = nullOutput;
keepSource = true;
}
// Input file defaults to standard input if not provided.
if (localInputFiles.empty()) {
localInputFiles.emplace_back(kStdIn);
@ -399,11 +405,6 @@ Options::Status Options::parse(int argc, const char **argv) {
verbosity = 1;
}
// Set options for test mode
if (test) {
outputFile = nullOutput;
keepSource = true;
}
return Status::Success;
}

View File

@ -15,6 +15,7 @@
#include "utils/WorkQueue.h"
#include <chrono>
#include <cinttypes>
#include <cstddef>
#include <cstdio>
#include <memory>
@ -58,26 +59,24 @@ static std::uint64_t handleOneInput(const Options &options,
FILE* inputFd,
const std::string &outputFile,
FILE* outputFd,
ErrorHolder &errorHolder) {
SharedState& state) {
auto inputSize = fileSizeOrZero(inputFile);
// WorkQueue outlives ThreadPool so in the case of error we are certain
// we don't accidently try to call push() on it after it is destroyed.
// we don't accidently try to call push() on it after it is destroyed
WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
std::uint64_t bytesRead;
std::uint64_t bytesWritten;
{
// Initialize the thread pool with numThreads + 1
// We add one because the read thread spends most of its time waiting.
// This also sets the minimum number of threads to 2, so the algorithm
// doesn't deadlock.
ThreadPool executor(options.numThreads + 1);
// Initialize the (de)compression thread pool with numThreads
ThreadPool executor(options.numThreads);
// Run the reader thread on an extra thread
ThreadPool readExecutor(1);
if (!options.decompress) {
// Add a job that reads the input and starts all the compression jobs
executor.add(
[&errorHolder, &outs, &executor, inputFd, inputSize, &options,
&bytesRead] {
readExecutor.add(
[&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
bytesRead = asyncCompressChunks(
errorHolder,
state,
outs,
executor,
inputFd,
@ -86,29 +85,28 @@ static std::uint64_t handleOneInput(const Options &options,
options.determineParameters());
});
// Start writing
bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress,
options.verbosity);
bytesWritten = writeFile(state, outs, outputFd, options.decompress);
} else {
// Add a job that reads the input and starts all the decompression jobs
executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] {
bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd);
readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
});
// Start writing
bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress,
options.verbosity);
bytesWritten = writeFile(state, outs, outputFd, options.decompress);
}
}
if (options.verbosity > 1 && !errorHolder.hasError()) {
if (!state.errorHolder.hasError()) {
std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
if (!options.decompress) {
double ratio = static_cast<double>(bytesWritten) /
static_cast<double>(bytesRead + !bytesRead);
std::fprintf(stderr, "%-20s :%6.2f%% (%6llu => %6llu bytes, %s)\n",
state.log(INFO, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64
" bytes, %s)\n",
inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
outputFileName.c_str());
} else {
std::fprintf(stderr, "%-20s: %llu bytes \n",
state.log(INFO, "%-20s: %" PRIu64 " bytes \n",
inputFileName.c_str(),bytesWritten);
}
}
@ -138,7 +136,7 @@ static FILE *openInputFile(const std::string &inputFile,
static FILE *openOutputFile(const Options &options,
const std::string &outputFile,
ErrorHolder &errorHolder) {
SharedState& state) {
if (outputFile == "-") {
SET_BINARY_MODE(stdout);
return stdout;
@ -148,82 +146,78 @@ static FILE *openOutputFile(const Options &options,
auto outputFd = std::fopen(outputFile.c_str(), "rb");
if (outputFd != nullptr) {
std::fclose(outputFd);
if (options.verbosity <= 1) {
errorHolder.setError("Output file exists");
if (!state.log.logsAt(INFO)) {
state.errorHolder.setError("Output file exists");
return nullptr;
}
std::fprintf(
stderr,
state.log(
INFO,
"pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
outputFile.c_str());
int c = getchar();
if (c != 'y' && c != 'Y') {
errorHolder.setError("Not overwritten");
state.errorHolder.setError("Not overwritten");
return nullptr;
}
}
}
auto outputFd = std::fopen(outputFile.c_str(), "wb");
if (!errorHolder.check(
if (!state.errorHolder.check(
outputFd != nullptr, "Failed to open output file")) {
return 0;
return nullptr;
}
return outputFd;
}
int pzstdMain(const Options &options) {
int returnCode = 0;
SharedState state(options);
for (const auto& input : options.inputFiles) {
// Setup the error holder
ErrorHolder errorHolder;
// Setup the shared state
auto printErrorGuard = makeScopeGuard([&] {
if (errorHolder.hasError()) {
if (state.errorHolder.hasError()) {
returnCode = 1;
if (options.verbosity > 0) {
std::fprintf(stderr, "pzstd: %s: %s.\n", input.c_str(),
errorHolder.getError().c_str());
}
} else {
state.log(ERROR, "pzstd: %s: %s.\n", input.c_str(),
state.errorHolder.getError().c_str());
}
});
// Open the input file
auto inputFd = openInputFile(input, errorHolder);
auto inputFd = openInputFile(input, state.errorHolder);
if (inputFd == nullptr) {
continue;
}
auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
// Open the output file
auto outputFile = options.getOutputFile(input);
if (!errorHolder.check(outputFile != "",
if (!state.errorHolder.check(outputFile != "",
"Input file does not have extension .zst")) {
continue;
}
auto outputFd = openOutputFile(options, outputFile, errorHolder);
auto outputFd = openOutputFile(options, outputFile, state);
if (outputFd == nullptr) {
continue;
}
auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
// (de)compress the file
handleOneInput(options, input, inputFd, outputFile, outputFd, errorHolder);
if (errorHolder.hasError()) {
handleOneInput(options, input, inputFd, outputFile, outputFd, state);
if (state.errorHolder.hasError()) {
continue;
}
// Delete the input file if necessary
if (!options.keepSource) {
// Be sure that we are done and have written everything before we delete
if (!errorHolder.check(std::fclose(inputFd) == 0,
if (!state.errorHolder.check(std::fclose(inputFd) == 0,
"Failed to close input file")) {
continue;
}
closeInputGuard.dismiss();
if (!errorHolder.check(std::fclose(outputFd) == 0,
if (!state.errorHolder.check(std::fclose(outputFd) == 0,
"Failed to close output file")) {
continue;
}
closeOutputGuard.dismiss();
if (std::remove(input.c_str()) != 0) {
errorHolder.setError("Failed to remove input file");
state.errorHolder.setError("Failed to remove input file");
continue;
}
}
@ -269,27 +263,25 @@ Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) {
/**
* Stream chunks of input from `in`, compress it, and stream it out to `out`.
*
* @param errorHolder Used to report errors and check if an error occured
* @param state The shared state
* @param in Queue that we `pop()` input buffers from
* @param out Queue that we `push()` compressed output buffers to
* @param maxInputSize An upper bound on the size of the input
* @param parameters The zstd parameters to use for compression
*/
static void compress(
ErrorHolder& errorHolder,
SharedState& state,
std::shared_ptr<BufferWorkQueue> in,
std::shared_ptr<BufferWorkQueue> out,
size_t maxInputSize,
ZSTD_parameters parameters) {
size_t maxInputSize) {
auto& errorHolder = state.errorHolder;
auto guard = makeScopeGuard([&] { out->finish(); });
// Initialize the CCtx
std::unique_ptr<ZSTD_CStream, size_t (*)(ZSTD_CStream*)> ctx(
ZSTD_createCStream(), ZSTD_freeCStream);
auto ctx = state.cStreamPool->get();
if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
return;
}
{
auto err = ZSTD_initCStream_advanced(ctx.get(), nullptr, 0, parameters, 0);
auto err = ZSTD_resetCStream(ctx.get(), 0);
if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
return;
}
@ -396,7 +388,7 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
}
std::uint64_t asyncCompressChunks(
ErrorHolder& errorHolder,
SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
ThreadPool& executor,
FILE* fd,
@ -410,23 +402,23 @@ std::uint64_t asyncCompressChunks(
// independently.
size_t step = calculateStep(size, numThreads, params);
auto status = FileStatus::Continue;
while (status == FileStatus::Continue && !errorHolder.hasError()) {
while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
// Make a new input queue that we will put the chunk's input data into.
auto in = std::make_shared<BufferWorkQueue>();
auto inGuard = makeScopeGuard([&] { in->finish(); });
// Make a new output queue that compress will put the compressed data into.
auto out = std::make_shared<BufferWorkQueue>();
// Start compression in the thread pool
executor.add([&errorHolder, in, out, step, params] {
executor.add([&state, in, out, step] {
return compress(
errorHolder, std::move(in), std::move(out), step, params);
state, std::move(in), std::move(out), step);
});
// Pass the output queue to the writer thread.
chunks.push(std::move(out));
// Fill the input queue for the compression job we just started
status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
}
errorHolder.check(status != FileStatus::Error, "Error reading input");
state.errorHolder.check(status != FileStatus::Error, "Error reading input");
return bytesRead;
}
@ -434,24 +426,24 @@ std::uint64_t asyncCompressChunks(
* Decompress a frame, whose data is streamed into `in`, and stream the output
* to `out`.
*
* @param errorHolder Used to report errors and check if an error occured
* @param state The shared state
* @param in Queue that we `pop()` input buffers from. It contains
* exactly one compressed frame.
* @param out Queue that we `push()` decompressed output buffers to
*/
static void decompress(
ErrorHolder& errorHolder,
SharedState& state,
std::shared_ptr<BufferWorkQueue> in,
std::shared_ptr<BufferWorkQueue> out) {
auto& errorHolder = state.errorHolder;
auto guard = makeScopeGuard([&] { out->finish(); });
// Initialize the DCtx
std::unique_ptr<ZSTD_DStream, size_t (*)(ZSTD_DStream*)> ctx(
ZSTD_createDStream(), ZSTD_freeDStream);
auto ctx = state.dStreamPool->get();
if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
return;
}
{
auto err = ZSTD_initDStream(ctx.get());
auto err = ZSTD_resetDStream(ctx.get());
if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
return;
}
@ -509,7 +501,7 @@ static void decompress(
}
std::uint64_t asyncDecompressFrames(
ErrorHolder& errorHolder,
SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
ThreadPool& executor,
FILE* fd) {
@ -522,7 +514,7 @@ std::uint64_t asyncDecompressFrames(
// Otherwise, we will decompress using only one decompression task.
const size_t chunkSize = ZSTD_DStreamInSize();
auto status = FileStatus::Continue;
while (status == FileStatus::Continue && !errorHolder.hasError()) {
while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
// Make a new input queue that we will put the frames's bytes into.
auto in = std::make_shared<BufferWorkQueue>();
auto inGuard = makeScopeGuard([&] { in->finish(); });
@ -551,15 +543,15 @@ std::uint64_t asyncDecompressFrames(
out->setMaxSize(64);
}
// Start decompression in the thread pool
executor.add([&errorHolder, in, out] {
return decompress(errorHolder, std::move(in), std::move(out));
executor.add([&state, in, out] {
return decompress(state, std::move(in), std::move(out));
});
// Pass the output queue to the writer thread
frames.push(std::move(out));
if (frameSize == 0) {
// We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
// Pass the rest of the source to this decompression task
while (status == FileStatus::Continue && !errorHolder.hasError()) {
while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
}
break;
@ -567,7 +559,7 @@ std::uint64_t asyncDecompressFrames(
// Fill the input queue for the decompression job we just started
status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
}
errorHolder.check(status != FileStatus::Error, "Error reading input");
state.errorHolder.check(status != FileStatus::Error, "Error reading input");
return totalBytesRead;
}
@ -582,32 +574,14 @@ static bool writeData(ByteRange data, FILE* fd) {
return true;
}
void updateWritten(int verbosity, std::uint64_t bytesWritten) {
if (verbosity <= 1) {
return;
}
using Clock = std::chrono::system_clock;
static Clock::time_point then;
constexpr std::chrono::milliseconds refreshRate{150};
auto now = Clock::now();
if (now - then > refreshRate) {
then = now;
std::fprintf(stderr, "\rWritten: %u MB ",
static_cast<std::uint32_t>(bytesWritten >> 20));
}
}
std::uint64_t writeFile(
ErrorHolder& errorHolder,
SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
FILE* outputFd,
bool decompress,
int verbosity) {
auto lineClearGuard = makeScopeGuard([verbosity] {
if (verbosity > 1) {
std::fprintf(stderr, "\r%79s\r", "");
}
bool decompress) {
auto& errorHolder = state.errorHolder;
auto lineClearGuard = makeScopeGuard([&state] {
state.log.clear(INFO);
});
std::uint64_t bytesWritten = 0;
std::shared_ptr<BufferWorkQueue> out;
@ -633,7 +607,8 @@ std::uint64_t writeFile(
return bytesWritten;
}
bytesWritten += buffer.size();
updateWritten(verbosity, bytesWritten);
state.log.update(INFO, "Written: %u MB ",
static_cast<std::uint32_t>(bytesWritten >> 20));
}
}
return bytesWritten;

View File

@ -9,9 +9,11 @@
#pragma once
#include "ErrorHolder.h"
#include "Logging.h"
#include "Options.h"
#include "utils/Buffer.h"
#include "utils/Range.h"
#include "utils/ResourcePool.h"
#include "utils/ThreadPool.h"
#include "utils/WorkQueue.h"
#define ZSTD_STATIC_LINKING_ONLY
@ -32,12 +34,58 @@ namespace pzstd {
*/
int pzstdMain(const Options& options);
class SharedState {
public:
SharedState(const Options& options) : log(options.verbosity) {
if (!options.decompress) {
auto parameters = options.determineParameters();
cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
[parameters]() -> ZSTD_CStream* {
auto zcs = ZSTD_createCStream();
if (zcs) {
auto err = ZSTD_initCStream_advanced(
zcs, nullptr, 0, parameters, 0);
if (ZSTD_isError(err)) {
ZSTD_freeCStream(zcs);
return nullptr;
}
}
return zcs;
},
[](ZSTD_CStream *zcs) {
ZSTD_freeCStream(zcs);
}});
} else {
dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
[]() -> ZSTD_DStream* {
auto zds = ZSTD_createDStream();
if (zds) {
auto err = ZSTD_initDStream(zds);
if (ZSTD_isError(err)) {
ZSTD_freeDStream(zds);
return nullptr;
}
}
return zds;
},
[](ZSTD_DStream *zds) {
ZSTD_freeDStream(zds);
}});
}
}
Logger log;
ErrorHolder errorHolder;
std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;
std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool;
};
/**
* Streams input from `fd`, breaks input up into chunks, and compresses each
* chunk independently. Output of each chunk gets streamed to a queue, and
* the output queues get put into `chunks` in order.
*
* @param errorHolder Used to report errors and coordinate early shutdown
* @param state The shared state
* @param chunks Each compression jobs output queue gets `pushed()` here
* as soon as it is available
* @param executor The thread pool to run compression jobs in
@ -48,7 +96,7 @@ int pzstdMain(const Options& options);
* @returns The number of bytes read from the file
*/
std::uint64_t asyncCompressChunks(
ErrorHolder& errorHolder,
SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
ThreadPool& executor,
FILE* fd,
@ -62,7 +110,7 @@ std::uint64_t asyncCompressChunks(
* decompression job. Output of each frame gets streamed to a queue, and
* the output queues get put into `frames` in order.
*
* @param errorHolder Used to report errors and coordinate early shutdown
* @param state The shared state
* @param frames Each decompression jobs output queue gets `pushed()` here
* as soon as it is available
* @param executor The thread pool to run compression jobs in
@ -70,7 +118,7 @@ std::uint64_t asyncCompressChunks(
* @returns The number of bytes read from the file
*/
std::uint64_t asyncDecompressFrames(
ErrorHolder& errorHolder,
SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
ThreadPool& executor,
FILE* fd);
@ -79,18 +127,16 @@ std::uint64_t asyncDecompressFrames(
* Streams input in from each queue in `outs` in order, and writes the data to
* `outputFd`.
*
* @param errorHolder Used to report errors and coordinate early exit
* @param state The shared state
* @param outs A queue of output queues, one for each
* (de)compression job.
* @param outputFd The file descriptor to write to
* @param decompress Are we decompressing?
* @param verbosity The verbosity level to log at
* @returns The number of bytes written
*/
std::uint64_t writeFile(
ErrorHolder& errorHolder,
SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
FILE* outputFd,
bool decompress,
int verbosity);
bool decompress);
}

View File

@ -10,7 +10,7 @@ When decompressing files compressed with Zstandard, PZstandard does IO in one th
## Usage
PZstandard supports the same command line interface as Zstandard, but also provies the `-p` option to specify the number of threads.
PZstandard supports the same command line interface as Zstandard, but also provides the `-p` option to specify the number of threads.
Dictionary mode is not currently supported.
Basic usage

View File

@ -1,48 +0,0 @@
# ##########################################################################
# Copyright (c) 2016-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree. An additional grant
# of patent rights can be found in the PATENTS file in the same directory.
# ##########################################################################
# Define *.exe as extension for Windows systems
ifneq (,$(filter Windows%,$(OS)))
EXT =.exe
else
EXT =
endif
PZSTDDIR = ..
PROGDIR = ../../../programs
ZSTDDIR = ../../../lib
# Set GTEST_INC and GTEST_LIB to work with your install of gtest
GTEST_INC ?= -isystem $(PZSTDDIR)/googletest/googletest/include
GTEST_LIB ?= -L $(PZSTDDIR)/googletest/build/googlemock/gtest
GTEST_FLAGS = $(GTEST_INC) $(GTEST_LIB)
CPPFLAGS = -I$(PZSTDDIR) -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I.
CXXFLAGS ?= -O3
CXXFLAGS += -std=c++11 -Wno-deprecated-declarations
CXXFLAGS += $(MOREFLAGS)
FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
datagen.o: $(PROGDIR)/datagen.*
$(CC) $(CPPFLAGS) -O3 $(MOREFLAGS) $(LDFLAGS) -Wno-long-long -Wno-variadic-macros $(PROGDIR)/datagen.c -c -o $@
%: %.cpp *.h datagen.o
$(CXX) $(FLAGS) $@.cpp datagen.o $(PZSTDDIR)/Pzstd.o $(PZSTDDIR)/SkippableFrame.o $(PZSTDDIR)/Options.o $(PZSTDDIR)/libzstd.a -o $@$(EXT) $(GTEST_FLAGS) -lgtest -lgtest_main -lpthread
.PHONY: test clean
test: OptionsTest PzstdTest
@./OptionsTest$(EXT)
@./PzstdTest$(EXT)
roundtrip: RoundTripTest
@./RoundTripTest$(EXT)
clean:
@rm -f datagen.o OptionsTest PzstdTest RoundTripTest

View File

@ -0,0 +1,96 @@
/**
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
#pragma once
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>
namespace pzstd {
/**
* An unbounded pool of resources.
* A `ResourcePool<T>` requires a factory function that takes allocates `T*` and
* a free function that frees a `T*`.
* Calling `ResourcePool::get()` will give you a new `ResourcePool::UniquePtr`
* to a `T`, and when it goes out of scope the resource will be returned to the
* pool.
* The `ResourcePool<T>` *must* survive longer than any resources it hands out.
* Remember that `ResourcePool<T>` hands out mutable `T`s, so make sure to clean
* up the resource before or after every use.
*/
template <typename T>
class ResourcePool {
public:
class Deleter;
using Factory = std::function<T*()>;
using Free = std::function<void(T*)>;
using UniquePtr = std::unique_ptr<T, Deleter>;
private:
std::mutex mutex_;
Factory factory_;
Free free_;
std::vector<T*> resources_;
unsigned inUse_;
public:
/**
* Creates a `ResourcePool`.
*
* @param factory The function to use to create new resources.
* @param free The function to use to free resources created by `factory`.
*/
ResourcePool(Factory factory, Free free)
: factory_(std::move(factory)), free_(std::move(free)), inUse_(0) {}
/**
* @returns A unique pointer to a resource. The resource is null iff
* there are no avaiable resources and `factory()` returns null.
*/
UniquePtr get() {
std::lock_guard<std::mutex> lock(mutex_);
if (!resources_.empty()) {
UniquePtr resource{resources_.back(), Deleter{*this}};
resources_.pop_back();
++inUse_;
return resource;
}
UniquePtr resource{factory_(), Deleter{*this}};
++inUse_;
return resource;
}
~ResourcePool() noexcept {
assert(inUse_ == 0);
for (const auto resource : resources_) {
free_(resource);
}
}
class Deleter {
ResourcePool *pool_;
public:
explicit Deleter(ResourcePool &pool) : pool_(&pool) {}
void operator() (T *resource) {
std::lock_guard<std::mutex> lock(pool_->mutex_);
// Make sure we don't put null resources into the pool
if (resource) {
pool_->resources_.push_back(resource);
}
assert(pool_->inUse_ > 0);
--pool_->inUse_;
}
};
};
}

View File

@ -27,7 +27,7 @@ class ThreadPool {
explicit ThreadPool(std::size_t numThreads) {
threads_.reserve(numThreads);
for (std::size_t i = 0; i < numThreads; ++i) {
threads_.emplace_back([&] {
threads_.emplace_back([this] {
std::function<void()> task;
while (tasks_.pop(task)) {
task();

View File

@ -28,6 +28,7 @@ class WorkQueue {
std::mutex mutex_;
std::condition_variable readerCv_;
std::condition_variable writerCv_;
std::condition_variable finishCv_;
std::queue<T> queue_;
bool done_;
@ -53,12 +54,13 @@ class WorkQueue {
/**
* Push an item onto the work queue. Notify a single thread that work is
* available. If `finish()` has been called, do nothing and return false.
* If `push()` returns false, then `item` has not been moved from.
*
* @param item Item to push onto the queue.
* @returns True upon success, false if `finish()` has been called. An
* item was pushed iff `push()` returns true.
*/
bool push(T item) {
bool push(T&& item) {
{
std::unique_lock<std::mutex> lock(mutex_);
while (full() && !done_) {
@ -124,19 +126,14 @@ class WorkQueue {
}
readerCv_.notify_all();
writerCv_.notify_all();
finishCv_.notify_all();
}
/// Blocks until `finish()` has been called (but the queue may not be empty).
void waitUntilFinished() {
std::unique_lock<std::mutex> lock(mutex_);
while (!done_) {
readerCv_.wait(lock);
// If we were woken by a push, we need to wake a thread waiting on pop().
if (!done_) {
lock.unlock();
readerCv_.notify_one();
lock.lock();
}
finishCv_.wait(lock);
}
}
};

View File

@ -1,42 +0,0 @@
# ##########################################################################
# Copyright (c) 2016-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree. An additional grant
# of patent rights can be found in the PATENTS file in the same directory.
# ##########################################################################
# Define *.exe as extension for Windows systems
ifneq (,$(filter Windows%,$(OS)))
EXT =.exe
else
EXT =
endif
PZSTDDIR = ../..
# Set GTEST_INC and GTEST_LIB to work with your install of gtest
GTEST_INC ?= -isystem $(PZSTDDIR)/googletest/googletest/include
GTEST_LIB ?= -L $(PZSTDDIR)/googletest/build/googlemock/gtest
CPPFLAGS = -I$(PZSTDDIR) $(GTEST_INC) $(GTEST_LIB)
CXXFLAGS ?= -O3
CXXFLAGS += -std=c++11
CXXFLAGS += $(MOREFLAGS)
FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
%: %.cpp
$(CXX) $(FLAGS) $^ -o $@$(EXT) -lgtest -lgtest_main -lpthread
.PHONY: test clean
test: BufferTest RangeTest ScopeGuardTest ThreadPoolTest WorkQueueTest
@./BufferTest$(EXT)
@./RangeTest$(EXT)
@./ScopeGuardTest$(EXT)
@./ThreadPoolTest$(EXT)
@./WorkQueueTest$(EXT)
clean:
@rm -f BufferTest RangeTest ScopeGuardTest ThreadPoolTest WorkQueueTest

View File

@ -0,0 +1,72 @@
/**
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
#include "utils/ResourcePool.h"
#include <gtest/gtest.h>
#include <atomic>
#include <thread>
using namespace pzstd;
TEST(ResourcePool, FullTest) {
unsigned numCreated = 0;
unsigned numDeleted = 0;
{
ResourcePool<int> pool(
[&numCreated] { ++numCreated; return new int{5}; },
[&numDeleted](int *x) { ++numDeleted; delete x; });
{
auto i = pool.get();
EXPECT_EQ(5, *i);
*i = 6;
}
{
auto i = pool.get();
EXPECT_EQ(6, *i);
auto j = pool.get();
EXPECT_EQ(5, *j);
*j = 7;
}
{
auto i = pool.get();
EXPECT_EQ(6, *i);
auto j = pool.get();
EXPECT_EQ(7, *j);
}
}
EXPECT_EQ(2, numCreated);
EXPECT_EQ(numCreated, numDeleted);
}
TEST(ResourcePool, ThreadSafe) {
std::atomic<unsigned> numCreated{0};
std::atomic<unsigned> numDeleted{0};
{
ResourcePool<int> pool(
[&numCreated] { ++numCreated; return new int{0}; },
[&numDeleted](int *x) { ++numDeleted; delete x; });
auto push = [&pool] {
for (int i = 0; i < 100; ++i) {
auto x = pool.get();
++*x;
}
};
std::thread t1{push};
std::thread t2{push};
t1.join();
t2.join();
auto x = pool.get();
auto y = pool.get();
EXPECT_EQ(200, *x + *y);
}
EXPECT_GE(2, numCreated);
EXPECT_EQ(numCreated, numDeleted);
}

View File

@ -10,6 +10,7 @@
#include "utils/WorkQueue.h"
#include <gtest/gtest.h>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
@ -64,7 +65,7 @@ TEST(WorkQueue, SPSC) {
const int max = 100;
for (int i = 0; i < 10; ++i) {
queue.push(i);
queue.push(int{i});
}
std::thread thread([ &queue, max ] {
@ -80,7 +81,7 @@ TEST(WorkQueue, SPSC) {
std::this_thread::yield();
for (int i = 10; i < max; ++i) {
queue.push(i);
queue.push(int{i});
}
queue.finish();
@ -97,7 +98,7 @@ TEST(WorkQueue, SPMC) {
}
for (int i = 0; i < 50; ++i) {
queue.push(i);
queue.push(int{i});
}
queue.finish();
@ -126,7 +127,7 @@ TEST(WorkQueue, MPMC) {
pusherThreads.emplace_back(
[ &queue, min, max ] {
for (int i = min; i < max; ++i) {
queue.push(i);
queue.push(int{i});
}
});
}
@ -212,7 +213,7 @@ TEST(WorkQueue, BoundedSizeMPMC) {
pusherThreads.emplace_back(
[ &queue, min, max ] {
for (int i = min; i < max; ++i) {
queue.push(i);
queue.push(int{i});
}
});
}
@ -231,6 +232,18 @@ TEST(WorkQueue, BoundedSizeMPMC) {
}
}
TEST(WorkQueue, FailedPush) {
WorkQueue<std::unique_ptr<int>> queue;
std::unique_ptr<int> x(new int{5});
EXPECT_TRUE(queue.push(std::move(x)));
EXPECT_EQ(nullptr, x);
queue.finish();
x.reset(new int{6});
EXPECT_FALSE(queue.push(std::move(x)));
EXPECT_NE(nullptr, x);
EXPECT_EQ(6, *x);
}
TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
{
BufferWorkQueue queue;

1
examples/.gitignore vendored
View File

@ -5,6 +5,7 @@ dictionary_compression
dictionary_decompression
streaming_compression
streaming_decompression
multiple_streaming_compression
#test artefact
tmp*

View File

@ -17,7 +17,8 @@ default: all
all: simple_compression simple_decompression \
dictionary_compression dictionary_decompression \
streaming_compression streaming_decompression
streaming_compression streaming_decompression \
multiple_streaming_compression
simple_compression : simple_compression.c
$(CC) $(CPPFLAGS) $(CFLAGS) $^ $(LDFLAGS) -o $@
@ -34,6 +35,9 @@ dictionary_decompression : dictionary_decompression.c
streaming_compression : streaming_compression.c
$(CC) $(CPPFLAGS) $(CFLAGS) $^ $(LDFLAGS) -o $@
multiple_streaming_compression : multiple_streaming_compression.c
$(CC) $(CPPFLAGS) $(CFLAGS) $^ $(LDFLAGS) -o $@
streaming_decompression : streaming_decompression.c
$(CC) $(CPPFLAGS) $(CFLAGS) $^ $(LDFLAGS) -o $@
@ -41,7 +45,8 @@ clean:
@rm -f core *.o tmp* result* *.zst \
simple_compression simple_decompression \
dictionary_compression dictionary_decompression \
streaming_compression streaming_decompression
streaming_compression streaming_decompression \
multiple_streaming_compression
@echo Cleaning completed
test: all
@ -54,7 +59,10 @@ test: all
@echo starting streaming compression
./streaming_compression tmp
./streaming_decompression tmp.zst > /dev/null
@echo starting multiple streaming compression
./multiple_streaming_compression *.c
@echo starting dictionary compression
./dictionary_compression tmp2 tmp README.md
./dictionary_decompression tmp2.zst tmp.zst README.md
$(RM) tmp* *.zst
@echo tests completed

View File

@ -15,6 +15,11 @@ Zstandard library : usage examples
Compress a single file.
Introduces usage of : `ZSTD_compressStream()`
- [Multiple Streaming compression](multiple_streaming_compression.c) :
Compress multiple files in a single command line.
Introduces memory usage preservation technique,
reducing impact of malloc()/free() and memset() by re-using existing resources.
- [Streaming decompression](streaming_decompression.c) :
Decompress a single file compressed by zstd.
Compatible with both simple and streaming compression.

View File

@ -0,0 +1,163 @@
/**
* Copyright 2016-present, Yann Collet, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the license found in the
* LICENSE-examples file in the root directory of this source tree.
*/
/* The objective of this example is to show of to compress multiple successive files
* while preserving memory management.
* All structures and buffers will be created only once,
* and shared across all compression operations */
#include <stdlib.h> // malloc, exit
#include <stdio.h> // fprintf, perror, feof
#include <string.h> // strerror
#include <errno.h> // errno
#define ZSTD_STATIC_LINKING_ONLY // streaming API defined as "experimental" for the time being
#include <zstd.h> // presumes zstd library is installed
static void* malloc_orDie(size_t size)
{
void* const buff = malloc(size);
if (buff) return buff;
/* error */
perror("malloc:");
exit(1);
}
static FILE* fopen_orDie(const char *filename, const char *instruction)
{
FILE* const inFile = fopen(filename, instruction);
if (inFile) return inFile;
/* error */
perror(filename);
exit(3);
}
static size_t fread_orDie(void* buffer, size_t sizeToRead, FILE* file)
{
size_t const readSize = fread(buffer, 1, sizeToRead, file);
if (readSize == sizeToRead) return readSize; /* good */
if (feof(file)) return readSize; /* good, reached end of file */
/* error */
perror("fread");
exit(4);
}
static size_t fwrite_orDie(const void* buffer, size_t sizeToWrite, FILE* file)
{
size_t const writtenSize = fwrite(buffer, 1, sizeToWrite, file);
if (writtenSize == sizeToWrite) return sizeToWrite; /* good */
/* error */
perror("fwrite");
exit(5);
}
static size_t fclose_orDie(FILE* file)
{
if (!fclose(file)) return 0;
/* error */
perror("fclose");
exit(6);
}
typedef struct {
void* buffIn;
void* buffOut;
size_t buffInSize;
size_t buffOutSize;
ZSTD_CStream* cstream;
} resources ;
static resources createResources_orDie()
{
resources ress;
ress.buffInSize = ZSTD_CStreamInSize(); /* can always read one full block */
ress.buffOutSize= ZSTD_CStreamOutSize(); /* can always flush a full block */
ress.buffIn = malloc_orDie(ress.buffInSize);
ress.buffOut= malloc_orDie(ress.buffOutSize);
ress.cstream = ZSTD_createCStream();
if (ress.cstream==NULL) { fprintf(stderr, "ZSTD_createCStream() error \n"); exit(10); }
return ress;
}
static void freeResources(resources ress)
{
ZSTD_freeCStream(ress.cstream);
free(ress.buffIn);
free(ress.buffOut);
}
static void compressFile_orDie(resources ress, const char* fname, const char* outName, int cLevel)
{
FILE* const fin = fopen_orDie(fname, "rb");
FILE* const fout = fopen_orDie(outName, "wb");
size_t const initResult = ZSTD_initCStream(ress.cstream, cLevel);
if (ZSTD_isError(initResult)) { fprintf(stderr, "ZSTD_initCStream() error : %s \n", ZSTD_getErrorName(initResult)); exit(11); }
size_t read, toRead = ress.buffInSize;
while( (read = fread_orDie(ress.buffIn, toRead, fin)) ) {
ZSTD_inBuffer input = { ress.buffIn, read, 0 };
while (input.pos < input.size) {
ZSTD_outBuffer output = { ress.buffOut, ress.buffOutSize, 0 };
toRead = ZSTD_compressStream(ress.cstream, &output , &input); /* toRead is guaranteed to be <= ZSTD_CStreamInSize() */
if (ZSTD_isError(toRead)) { fprintf(stderr, "ZSTD_compressStream() error : %s \n", ZSTD_getErrorName(toRead)); exit(12); }
if (toRead > ress.buffInSize) toRead = ress.buffInSize; /* Safely handle when `buffInSize` is manually changed to a smaller value */
fwrite_orDie(ress.buffOut, output.pos, fout);
}
}
ZSTD_outBuffer output = { ress.buffOut, ress.buffOutSize, 0 };
size_t const remainingToFlush = ZSTD_endStream(ress.cstream, &output); /* close frame */
if (remainingToFlush) { fprintf(stderr, "not fully flushed"); exit(13); }
fwrite_orDie(ress.buffOut, output.pos, fout);
fclose_orDie(fout);
fclose_orDie(fin);
}
int main(int argc, const char** argv)
{
const char* const exeName = argv[0];
if (argc<2) {
printf("wrong arguments\n");
printf("usage:\n");
printf("%s FILE(s)\n", exeName);
return 1;
}
resources const ress = createResources_orDie();
void* ofnBuffer = NULL;
size_t ofnbSize = 0;
int argNb;
for (argNb = 1; argNb < argc; argNb++) {
const char* const ifn = argv[argNb];
size_t const ifnSize = strlen(ifn);
size_t const ofnSize = ifnSize + 5;
if (ofnbSize <= ofnSize) {
ofnbSize = ofnSize + 16;
free(ofnBuffer);
ofnBuffer = malloc_orDie(ofnbSize);
}
memset(ofnBuffer, 0, ofnSize);
strcat(ofnBuffer, ifn);
strcat(ofnBuffer, ".zst");
compressFile_orDie(ress, ifn, ofnBuffer, 7);
}
freeResources(ress);
/* success */
printf("compressed %i files \n", argc-1);
return 0;
}

View File

@ -65,7 +65,7 @@ static void decompress(const char* fname)
void* const cBuff = loadFile_X(fname, &cSize);
unsigned long long const rSize = ZSTD_getDecompressedSize(cBuff, cSize);
if (rSize==0) {
printf("%s : original size unknown \n", fname);
printf("%s : original size unknown. Use streaming decompression instead. \n", fname);
exit(5);
}
void* const rBuff = malloc_X((size_t)rSize);

View File

@ -71,9 +71,12 @@ static void decompressFile_orDie(const char* fname)
ZSTD_DStream* const dstream = ZSTD_createDStream();
if (dstream==NULL) { fprintf(stderr, "ZSTD_createDStream() error \n"); exit(10); }
/* In more complex scenarios, a file may consist of multiple appended frames (ex : pzstd).
* The following example decompresses only the first frame.
* It is compatible with other provided streaming examples */
size_t const initResult = ZSTD_initDStream(dstream);
if (ZSTD_isError(initResult)) { fprintf(stderr, "ZSTD_initDStream() error : %s \n", ZSTD_getErrorName(initResult)); exit(11); }
size_t read, toRead = initResult;
while ( (read = fread_orDie(buffIn, toRead, fin)) ) {
ZSTD_inBuffer input = { buffIn, read, 0 };

View File

@ -31,6 +31,16 @@
# endif /* __STDC_VERSION__ */
#endif
#ifdef _MSC_VER
# define FORCE_NOINLINE static __declspec(noinline)
#else
# ifdef __GNUC__
# define FORCE_NOINLINE static __attribute__((__noinline__))
# else
# define FORCE_NOINLINE static
# endif
#endif
/*-*************************************
* Dependencies

View File

@ -2489,13 +2489,13 @@ static size_t ZSTD_checkDictNCount(short* normalizedCounter, unsigned dictMaxSym
/* Dictionary format :
Magic == ZSTD_DICT_MAGIC (4 bytes)
HUF_writeCTable(256)
FSE_writeNCount(off)
FSE_writeNCount(ml)
FSE_writeNCount(ll)
RepOffsets
Dictionary content
Magic == ZSTD_DICT_MAGIC (4 bytes)
HUF_writeCTable(256)
FSE_writeNCount(off)
FSE_writeNCount(ml)
FSE_writeNCount(ll)
RepOffsets
Dictionary content
*/
/*! ZSTD_loadDictEntropyStats() :
@return : size read from dictionary
@ -2839,6 +2839,7 @@ struct ZSTD_CStream_s {
ZSTD_cStreamStage stage;
U32 checksum;
U32 frameEnded;
ZSTD_parameters params;
ZSTD_customMem customMem;
}; /* typedef'd to ZSTD_CStream within "zstd.h" */
@ -2884,7 +2885,10 @@ size_t ZSTD_CStreamOutSize(void) { return ZSTD_compressBound(ZSTD_BLOCKSIZE_ABSO
size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize)
{
CHECK_F(ZSTD_compressBegin_usingCDict(zcs->cctx, zcs->cdict, pledgedSrcSize));
if (zcs->inBuffSize==0) return ERROR(stage_wrong); /* zcs has not been init at least once */
if (zcs->cdict) CHECK_F(ZSTD_compressBegin_usingCDict(zcs->cctx, zcs->cdict, pledgedSrcSize))
else CHECK_F(ZSTD_compressBegin_advanced(zcs->cctx, NULL, 0, zcs->params, pledgedSrcSize));
zcs->inToCompress = 0;
zcs->inBuffPos = 0;
@ -2916,12 +2920,15 @@ size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs,
if (zcs->outBuff == NULL) return ERROR(memory_allocation);
}
ZSTD_freeCDict(zcs->cdictLocal);
zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, params, zcs->customMem);
if (zcs->cdictLocal == NULL) return ERROR(memory_allocation);
zcs->cdict = zcs->cdictLocal;
if (dict) {
ZSTD_freeCDict(zcs->cdictLocal);
zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, params, zcs->customMem);
if (zcs->cdictLocal == NULL) return ERROR(memory_allocation);
zcs->cdict = zcs->cdictLocal;
} else zcs->cdict = NULL;
zcs->checksum = params.fParams.checksumFlag > 0;
zcs->params = params;
return ZSTD_resetCStream(zcs, pledgedSrcSize);
}

View File

@ -34,7 +34,7 @@
* Frames requiring more memory will be rejected.
*/
#ifndef ZSTD_MAXWINDOWSIZE_DEFAULT
# define ZSTD_MAXWINDOWSIZE_DEFAULT (257 << 20) /* 257 MB */
# define ZSTD_MAXWINDOWSIZE_DEFAULT ((1 << ZSTD_WINDOWLOG_MAX) + 1) /* defined within zstd.h */
#endif
@ -111,7 +111,7 @@ struct ZSTD_DCtx_s
BYTE headerBuffer[ZSTD_FRAMEHEADERSIZE_MAX];
}; /* typedef'd to ZSTD_DCtx within "zstd.h" */
size_t ZSTD_sizeof_DCtx (const ZSTD_DCtx* dctx) { if (dctx==NULL) return 0; return sizeof(ZSTD_DCtx); } /* support sizeof on NULL */
size_t ZSTD_sizeof_DCtx (const ZSTD_DCtx* dctx) { return (dctx==NULL) ? 0 : sizeof(ZSTD_DCtx); }
size_t ZSTD_estimateDCtxSize(void) { return sizeof(ZSTD_DCtx); }
@ -847,6 +847,53 @@ static seq_t ZSTD_decodeSequence(seqState_t* seqState)
}
FORCE_NOINLINE
size_t ZSTD_execSequenceLast7(BYTE* op,
BYTE* const oend, seq_t sequence,
const BYTE** litPtr, const BYTE* const litLimit_w,
const BYTE* const base, const BYTE* const vBase, const BYTE* const dictEnd)
{
BYTE* const oLitEnd = op + sequence.litLength;
size_t const sequenceLength = sequence.litLength + sequence.matchLength;
BYTE* const oMatchEnd = op + sequenceLength; /* risk : address space overflow (32-bits) */
BYTE* const oend_w = oend - WILDCOPY_OVERLENGTH;
const BYTE* const iLitEnd = *litPtr + sequence.litLength;
const BYTE* match = oLitEnd - sequence.offset;
/* check */
if (oMatchEnd>oend) return ERROR(dstSize_tooSmall); /* last match must start at a minimum distance of WILDCOPY_OVERLENGTH from oend */
if (iLitEnd > litLimit_w) return ERROR(corruption_detected); /* over-read beyond lit buffer */
if (oLitEnd <= oend_w) return ERROR(GENERIC); /* Precondition */
/* copy literals */
if (op < oend_w) {
ZSTD_wildcopy(op, *litPtr, oend_w - op);
*litPtr += oend_w - op;
op = oend_w;
}
while (op < oLitEnd) *op++ = *(*litPtr)++;
/* copy Match */
if (sequence.offset > (size_t)(oLitEnd - base)) {
/* offset beyond prefix */
if (sequence.offset > (size_t)(oLitEnd - vBase)) return ERROR(corruption_detected);
match = dictEnd - (base-match);
if (match + sequence.matchLength <= dictEnd) {
memmove(oLitEnd, match, sequence.matchLength);
return sequenceLength;
}
/* span extDict & currentPrefixSegment */
{ size_t const length1 = dictEnd - match;
memmove(oLitEnd, match, length1);
op = oLitEnd + length1;
sequence.matchLength -= length1;
match = base;
} }
while (op < oMatchEnd) *op++ = *match++;
return sequenceLength;
}
FORCE_INLINE
size_t ZSTD_execSequence(BYTE* op,
BYTE* const oend, seq_t sequence,
@ -861,8 +908,9 @@ size_t ZSTD_execSequence(BYTE* op,
const BYTE* match = oLitEnd - sequence.offset;
/* check */
if ((oLitEnd>oend_w) | (oMatchEnd>oend)) return ERROR(dstSize_tooSmall); /* last match must start at a minimum distance of WILDCOPY_OVERLENGTH from oend */
if (oMatchEnd>oend) return ERROR(dstSize_tooSmall); /* last match must start at a minimum distance of WILDCOPY_OVERLENGTH from oend */
if (iLitEnd > litLimit_w) return ERROR(corruption_detected); /* over-read beyond lit buffer */
if (oLitEnd>oend_w) return ZSTD_execSequenceLast7(op, oend, sequence, litPtr, litLimit_w, base, vBase, dictEnd);
/* copy Literals */
ZSTD_copy8(op, *litPtr);
@ -887,7 +935,8 @@ size_t ZSTD_execSequence(BYTE* op,
sequence.matchLength -= length1;
match = base;
if (op > oend_w) {
while (op < oMatchEnd) *op++ = *match++;
U32 i;
for (i = 0; i < sequence.matchLength; ++i) op[i] = match[i];
return sequenceLength;
}
} }

View File

@ -460,7 +460,7 @@ ZSTDLIB_API size_t ZSTD_initCStream_usingDict(ZSTD_CStream* zcs, const void* dic
ZSTDLIB_API size_t ZSTD_initCStream_advanced(ZSTD_CStream* zcs, const void* dict, size_t dictSize,
ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
ZSTDLIB_API size_t ZSTD_initCStream_usingCDict(ZSTD_CStream* zcs, const ZSTD_CDict* cdict); /**< note : cdict will just be referenced, and must outlive compression session */
ZSTDLIB_API size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize); /**< re-use compression parameters from previous init; saves dictionary loading */
ZSTDLIB_API size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize); /**< re-use compression parameters from previous init; skip dictionary loading stage; zcs must be init at least once before */
ZSTDLIB_API size_t ZSTD_sizeof_CStream(const ZSTD_CStream* zcs);

View File

@ -35,22 +35,85 @@ It also features a very fast decoder, with speed > 500 MB/s per core.
\fBzstd\fR command line is generally similar to gzip, but features the following differences :
- Source files are preserved by default
It's possible to remove them automatically by using \fB--rm\fR command
- By default, when compressing a single file, \fBzstd\fR displays progress notifications and result summary.
- When compressing a single file, \fBzstd\fR displays progress notifications and result summary by default.
Use \fB-q\fR to turn them off
.SH OPTIONS
.
.SS "Integer suffixes and special values"
In most places where an integer argument is expected,
an optional suffix is supported to easily indicate large integers.
There must be no space between the integer and the suffix.
.TP
.B KiB
Multiply the integer by 1,024 (2^10).
.BR Ki ,
.BR K ,
and
.B KB
are accepted as synonyms for
.BR KiB .
.TP
.B MiB
Multiply the integer by 1,048,576 (2^20).
.BR Mi ,
.BR M ,
and
.B MB
are accepted as synonyms for
.BR MiB .
.
.SS "Operation mode"
If multiple operation mode options are given,
the last one takes effect.
.TP
.BR \-z ", " \-\-compress
Compress.
This is the default operation mode when no operation mode option
is specified and no other operation mode is implied from
the command name (for example,
.B unzstd
implies
.BR \-\-decompress ).
.TP
.BR \-d ", " \-\-decompress ", " \-\-uncompress
Decompress.
.TP
.BR \-t ", " \-\-test
Test the integrity of compressed
.IR files .
This option is equivalent to
.B "\-\-decompress \-\-stdout"
except that the decompressed data is discarded instead of being
written to standard output.
No files are created or removed.
.TP
.B \-b#
benchmark file(s) using compression level #
.TP
.B \--train FILEs
use FILEs as training set to create a dictionary. The training set should contain a lot of small files (> 100).
.
.SS "Operation modifiers"
.TP
.B \-#
# compression level [1-22] (default:3)
# compression level [1-19] (default:3)
.TP
.BR \-d ", " --decompress
decompression
.BR \--ultra
unlocks high compression levels 20+ (maximum 22), using a lot more memory
.TP
.B \-D file
use `file` as Dictionary to compress or decompress FILE(s)
.TP
.BR \--no-dictID
do not store dictionary ID within frame header (dictionary compression).
the decoder will have to rely on implicit knowledge about which dictionary to use, it won't be able to check if it's correct.
.TP
.B \-o file
save result into `file` (only possible with a single INPUT-FILE)
.TP
@ -60,6 +123,11 @@ It also features a very fast decoder, with speed > 500 MB/s per core.
.BR \-c ", " --stdout
force write to standard output, even if it is the console
.TP
.BR \--[no-]sparse
enable / disable sparse FS support, to make files with many zeroes smaller on disk.
default : enabled when output is into a file, and disabled when output is stdout.
This setting overrides default and can force sparse mode over stdout.
.TP
.BR \--rm
remove source file(s) after successful compression or decompression
.TP
@ -83,8 +151,8 @@ It also features a very fast decoder, with speed > 500 MB/s per core.
suppress warnings, interactivity and notifications.
specify twice to suppress errors too.
.TP
.BR \-C ", " --check
add integrity check computed from uncompressed data
.BR \-C ", " --[no-]check
add integrity check computed from uncompressed data (default : enabled)
.TP
.BR \-t ", " --test
Test the integrity of compressed files. This option is equivalent to \fB--decompress --stdout > /dev/null\fR.
@ -103,10 +171,8 @@ It will improve compression ratio of small files.
Typical gains range from ~10% (at 64KB) to x5 better (at <1KB).
.TP
.B \--train FILEs
use FILEs as training set to create a dictionary.
The training set should contain a lot of small files (> 100).
and weight typically 100x the target dictionary size
(for example, 10 MB for a 100 KB dictionary)
use FILEs as training set to create a dictionary. The training set should contain a lot of small files (> 100),
and weight typically 100x the target dictionary size (for example, 10 MB for a 100 KB dictionary)
.TP
.B \-o file
dictionary saved into `file` (default: dictionary)
@ -131,6 +197,9 @@ Typical gains range from ~10% (at 64KB) to x5 better (at <1KB).
.B \-b#
benchmark file(s) using compression level #
.TP
.B \-e#
benchmark file(s) using multiple compression levels, from -b# to -e# (included).
.TP
.B \-i#
minimum evaluation time, in seconds (default : 3s), benchmark mode only
.TP

View File

@ -199,26 +199,24 @@ static unsigned longCommandWArg(const char** stringPtr, const char* longCommand)
return result;
}
typedef enum { zom_compress, zom_decompress, zom_test, zom_bench, zom_train } zstd_operation_mode;
#define CLEAN_RETURN(i) { operationResult = (i); goto _end; }
int main(int argCount, const char* argv[])
{
int argNb,
bench=0,
decode=0,
testmode=0,
forceStdout=0,
main_pause=0,
nextEntryIsDictionary=0,
operationResult=0,
dictBuild=0,
nextArgumentIsOutFileName=0,
nextArgumentIsMaxDict=0,
nextArgumentIsDictID=0,
nextArgumentIsFile=0,
ultra=0,
lastCommand = 0;
zstd_operation_mode operation = zom_compress;
int cLevel = ZSTDCLI_CLEVEL_DEFAULT;
int cLevelLast = 1;
unsigned recursive = 0;
@ -241,7 +239,7 @@ int main(int argCount, const char* argv[])
/* init */
(void)recursive; (void)cLevelLast; /* not used when ZSTD_NOBENCH set */
(void)dictCLevel; (void)dictSelect; (void)dictID; /* not used when ZSTD_NODICT set */
(void)decode; (void)cLevel; (void)testmode; /* not used when ZSTD_NOCOMPRESS set */
(void)cLevel; /* not used when ZSTD_NOCOMPRESS set */
(void)ultra; (void)memLimit; /* not used when ZSTD_NODECOMPRESS set */
if (filenameTable==NULL) { DISPLAY("zstd: %s \n", strerror(errno)); exit(1); }
filenameTable[0] = stdinmark;
@ -253,8 +251,8 @@ int main(int argCount, const char* argv[])
}
/* preset behaviors */
if (!strcmp(programName, ZSTD_UNZSTD)) decode=1;
if (!strcmp(programName, ZSTD_CAT)) { decode=1; forceStdout=1; displayLevel=1; outFileName=stdoutmark; }
if (!strcmp(programName, ZSTD_UNZSTD)) operation=zom_decompress;
if (!strcmp(programName, ZSTD_CAT)) { operation=zom_decompress; forceStdout=1; displayLevel=1; outFileName=stdoutmark; }
/* command switches */
for (argNb=1; argNb<argCount; argNb++) {
@ -265,7 +263,9 @@ int main(int argCount, const char* argv[])
/* long commands (--long-word) */
if (!strcmp(argument, "--")) { nextArgumentIsFile=1; continue; } /* only file names allowed from now on */
if (!strcmp(argument, "--decompress")) { decode=1; continue; }
if (!strcmp(argument, "--compress")) { operation=zom_compress; continue; }
if (!strcmp(argument, "--decompress")) { operation=zom_decompress; continue; }
if (!strcmp(argument, "--uncompress")) { operation=zom_decompress; continue; }
if (!strcmp(argument, "--force")) { FIO_overwriteMode(); continue; }
if (!strcmp(argument, "--version")) { displayOut=stdout; DISPLAY(WELCOME_MESSAGE); CLEAN_RETURN(0); }
if (!strcmp(argument, "--help")) { displayOut=stdout; CLEAN_RETURN(usage_advanced(programName)); }
@ -278,8 +278,8 @@ int main(int argCount, const char* argv[])
if (!strcmp(argument, "--no-dictID")) { FIO_setDictIDFlag(0); continue; }
if (!strcmp(argument, "--sparse")) { FIO_setSparseWrite(2); continue; }
if (!strcmp(argument, "--no-sparse")) { FIO_setSparseWrite(0); continue; }
if (!strcmp(argument, "--test")) { testmode=1; decode=1; continue; }
if (!strcmp(argument, "--train")) { dictBuild=1; outFileName=g_defaultDictName; continue; }
if (!strcmp(argument, "--test")) { operation=zom_test; continue; }
if (!strcmp(argument, "--train")) { operation=zom_train; outFileName=g_defaultDictName; continue; }
if (!strcmp(argument, "--maxdict")) { nextArgumentIsMaxDict=1; lastCommand=1; continue; }
if (!strcmp(argument, "--dictID")) { nextArgumentIsDictID=1; lastCommand=1; continue; }
if (!strcmp(argument, "--keep")) { FIO_setRemoveSrcFile(0); continue; }
@ -323,8 +323,11 @@ int main(int argCount, const char* argv[])
case 'H':
case 'h': displayOut=stdout; CLEAN_RETURN(usage_advanced(programName));
/* Compress */
case 'z': operation=zom_compress; argument++; break;
/* Decoding */
case 'd': decode=1; argument++; break;
case 'd': operation=zom_decompress; argument++; break;
/* Force stdout, even if stdout==console */
case 'c': forceStdout=1; outFileName=stdoutmark; argument++; break;
@ -348,7 +351,7 @@ int main(int argCount, const char* argv[])
case 'C': argument++; FIO_setChecksumFlag(2); break;
/* test compressed file */
case 't': testmode=1; decode=1; argument++; break;
case 't': operation=zom_test; argument++; break;
/* destination file name */
case 'o': nextArgumentIsOutFileName=1; lastCommand=1; argument++; break;
@ -366,7 +369,7 @@ int main(int argCount, const char* argv[])
#ifndef ZSTD_NOBENCH
/* Benchmark */
case 'b': bench=1; argument++; break;
case 'b': operation=zom_bench; argument++; break;
/* range bench (benchmark only) */
case 'e':
@ -470,7 +473,7 @@ int main(int argCount, const char* argv[])
#endif
/* Check if benchmark is selected */
if (bench) {
if (operation==zom_bench) {
#ifndef ZSTD_NOBENCH
BMK_setNotificationLevel(displayLevel);
BMK_benchFiles(filenameTable, filenameIdx, dictFileName, cLevel, cLevelLast);
@ -479,7 +482,7 @@ int main(int argCount, const char* argv[])
}
/* Check if dictionary builder is selected */
if (dictBuild) {
if (operation==zom_train) {
#ifndef ZSTD_NODICT
ZDICT_params_t dictParams;
memset(&dictParams, 0, sizeof(dictParams));
@ -498,7 +501,7 @@ int main(int argCount, const char* argv[])
/* Check if input/output defined as console; trigger an error in this case */
if (!strcmp(filenameTable[0], stdinmark) && IS_CONSOLE(stdin) ) CLEAN_RETURN(badusage(programName));
if (outFileName && !strcmp(outFileName, stdoutmark) && IS_CONSOLE(stdout) && strcmp(filenameTable[0], stdinmark) && !(forceStdout && decode))
if (outFileName && !strcmp(outFileName, stdoutmark) && IS_CONSOLE(stdout) && strcmp(filenameTable[0], stdinmark) && !(forceStdout && (operation==zom_decompress)))
CLEAN_RETURN(badusage(programName));
/* user-selected output filename, only possible with a single file */
@ -522,7 +525,7 @@ int main(int argCount, const char* argv[])
/* IO Stream/File */
FIO_setNotificationLevel(displayLevel);
if (!decode) {
if (operation==zom_compress) {
#ifndef ZSTD_NOCOMPRESS
if ((filenameIdx==1) && outFileName)
operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel);
@ -531,9 +534,9 @@ int main(int argCount, const char* argv[])
#else
DISPLAY("Compression not supported\n");
#endif
} else { /* decompression */
} else { /* decompression or test */
#ifndef ZSTD_NODECOMPRESS
if (testmode) { outFileName=nulmark; FIO_setRemoveSrcFile(0); } /* test mode */
if (operation==zom_test) { outFileName=nulmark; FIO_setRemoveSrcFile(0); } /* test mode */
FIO_setMemLimit(memLimit);
if (filenameIdx==1 && outFileName)
operationResult = FIO_decompressFilename(outFileName, filenameTable[0], dictFileName);

View File

@ -79,10 +79,10 @@ zstd32:
zstd_nolegacy:
$(MAKE) -C $(PRGDIR) $@
fullbench : $(ZSTD_FILES) $(ZBUFF_FILES) $(PRGDIR)/datagen.c fullbench.c
fullbench : $(ZSTD_FILES) $(PRGDIR)/datagen.c fullbench.c
$(CC) $(FLAGS) $^ -o $@$(EXT)
fullbench32 : $(ZSTD_FILES) $(ZBUFF_FILES) $(PRGDIR)/datagen.c fullbench.c
fullbench32 : $(ZSTD_FILES) $(PRGDIR)/datagen.c fullbench.c
$(CC) -m32 $(FLAGS) $^ -o $@$(EXT)
fuzzer : CPPFLAGS += -I$(ZSTDDIR)/dictBuilder

View File

@ -22,7 +22,6 @@
#include "zstd.h" /* ZSTD_VERSION_STRING */
#define FSE_STATIC_LINKING_ONLY /* FSE_DTABLE_SIZE_U32 */
#include "fse.h"
#include "zbuff.h"
#include "datagen.h"
@ -130,29 +129,39 @@ size_t local_ZSTD_decodeSeqHeaders(void* dst, size_t dstSize, void* buff2, const
}
static ZBUFF_CCtx* g_zbcc = NULL;
size_t local_ZBUFF_compress(void* dst, size_t dstCapacity, void* buff2, const void* src, size_t srcSize)
static ZSTD_CStream* g_cstream= NULL;
size_t local_ZSTD_compressStream(void* dst, size_t dstCapacity, void* buff2, const void* src, size_t srcSize)
{
size_t compressedSize;
size_t srcRead = srcSize, dstWritten = dstCapacity;
ZSTD_outBuffer buffOut;
ZSTD_inBuffer buffIn;
(void)buff2;
ZBUFF_compressInit(g_zbcc, 1);
ZBUFF_compressContinue(g_zbcc, dst, &dstWritten, src, &srcRead);
compressedSize = dstWritten;
dstWritten = dstCapacity-compressedSize;
ZBUFF_compressEnd(g_zbcc, ((char*)dst)+compressedSize, &dstWritten);
compressedSize += dstWritten;
return compressedSize;
ZSTD_initCStream(g_cstream, 1);
buffOut.dst = dst;
buffOut.size = dstCapacity;
buffOut.pos = 0;
buffIn.src = src;
buffIn.size = srcSize;
buffIn.pos = 0;
ZSTD_compressStream(g_cstream, &buffOut, &buffIn);
ZSTD_endStream(g_cstream, &buffOut);
return buffOut.pos;
}
static ZBUFF_DCtx* g_zbdc = NULL;
static size_t local_ZBUFF_decompress(void* dst, size_t dstCapacity, void* buff2, const void* src, size_t srcSize)
static ZSTD_DStream* g_dstream= NULL;
static size_t local_ZSTD_decompressStream(void* dst, size_t dstCapacity, void* buff2, const void* src, size_t srcSize)
{
size_t srcRead = g_cSize, dstWritten = dstCapacity;
ZSTD_outBuffer buffOut;
ZSTD_inBuffer buffIn;
(void)src; (void)srcSize;
ZBUFF_decompressInit(g_zbdc);
ZBUFF_decompressContinue(g_zbdc, dst, &dstWritten, buff2, &srcRead);
return dstWritten;
ZSTD_initDStream(g_dstream);
buffOut.dst = dst;
buffOut.size = dstCapacity;
buffOut.pos = 0;
buffIn.src = buff2;
buffIn.size = g_cSize;
buffIn.pos = 0;
ZSTD_decompressStream(g_dstream, &buffOut, &buffIn);
return buffOut.pos;
}
static ZSTD_CCtx* g_zcc = NULL;
@ -220,10 +229,10 @@ static size_t benchMem(const void* src, size_t srcSize, U32 benchNb)
benchFunction = local_ZSTD_decodeSeqHeaders; benchName = "ZSTD_decodeSeqHeaders";
break;
case 41:
benchFunction = local_ZBUFF_compress; benchName = "ZBUFF_compressContinue";
benchFunction = local_ZSTD_compressStream; benchName = "ZSTD_compressStream";
break;
case 42:
benchFunction = local_ZBUFF_decompress; benchName = "ZBUFF_decompressContinue";
benchFunction = local_ZSTD_decompressStream; benchName = "ZSTD_decompressStream";
break;
default :
return 0;
@ -296,10 +305,10 @@ static size_t benchMem(const void* src, size_t srcSize, U32 benchNb)
break;
}
case 41 :
if (g_zbcc==NULL) g_zbcc = ZBUFF_createCCtx();
if (g_cstream==NULL) g_cstream = ZSTD_createCStream();
break;
case 42 :
if (g_zbdc==NULL) g_zbdc = ZBUFF_createDCtx();
if (g_dstream==NULL) g_dstream = ZSTD_createDStream();
g_cSize = ZSTD_compress(buff2, dstBuffSize, src, srcSize, 1);
break;
@ -311,27 +320,27 @@ static size_t benchMem(const void* src, size_t srcSize, U32 benchNb)
{ size_t i; for (i=0; i<dstBuffSize; i++) dstBuff[i]=(BYTE)i; } /* warming up memory */
{ U32 loopNb;
for (loopNb = 1; loopNb <= g_nbIterations; loopNb++) {
clock_t const timeLoop = TIMELOOP_S * CLOCKS_PER_SEC;
clock_t clockStart;
U32 nbRounds;
size_t benchResult=0;
double averageTime;
{ U32 loopNb;
for (loopNb = 1; loopNb <= g_nbIterations; loopNb++) {
clock_t const timeLoop = TIMELOOP_S * CLOCKS_PER_SEC;
clock_t clockStart;
U32 nbRounds;
size_t benchResult=0;
double averageTime;
DISPLAY("%2i- %-30.30s : \r", loopNb, benchName);
DISPLAY("%2i- %-30.30s : \r", loopNb, benchName);
clockStart = clock();
while (clock() == clockStart);
clockStart = clock();
for (nbRounds=0; BMK_clockSpan(clockStart) < timeLoop; nbRounds++) {
benchResult = benchFunction(dstBuff, dstBuffSize, buff2, src, srcSize);
if (ZSTD_isError(benchResult)) { DISPLAY("ERROR ! %s() => %s !! \n", benchName, ZSTD_getErrorName(benchResult)); exit(1); }
}
averageTime = (((double)BMK_clockSpan(clockStart)) / CLOCKS_PER_SEC) / nbRounds;
if (averageTime < bestTime) bestTime = averageTime;
DISPLAY("%2i- %-30.30s : %7.1f MB/s (%9u)\r", loopNb, benchName, (double)srcSize / (1 MB) / bestTime, (U32)benchResult);
}}
clockStart = clock();
while (clock() == clockStart);
clockStart = clock();
for (nbRounds=0; BMK_clockSpan(clockStart) < timeLoop; nbRounds++) {
benchResult = benchFunction(dstBuff, dstBuffSize, buff2, src, srcSize);
if (ZSTD_isError(benchResult)) { DISPLAY("ERROR ! %s() => %s !! \n", benchName, ZSTD_getErrorName(benchResult)); exit(1); }
}
averageTime = (((double)BMK_clockSpan(clockStart)) / CLOCKS_PER_SEC) / nbRounds;
if (averageTime < bestTime) bestTime = averageTime;
DISPLAY("%2i- %-30.30s : %7.1f MB/s (%9u)\r", loopNb, benchName, (double)srcSize / (1 MB) / bestTime, (U32)benchResult);
} }
DISPLAY("%2u\n", benchNb);
_cleanOut:
@ -466,7 +475,7 @@ int main(int argc, const char** argv)
switch(argument[0])
{
/* Display help on usage */
case 'h' :
case 'h':
case 'H': return usage_advanced(exename);
/* Pause at the end (hidden option) */

View File

@ -258,6 +258,17 @@ $ZSTD -t --rm tmp1.zst
ls -ls tmp1.zst # check file is still present
$ECHO "\n**** benchmark mode tests **** "
$ECHO "bench one file"
./datagen > tmp1
$ZSTD -bi1 tmp1
$ECHO "bench multiple levels"
$ZSTD -i1b1e3 tmp1
$ECHO "with recursive and quiet modes"
$ZSTD -rqi1b1e3 tmp1
$ECHO "\n**** zstd round-trip tests **** "
roundTripTest