anime-downloader/anime_downloader/util.py

import logging
import sys
import shutil
import click
import subprocess
import platform
import re
import os
import json
import errno
import time
import ast
import math
import coloredlogs
import pickle
import tempfile
import requests
from tabulate import tabulate
from uuid import uuid4
from secrets import choice
from urllib.parse import urlparse, unquote
from anime_downloader import session
from anime_downloader.sites import get_anime_class, helpers
from anime_downloader.const import desktop_headers, get_random_header

logger = logging.getLogger(__name__)

__all__ = [
'check_in_path',
'setup_logger',
'format_search_results',
'search',
'split_anime',
'parse_episode_range',
'parse_ep_str',
'print_episodeurl',
'play_episode',
'print_info',
]


def check_in_path(app):
"""
Checks to see if the given app exists on the path
:param app: app name to look for
    :return: True if the app exists, False otherwise
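
    A minimal usage sketch (the result depends on the local machine):

    >>> check_in_path('some-nonexistent-app')
    False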
"""
return shutil.which(app) is not None


def setup_logger(log_level):
    if log_level == 'DEBUG':
        log_format = '%(asctime)s %(hostname)s %(name)s[%(process)d] %(levelname)s %(message)s'
        # Also echo raw HTTP traffic from urllib3 when debugging.
        from http.client import HTTPConnection
        HTTPConnection.debuglevel = 1
        requests_log = logging.getLogger("requests.packages.urllib3")
        requests_log.setLevel(logging.DEBUG)
        requests_log.propagate = True
    else:
        log_format = click.style('anime', fg='green') + ': %(message)s'
    logger = logging.getLogger("anime_downloader")
    coloredlogs.install(level=log_level, fmt=log_format, logger=logger)


def format_search_results(search_results):
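    """
    Render search results as a psql-style table, reversed so the header row
    sits at the bottom, next to the selection prompt.

    A sketch with a stand-in result object (anything with ``title`` and
    ``pretty_metadata`` attributes works):

    >>> from collections import namedtuple
    >>> Result = namedtuple('Result', 'title pretty_metadata')
    >>> print(format_search_results([Result('Naruto', 'Episodes: 220')]))  # doctest: +SKIP
    """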
headers = [
'SlNo',
'Title',
'Meta',
]
table = [(i + 1, v.title, v.pretty_metadata)
for i, v in enumerate(search_results)]
table = tabulate(table, headers, tablefmt='psql')
    # Reverse the rows so the header ends up at the bottom, next to the prompt.
    table = '\n'.join(table.split('\n')[::-1])
return table


def format_matches(matches):
    if matches:
        # Sort by match ratio, highest first; each provider and its search
        # result are shown on alternating rows.
        table = [[[p], [sr]] for p, sr, r in sorted(matches, key=lambda x: x[2], reverse=True)]
        table = [a for b in table for a in b]
else:
table = [["None"]]
table = tabulate(table, ['RESULTS'], tablefmt='grid', colalign=("center",))
return table


def search(query, provider, val=None, season_info=None, ratio=50):
# Will use animeinfo sync if season_info is provided
# Since this function outputs to stdout this should ideally be in
# cli. But it is used in watch too. :(
cls = get_anime_class(provider)
search_results = cls.search(query)
if not search_results:
logger.error('No such Anime found. Please ensure correct spelling.')
return None, None
if season_info:
from anime_downloader import animeinfo
match = animeinfo.fuzzy_match_metadata([season_info], search_results)
logger.debug('Match ratio: {}'.format(match.ratio))
# ratios are a range between 0-100 where 100 means 100% match.
if match.ratio >= ratio and not val:
logger.debug('Selected {}'.format(match.SearchResult.title))
return match.SearchResult.url, None
click.echo(format_search_results(search_results), err=True)
    # Loop to allow re-prompt if the user chooses incorrectly.
    # Makes it harder to unintentionally exit the anime command if it's automated.
    while True:
        if val is None:
            val = click.prompt('Enter the anime no{}:'.format(' (0 to switch provider)' * (season_info is not None)),
                               type=int, default=1, err=True)
try:
url = search_results[val - 1].url
title = search_results[val - 1].title
        except IndexError:
            logger.error('There are only {} search results.'
                         ' Please input a number between 1 and {}.'.format(
                             len(search_results), len(search_results)))
            # Reset so the user is prompted again instead of silently
            # selecting the last result.
            val = None
            continue
break
    # Don't log the selection when the user chose 0 to switch providers.
if season_info is None or val != 0:
logger.info('Selected {}'.format(title))
return url, val


def primitive_search(search_results):
headers = [
'SlNo',
'Title',
]
table = [(i + 1, v.title)
for i, v in enumerate(search_results)]
table = tabulate(table, headers, tablefmt='psql')
table = '\n'.join(table.split('\n')[::-1])
click.echo(table, err=True)
while True:
val = click.prompt('Enter the anime no: ',
type=int, default=1, err=True)
try:
return search_results[val - 1]
except IndexError:
            logger.error('There are only {} search results.'
                         ' Please input a number between 1 and {}.'.format(
                             len(search_results), len(search_results)))


def download_metadata(file_format, metadata, episode, filename='metadata.json'):
    # Turns '{animeinfo_anime_title}/{animeinfo_anime_title}_{provider}_{ep_no}'
    # into '{animeinfo_anime_title}' to get the directory part.
    location = '/'.join(file_format.split('/')[:-1])
    location = format_filename(location, episode)
    location_metadata = os.path.join(location, filename)
if os.path.isfile(location_metadata):
logger.debug('Metadata file already downloaded.')
return False
make_dir(location)
    with open(location_metadata, 'w') as file:
        json.dump(metadata, file, indent=4)
logger.debug('Downloaded metadata to "{}".'.format(location_metadata))
return location_metadata


def split_anime(anime, episode_range):
    # episode_range is either 'start:end' (end exclusive here; callers pass
    # end + 1 for an inclusive selection) or a single episode number.
    try:
        start, end = [int(x) for x in episode_range.split(':')]
        ep_range = range(start, end)
        eps = [x for x in anime._episode_urls if x[0] in ep_range]
        anime._episode_urls = eps
        anime._len = len(anime._episode_urls)
except ValueError:
# Only one episode specified
episode = int(episode_range)
anime = anime[episode - 1:episode]
return anime


def parse_episode_range(max_range, episode_range):
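    """
    Fill in the missing ends of an episode range string.

    A few illustrative cases (max_range may be an int episode count or an
    Anime object, in which case the number of its last episode is used):

    >>> parse_episode_range(12, '3:')
    '3:13'
    >>> parse_episode_range(12, ':5')
    '1:5'
    >>> parse_episode_range(12, '')
    '1:13'
    """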
if not episode_range:
episode_range = '1:'
if episode_range.endswith(':'):
        length = max_range if isinstance(max_range, int) else (
            int(max_range._episode_urls[-1][0]))
episode_range += str(length + 1)
if episode_range.startswith(':'):
episode_range = '1' + episode_range
return episode_range


def parse_ep_str(anime, grammar):
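    """
    Expand an episode-selection string into a list of episode objects.

    The grammar is a comma-separated mix of numbers and ranges; a few
    illustrative selections:

        '1:5'    episodes 1 through 5
        '2,5,7'  episodes 2, 5 and 7
        '8:'     episode 8 through the last episode
        '0'      the last episode only
    """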
episodes = []
if not grammar:
return split_anime(anime, parse_episode_range(anime, grammar))
for episode_grammar in grammar.split(','):
if ':' in episode_grammar:
start, end = parse_episode_range(anime, episode_grammar).split(':')
episode_grammar = '%d:%d' % (int(start), int(end) + 1)
for episode in split_anime(anime, episode_grammar):
episodes.append(episode)
else:
from anime_downloader.sites.anime import AnimeEpisode
if episode_grammar == '0':
ep = sorted(anime._episode_urls)[-1]
else:
ep = [x for x in anime._episode_urls if x[0]
== int(episode_grammar)][0]
ep_cls = AnimeEpisode.subclasses[anime.sitename]
episodes.append(ep_cls(ep[1], parent=anime, ep_no=ep[0]))
return episodes


def print_episodeurl(episode):
# if episode.source().referer != '':
# print(episode.source().stream_url + "?referer=" + episode.source().referer)
# else:
# Currently I don't know of a way to specify referer in url itself so leaving it here.
url = episode.url if episode.url.startswith(
"magnet") else episode.source().stream_url
print(unquote(url))


def play_episode(episode, *, player, title, episodes="0:0"):
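    """
    Launch the episode's stream in an external player.

    A hedged usage sketch (episode is an AnimeEpisode object and mpv is
    assumed to be on PATH):

        play_episode(episode, player='mpv', title='Naruto - Episode 1')
    """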
if player == 'mpv':
p = subprocess.Popen([player,
f'--title={title}',
f'--referrer={episode.source().referer}',
f'--user-agent={get_random_header()["user-agent"]}',
episode.source().stream_url])
elif player == "android":
p = subprocess.Popen(['am', 'start', '-a', 'android.intent.action.VIEW',
'-t', 'video/*', '-d', f'{episode.source().stream_url}'])
        # 'am start' returns immediately, so block here until the user is
        # done when a range of episodes is being played.
        if episodes is None or (':' in episodes and episodes != "0:1"):
            input("Press enter to continue\n")
else:
p = subprocess.Popen([player, episode.source().stream_url])
p.wait()


def print_info(version):
logger.info('anime-downloader {}'.format(version))
logger.debug('Platform: {}'.format(platform.platform()))
logger.debug('Python {}'.format(platform.python_version()))


def get_json(url, params=None):
logger.debug('API call URL: {} with params {!r}'.format(url, params))
res = session.get_session().get(url, headers=desktop_headers, params=params)
logger.debug('URL: {}'.format(res.url))
data = res.json()
logger.debug('Returned data: {}'.format(data))
return data


def slugify(file_name):
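    """
    Make a string safe to use as a file name.

    A couple of illustrative cases:

    >>> slugify('My Anime: Title!')
    'My_Anime_Title!'
    >>> slugify('.hidden name')
    'hidden_name'
    """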
file_name = str(file_name).strip().replace(' ', '_')
    # The first group strips a leading dot, which would make the file hidden.
    # The second group removes any character outside the allowed set
    # (for example ' " / \ |).
    return re.sub(r'(^\.)|([^-\w.!+])', '', file_name)


def format_filename(filename, episode):
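    """
    Expand the '{anime_title}' and '{ep_no}' placeholders in a file-name
    template. An illustrative expansion (the title is made up):

        '{anime_title}/ep_{ep_no}' -> 'One_Piece/ep_003'

    for episode 3 of a 100-episode series.
    """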
    # Pad the episode number to the width of the largest episode number,
    # e.g. 3 digits for a 100-episode series.
    zeros_to_fill = len(str(episode._parent._len))
    rep_dict = {
        'anime_title': slugify(episode._parent.title),
        'ep_no': str(episode.ep_no).zfill(zeros_to_fill),
    }
filename = filename.format(**rep_dict)
return filename


def format_command(cmd, episode, file_format, speed_limit, path):
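    """
    Build the argv list for an external downloader.

    cmd is either one of the template keys '{aria2}', '{idm}', '{wget}' and
    '{uget}', or a custom command string using the same placeholders
    ({stream_url}, {file_format}, {download_dir}, {referer}, {useragent},
    {speed_limit}, {log_level}). An illustrative result (values made up):

        format_command('{wget}', episode, '{anime_title}_{ep_no}', 0, '.')
        -> ['wget', 'https://example.com/ep1.mp4', '--referer=...', ...]
    """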
from anime_downloader.config import Config
if not Config._CONFIG['dl']['aria2c_for_torrents'] and (episode.url.startswith('magnet:?xt=urn:btih:') or episode.source().stream_url.startswith('https://magnet:?xt=urn:btih:')):
url = episode.url if episode.url.startswith(
"magnet") else episode.source().stream_url
url = url.replace("https://", "")
return ['open', url]
# For aria2c.
log_levels = ['debug', 'info', 'notice', 'warn', 'error']
log_level = Config['dl']['aria2c_log_level'].lower()
if log_level not in log_levels:
        logger.warning(
'Invalid logging level "{}", defaulting to "error".'.format(log_level))
logger.debug('Possible levels: {}.'.format(log_levels))
log_level = 'error'
cmd_dict = {
'{aria2}': 'aria2c {stream_url} -x 12 -s 12 -j 12 -k 10M -o '
'{file_format}.mp4 --continue=true --dir={download_dir} '
'--stream-piece-selector=inorder --min-split-size=5M --referer={referer} '
'--check-certificate=false --user-agent={useragent} --max-overall-download-limit={speed_limit} '
'--console-log-level={log_level}',
'{idm}': 'idman.exe /n /d {stream_url} /p {download_dir} /f {file_format}.mp4',
'{wget}': 'wget {stream_url} --referer={referer} --user-agent={useragent} -O {download_dir}/{file_format}.mp4 -c',
'{uget}': '/CMD/ --http-referer={referer} --http-user-agent={useragent} --folder={download_dir} --filename={file_format}.mp4 {stream_url}'
}
# Allows for passing the user agent with self.headers in the site.
# Some sites block downloads using a different user agent.
if episode.headers.get('user-agent'):
useragent = episode.headers['user-agent']
else:
useragent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/605.1.15 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/605.1.15'
stream_url = episode.source().stream_url if not episode.url.startswith(
'magnet:?xt=urn:btih:') else episode.url
stream_url = stream_url if 'magnet:?xt=urn:btih:' not in stream_url else stream_url.replace(
'https://', '')
rep_dict = {
'stream_url': stream_url,
'file_format': file_format,
'download_dir': os.path.abspath(path),
'referer': episode.source().referer,
'useragent': useragent,
'speed_limit': speed_limit,
'log_level': log_level
}
if cmd == "{wget}":
# Create the directory if it doesn't exist
make_dir(
f"{rep_dict['download_dir']}/{os.path.dirname(format_filename(rep_dict['file_format'], episode))}")
path_string = file_format.replace('\\', '/').split('/')
rep_dict['file_format'] = path_string.pop(-1)
path_string = '/'.join(path_string)
rep_dict['download_dir'] = os.path.join(path, path_string)
if cmd == "{idm}":
rep_dict['file_format'] = rep_dict['file_format'].replace('/', '\\')
if cmd == '{uget}':
cmd_dict['{uget}'] = cmd_dict['{uget}'].replace('/CMD/', 'uget-gtk' if check_in_path('uget-gtk') else 'uget')
    if cmd in cmd_dict:
        cmd = cmd_dict[cmd]
    # Split the template before substituting values so that values containing
    # spaces (e.g. the download directory) stay within a single argument.
    cmd = cmd.split(' ')
    cmd = [c.format(**rep_dict) for c in cmd]
    cmd = [format_filename(c, episode) for c in cmd]
return cmd


def deobfuscate_packed_js(packedjs):
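    """
    Unpack obfuscated 'packed' JavaScript by running it under Node.js with
    eval redirected to console.log, so the decoded source is printed
    instead of executed.

    A minimal sketch (requires node on PATH):

    >>> deobfuscate_packed_js('eval("var x = 1")')  # doctest: +SKIP
    'var x = 1\\n'
    """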
return eval_in_node('eval=console.log; ' + packedjs)


def eval_in_node(js: str):
output = subprocess.check_output(['node', '-e', js])
return output.decode('utf-8')


def open_magnet(magnet):
    # Hand the magnet link to the platform's default torrent client.
if sys.platform.startswith('win32') or sys.platform.startswith('cygwin'):
os.startfile(magnet)
elif sys.platform.startswith('darwin'):
subprocess.Popen(['open', magnet],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
else:
subprocess.Popen(['xdg-open', magnet],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)


def external_download(cmd, episode, file_format, speed_limit, path=''):
logger.debug('cmd: ' + cmd)
logger.debug('episode: {!r}'.format(episode))
logger.debug('file format: ' + file_format)
cmd = format_command(cmd, episode, file_format, speed_limit, path=path)
logger.debug('formatted cmd: ' + ' '.join(cmd))
if cmd[0] == 'open': # for torrents
open_magnet(cmd[1])
else:
p = subprocess.Popen(cmd)
return_code = p.wait()
if return_code != 0:
# Sleep for a while to make sure downloader exits correctly
time.sleep(2)
sys.exit(1)


def make_dir(path):
    # Create the directory, ignoring the error if it already exists.
try:
os.makedirs(path)
except OSError as e:
if e.errno != errno.EEXIST:
raise


def get_filler_episodes(query):
    """
    Interactively look up a show on animefillerlist.com and return a list
    of its filler episode numbers, or False if cancelled or on failure.
    """
    def search_filler_episodes(query, page):
        url = 'https://animefillerlist.com/search/node/'
        anchors = helpers.soupify(helpers.get(
            url + query, params={'page': page})).select('h3.title > a')
        # Keep only links that point at show pages.
        shows = [a for a in anchors if a.get('href').split('/')[-2] == 'shows']
        urls = [a.get('href') for a in shows]
        search_results = [[a.text] for a in shows]
        return search_results, urls
    results_list, urls_list = [], []
    prev = ['']
    for a in range(5):  # Max 5 pages, could be done using the pager element
        search_results, urls = search_filler_episodes(query, a)
        # Stop the loop if the same page is returned twice.
        if urls == prev and not (len(urls) == 0 or a == 0):
            break
        prev = urls[:]
        results_list.extend(search_results)
        urls_list.extend(urls)
    # Insert serial numbers for display.
    for a in range(len(results_list)):
        results_list[a].insert(0, a + 1)
headers = ["SlNo", "Title"]
table = tabulate(results_list, headers, tablefmt='psql')
table = '\n'.join(table.split('\n')[::-1])
click.echo(table)
val = click.prompt(
'Enter the filler-anime no (0 to cancel): ', type=int, default=1, err=True)
if val == 0:
return False
url = urls_list[val - 1]
try:
logger.info("Fetching filler episodes...")
res = helpers.get(url)
soup = helpers.soupify(res.text)
episodes = []
for filler_episode in soup.find("div", attrs={"class": "filler"}).find_all("a"):
txt = filler_episode.text.strip()
if '-' in txt:
split = txt.split('-')
for a in range(int(split[0]), int(split[1]) + 1):
episodes.append(a)
else:
episodes.append(int(txt))
logger.debug("Found {} filler episodes.".format(len(episodes)))
return episodes
    except Exception:
        logger.warning(
            "Can't get filler episodes. Will download all specified episodes.")
return False


class ClickListOption(click.Option):
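    """
    A click.Option whose value is parsed as a Python literal list.

    A hedged usage sketch (the option name is made up):

        anime dl 'naruto' --fallback-providers "['animepahe', 'gogoanime']"

    Values that already arrive as lists (e.g. defaults) pass through
    unchanged; anything else goes through ast.literal_eval.
    """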
def type_cast_value(self, ctx, value):
try:
if isinstance(value, list):
return value
return ast.literal_eval(value)
        except (ValueError, SyntaxError):
            raise click.BadParameter(value)


class Process:
    """A minimal record of a running process."""

def __init__(self, name, cmdline, pid):
self.name = name
self.pid = pid
self.cmdline = cmdline

    def __str__(self):
return str({
'name': self.name,
'pid': self.pid,
'cmdline': self.cmdline
})


def getAllProcesses_Win32():
    placeholder = list()
    # WMIC pads its output with blank lines; take every other line and drop
    # the header row.
    out = os.popen(
        'WMIC path win32_process get Caption,Processid,Commandline'
    ).read().split('\n')[::2][1:]
for line in out:
f = line.split()
if f:
if len(f) > 2:
placeholder.append(
Process(name=f[0], cmdline=f[1:-1], pid=int(f[-1])))
else:
placeholder.append(
Process(name=f[0], cmdline=None, pid=int(f[-1])))
return placeholder


def getAllProcesses_unix():
    if sys.platform.startswith('darwin'):
        cmd = 'ps -Ao user,pid,%cpu,%mem,vsz,rss,tt,stat,start,time,command'
        return []  # Process enumeration is currently disabled.
    elif sys.platform.startswith('linux'):
        cmd = 'ps aux'
        return []  # Process enumeration is currently disabled.
    else:
        # Unsupported platform; 'cmd' would be undefined below.
        return []
    out = os.popen(cmd).read()
    out = out.split('\n')[1:]
placeholder = list()
for line in out:
        try:
            line_list = line.lower().split()
            # Column 1 is the pid; columns 10 onwards are the command line.
            PID = int(line_list[1])
            NAME = line_list[10]
            CMD = line_list[10:]
            placeholder.append(Process(name=NAME, cmdline=CMD, pid=PID))
        except (IndexError, ValueError):
            continue
return placeholder


def get_all_processes():
    # Process enumeration is currently disabled; an empty list makes
    # is_running() always report False.
    return []
if sys.platform.startswith('win'):
return getAllProcesses_Win32()
else:
return getAllProcesses_unix()


def is_running(regex, expected_matches):
"""
    Iterates through all running processes (excluding the current one) and
    returns True if any process's command line matches the given regex at
    least expected_matches times.
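
    A hedged sketch (get_all_processes() is currently disabled and returns
    an empty list, so this reports False until it is re-enabled):

    >>> is_running(r'mpv', 1)  # doctest: +SKIP
    False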
"""
already_running = False
dict_pids = {
p.pid: [p.name, p.cmdline]
for p in get_all_processes()
}
    # Exclude the current process from the check.
    if os.getpid() in dict_pids:
        del dict_pids[os.getpid()]
for key, value in dict_pids.items():
if value[1]:
list_of_matches = re.findall(regex, ' '.join(value[1]))
if list_of_matches and len(list_of_matches) >= expected_matches:
already_running = True
return already_running