gallery-dl/test/test_results.py

177 lines
5.7 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2015-2018 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
import os
import sys
import unittest
2017-11-12 20:51:12 +01:00
from gallery_dl import extractor, job, config, exception
2017-01-09 12:27:20 +01:00
# these don't work on travis-ci
TRAVIS_SKIP = {
"exhentai", "kissmanga", "mangafox", "dynastyscans", "nijie",
2018-05-13 11:19:10 +02:00
"archivedmoe", "archiveofsins", "thebarchive", "fireden",
2018-07-06 16:02:40 +02:00
"sankaku", "idolcomplex", "mangahere",
}
# temporary issues, etc.
BROKEN = {
2018-07-13 16:20:14 +02:00
"subapics",
}
class TestExtractorResults(unittest.TestCase):
2016-02-18 15:53:53 +01:00
def setUp(self):
name = "gallerydl"
email = "gallerydl@openaliasbox.org"
config.set(("cache", "file"), ":memory:")
config.set(("downloader", "part"), False)
config.set(("extractor", "username"), name)
config.set(("extractor", "password"), name)
config.set(("extractor", "nijie", "username"), email)
config.set(("extractor", "seiga", "username"), email)
config.set(("extractor", "deviantart", "client-id"), "7777")
config.set(("extractor", "deviantart", "client-secret"),
"ff14994c744d9208e5caeec7aab4a026")
config.set(("extractor", "tumblr", "api-key"),
"0cXoHfIqVzMQcc3HESZSNsVlulGxEXGDTTZCDrRrjaa0jmuTc6")
2017-07-25 14:59:41 +02:00
def tearDown(self):
config.clear()
2017-01-09 12:27:20 +01:00
def _run_test(self, extr, url, result):
if result:
if "options" in result:
for key, value in result["options"]:
config.set(key.split("."), value)
content = "content" in result
else:
content = False
2017-10-25 12:55:36 +02:00
tjob = job.TestJob(url, content=content)
2017-02-26 02:06:56 +01:00
self.assertEqual(extr, tjob.extractor.__class__)
2017-06-13 23:10:42 +02:00
if not result:
return
if "exception" in result:
self.assertRaises(result["exception"], tjob.run)
return
2017-11-12 20:51:12 +01:00
try:
tjob.run()
except exception.HttpError as exc:
try:
if 500 <= exc.args[0].response.status_code < 600:
self.skipTest(exc)
except AttributeError:
2017-11-12 20:51:12 +01:00
pass
raise
# test archive-id uniqueness
self.assertEqual(len(set(tjob.list_archive)), len(tjob.list_archive))
# test extraction results
2015-12-13 03:56:29 +01:00
if "url" in result:
self.assertEqual(result["url"], tjob.hash_url.hexdigest())
2017-10-25 12:55:36 +02:00
if "content" in result:
self.assertEqual(result["content"], tjob.hash_content.hexdigest())
if "keyword" in result:
keyword = result["keyword"]
if isinstance(keyword, dict):
for kwdict in tjob.list_keyword:
self._test_kwdict(kwdict, keyword)
else: # assume SHA1 hash
self.assertEqual(keyword, tjob.hash_keyword.hexdigest())
2016-02-18 15:53:53 +01:00
if "count" in result:
count = result["count"]
if isinstance(count, str):
self.assertRegex(count, r"^ *(==|!=|<|<=|>|>=) *\d+ *$")
expr = "{} {}".format(len(tjob.list_url), count)
self.assertTrue(eval(expr), msg=expr)
else: # assume integer
self.assertEqual(len(tjob.list_url), count)
2016-02-18 15:53:53 +01:00
if "pattern" in result:
for url in tjob.list_url:
self.assertRegex(url, result["pattern"])
2017-01-30 19:40:15 +01:00
def _test_kwdict(self, kwdict, tests):
for key, test in tests.items():
if key.startswith("?"):
key = key[1:]
if key not in kwdict:
continue
self.assertIn(key, kwdict)
value = kwdict[key]
if isinstance(test, dict):
self._test_kwdict(kwdict[key], test)
continue
elif isinstance(test, type):
self.assertIsInstance(value, test)
elif isinstance(test, str) and value.startswith("re:"):
self.assertRegex(value, test[3:])
else:
self.assertEqual(value, test)
def generate_tests():
"""Dynamically generate extractor unittests"""
def _generate_test(extr, tcase):
def test(self):
url, result = tcase
print("\n", url, sep="")
self._run_test(extr, url, result)
return test
# enable selective testing for direct calls
if __name__ == '__main__' and len(sys.argv) > 1:
if sys.argv[1].lower() == "all":
fltr = lambda c, bc: True # noqa: E731
elif sys.argv[1].lower() == "broken":
fltr = lambda c, bc: c in BROKEN # noqa: E731
else:
argv = sys.argv[1:]
fltr = lambda c, bc: c in argv or bc in argv # noqa: E731
del sys.argv[1:]
else:
skip = set(BROKEN)
if "CI" in os.environ and "TRAVIS" in os.environ:
skip |= set(TRAVIS_SKIP)
print("skipping:", ", ".join(skip))
fltr = lambda c, bc: c not in skip # noqa: E731
# filter available extractor classes
extractors = [
extr for extr in extractor.extractors()
if fltr(
extr.category,
extr.basecategory if hasattr(extr, "basecategory") else None
)
]
# add 'test_...' methods
for extr in extractors:
if not hasattr(extr, "test") or not extr.test:
continue
2017-01-09 12:27:20 +01:00
name = "test_" + extr.__name__ + "_"
for num, tcase in enumerate(extr.test, 1):
test = _generate_test(extr, tcase)
test.__name__ = name + str(num)
setattr(TestExtractorResults, test.__name__, test)
2017-01-09 12:27:20 +01:00
generate_tests()
if __name__ == '__main__':
unittest.main(warnings='ignore')