2016-10-30 20:38:22 +01:00

96 lines
2.4 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2014, 2015 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Common classes and constants used by extractor modules."""
import time
import queue
import requests
import threading
from .message import Message
from .. import config
class Extractor():
category = ""
subcategory = ""
directory_fmt = [""]
filename_fmt = ""
def __init__(self):
self.session = requests.Session()
def __iter__(self):
return self.items()
def items(self):
yield Message.Version, 1
def request(self, url, encoding=None, *args, **kwargs):
response = safe_request(self.session, url, *args, **kwargs)
if encoding:
response.encoding = encoding
return response
class AsynchronousExtractor(Extractor):
def __init__(self):
Extractor.__init__(self)
queue_size = int(config.get(("queue-size",), default=5))
self.__queue = queue.Queue(maxsize=queue_size)
self.__thread = threading.Thread(target=self.async_items, daemon=True)
def __iter__(self):
get = self.__queue.get
done = self.__queue.task_done
self.__thread.start()
while True:
task = get()
if task is None:
return
if isinstance(task, Exception):
raise task
yield task
done()
def async_items(self):
put = self.__queue.put
try:
for task in self.items():
put(task)
except Exception as e:
put(e)
put(None)
def safe_request(session, url, method="GET", *args, **kwargs):
tries = 0
while True:
# try to connect to remote source
try:
r = session.request(method, url, *args, **kwargs)
except requests.exceptions.ConnectionError:
tries += 1
time.sleep(1)
if tries == 5:
raise
continue
# reject error-status-codes
if r.status_code != requests.codes.ok:
tries += 1
time.sleep(1)
if tries == 5:
r.raise_for_status()
continue
# everything ok -- proceed to download
return r