grab-site/libgrabsite/wpull_hooks.py

487 lines
12 KiB
Python
Raw Normal View History

import re
import os
import sys
import json
import time
import pprint
import signal
import random
import functools
import traceback
2015-07-18 03:17:27 +00:00
import trollius as asyncio
from autobahn.asyncio.websocket import WebSocketClientFactory, WebSocketClientProtocol
from libgrabsite.ignoracle import Ignoracle, parameterize_record_info
import libgrabsite
2015-07-28 12:51:59 +00:00
real_stdout_write = sys.stdout.buffer.write
real_stderr_write = sys.stderr.buffer.write
2015-07-28 12:51:59 +00:00
def print_to_real(s):
real_stdout_write((s + "\n").encode("utf-8"))
2015-07-18 05:48:33 +00:00
sys.stdout.buffer.flush()
2015-07-18 10:57:57 +00:00
class GrabberClientProtocol(WebSocketClientProtocol):
2015-07-28 12:51:59 +00:00
def on_open(self):
2015-07-18 03:17:27 +00:00
self.factory.client = self
2015-07-28 12:51:59 +00:00
self.send_object({
2015-07-18 05:51:15 +00:00
"type": "hello",
"mode": "grabber",
2015-07-28 12:51:59 +00:00
"url": job_data["url"]
})
2015-07-28 12:51:59 +00:00
def on_close(self, was_clean, code, reason):
2015-07-18 03:17:27 +00:00
self.factory.client = None
2015-07-28 12:51:59 +00:00
print_to_real(
"Disconnected from ws:// server with (was_clean, code, reason): {!r}"
.format((was_clean, code, reason)))
asyncio.ensure_future(connect_to_server())
2015-07-28 12:51:59 +00:00
def send_object(self, obj):
self.sendMessage(json.dumps(obj).encode("utf-8"))
2015-07-28 12:51:59 +00:00
onOpen = on_open
onClose = on_close
2015-07-18 10:57:57 +00:00
class GrabberClientFactory(WebSocketClientFactory):
protocol = GrabberClientProtocol
2015-07-18 03:17:27 +00:00
def __init__(self):
super().__init__()
self.client = None
2015-07-28 12:51:59 +00:00
ws_factory = GrabberClientFactory()
2015-07-17 23:57:46 +00:00
class Decayer(object):
def __init__(self, initial, multiplier, maximum):
"""
initial - initial number to return
multiplier - multiply number by this value after each call to decay()
maximum - cap number at this value
"""
self.initial = initial
self.multiplier = multiplier
self.maximum = maximum
self.reset()
def reset(self):
# First call to .decay() will multiply, but we want to get the `intitial`
# value on the first call to .decay(), so divide.
self.current = self.initial / self.multiplier
return self.current
def decay(self):
self.current = min(self.current * self.multiplier, self.maximum)
return self.current
2015-07-18 03:17:27 +00:00
@asyncio.coroutine
2015-07-28 12:51:59 +00:00
def connect_to_server():
2015-07-18 06:16:46 +00:00
host = os.environ.get('GRAB_SITE_WS_HOST', '127.0.0.1')
2015-07-17 23:57:46 +00:00
port = int(os.environ.get('GRAB_SITE_WS_PORT', 29001))
decayer = Decayer(0.25, 1.5, 8)
2015-07-18 03:17:27 +00:00
while True:
try:
2015-07-29 18:37:43 +00:00
yield from loop.create_connection(ws_factory, host, port)
2015-07-18 03:17:27 +00:00
except OSError:
delay = decayer.decay()
2015-07-28 12:51:59 +00:00
print_to_real(
"Could not connect to ws://{}:{}, retrying in {:.1f} seconds..."
.format(host, port, delay))
yield from asyncio.sleep(delay)
2015-07-18 03:17:27 +00:00
else:
2015-07-28 12:51:59 +00:00
print_to_real("Connected to ws://{}:{}".format(host, port))
2015-07-18 03:17:27 +00:00
break
loop = asyncio.get_event_loop()
2015-07-28 12:51:59 +00:00
asyncio.ensure_future(connect_to_server())
2015-07-17 23:57:46 +00:00
2015-07-28 12:51:59 +00:00
def graceful_stop_callback():
print_to_real("\n^C detected, creating 'stop' file, please wait for exit...")
with open(os.path.join(working_dir, "stop"), "wb") as f:
pass
2015-07-28 12:51:59 +00:00
def forceful_stop_callback():
loop.stop()
2015-07-28 12:51:59 +00:00
loop.add_signal_handler(signal.SIGINT, graceful_stop_callback)
loop.add_signal_handler(signal.SIGTERM, forceful_stop_callback)
2015-07-17 23:57:46 +00:00
ignore_sets_path = os.path.join(os.path.dirname(libgrabsite.__file__), "ignore_sets")
2015-07-28 12:51:59 +00:00
def get_patterns_for_ignore_set(name):
assert name != "", name
with open(os.path.join(ignore_sets_path, name), "r", encoding="utf-8") as f:
lines = f.read().strip("\n").split("\n")
lines = filter(lambda line: line and not line.startswith("# "), lines)
return lines
2015-07-28 12:51:59 +00:00
working_dir = os.environ['GRAB_SITE_WORKING_DIR']
2015-02-05 04:39:52 +00:00
# Don't swallow during startup
really_swallow_exceptions = False
def swallow_exception(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
global really_swallow_exceptions
try:
return f(*args, **kwargs)
except Exception:
if not really_swallow_exceptions:
raise
traceback.print_exc()
return wrapper
CONTROL_FILE_CACHE_SEC = 3
def caching_decorator(f):
cache = {}
@functools.wraps(f)
def wrapper(path):
timestamp, val = cache.get(path, (-CONTROL_FILE_CACHE_SEC, None))
if timestamp > (time.monotonic() - CONTROL_FILE_CACHE_SEC):
#print("returning cached value {} {}".format(path, val))
return val
val = f(path)
cache[path] = (time.monotonic(), val)
#print("returning new value {} {}".format(path, val))
return val
return wrapper
@caching_decorator
def path_exists_with_cache(path):
return os.path.exists(path)
@caching_decorator
def mtime_with_cache(path):
return os.stat(path).st_mtime
class FileChangedWatcher(object):
def __init__(self, fname):
self.fname = fname
# Use a bogus mtime so that has_changed() returns True
# at least once
self.last_mtime = -1
2015-07-28 12:51:59 +00:00
def has_changed(self):
now_mtime = mtime_with_cache(self.fname)
changed = now_mtime != self.last_mtime
2015-07-28 12:51:59 +00:00
self.last_mtime = now_mtime
return changed
2015-07-28 12:51:59 +00:00
igsets_watcher = FileChangedWatcher(os.path.join(working_dir, "igsets"))
ignores_watcher = FileChangedWatcher(os.path.join(working_dir, "ignores"))
delay_watcher = FileChangedWatcher(os.path.join(working_dir, "delay"))
concurrency_watcher = FileChangedWatcher(os.path.join(working_dir, "concurrency"))
max_content_length_watcher = FileChangedWatcher(os.path.join(working_dir, "max_content_length"))
ignoracle = Ignoracle()
@swallow_exception
2015-07-28 12:51:59 +00:00
def update_ignoracle():
if not (igsets_watcher.has_changed() or ignores_watcher.has_changed()):
return
2015-07-28 12:51:59 +00:00
with open(os.path.join(working_dir, "igsets"), "r") as f:
igsets = f.read().strip("\r\n\t ,").split(',')
2015-02-05 04:39:52 +00:00
2015-07-28 12:51:59 +00:00
with open(os.path.join(working_dir, "ignores"), "r") as f:
ignores = set(ig for ig in f.read().strip("\r\n").split('\n') if ig != "")
for igset in igsets:
2015-07-28 12:51:59 +00:00
patterns = get_patterns_for_ignore_set(igset)
ignores.update(patterns)
2015-07-28 12:51:59 +00:00
print_to_real("Using these %d ignores:" % len(ignores))
for ig in sorted(ignores):
print_to_real("\t" + ig)
2015-02-05 04:39:52 +00:00
ignoracle.set_patterns(ignores)
2015-07-28 12:51:59 +00:00
update_ignoracle()
2015-07-28 12:51:59 +00:00
def should_ignore_url(url, record_info):
"""
Returns whether a URL should be ignored.
"""
2015-07-28 12:51:59 +00:00
parameters = parameterize_record_info(record_info)
return ignoracle.ignores(url, **parameters)
2015-07-28 12:51:59 +00:00
def accept_url(url_info, record_info, verdict, reasons):
update_ignoracle()
2015-07-28 12:51:59 +00:00
url = url_info['url']
if url.startswith('data:'):
# data: URLs aren't something you can grab, so drop them to avoid ignore
# checking and ignore logging.
return False
2015-07-28 12:51:59 +00:00
pattern = should_ignore_url(url, record_info)
if pattern:
2015-07-28 12:51:59 +00:00
maybe_log_ignore(url, pattern)
return False
# If we get here, none of our ignores apply. Return the original verdict.
return verdict
2015-07-28 12:51:59 +00:00
def queued_url(url_info):
job_data["items_queued"] += 1
2015-07-18 08:21:34 +00:00
2015-07-28 12:51:59 +00:00
def dequeued_url(url_info, record_info):
job_data["items_downloaded"] += 1
2015-07-18 08:21:34 +00:00
2015-07-28 12:51:59 +00:00
job_data = {
"ident": open(os.path.join(working_dir, "id")).read().strip(),
"url": open(os.path.join(working_dir, "start_url")).read().strip(),
"started_at": os.stat(os.path.join(working_dir, "start_url")).st_mtime,
"max_content_length": -1,
"suppress_ignore_reports": True,
"concurrency": 2,
2015-07-18 08:21:34 +00:00
"bytes_downloaded": 0,
"items_queued": 0,
"items_downloaded": 0,
"delay_min": 0,
"delay_max": 0,
"r1xx": 0,
"r2xx": 0,
"r3xx": 0,
"r4xx": 0,
"r5xx": 0,
"runk": 0,
}
2015-07-18 04:31:54 +00:00
2015-07-28 12:51:59 +00:00
def handle_result(url_info, record_info, error_info={}, http_info={}):
#print("url_info", url_info)
#print("record_info", record_info)
#print("error_info", error_info)
#print("http_info", http_info)
2015-07-18 04:31:54 +00:00
update_igoff()
2015-07-18 09:49:03 +00:00
response_code = 0
2015-07-28 12:51:59 +00:00
if http_info.get("response_code"):
response_code = http_info.get("response_code")
response_code_str = str(http_info["response_code"])
2015-07-18 09:49:03 +00:00
if len(response_code_str) == 3 and response_code_str[0] in "12345":
2015-07-28 12:51:59 +00:00
job_data["r%sxx" % response_code_str[0]] += 1
2015-07-18 09:49:50 +00:00
else:
2015-07-28 12:51:59 +00:00
job_data["runk"] += 1
2015-07-18 08:21:34 +00:00
2015-07-28 12:51:59 +00:00
if http_info.get("body"):
job_data["bytes_downloaded"] += http_info["body"]["content_size"]
2015-07-18 04:31:54 +00:00
2015-07-28 12:51:59 +00:00
stop = should_stop()
2015-07-18 08:53:38 +00:00
2015-07-28 12:51:59 +00:00
response_message = http_info.get("response_message")
if error_info:
2015-07-18 09:49:03 +00:00
response_code = 0
2015-07-28 12:51:59 +00:00
response_message = error_info["error"]
2015-07-18 09:49:03 +00:00
2015-07-28 12:51:59 +00:00
if ws_factory.client:
ws_factory.client.send_object({
"type": "download",
2015-07-28 12:51:59 +00:00
"job_data": job_data,
"url": url_info["url"],
2015-07-18 09:49:03 +00:00
"response_code": response_code,
"response_message": response_message,
})
2015-07-18 08:53:38 +00:00
if stop:
return wpull_hook.actions.STOP
return wpull_hook.actions.NORMAL
2015-07-28 12:51:59 +00:00
def handle_response(url_info, record_info, http_info):
return handle_result(url_info, record_info, http_info=http_info)
2015-07-28 12:51:59 +00:00
def handle_error(url_info, record_info, error_info):
return handle_result(url_info, record_info, error_info=error_info)
stop_path = os.path.join(working_dir, "stop")
2015-07-28 12:51:59 +00:00
def should_stop():
return path_exists_with_cache(stop_path)
2015-07-18 08:53:38 +00:00
igoff_path = os.path.join(working_dir, "igoff")
def update_igoff():
igoff = path_exists_with_cache(igoff_path)
2015-07-28 12:51:59 +00:00
job_data["suppress_ignore_reports"] = igoff
return igoff
2015-08-10 13:23:43 +00:00
update_igoff()
2015-07-28 12:51:59 +00:00
def maybe_log_ignore(url, pattern):
if not update_igoff():
print_to_real("IGNOR %s\n by %s" % (url, pattern))
2015-07-28 12:51:59 +00:00
if ws_factory.client:
ws_factory.client.send_object({
2015-07-18 05:55:51 +00:00
"type": "ignore",
2015-07-28 12:51:59 +00:00
"job_data": job_data,
2015-07-18 05:55:51 +00:00
"url": url,
"pattern": pattern
})
2015-07-18 05:48:33 +00:00
ICY_FIELD_PATTERN = re.compile('icy-|ice-|x-audiocast-', re.IGNORECASE)
ICY_VALUE_PATTERN = re.compile('icecast', re.IGNORECASE)
def get_content_length(response_info):
try:
return int(list(p for p in response_info["fields"] if p[0] == "Content-Length")[0][1])
except (IndexError, ValueError):
return -1
2015-07-28 12:51:59 +00:00
def handle_pre_response(url_info, url_record, response_info):
url = url_info['url']
update_max_content_length()
if job_data["max_content_length"] != -1:
##pprint.pprint(response_info)
length = get_content_length(response_info)
##print((length, job_data["max_content_length"]))
if length > job_data["max_content_length"]:
maybe_log_ignore(url, '[content-length %d over limit %d]' % (
length, job_data["max_content_length"]))
return wpull_hook.actions.FINISH
# Check if server version starts with ICY
2015-07-28 12:51:59 +00:00
if response_info.get('version', '') == 'ICY':
maybe_log_ignore(url, '[icy version]')
return wpull_hook.actions.FINISH
# Loop through all the server headers for matches
2015-07-28 12:51:59 +00:00
for field, value in response_info.get('fields', []):
if ICY_FIELD_PATTERN.match(field):
2015-07-28 12:51:59 +00:00
maybe_log_ignore(url, '[icy field]')
return wpull_hook.actions.FINISH
if field == 'Server' and ICY_VALUE_PATTERN.match(value):
2015-07-28 12:51:59 +00:00
maybe_log_ignore(url, '[icy server]')
return wpull_hook.actions.FINISH
# Nothing matched, allow download
2015-07-28 12:51:59 +00:00
print_to_real(url + " ...")
return wpull_hook.actions.NORMAL
2015-07-28 12:51:59 +00:00
def stdout_write_both(message):
2015-07-18 05:51:53 +00:00
assert isinstance(message, bytes), message
try:
2015-07-28 12:51:59 +00:00
real_stdout_write(message)
if ws_factory.client:
ws_factory.client.send_object({
"type": "stdout",
2015-07-28 12:51:59 +00:00
"job_data": job_data,
"message": message.decode("utf-8")
})
except Exception as e:
2015-07-28 12:51:59 +00:00
real_stderr_write((str(e) + "\n").encode("utf-8"))
2015-07-19 20:20:56 +00:00
2015-07-28 12:51:59 +00:00
def stderr_write_both(message):
2015-07-18 05:51:53 +00:00
assert isinstance(message, bytes), message
try:
2015-07-28 12:51:59 +00:00
real_stderr_write(message)
if ws_factory.client:
ws_factory.client.send_object({
"type": "stderr",
2015-07-28 12:51:59 +00:00
"job_data": job_data,
"message": message.decode("utf-8")
})
except Exception as e:
2015-07-28 12:51:59 +00:00
real_stderr_write((str(e) + "\n").encode("utf-8"))
2015-07-28 12:51:59 +00:00
sys.stdout.buffer.write = stdout_write_both
sys.stderr.buffer.write = stderr_write_both
2015-07-28 12:51:59 +00:00
def exit_status(code):
2015-07-19 20:47:39 +00:00
print()
2015-07-28 12:51:59 +00:00
print("Finished grab {} {} with exit code {}".format(
job_data["ident"], job_data["url"], code))
print("Output is in directory:\n{}".format(working_dir))
return code
@swallow_exception
def update_max_content_length():
if not max_content_length_watcher.has_changed():
return
with open(max_content_length_watcher.fname, "r") as f:
job_data["max_content_length"] = int(f.read().strip())
update_max_content_length()
@swallow_exception
def update_delay():
if not delay_watcher.has_changed():
return
2015-07-28 13:57:42 +00:00
with open(delay_watcher.fname, "r") as f:
content = f.read().strip()
if "-" in content:
job_data["delay_min"], job_data["delay_max"] = list(int(s) for s in content.split("-", 1))
else:
job_data["delay_min"] = job_data["delay_max"] = int(content)
update_delay()
2015-07-28 13:57:42 +00:00
@swallow_exception
def update_concurrency():
if not concurrency_watcher.has_changed():
return
with open(concurrency_watcher.fname, "r") as f:
job_data["concurrency"] = int(f.read().strip())
wpull_hook.factory.get('Engine').set_concurrent(job_data["concurrency"])
update_concurrency()
def wait_time(_):
update_delay()
# While we're at it, update the concurrency level
update_concurrency()
return random.uniform(job_data["delay_min"], job_data["delay_max"]) / 1000
assert 2 in wpull_hook.callbacks.AVAILABLE_VERSIONS
wpull_hook.callbacks.version = 2
2015-07-28 12:51:59 +00:00
wpull_hook.callbacks.accept_url = accept_url
wpull_hook.callbacks.queued_url = queued_url
wpull_hook.callbacks.dequeued_url = dequeued_url
wpull_hook.callbacks.handle_response = handle_response
wpull_hook.callbacks.handle_error = handle_error
wpull_hook.callbacks.handle_pre_response = handle_pre_response
wpull_hook.callbacks.exit_status = exit_status
wpull_hook.callbacks.wait_time = wait_time
really_swallow_exceptions = True