The previous approach matched the first number in the URL slug as the post ID, which failed for posts with numbers in their title.
262 lines
8.9 KiB
262 lines
8.9 KiB
# -*- coding: utf-8 -*-
# Copyright 2019 Mike Fährmann
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for https://www.patreon.com/"""
from .common import Extractor, Message
from .. import text, exception
from ..cache import memcache
import collections
import json
class PatreonExtractor(Extractor):
    """Base class for patreon extractors"""
    category = "patreon"
    root = "https://www.patreon.com"
    directory_fmt = ("{category}", "{creator[full_name]}")
    filename_fmt = "{id}_{title}_{num:>02}.{extension}"
    archive_fmt = "{id}_{num}"
    # Class-wide flag: the missing-cookie check below runs only once
    _warning = True

    def items(self):
        """Yield messages for all files of all posts returned by posts()

        For every post a Message.Directory is emitted, followed by one
        Message.Url per image, post file, attachment, and 'src="..."'
        URL found in the post's content. The post dict itself is used as
        metadata; its "num" and "type" entries are updated before each
        Message.Url.
        """
        yield Message.Version, 1

        if self._warning:
            if "session_id" not in self.session.cookies:
                self.log.warning("no 'session_id' cookie set")
            PatreonExtractor._warning = False

        for post in self.posts():
            yield Message.Directory, post

            # second-to-last URL path segments of all emitted images;
            # used to skip a post file that duplicates one of them
            ids = set()
            post["num"] = 0
            content = post.get("content")
            postfile = post.get("post_file")

            for image in post["images"]:
                url = image.get("download_url")
                if not url:
                    # nothing to download for this image
                    continue
                ids.add(url.split("/")[-2])
                name = image.get("file_name") or self._filename(url) or url

                post["num"] += 1
                post["type"] = "image"
                yield Message.Url, url, text.nameext_from_url(name, post)

            if postfile and postfile["url"].split("/")[-2] not in ids:
                post["num"] += 1
                post["type"] = "postfile"
                text.nameext_from_url(postfile["name"], post)
                yield Message.Url, postfile["url"], post

            for attachment in post["attachments"]:
                post["num"] += 1
                post["type"] = "attachment"
                text.nameext_from_url(attachment["name"], post)
                yield Message.Url, attachment["url"], post

            if content:
                for url in text.extract_iter(content, 'src="', '"'):
                    post["num"] += 1
                    post["type"] = "content"
                    yield Message.Url, url, text.nameext_from_url(url, post)

    def posts(self):
        """Return all relevant post objects"""

    def _pagination(self, url):
        """Yield processed posts from 'url' and every following page

        Follows the "next" entry of each response's "links" object and
        stops when a response has no "links" or no "next" URL.
        """
        headers = {"Referer": self.root}

        while url:
            # normalize URLs that do not start with "http"
            if not url.startswith("http"):
                url = "https://" + url.lstrip("/:")
            posts = self.request(url, headers=headers).json()

            if "included" in posts:
                included = self._transform(posts["included"])
                for post in posts["data"]:
                    yield self._process(post, included)

            if "links" not in posts:
                # no pagination info: this was the last page
                return
            url = posts["links"].get("next")

    def _process(self, post, included):
        """Process and extend a 'post' object

        Returns the post's "attributes" dict, extended with "id",
        "images", "attachments", "date", and "creator" entries.
        """
        attr = post["attributes"]
        attr["id"] = text.parse_int(post["id"])
        attr["images"] = self._files(post, included, "images")
        attr["attachments"] = self._files(post, included, "attachments")
        attr["date"] = text.parse_datetime(
            attr["published_at"], "%Y-%m-%dT%H:%M:%S.%f%z")

        user = post["relationships"]["user"]
        # Prefer the full user object; if it cannot be fetched, fall back
        # to the user entry shipped in 'included'.
        # NOTE(review): the fallback expression was missing from the
        # reviewed source; it assumes a JSON:API resource linkage
        # ("data": {"id": ...}) in the relationship object - confirm.
        attr["creator"] = (
            self._user(user["links"]["related"]) or
            included["user"][user["data"]["id"]])
        return attr

    @staticmethod
    def _transform(included):
        """Transform 'included' into an easier to handle format

        Returns a mapping of the form {type: {id: attributes}}.
        """
        result = collections.defaultdict(dict)
        for inc in included:
            result[inc["type"]][inc["id"]] = inc["attributes"]
        return result

    @staticmethod
    def _files(post, included, key):
        """Build a list of files

        Resolves every entry of the post's 'key' relationship through
        the mapping produced by _transform(); returns an empty list if
        the relationship is missing or has no data.
        """
        files = post["relationships"].get(key)
        if files and files.get("data"):
            return [
                included[file["type"]][file["id"]]
                for file in files["data"]
            ]
        return []

    # NOTE(review): 'memcache' is imported by this module but not used in
    # the reviewed source; if a caching decorator belonged on this method,
    # restore it - as written, every processed post issues its own request.
    def _user(self, url):
        """Fetch user information

        Returns the user's attribute dict extended with "id" and "date",
        or None if the request fails with a status code >= 400.
        """
        response = self.request(url, fatal=False)
        if response.status_code >= 400:
            return None
        user = response.json()["data"]
        attr = user["attributes"]
        attr["id"] = user["id"]
        attr["date"] = text.parse_datetime(
            attr["created"], "%Y-%m-%dT%H:%M:%S.%f%z")
        return attr

    def _filename(self, url):
        """Fetch filename from its Content-Disposition header

        Returns None if the response carries no such header.
        """
        response = self.request(url, method="HEAD", fatal=False)
        cd = response.headers.get("Content-Disposition")
        if not cd:
            return None
        return text.extract(cd, 'filename="', '"')[0]

    @staticmethod
    def _build_url(endpoint, query):
        """Return the API URL for 'endpoint' with 'query' appended

        'query' is expected to be empty or to start with "&".
        """
        # NOTE(review): the expression in the reviewed source was
        # unterminated and its first parameter began with "&"; further
        # "include"/"fields" parameters may have been lost. The query
        # component now starts with "?" so that the URL is well-formed;
        # revert that character if a leading parameter gets restored.
        return (
            "https://www.patreon.com/api/" + endpoint +
            "?fields[access_rule]=access_rule_type,amount_cents" + query
        )
