grab-site/wpull_hooks.py

258 lines
6.9 KiB
Python
Raw Normal View History

import re
import os
import sys
import json
import pprint
2015-07-18 03:17:27 +00:00
import trollius as asyncio
from urllib.request import urlopen
from autobahn.asyncio.websocket import WebSocketClientFactory, WebSocketClientProtocol
from ignoracle import Ignoracle, parameterize_record_info
2015-07-18 05:40:35 +00:00
# Keep references to the real (pre-monkeypatch) stdout/stderr write
# functions; the wrapped writers installed at the bottom of this file
# delegate to these.
realStdoutWrite = sys.stdout.buffer.write
realStderrWrite = sys.stderr.buffer.write
2015-07-18 05:40:35 +00:00
def printToReal(s):
    """Write *s* plus a newline to the real stdout (bypassing the
    WebSocket-mirroring writer installed below), then flush."""
    encoded = (s + "\n").encode("utf-8")
    realStdoutWrite(encoded)
    sys.stdout.buffer.flush()
def getJobData():
    """Snapshot of job state attached to every WebSocket message."""
    return dict(
        ident=ident,
        started_at=started_at,
        bytes_downloaded=stats["bytes_downloaded"],
        url=start_url,
    )
class MyClientProtocol(WebSocketClientProtocol):
    """WebSocket client that reports crawl progress to the dashboard server."""

    def onOpen(self):
        # Expose this live connection on the factory so module-level code
        # can send messages through it.
        self.factory.client = self
        name = type(self).__name__
        printToReal("{} connected to WebSocket server".format(name))
        self.sendMessage(json.dumps({"type": "hello", "mode": "grabber"}).encode('utf-8'))

    def onClose(self, wasClean, code, reason):
        self.factory.client = None
        name = type(self).__name__
        printToReal("{} disconnected from WebSocket server".format(name))
        # TODO: exponentially increasing delay (copy Decayer from dashboard)
        asyncio.ensure_future(connectToServer())

    def sendObject(self, obj):
        """Serialize *obj* as JSON and send it over the socket."""
        self.sendMessage(json.dumps(obj).encode("utf-8"))

    def report(self, url, response_code, response_message):
        """Tell the dashboard server about one completed download."""
        self.sendObject({
            "job_data": getJobData(),
            "type": "download",
            "url": url,
            "response_code": response_code,
            "response_message": response_message,
        })
class MyClientFactory(WebSocketClientFactory):
    """Factory that also tracks the single active dashboard connection.

    ``client`` holds the currently-connected protocol instance, or None
    when there is no live connection.
    """
    protocol = MyClientProtocol

    def __init__(self):
        super().__init__()
        # Set by the protocol's onOpen, cleared again by onClose.
        self.client = None


wsFactory = MyClientFactory()
2015-07-17 23:57:46 +00:00
2015-07-18 03:17:27 +00:00
@asyncio.coroutine
def connectToServer():
    """Keep trying to open the dashboard WebSocket connection until one
    succeeds, retrying every 2 seconds on connection failure."""
    port = int(os.environ.get('GRAB_SITE_WS_PORT', 29001))
    while True:
        try:
            yield from loop.create_connection(wsFactory, '127.0.0.1', port)
        except OSError:
            printToReal("Could not connect to WebSocket server, retrying in 2 seconds...")
            yield from asyncio.sleep(2)
        else:
            break


loop = asyncio.get_event_loop()
asyncio.ensure_future(connectToServer())
2015-07-17 23:57:46 +00:00
# Cache of ignore-set name -> list of patterns, so each set is fetched
# from GitHub at most once per process.
igsetCache = {}

def getPatternsForIgnoreSet(name):
    """Return the list of ignore patterns for the named ArchiveBot ignore set.

    Fetched from the ArchiveBot repository on first use and memoized in
    igsetCache afterwards.
    """
    assert name != "", name
    if name in igsetCache:
        return igsetCache[name]
    printToReal("Fetching ArchiveBot/master/db/ignore_patterns/%s.json" % name)
    # Close the HTTP response promptly instead of leaking the socket
    # until garbage collection.
    with urlopen(
            "https://raw.githubusercontent.com/ArchiveTeam/ArchiveBot/" +
            "master/db/ignore_patterns/%s.json" % name) as response:
        igsetCache[name] = json.loads(response.read().decode("utf-8"))["patterns"]
    return igsetCache[name]
