Merge pull request #387 from terrelln/display

[pzstd] Print (de)compression results
This commit is contained in:
Yann Collet 2016-09-24 11:52:29 +02:00 committed by GitHub
commit 02c17e18d8
4 changed files with 80 additions and 22 deletions

View File

@ -13,6 +13,7 @@
#include <cassert> #include <cassert>
#include <cstdio> #include <cstdio>
#include <cstring> #include <cstring>
#include <iterator>
#include <thread> #include <thread>
#include <util.h> #include <util.h>
#include <vector> #include <vector>

View File

@ -14,6 +14,7 @@
#include "utils/ThreadPool.h" #include "utils/ThreadPool.h"
#include "utils/WorkQueue.h" #include "utils/WorkQueue.h"
#include <chrono>
#include <cstddef> #include <cstddef>
#include <cstdio> #include <cstdio>
#include <memory> #include <memory>
@ -52,16 +53,18 @@ static std::uintmax_t fileSizeOrZero(const std::string &file) {
return size; return size;
} }
static size_t handleOneInput(const Options &options, static std::uint64_t handleOneInput(const Options &options,
const std::string &inputFile, const std::string &inputFile,
FILE* inputFd, FILE* inputFd,
const std::string &outputFile,
FILE* outputFd, FILE* outputFd,
ErrorHolder &errorHolder) { ErrorHolder &errorHolder) {
auto inputSize = fileSizeOrZero(inputFile); auto inputSize = fileSizeOrZero(inputFile);
// WorkQueue outlives ThreadPool so in the case of error we are certain // WorkQueue outlives ThreadPool so in the case of error we are certain
// we don't accidently try to call push() on it after it is destroyed. // we don't accidently try to call push() on it after it is destroyed.
WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1}; WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
size_t bytesWritten; std::uint64_t bytesRead;
std::uint64_t bytesWritten;
{ {
// Initialize the thread pool with numThreads + 1 // Initialize the thread pool with numThreads + 1
// We add one because the read thread spends most of its time waiting. // We add one because the read thread spends most of its time waiting.
@ -71,8 +74,9 @@ static size_t handleOneInput(const Options &options,
if (!options.decompress) { if (!options.decompress) {
// Add a job that reads the input and starts all the compression jobs // Add a job that reads the input and starts all the compression jobs
executor.add( executor.add(
[&errorHolder, &outs, &executor, inputFd, inputSize, &options] { [&errorHolder, &outs, &executor, inputFd, inputSize, &options,
asyncCompressChunks( &bytesRead] {
bytesRead = asyncCompressChunks(
errorHolder, errorHolder,
outs, outs,
executor, executor,
@ -82,14 +86,30 @@ static size_t handleOneInput(const Options &options,
options.determineParameters()); options.determineParameters());
}); });
// Start writing // Start writing
bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress); bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress,
options.verbosity);
} else { } else {
// Add a job that reads the input and starts all the decompression jobs // Add a job that reads the input and starts all the decompression jobs
executor.add([&errorHolder, &outs, &executor, inputFd] { executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] {
asyncDecompressFrames(errorHolder, outs, executor, inputFd); bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd);
}); });
// Start writing // Start writing
bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress); bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress,
options.verbosity);
}
}
if (options.verbosity > 1 && !errorHolder.hasError()) {
std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
if (!options.decompress) {
double ratio = static_cast<double>(bytesWritten) /
static_cast<double>(bytesRead + !bytesRead);
std::fprintf(stderr, "%-20s :%6.2f%% (%6llu => %6llu bytes, %s)\n",
inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
outputFileName.c_str());
} else {
std::fprintf(stderr, "%-20s: %llu bytes \n",
inputFileName.c_str(),bytesWritten);
} }
} }
return bytesWritten; return bytesWritten;
@ -185,7 +205,7 @@ int pzstdMain(const Options &options) {
} }
auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); }); auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
// (de)compress the file // (de)compress the file
handleOneInput(options, input, inputFd, outputFd, errorHolder); handleOneInput(options, input, inputFd, outputFile, outputFd, errorHolder);
if (errorHolder.hasError()) { if (errorHolder.hasError()) {
continue; continue;
} }
@ -359,11 +379,13 @@ FileStatus fileStatus(FILE* fd) {
* Returns the status of the file after all of the reads have occurred. * Returns the status of the file after all of the reads have occurred.
*/ */
static FileStatus static FileStatus
readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) { readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
std::uint64_t *totalBytesRead) {
Buffer buffer(size); Buffer buffer(size);
while (!buffer.empty()) { while (!buffer.empty()) {
auto bytesRead = auto bytesRead =
std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd); std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
*totalBytesRead += bytesRead;
queue.push(buffer.splitAt(bytesRead)); queue.push(buffer.splitAt(bytesRead));
auto status = fileStatus(fd); auto status = fileStatus(fd);
if (status != FileStatus::Continue) { if (status != FileStatus::Continue) {
@ -373,7 +395,7 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) {
return FileStatus::Continue; return FileStatus::Continue;
} }
void asyncCompressChunks( std::uint64_t asyncCompressChunks(
ErrorHolder& errorHolder, ErrorHolder& errorHolder,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks, WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
ThreadPool& executor, ThreadPool& executor,
@ -382,6 +404,7 @@ void asyncCompressChunks(
size_t numThreads, size_t numThreads,
ZSTD_parameters params) { ZSTD_parameters params) {
auto chunksGuard = makeScopeGuard([&] { chunks.finish(); }); auto chunksGuard = makeScopeGuard([&] { chunks.finish(); });
std::uint64_t bytesRead = 0;
// Break the input up into chunks of size `step` and compress each chunk // Break the input up into chunks of size `step` and compress each chunk
// independently. // independently.
@ -401,9 +424,10 @@ void asyncCompressChunks(
// Pass the output queue to the writer thread. // Pass the output queue to the writer thread.
chunks.push(std::move(out)); chunks.push(std::move(out));
// Fill the input queue for the compression job we just started // Fill the input queue for the compression job we just started
status = readData(*in, ZSTD_CStreamInSize(), step, fd); status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
} }
errorHolder.check(status != FileStatus::Error, "Error reading input"); errorHolder.check(status != FileStatus::Error, "Error reading input");
return bytesRead;
} }
/** /**
@ -484,12 +508,14 @@ static void decompress(
} }
} }
void asyncDecompressFrames( std::uint64_t asyncDecompressFrames(
ErrorHolder& errorHolder, ErrorHolder& errorHolder,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames, WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
ThreadPool& executor, ThreadPool& executor,
FILE* fd) { FILE* fd) {
auto framesGuard = makeScopeGuard([&] { frames.finish(); }); auto framesGuard = makeScopeGuard([&] { frames.finish(); });
std::uint64_t totalBytesRead = 0;
// Split the source up into its component frames. // Split the source up into its component frames.
// If we find our recognized skippable frame we know the next frames size // If we find our recognized skippable frame we know the next frames size
// which means that we can decompress each standard frame in independently. // which means that we can decompress each standard frame in independently.
@ -509,6 +535,7 @@ void asyncDecompressFrames(
// frameSize is 0 if the frame info can't be decoded. // frameSize is 0 if the frame info can't be decoded.
Buffer buffer(SkippableFrame::kSize); Buffer buffer(SkippableFrame::kSize);
auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd); auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
totalBytesRead += bytesRead;
status = fileStatus(fd); status = fileStatus(fd);
if (bytesRead == 0 && status != FileStatus::Continue) { if (bytesRead == 0 && status != FileStatus::Continue) {
break; break;
@ -533,14 +560,15 @@ void asyncDecompressFrames(
// We hit a non SkippableFrame ==> not compressed by pzstd or corrupted // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
// Pass the rest of the source to this decompression task // Pass the rest of the source to this decompression task
while (status == FileStatus::Continue && !errorHolder.hasError()) { while (status == FileStatus::Continue && !errorHolder.hasError()) {
status = readData(*in, chunkSize, chunkSize, fd); status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
} }
break; break;
} }
// Fill the input queue for the decompression job we just started // Fill the input queue for the decompression job we just started
status = readData(*in, chunkSize, frameSize, fd); status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
} }
errorHolder.check(status != FileStatus::Error, "Error reading input"); errorHolder.check(status != FileStatus::Error, "Error reading input");
return totalBytesRead;
} }
/// Write `data` to `fd`, returns true iff success. /// Write `data` to `fd`, returns true iff success.
@ -554,12 +582,34 @@ static bool writeData(ByteRange data, FILE* fd) {
return true; return true;
} }
size_t writeFile( void updateWritten(int verbosity, std::uint64_t bytesWritten) {
if (verbosity <= 1) {
return;
}
using Clock = std::chrono::system_clock;
static Clock::time_point then;
constexpr std::chrono::milliseconds refreshRate{150};
auto now = Clock::now();
if (now - then > refreshRate) {
then = now;
std::fprintf(stderr, "\rWritten: %u MB ",
static_cast<std::uint32_t>(bytesWritten >> 20));
}
}
std::uint64_t writeFile(
ErrorHolder& errorHolder, ErrorHolder& errorHolder,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs, WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
FILE* outputFd, FILE* outputFd,
bool decompress) { bool decompress,
size_t bytesWritten = 0; int verbosity) {
auto lineClearGuard = makeScopeGuard([verbosity] {
if (verbosity > 1) {
std::fprintf(stderr, "\r%79s\r", "");
}
});
std::uint64_t bytesWritten = 0;
std::shared_ptr<BufferWorkQueue> out; std::shared_ptr<BufferWorkQueue> out;
// Grab the output queue for each decompression job (in order). // Grab the output queue for each decompression job (in order).
while (outs.pop(out) && !errorHolder.hasError()) { while (outs.pop(out) && !errorHolder.hasError()) {
@ -583,6 +633,7 @@ size_t writeFile(
return bytesWritten; return bytesWritten;
} }
bytesWritten += buffer.size(); bytesWritten += buffer.size();
updateWritten(verbosity, bytesWritten);
} }
} }
return bytesWritten; return bytesWritten;

View File

@ -45,8 +45,9 @@ int pzstdMain(const Options& options);
* @param size The size of the input file if known, 0 otherwise * @param size The size of the input file if known, 0 otherwise
* @param numThreads The number of threads in the thread pool * @param numThreads The number of threads in the thread pool
* @param parameters The zstd parameters to use for compression * @param parameters The zstd parameters to use for compression
* @returns The number of bytes read from the file
*/ */
void asyncCompressChunks( std::uint64_t asyncCompressChunks(
ErrorHolder& errorHolder, ErrorHolder& errorHolder,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks, WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
ThreadPool& executor, ThreadPool& executor,
@ -66,8 +67,9 @@ void asyncCompressChunks(
* as soon as it is available * as soon as it is available
* @param executor The thread pool to run compression jobs in * @param executor The thread pool to run compression jobs in
* @param fd The input file descriptor * @param fd The input file descriptor
* @returns The number of bytes read from the file
*/ */
void asyncDecompressFrames( std::uint64_t asyncDecompressFrames(
ErrorHolder& errorHolder, ErrorHolder& errorHolder,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames, WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
ThreadPool& executor, ThreadPool& executor,
@ -82,11 +84,13 @@ void asyncDecompressFrames(
* (de)compression job. * (de)compression job.
* @param outputFd The file descriptor to write to * @param outputFd The file descriptor to write to
* @param decompress Are we decompressing? * @param decompress Are we decompressing?
* @param verbosity The verbosity level to log at
* @returns The number of bytes written * @returns The number of bytes written
*/ */
std::size_t writeFile( std::uint64_t writeFile(
ErrorHolder& errorHolder, ErrorHolder& errorHolder,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs, WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
FILE* outputFd, FILE* outputFd,
bool decompress); bool decompress,
int verbosity);
} }

View File

@ -54,6 +54,7 @@ TEST(Pzstd, SmallSizes) {
options.inputFiles = {inputFile}; options.inputFiles = {inputFile};
options.numThreads = numThreads; options.numThreads = numThreads;
options.compressionLevel = level; options.compressionLevel = level;
options.verbosity = 1;
ASSERT_TRUE(roundTrip(options)); ASSERT_TRUE(roundTrip(options));
errorGuard.dismiss(); errorGuard.dismiss();
} }
@ -91,6 +92,7 @@ TEST(Pzstd, LargeSizes) {
options.inputFiles = {inputFile}; options.inputFiles = {inputFile};
options.numThreads = std::min(numThreads, options.numThreads); options.numThreads = std::min(numThreads, options.numThreads);
options.compressionLevel = level; options.compressionLevel = level;
options.verbosity = 1;
ASSERT_TRUE(roundTrip(options)); ASSERT_TRUE(roundTrip(options));
errorGuard.dismiss(); errorGuard.dismiss();
} }