Improve test command functionality

master
czoins 2020-11-25 21:01:30 +01:00
parent 43437587c0
commit 2a77e6934d
2 changed files with 152 additions and 32 deletions

View File

@ -3,11 +3,15 @@ import sys
import threading
import os
import click
from fuzzywuzzy import fuzz
from anime_downloader.sites import get_anime_class, ALL_ANIME_SITES
from anime_downloader import util
from anime_downloader.__version__ import __version__
import requests
logging.getLogger(requests.packages.urllib3.__package__).setLevel(logging.ERROR) #disable Retry warnings
logger = logging.getLogger(__name__)
echo = click.echo
@ -15,54 +19,162 @@ sitenames = [v[1] for v in ALL_ANIME_SITES]
class SiteThread(threading.Thread):
def __init__(self, site, *args, **kwargs):
self.site = site
def __init__(self, provider, anime, verify, v_tries, *args, **kwargs):
self.provider = provider
self.anime = anime
self.verify = verify
self.v_tries = v_tries
self.search_result = None
self.exception = None
super().__init__(*args, **kwargs)
def run(self):
try:
ani = get_anime_class(self.site)
ani = get_anime_class(self.provider)
self.search_result = ani.search(self.anime)
if self.search_result:
if self.verify:
ratios = [[fuzz.token_set_ratio(self.anime.lower(), sr.title.lower()), sr] for sr in self.search_result]
ratios = sorted(ratios, key=lambda x: x[0], reverse=True)
end = len(ratios)
for r in range(self.v_tries):
if r == end: break
try:
anime_choice = ratios[r][1]
anime_url = ani(anime_choice.url)
stream_url = anime_url[0].source().stream_url
self.exception = None
break
except Exception as e:
self.exception = e
self.search_result = util.format_search_results(self.search_result)
# this should be more dynamic
sr = ani.search('naruto')[0]
anime = ani(sr.url)
stream_url = anime[0].source().stream_url
except Exception as e:
self.exception = e
@click.command()
@click.argument('test_query', default='naruto')
def command(test_query):
"""Test all sites to see which ones are working and which ones aren't. Test naruto as a default."""
@click.argument('anime', default='naruto')
@click.option(
'-f', '--full-search', is_flag=True,
help='Don\'t ask to stop searching on anime match.')
@click.option(
'-p', '--providers',
help='Limit search to specific provider(s) separated by a comma.'
)
@click.option(
'-e', '--exclude',
help='Provider(s) to exclude separated by a comma.'
)
@click.option(
'-s', '--selenium', is_flag=True,
help='Enable providers using selenium.'
)
@click.option(
'-v', '--verify', is_flag=True,
help='Verify extraction of stream url in case of anime match.'
)
@click.option(
'-n', '--v-tries', type=int, default=1,
help='Number of tries to extract stream url. (default: 1)'
)
@click.option(
'-z', '--no-fuzzy', is_flag=True,
help='Disable fuzzy search to include possible inaccurate results.'
)
@click.option(
'-d', '--no-results', is_flag=True,
help='Disable echoing the search results at the end of testing.'
)
@click.option(
'-t', '--timeout', type=int, default=10,
help='How long to wait for a site to respond. (default: 10s)'
)
def command(anime, full_search, providers, exclude, selenium, verify, v_tries, no_fuzzy, no_results, timeout):
"""Test all sites to see which ones are working and which ones aren't. Test naruto as a default. Return results for each provider."""
util.print_info(__version__)
logger = logging.getLogger("anime_downloader")
logger.setLevel(logging.ERROR)
threads = []
if providers:
providers = [p.strip() for p in providers.split(",")]
for p in providers:
if not p in sitenames:
raise click.BadParameter(f"{p}. Choose from {', '.join(sitenames)}")
else:
providers = sitenames
if not selenium:
providers.remove("kisscartoon")
for site in sitenames:
t = SiteThread(site, daemon=True)
if exclude:
exclude = [e.strip() for e in exclude.split(",")]
for e in exclude:
if not e in sitenames:
raise click.BadParameter(f"{e}. Choose from {', '.join(sitenames)}")
else:
if e in providers:
providers.remove(e)
if os.name == 'nt':
p, f = '', '' # Emojis don't work in cmd
else:
p, f = '', ''
if verify:
timeout = timeout + (3 * (v_tries - 1))
threads = []
matches = []
for provider in providers:
t = SiteThread(provider, anime, verify, v_tries, daemon=True)
t.start()
threads.append(t)
for thread in threads:
if os.name == 'nt':
p, f = 'Works: ', "Doesn't work: " # Emojis doesn't work in cmd
else:
p, f = '', ''
thread.join(timeout=10)
if not thread.is_alive():
if not thread.exception:
# echo(click.style('Works ', fg='green') + site)
echo(click.style(p, fg='green') + thread.site)
for i, thread in enumerate(threads):
try:
click.echo(f"[{i+1} of {len(threads)}] Searching ", nl=False)
click.secho(f"{thread.provider}", nl=False, fg="cyan")
click.echo(f"... (CTRL-C to stop) : ", nl=False)
thread.join(timeout=timeout)
if not thread.is_alive():
if not thread.exception:
if thread.search_result:
if not no_fuzzy:
ratio = fuzz.token_set_ratio(anime.lower(), thread.search_result.lower())
else:
ratio = 100
if ratio > 50:
matches.append([thread.provider, thread.search_result, ratio])
click.secho(p + "Works, anime found.", fg="green")
if not full_search:
click.echo(f"\n- - -{thread.provider}- - -\n\n{thread.search_result}")
confirm = click.confirm(f"Found anime in {thread.provider}. Keep seaching? (use -f / --full-search to disable this prompt)", default=True)
if not confirm:
break
else:
click.secho(p + "Works, anime not found.", fg="yellow")
else:
click.secho(p + "Works, anime not found.", fg="yellow")
else:
logging.debug('Error occurred during testing.')
logging.debug(thread.exception)
if thread.search_result:
click.secho(f + "Not working: anime found, extraction failed.", fg="red")
else:
click.secho(f + "Not working.", fg="red")
else:
logging.debug('Error occurred during testing')
logging.debug(thread.exception)
echo(click.style(f, fg='red') + thread.site)
else:
logging.debug('timeout during testing')
echo(click.style(f, fg='red') + thread.site)
logging.debug('Timeout during testing.')
click.secho(f + "Not working: Timeout. Use -t to specify longer waiting period.", fg="red")
except KeyboardInterrupt:
skip = click.confirm(f"\nSkip {thread.provider} and continue searching? (Press enter for Yes)", default=True)
if not skip:
break
if not no_results:
click.echo("\n" + util.format_matches(matches))

View File

@ -77,6 +77,14 @@ def format_search_results(search_results):
table = '\n'.join(table.split('\n')[::-1])
return table
def format_matches(matches):
if matches:
table = [[[p], [sr]] for p, sr, r in sorted(matches, key = lambda x: x[2], reverse=True)]
table = [a for b in table for a in b]
else:
table = [["None"]]
table = tabulate(table, ['RESULTS'], tablefmt='grid', colalign=("center",))
return table
def search(query, provider, val=None, season_info=None, ratio=50):
# Will use animeinfo sync if season_info is provided