Dirk Müller 7a3997c21a
Handle newer less versions in zstdless testing
Newer less versions appear to have changed how stderr
and stdout are showing error messages. hardcode the
expected behavior to make the tests pass with any less version.

Also set locale to C so that the strings are matching.
2022-03-10 09:47:33 +01:00

729 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
# ################################################################
# Copyright (c) Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under both the BSD-style license (found in the
# LICENSE file in the root directory of this source tree) and the GPLv2 (found
# in the COPYING file in the root directory of this source tree).
# You may select, at your option, one of the above-listed licenses.
# ##########################################################################
import argparse
import contextlib
import copy
import fnmatch
import os
import shutil
import subprocess
import sys
import tempfile
import typing
ZSTD_SYMLINKS = [
"zstd",
"zstdmt",
"unzstd",
"zstdcat",
"zcat",
"gzip",
"gunzip",
"gzcat",
"lzma",
"unlzma",
"xz",
"unxz",
"lz4",
"unlz4",
]
EXCLUDED_DIRS = {
"bin",
"common",
"scratch",
}
EXCLUDED_BASENAMES = {
"setup",
"setup_once",
"teardown",
"teardown_once",
"README.md",
"run.py",
".gitignore",
}
EXCLUDED_SUFFIXES = [
".exact",
".glob",
".ignore",
".exit",
]
def exclude_dir(dirname: str) -> bool:
"""
Should files under the directory :dirname: be excluded from the test runner?
"""
if dirname in EXCLUDED_DIRS:
return True
return False
def exclude_file(filename: str) -> bool:
"""Should the file :filename: be excluded from the test runner?"""
if filename in EXCLUDED_BASENAMES:
return True
for suffix in EXCLUDED_SUFFIXES:
if filename.endswith(suffix):
return True
return False
def read_file(filename: str) -> bytes:
"""Reads the file :filename: and returns the contents as bytes."""
with open(filename, "rb") as f:
return f.read()
def diff(a: bytes, b: bytes) -> str:
"""Returns a diff between two different byte-strings :a: and :b:."""
assert a != b
with tempfile.NamedTemporaryFile("wb") as fa:
fa.write(a)
fa.flush()
with tempfile.NamedTemporaryFile("wb") as fb:
fb.write(b)
fb.flush()
diff_bytes = subprocess.run(["diff", fa.name, fb.name], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL).stdout
return diff_bytes.decode("utf8")
def pop_line(data: bytes) -> typing.Tuple[typing.Optional[bytes], bytes]:
"""
Pop the first line from :data: and returns the first line and the remainder
of the data as a tuple. If :data: is empty, returns :(None, data):. Otherwise
the first line always ends in a :\n:, even if it is the last line and :data:
doesn't end in :\n:.
"""
NEWLINE = b"\n"[0]
if data == b'':
return (None, data)
newline_idx = data.find(b"\n")
if newline_idx == -1:
end_idx = len(data)
else:
end_idx = newline_idx + 1
line = data[:end_idx]
data = data[end_idx:]
assert len(line) != 0
if line[-1] != NEWLINE:
line += NEWLINE
return (line, data)
def glob_line_matches(actual: bytes, expect: bytes) -> bool:
"""
Does the `actual` line match the expected glob line `expect`?
"""
return fnmatch.fnmatchcase(actual.strip(), expect.strip())
def glob_diff(actual: bytes, expect: bytes) -> bytes:
"""
Returns None if the :actual: content matches the expected glob :expect:,
otherwise returns the diff bytes.
"""
diff = b''
actual_line, actual = pop_line(actual)
expect_line, expect = pop_line(expect)
while True:
# Handle end of file conditions - allow extra newlines
while expect_line is None and actual_line == b"\n":
actual_line, actual = pop_line(actual)
while actual_line is None and expect_line == b"\n":
expect_line, expect = pop_line(expect)
if expect_line is None and actual_line is None:
if diff == b'':
return None
return diff
elif expect_line is None:
diff += b"---\n"
while actual_line != None:
diff += b"> "
diff += actual_line
actual_line, actual = pop_line(actual)
return diff
elif actual_line is None:
diff += b"---\n"
while expect_line != None:
diff += b"< "
diff += expect_line
expect_line, expect = pop_line(expect)
return diff
assert expect_line is not None
assert actual_line is not None
if expect_line == b'...\n':
next_expect_line, expect = pop_line(expect)
if next_expect_line is None:
if diff == b'':
return None
return diff
while not glob_line_matches(actual_line, next_expect_line):
actual_line, actual = pop_line(actual)
if actual_line is None:
diff += b"---\n"
diff += b"< "
diff += next_expect_line
return diff
expect_line = next_expect_line
continue
if not glob_line_matches(actual_line, expect_line):
diff += b'---\n'
diff += b'< ' + expect_line
diff += b'> ' + actual_line
actual_line, actual = pop_line(actual)
expect_line, expect = pop_line(expect)
class Options:
"""Options configuring how to run a :TestCase:."""
def __init__(
self,
env: typing.Dict[str, str],
timeout: typing.Optional[int],
verbose: bool,
preserve: bool,
scratch_dir: str,
test_dir: str,
) -> None:
self.env = env
self.timeout = timeout
self.verbose = verbose
self.preserve = preserve
self.scratch_dir = scratch_dir
self.test_dir = test_dir
class TestCase:
"""
Logic and state related to running a single test case.
1. Initialize the test case.
2. Launch the test case with :TestCase.launch():.
This will start the test execution in a subprocess, but
not wait for completion. So you could launch multiple test
cases in parallel. This will now print any test output.
3. Analyze the results with :TestCase.analyze():. This will
join the test subprocess, check the results against the
expectations, and print the results to stdout.
:TestCase.run(): is also provided which combines the launch & analyze
steps for single-threaded use-cases.
All other methods, prefixed with _, are private helper functions.
"""
def __init__(self, test_filename: str, options: Options) -> None:
"""
Initialize the :TestCase: for the test located in :test_filename:
with the given :options:.
"""
self._opts = options
self._test_file = test_filename
self._test_name = os.path.normpath(
os.path.relpath(test_filename, start=self._opts.test_dir)
)
self._success = {}
self._message = {}
self._test_stdin = None
self._scratch_dir = os.path.abspath(os.path.join(self._opts.scratch_dir, self._test_name))
@property
def name(self) -> str:
"""Returns the unique name for the test."""
return self._test_name
def launch(self) -> None:
"""
Launch the test case as a subprocess, but do not block on completion.
This allows users to run multiple tests in parallel. Results aren't yet
printed out.
"""
self._launch_test()
def analyze(self) -> bool:
"""
Must be called after :TestCase.launch():. Joins the test subprocess and
checks the results against expectations. Finally prints the results to
stdout and returns the success.
"""
self._join_test()
self._check_exit()
self._check_stderr()
self._check_stdout()
self._analyze_results()
return self._succeeded
def run(self) -> bool:
"""Shorthand for combining both :TestCase.launch(): and :TestCase.analyze():."""
self.launch()
return self.analyze()
def _log(self, *args, **kwargs) -> None:
"""Logs test output."""
print(file=sys.stdout, *args, **kwargs)
def _vlog(self, *args, **kwargs) -> None:
"""Logs verbose test output."""
if self._opts.verbose:
print(file=sys.stdout, *args, **kwargs)
def _test_environment(self) -> typing.Dict[str, str]:
"""
Returns the environment to be used for the
test subprocess.
"""
# We want to omit ZSTD cli flags so tests will be consistent across environments
env = {k: v for k, v in os.environ.items() if not k.startswith("ZSTD")}
for k, v in self._opts.env.items():
self._vlog(f"${k}='{v}'")
env[k] = v
return env
def _launch_test(self) -> None:
"""Launch the test subprocess, but do not join it."""
args = [os.path.abspath(self._test_file)]
stdin_name = f"{self._test_file}.stdin"
if os.path.exists(stdin_name):
self._test_stdin = open(stdin_name, "rb")
stdin = self._test_stdin
else:
stdin = subprocess.DEVNULL
cwd = self._scratch_dir
env = self._test_environment()
self._test_process = subprocess.Popen(
args=args,
stdin=stdin,
cwd=cwd,
env=env,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE
)
def _join_test(self) -> None:
"""Join the test process and save stderr, stdout, and the exit code."""
(stdout, stderr) = self._test_process.communicate(timeout=self._opts.timeout)
self._output = {}
self._output["stdout"] = stdout
self._output["stderr"] = stderr
self._exit_code = self._test_process.returncode
self._test_process = None
if self._test_stdin is not None:
self._test_stdin.close()
self._test_stdin = None
def _check_output_exact(self, out_name: str, expected: bytes) -> None:
"""
Check the output named :out_name: for an exact match against the :expected: content.
Saves the success and message.
"""
check_name = f"check_{out_name}"
actual = self._output[out_name]
if actual == expected:
self._success[check_name] = True
self._message[check_name] = f"{out_name} matches!"
else:
self._success[check_name] = False
self._message[check_name] = f"{out_name} does not match!\n> diff expected actual\n{diff(expected, actual)}"
def _check_output_glob(self, out_name: str, expected: bytes) -> None:
"""
Check the output named :out_name: for a glob match against the :expected: glob.
Saves the success and message.
"""
check_name = f"check_{out_name}"
actual = self._output[out_name]
diff = glob_diff(actual, expected)
if diff is None:
self._success[check_name] = True
self._message[check_name] = f"{out_name} matches!"
else:
utf8_diff = diff.decode('utf8')
self._success[check_name] = False
self._message[check_name] = f"{out_name} does not match!\n> diff expected actual\n{utf8_diff}"
def _check_output(self, out_name: str) -> None:
"""
Checks the output named :out_name: for a match against the expectation.
We check for a .exact, .glob, and a .ignore file. If none are found we
expect that the output should be empty.
If :Options.preserve: was set then we save the scratch directory and
save the stderr, stdout, and exit code to the scratch directory for
debugging.
"""
if self._opts.preserve:
# Save the output to the scratch directory
actual_name = os.path.join(self._scratch_dir, f"{out_name}")
with open(actual_name, "wb") as f:
f.write(self._output[out_name])
exact_name = f"{self._test_file}.{out_name}.exact"
glob_name = f"{self._test_file}.{out_name}.glob"
ignore_name = f"{self._test_file}.{out_name}.ignore"
if os.path.exists(exact_name):
return self._check_output_exact(out_name, read_file(exact_name))
elif os.path.exists(glob_name):
return self._check_output_glob(out_name, read_file(glob_name))
elif os.path.exists(ignore_name):
check_name = f"check_{out_name}"
self._success[check_name] = True
self._message[check_name] = f"{out_name} ignored!"
else:
return self._check_output_exact(out_name, bytes())
def _check_stderr(self) -> None:
"""Checks the stderr output against the expectation."""
self._check_output("stderr")
def _check_stdout(self) -> None:
"""Checks the stdout output against the expectation."""
self._check_output("stdout")
def _check_exit(self) -> None:
"""
Checks the exit code against expectations. If a .exit file
exists, we expect that the exit code matches the contents.
Otherwise we expect the exit code to be zero.
If :Options.preserve: is set we save the exit code to the
scratch directory under the filename "exit".
"""
if self._opts.preserve:
exit_name = os.path.join(self._scratch_dir, "exit")
with open(exit_name, "w") as f:
f.write(str(self._exit_code) + "\n")
exit_name = f"{self._test_file}.exit"
if os.path.exists(exit_name):
exit_code: int = int(read_file(exit_name))
else:
exit_code: int = 0
if exit_code == self._exit_code:
self._success["check_exit"] = True
self._message["check_exit"] = "Exit code matches!"
else:
self._success["check_exit"] = False
self._message["check_exit"] = f"Exit code mismatch! Expected {exit_code} but got {self._exit_code}"
def _analyze_results(self) -> None:
"""
After all tests have been checked, collect all the successes
and messages, and print the results to stdout.
"""
STATUS = {True: "PASS", False: "FAIL"}
checks = sorted(self._success.keys())
self._succeeded = all(self._success.values())
self._log(f"{STATUS[self._succeeded]}: {self._test_name}")
if not self._succeeded or self._opts.verbose:
for check in checks:
if self._opts.verbose or not self._success[check]:
self._log(f"{STATUS[self._success[check]]}: {self._test_name}.{check}")
self._log(self._message[check])
self._log("----------------------------------------")
class TestSuite:
"""
Setup & teardown test suite & cases.
This class is intended to be used as a context manager.
TODO: Make setup/teardown failure emit messages, not throw exceptions.
"""
def __init__(self, test_directory: str, options: Options) -> None:
self._opts = options
self._test_dir = os.path.abspath(test_directory)
rel_test_dir = os.path.relpath(test_directory, start=self._opts.test_dir)
assert not rel_test_dir.startswith(os.path.sep)
self._scratch_dir = os.path.normpath(os.path.join(self._opts.scratch_dir, rel_test_dir))
def __enter__(self) -> 'TestSuite':
self._setup_once()
return self
def __exit__(self, _exc_type, _exc_value, _traceback) -> None:
self._teardown_once()
@contextlib.contextmanager
def test_case(self, test_basename: str) -> TestCase:
"""
Context manager for a test case in the test suite.
Pass the basename of the test relative to the :test_directory:.
"""
assert os.path.dirname(test_basename) == ""
try:
self._setup(test_basename)
test_filename = os.path.join(self._test_dir, test_basename)
yield TestCase(test_filename, self._opts)
finally:
self._teardown(test_basename)
def _remove_scratch_dir(self, dir: str) -> None:
"""Helper to remove a scratch directory with sanity checks"""
assert "scratch" in dir
assert dir.startswith(self._scratch_dir)
assert os.path.exists(dir)
shutil.rmtree(dir)
def _setup_once(self) -> None:
if os.path.exists(self._scratch_dir):
self._remove_scratch_dir(self._scratch_dir)
os.makedirs(self._scratch_dir)
setup_script = os.path.join(self._test_dir, "setup_once")
if os.path.exists(setup_script):
self._run_script(setup_script, cwd=self._scratch_dir)
def _teardown_once(self) -> None:
assert os.path.exists(self._scratch_dir)
teardown_script = os.path.join(self._test_dir, "teardown_once")
if os.path.exists(teardown_script):
self._run_script(teardown_script, cwd=self._scratch_dir)
if not self._opts.preserve:
self._remove_scratch_dir(self._scratch_dir)
def _setup(self, test_basename: str) -> None:
test_scratch_dir = os.path.join(self._scratch_dir, test_basename)
assert not os.path.exists(test_scratch_dir)
os.makedirs(test_scratch_dir)
setup_script = os.path.join(self._test_dir, "setup")
if os.path.exists(setup_script):
self._run_script(setup_script, cwd=test_scratch_dir)
def _teardown(self, test_basename: str) -> None:
test_scratch_dir = os.path.join(self._scratch_dir, test_basename)
assert os.path.exists(test_scratch_dir)
teardown_script = os.path.join(self._test_dir, "teardown")
if os.path.exists(teardown_script):
self._run_script(teardown_script, cwd=test_scratch_dir)
if not self._opts.preserve:
self._remove_scratch_dir(test_scratch_dir)
def _run_script(self, script: str, cwd: str) -> None:
env = copy.copy(os.environ)
for k, v in self._opts.env.items():
env[k] = v
try:
subprocess.run(
args=[script],
stdin=subprocess.DEVNULL,
capture_output=True,
cwd=cwd,
env=env,
check=True,
)
except subprocess.CalledProcessError as e:
print(f"{script} failed with exit code {e.returncode}!")
print(f"stderr:\n{e.stderr}")
print(f"stdout:\n{e.stdout}")
raise
TestSuites = typing.Dict[str, typing.List[str]]
def get_all_tests(options: Options) -> TestSuites:
"""
Find all the test in the test directory and return the test suites.
"""
test_suites = {}
for root, dirs, files in os.walk(options.test_dir, topdown=True):
dirs[:] = [d for d in dirs if not exclude_dir(d)]
test_cases = []
for file in files:
if not exclude_file(file):
test_cases.append(file)
assert root == os.path.normpath(root)
test_suites[root] = test_cases
return test_suites
def resolve_listed_tests(
tests: typing.List[str], options: Options
) -> TestSuites:
"""
Resolve the list of tests passed on the command line into their
respective test suites. Tests can either be paths, or test names
relative to the test directory.
"""
test_suites = {}
for test in tests:
if not os.path.exists(test):
test = os.path.join(options.test_dir, test)
if not os.path.exists(test):
raise RuntimeError(f"Test {test} does not exist!")
test = os.path.normpath(os.path.abspath(test))
assert test.startswith(options.test_dir)
test_suite = os.path.dirname(test)
test_case = os.path.basename(test)
test_suites.setdefault(test_suite, []).append(test_case)
return test_suites
def run_tests(test_suites: TestSuites, options: Options) -> bool:
"""
Runs all the test in the :test_suites: with the given :options:.
Prints the results to stdout.
"""
tests = {}
for test_dir, test_files in test_suites.items():
with TestSuite(test_dir, options) as test_suite:
test_files = sorted(set(test_files))
for test_file in test_files:
with test_suite.test_case(test_file) as test_case:
tests[test_case.name] = test_case.run()
successes = 0
for test, status in tests.items():
if status:
successes += 1
else:
print(f"FAIL: {test}")
if successes == len(tests):
print(f"PASSED all {len(tests)} tests!")
return True
else:
print(f"FAILED {len(tests) - successes} / {len(tests)} tests!")
return False
def setup_zstd_symlink_dir(zstd_symlink_dir: str, zstd: str) -> None:
assert os.path.join("bin", "symlinks") in zstd_symlink_dir
if not os.path.exists(zstd_symlink_dir):
os.makedirs(zstd_symlink_dir)
for symlink in ZSTD_SYMLINKS:
path = os.path.join(zstd_symlink_dir, symlink)
if os.path.exists(path):
os.remove(path)
os.symlink(zstd, path)
if __name__ == "__main__":
CLI_TEST_DIR = os.path.dirname(sys.argv[0])
REPO_DIR = os.path.join(CLI_TEST_DIR, "..", "..")
PROGRAMS_DIR = os.path.join(REPO_DIR, "programs")
TESTS_DIR = os.path.join(REPO_DIR, "tests")
ZSTD_PATH = os.path.join(PROGRAMS_DIR, "zstd")
ZSTDGREP_PATH = os.path.join(PROGRAMS_DIR, "zstdgrep")
ZSTDLESS_PATH = os.path.join(PROGRAMS_DIR, "zstdless")
DATAGEN_PATH = os.path.join(TESTS_DIR, "datagen")
parser = argparse.ArgumentParser(
(
"Runs the zstd CLI tests. Exits nonzero on failure. Default arguments are\n"
"generally correct. Pass --preserve to preserve test output for debugging,\n"
"and --verbose to get verbose test output.\n"
)
)
parser.add_argument(
"--preserve",
action="store_true",
help="Preserve the scratch directory TEST_DIR/scratch/ for debugging purposes."
)
parser.add_argument("--verbose", action="store_true", help="Verbose test output.")
parser.add_argument("--timeout", default=60, type=int, help="Test case timeout in seconds. Set to 0 to disable timeouts.")
parser.add_argument(
"--exec-prefix",
default=None,
help="Sets the EXEC_PREFIX environment variable. Prefix to invocations of the zstd CLI."
)
parser.add_argument(
"--zstd",
default=ZSTD_PATH,
help="Sets the ZSTD_BIN environment variable. Path of the zstd CLI."
)
parser.add_argument(
"--zstdgrep",
default=ZSTDGREP_PATH,
help="Sets the ZSTDGREP_BIN environment variable. Path of the zstdgrep CLI."
)
parser.add_argument(
"--zstdless",
default=ZSTDLESS_PATH,
help="Sets the ZSTDLESS_BIN environment variable. Path of the zstdless CLI."
)
parser.add_argument(
"--datagen",
default=DATAGEN_PATH,
help="Sets the DATAGEN_BIN environment variable. Path to the datagen CLI."
)
parser.add_argument(
"--test-dir",
default=CLI_TEST_DIR,
help=(
"Runs the tests under this directory. "
"Adds TEST_DIR/bin/ to path. "
"Scratch directory located in TEST_DIR/scratch/."
)
)
parser.add_argument(
"tests",
nargs="*",
help="Run only these test cases. Can either be paths or test names relative to TEST_DIR/"
)
args = parser.parse_args()
if args.timeout <= 0:
args.timeout = None
args.test_dir = os.path.normpath(os.path.abspath(args.test_dir))
bin_dir = os.path.abspath(os.path.join(args.test_dir, "bin"))
zstd_symlink_dir = os.path.join(bin_dir, "symlinks")
scratch_dir = os.path.join(args.test_dir, "scratch")
setup_zstd_symlink_dir(zstd_symlink_dir, os.path.abspath(args.zstd))
env = {}
if args.exec_prefix is not None:
env["EXEC_PREFIX"] = args.exec_prefix
env["ZSTD_SYMLINK_DIR"] = zstd_symlink_dir
env["DATAGEN_BIN"] = os.path.abspath(args.datagen)
env["ZSTDGREP_BIN"] = os.path.abspath(args.zstdgrep)
env["ZSTDLESS_BIN"] = os.path.abspath(args.zstdless)
env["COMMON"] = os.path.abspath(os.path.join(args.test_dir, "common"))
env["PATH"] = bin_dir + ":" + os.getenv("PATH", "")
env["LC_ALL"] = "C"
opts = Options(
env=env,
timeout=args.timeout,
verbose=args.verbose,
preserve=args.preserve,
test_dir=args.test_dir,
scratch_dir=scratch_dir,
)
if len(args.tests) == 0:
tests = get_all_tests(opts)
else:
tests = resolve_listed_tests(args.tests, opts)
success = run_tests(tests, opts)
if success:
sys.exit(0)
else:
sys.exit(1)