From d249889b9ff484f25c6085a8e190fcc16ca15ff5 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Fri, 23 Sep 2016 12:55:21 -0700 Subject: [PATCH 1/3] [pzstd] Print (de)compression results --- contrib/pzstd/Pzstd.cpp | 55 +++++++++++++++++++++++--------- contrib/pzstd/Pzstd.h | 8 +++-- contrib/pzstd/test/PzstdTest.cpp | 2 ++ 3 files changed, 47 insertions(+), 18 deletions(-) diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index ccd4f626..5de90e8b 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -52,16 +52,18 @@ static std::uintmax_t fileSizeOrZero(const std::string &file) { return size; } -static size_t handleOneInput(const Options &options, +static std::uint64_t handleOneInput(const Options &options, const std::string &inputFile, FILE* inputFd, + const std::string &outputFile, FILE* outputFd, ErrorHolder &errorHolder) { auto inputSize = fileSizeOrZero(inputFile); // WorkQueue outlives ThreadPool so in the case of error we are certain // we don't accidently try to call push() on it after it is destroyed. WorkQueue> outs{options.numThreads + 1}; - size_t bytesWritten; + std::uint64_t bytesRead; + std::uint64_t bytesWritten; { // Initialize the thread pool with numThreads + 1 // We add one because the read thread spends most of its time waiting. @@ -71,8 +73,9 @@ static size_t handleOneInput(const Options &options, if (!options.decompress) { // Add a job that reads the input and starts all the compression jobs executor.add( - [&errorHolder, &outs, &executor, inputFd, inputSize, &options] { - asyncCompressChunks( + [&errorHolder, &outs, &executor, inputFd, inputSize, &options, + &bytesRead] { + bytesRead = asyncCompressChunks( errorHolder, outs, executor, @@ -85,13 +88,27 @@ static size_t handleOneInput(const Options &options, bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress); } else { // Add a job that reads the input and starts all the decompression jobs - executor.add([&errorHolder, &outs, &executor, inputFd] { - asyncDecompressFrames(errorHolder, outs, executor, inputFd); + executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] { + bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd); }); // Start writing bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress); } } + if (options.verbosity > 1 && !errorHolder.hasError()) { + std::string inputFileName = inputFile == "-" ? "stdin" : inputFile; + std::string outputFileName = outputFile == "-" ? "stdout" : outputFile; + if (!options.decompress) { + double ratio = static_cast(bytesWritten) / + static_cast(bytesRead + !bytesRead); + std::fprintf(stderr, "%-20s :%6.2f%% (%6llu => %6llu bytes, %s)\n", + inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten, + outputFileName.c_str()); + } else { + std::fprintf(stderr, "%-20s: %llu bytes \n", + inputFileName.c_str(),bytesWritten); + } + } return bytesWritten; } @@ -185,7 +202,7 @@ int pzstdMain(const Options &options) { } auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); }); // (de)compress the file - handleOneInput(options, input, inputFd, outputFd, errorHolder); + handleOneInput(options, input, inputFd, outputFile, outputFd, errorHolder); if (errorHolder.hasError()) { continue; } @@ -359,11 +376,13 @@ FileStatus fileStatus(FILE* fd) { * Returns the status of the file after all of the reads have occurred. */ static FileStatus -readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) { +readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd, + std::uint64_t *totalBytesRead) { Buffer buffer(size); while (!buffer.empty()) { auto bytesRead = std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd); + *totalBytesRead += bytesRead; queue.push(buffer.splitAt(bytesRead)); auto status = fileStatus(fd); if (status != FileStatus::Continue) { @@ -373,7 +392,7 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) { return FileStatus::Continue; } -void asyncCompressChunks( +std::uint64_t asyncCompressChunks( ErrorHolder& errorHolder, WorkQueue>& chunks, ThreadPool& executor, @@ -382,6 +401,7 @@ void asyncCompressChunks( size_t numThreads, ZSTD_parameters params) { auto chunksGuard = makeScopeGuard([&] { chunks.finish(); }); + std::uint64_t bytesRead = 0; // Break the input up into chunks of size `step` and compress each chunk // independently. @@ -401,9 +421,10 @@ void asyncCompressChunks( // Pass the output queue to the writer thread. chunks.push(std::move(out)); // Fill the input queue for the compression job we just started - status = readData(*in, ZSTD_CStreamInSize(), step, fd); + status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead); } errorHolder.check(status != FileStatus::Error, "Error reading input"); + return bytesRead; } /** @@ -484,12 +505,14 @@ static void decompress( } } -void asyncDecompressFrames( +std::uint64_t asyncDecompressFrames( ErrorHolder& errorHolder, WorkQueue>& frames, ThreadPool& executor, FILE* fd) { auto framesGuard = makeScopeGuard([&] { frames.finish(); }); + std::uint64_t totalBytesRead = 0; + // Split the source up into its component frames. // If we find our recognized skippable frame we know the next frames size // which means that we can decompress each standard frame in independently. @@ -509,6 +532,7 @@ void asyncDecompressFrames( // frameSize is 0 if the frame info can't be decoded. Buffer buffer(SkippableFrame::kSize); auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd); + totalBytesRead += bytesRead; status = fileStatus(fd); if (bytesRead == 0 && status != FileStatus::Continue) { break; @@ -533,14 +557,15 @@ void asyncDecompressFrames( // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted // Pass the rest of the source to this decompression task while (status == FileStatus::Continue && !errorHolder.hasError()) { - status = readData(*in, chunkSize, chunkSize, fd); + status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead); } break; } // Fill the input queue for the decompression job we just started - status = readData(*in, chunkSize, frameSize, fd); + status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead); } errorHolder.check(status != FileStatus::Error, "Error reading input"); + return totalBytesRead; } /// Write `data` to `fd`, returns true iff success. @@ -554,12 +579,12 @@ static bool writeData(ByteRange data, FILE* fd) { return true; } -size_t writeFile( +std::uint64_t writeFile( ErrorHolder& errorHolder, WorkQueue>& outs, FILE* outputFd, bool decompress) { - size_t bytesWritten = 0; + std::uint64_t bytesWritten = 0; std::shared_ptr out; // Grab the output queue for each decompression job (in order). while (outs.pop(out) && !errorHolder.hasError()) { diff --git a/contrib/pzstd/Pzstd.h b/contrib/pzstd/Pzstd.h index 0c21d135..c3b2926b 100644 --- a/contrib/pzstd/Pzstd.h +++ b/contrib/pzstd/Pzstd.h @@ -45,8 +45,9 @@ int pzstdMain(const Options& options); * @param size The size of the input file if known, 0 otherwise * @param numThreads The number of threads in the thread pool * @param parameters The zstd parameters to use for compression + * @returns The number of bytes read from the file */ -void asyncCompressChunks( +std::uint64_t asyncCompressChunks( ErrorHolder& errorHolder, WorkQueue>& chunks, ThreadPool& executor, @@ -66,8 +67,9 @@ void asyncCompressChunks( * as soon as it is available * @param executor The thread pool to run compression jobs in * @param fd The input file descriptor + * @returns The number of bytes read from the file */ -void asyncDecompressFrames( +std::uint64_t asyncDecompressFrames( ErrorHolder& errorHolder, WorkQueue>& frames, ThreadPool& executor, @@ -84,7 +86,7 @@ void asyncDecompressFrames( * @param decompress Are we decompressing? * @returns The number of bytes written */ -std::size_t writeFile( +std::uint64_t writeFile( ErrorHolder& errorHolder, WorkQueue>& outs, FILE* outputFd, diff --git a/contrib/pzstd/test/PzstdTest.cpp b/contrib/pzstd/test/PzstdTest.cpp index 64bcf9ca..c85f73a3 100644 --- a/contrib/pzstd/test/PzstdTest.cpp +++ b/contrib/pzstd/test/PzstdTest.cpp @@ -54,6 +54,7 @@ TEST(Pzstd, SmallSizes) { options.inputFiles = {inputFile}; options.numThreads = numThreads; options.compressionLevel = level; + options.verbosity = 1; ASSERT_TRUE(roundTrip(options)); errorGuard.dismiss(); } @@ -91,6 +92,7 @@ TEST(Pzstd, LargeSizes) { options.inputFiles = {inputFile}; options.numThreads = std::min(numThreads, options.numThreads); options.compressionLevel = level; + options.verbosity = 1; ASSERT_TRUE(roundTrip(options)); errorGuard.dismiss(); } From dac03769082a895dafae6bc629db085c655faa8f Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Fri, 23 Sep 2016 14:38:25 -0700 Subject: [PATCH 2/3] [pzstd] Add header required for Visual Studios --- contrib/pzstd/Options.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/contrib/pzstd/Options.cpp b/contrib/pzstd/Options.cpp index 5562ee18..2d8d3220 100644 --- a/contrib/pzstd/Options.cpp +++ b/contrib/pzstd/Options.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include From 39801674881e4d18aaf70990bcd038c3d398c6d1 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Fri, 23 Sep 2016 15:47:26 -0700 Subject: [PATCH 3/3] [pzstd] Add status update for MB written --- contrib/pzstd/Pzstd.cpp | 32 +++++++++++++++++++++++++++++--- contrib/pzstd/Pzstd.h | 4 +++- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index 5de90e8b..e0826b9d 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -14,6 +14,7 @@ #include "utils/ThreadPool.h" #include "utils/WorkQueue.h" +#include #include #include #include @@ -85,14 +86,16 @@ static std::uint64_t handleOneInput(const Options &options, options.determineParameters()); }); // Start writing - bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress); + bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress, + options.verbosity); } else { // Add a job that reads the input and starts all the decompression jobs executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] { bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd); }); // Start writing - bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress); + bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress, + options.verbosity); } } if (options.verbosity > 1 && !errorHolder.hasError()) { @@ -579,11 +582,33 @@ static bool writeData(ByteRange data, FILE* fd) { return true; } +void updateWritten(int verbosity, std::uint64_t bytesWritten) { + if (verbosity <= 1) { + return; + } + using Clock = std::chrono::system_clock; + static Clock::time_point then; + constexpr std::chrono::milliseconds refreshRate{150}; + + auto now = Clock::now(); + if (now - then > refreshRate) { + then = now; + std::fprintf(stderr, "\rWritten: %u MB ", + static_cast(bytesWritten >> 20)); + } +} + std::uint64_t writeFile( ErrorHolder& errorHolder, WorkQueue>& outs, FILE* outputFd, - bool decompress) { + bool decompress, + int verbosity) { + auto lineClearGuard = makeScopeGuard([verbosity] { + if (verbosity > 1) { + std::fprintf(stderr, "\r%79s\r", ""); + } + }); std::uint64_t bytesWritten = 0; std::shared_ptr out; // Grab the output queue for each decompression job (in order). @@ -608,6 +633,7 @@ std::uint64_t writeFile( return bytesWritten; } bytesWritten += buffer.size(); + updateWritten(verbosity, bytesWritten); } } return bytesWritten; diff --git a/contrib/pzstd/Pzstd.h b/contrib/pzstd/Pzstd.h index c3b2926b..fe44ccfd 100644 --- a/contrib/pzstd/Pzstd.h +++ b/contrib/pzstd/Pzstd.h @@ -84,11 +84,13 @@ std::uint64_t asyncDecompressFrames( * (de)compression job. * @param outputFd The file descriptor to write to * @param decompress Are we decompressing? + * @param verbosity The verbosity level to log at * @returns The number of bytes written */ std::uint64_t writeFile( ErrorHolder& errorHolder, WorkQueue>& outs, FILE* outputFd, - bool decompress); + bool decompress, + int verbosity); }