Newer versions of less appear to have changed how error messages are shown on stderr and stdout. Hardcode the expected behavior so that the tests pass with any version of less. Also set the locale to C so that the strings match.
729 lines
24 KiB
Python
Executable File
729 lines
24 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# ################################################################
|
|
# Copyright (c) Facebook, Inc.
|
|
# All rights reserved.
|
|
#
|
|
# This source code is licensed under both the BSD-style license (found in the
|
|
# LICENSE file in the root directory of this source tree) and the GPLv2 (found
|
|
# in the COPYING file in the root directory of this source tree).
|
|
# You may select, at your option, one of the above-listed licenses.
|
|
# ##########################################################################
|
|
|
|
import argparse
|
|
import contextlib
|
|
import copy
|
|
import fnmatch
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import typing
|
|
|
|
|
|
# Names of the symlinks that setup_zstd_symlink_dir() creates; every one of
# them points at the zstd binary under test.
ZSTD_SYMLINKS = [
    # zstd names
    "zstd", "zstdmt", "unzstd", "zstdcat",
    # gzip-style names
    "zcat", "gzip", "gunzip", "gzcat",
    # lzma / xz-style names
    "lzma", "unlzma", "xz", "unxz",
    # lz4-style names
    "lz4", "unlz4",
]
|
|
|
|
|
|
# Directory names that exclude_dir() reports as excluded from test discovery.
EXCLUDED_DIRS = {"bin", "common", "scratch"}
|
|
|
|
|
|
# File names that exclude_file() never treats as test cases.
EXCLUDED_BASENAMES = {
    # setup / teardown hooks
    "setup", "setup_once", "teardown", "teardown_once",
    # other non-test files
    "README.md", "run.py", ".gitignore",
}
|
|
|
|
# File name suffixes that exclude_file() never treats as test cases; these are
# the expectation files that sit next to a test (see TestCase._check_output
# and TestCase._check_exit).
EXCLUDED_SUFFIXES = [".exact", ".glob", ".ignore", ".exit"]
|
|
|
|
|
|
def exclude_dir(dirname: str) -> bool:
    """
    Tell whether files under the directory :dirname: are skipped by the
    test runner. The name is compared as-is against :EXCLUDED_DIRS:.
    """
    return dirname in EXCLUDED_DIRS
|
|
|
|
|
|
def exclude_file(filename: str) -> bool:
    """
    Tell whether the file :filename: is skipped by the test runner.

    A file is skipped when its name is listed in :EXCLUDED_BASENAMES: or when
    it ends with one of the :EXCLUDED_SUFFIXES:.
    """
    is_reserved_name = filename in EXCLUDED_BASENAMES
    return is_reserved_name or filename.endswith(tuple(EXCLUDED_SUFFIXES))
|
|
|
|
def read_file(filename: str) -> bytes:
    """Return the complete contents of the file :filename: as bytes."""
    with open(filename, mode="rb") as handle:
        contents = handle.read()
    return contents
|
|
|
|
|
|
def diff(a: bytes, b: bytes) -> str:
    """
    Returns a diff between two different byte-strings :a: and :b:.

    Both byte-strings are written to temporary files which are compared with
    the external `diff` program. Its stdout is returned as text; its stderr
    is discarded and its exit code is not checked.

    The inputs must differ (asserted).
    """
    assert a != b
    with tempfile.NamedTemporaryFile("wb") as fa, \
            tempfile.NamedTemporaryFile("wb") as fb:
        # Flush so that `diff` sees the full contents; the files must stay
        # open because they are deleted as soon as they are closed.
        fa.write(a)
        fa.flush()
        fb.write(b)
        fb.flush()

        diff_bytes = subprocess.run(
            ["diff", fa.name, fb.name],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        ).stdout
    # The compared outputs are arbitrary bytes and need not be valid UTF-8.
    # Replace undecodable bytes instead of raising, so that a mismatch is
    # reported as a test failure rather than crashing the runner.
    return diff_bytes.decode("utf8", errors="replace")
