Mike Fährmann 45baa13615
update extractor test results
- don't run Instagram tests on Travis anymore
- replace Twitter test because timeline was made private
- update Hiperdex domain to '.com' (again ...)
2020-05-28 02:18:06 +02:00

502 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# Copyright 2016-2020 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for https://twitter.com/"""
from .common import Extractor, Message
from .. import text, exception
from ..cache import cache, memcache
import json
import re
class TwitterExtractor(Extractor):
"""Base class for twitter extractors"""
category = "twitter"
directory_fmt = ("{category}", "{user[name]}")
filename_fmt = "{tweet_id}_{num}.{extension}"
archive_fmt = "{tweet_id}_{retweet_id}_{num}"
cookiedomain = ".twitter.com"
root = "https://twitter.com"
sizes = (":orig", ":large", ":medium", ":small")
user_agent = ("Mozilla/5.0 (Windows NT 6.1; WOW64; "
"Trident/7.0; rv:11.0) like Gecko")
def __init__(self, match):
Extractor.__init__(self, match)
self.user = match.group(1)
self._user_dict = None
self.logged_in = False
self.retweets = self.config("retweets", True)
self.replies = self.config("replies", True)
self.twitpic = self.config("twitpic", False)
self.content = self.config("content", False)
self.videos = self.config("videos", True)
if self.content:
self._emoji_sub = re.compile(
r'<img class="Emoji [^>]+ alt="([^"]+)"[^>]*>').sub
def items(self):
self.login()
metadata = self.metadata()
yield Message.Version, 1
for tweet in self.tweets():
data = self._data_from_tweet(tweet)
if not data or \
not self.retweets and data["retweet_id"] or \
not self.replies and data["reply"]:
continue
data.update(metadata)
if self.videos and "-videoContainer" in tweet:
yield Message.Directory, data
if self.videos == "ytdl":
data["extension"] = None
url = "ytdl:{}/i/web/status/{}".format(
self.root, data["tweet_id"])
else:
url = self._video_from_tweet(data["tweet_id"])
if not url:
continue
text.nameext_from_url(url, data)
if data["extension"] == "m3u8":
url = "ytdl:" + url
data["extension"] = "mp4"
data["_ytdl_extra"] = {"protocol": "m3u8_native"}
data["num"] = 1
yield Message.Url, url, data
elif "data-image-url=" in tweet:
yield Message.Directory, data
images = text.extract_iter(
tweet, 'data-image-url="', '"')
for data["num"], url in enumerate(images, 1):
text.nameext_from_url(url, data)
urls = [url + size for size in self.sizes]
yield Message.Urllist, urls, data
if self.twitpic and "//twitpic.com/" in tweet:
urls = [
url for url in text.extract_iter(
tweet, 'data-expanded-url="', '"')
if "//twitpic.com/" in url
]
if "num" not in data:
if urls:
yield Message.Directory, data
data["num"] = 0
for data["num"], url in enumerate(urls, data["num"]+1):
response = self.request(url, fatal=False)
if response.status_code >= 400:
continue
url = text.extract(
response.text, 'name="twitter:image" value="', '"')[0]
yield Message.Url, url, text.nameext_from_url(url, data)
def metadata(self):
"""Return general metadata"""
return {}
def tweets(self):
"""Yield HTML content of all relevant tweets"""
def login(self):
username, password = self._get_auth_info()
if username:
self._update_cookies(self._login_impl(username, password))
self.logged_in = True
@cache(maxage=360*24*3600, keyarg=1)
def _login_impl(self, username, password):
self.log.info("Logging in as %s", username)
headers = {"User-Agent": self.user_agent}
page = self.request(self.root + "/login", headers=headers).text
pos = page.index('name="authenticity_token"')
token = text.extract(page, 'value="', '"', pos-80)[0]
url = self.root + "/sessions"
data = {
"session[username_or_email]": username,
"session[password]" : password,
"authenticity_token" : token,
"ui_metrics" : '{"rf":{},"s":""}',
"scribe_log" : "",
"redirect_after_login" : "",
"remember_me" : "1",
}
response = self.request(url, method="POST", headers=headers, data=data)
if "/error" in response.url:
raise exception.AuthenticationError()
return {
cookie.name: cookie.value
for cookie in self.session.cookies
if cookie.domain and "twitter.com" in cookie.domain
}
def _data_from_tweet(self, tweet):
extr = text.extract_from(tweet)
data = {
"tweet_id" : text.parse_int(extr('data-tweet-id="' , '"')),
"reply" : bool(extr('data-is-reply-to="' , '"')),
"retweet_id": text.parse_int(extr('data-retweet-id="', '"')),
"retweeter" : extr('data-retweeter="' , '"'),
"author" : {
"name" : extr('data-screen-name="', '"'),
"nick" : text.unescape(extr('data-name="' , '"')),
"id" : text.parse_int(extr('data-user-id="' , '"')),
},
}
if not self._user_dict:
if data["retweet_id"]:
for user in json.loads(text.unescape(extr(
'data-reply-to-users-json="', '"'))):
if user["screen_name"] == data["retweeter"]:
break
else:
self.log.warning("Unable to extract user info")
return None
self._user_dict = {
"name": user["screen_name"],
"nick": text.unescape(user["name"]),
"id" : text.parse_int(user["id_str"]),
}
else:
self._user_dict = data["author"]
data["user"] = self._user_dict
data["date"] = text.parse_timestamp(extr('data-time="', '"'))
if self.content:
content = extr('<div class="js-tweet-text-container">', '\n</div>')
if '<img class="Emoji ' in content:
content = self._emoji_sub(r"\1", content)
content = text.unescape(text.remove_html(content, "", ""))
cl, _, cr = content.rpartition("pic.twitter.com/")
data["content"] = cl if cl and len(cr) < 16 else content
if extr('<div class="QuoteTweet', '>'):
data["retweet_id"] = text.parse_int(extr('data-item-id="', '"'))
data["retweeter"] = data["user"]["name"]
data["author"] = {
"name" : extr('data-screen-name="', '"'),
"id" : text.parse_int(extr('data-user-id="' , '"')),
"nick" : text.unescape(extr(
'QuoteTweet-fullname', '<').partition('>')[2]),
}
return data
def _video_from_tweet(self, tweet_id):
url = "https://api.twitter.com/1.1/videos/tweet/config/{}.json".format(
tweet_id)
cookies = None
headers = {
"Origin" : self.root,
"Referer" : "{}/i/web/status/{}".format(self.root, tweet_id),
"x-csrf-token" : self.session.cookies.get("ct0"),
"authorization": "Bearer AAAAAAAAAAAAAAAAAAAAAPYXBAAAAAAACLXUNDekM"
"xqa8h%2F40K4moUkGsoc%3DTYfbDKbT3jJPCEVnMYqilB28N"
"HfOPqkca3qaAxGfsyKCs0wRbw",
}
if self.logged_in:
headers["x-twitter-auth-type"] = "OAuth2Session"
else:
token = _guest_token(self, headers)
cookies = {"gt": token}
headers["x-guest-token"] = token
response = self.request(
url, cookies=cookies, headers=headers, fatal=None)
if response.status_code == 429 or \
response.headers.get("x-rate-limit-remaining") == "0":
if self.logged_in:
self.wait(until=response.headers.get("x-rate-limit-reset"))
else:
_guest_token.invalidate()
return self._video_from_tweet(tweet_id)
elif response.status_code >= 400:
self.log.warning("Unable to fetch video data for %s ('%s %s')",
tweet_id, response.status_code, response.reason)
return None
return response.json()["track"]["playbackUrl"]
def _tweets_from_api(self, url, max_position=None):
params = {
"include_available_features": "1",
"include_entities": "1",
"max_position": max_position,
"reset_error_state": "false",
"lang": "en",
}
headers = {
"X-Requested-With": "XMLHttpRequest",
"X-Twitter-Active-User": "yes",
"Referer": self.root + "/",
}
while True:
data = self.request(url, params=params, headers=headers).json()
if "inner" in data:
data = data["inner"]
for tweet in text.extract_iter(
data["items_html"], '<div class="tweet ', '\n</li>'):
yield tweet
if data.get("min_position") is None:
if data["has_more_items"] and "min_position" not in data:
pass
else:
return
if "min_position" in data:
position = data["min_position"]
if position == max_position or position is None:
return
else:
position = text.parse_int(text.extract(
tweet, 'data-tweet-id="', '"')[0])
if max_position and position >= max_position:
return
params["max_position"] = max_position = position
class TwitterTimelineExtractor(TwitterExtractor):
"""Extractor for all images from a user's timeline"""
subcategory = "timeline"
pattern = (r"(?:https?://)?(?:www\.|mobile\.)?twitter\.com"
r"/(?!search)([^/?&#]+)/?(?:$|[?#])")
test = (
("https://twitter.com/supernaturepics", {
"range": "1-40",
"url": "0106229d408f4111d9a52c8fd2ad687f64842aa4",
"keyword": "4a3d28cc9f7a39e27333d56f3fe19e6e07ee979e",
}),
("https://mobile.twitter.com/supernaturepics?p=i"),
)
def tweets(self):
url = "{}/i/profiles/show/{}/timeline/tweets".format(
self.root, self.user)
return self._tweets_from_api(url)
class TwitterMediaExtractor(TwitterExtractor):
"""Extractor for all images from a user's Media Tweets"""
subcategory = "media"
pattern = (r"(?:https?://)?(?:www\.|mobile\.)?twitter\.com"
r"/(?!search)([^/?&#]+)/media(?!\w)")
test = (
("https://twitter.com/supernaturepics/media", {
"range": "1-40",
"url": "0106229d408f4111d9a52c8fd2ad687f64842aa4",
}),
("https://mobile.twitter.com/supernaturepics/media#t"),
)
def tweets(self):
url = "{}/i/profiles/show/{}/media_timeline".format(
self.root, self.user)
return self._tweets_from_api(url)
class TwitterSearchExtractor(TwitterExtractor):
"""Extractor for all images from a search timeline"""
subcategory = "search"
directory_fmt = ("{category}", "Search", "{search}")
pattern = (r"(?:https?://)?(?:www\.|mobile\.)?twitter\.com"
r"/search/?\?(?:[^&#]+&)*q=([^&#]+)")
test = ("https://twitter.com/search?q=nature", {
"range": "1-40",
"count": 40,
})
def metadata(self):
return {"search": self.user}
def tweets(self):
url = "{}/i/search/timeline?f=tweets&q={}".format(
self.root, self.user)
return self._tweets_from_api(url, "-1")
class TwitterTweetExtractor(TwitterExtractor):
"""Extractor for images from individual tweets"""
subcategory = "tweet"
pattern = (r"(?:https?://)?(?:www\.|mobile\.)?twitter\.com"
r"/([^/?&#]+|i/web)/status/(\d+)")
test = (
("https://twitter.com/supernaturepics/status/604341487988576256", {
"url": "0e801d2f98142dd87c3630ded9e4be4a4d63b580",
"keyword": "76e018cf3f4c8b82d3bdd425e01e28078c98373b",
"content": "ab05e1d8d21f8d43496df284d31e8b362cd3bcab",
}),
# 4 images
("https://twitter.com/perrypumas/status/894001459754180609", {
"url": "c8a262a9698cb733fb27870f5a8f75faf77d79f6",
"keyword": "c9251b1fd79d547b0c6b4577f06c937d0e9b63d2",
}),
# video
("https://twitter.com/perrypumas/status/1065692031626829824", {
"options": (("videos", True),),
"pattern": r"ytdl:https://video.twimg.com/ext_tw_video/.*.m3u8",
}),
# content with emoji, newlines, hashtags (#338)
("https://twitter.com/playpokemon/status/1263832915173048321", {
"options": (("content", True),),
"keyword": {"content": (
r"re:Gear up for #PokemonSwordShieldEX with special Mystery "
"Gifts! \n\nYoull be able to receive four Galarian form "
"Pokémon with Hidden Abilities, plus some very useful items. "
"Its our \\(Mystery\\) Gift to you, Trainers! \n\n❓🎁➡️ "
)},
}),
# Reply to another tweet (#403)
("https://twitter.com/tyson_hesse/status/1103767554424598528", {
"options": (("videos", "ytdl"),),
"pattern": r"ytdl:https://twitter.com/i/web.+/1103767554424598528",
}),
# 'replies' option (#705)
("https://twitter.com/tyson_hesse/status/1103767554424598528", {
"options": (("replies", False),),
"count": 0,
}),
# /i/web/ URL
("https://twitter.com/i/web/status/1155074198240292865", {
"pattern": r"https://pbs.twimg.com/media/EAel0vUUYAAZ4Bq.jpg:orig",
}),
# quoted tweet (#526)
("https://twitter.com/Pistachio/status/1222690391817932803", {
"pattern": r"https://pbs\.twimg\.com/media/EPfMfDUU8AAnByO\.jpg",
"keyword": {
"author": {"name": "Afro_Herper", "id": 786047748508221440},
"user" : {"name": "Pistachio" , "id": 3533231},
},
}),
# TwitPic embeds (#579)
("https://twitter.com/i/web/status/112900228289540096", {
"options": (("twitpic", True),),
"pattern": r"https://\w+.cloudfront.net/photos/large/\d+.jpg",
"count": 3,
}),
)
def __init__(self, match):
TwitterExtractor.__init__(self, match)
self.tweet_id = match.group(2)
def tweets(self):
url = "{}/i/web/status/{}".format(self.root, self.tweet_id)
cookies = {"app_shell_visited": "1"}
headers = {"User-Agent": self.user_agent, "Referer": url}
response = self.request(url, cookies=cookies, headers=headers)
if response.history and response.url == self.root + "/":
raise exception.AuthorizationError()
page = response.text
end = page.index('class="js-tweet-stats-container')
beg = page.rindex('<div class="tweet ', 0, end)
return (page[beg:end],)
class TwitterBookmarkExtractor(TwitterExtractor):
"""Extractor for bookmarked tweets"""
subcategory = "bookmark"
pattern = r"(?:https?://)?(?:www\.|mobile\.)?twitter\.com/i/bookmarks()"
test = ("https://twitter.com/i/bookmarks",)
def items(self):
self.login()
if not self.logged_in:
raise exception.AuthorizationError("Login required")
for cookie in self.session.cookies:
cookie.expires = None
url = "https://api.twitter.com/2/timeline/bookmark.json"
params = {
"include_profile_interstitial_type": "1",
"include_blocking": "1",
"include_blocked_by": "1",
"include_followed_by": "1",
"include_want_retweets": "1",
"include_mute_edge": "1",
"include_can_dm": "1",
"include_can_media_tag": "1",
"skip_status": "1",
"cards_platform": "Web-12",
"include_cards": "1",
"include_composer_source": "true",
"include_ext_alt_text": "true",
"include_reply_count": "1",
"tweet_mode": "extended",
"include_entities": "true",
"include_user_entities": "true",
"include_ext_media_color": "true",
"include_ext_media_availability": "true",
"send_error_codes": "true",
"simple_quoted_tweets": "true",
"count": "100",
"cursor": None,
"ext": "mediaStats%2CcameraMoment",
}
headers = {
"authorization": "Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejR"
"COuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu"
"4FA33AGWWjCpTnA",
"Origin": self.root,
"Referer": self.root + "/i/bookmarks",
"x-csrf-token": self.session.cookies.get("ct0"),
"x-twitter-active-user": "yes",
"x-twitter-auth-type": "OAuth2Session",
"x-twitter-client-language": "en",
}
while True:
response = self.request(
url, params=params, headers=headers, fatal=False)
if response.status_code >= 400:
raise exception.StopExtraction(response.text)
data = response.json()
tweets = data["globalObjects"]["tweets"]
if not tweets:
return
for tweet_id, tweet_data in tweets.items():
tweet_url = "{}/i/web/status/{}".format(self.root, tweet_id)
tweet_data["_extractor"] = TwitterTweetExtractor
yield Message.Queue, tweet_url, tweet_data
inst = data["timeline"]["instructions"][0]
for entry in inst["addEntries"]["entries"]:
if entry["entryId"].startswith("cursor-bottom-"):
params["cursor"] = \
entry["content"]["operation"]["cursor"]["value"]
break
@memcache()
def _guest_token(extr, headers):
return extr.request(
"https://api.twitter.com/1.1/guest/activate.json",
method="POST", headers=headers,
).json().get("guest_token")