anime-downloader/anime_downloader/commands/test.py

177 lines
6.5 KiB
Python

import logging
import sys
import threading
import os
import click
from fuzzywuzzy import fuzz
from anime_downloader.sites import get_anime_class, ALL_ANIME_SITES
from anime_downloader import util
from anime_downloader.__version__ import __version__
import requests
logging.getLogger(requests.packages.urllib3.__package__).setLevel(logging.ERROR) #disable Retry warnings
logger = logging.getLogger(__name__)
echo = click.echo
sitenames = [v[1] for v in ALL_ANIME_SITES]
class SiteThread(threading.Thread):
def __init__(self, provider, anime, verify, v_tries, *args, **kwargs):
self.provider = provider
self.anime = anime
self.verify = verify
self.v_tries = v_tries
self.search_result = None
self.exception = None
super().__init__(*args, **kwargs)
def run(self):
try:
ani = get_anime_class(self.provider)
self.search_result = ani.search(self.anime)
if self.search_result:
if self.verify:
ratios = [[fuzz.token_set_ratio(self.anime.lower(), sr.title.lower()), sr] for sr in self.search_result]
ratios = sorted(ratios, key=lambda x: x[0], reverse=True)
end = len(ratios)
for r in range(self.v_tries):
if r == end: break
try:
anime_choice = ratios[r][1]
anime_url = ani(anime_choice.url)
stream_url = anime_url[0].source().stream_url
self.exception = None
break
except Exception as e:
self.exception = e
self.search_result = util.format_search_results(self.search_result)
except Exception as e:
self.exception = e
@click.command()
@click.argument('anime', default='naruto')
@click.option(
'-f', '--prompt-found', is_flag=True,
help='Ask to stop searching on anime match.')
@click.option(
'-p', '--providers',
help='Limit search to specific provider(s) separated by a comma.'
)
@click.option(
'-e', '--exclude',
help='Provider(s) to exclude separated by a comma.'
)
@click.option(
'-v', '--verify', is_flag=True,
help='Verify extraction of stream url in case of anime match.'
)
@click.option(
'-n', '--v-tries', type=int, default=1,
help='Number of tries to extract stream url. (default: 1)'
)
@click.option(
'-z', '--no-fuzzy', is_flag=True,
help='Disable fuzzy search to include possible inaccurate results.'
)
@click.option(
'-r', '--print-results', is_flag=True,
help='Enable echoing the search results at the end of testing.'
)
@click.option(
'-t', '--timeout', type=int, default=10,
help='How long to wait for a site to respond. (default: 10s)'
)
def command(anime, prompt_found, providers, exclude, verify, v_tries, no_fuzzy, print_results, timeout):
"""Test all sites to see which ones are working and which ones aren't. Test naruto as a default. Return results for each provider."""
util.print_info(__version__)
logger = logging.getLogger("anime_downloader")
logger.setLevel(logging.ERROR)
if providers:
providers = [p.strip() for p in providers.split(",")]
for p in providers:
if not p in sitenames:
raise click.BadParameter(f"{p}. Choose from {', '.join(sitenames)}")
else:
providers = sitenames
if exclude:
exclude = [e.strip() for e in exclude.split(",")]
for e in exclude:
if not e in sitenames:
raise click.BadParameter(f"{e}. Choose from {', '.join(sitenames)}")
else:
if e in providers:
providers.remove(e)
if os.name == 'nt':
p, f = '', '' # Emojis don't work in cmd
else:
p, f = '', ''
if verify:
timeout = timeout + (3 * (v_tries - 1))
threads = []
matches = []
for provider in providers:
t = SiteThread(provider, anime, verify, v_tries, daemon=True)
t.start()
threads.append(t)
for i, thread in enumerate(threads):
try:
click.echo(f"[{i+1} of {len(threads)}] Searching ", nl=False)
click.secho(f"{thread.provider}", nl=False, fg="cyan")
click.echo(f"... (CTRL-C to stop) : ", nl=False)
thread.join(timeout=timeout)
if not thread.is_alive():
if not thread.exception:
if thread.search_result:
if not no_fuzzy:
ratio = fuzz.token_set_ratio(anime.lower(), thread.search_result.lower())
else:
ratio = 100
if ratio > 50:
matches.append([thread.provider, thread.search_result, ratio])
click.secho(p + "Works, anime found.", fg="green")
if prompt_found:
if print_results:
click.echo(f"\n- - -{thread.provider}- - -\n\n{thread.search_result}")
confirm = click.confirm(f"Found anime in {thread.provider}. Keep seaching?", default=True)
if not confirm:
break
else:
click.secho(p + "Works, anime not found.", fg="yellow")
else:
click.secho(p + "Works, anime not found.", fg="yellow")
else:
logging.debug('Error occurred during testing.')
logging.debug(thread.exception)
if thread.search_result:
click.secho(f + "Not working: anime found, extraction failed.", fg="red")
else:
click.secho(f + "Not working.", fg="red")
else:
logging.debug('Timeout during testing.')
click.secho(f + "Not working: Timeout. Use -t to specify longer waiting period.", fg="red")
except KeyboardInterrupt:
skip = click.confirm(f"\nSkip {thread.provider} and continue searching? (Press enter for Yes)", default=True)
if not skip:
break
if print_results:
click.echo("\n" + util.format_matches(matches))
else:
click.echo("\n" + "Test finished.")