# Per-job working directory (contains id, start_url, igsets, ignores,
# igoff); the GRAB_SITE_WORKING_DIR environment variable must be set by
# whatever launches this hook script.
workingDir = os.environ['GRAB_SITE_WORKING_DIR']
2015-02-05 04:39:52 +00:00
def mtime(f):
    """Return the last-modification time of path *f* in seconds."""
    return os.path.getmtime(f)
class FileChangedWatcher(object):
    """Polls a file's mtime to detect modifications between checks."""

    def __init__(self, fname):
        self.fname = fname
        # mtime observed at construction / at the last has_changed() call
        self.last_mtime = mtime(fname)

    def has_changed(self):
        """Return True if the file's mtime differs from the last check.

        Stat the file exactly once: the original code stat'ed it twice,
        so a modification landing between the two calls could leave
        last_mtime inconsistent with the reported result.
        """
        now_mtime = mtime(self.fname)
        changed = now_mtime != self.last_mtime
        self.last_mtime = now_mtime
        return changed
2015-07-18 04:34:24 +00:00
# Job identifier, read from the working directory.
ident = open(os.path.join(workingDir, "id")).read().strip()

# The URL this crawl was started with.
start_url = open(os.path.join(workingDir, "start_url")).read().strip()

# The start_url file's mtime doubles as the job start timestamp.
started_at = os.stat(os.path.join(workingDir, "start_url")).st_mtime

# Watch the ignore-set list and custom ignore patterns so edits made
# while the crawl runs are picked up (checked in acceptUrl).
igsetsWatcher = FileChangedWatcher(os.path.join(workingDir, "igsets"))
ignoresWatcher = FileChangedWatcher(os.path.join(workingDir, "ignores"))
ignoracle = Ignoracle()

def updateIgnoracle():
    """(Re)load ignore patterns from the job's igsets and ignores files
    and install them on the global Ignoracle.

    Named ignore sets from the igsets file are resolved through
    getPatternsForIgnoreSet; custom patterns come one-per-line from the
    ignores file.
    """
    with open(os.path.join(workingDir, "igsets"), "r") as f:
        # Drop empty names so a blank igsets file (or stray commas)
        # doesn't crash the `assert name != ""` in getPatternsForIgnoreSet.
        igsets = [s for s in f.read().strip("\r\n\t ,").split(',') if s != ""]

    with open(os.path.join(workingDir, "ignores"), "r") as f:
        ignores = set(ig for ig in f.read().strip("\r\n").split('\n') if ig != "")

    for igset in igsets:
        ignores.update(getPatternsForIgnoreSet(igset))

    printToReal("Using these %d ignores:" % len(ignores))
    printToReal(pprint.pformat(ignores))
    ignoracle.set_patterns(ignores)

updateIgnoracle()
2015-07-18 04:31:54 +00:00
def shouldIgnoreURL(url, recordInfo):
    """Return the matching ignore pattern if *url* should be skipped,
    otherwise a falsy value."""
    return ignoracle.ignores(url, **parameterize_record_info(recordInfo))
2015-07-18 04:31:54 +00:00
def acceptUrl(urlInfo, recordInfo, verdict, reasons):
    """wpull accept_url hook: veto URLs matching our ignore patterns.

    Returns False to reject a URL, otherwise wpull's original verdict.
    """
    # Pick up any edits to the igsets/ignores files before deciding.
    if igsetsWatcher.has_changed() or ignoresWatcher.has_changed():
        updateIgnoracle()

    url = urlInfo['url']

    # data: URLs aren't something you can grab, so drop them to avoid
    # ignore checking and ignore logging.
    if url.startswith('data:'):
        return False

    pattern = shouldIgnoreURL(url, recordInfo)
    if pattern:
        # The igoff marker file silences ignore logging.
        if not os.path.exists(os.path.join(workingDir, "igoff")):
            printToReal("IGNOR %s by %s" % (url, pattern))
        return False

    # None of our ignores apply; defer to wpull's original verdict.
    return verdict
2015-07-18 04:31:54 +00:00
# Running totals for this job; bytes_downloaded is reported to the
# dashboard in every message (see getJobData).
stats = {"bytes_downloaded": 0}

