Merge pull request #332 from terrelln/dev

[pzstd] linux build and CLI fixes
This commit is contained in:
Yann Collet 2016-09-02 16:29:49 -07:00 committed by GitHub
commit 6e1b95b01c
9 changed files with 138 additions and 53 deletions

View File

@ -19,7 +19,7 @@ matrix:
packages: packages:
- gcc-4.8 - gcc-4.8
- g++-4.8 - g++-4.8
env: PLATFORM="Ubuntu 12.04 container" CMD="make -C tests test-zstd_nolegacy && make clean && make zlibwrapper && make clean && make cmaketest && make -C contrib/pzstd googletest && make -C contrib/pzstd test && make -C contrib/pzstd clean" env: PLATFORM="Ubuntu 12.04 container" CMD="make -C tests test-zstd_nolegacy && make clean && make zlibwrapper && make clean && make cmaketest && make clean && make -C contrib/pzstd pzstd && make -C contrib/pzstd googletest && make -C contrib/pzstd test && make -C contrib/pzstd clean"
- os: linux - os: linux
sudo: false sudo: false
env: PLATFORM="Ubuntu 12.04 container" CMD="make usan" env: PLATFORM="Ubuntu 12.04 container" CMD="make usan"

View File

@ -9,6 +9,7 @@
#pragma once #pragma once
#include <atomic> #include <atomic>
#include <cassert>
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
@ -47,9 +48,7 @@ class ErrorHolder {
} }
~ErrorHolder() { ~ErrorHolder() {
if (hasError()) { assert(!hasError());
throw std::logic_error(message_);
}
} }
}; };
} }

View File

@ -10,7 +10,7 @@
ZSTDDIR = ../../lib ZSTDDIR = ../../lib
PROGDIR = ../../programs PROGDIR = ../../programs
CPPFLAGS = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/dictBuilder -I$(PROGDIR) -I. CPPFLAGS = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I.
CXXFLAGS ?= -O3 CXXFLAGS ?= -O3
CXXFLAGS += -std=c++11 CXXFLAGS += -std=c++11
CXXFLAGS += $(MOREFLAGS) CXXFLAGS += $(MOREFLAGS)
@ -55,7 +55,7 @@ main.o: main.cpp *.h utils/*.h
$(CXX) $(FLAGS) -c main.cpp -o $@ $(CXX) $(FLAGS) -c main.cpp -o $@
pzstd: Pzstd.o SkippableFrame.o Options.o main.o libzstd.a pzstd: Pzstd.o SkippableFrame.o Options.o main.o libzstd.a
$(CXX) $(FLAGS) $^ -o $@$(EXT) $(CXX) $(FLAGS) $^ -o $@$(EXT) -lpthread
googletest: googletest:
@git clone https://github.com/google/googletest @git clone https://github.com/google/googletest
@ -70,5 +70,5 @@ clean:
$(MAKE) -C $(ZSTDDIR) clean $(MAKE) -C $(ZSTDDIR) clean
$(MAKE) -C utils/test clean $(MAKE) -C utils/test clean
$(MAKE) -C test clean $(MAKE) -C test clean
@$(RM) -rf googletest/ libzstd.a *.o pzstd$(EXT) @$(RM) -rf libzstd.a *.o pzstd$(EXT)
@echo Cleaning completed @echo Cleaning completed

View File

@ -146,11 +146,8 @@ bool Options::parse(int argc, const char** argv) {
// Determine output file if not specified // Determine output file if not specified
if (outputFile.empty()) { if (outputFile.empty()) {
if (inputFile == "-") { if (inputFile == "-") {
std::fprintf( outputFile = "-";
stderr, } else {
"Invalid arguments: Reading from stdin, but -o not provided.\n");
return false;
}
// Attempt to add/remove zstd extension from the input file // Attempt to add/remove zstd extension from the input file
if (decompress) { if (decompress) {
int stemSize = inputFile.size() - zstdExtension.size(); int stemSize = inputFile.size() - zstdExtension.size();
@ -165,6 +162,7 @@ bool Options::parse(int argc, const char** argv) {
outputFile = inputFile + zstdExtension; outputFile = inputFile + zstdExtension;
} }
} }
}
// Check compression level // Check compression level
{ {
unsigned maxCLevel = ultra ? ZSTD_maxCLevel() : maxNonUltraCompressionLevel; unsigned maxCLevel = ultra ? ZSTD_maxCLevel() : maxNonUltraCompressionLevel;

View File

@ -7,7 +7,7 @@
* of patent rights can be found in the PATENTS file in the same directory. * of patent rights can be found in the PATENTS file in the same directory.
*/ */
#include "SkippableFrame.h" #include "SkippableFrame.h"
#include "common/mem.h" #include "mem.h"
#include "utils/Range.h" #include "utils/Range.h"
#include <cstdio> #include <cstdio>