|
|
|
|
|
|
def pop_line(data: bytes) -> typing.Tuple[typing.Optional[bytes], bytes]:
    r"""
    Pop the first line from :data: and returns the first line and the remainder
    of the data as a tuple. If :data: is empty, returns :(None, data):. Otherwise
    the first line always ends in a `\n`, even if it is the last line and :data:
    doesn't end in `\n`.
    """
    if data == b"":
        return (None, data)

    # partition() splits at the first newline. When there is none, `line` is
    # all of `data` and `rest` is empty. Either way the newline is not part of
    # `line`, so it is appended unconditionally.
    line, _sep, rest = data.partition(b"\n")
    return (line + b"\n", rest)
|
|
|
|
|
|
def glob_line_matches(actual: bytes, expect: bytes) -> bool:
    """
    Check the `actual` line against the glob pattern line `expect`.

    Leading and trailing whitespace is stripped from both lines first, and the
    comparison is case-sensitive.
    """
    pattern = expect.strip()
    candidate = actual.strip()
    return fnmatch.fnmatchcase(candidate, pattern)
|
|
|
|
|
|
def glob_diff(actual: bytes, expect: bytes) -> typing.Optional[bytes]:
    """
    Returns None if the :actual: content matches the expected glob :expect:,
    otherwise returns the diff bytes.

    The contents are compared line by line with :glob_line_matches():.
    Expected lines are reported prefixed with `< ` and actual lines with `> `.
    Extra empty lines at the end of either side are ignored.

    An expected line consisting only of `...` skips actual lines until one
    matches the expected line that follows it. If `...` is the last expected
    line, whatever remains of :actual: is accepted.
    """
    mismatches = b""
    actual_line, actual = pop_line(actual)
    expect_line, expect = pop_line(expect)
    while True:
        # Handle end of file conditions - allow extra newlines
        while expect_line is None and actual_line == b"\n":
            actual_line, actual = pop_line(actual)
        while actual_line is None and expect_line == b"\n":
            expect_line, expect = pop_line(expect)

        if expect_line is None and actual_line is None:
            # Both sides are exhausted: report what was collected, if anything.
            return mismatches or None
        if expect_line is None:
            # Only actual lines remain: all of them are unexpected.
            mismatches += b"---\n"
            while actual_line is not None:
                mismatches += b"> " + actual_line
                actual_line, actual = pop_line(actual)
            return mismatches
        if actual_line is None:
            # Only expected lines remain: all of them are missing.
            mismatches += b"---\n"
            while expect_line is not None:
                mismatches += b"< " + expect_line
                expect_line, expect = pop_line(expect)
            return mismatches

        if expect_line == b"...\n":
            next_expect_line, expect = pop_line(expect)
            if next_expect_line is None:
                # A trailing `...` accepts the rest of the actual content.
                return mismatches or None
            # Skip actual lines until one matches the line after the `...`.
            while not glob_line_matches(actual_line, next_expect_line):
                actual_line, actual = pop_line(actual)
                if actual_line is None:
                    mismatches += b"---\n"
                    mismatches += b"< " + next_expect_line
                    return mismatches
            # Re-run the loop so the matched pair goes through the normal path.
            expect_line = next_expect_line
            continue

        if not glob_line_matches(actual_line, expect_line):
            mismatches += b"---\n"
            mismatches += b"< " + expect_line
            mismatches += b"> " + actual_line

        actual_line, actual = pop_line(actual)
        expect_line, expect = pop_line(expect)
