thatfuckingbird e47952ac14
add extractors for fantia and fanbox (#1459)
* add extractors for fantia and fanbox

* appease linter

* make docstrings unique

* [fantia] refactor post extraction

* [fantia] capitalize

* [fantia] improve regex pattern

* code style

* capitalize

* [fanbox] use BASE_PATTERN for url regexes

* [fanbox] refactor metadata and post extraction

* [fanbox] improve url base pattern

* [fanbox] accept creator page links ending with /posts

* [fanbox] more tests

* [fantia] improved pagination

* [fanbox] misc. code logic improvements

* [fantia] finish restructuring pagination code

* [fanbox] avoid making a request for each individual post when processing a creator page

* [fanbox] support embedded videos

* [fanbox] fix errors

* [fanbox] document extractor.fanbox.videos

* [fanbox] handle "article" and "entry" post types, all embeds

* [fanbox] fix downloading of embedded fanbox posts
2021-04-25 19:39:13 +02:00

284 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for https://www.fanbox.cc/"""
from .common import Extractor, Message
from .. import text
BASE_PATTERN = (
r"(?:https?://)?(?:"
r"(?!www\.)([\w-]+)\.fanbox\.cc|"
r"(?:www\.)?fanbox\.cc/@([\w-]+))"
)
class FanboxExtractor(Extractor):
"""Base class for Fanbox extractors"""
category = "fanbox"
root = "https://www.fanbox.cc"
directory_fmt = ("{category}", "{creatorId}")
filename_fmt = "{id}_{num}.{extension}"
archive_fmt = "{id}_{num}"
_warning = True
def __init__(self, match):
Extractor.__init__(self, match)
self.embeds = self.config("embeds", True)
def items(self):
yield Message.Version, 1
if self._warning:
if "FANBOXSESSID" not in self.session.cookies:
self.log.warning("no 'FANBOXSESSID' cookie set")
FanboxExtractor._warning = False
for content_body, post in self.posts():
yield Message.Directory, post
yield from self._get_urls_from_post(content_body, post)
def posts(self):
"""Return all relevant post objects"""
def _pagination(self, url):
headers = {"Origin": self.root}
while url:
url = text.ensure_http_scheme(url)
body = self.request(url, headers=headers).json()["body"]
for item in body["items"]:
yield self._process_post(item)
url = body["nextUrl"]
def _get_post_data_from_id(self, post_id):
"""Fetch and process post data"""
headers = {"Origin": self.root}
url = "https://api.fanbox.cc/post.info?postId="+post_id
post = self.request(url, headers=headers).json()["body"]
return self._process_post(post)
def _process_post(self, post):
content_body = post.pop("body", None)
if content_body:
if "html" in content_body:
post["html"] = content_body["html"]
if post["type"] == "article":
post["articleBody"] = content_body.copy()
post["date"] = text.parse_datetime(post["publishedDatetime"])
post["text"] = content_body.get("text") if content_body else None
post["isCoverImage"] = False
return content_body, post
def _get_urls_from_post(self, content_body, post):
num = 0
cover_image = post.get("coverImageUrl")
if cover_image:
final_post = post.copy()
final_post["isCoverImage"] = True
final_post["fileUrl"] = cover_image
text.nameext_from_url(cover_image, final_post)
final_post["num"] = num
num += 1
yield Message.Url, cover_image, final_post
if not content_body:
return
if "html" in content_body:
html_urls = []
for href in text.extract_iter(content_body["html"], 'href="', '"'):
if "fanbox.pixiv.net/images/entry" in href:
html_urls.append(href)
elif "downloads.fanbox.cc" in href:
html_urls.append(href)
for src in text.extract_iter(content_body["html"],
'data-src-original="', '"'):
html_urls.append(src)
for url in html_urls:
final_post = post.copy()
text.nameext_from_url(url, final_post)
final_post["fileUrl"] = url
final_post["num"] = num
num += 1
yield Message.Url, url, final_post
for group in ("images", "imageMap"):
if group in content_body:
for item in content_body[group]:
if group == "imageMap":
# imageMap is a dict with image objects as values
item = content_body[group][item]
final_post = post.copy()
final_post["fileUrl"] = item["originalUrl"]
text.nameext_from_url(item["originalUrl"], final_post)
if "extension" in item:
final_post["extension"] = item["extension"]
final_post["fileId"] = item.get("id")
final_post["width"] = item.get("width")
final_post["height"] = item.get("height")
final_post["num"] = num
num += 1
yield Message.Url, item["originalUrl"], final_post
for group in ("files", "fileMap"):
if group in content_body:
for item in content_body[group]:
if group == "fileMap":
# fileMap is a dict with file objects as values
item = content_body[group][item]
final_post = post.copy()
final_post["fileUrl"] = item["url"]
text.nameext_from_url(item["url"], final_post)
if "extension" in item:
final_post["extension"] = item["extension"]
if "name" in item:
final_post["filename"] = item["name"]
final_post["fileId"] = item.get("id")
final_post["num"] = num
num += 1
yield Message.Url, item["url"], final_post
if self.embeds:
embeds_found = []
if "video" in content_body:
embeds_found.append(content_body["video"])
embeds_found.extend(content_body.get("embedMap", {}).values())
for embed in embeds_found:
# embed_result is (message type, url, metadata dict)
embed_result = self._process_embed(post, embed)
if not embed_result:
continue
embed_result[2]["num"] = num
num += 1
yield embed_result
def _process_embed(self, post, embed):
final_post = post.copy()
provider = embed["serviceProvider"]
content_id = embed.get("videoId") or embed.get("contentId")
prefix = "ytdl:" if self.embeds == "ytdl" else ""
url = None
is_video = False
if provider == "soundcloud":
url = prefix+"https://soundcloud.com/"+content_id
is_video = True
elif provider == "youtube":
url = prefix+"https://youtube.com/watch?v="+content_id
is_video = True
elif provider == "vimeo":
url = prefix+"https://vimeo.com/"+content_id
is_video = True
elif provider == "fanbox":
# this is an old URL format that redirects
# to a proper Fanbox URL
url = "https://www.pixiv.net/fanbox/"+content_id
# resolve redirect
response = self.request(url, method="HEAD", allow_redirects=False)
url = response.headers["Location"]
final_post["_extractor"] = FanboxPostExtractor
elif provider == "twitter":
url = "https://twitter.com/_/status/"+content_id
elif provider == "google_forms":
templ = "https://docs.google.com/forms/d/e/{}/viewform?usp=sf_link"
url = templ.format(content_id)
else:
self.log.warning("service not recognized: {}".format(provider))
if url:
final_post["embed"] = embed
final_post["embedUrl"] = url
text.nameext_from_url(url, final_post)
msg_type = Message.Queue
if is_video and self.embeds == "ytdl":
msg_type = Message.Url
return msg_type, url, final_post
class FanboxCreatorExtractor(FanboxExtractor):
"""Extractor for a Fanbox creator's works"""
subcategory = "creator"
pattern = BASE_PATTERN + r"(?:/posts)?/?$"
test = (
("https://xub.fanbox.cc", {
"range": "1-15",
"count": ">= 15",
"keyword": {
"creatorId" : "xub",
"tags" : list,
"title" : str,
},
}),
("https://xub.fanbox.cc/posts"),
("https://www.fanbox.cc/@xub/"),
("https://www.fanbox.cc/@xub/posts"),
)
def __init__(self, match):
FanboxExtractor.__init__(self, match)
self.creator_id = match.group(1) or match.group(2)
def posts(self):
url = "https://api.fanbox.cc/post.listCreator?creatorId={}&limit=10"
return self._pagination(url.format(self.creator_id))
class FanboxPostExtractor(FanboxExtractor):
"""Extractor for media from a single Fanbox post"""
subcategory = "post"
pattern = BASE_PATTERN + r"/posts/(\d+)"
test = (
("https://www.fanbox.cc/@xub/posts/1910054", {
"count": 3,
"keyword": {
"title": "えま★おうがすと",
"tags": list,
"hasAdultContent": True,
"isCoverImage": False
},
}),
# entry post type, image embedded in html of the post
("https://nekoworks.fanbox.cc/posts/915", {
"count": 2,
"keyword": {
"title": "【SAYORI FAN CLUB】お届け内容",
"tags": list,
"html": str,
"hasAdultContent": True
},
}),
# article post type, imageMap, 2 twitter embeds, fanbox embed
("https://steelwire.fanbox.cc/posts/285502", {
"options": (("embeds", True),),
"count": 10,
"keyword": {
"title": "イラスト+SS義足の炭鉱少年が義足を見せてくれるだけ 【全体公開版】",
"tags": list,
"articleBody": dict,
"hasAdultContent": True
},
}),
)
def __init__(self, match):
FanboxExtractor.__init__(self, match)
self.post_id = match.group(3)
def posts(self):
return (self._get_post_data_from_id(self.post_id),)