class PatreonCreatorExtractor(PatreonExtractor):
    """Extractor for a creator's works"""
    subcategory = "creator"
    # NOTE(review): the tail of this pattern was missing from the reviewed
    # source. The capture group is required by __init__ (match.group(1));
    # the negative lookahead keeps this pattern from also matching the
    # "/home" and "/posts/..." URLs claimed by the sibling extractors.
    # Confirm against the intended list of reserved path names.
    pattern = (r"(?:https?://)?(?:www\.)?patreon\.com"
               r"/(?!(?:home|posts)(?:$|[/?&#]))([^/?&#]+)/?")
    test = (
        ("https://www.patreon.com/koveliana", {
            "range": "1-25",
            "count": ">= 25",
            "keyword": {
                "attachments"  : list,
                "comment_count": int,
                "content"      : str,
                "creator"      : dict,
                "date"         : "type:datetime",
                "id"           : int,
                "images"       : list,
                "like_count"   : int,
                "post_type"    : str,
                "published_at" : str,
                "title"        : str,
            },
        }),
        ("https://www.patreon.com/kovelianot", {
            "exception": exception.NotFoundError,
        }),
    )

    def __init__(self, match):
        """Store the lowercased creator name captured by 'pattern'"""
        PatreonExtractor.__init__(self, match)
        self.creator = match.group(1).lower()

    def posts(self):
        """Return a generator over all posts of the creator's campaign

        Raises exception.NotFoundError if no campaign ID can be found in
        the creator's page.
        """
        url = "{}/{}".format(self.root, self.creator)
        page = self.request(url, notfound="creator").text
        campaign_id = text.extract(page, "/campaign/", "/")[0]
        if not campaign_id:
            raise exception.NotFoundError("creator")

        # NOTE(review): the call was unterminated in the reviewed source;
        # additional query parameters may have been lost before this one.
        url = self._build_url("posts", (
            "&filter[campaign_id]=" + campaign_id
        ))
        return self._pagination(url)
class PatreonUserExtractor(PatreonExtractor):
    """Extractor for media from creators supported by you"""
    subcategory = "user"
    pattern = r"(?:https?://)?(?:www\.)?patreon\.com/home$"
    test = ("https://www.patreon.com/home",)

    def posts(self):
        """Return a generator over all posts of the "stream" endpoint"""
        # NOTE(review): the query-string argument of this call was missing
        # from the reviewed source (the call was left unterminated). An
        # empty query keeps the code runnable; restore the intended
        # parameters before relying on the results.
        url = self._build_url("stream", "")
        return self._pagination(url)
class PatreonPostExtractor(PatreonExtractor):
    """Extractor for media from a single post"""
    subcategory = "post"
    pattern = r"(?:https?://)?(?:www\.)?patreon\.com/posts/([^/?&#]+)"
    test = (
        ("https://www.patreon.com/posts/precious-metal-23563293", {
            "count": 4,
        }),
        ("https://www.patreon.com/posts/er1-28201153", {
            "count": 1,
        }),
        ("https://www.patreon.com/posts/not-found-123", {
            "exception": exception.NotFoundError,
        }),
    )

    def __init__(self, match):
        """Store the post's URL slug captured by 'pattern'"""
        PatreonExtractor.__init__(self, match)
        self.slug = match.group(1)

    def posts(self):
        """Return a 1-tuple containing the processed post

        The post object is read from the JSON data embedded in the
        post's HTML page after "window.patreon.bootstrap,".

        Raises exception.NotFoundError if that data cannot be located.
        """
        url = "{}/posts/{}".format(self.root, self.slug)
        page = self.request(url, notfound="post").text
        data = text.extract(page, "window.patreon.bootstrap,", "\n});")[0]
        if not data:
            # no embedded bootstrap data: treat like a missing post
            # instead of failing on 'data + "}"' below
            raise exception.NotFoundError("post")

        # the end marker consumes the closing brace of the JSON object,
        # so it has to be added back before parsing
        post = json.loads(data + "}")["post"]

        included = self._transform(post["included"])
        return (self._process(post["data"], included),)