View File

@ -106,6 +106,16 @@ TEST(Options, ValidInputs) {
Options expected = { Options expected = {
2, 23, 3, false, "silesia.tar", "silesia.tar.pzstd", false, false}; 2, 23, 3, false, "silesia.tar", "silesia.tar.pzstd", false, false};
} }
{
Options options;
std::array<const char*, 3> args = {{nullptr, "-n", "1"}};
EXPECT_TRUE(options.parse(args.size(), args.data()));
}
{
Options options;
std::array<const char*, 4> args = {{nullptr, "-", "-n", "1"}};
EXPECT_TRUE(options.parse(args.size(), args.data()));
}
} }
TEST(Options, BadNumThreads) { TEST(Options, BadNumThreads) {
@ -153,16 +163,6 @@ TEST(Options, BadOutputFile) {
std::array<const char*, 5> args = {{nullptr, "notzst", "-d", "-n", "1"}}; std::array<const char*, 5> args = {{nullptr, "notzst", "-d", "-n", "1"}};
EXPECT_FALSE(options.parse(args.size(), args.data())); EXPECT_FALSE(options.parse(args.size(), args.data()));
} }
{
Options options;
std::array<const char*, 3> args = {{nullptr, "-n", "1"}};
EXPECT_FALSE(options.parse(args.size(), args.data()));
}
{
Options options;
std::array<const char*, 4> args = {{nullptr, "-", "-n", "1"}};
EXPECT_FALSE(options.parse(args.size(), args.data()));
}
} }
TEST(Options, Extras) { TEST(Options, Extras) {

View File

@ -12,6 +12,7 @@
#include <atomic> #include <atomic>
#include <cassert> #include <cassert>
#include <cstddef>
#include <condition_variable> #include <condition_variable>
#include <cstddef> #include <cstddef>
#include <functional> #include <functional>
@ -25,14 +26,29 @@ template <typename T>
class WorkQueue { class WorkQueue {
// Protects all member variable access // Protects all member variable access
std::mutex mutex_; std::mutex mutex_;
std::condition_variable cv_; std::condition_variable readerCv_;
std::condition_variable writerCv_;
std::queue<T> queue_; std::queue<T> queue_;
bool done_; bool done_;
std::size_t maxSize_;
// Must have lock to call this function
bool full() const {
if (maxSize_ == 0) {
return false;
}
return queue_.size() >= maxSize_;
}
public: public:
/// Constructs an empty work queue. /**
WorkQueue() : done_(false) {} * Constructs an empty work queue with an optional max size.
* If `maxSize == 0` the queue size is unbounded.
*
* @param maxSize The maximum allowed size of the work queue.
*/
WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}
/** /**
* Push an item onto the work queue. Notify a single thread that work is * Push an item onto the work queue. Notify a single thread that work is
@ -44,13 +60,16 @@ class WorkQueue {
*/ */
bool push(T item) { bool push(T item) {
{ {
std::lock_guard<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
while (full() && !done_) {
writerCv_.wait(lock);
}
if (done_) { if (done_) {
return false; return false;
} }
queue_.push(std::move(item)); queue_.push(std::move(item));
} }
cv_.notify_one(); readerCv_.notify_one();
return true; return true;
} }
@ -64,9 +83,10 @@ class WorkQueue {
* `finish()` has been called. * `finish()` has been called.
*/ */
bool pop(T& item) { bool pop(T& item) {
{
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
while (queue_.empty() && !done_) { while (queue_.empty() && !done_) {
cv_.wait(lock); readerCv_.wait(lock);
} }
if (queue_.empty()) { if (queue_.empty()) {
assert(done_); assert(done_);
@ -74,6 +94,8 @@ class WorkQueue {
} }
item = std::move(queue_.front()); item = std::move(queue_.front());
queue_.pop(); queue_.pop();
}
writerCv_.notify_one();
return true; return true;
} }
@ -87,18 +109,19 @@ class WorkQueue {
assert(!done_); assert(!done_);
done_ = true; done_ = true;
} }
cv_.notify_all(); readerCv_.notify_all();
writerCv_.notify_all();
} }
/// Blocks until `finish()` has been called (but the queue may not be empty). /// Blocks until `finish()` has been called (but the queue may not be empty).
void waitUntilFinished() { void waitUntilFinished() {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
while (!done_) { while (!done_) {
cv_.wait(lock); readerCv_.wait(lock);
// If we were woken by a push, we need to wake a thread waiting on pop(). // If we were woken by a push, we need to wake a thread waiting on pop().
if (!done_) { if (!done_) {
lock.unlock(); lock.unlock();
cv_.notify_one(); readerCv_.notify_one();
lock.lock(); lock.lock();
} }
} }
@ -111,7 +134,7 @@ class BufferWorkQueue {
std::atomic<std::size_t> size_; std::atomic<std::size_t> size_;
public: public:
BufferWorkQueue() : size_(0) {} BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {}
void push(Buffer buffer) { void push(Buffer buffer) {
size_.fetch_add(buffer.size()); size_.fetch_add(buffer.size());

View File

@ -23,7 +23,7 @@ GTEST_LIB ?= -L $(PZSTDDIR)/googletest/build/googlemock/gtest
CPPFLAGS = -I$(PZSTDDIR) $(GTEST_INC) $(GTEST_LIB) CPPFLAGS = -I$(PZSTDDIR) $(GTEST_INC) $(GTEST_LIB)
CXXFLAGS ?= -O3 CXXFLAGS ?= -O3
CXXFLAGS += -std=c++11 CXXFLAGS += -std=c++11
CFLAGS += $(MOREFLAGS) CXXFLAGS += $(MOREFLAGS)
FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS) FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
%: %.cpp %: %.cpp

View File

@ -145,6 +145,71 @@ TEST(WorkQueue, MPMC) {
} }
} }
TEST(WorkQueue, BoundedSizeWorks) {
WorkQueue<int> queue(1);
int result;
queue.push(5);
queue.pop(result);
queue.push(5);
queue.pop(result);
queue.push(5);
queue.finish();
queue.pop(result);
EXPECT_EQ(5, result);
}
TEST(WorkQueue, BoundedSizePushAfterFinish) {
WorkQueue<int> queue(1);
int result;
queue.push(5);
std::thread pusher([&queue] {
queue.push(6);
});
// Dirtily try and make sure that pusher has run.
std::this_thread::sleep_for(std::chrono::seconds(1));
queue.finish();
EXPECT_TRUE(queue.pop(result));
EXPECT_EQ(5, result);
EXPECT_FALSE(queue.pop(result));
pusher.join();
}
TEST(WorkQueue, BoundedSizeMPMC) {
WorkQueue<int> queue(100);
std::vector<int> results(10000, -1);
std::mutex mutex;
std::vector<std::thread> popperThreads;
for (int i = 0; i < 10; ++i) {
popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
}
std::vector<std::thread> pusherThreads;
for (int i = 0; i < 100; ++i) {
auto min = i * 100;
auto max = (i + 1) * 100;
pusherThreads.emplace_back(
[ &queue, min, max ] {
for (int i = min; i < max; ++i) {
queue.push(i);
}
});
}
for (auto& thread : pusherThreads) {
thread.join();
}
queue.finish();
for (auto& thread : popperThreads) {
thread.join();
}
for (int i = 0; i < 10000; ++i) {
EXPECT_EQ(i, results[i]);
}
}
TEST(BufferWorkQueue, SizeCalculatedCorrectly) { TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
{ {
BufferWorkQueue queue; BufferWorkQueue queue;