def handleResult(urlInfo, recordInfo, errorInfo=None, httpInfo=None):
    """Common handler for completed responses and errors.

    Updates the byte counter and, when the dashboard WebSocket is
    connected, reports the download.
    """
    # None sentinels instead of mutable {} defaults: a shared dict
    # default would persist across calls.
    if errorInfo is None:
        errorInfo = {}
    if httpInfo is None:
        httpInfo = {}
    if httpInfo.get("body"):
        stats["bytes_downloaded"] += httpInfo["body"]["content_size"]
    if wsFactory.client:
        wsFactory.client.report(
            urlInfo['url'],
            httpInfo.get("response_code"),
            httpInfo.get("response_message")
        )
2015-07-18 04:31:54 +00:00
def handleResponse(urlInfo, recordInfo, httpInfo):
    """wpull handle_response hook: delegate to the shared result handler."""
    return handleResult(urlInfo, recordInfo, httpInfo=httpInfo)
2015-07-18 04:31:54 +00:00
def handleError(urlInfo, recordInfo, errorInfo):
    """wpull handle_error hook: delegate to the shared result handler."""
    return handleResult(urlInfo, recordInfo, errorInfo=errorInfo)
2015-07-18 05:48:33 +00:00
def maybeLogIgnore(url, pattern):
    """Hook point for logging an ignored (url, pattern) pair.

    Intentionally a no-op for now: the icy/icecast ignores detected in
    handlePreResponse are dropped silently rather than logged.
    """
    pass
# Header-name prefixes and Server values that identify an
# Icecast/SHOUTcast audio stream; used by handlePreResponse to abort
# such downloads.
ICY_FIELD_PATTERN = re.compile('Icy-|Ice-|X-Audiocast-')
ICY_VALUE_PATTERN = re.compile('icecast', re.IGNORECASE)
2015-07-18 04:31:54 +00:00
def handlePreResponse(urlInfo, url_record, response_info):
    """wpull handle_pre_response hook: abort downloads of live streams.

    Detects Icecast/SHOUTcast responses by protocol version and server
    headers and finishes them immediately; everything else proceeds
    normally.
    """
    url = urlInfo['url']

    # An "ICY" protocol version marks a stream, not a document.
    if response_info.get('version', '') == 'ICY':
        maybeLogIgnore(url, '[icy version]')
        return wpull_hook.actions.FINISH

    # Scan all server headers for streaming markers.
    for field, value in response_info.get('fields', []):
        if ICY_FIELD_PATTERN.match(field):
            maybeLogIgnore(url, '[icy field]')
            return wpull_hook.actions.FINISH
        if field == 'Server' and ICY_VALUE_PATTERN.match(value):
            maybeLogIgnore(url, '[icy server]')
            return wpull_hook.actions.FINISH

    # Nothing matched; allow the download.
    printToReal(url + " ...")
    return wpull_hook.actions.NORMAL
def stdoutWriteToBoth(message):
    """Replacement for sys.stdout.buffer.write: write to the real stdout
    and mirror the text to the dashboard when connected.

    Failures are reported on the real stderr instead of raised, so a
    dashboard hiccup can never break wpull's own output.
    """
    assert isinstance(message, bytes)
    try:
        realStdoutWrite(message)
        client = wsFactory.client
        if client:
            client.sendObject({
                "type": "stdout",
                "job_data": getJobData(),
                "message": message.decode("utf-8")
            })
    except Exception as e:
        realStderrWrite((str(e) + "\n").encode("utf-8"))
def stderrWriteToBoth(message):
    """Replacement for sys.stderr.buffer.write: write to the real stderr
    and mirror the text to the dashboard when connected.

    Failures are reported on the real stderr instead of raised, so a
    dashboard hiccup can never break wpull's own output.
    """
    assert isinstance(message, bytes)
    try:
        realStderrWrite(message)
        client = wsFactory.client
        if client:
            client.sendObject({
                "type": "stderr",
                "job_data": getJobData(),
                "message": message.decode("utf-8")
            })
    except Exception as e:
        realStderrWrite((str(e) + "\n").encode("utf-8"))
# Mirror everything wpull prints to the dashboard WebSocket by
# monkeypatching the buffer-level write functions.
sys.stdout.buffer.write = stdoutWriteToBoth
sys.stderr.buffer.write = stderrWriteToBoth

# Register our hooks with wpull (hook API version 2).
assert 2 in wpull_hook.callbacks.AVAILABLE_VERSIONS
wpull_hook.callbacks.version = 2
wpull_hook.callbacks.accept_url = acceptUrl
wpull_hook.callbacks.handle_response = handleResponse
wpull_hook.callbacks.handle_error = handleError
wpull_hook.callbacks.handle_pre_response = handlePreResponse