Start porting hooks to wpull 2.0's new plugin system

parent 3f14886435
commit 0d42842fd0
@@ -211,24 +211,7 @@ custom_hooks, which_wpull_args_partial, which_wpull_command):
    id = binascii.hexlify(os.urandom(16)).decode('utf-8')
    ymd = datetime.datetime.utcnow().isoformat()[:10]
    no_proto_no_trailing = claim_start_url.split('://', 1)[1].rstrip('/')[:100]
    warc_name = "{}-{}-{}".format(re.sub('[^-_a-zA-Z0-9%\.,;@+=]', '-', no_proto_no_trailing).lstrip('-'), ymd, id[:8])

    def get_base_wpull_args():
        return ["-U", ua,
            "--header=Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "--header=Accept-Language: en-US,en;q=0.5",
            "--no-check-certificate",
            "--no-robots",
            "--inet4-only",
            "--dns-timeout", "20",
            "--connect-timeout", "20",
            "--read-timeout", "900",
            "--session-timeout", str(86400 * 2),
            "--tries", "3",
            "--waitretry", "5",
            "--max-redirect", "8",
            "--quiet"
        ]

    warc_name = "{}-{}-{}".format(re.sub(r'[^-_a-zA-Z0-9%\.,;@+=]', '-', no_proto_no_trailing).lstrip('-'), ymd, id[:8])

    # make absolute because wpull will start in temp/
    if not dir:
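For reference, a standalone sketch of what the naming logic above produces. The start URL here is hypothetical; everything else mirrors the lines shown in the hunk.

    import binascii
    import datetime
    import os
    import re

    claim_start_url = "https://example.com/blog/"  # hypothetical start URL
    id = binascii.hexlify(os.urandom(16)).decode('utf-8')
    ymd = datetime.datetime.utcnow().isoformat()[:10]
    no_proto_no_trailing = claim_start_url.split('://', 1)[1].rstrip('/')[:100]
    warc_name = "{}-{}-{}".format(re.sub(r'[^-_a-zA-Z0-9%\.,;@+=]', '-', no_proto_no_trailing).lstrip('-'), ymd, id[:8])
    # Slashes and other disallowed characters become '-', then the date and
    # a short random id are appended, e.g. "example.com-blog-2016-11-02-1a2b3c4d".
    print(warc_name)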
@@ -237,11 +220,24 @@ custom_hooks, which_wpull_args_partial, which_wpull_command):
    working_dir = os.path.abspath(dir)

    LIBGRABSITE = os.path.dirname(libgrabsite.__file__)
    args = get_base_wpull_args() + [
    args = [
        "--debug",
        "-U", ua,
        "--header=Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "--header=Accept-Language: en-US,en;q=0.5",
        "--no-check-certificate",
        "--no-robots",
        "--inet4-only",
        "--dns-timeout", "20",
        "--connect-timeout", "20",
        "--read-timeout", "900",
        "--session-timeout", str(86400 * 2),
        "--tries", "3",
        "--waitretry", "5",
        "--max-redirect", "8",
        "--output-file", "{}/wpull.log".format(working_dir),
        "--database", "{}/wpull.db".format(working_dir),
        "--plugin-script", "{}/plugin.py".format(LIBGRABSITE),
        "--python-script", "{}/wpull_hooks.py".format(LIBGRABSITE),
        "--plugin-script", "{}/wpull_hooks.py".format(LIBGRABSITE),
        "--save-cookies", "{}/cookies.txt".format(working_dir),
        "--delete-after",
        "--page-requisites",
@@ -255,7 +251,7 @@ custom_hooks, which_wpull_args_partial, which_wpull_command):
        "--level", level,
        "--page-requisites-level", page_requisites_level,
        "--span-hosts-allow", span_hosts_allow,
        "--load-cookies", "{}/default_cookies.txt".format(LIBGRABSITE)
        "--load-cookies", "{}/default_cookies.txt".format(LIBGRABSITE),
    ]

    # psutil is not available on Windows and therefore wpull's --monitor-*
@@ -393,22 +389,21 @@ custom_hooks, which_wpull_args_partial, which_wpull_command):
    # files in the cwd, so we must start wpull in temp/ anyway.
    os.chdir(temp_dir)

    from wpull.app import Application
    def noop_setup_signal_handlers(self):
        pass

    # Don't let wpull install a handler for SIGINT or SIGTERM,
    # because we install our own in wpull_hooks.py.
    Application.setup_signal_handlers = noop_setup_signal_handlers

    # Modify NO_DOCUMENT_STATUS_CODES
    # https://github.com/chfoo/wpull/issues/143
    from wpull.processor.web import WebProcessor
    WebProcessor.NO_DOCUMENT_STATUS_CODES = \
        tuple(int(code) for code in permanent_error_status_codes.split(","))

    import wpull.__main__
    wpull.__main__.main()
    # Uncomment to debug import-time errors that are otherwise silently
    # swallowed by wpull's plugin system:
    #
    # from libgrabsite import wpull_hooks as _

    import wpull.application.main
    # Don't let wpull install a handler for SIGINT or SIGTERM,
    # because we install our own in wpull_hooks.py.
    wpull.application.main.main(use_signals=False)


if __name__ == '__main__':
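A condensed sketch of the new launch path. It assumes, for illustration only, that wpull's entry point parses sys.argv (which is how the surrounding code has historically handed arguments to wpull) and that main() accepts use_signals=False, as the lines above do; this is not grab-site's exact code.

    import sys
    import wpull.application.main

    def run_wpull(args):
        # Assumption: wpull's main() reads its arguments from sys.argv, so the
        # argument list assembled above is spliced in before calling it.
        sys.argv[1:] = args
        # use_signals=False keeps wpull from installing SIGINT/SIGTERM handlers;
        # wpull_hooks.py installs its own instead (see add_signal_handlers there).
        return wpull.application.main.main(use_signals=False)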
@@ -1,55 +0,0 @@
import os
import functools
import hashlib

from wpull.database.sqltable import SQLiteURLTable
from wpull.document.html import HTMLReader
import wpull.processor.rule

from libgrabsite import dupespotter
from libgrabsite.dupes import DupesOnDisk


class NoFsyncSQLTable(SQLiteURLTable):
    @classmethod
    def _apply_pragmas_callback(cls, connection, record):
        super()._apply_pragmas_callback(connection, record)
        connection.execute('PRAGMA synchronous=OFF')


class DupeSpottingProcessingRule(wpull.processor.rule.ProcessingRule):
    def __init__(self, *args, **kwargs):
        self.dupes_db = kwargs.pop('dupes_db', None)
        super().__init__(*args, **kwargs)

    def scrape_document(self, request, response, url_item):
        if response.body.size() < 30*1024*1024:
            dupes_db = self.dupes_db
            body = response.body.content()
            if HTMLReader.is_response(response):
                body = dupespotter.process_body(body, response.request.url)
            digest = hashlib.md5(body).digest()
            if dupes_db is not None:
                dupe_of = dupes_db.get_old_url(digest)
            else:
                dupe_of = None
            if dupe_of is not None:
                # Don't extract links from pages we've already seen
                # to avoid loops that descend a directory endlessly
                print("DUPE {}\n OF {}".format(response.request.url, dupe_of))
                return
            else:
                if dupes_db is not None:
                    dupes_db.set_old_url(digest, response.request.url)

        super().scrape_document(request, response, url_item)


wpull_plugin.factory.class_map['URLTableImplementation'] = NoFsyncSQLTable
if int(os.environ["DUPESPOTTER_ENABLED"]):
    wpull_plugin.factory.class_map['ProcessingRule'] = functools.partial(
        DupeSpottingProcessingRule,
        dupes_db=DupesOnDisk(
            os.path.join(os.environ["GRAB_SITE_WORKING_DIR"], "dupes_db"))
    )
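The file deleted above (the module the old --plugin-script argument pointed at, presumably libgrabsite/plugin.py) relied on wpull 1.x's factory override hook, wpull_plugin.factory.class_map. wpull 2.0 replaces that with WpullPlugin subclasses whose methods are registered with decorators, which is the shape the new wpull_hooks.py below adopts. A minimal, illustrative skeleton using only names that appear in this diff:

    from wpull.application.hook import Actions
    from wpull.application.plugin import WpullPlugin, PluginFunctions, hook

    class MinimalPlugin(WpullPlugin):
        @hook(PluginFunctions.accept_url)
        def accept_url(self, item_session, verdict, reasons):
            # Veto data: URLs, leave wpull's verdict unchanged otherwise.
            if item_session.request.url_info.raw.startswith('data:'):
                return False
            return verdict

        @hook(PluginFunctions.handle_response)
        def handle_response(self, item_session):
            # Returning Actions.NORMAL lets wpull carry on as usual.
            return Actions.NORMAL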
@@ -3,16 +3,101 @@ import os
import sys
import json
import time
import pprint
import signal
import random
import hashlib
import functools
import traceback
import asyncio
from autobahn.asyncio.websocket import WebSocketClientFactory, WebSocketClientProtocol

from wpull.application.hook import Actions
from wpull.application.plugin import WpullPlugin, PluginFunctions, hook, event
from wpull.database.sqltable import SQLiteURLTable
from wpull.document.html import HTMLReader
from wpull.pipeline.app import AppSession
from wpull.pipeline.item import URLRecord
from wpull.pipeline.session import ItemSession
from wpull.protocol.http.request import Response as HTTPResponse
from wpull.url import URLInfo
import wpull.processor.rule

from libgrabsite import dupespotter
from libgrabsite.dupes import DupesOnDisk
from libgrabsite.ignoracle import Ignoracle, parameterize_record_info
import libgrabsite


def _extract_response_code(response) -> int:
    statcode = 0

    try:
        # duck typing: assume the response is
        # wpull.protocol.http.request.Response
        statcode = response.status_code
    except (AttributeError, KeyError):
        pass

    try:
        # duck typing: assume the response is
        # wpull.protocol.ftp.request.Response
        statcode = response.reply.code
    except (AttributeError, KeyError):
        pass

    return statcode


def _extract_item_size(response) -> int:
    try:
        return response.body.size()
    except Exception:
        return 0

# class NoFsyncSQLTable(SQLiteURLTable):
#     @classmethod
#     def _apply_pragmas_callback(cls, connection, record):
#         super()._apply_pragmas_callback(connection, record)
#         connection.execute('PRAGMA synchronous=OFF')


# class DupeSpottingProcessingRule(wpull.processor.rule.ProcessingRule):
#     def __init__(self, *args, **kwargs):
#         self.dupes_db = kwargs.pop('dupes_db', None)
#         super().__init__(*args, **kwargs)

#     def scrape_document(self, request, response, url_item):
#         if _extract_item_size(response) < 30 * 1024 * 1024:
#             dupes_db = self.dupes_db
#             body = response.body.content()
#             if HTMLReader.is_response(response):
#                 body = dupespotter.process_body(body, response.request.url)
#             digest = hashlib.md5(body).digest()
#             if dupes_db is not None:
#                 dupe_of = dupes_db.get_old_url(digest)
#             else:
#                 dupe_of = None
#             if dupe_of is not None:
#                 # Don't extract links from pages we've already seen
#                 # to avoid loops that descend a directory endlessly
#                 print("DUPE {}\n OF {}".format(response.request.url, dupe_of))
#                 return
#             else:
#                 if dupes_db is not None:
#                     dupes_db.set_old_url(digest, response.request.url)

#         super().scrape_document(request, response, url_item)


# wpull_plugin.factory.class_map['URLTableImplementation'] = NoFsyncSQLTable
# if int(os.environ["DUPESPOTTER_ENABLED"]):
#     wpull_plugin.factory.class_map['ProcessingRule'] = functools.partial(
#         DupeSpottingProcessingRule,
#         dupes_db=DupesOnDisk(
#             os.path.join(os.environ["GRAB_SITE_WORKING_DIR"], "dupes_db"))
#     )


real_stdout_write = sys.stdout.buffer.write
real_stderr_write = sys.stderr.buffer.write
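For context on the commented-out NoFsyncSQLTable above: PRAGMA synchronous=OFF tells SQLite not to wait for the OS to flush each transaction to disk, trading durability for crawl speed. Outside of wpull, the same knob looks like this (plain sqlite3, illustrative only):

    import sqlite3

    conn = sqlite3.connect("wpull.db")
    # Skip the fsync on every commit. Much faster for a busy URL table;
    # the downside is possible database corruption if the machine loses
    # power mid-crawl, which is usually acceptable for a resumable grab.
    conn.execute("PRAGMA synchronous=OFF")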
@@ -21,36 +106,6 @@ def print_to_terminal(s):
    sys.stdout.buffer.flush()


class GrabberClientProtocol(WebSocketClientProtocol):
    def onOpen(self):
        self.factory.client = self
        self.send_object({
            "type": "hello",
            "mode": "grabber",
            "url": job_data["url"]
        })

    def onClose(self, was_clean, code, reason):
        self.factory.client = None
        print_to_terminal(
            "Disconnected from ws:// server with (was_clean, code, reason): {!r}"
            .format((was_clean, code, reason)))
        asyncio.ensure_future(connect_to_server())

    def send_object(self, obj):
        self.sendMessage(json.dumps(obj).encode("utf-8"))


class GrabberClientFactory(WebSocketClientFactory):
    protocol = GrabberClientProtocol

    def __init__(self):
        super().__init__()
        self.client = None


ws_factory = GrabberClientFactory()

class Decayer(object):
    def __init__(self, initial, multiplier, maximum):
        """
@@ -74,26 +129,14 @@ class Decayer(object):
        return self.current


@asyncio.coroutine
def connect_to_server():
    host = os.environ.get('GRAB_SITE_HOST', '127.0.0.1')
    port = int(os.environ.get('GRAB_SITE_PORT', 29000))
    decayer = Decayer(0.25, 1.5, 8)
    while True:
        try:
            yield from loop.create_connection(ws_factory, host, port)
        except OSError:
            delay = decayer.decay()
            print_to_terminal(
                "Could not connect to ws://{}:{}, retrying in {:.1f} seconds..."
                .format(host, port, delay))
            yield from asyncio.sleep(delay)
        else:
            print_to_terminal("Connected to ws://{}:{}".format(host, port))
            break
class GrabberClientFactory(object): # WebSocketClientFactory
    #protocol = GrabberClientProtocol

loop = asyncio.get_event_loop()
asyncio.ensure_future(connect_to_server())
    def __init__(self):
        super().__init__()
        self.client = None

ws_factory = GrabberClientFactory()


def graceful_stop_callback():
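The removed reconnect loop above used the Decayer as a capped exponential backoff. Stripped of the websocket machinery, the retry pattern looks roughly like this; the coroutine style is updated to async/await purely for illustration, open_connection is a stand-in for loop.create_connection(ws_factory, host, port), and Decayer refers to the class shown above.

    import asyncio

    async def connect_with_backoff(open_connection):
        decayer = Decayer(0.25, 1.5, 8)  # start at 0.25s, grow 1.5x, cap at 8s
        while True:
            try:
                await open_connection()
            except OSError:
                # Each failed attempt waits a little longer than the last.
                await asyncio.sleep(decayer.decay())
            else:
                break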
@@ -103,15 +146,17 @@ def graceful_stop_callback():


def forceful_stop_callback():
    loop = asyncio.get_event_loop()
    loop.stop()


try:
    loop.add_signal_handler(signal.SIGINT, graceful_stop_callback)
    loop.add_signal_handler(signal.SIGTERM, forceful_stop_callback)
except NotImplementedError:
    # Not supported on Windows
    pass
def add_signal_handlers():
    try:
        loop.add_signal_handler(signal.SIGINT, graceful_stop_callback)
        loop.add_signal_handler(signal.SIGTERM, forceful_stop_callback)
    except NotImplementedError:
        # Not supported on Windows
        pass


ignore_sets_path = os.path.join(os.path.dirname(libgrabsite.__file__), "ignore_sets")
@@ -185,12 +230,11 @@ class FileChangedWatcher(object):
        return changed


igsets_watcher = FileChangedWatcher(os.path.join(working_dir, "igsets"))
ignores_watcher = FileChangedWatcher(os.path.join(working_dir, "ignores"))
delay_watcher = FileChangedWatcher(os.path.join(working_dir, "delay"))
concurrency_watcher = FileChangedWatcher(os.path.join(working_dir, "concurrency"))
igsets_watcher = FileChangedWatcher(os.path.join(working_dir, "igsets"))
ignores_watcher = FileChangedWatcher(os.path.join(working_dir, "ignores"))
delay_watcher = FileChangedWatcher(os.path.join(working_dir, "delay"))
concurrency_watcher = FileChangedWatcher(os.path.join(working_dir, "concurrency"))
max_content_length_watcher = FileChangedWatcher(os.path.join(working_dir, "max_content_length"))
custom_hooks_watcher = FileChangedWatcher(os.path.join(working_dir, "custom_hooks.py"))

ignoracle = Ignoracle()
@@ -215,15 +259,14 @@ def update_ignoracle():

    ignoracle.set_patterns(ignores)

update_ignoracle()


def should_ignore_url(url, record_info):
    """
    Returns whether a URL should be ignored.
    """
    parameters = parameterize_record_info(record_info)
    return ignoracle.ignores(url, **parameters)
    return False
    # parameters = parameterize_record_info(record_info)
    # return ignoracle.ignores(url, **parameters)


all_start_urls = open(os.path.join(working_dir, "all_start_urls")).read().rstrip("\n").split("\n")
@@ -231,7 +274,7 @@ all_start_urls = open(os.path.join(working_dir, "all_start_urls")).read().rstrip
def accept_url(url_info, record_info, verdict, reasons):
    update_ignoracle()

    url = url_info['url']
    url = url_info.raw

    if url.startswith('data:'):
        # data: URLs aren't something you can grab, so drop them to avoid ignore
@@ -251,14 +294,6 @@ def accept_url(url_info, record_info, verdict, reasons):
    return verdict


def queued_url(url_info):
    job_data["items_queued"] += 1


def dequeued_url(url_info, record_info):
    job_data["items_downloaded"] += 1


job_data = {
    "ident": open(os.path.join(working_dir, "id")).read().strip(),
    "url": open(os.path.join(working_dir, "start_url")).read().strip(),
@@ -282,54 +317,38 @@ job_data = {
}


def handle_result(url_info, record_info, error_info={}, http_info={}):
    #print("url_info", url_info)
    #print("record_info", record_info)
    #print("error_info", error_info)
    #print("http_info", http_info)

def handle_result(url_info, record_info, error_info, response):
    update_igoff()

    response_code = 0
    if http_info.get("response_code"):
        response_code = http_info.get("response_code")
        response_code_str = str(http_info["response_code"])
        if len(response_code_str) == 3 and response_code_str[0] in "12345":
            job_data["r%sxx" % response_code_str[0]] += 1
        else:
            job_data["runk"] += 1
    response_code = _extract_response_code(response)
    response_code_str = str(response_code)
    if len(response_code_str) == 3 and response_code_str[0] in "12345":
        job_data["r%sxx" % response_code_str[0]] += 1
    else:
        job_data["runk"] += 1

    if http_info.get("body"):
        job_data["bytes_downloaded"] += http_info["body"]["content_size"]
    job_data["bytes_downloaded"] += _extract_item_size(response)

    stop = should_stop()

    response_message = http_info.get("response_message")
    response_message = response.reason
    if error_info:
        response_code = 0
        response_message = error_info["error"]
        response_message = str(error_info)

    if ws_factory.client:
        ws_factory.client.send_object({
            "type": "download",
            "job_data": job_data,
            "url": url_info["url"],
            "url": url_info.raw,
            "response_code": response_code,
            "response_message": response_message,
        })

    if stop:
        return wpull_hook.actions.STOP
        return Actions.STOP

    return wpull_hook.actions.NORMAL


def handle_response(url_info, record_info, http_info):
    return handle_result(url_info, record_info, http_info=http_info)


def handle_error(url_info, record_info, error_info):
    return handle_result(url_info, record_info, error_info=error_info)
    return Actions.NORMAL


stop_path = os.path.join(working_dir, "stop")
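The counter update above buckets each result by the first digit of its status code. As a self-contained check of that logic, mirroring the lines in the hunk:

    # Status codes are bucketed into job_data counters by their first digit;
    # anything that is not a three-digit 1xx-5xx code lands in "runk".
    def bucket_for(response_code: int) -> str:
        s = str(response_code)
        if len(s) == 3 and s[0] in "12345":
            return "r%sxx" % s[0]
        return "runk"

    assert bucket_for(200) == "r2xx"
    assert bucket_for(404) == "r4xx"
    assert bucket_for(0) == "runk"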
@@ -343,28 +362,22 @@ igoff_path = os.path.join(working_dir, "igoff")
def update_igoff():
    job_data["suppress_ignore_reports"] = path_exists_with_cache(igoff_path)

update_igoff()


video_path = os.path.join(working_dir, "video")

def update_video():
    job_data["video"] = path_exists_with_cache(video_path)

update_video()


scrape_path = os.path.join(working_dir, "scrape")

def update_scrape():
def update_scrape(app_session):
    scrape = path_exists_with_cache(scrape_path)
    job_data["scrape"] = scrape
    if not scrape:
        # Empty the list of scrapers, which will stop scraping for new URLs
        # but still keep going through what is already in the queue.
        wpull_hook.factory.get('DemuxDocumentScraper')._document_scrapers = []

update_scrape()
        app_session.factory['DemuxDocumentScraper']._document_scrapers = []


def maybe_log_ignore(url, pattern):
@@ -383,16 +396,16 @@ def maybe_log_ignore(url, pattern):
ICY_FIELD_PATTERN = re.compile('icy-|ice-|x-audiocast-', re.IGNORECASE)
ICY_VALUE_PATTERN = re.compile('icecast', re.IGNORECASE)

def get_content_length(response_info):
def get_content_length(response):
    try:
        return int(list(p for p in response_info["fields"] if p[0] == "Content-Length")[0][1])
        return int(list(p for p in response.fields.get_all() if p[0] == "Content-Length")[0][1])
    except (IndexError, ValueError):
        return -1


def has_content_type_video(response_info):
def has_content_type_video(response):
    try:
        t = list(p for p in response_info["fields"] if p[0] == "Content-Type")[0][1]
        t = list(p for p in response.fields.get_all() if p[0] == "Content-Type")[0][1]
        return t.lower().startswith("video/")
    except (IndexError, ValueError):
        return False
@@ -412,49 +425,45 @@ skipped_videos = open(skipped_videos_path, "w", encoding="utf-8")
skipped_max_content_length_path = os.path.join(working_dir, "skipped_max_content_length")
skipped_max_content_length = open(skipped_max_content_length_path, "w", encoding="utf-8")

def handle_pre_response(url_info, url_record, response_info):
    url = url_info['url']

    update_scrape()
def handle_pre_response(url_info, response):
    url = url_info.raw

    update_max_content_length()
    if job_data["max_content_length"] != -1:
        ##pprint.pprint(response_info)
        length = get_content_length(response_info)
        ##print((length, job_data["max_content_length"]))
        length = get_content_length(response)
        if length > job_data["max_content_length"]:
            skipped_max_content_length.write(url + "\n")
            skipped_max_content_length.flush()
            maybe_log_ignore(url, '[content-length %d over limit %d]' % (
                length, job_data["max_content_length"]))
            return wpull_hook.actions.FINISH
            return Actions.FINISH

    update_video()
    if not job_data["video"]:
        if has_content_type_video(response_info) or has_video_ext(url):
        if has_content_type_video(response) or has_video_ext(url):
            skipped_videos.write(url + "\n")
            skipped_videos.flush()
            maybe_log_ignore(url, '[video]')
            return wpull_hook.actions.FINISH
            return Actions.FINISH

    # Check if server version starts with ICY
    if response_info.get('version', '') == 'ICY':
    if response.version == 'ICY':
        maybe_log_ignore(url, '[icy version]')
        return wpull_hook.actions.FINISH
        return Actions.FINISH

    # Loop through all the server headers for matches
    for field, value in response_info.get('fields', []):
    for field, value in response.fields.get_all():
        if ICY_FIELD_PATTERN.match(field):
            maybe_log_ignore(url, '[icy field]')
            return wpull_hook.actions.FINISH
            return Actions.FINISH

        if field == 'Server' and ICY_VALUE_PATTERN.match(value):
            maybe_log_ignore(url, '[icy server]')
            return wpull_hook.actions.FINISH
            return Actions.FINISH

    # Nothing matched, allow download
    print_to_terminal(url + " ...")
    return wpull_hook.actions.NORMAL
    return Actions.NORMAL


def stdout_write_both(message):
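The ICY checks above target Shoutcast/Icecast-style streaming responses, which would presumably tie up a fetch indefinitely if downloaded. The two patterns behave like this (same regexes as above, values in the asserts are made-up examples):

    import re

    ICY_FIELD_PATTERN = re.compile('icy-|ice-|x-audiocast-', re.IGNORECASE)
    ICY_VALUE_PATTERN = re.compile('icecast', re.IGNORECASE)

    # Header *names* like Icy-MetaInt mark a stream; so does a Server header
    # whose value starts with "Icecast". Matching either makes
    # handle_pre_response() return Actions.FINISH and skip the URL.
    assert ICY_FIELD_PATTERN.match("Icy-MetaInt")
    assert ICY_VALUE_PATTERN.match("Icecast 2.4.4")
    assert not ICY_FIELD_PATTERN.match("Content-Type")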
@@ -488,14 +497,6 @@ sys.stdout.buffer.write = stdout_write_both
sys.stderr.buffer.write = stderr_write_both


def exit_status(code):
    print()
    print("Finished grab {} {} with exit code {}".format(
        job_data["ident"], job_data["url"], code))
    print("Output is in directory:\n{}".format(working_dir))
    return code


@swallow_exception
def update_max_content_length():
    if not max_content_length_watcher.has_changed():
@@ -503,8 +504,6 @@ def update_max_content_length():
    with open(max_content_length_watcher.fname, "r") as f:
        job_data["max_content_length"] = int(f.read().strip())

update_max_content_length()


@swallow_exception
def update_delay():
@@ -517,11 +516,9 @@ def update_delay():
    else:
        job_data["delay_min"] = job_data["delay_max"] = int(content)

update_delay()


@swallow_exception
def update_concurrency():
def update_concurrency(app_session):
    if not concurrency_watcher.has_changed():
        return
    with open(concurrency_watcher.fname, "r") as f:
@@ -530,21 +527,11 @@
        print("Warning: using 1 for concurrency instead of %r because it cannot be < 1" % (concurrency,))
        concurrency = 1
    job_data["concurrency"] = concurrency
    wpull_hook.factory.get('Engine').set_concurrent(concurrency)

update_concurrency()
    app_session.factory['PipelineSeries'].concurrency = concurrency


def wait_time(seconds, url_info, record_info, response_info, error_info):
    update_delay()
    update_concurrency()
    update_custom_hooks()

    return random.uniform(job_data["delay_min"], job_data["delay_max"]) / 1000


def get_urls(filename, url_info, document_info):
    url = url_info["url"]
def get_urls(url_info):
    url = url_info.raw
    extra_urls = None
    # If we see this URL, also queue the URL for the :orig quality image
    if url.startswith("https://pbs.twimg.com/media/"):
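One structural change worth noting in this hunk: tuning knobs that used to be reached through wpull_hook.factory.get(...) are now reached through the plugin's AppSession, e.g. app_session.factory['PipelineSeries'].concurrency above. A minimal illustration of that access pattern; the names are taken from this diff, and whether each factory entry exists depends on the wpull version in use:

    from wpull.application.plugin import WpullPlugin, PluginFunctions, hook

    class ConcurrencySetter(WpullPlugin):
        @hook(PluginFunctions.wait_time)
        def wait_time(self, seconds, item_session, error):
            # self.app_session is provided by WpullPlugin; its factory exposes
            # wpull's wired-up components, replacing wpull_hook.factory.get().
            self.app_session.factory['PipelineSeries'].concurrency = 2
            return seconds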
@@ -558,37 +545,65 @@ def get_urls(filename, url_info, document_info):
    return extra_urls


@swallow_exception
def update_custom_hooks():
    if not custom_hooks_watcher.has_changed():
        return
class GrabSitePlugin(WpullPlugin):
    @hook(PluginFunctions.accept_url)
    def accept_url(self, item_session: ItemSession, verdict: bool, reasons: dict):
        url_info = item_session.request.url_info
        record_info = item_session.url_record
        return accept_url(url_info, record_info, verdict, reasons)

    assert 3 in wpull_hook.callbacks.AVAILABLE_VERSIONS
    @event(PluginFunctions.queued_url)
    def queued_url(self, _url_info: URLInfo):
        job_data["items_queued"] += 1

    # Set these every time, because custom_hooks.py may be wrapping them,
    # and when custom_hooks.py is reloaded, we want it to re-wrap the base functions
    # instead of its already-wrapped functions.
    wpull_hook.callbacks.version = 3
    wpull_hook.callbacks.accept_url = accept_url
    wpull_hook.callbacks.queued_url = queued_url
    wpull_hook.callbacks.dequeued_url = dequeued_url
    wpull_hook.callbacks.handle_response = handle_response
    wpull_hook.callbacks.handle_error = handle_error
    wpull_hook.callbacks.handle_pre_response = handle_pre_response
    wpull_hook.callbacks.exit_status = exit_status
    wpull_hook.callbacks.wait_time = wait_time
    wpull_hook.callbacks.get_urls = get_urls
    @event(PluginFunctions.dequeued_url)
    def dequeued_url(self, _url_info: URLInfo, _record_info: URLRecord):
        job_data["items_downloaded"] += 1

    custom_hooks_filename = os.path.join(working_dir, "custom_hooks.py")
    with open(custom_hooks_filename, 'rb') as in_file:
        code = compile(in_file.read(), custom_hooks_filename, 'exec')
    context = {
        'wpull_hook': wpull_hook,
        'maybe_log_ignore': maybe_log_ignore,
        'print_to_terminal': print_to_terminal
    }
    exec(code, context, context)
    @hook(PluginFunctions.handle_response)
    def handle_response(self, item_session: ItemSession):
        url_info = item_session.request.url_info
        record_info = item_session.url_record
        response = item_session.response
        error_info = None
        return handle_result(url_info, record_info, error_info, response)

update_custom_hooks()
    @hook(PluginFunctions.handle_error)
    def handle_error(self, item_session: ItemSession, error_info: BaseException):
        url_info = item_session.request.url_info
        record_info = item_session.url_record
        response = item_session.response
        return handle_result(url_info, record_info, error_info, response)

    @hook(PluginFunctions.handle_pre_response)
    def handle_pre_response(self, item_session: ItemSession):
        url_info = item_session.request.url_info
        response = item_session.response
        update_scrape(self.app_session)
        return handle_pre_response(url_info, response)

    @hook(PluginFunctions.exit_status)
    def exit_status(self, _app_session: AppSession, exit_code: int):
        print()
        print(f'Finished grab {job_data["ident"]} {job_data["url"]} with exit code {exit_code}')
        print(f'Output is in directory:\n{working_dir}')
        return exit_code

    @hook(PluginFunctions.wait_time)
    def wait_time(self, _seconds: float, _item_session: ItemSession, _error):
        update_delay()
        update_concurrency(self.app_session)
        return random.uniform(job_data["delay_min"], job_data["delay_max"]) / 1000

    @event(PluginFunctions.get_urls)
    def get_urls(self, item_session: ItemSession):
        url_info = item_session.request.url_info
        return get_urls(url_info)

really_swallow_exceptions = True

update_ignoracle()
update_igoff()
update_video()
update_max_content_length()
update_delay()