anime-downloader/anime_downloader/sites/helpers/selescrape.py

328 lines
9.8 KiB
Python

from selenium.webdriver.remote.remote_connection import LOGGER as serverLogger
from anime_downloader.const import get_random_header
from urllib.parse import urlencode
from selenium import webdriver
from sys import platform
import tempfile
import logging
import click
import time
import json
import os
def open_config():
from anime_downloader.config import Config
return Config
cache = False
serverLogger.setLevel(logging.ERROR)
logger = logging.getLogger(__name__)
TEMP_FOLDER = os.path.join(tempfile.gettempdir(), 'AnimeDL-SeleniumCache')
data = open_config()
if not os.path.isdir(TEMP_FOLDER):
os.makedirs(TEMP_FOLDER)
def get_data_dir():
'''
Gets the folder directory selescrape will store data,
such as cookies or browser extensions and logs.
'''
APP_NAME = 'anime downloader'
return os.path.join(click.get_app_dir(APP_NAME), 'data')
def get_browser_config():
'''
Decides what browser selescrape will use.
'''
os_browser = { # maps os to a browser
'linux': 'firefox',
'darwin': 'chrome',
'win32': 'chrome'
}
for a in os_browser:
if platform.startswith(a):
browser = os_browser[a]
else:
browser = 'chrome'
value = data['dl']['selescrape_browser']
value = value.lower() if value else value
if value in ['chrome', 'firefox']:
browser = value
return browser
def get_browser_executable():
value = data['dl']['selescrape_browser_executable_path']
executable_value = value.lower() if value else value
if executable_value:
return executable_value
def get_driver_binary():
value = data['dl']['selescrape_driver_binary_path']
if value:
return value
def cache_request(sele_response):
"""
This function saves the response from a Selenium request in a json.
It uses timestamps to know if the cache has expired or not.
"""
if not cache:
return
file = os.path.join(TEMP_FOLDER, 'selenium_cached_requests.json')
if os.path.isfile(file):
with open(file, 'r') as f:
tmp_cache = json.load(f)
else:
tmp_cache = {}
data = sele_response.__dict__
url = data['url']
url = (url[:-1] if url and url[-1] == '/' else url)
tmp_cache[url] = {
'data': data['text'],
'expiry': time.time(),
'method': data['method'],
'cookies': data['cookies'],
'user_agent': data['user_agent']
}
with open(file, 'w') as f:
json.dump(tmp_cache, f, indent=4)
def check_cache(url):
"""
This function checks if the cache file exists,
if it exists then it will read the file
And it will verify if the cache is less than or equal to 30 mins old
If it is, it will return it as it is.
If it isn't, it will delete the expired cache from the file and return None
If the file doesn't exist at all it will return None
"""
if not cache:
return
file = os.path.join(TEMP_FOLDER, 'selenium_cached_requests.json')
if os.path.isfile(file):
with open(file, 'r') as f:
data = json.load(f)
# Yes, this is ugly,
# but its the best way that I found to find the cache
# when the url is not exactly the same (a slash at the end or not)
clean_url = (url[:-1] if url and url[-1] == '/' else url)
found = False
for link in data:
if link == clean_url:
url = link
found = True
if not found:
return
timestamp = data[url]['expiry']
if (time.time() - timestamp <= 1800):
return data[url]
else:
data.pop(url, None)
with open(file, 'w') as f:
json.dump(data, f, indent=4)
def driver_select():
'''
This configures what each browser should do
and returns the corresponding driver.
'''
browser = get_browser_config()
data_dir = get_data_dir()
executable = get_browser_executable()
binary = get_driver_binary()
if browser == 'firefox':
fireFox_Options = webdriver.FirefoxOptions()
ops = [
"--width=1920", "--height=1080",
"-headless", "--log fatal"
]
for option in ops:
fireFox_Options.add_argument(option)
fireFox_Profile = webdriver.FirefoxProfile()
fireFox_Profile.set_preference(
"general.useragent.override", get_random_header()['user-agent']
)
driver = webdriver.Firefox(
# sets user-agent
firefox_profile=fireFox_Profile,
# sets various firefox settings
options=fireFox_Options,
# by default it will be None, if a binary location is in the config then it will use that
firefox_binary=None if not executable else executable,
# by default it will be "geckodriver", if a geckodriver location is in the config then it will use that
executable_path=(binary if binary else "geckodriver"),
# an attempt at stopping selenium from printing a pile of garbage to the console.
service_log_path=os.path.devnull
)
elif browser == 'chrome':
from selenium.webdriver.chrome.options import Options
profile_path = os.path.join(data_dir, 'Selenium_chromium')
chrome_options = Options()
ops = [
"--headless", "--disable-gpu", '--log-level=OFF',
f"--user-data-dir={profile_path}", "--no-sandbox",
"--window-size=1920,1080", f"user-agent={get_random_header()['user-agent']}" # noqa
]
for option in ops:
chrome_options.add_argument(option)
cap = None
if executable:
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
cap = DesiredCapabilities.CHROME
cap['binary_location'] = executable
driver = webdriver.Chrome(
# sets user-agent, and various chrome settings
options=chrome_options,
# by default it will be "chromedriver", if a chromedriver location is in the config then it will use that
executable_path=(binary if binary else "chromedriver"),
# by default it will be None, if a binary location is in the config then it will use that
desired_capabilities=cap,
# an attempt at stopping selenium from printing a pile of garbage to the console.
service_log_path=os.path.devnull
)
return driver
def cloudflare_wait(driver):
'''
It waits until cloudflare has gone away before doing any further actions.
The way it works is by getting the title of the page
and as long as it is "Just a moment..." it will keep waiting.
This part of the code won't make the code execute slower
if the target website has no Cloudflare redirection.
At most it will sleep 1 second as a precaution.
Also, i have made it time out after 50 seconds, useful if the target website is not responsive
and to stop it from running infinitely.
'''
abort_after = 50 # seconds
start = time.time()
title = driver.title # title = "Just a moment..."
while "Just a moment" in title:
time.sleep(0.35)
delta = time.time() - start
if delta >= abort_after:
logger.error(f'Timeout:\tCouldnt bypass cloudflare. \
See the screenshot for more info:\t{get_data_dir()}/screenshot.png')
return 1
title = driver.title
if not "Just a moment" in title:
break
time.sleep(2) # This is necessary to make sure everything has loaded fine.
return 0
def request(request_type, url, **kwargs): # Headers not yet supported , headers={}
params = kwargs.get('params', {})
url = url if not params else url + '?' + urlencode(params)
cached_data = check_cache(url)
if cached_data:
text = cached_data['data']
user_agent = cached_data['user_agent']
request_type = cached_data['method']
cookies = cached_data['cookies']
return SeleResponse(url, request_type, text, cookies, user_agent)
else:
driver = driver_select()
driver.get(url)
try:
exit_code = cloudflare_wait(driver)
user_agent = driver.execute_script("return navigator.userAgent;")
cookies = driver.get_cookies()
text = driver.page_source
driver.close()
if exit_code != 0:
return SeleResponse(url, request_type, None, cookies, user_agent)
seleResponse = SeleResponse(
url, request_type,
text, cookies,
user_agent
)
cache_request(seleResponse)
return seleResponse
except:
driver.save_screenshot(f"{get_data_dir()}/screenshot.png")
driver.close()
logger.error(f'There was a problem getting the page: {url}.' +
'\nSee the screenshot for more info:\t{get_data_dir()}/screenshot.png')
return
class SeleResponse:
"""
Class for the selenium response.
Attributes
----------
url: string
URL of the webpage.
medthod: GET or POST
Request type.
text/content: string
Webpage contents.
cookies: dict
Stored cookies from the website.
user_agent: string
User agent used on the webpage
"""
def __init__(self, url, method, text, cookies, user_agent):
self.url = url
self.method = method
self.text = text
self.content = text
self.cookies = cookies
self.user_agent = user_agent
def __str__(self):
return self.text
def __repr__(self):
return '<SeleResponse URL: {} METHOD: {} TEXT: {} COOKIES: {} USERAGENT: {}>'.format(
self.url, self.method, self.text, self.cookies, self.user_agent)