2021-01-12 00:15:22 +01:00

279 lines
8.9 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2014-2021 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for https://sankaku.app/"""
from .booru import BooruExtractor
from .common import Message
from .. import text, exception
from ..cache import cache
import collections
BASE_PATTERN = r"(?:https?://)?" \
r"(?:sankaku\.app|(?:beta|chan)\.sankakucomplex\.com)"
class SankakuExtractor(BooruExtractor):
"""Base class for sankaku channel extractors"""
basecategory = "booru"
category = "sankaku"
filename_fmt = "{category}_{id}_{md5}.{extension}"
cookiedomain = None
_warning = True
TAG_TYPES = {
0: "general",
1: "artist",
2: "studio",
3: "copyright",
4: "character",
5: "genre",
6: "",
7: "",
8: "medium",
9: "meta",
}
def skip(self, num):
return 0
def _file_url(self, post):
url = post["file_url"]
if not url and self._warning:
self.log.warning(
"Login required to download 'contentious_content' posts")
SankakuExtractor._warning = False
return url
@staticmethod
def _prepare(post):
post["created_at"] = post["created_at"]["s"]
post["date"] = text.parse_timestamp(post["created_at"])
post["tags"] = [tag["name"] for tag in post["tags"]]
def _extended_tags(self, post):
tags = collections.defaultdict(list)
types = self.TAG_TYPES
for tag in post["tags"]:
tags[types[tag["type"]]].append(tag["name"])
for key, value in tags.items():
post["tags_" + key] = value
class SankakuTagExtractor(SankakuExtractor):
"""Extractor for images from sankaku.app by search-tags"""
subcategory = "tag"
directory_fmt = ("{category}", "{search_tags}")
archive_fmt = "t_{search_tags}_{id}"
pattern = BASE_PATTERN + r"/\?([^#]*)"
test = (
("https://sankaku.app/?tags=bonocho", {
"count": 5,
"pattern": r"https://c?s\.sankakucomplex\.com/data/[^/]{2}/[^/]{2}"
r"/[^/]{32}\.\w+\?e=\d+&m=[^&#]+",
}),
("https://beta.sankakucomplex.com/?tags=bonocho"),
("https://chan.sankakucomplex.com/?tags=bonocho"),
# error on five or more tags
("https://chan.sankakucomplex.com/?tags=bonocho+a+b+c+d", {
"options": (("username", None),),
"exception": exception.StopExtraction,
}),
# match arbitrary query parameters
("https://chan.sankakucomplex.com"
"/?tags=marie_rose&page=98&next=3874906&commit=Search"),
)
def __init__(self, match):
SankakuExtractor.__init__(self, match)
query = text.parse_query(match.group(1))
self.tags = text.unquote(query.get("tags", "").replace("+", " "))
def metadata(self):
return {"search_tags": self.tags}
def posts(self):
params = {"tags": self.tags}
return SankakuAPI(self).posts_keyset(params)
class SankakuPoolExtractor(SankakuExtractor):
"""Extractor for image pools or books from sankaku.app"""
subcategory = "pool"
directory_fmt = ("{category}", "pool", "{pool[id]} {pool[name_en]}")
archive_fmt = "p_{pool}_{id}"
pattern = BASE_PATTERN + r"/(?:books|pool/show)/(\d+)"
test = (
("https://sankaku.app/books/90", {
"count": 5,
}),
("https://beta.sankakucomplex.com/books/90"),
("https://chan.sankakucomplex.com/pool/show/90"),
)
def __init__(self, match):
SankakuExtractor.__init__(self, match)
self.pool_id = match.group(1)
def metadata(self):
pool = SankakuAPI(self).pools(self.pool_id)
self._posts = pool.pop("posts")
return {"pool": pool}
def posts(self):
return self._posts
class SankakuPostExtractor(SankakuExtractor):
"""Extractor for single posts from sankaku.app"""
subcategory = "post"
archive_fmt = "{id}"
pattern = BASE_PATTERN + r"/post/show/(\d+)"
test = (
("https://sankaku.app/post/show/360451", {
"content": "5e255713cbf0a8e0801dc423563c34d896bb9229",
"options": (("tags", True),),
"keyword": {
"tags_artist": ["bonocho"],
"tags_studio": ["dc_comics"],
"tags_medium": ["sketch", "copyright_name"],
"tags_copyright": list,
"tags_character": list,
"tags_general" : list,
},
}),
# 'contentious_content'
("https://sankaku.app/post/show/21418978", {
"pattern": r"https://s\.sankakucomplex\.com"
r"/data/13/3c/133cda3bfde249c504284493903fb985\.jpg",
}),
("https://beta.sankakucomplex.com/post/show/360451"),
("https://chan.sankakucomplex.com/post/show/360451"),
)
def __init__(self, match):
SankakuExtractor.__init__(self, match)
self.post_id = match.group(1)
def posts(self):
return SankakuAPI(self).posts(self.post_id)
class SankakuBooksExtractor(SankakuExtractor):
"""Extractor for books by tag search on sankaku.app"""
subcategory = "books"
pattern = BASE_PATTERN + r"/books/?\?([^#]*)"
test = (
("https://sankaku.app/books?tags=aiue_oka", {
"range": "1-20",
"count": 20,
}),
("https://beta.sankakucomplex.com/books?tags=aiue_oka"),
)
def __init__(self, match):
SankakuExtractor.__init__(self, match)
query = text.parse_query(match.group(1))
self.tags = text.unquote(query.get("tags", "").replace("+", " "))
def items(self):
params = {"tags": self.tags, "pool_type": "0"}
for pool in SankakuAPI(self).pools_keyset(params):
pool["_extractor"] = SankakuPoolExtractor
url = "https://sankaku.app/books/{}".format(pool["id"])
yield Message.Queue, url, pool
class SankakuAPI():
"""Interface for the sankaku.app API"""
def __init__(self, extractor):
self.extractor = extractor
self.headers = {"Accept": "application/vnd.sankaku.api+json;v=2"}
self.username, self.password = self.extractor._get_auth_info()
if not self.username:
self.authenticate = lambda: None
def pools(self, pool_id):
params = {"lang": "en"}
return self._call("/pools/" + pool_id, params)
def pools_keyset(self, params):
return self._pagination("/pools/keyset", params)
def posts(self, post_id):
params = {
"lang" : "en",
"page" : "1",
"limit": "1",
"tags" : "id_range:" + post_id,
}
return self._call("/posts", params)
def posts_keyset(self, params):
return self._pagination("/posts/keyset", params)
def authenticate(self):
self.headers["Authorization"] = \
_authenticate_impl(self.extractor, self.username, self.password)
def _call(self, endpoint, params=None):
url = "https://capi-v2.sankakucomplex.com" + endpoint
for _ in range(5):
self.authenticate()
response = self.extractor.request(
url, params=params, headers=self.headers, fatal=False)
if response.status_code == 429:
self.extractor.wait(
until=response.headers.get("X-RateLimit-Reset"))
continue
data = response.json()
try:
success = data.get("success", True)
except AttributeError:
success = True
if not success:
code = data.get("code")
if code == "invalid_token":
_authenticate_impl.invalidate(self.username)
continue
raise exception.StopExtraction(code)
return data
def _pagination(self, endpoint, params):
params["lang"] = "en"
params["limit"] = str(self.extractor.per_page)
while True:
data = self._call(endpoint, params)
yield from data["data"]
params["next"] = data["meta"]["next"]
if not params["next"]:
return
@cache(maxage=365*24*3600, keyarg=1)
def _authenticate_impl(extr, username, password):
extr.log.info("Logging in as %s", username)
url = "https://capi-v2.sankakucomplex.com/auth/token"
headers = {"Accept": "application/vnd.sankaku.api+json;v=2"}
data = {"login": username, "password": password}
response = extr.request(
url, method="POST", headers=headers, json=data, fatal=False)
data = response.json()
if response.status_code >= 400 or not data.get("success"):
raise exception.AuthenticationError(data.get("error"))
return "Bearer " + data["access_token"]