|
|
|
|
|
|
class Options:
    """Settings that control how a :TestCase: is executed."""

    def __init__(
        self,
        env: typing.Dict[str, str],
        timeout: typing.Optional[int],
        verbose: bool,
        preserve: bool,
        scratch_dir: str,
        test_dir: str,
    ) -> None:
        # env: variables layered on top of the environment of the test and
        #      setup/teardown subprocesses.
        # timeout: limit handed to the test subprocess' communicate() call;
        #          None means no limit.
        self.env, self.timeout = env, timeout
        # verbose: enables the verbose log output.
        # preserve: keep the scratch directories and save each test's
        #           stdout, stderr and exit code into them.
        self.verbose, self.preserve = verbose, preserve
        # scratch_dir: root directory for the per-test scratch directories.
        # test_dir: root directory of the tests; test names are relative to it.
        self.scratch_dir, self.test_dir = scratch_dir, test_dir
|
|
|
|
|
|
class TestCase:
|
|
"""
|
|
Logic and state related to running a single test case.
|
|
|
|
1. Initialize the test case.
|
|
2. Launch the test case with :TestCase.launch():.
|
|
This will start the test execution in a subprocess, but
|
|
not wait for completion. So you could launch multiple test
|
|
cases in parallel. This will now print any test output.
|
|
3. Analyze the results with :TestCase.analyze():. This will
|
|
join the test subprocess, check the results against the
|
|
expectations, and print the results to stdout.
|
|
|
|
:TestCase.run(): is also provided which combines the launch & analyze
|
|
steps for single-threaded use-cases.
|
|
|
|
All other methods, prefixed with _, are private helper functions.
|
|
"""
|
|
def __init__(self, test_filename: str, options: Options) -> None:
|
|
"""
|
|
Initialize the :TestCase: for the test located in :test_filename:
|
|
with the given :options:.
|
|
"""
|
|
self._opts = options
|
|
self._test_file = test_filename
|
|
self._test_name = os.path.normpath(
|
|
os.path.relpath(test_filename, start=self._opts.test_dir)
|
|
)
|
|
self._success = {}
|
|
self._message = {}
|
|
self._test_stdin = None
|
|
self._scratch_dir = os.path.abspath(os.path.join(self._opts.scratch_dir, self._test_name))
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
"""Returns the unique name for the test."""
|
|
return self._test_name
|
|
|
|
def launch(self) -> None:
|
|
"""
|
|
Launch the test case as a subprocess, but do not block on completion.
|
|
This allows users to run multiple tests in parallel. Results aren't yet
|
|
printed out.
|
|
"""
|
|
self._launch_test()
|
|
|
|
def analyze(self) -> bool:
|
|
"""
|
|
Must be called after :TestCase.launch():. Joins the test subprocess and
|
|
checks the results against expectations. Finally prints the results to
|
|
stdout and returns the success.
|
|
"""
|
|
self._join_test()
|
|
self._check_exit()
|
|
self._check_stderr()
|
|
self._check_stdout()
|
|
self._analyze_results()
|
|
return self._succeeded
|
|
|
|
def run(self) -> bool:
|
|
"""Shorthand for combining both :TestCase.launch(): and :TestCase.analyze():."""
|
|
self.launch()
|
|
return self.analyze()
|
|
|
|
def _log(self, *args, **kwargs) -> None:
|
|
"""Logs test output."""
|
|
print(file=sys.stdout, *args, **kwargs)
|
|
|
|
def _vlog(self, *args, **kwargs) -> None:
|
|
"""Logs verbose test output."""
|
|
if self._opts.verbose:
|
|
print(file=sys.stdout, *args, **kwargs)
|
|
|
|
def _test_environment(self) -> typing.Dict[str, str]:
|
|
"""
|
|
Returns the environment to be used for the
|
|
test subprocess.
|
|
"""
|
|
# We want to omit ZSTD cli flags so tests will be consistent across environments
|
|
env = {k: v for k, v in os.environ.items() if not k.startswith("ZSTD")}
|
|
for k, v in self._opts.env.items():
|
|
self._vlog(f"${k}='{v}'")
|
|
env[k] = v
|
|
return env
|
|
|
|
def _launch_test(self) -> None:
|
|
"""Launch the test subprocess, but do not join it."""
|
|
args = [os.path.abspath(self._test_file)]
|
|
stdin_name = f"{self._test_file}.stdin"
|
|
if os.path.exists(stdin_name):
|
|
self._test_stdin = open(stdin_name, "rb")
|
|
stdin = self._test_stdin
|
|
else:
|
|
stdin = subprocess.DEVNULL
|
|
cwd = self._scratch_dir
|
|
env = self._test_environment()
|
|
self._test_process = subprocess.Popen(
|
|
args=args,
|
|
stdin=stdin,
|
|
cwd=cwd,
|
|
env=env,
|
|
stderr=subprocess.PIPE,
|
|
stdout=subprocess.PIPE
|
|
)
|
|
|
|
def _join_test(self) -> None:
|
|
"""Join the test process and save stderr, stdout, and the exit code."""
|
|
(stdout, stderr) = self._test_process.communicate(timeout=self._opts.timeout)
|
|
self._output = {}
|
|
self._output["stdout"] = stdout
|
|
self._output["stderr"] = stderr
|
|
self._exit_code = self._test_process.returncode
|
|
self._test_process = None
|
|
if self._test_stdin is not None:
|
|
self._test_stdin.close()
|
|
self._test_stdin = None
|
|
|
|
def _check_output_exact(self, out_name: str, expected: bytes) -> None:
|
|
"""
|
|
Check the output named :out_name: for an exact match against the :expected: content.
|
|
Saves the success and message.
|
|
"""
|
|
check_name = f"check_{out_name}"
|
|
actual = self._output[out_name]
|
|
if actual == expected:
|
|
self._success[check_name] = True
|
|
self._message[check_name] = f"{out_name} matches!"
|
|
else:
|
|
self._success[check_name] = False
|
|
self._message[check_name] = f"{out_name} does not match!\n> diff expected actual\n{diff(expected, actual)}"
|
|
|
|
def _check_output_glob(self, out_name: str, expected: bytes) -> None:
|
|
"""
|
|
Check the output named :out_name: for a glob match against the :expected: glob.
|
|
Saves the success and message.
|
|
"""
|
|
check_name = f"check_{out_name}"
|
|
actual = self._output[out_name]
|
|
diff = glob_diff(actual, expected)
|
|
if diff is None:
|
|
self._success[check_name] = True
|
|
self._message[check_name] = f"{out_name} matches!"
|
|
else:
|
|
utf8_diff = diff.decode('utf8')
|
|
self._success[check_name] = False
|
|
self._message[check_name] = f"{out_name} does not match!\n> diff expected actual\n{utf8_diff}"
|
|
|
|
def _check_output(self, out_name: str) -> None:
|
|
"""
|
|
Checks the output named :out_name: for a match against the expectation.
|
|
We check for a .exact, .glob, and a .ignore file. If none are found we
|
|
expect that the output should be empty.
|
|
|
|
If :Options.preserve: was set then we save the scratch directory and
|
|
save the stderr, stdout, and exit code to the scratch directory for
|
|
debugging.
|
|
"""
|
|
if self._opts.preserve:
|
|
# Save the output to the scratch directory
|
|
actual_name = os.path.join(self._scratch_dir, f"{out_name}")
|
|
with open(actual_name, "wb") as f:
|
|
f.write(self._output[out_name])
|
|
|
|
exact_name = f"{self._test_file}.{out_name}.exact"
|
|
glob_name = f"{self._test_file}.{out_name}.glob"
|
|
ignore_name = f"{self._test_file}.{out_name}.ignore"
|
|
|
|
if os.path.exists(exact_name):
|
|
return self._check_output_exact(out_name, read_file(exact_name))
|
|
elif os.path.exists(glob_name):
|
|
return self._check_output_glob(out_name, read_file(glob_name))
|
|
elif os.path.exists(ignore_name):
|
|
check_name = f"check_{out_name}"
|
|
self._success[check_name] = True
|
|
self._message[check_name] = f"{out_name} ignored!"
|
|
else:
|
|
return self._check_output_exact(out_name, bytes())
|
|
|
|
def _check_stderr(self) -> None:
|
|
"""Checks the stderr output against the expectation."""
|
|
self._check_output("stderr")
|
|
|
|
def _check_stdout(self) -> None:
|
|
"""Checks the stdout output against the expectation."""
|
|
self._check_output("stdout")
|
|
|
|
def _check_exit(self) -> None:
|
|
"""
|
|
Checks the exit code against expectations. If a .exit file
|
|
exists, we expect that the exit code matches the contents.
|
|
Otherwise we expect the exit code to be zero.
|
|
|
|
If :Options.preserve: is set we save the exit code to the
|
|
scratch directory under the filename "exit".
|
|
"""
|
|
if self._opts.preserve:
|
|
exit_name = os.path.join(self._scratch_dir, "exit")
|
|
with open(exit_name, "w") as f:
|
|
f.write(str(self._exit_code) + "\n")
|
|
exit_name = f"{self._test_file}.exit"
|
|
if os.path.exists(exit_name):
|
|
exit_code: int = int(read_file(exit_name))
|
|
else:
|
|
exit_code: int = 0
|
|
if exit_code == self._exit_code:
|
|
self._success["check_exit"] = True
|
|
self._message["check_exit"] = "Exit code matches!"
|
|
else:
|
|
self._success["check_exit"] = False
|
|
self._message["check_exit"] = f"Exit code mismatch! Expected {exit_code} but got {self._exit_code}"
|
|
|
|
def _analyze_results(self) -> None:
|
|
"""
|
|
After all tests have been checked, collect all the successes
|
|
and messages, and print the results to stdout.
|
|
"""
|
|
STATUS = {True: "PASS", False: "FAIL"}
|
|
checks = sorted(self._success.keys())
|
|
self._succeeded = all(self._success.values())
|
|
self._log(f"{STATUS[self._succeeded]}: {self._test_name}")
|
|
|
|
if not self._succeeded or self._opts.verbose:
|
|
for check in checks:
|
|
if self._opts.verbose or not self._success[check]:
|
|
self._log(f"{STATUS[self._success[check]]}: {self._test_name}.{check}")
|
|
self._log(self._message[check])
|
|
|
|
self._log("----------------------------------------")
|
|
|
|
|
|
class TestSuite:
    """
    Setup & teardown test suite & cases.
    This class is intended to be used as a context manager.

    Entering the suite creates its scratch directory and runs the optional
    `setup_once` script; leaving it runs the optional `teardown_once` script.
    :TestSuite.test_case(): does the same per test with the `setup` and
    `teardown` scripts.

    TODO: Make setup/teardown failure emit messages, not throw exceptions.
    """

    def __init__(self, test_directory: str, options: "Options") -> None:
        self._opts = options
        self._test_dir = os.path.abspath(test_directory)
        # The scratch directory mirrors the suite's location below the test
        # directory.
        rel_test_dir = os.path.relpath(test_directory, start=self._opts.test_dir)
        assert not rel_test_dir.startswith(os.path.sep)
        self._scratch_dir = os.path.normpath(
            os.path.join(self._opts.scratch_dir, rel_test_dir)
        )

    def __enter__(self) -> 'TestSuite':
        self._setup_once()
        return self

    def __exit__(self, _exc_type, _exc_value, _traceback) -> None:
        self._teardown_once()

    @contextlib.contextmanager
    def test_case(self, test_basename: str) -> typing.Iterator["TestCase"]:
        """
        Context manager for a test case in the test suite.
        Pass the basename of the test relative to the :test_directory:.
        """
        assert os.path.dirname(test_basename) == ""
        try:
            self._setup(test_basename)
            test_filename = os.path.join(self._test_dir, test_basename)
            yield TestCase(test_filename, self._opts)
        finally:
            self._teardown(test_basename)

    def _remove_scratch_dir(self, dir: str) -> None:
        """Helper to remove a scratch directory with sanity checks"""
        # Guard against deleting anything outside of the scratch area.
        assert "scratch" in dir
        assert dir.startswith(self._scratch_dir)
        assert os.path.exists(dir)
        shutil.rmtree(dir)

    def _setup_once(self) -> None:
        """Create a fresh scratch directory and run `setup_once` if present."""
        if os.path.exists(self._scratch_dir):
            self._remove_scratch_dir(self._scratch_dir)
        os.makedirs(self._scratch_dir)
        setup_script = os.path.join(self._test_dir, "setup_once")
        if os.path.exists(setup_script):
            self._run_script(setup_script, cwd=self._scratch_dir)

    def _teardown_once(self) -> None:
        """
        Run `teardown_once` if present, then remove the suite's scratch
        directory unless :Options.preserve: is set.
        """
        assert os.path.exists(self._scratch_dir)
        teardown_script = os.path.join(self._test_dir, "teardown_once")
        if os.path.exists(teardown_script):
            self._run_script(teardown_script, cwd=self._scratch_dir)
        if not self._opts.preserve:
            self._remove_scratch_dir(self._scratch_dir)

    def _setup(self, test_basename: str) -> None:
        """Create the test's scratch directory and run `setup` if present."""
        test_scratch_dir = os.path.join(self._scratch_dir, test_basename)
        assert not os.path.exists(test_scratch_dir)
        os.makedirs(test_scratch_dir)
        setup_script = os.path.join(self._test_dir, "setup")
        if os.path.exists(setup_script):
            self._run_script(setup_script, cwd=test_scratch_dir)

    def _teardown(self, test_basename: str) -> None:
        """
        Run `teardown` if present, then remove the test's scratch directory
        unless :Options.preserve: is set.
        """
        test_scratch_dir = os.path.join(self._scratch_dir, test_basename)
        assert os.path.exists(test_scratch_dir)
        teardown_script = os.path.join(self._test_dir, "teardown")
        if os.path.exists(teardown_script):
            self._run_script(teardown_script, cwd=test_scratch_dir)
        if not self._opts.preserve:
            self._remove_scratch_dir(test_scratch_dir)

    def _run_script(self, script: str, cwd: str) -> None:
        """
        Run the setup/teardown :script: in the directory :cwd:.

        The script gets the current environment with the variables from
        :Options.env: applied on top. If it exits nonzero, its exit code and
        output are printed and :subprocess.CalledProcessError: is re-raised.
        """
        # Build a plain dict. copy.copy(os.environ) returns another os._Environ
        # that shares its state with os.environ, so assigning to it would
        # modify the environment of the test runner itself.
        env = dict(os.environ)
        env.update(self._opts.env)
        try:
            subprocess.run(
                args=[script],
                stdin=subprocess.DEVNULL,
                capture_output=True,
                cwd=cwd,
                env=env,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            print(f"{script} failed with exit code {e.returncode}!")
            print(f"stderr:\n{e.stderr}")
            print(f"stdout:\n{e.stdout}")
            raise
|
|
|
|
# Maps a test suite directory to the file names of the test cases inside it.
TestSuites = typing.Dict[str, typing.List[str]]
|
|
|
|
def get_all_tests(options: Options) -> TestSuites:
    """
    Walk the test directory and collect every test, grouped by test suite.

    Each visited directory becomes a test suite whose test cases are the files
    in it that :exclude_file(): does not reject. Directories rejected by
    :exclude_dir(): are not visited at all.
    """
    discovered = {}
    for suite_dir, subdirs, filenames in os.walk(options.test_dir, topdown=True):
        # Prune in place so that os.walk() does not descend into excluded
        # directories.
        subdirs[:] = [name for name in subdirs if not exclude_dir(name)]
        cases = [name for name in filenames if not exclude_file(name)]
        assert suite_dir == os.path.normpath(suite_dir)
        discovered[suite_dir] = cases
    return discovered
|
|
|
|
|
|
def resolve_listed_tests(
    tests: typing.List[str], options: Options
) -> TestSuites:
    """
    Group the tests named on the command line by the test suite they belong
    to. Each entry may be a path, or a test name relative to the test
    directory.

    Raises :RuntimeError: when an entry exists under neither interpretation.
    """
    suites = {}
    for requested in tests:
        location = requested
        if not os.path.exists(location):
            # Not a usable path as given: retry relative to the test directory.
            location = os.path.join(options.test_dir, requested)
            if not os.path.exists(location):
                raise RuntimeError(f"Test {location} does not exist!")

        location = os.path.normpath(os.path.abspath(location))
        assert location.startswith(options.test_dir)
        suite_dir, case_name = os.path.split(location)
        if suite_dir not in suites:
            suites[suite_dir] = []
        suites[suite_dir].append(case_name)

    return suites
|
|
|
|
def run_tests(test_suites: TestSuites, options: Options) -> bool:
    """
    Run every test of :test_suites: with the given :options:, one after the
    other, and print the results to stdout.

    Returns True when all tests passed.
    """
    outcomes = {}
    for suite_dir, case_files in test_suites.items():
        with TestSuite(suite_dir, options) as suite:
            # Each test runs once, in sorted order, even if listed repeatedly.
            for case_file in sorted(set(case_files)):
                with suite.test_case(case_file) as case:
                    outcomes[case.name] = case.run()

    failed = [name for name, passed in outcomes.items() if not passed]
    for name in failed:
        print(f"FAIL: {name}")

    total = len(outcomes)
    if not failed:
        print(f"PASSED all {total} tests!")
        return True
    print(f"FAILED {len(failed)} / {total} tests!")
    return False
|
|
|
|
|
|
def setup_zstd_symlink_dir(zstd_symlink_dir: str, zstd: str) -> None:
    """
    (Re)create one symlink per name in :ZSTD_SYMLINKS: inside the directory
    :zstd_symlink_dir:, each pointing at the :zstd: binary.

    The directory is created if it does not exist yet, and entries left over
    from a previous run are replaced.
    """
    # Sanity check: only ever touch a `bin/symlinks` directory.
    assert os.path.join("bin", "symlinks") in zstd_symlink_dir
    os.makedirs(zstd_symlink_dir, exist_ok=True)
    for symlink in ZSTD_SYMLINKS:
        path = os.path.join(zstd_symlink_dir, symlink)
        # lexists() is also true for a dangling symlink (e.g. when the binary
        # it pointed at is gone). exists() follows the link and would report
        # False, making os.symlink() below fail with FileExistsError.
        if os.path.lexists(path):
            os.remove(path)
        os.symlink(zstd, path)
|
|
|
|
# Script entry point: parse the command line, build the test environment,
# select the tests and run them. Exits with 0 when every test passed and with
# 1 otherwise.
if __name__ == "__main__":
    # Default locations, derived from where this script lives.
    CLI_TEST_DIR = os.path.dirname(sys.argv[0])
    REPO_DIR = os.path.join(CLI_TEST_DIR, "..", "..")
    PROGRAMS_DIR = os.path.join(REPO_DIR, "programs")
    TESTS_DIR = os.path.join(REPO_DIR, "tests")
    ZSTD_PATH = os.path.join(PROGRAMS_DIR, "zstd")
    ZSTDGREP_PATH = os.path.join(PROGRAMS_DIR, "zstdgrep")
    ZSTDLESS_PATH = os.path.join(PROGRAMS_DIR, "zstdless")
    DATAGEN_PATH = os.path.join(TESTS_DIR, "datagen")

    # The text must be passed as `description`: the first positional argument
    # of ArgumentParser is `prog`, the program name shown in the usage line.
    parser = argparse.ArgumentParser(
        description=(
            "Runs the zstd CLI tests. Exits nonzero on failure. Default arguments are\n"
            "generally correct. Pass --preserve to preserve test output for debugging,\n"
            "and --verbose to get verbose test output.\n"
        )
    )
    parser.add_argument(
        "--preserve",
        action="store_true",
        help="Preserve the scratch directory TEST_DIR/scratch/ for debugging purposes."
    )
    parser.add_argument("--verbose", action="store_true", help="Verbose test output.")
    parser.add_argument("--timeout", default=60, type=int, help="Test case timeout in seconds. Set to 0 to disable timeouts.")
    parser.add_argument(
        "--exec-prefix",
        default=None,
        help="Sets the EXEC_PREFIX environment variable. Prefix to invocations of the zstd CLI."
    )
    # NOTE(review): the help text mentions ZSTD_BIN, but nothing below sets
    # that variable; --zstd is only used as the target of the symlinks.
    # Confirm which of the two is intended.
    parser.add_argument(
        "--zstd",
        default=ZSTD_PATH,
        help="Sets the ZSTD_BIN environment variable. Path of the zstd CLI."
    )
    parser.add_argument(
        "--zstdgrep",
        default=ZSTDGREP_PATH,
        help="Sets the ZSTDGREP_BIN environment variable. Path of the zstdgrep CLI."
    )
    parser.add_argument(
        "--zstdless",
        default=ZSTDLESS_PATH,
        help="Sets the ZSTDLESS_BIN environment variable. Path of the zstdless CLI."
    )
    parser.add_argument(
        "--datagen",
        default=DATAGEN_PATH,
        help="Sets the DATAGEN_BIN environment variable. Path to the datagen CLI."
    )
    parser.add_argument(
        "--test-dir",
        default=CLI_TEST_DIR,
        help=(
            "Runs the tests under this directory. "
            "Adds TEST_DIR/bin/ to path. "
            "Scratch directory located in TEST_DIR/scratch/."
        )
    )
    parser.add_argument(
        "tests",
        nargs="*",
        help="Run only these test cases. Can either be paths or test names relative to TEST_DIR/"
    )
    args = parser.parse_args()

    # A timeout of zero (or less) disables the timeout.
    if args.timeout <= 0:
        args.timeout = None

    args.test_dir = os.path.normpath(os.path.abspath(args.test_dir))
    bin_dir = os.path.abspath(os.path.join(args.test_dir, "bin"))
    zstd_symlink_dir = os.path.join(bin_dir, "symlinks")
    scratch_dir = os.path.join(args.test_dir, "scratch")

    setup_zstd_symlink_dir(zstd_symlink_dir, os.path.abspath(args.zstd))

    # Environment variables handed to every test and setup/teardown script.
    env = {}
    if args.exec_prefix is not None:
        env["EXEC_PREFIX"] = args.exec_prefix
    env["ZSTD_SYMLINK_DIR"] = zstd_symlink_dir
    env["DATAGEN_BIN"] = os.path.abspath(args.datagen)
    env["ZSTDGREP_BIN"] = os.path.abspath(args.zstdgrep)
    env["ZSTDLESS_BIN"] = os.path.abspath(args.zstdless)
    env["COMMON"] = os.path.abspath(os.path.join(args.test_dir, "common"))
    # Put TEST_DIR/bin/ first on the search path.
    env["PATH"] = bin_dir + os.pathsep + os.getenv("PATH", "")
    # Fixed locale so that the messages of external tools match the
    # expected test output.
    env["LC_ALL"] = "C"

    opts = Options(
        env=env,
        timeout=args.timeout,
        verbose=args.verbose,
        preserve=args.preserve,
        test_dir=args.test_dir,
        scratch_dir=scratch_dir,
    )

    # Without explicit tests on the command line, run everything.
    if len(args.tests) == 0:
        tests = get_all_tests(opts)
    else:
        tests = resolve_listed_tests(args.tests, opts)

    success = run_tests(tests, opts)
    sys.exit(0 if success else 1)
|
|
|