Merge pull request #674 from nate-moo/KwikFix

Kwik Fixes
master
Arjix 2021-08-20 19:00:22 +03:00 committed by GitHub
commit ecbd22c6e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 186 additions and 161 deletions


@@ -73,6 +73,9 @@ DEFAULT_CONFIG = {
'anistream.xyz': {
'version': 'subbed',
},
'animepahe': {
'version': 'subbed',
},
'animeflv': {
'version': 'subbed',
'servers': [

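The new animepahe entry mirrors the existing per-site sections: DEFAULT_CONFIG is a plain nested dict keyed by site name, and 'version' picks the subbed or dubbed release where a site offers both. A minimal lookup sketch follows (illustrative only, not the library's own config handling):

# Only the animepahe fragment from the hunk above is reproduced here.
DEFAULT_CONFIG = {
    'animepahe': {
        'version': 'subbed',
    },
}

version = DEFAULT_CONFIG.get('animepahe', {}).get('version', 'subbed')
assert version == 'subbed'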

@@ -1,72 +1,122 @@
from base64 import b64decode
import requests
import logging
import re
from anime_downloader.extractors.base_extractor import BaseExtractor
from anime_downloader.sites import helpers
from anime_downloader import util
from subprocess import CalledProcessError
logger = logging.getLogger(__name__)
class Kwik(BaseExtractor):
'''Extracts the video url from kwik pages. Kwik has some `security`
which only allows access to kwik pages when they are referred by
something, and to the kwik video stream only when it is referred by
the corresponding kwik video page.
'''
YTSM = re.compile(r"ysmm = '([^']+)")
KWIK_PARAMS_RE = re.compile(r'\("(\w+)",\d+,"(\w+)",(\d+),(\d+),\d+\)')
KWIK_D_URL = re.compile(r'action="([^"]+)"')
KWIK_D_TOKEN = re.compile(r'value="([^"]+)"')
CHARACTER_MAP = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ+/"
def get_string(self, content: str, s1: int, s2: int) -> str:
# Interpret `content` as the digits of a base-`s1` number and re-encode
# that value using the first `s2` characters of CHARACTER_MAP.
slice_2 = self.CHARACTER_MAP[0:s2]
acc = 0
for n, i in enumerate(content[::-1]):
acc += int(i if i.isdigit() else 0) * s1**n
k = ''
while acc > 0:
k = slice_2[int(acc % s2)] + k
acc = (acc - (acc % s2)) / s2
return k or '0'
def decrypt(self, full_string: str, key: str, v1: int, v2: int) -> str:
# Split `full_string` on the separator character key[v2], translate key
# characters to their indices, then decode each chunk as a base-`v2`
# number and subtract `v1` to recover the original character code.
v1, v2 = int(v1), int(v2)
r, i = "", 0
while i < len(full_string):
s = ""
while (full_string[i] != key[v2]):
s += full_string[i]
i += 1
j = 0
while j < len(key):
s = s.replace(key[j], str(j))
j += 1
r += chr(int(self.get_string(s, v2, 10)) - v1)
i += 1
return r
def decode_adfly(self, coded_key: str) -> str:
# De-interleave the coded key, undo the xor scrambling on digit pairs,
# then base64-decode the result and strip 16 bytes from each end to get
# the target url.
r, j = '', ''
for n, l in enumerate(coded_key):
if not n % 2:
r += l
else:
j = l + j
encoded_uri = list(r + j)
numbers = ((i, n) for i, n in enumerate(encoded_uri) if str.isdigit(n))
for first, second in zip(numbers, numbers):
xor = int(first[1]) ^ int(second[1])
if xor < 10:
encoded_uri[first[0]] = str(xor)
return b64decode(("".join(encoded_uri)).encode("utf-8")
)[16:-16].decode('utf-8', errors='ignore')
def bypass_adfly(self, adfly_url):
session = requests.session()
response_code = 302
while response_code != 200:
adfly_content = session.get(
session.get(
adfly_url,
allow_redirects=False).headers.get('location'),
allow_redirects=False)
response_code = adfly_content.status_code
return self.decode_adfly(self.YTSM.search(adfly_content.text).group(1))
def get_stream_url_from_kwik(self, adfly_url):
session = requests.session()
f_content = requests.get(
self.bypass_adfly(adfly_url),
headers={
'referer': 'https://kwik.cx/'
}
)
decrypted = self.decrypt(
*
self.KWIK_PARAMS_RE.search(
f_content.text
).group(
1, 2,
3, 4
)
)
code = 419
while code != 302:
content = session.post(
self.KWIK_D_URL.search(decrypted).group(1),
allow_redirects=False,
data={
'_token': self.KWIK_D_TOKEN.search(decrypted).group(1)},
headers={
'referer': str(f_content.url),
'cookie': f_content.headers.get('set-cookie')})
code = content.status_code
return content.headers.get('location')
def _get_data(self):
# Kwik servers don't allow direct link access; you need to be referred
# from somewhere, so we just use the url itself as the referer. We then
# have to rebuild the url. Hopefully kwik doesn't block this too.
# Necessary: rewrite the /e/ embed url to its /f/ counterpart.
self.url = self.url.replace(".cx/e/", ".cx/f/")
self.headers.update({"referer": self.url})
cookies = util.get_hcaptcha_cookies(self.url)
if not cookies:
resp = util.bypass_hcaptcha(self.url)
else:
resp = requests.get(self.url, cookies=cookies)
title_re = re.compile(r'title>(.*)<')
kwik_text = resp.text
deobfuscated = None
loops = 0
while not deobfuscated and loops < 6:
try:
deobfuscated = helpers.soupify(util.deobfuscate_packed_js(re.search(r'(?s)<(script).*(var\s+_.*escape.*?)</\1>', kwik_text).group(2)))
except (AttributeError, CalledProcessError) as e:
if type(e) == AttributeError:
resp = util.bypass_hcaptcha(self.url)
kwik_text = resp.text
if type(e) == CalledProcessError:
resp = requests.get(self.url, cookies=cookies)
finally:
cookies = resp.cookies
title = title_re.search(kwik_text).group(1)
loops += 1
post_url = deobfuscated.form["action"]
token = deobfuscated.input["value"]
resp = helpers.post(post_url, headers=self.headers, params={"_token": token}, cookies=cookies, allow_redirects=False)
stream_url = resp.headers["Location"]
logger.debug('Stream URL: %s' % stream_url)
return {
'stream_url': stream_url,
'meta': {
'title': title,
'thumbnail': ''
},
'stream_url': self.get_stream_url_from_kwik(self.url),
'referer': None
}
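
For reference, `get_string` above is essentially a base-conversion helper: it reads its input as the digits of a base-`s1` number and re-encodes the value in base `s2` using CHARACTER_MAP. A minimal standalone sketch of that step, with `convert_base` as a hypothetical name (CHARACTER_MAP is copied from the extractor):

# Illustration only; not part of the extractor itself.
CHARACTER_MAP = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ+/"

def convert_base(digits: str, from_base: int, to_base: int) -> str:
    # Accumulate the numeric value of `digits` interpreted in `from_base`.
    alphabet = CHARACTER_MAP[:to_base]
    acc = 0
    for power, digit in enumerate(reversed(digits)):
        acc += int(digit if digit.isdigit() else 0) * from_base ** power
    # Re-encode the value using the first `to_base` characters of the map.
    out = ''
    while acc > 0:
        out = alphabet[acc % to_base] + out
        acc //= to_base
    return out or '0'

# "101" read as base 2 is 5; rendered in base 10 that is the string "5".
assert convert_base("101", 2, 10) == "5"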


@@ -8,57 +8,9 @@ from anime_downloader.sites import helpers
logger = logging.getLogger(__name__)
class AnimePaheEpisode(AnimeEpisode, sitename='animepahe'):
QUALITIES = ['360p', '480p', '720p', '1080p']
def _get_source(self, episode_id, server, session_id):
# We extract the episode data through the animepahe api,
# which returns the available qualities and the episode sources.
params = {
'id': episode_id,
'm': 'embed',
'p': server,
'session': session_id
}
episode_data = helpers.get('https://animepahe.com/api', params=params).json()
episode_data = episode_data['data']
sources = {}
for info in range(len(episode_data)):
quality = list(episode_data[info].keys())[0]
sources[f'{quality}p'] = episode_data[info][quality]['kwik']
if self.quality in sources:
return (server, sources[self.quality])
return
def _get_sources(self):
supported_servers = ['kwik', 'mp4upload', 'rapidvideo']
source_text = helpers.get(self.url, cf=True).text
sources = []
server_list = re.findall(r'data-provider="([^"]+)', source_text)
episode_id, session_id = re.search("getUrls\((\d+?), \"(.*)?\"", source_text).groups()
for server in server_list:
if server not in supported_servers:
continue
source = self._get_source(episode_id, server, session_id)
if source:
sources.append(source)
if sources:
return sources
raise NotFoundError
class AnimePahe(Anime, sitename='animepahe'):
sitename = 'animepahe'
api_url = 'https://animepahe.com/api'
base_anime_url = 'https://animepahe.com/anime/'
QUALITIES = ['360p', '480p', '720p', '1080p']
_episodeClass = AnimePaheEpisode
@classmethod
def search(cls, query):
@@ -69,68 +21,87 @@ class AnimePahe(Anime, sitename='animepahe'):
}
search_results = helpers.get(cls.api_url, params=params).json()
results = []
if search_results['total'] == []:
return []
for search_result in search_results['data']:
search_result_info = SearchResult(
title=search_result['title'],
url=cls.base_anime_url + search_result['slug'],
poster=search_result['poster']
return [
SearchResult(
title=result['title'] + " (" + result['type'] + ")",
url="https://animepahe.com/anime/TITLE!" + result['title'] + " (" + result['type'] + ")" + '!TITLE/' + result['session'] + "/" + str(result['id']), # noqa
poster=result['poster']
)
for result in search_results['data']
]
logger.debug(search_result_info)
results.append(search_result_info)
def _scrape_episodes(self):
attr = self.url.split('/')
session = attr[-2]
id_ = attr[-1]
page = 1
headers = {'referer': 'https://animepahe.com/'}
return results
apiUri = self.api_url + '?m=release&id=' + id_ + '&sort=episode_asc&page='
jsonResponse = helpers.get(apiUri + str(page), headers=headers).json()
lastPage = jsonResponse['last_page']
perPage = jsonResponse['per_page']
total = jsonResponse['total']
ep = 1
episodes = []
def get_data(self):
page = helpers.get(self.url, cf=True).text
anime_id = re.search(r'&id=(\d+)', page).group(1)
self.params = {
'm': 'release',
'id': anime_id,
'sort': 'episode_asc',
'page': 1
}
json_resp = helpers.get(self.api_url, params=self.params).json()
self._scrape_metadata(page)
self._episode_urls = self._scrape_episodes(json_resp)
self._len = len(self._episode_urls)
return self._episode_urls
def _collect_episodes(self, ani_json, episodes=[]):
# Avoid changing original list
episodes = episodes[:]
# If `episodes` is not an empty list, start numbering from its current
# length so the episode numbers stay correct across api pages.
for no, anime_ep in enumerate(ani_json, len(episodes)):
episodes.append((no + 1, f'{self.url}/{anime_ep["id"]}',))
return episodes
def _scrape_episodes(self, ani_json):
episodes = self._collect_episodes(ani_json['data'])
if not episodes:
raise NotFoundError(f'No episodes found for {self.url}')
if (lastPage == 1 and perPage > total):
for epi in jsonResponse['data']:
episodes.append(
f'{self.api_url}?m=links&id={epi["anime_id"]}&session={epi["session"]}&p=kwik!!TRUE!!')
else:
# animepahe's api only returns the first page, so check whether more
# pages exist and make a follow-up api call for each remaining page.
start_page = ani_json['current_page'] + 1
end_page = ani_json['last_page'] + 1
for i in range(start_page, end_page):
self.params['page'] = i
resp = helpers.get(self.api_url, params=self.params).json()
episodes = self._collect_episodes(resp['data'], episodes)
stop = False
for page in range(lastPage):
if stop:
break
for i in range(perPage):
if ep <= total:
episodes.append(
f'{self.api_url}?m=release&id={id_}&sort=episode_asc&page={page+1}&ep={ep}!!FALSE!!')
ep += 1
else:
stop = True
break
return episodes
def _scrape_metadata(self, data):
self.title = re.search(r'<h1>([^<]+)', data).group(1)
def _scrape_metadata(self):
self.title = re.findall(r"TITLE!(.*?)!TITLE", self.url)[0]
class AnimePaheEpisode(AnimeEpisode, sitename='animepahe'):
def _get_sources(self):
if '!!TRUE!!' in self.url:
self.url = self.url.replace('!!TRUE!!', '')
else:
headers = {'referer': 'https://animepahe.com/'}
regex = r"\&ep\=(\d+)\!\!FALSE\!\!"
episodeNum = int(re.findall(regex, self.url)[0])
self.url = re.sub(regex, '', self.url)
jsonResponse = helpers.get(self.url, headers=headers).json()
ep = None
for episode in jsonResponse['data']:
if int(episode['episode']) == episodeNum:
ep = episode
if ep:
self.url = 'https://animepahe.com/api?m=links&id=' + str(ep['anime_id']) + '&session=' + ep['session'] + '&p=kwik' # noqa
else:
raise NotFoundError
episode_data = helpers.get(self.url).json()
data = episode_data['data']
qualities = [x + 'p' for f in data for x in f]
sources_list = [
f[x]['kwik_adfly'] for f in data for x in f
]
for i, quality in enumerate(qualities):
if self.quality == quality:
return [("kwik", sources_list[i])]
return [("kwik", x) for x in sources_list]
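
As a quick illustration of the new url scheme used above: search results embed the display title between TITLE! and !TITLE markers so that `_scrape_metadata` can recover it from the url alone, while the trailing path segments carry the api session and anime id. A small usage sketch with made-up values (only the marker layout matches the code above):

import re

url = "https://animepahe.com/anime/TITLE!Cowboy Bebop (TV)!TITLE/abc123session/42"

# _scrape_metadata pulls the display title straight out of the url:
assert re.findall(r"TITLE!(.*?)!TITLE", url)[0] == "Cowboy Bebop (TV)"

# _scrape_episodes splits the same url to get the api session and anime id:
session, anime_id = url.split('/')[-2:]
assert (session, anime_id) == ("abc123session", "42")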


@@ -18,6 +18,7 @@ ALL_ANIME_SITES = [
('animetake','animetake','AnimeTake'),
('animeonline','animeonline360','AnimeOnline'),
('animeout', 'animeout', 'AnimeOut'),
('animepahe', 'animepahe', 'AnimePahe'),
('animerush', 'animerush', 'AnimeRush'),
('animesimple', 'animesimple', 'AnimeSimple'),
('animestar', 'animestar', 'AnimeStar'),
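
Each entry in ALL_ANIME_SITES is a (module name, site name, class name) triple. A hypothetical sketch of how such a triple could be resolved into a site class, purely for illustration (this is not necessarily how anime-downloader's own loader works):

import importlib

def load_site(entry):
    # e.g. entry = ('animepahe', 'animepahe', 'AnimePahe')
    module_name, sitename, class_name = entry
    module = importlib.import_module('anime_downloader.sites.' + module_name)
    return getattr(module, class_name)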