306 lines
9.7 KiB
Python
Raw Normal View History

2016-02-20 11:29:10 +01:00
# -*- coding: utf-8 -*-
# Copyright 2016-2018 Mike Fährmann
2016-02-20 11:29:10 +01:00
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extract images from https://www.tumblr.com/"""
from .common import Extractor, Message
from .. import text, util, exception
2017-11-03 22:16:57 +01:00
from ..cache import memcache
import re
2016-02-20 11:29:10 +01:00
2017-02-01 00:53:19 +01:00
def _original_image(url):
match = re.match(
r"https?://\d+\.media\.tumblr\.com"
r"((/[0-9a-f]+)?/tumblr_[^/?&#.]+_)\d+(\.[0-9a-z]+)",
url)
if not match:
return (url,)
root = "https://s3.amazonaws.com/data.tumblr.com"
path, key, ext = match.groups()
return (
"".join((root, path, "raw" if key else "1280", ext)),
"".join((root, path, "500", ext)),
url,
)
def _original_video(url):
return re.sub(
(r"https?://vt\.media\.tumblr\.com"
r"/tumblr_([^_]+)_\d+\.([0-9a-z]+)"),
r"https://vt.media.tumblr.com/tumblr_\1.\2", url
)
POST_TYPES = frozenset((
"text", "quote", "link", "answer", "video", "audio", "photo", "chat"))
BASE_PATTERN = (
r"(?:tumblr:(?:https?://)?([^/]+)|"
r"(?:https?://)?([^.]+\.tumblr\.com))")
2017-11-03 22:16:57 +01:00
class TumblrExtractor(Extractor):
"""Base class for tumblr extractors"""
2016-02-20 11:29:10 +01:00
category = "tumblr"
2017-11-03 22:16:57 +01:00
directory_fmt = ["{category}", "{name}"]
2018-01-12 14:56:01 +01:00
filename_fmt = "{category}_{blog_name}_{id}o{offset}.{extension}"
archive_fmt = "{id}_{offset}"
2016-02-20 11:29:10 +01:00
def __init__(self, match):
Extractor.__init__(self)
self.blog = match.group(1) or match.group(2)
self.api = TumblrAPI(self)
2016-02-20 11:29:10 +01:00
self.types = self._setup_posttypes()
self.inline = self.config("inline", False)
self.reblogs = self.config("reblogs", True)
self.external = self.config("external", False)
if len(self.types) == 1:
2018-01-12 14:56:01 +01:00
self.api.posts_type = next(iter(self.types))
elif not self.types:
self.log.warning("no valid post types selected")
2016-02-20 11:29:10 +01:00
def items(self):
blog = self.api.info(self.blog)
2016-02-20 11:29:10 +01:00
yield Message.Version, 1
2017-11-03 22:16:57 +01:00
yield Message.Directory, blog
2016-02-20 15:24:30 +01:00
2017-11-03 22:16:57 +01:00
for post in self.posts():
if post["type"] not in self.types:
continue
reblog = "reblogged_from_id" in post
if reblog and not self.reblogs:
continue
post["reblogged"] = reblog
post["blog"] = blog
post["offset"] = 0
if "trail" in post:
del post["trail"]
if "photos" in post: # type "photo" or "link"
photos = post["photos"]
del post["photos"]
for photo in photos:
post["photo"] = photo
photo.update(photo["original_size"])
del photo["original_size"]
del photo["alt_sizes"]
yield self._prepare_image(photo["url"], post)
if "audio_url" in post: # type: "audio"
2017-11-23 16:12:07 +01:00
yield self._prepare(post["audio_url"], post)
if "video_url" in post: # type: "video"
2017-11-23 16:12:07 +01:00
yield self._prepare(_original_video(post["video_url"]), post)
if self.inline: # inline images
for key in ("body", "description"):
if key in post:
for url in re.findall('<img src="([^"]+)"', post[key]):
yield self._prepare_image(url, post)
if self.external: # external links
post["extension"] = None
for key in ("permalink_url", "url"):
if key in post:
yield Message.Queue, post[key], post
2017-11-03 22:16:57 +01:00
def posts(self):
"""Return an iterable containing all relevant posts"""
def _setup_posttypes(self):
types = self.config("posts", ("photo",))
if types == "all":
return POST_TYPES
elif not types:
return frozenset()
else:
if isinstance(types, str):
types = types.split(",")
types = frozenset(types)
invalid = types - POST_TYPES
if invalid:
types = types & POST_TYPES
self.log.warning('invalid post types: "%s"',
'", "'.join(sorted(invalid)))
return types
@staticmethod
2017-11-23 16:12:07 +01:00
def _prepare(url, post):
post["offset"] += 1
return Message.Url, url, text.nameext_from_url(url, post)
@staticmethod
def _prepare_image(url, post):
post["offset"] += 1
urls = _original_image(url)
return Message.Urllist, urls, text.nameext_from_url(url, post)
2017-11-03 22:16:57 +01:00
class TumblrUserExtractor(TumblrExtractor):
"""Extractor for all images from a tumblr-user"""
subcategory = "user"
pattern = [BASE_PATTERN + r"(?:/page/\d+)?/?$"]
test = [
("http://demo.tumblr.com/", {
"pattern": (r"https://s3\.amazonaws\.com/data\.tumblr\.com"
r"/tumblr_[^/_]+_\d+\.jpg"),
"count": 1,
}),
("http://demo.tumblr.com/", {
"pattern": (r"https?://(?:$|"
r"s3\.amazonaws\.com/data\.tumblr\.com/.+_1280\.jpg|"
r"w+\.tumblr\.com/audio_file/demo/\d+/tumblr_\w+)"),
"count": 3,
"options": (("posts", "all"), ("external", True),
("inline", True), ("reblogs", True))
}),
("tumblr:http://www.b-authentique.com/", None),
("tumblr:www.b-authentique.com", None),
]
2017-11-03 22:16:57 +01:00
def posts(self):
return self.api.posts(self.blog, {})
2017-11-03 22:16:57 +01:00
class TumblrPostExtractor(TumblrExtractor):
"""Extractor for images from a single post on tumblr"""
2016-02-20 15:24:30 +01:00
subcategory = "post"
pattern = [BASE_PATTERN + r"/(?:post|image)/(\d+)"]
test = [
("http://demo.tumblr.com/post/459265350", {
"pattern": (r"https://s3\.amazonaws\.com/data\.tumblr\.com"
r"/tumblr_[^/_]+_1280.jpg"),
"count": 1,
}),
("http://demo.tumblr.com/image/459265350", None),
]
2016-02-20 15:24:30 +01:00
def __init__(self, match):
2017-11-03 22:16:57 +01:00
TumblrExtractor.__init__(self, match)
self.post_id = match.group(3)
self.reblogs = True
2016-02-20 15:24:55 +01:00
2017-11-03 22:16:57 +01:00
def posts(self):
return self.api.posts(self.blog, {"id": self.post_id})
2016-02-20 15:24:55 +01:00
@staticmethod
def _setup_posttypes():
return POST_TYPES
2017-11-03 22:16:57 +01:00
class TumblrTagExtractor(TumblrExtractor):
"""Extractor for images from a tumblr-user by tag"""
2016-02-20 15:24:55 +01:00
subcategory = "tag"
pattern = [BASE_PATTERN + r"/tagged/([^/?&#]+)"]
test = [("http://demo.tumblr.com/tagged/Times%20Square", {
"pattern": (r"https://s3\.amazonaws\.com/data\.tumblr\.com"
r"/tumblr_[^/_]+_1280.jpg"),
2017-10-08 23:59:27 +02:00
"count": 1,
2016-02-20 15:24:55 +01:00
})]
def __init__(self, match):
2017-11-03 22:16:57 +01:00
TumblrExtractor.__init__(self, match)
self.tag = text.unquote(match.group(3))
2017-11-03 22:16:57 +01:00
def posts(self):
return self.api.posts(self.blog, {"tag": self.tag})
2017-11-03 22:16:57 +01:00
2018-01-12 14:56:01 +01:00
class TumblrLikesExtractor(TumblrExtractor):
"""Extractor for images from a tumblr-user by tag"""
subcategory = "likes"
directory_fmt = ["{category}", "{name}", "likes"]
pattern = [BASE_PATTERN + r"/likes"]
2018-01-12 14:56:01 +01:00
test = [("http://mikf123.tumblr.com/likes", {
"count": 1,
})]
def posts(self):
return self.api.likes(self.blog)
2018-01-12 14:56:01 +01:00
2017-11-03 22:16:57 +01:00
class TumblrAPI():
"""Minimal interface for the Tumblr API v2"""
API_KEY = "O3hU2tMi5e4Qs5t3vezEi6L0qRORJ5y9oUpSGsrWu8iA3UCc3B"
API_SECRET = "sFdsK3PDdP2QpYMRAoq0oDnw0sFS24XigXmdfnaeNZpJpqAn03"
2017-11-03 22:16:57 +01:00
def __init__(self, extractor):
self.api_key = extractor.config("api-key", self.API_KEY)
api_secret = extractor.config("api-secret", self.API_SECRET)
token = extractor.config("access-token")
token_secret = extractor.config("access-token-secret")
if token and token_secret:
self.session = util.OAuthSession(
extractor.session,
self.api_key, api_secret, token, token_secret)
self.api_key = None
else:
self.session = extractor.session
2018-01-12 14:56:01 +01:00
self.posts_type = None
2017-11-03 22:16:57 +01:00
self.extractor = extractor
@memcache(keyarg=1)
def info(self, blog):
"""Return general information about a blog"""
return self._call(blog, "info", {})["blog"]
def posts(self, blog, params):
"""Retrieve published posts"""
2018-01-12 14:56:01 +01:00
params.update({"offset": 0, "limit": 50, "reblog_info": "true"})
if self.posts_type:
params["type"] = self.posts_type
while True:
data = self._call(blog, "posts", params)
yield from data["posts"]
params["offset"] += params["limit"]
if params["offset"] >= data["total_posts"]:
return
def likes(self, blog):
"""Retrieve liked posts"""
params = {"limit": 50}
while True:
posts = self._call(blog, "likes", params)["liked_posts"]
if not posts:
return
yield from posts
params["before"] = posts[-1]["liked_timestamp"]
2017-11-03 22:16:57 +01:00
def _call(self, blog, endpoint, params):
if self.api_key:
params["api_key"] = self.api_key
url = "https://api.tumblr.com/v2/blog/{}/{}".format(
2017-11-03 22:16:57 +01:00
blog, endpoint)
response = self.session.get(url, params=params).json()
2018-01-12 14:56:01 +01:00
status = response["meta"]["status"]
if status == 200:
return response["response"]
elif status == 403:
raise exception.AuthorizationError()
elif status == 404:
raise exception.NotFoundError("user or post")
2018-01-12 14:56:01 +01:00
else:
2017-11-03 22:16:57 +01:00
self.extractor.log.error(response)
raise exception.StopExtraction()