-rw-r--r--   libs/subliminal_patch/providers/subscene_cloudscraper.py   411
-rw-r--r--   tests/subliminal_patch/test_subscene.py                      50
2 files changed, 461 insertions(+), 0 deletions(-)
diff --git a/libs/subliminal_patch/providers/subscene_cloudscraper.py b/libs/subliminal_patch/providers/subscene_cloudscraper.py
new file mode 100644
index 000000000..89418cfb6
--- /dev/null
+++ b/libs/subliminal_patch/providers/subscene_cloudscraper.py
@@ -0,0 +1,411 @@
+# -*- coding: utf-8 -*-
+
+from difflib import SequenceMatcher
+import functools
+import logging
+import re
+import time
+import urllib.parse
+
+from bs4 import BeautifulSoup as bso
+import cloudscraper
+from guessit import guessit
+from requests import Session
+from requests.exceptions import HTTPError
+from subliminal_patch.core import Episode
+from subliminal_patch.core import Movie
+from subliminal_patch.exceptions import APIThrottled
+from subliminal_patch.providers import Provider
+from subliminal_patch.providers.utils import get_archive_from_bytes
+from subliminal_patch.providers.utils import get_subtitle_from_archive
+from subliminal_patch.providers.utils import update_matches
+from subliminal_patch.subtitle import Subtitle
+from subzero.language import Language
+
+from subliminal.exceptions import ProviderError
+
+logger = logging.getLogger(__name__)
+
+
+class SubsceneSubtitle(Subtitle):
+ provider_name = "subscene_cloudscraper"
+ hash_verifiable = False
+
+ def __init__(self, language, page_link, release_info, episode_number=None):
+ super().__init__(language, page_link=page_link)
+
+ self.release_info = release_info
+ self.episode_number = episode_number
+ self.episode_title = None
+
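+        # Baseline matches implied by the search itself: title/year for movies,
+        # plus series/season/episode for TV shows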
+ self._matches = set(
+ ("title", "year")
+ if episode_number is None
+ else ("title", "series", "year", "season", "episode")
+ )
+
+ def get_matches(self, video):
+ update_matches(self._matches, video, self.release_info)
+
+ return self._matches
+
+ @property
+ def id(self):
+ return self.page_link
+
+
+_BASE_URL = "https://subscene.com"
+
+# TODO: add more seasons and languages
+
+_SEASONS = (
+ "First",
+ "Second",
+ "Third",
+ "Fourth",
+ "Fifth",
+ "Sixth",
+ "Seventh",
+ "Eighth",
+ "Ninth",
+ "Tenth",
+ "Eleventh",
+ "Twelfth",
+ "Thirdteenth",
+ "Fourthteenth",
+ "Fifteenth",
+ "Sixteenth",
+ "Seventeenth",
+ "Eightheenth",
+ "Nineteenth",
+ "Tweentieth",
+)
+
+_LANGUAGE_MAP = {
+ "english": "eng",
+ "farsi_persian": "per",
+ "arabic": "ara",
+ "spanish": "spa",
+ "portuguese": "por",
+ "italian": "ita",
+ "dutch": "dut",
+ "hebrew": "heb",
+ "indonesian": "ind",
+ "danish": "dan",
+ "norwegian": "nor",
+ "bengali": "ben",
+ "bulgarian": "bul",
+ "croatian": "hrv",
+ "swedish": "swe",
+ "vietnamese": "vie",
+ "czech": "cze",
+ "finnish": "fin",
+ "french": "fre",
+ "german": "ger",
+ "greek": "gre",
+ "hungarian": "hun",
+ "icelandic": "ice",
+ "japanese": "jpn",
+ "macedonian": "mac",
+ "malay": "may",
+ "polish": "pol",
+ "romanian": "rum",
+ "russian": "rus",
+ "serbian": "srp",
+ "thai": "tha",
+ "turkish": "tur",
+}
+
+
+class SubsceneProvider(Provider):
+ provider_name = "subscene_cloudscraper"
+
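+    # The movie regex captures title and optional year; the TV regex captures
+    # title, the season ordinal ("first", "complete", ...) and an optional year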
+ _movie_title_regex = re.compile(r"^(.+?)( \((\d{4})\))?$")
+ _tv_show_title_regex = re.compile(
+ r"^(.+?) [-\(]\s?(.*?) (season|series)\)?( \((\d{4})\))?$"
+ )
+ _supported_languages = {}
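+    # The double-"l" spelling below matches Subscene's own slug for pt-BR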
+ _supported_languages["brazillian-portuguese"] = Language("por", "BR")
+
+ for key, val in _LANGUAGE_MAP.items():
+ _supported_languages[key] = Language.fromalpha3b(val)
+
+ _supported_languages_reversed = {
+ val: key for key, val in _supported_languages.items()
+ }
+
+ languages = set(_supported_languages.values())
+
+ video_types = (Episode, Movie)
+ subtitle_class = SubsceneSubtitle
+
+ def initialize(self):
+ pass
+
+ def terminate(self):
+ pass
+
+ def _scraper_call(self, url, retry=7, method="GET", sleep=5, **kwargs):
+ last_exc = None
+
+ for n in range(retry):
+ # Creating an instance for every try in order to avoid dropped connections.
+
+ # This could probably be improved!
+ scraper = cloudscraper.create_scraper()
+ if method == "GET":
+ req = scraper.get(url, **kwargs)
+ elif method == "POST":
+ req = scraper.post(url, **kwargs)
+ else:
+ raise NotImplementedError(f"{method} not allowed")
+
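+            # Cloudflare challenges surface as HTTP errors (typically 403);
+            # back off and retry with a fresh scraper instance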
+ try:
+ req.raise_for_status()
+ except HTTPError as error:
+                logger.debug(
+                    "'%s' returned. Retrying (attempt %d) in %s seconds",
+                    error,
+                    n + 1,
+                    sleep,
+                )
+ last_exc = error
+ time.sleep(sleep)
+ else:
+ return req
+
+ raise ProviderError("403 Retry count exceeded") from last_exc
+
+ def _gen_results(self, query):
+ url = (
+ f"{_BASE_URL}/subtitles/searchbytitle?query={urllib.parse.quote(query)}&l="
+ )
+
+ result = self._scraper_call(url, method="POST")
+ soup = bso(result.content, "html.parser")
+
+ for title in soup.select("li div[class='title'] a"):
+ yield title
+
+ def _search_movie(self, title, year):
+ title = title.lower()
+ year = str(year)
+
+ found_movie = None
+
+ results = []
+ for result in self._gen_results(title):
+ text = result.text.lower()
+ match = self._movie_title_regex.match(text)
+ if not match:
+ continue
+ match_title = match.group(1)
+ match_year = match.group(3)
+ if year == match_year:
+ results.append(
+ {
+ "href": result.get("href"),
+ "similarity": SequenceMatcher(None, title, match_title).ratio(),
+ }
+ )
+
+ if results:
+ results.sort(key=lambda x: x["similarity"], reverse=True)
+ found_movie = results[0]["href"]
+ logger.debug("Movie found: %s", results[0])
+ return found_movie
+
+ def _search_tv_show_season(self, title, season, year=None):
+ try:
+ season_str = _SEASONS[season - 1].lower()
+ except IndexError:
+ logger.debug("Season number not supported: %s", season)
+ return None
+
+ found_tv_show_season = None
+
+ results = []
+ for result in self._gen_results(title):
+ text = result.text.lower()
+
+ match = self._tv_show_title_regex.match(text)
+ if not match:
+ logger.debug("Series title not matched: %s", text)
+ continue
+ else:
+ logger.debug("Series title matched: %s", text)
+
+ match_title = match.group(1)
+ match_season = match.group(2)
+
+ # Match "complete series" titles as they usually contain season packs
+ if season_str == match_season or "complete" in match_season:
+ plus = 0.1 if year and str(year) in text else 0
+ results.append(
+ {
+ "href": result.get("href"),
+ "similarity": SequenceMatcher(None, title, match_title).ratio()
+ + plus,
+ }
+ )
+
+ if results:
+ results.sort(key=lambda x: x["similarity"], reverse=True)
+ found_tv_show_season = results[0]["href"]
+ logger.debug("TV Show season found: %s", results[0])
+
+ return found_tv_show_season
+
+ def _find_movie_subtitles(self, path, language):
+ soup = self._get_subtitle_page_soup(path, language)
+
+ subtitles = []
+ for item in soup.select("tr"):
+ subtitle = _get_subtitle_from_item(item, language)
+ if subtitle is None:
+ continue
+
+ logger.debug("Found subtitle: %s", subtitle)
+ subtitles.append(subtitle)
+
+ return subtitles
+
+ def _find_episode_subtitles(
+ self, path, season, episode, language, episode_title=None
+ ):
+ soup = self._get_subtitle_page_soup(path, language)
+
+ subtitles = []
+
+ for item in soup.select("tr"):
+ valid_item = None
+ clean_text = " ".join(item.text.split())
+
+ if not clean_text:
+ continue
+
+            # enforce_list makes guessit return list values for every matched property
+ guess = _memoized_episode_guess(clean_text)
+
+ if "season" not in guess:
+ if "complete series" in clean_text.lower():
+ logger.debug("Complete series pack found: %s", clean_text)
+ guess["season"] = [season]
+ else:
+ logger.debug("Nothing guessed from release: %s", clean_text)
+ continue
+
+ if season in guess["season"] and episode in guess.get("episode", []):
+ logger.debug("Episode match found: %s - %s", guess, clean_text)
+ valid_item = item
+
+            elif season in guess["season"] and "episode" not in guess:
+ logger.debug("Season pack found: %s", clean_text)
+ valid_item = item
+
+ if valid_item is None:
+ continue
+
+ subtitle = _get_subtitle_from_item(item, language, episode)
+
+ if subtitle is None:
+ continue
+
+ subtitle.episode_title = episode_title
+
+ logger.debug("Found subtitle: %s", subtitle)
+ subtitles.append(subtitle)
+
+ return subtitles
+
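+    # Subtitle listings live at "<title path>/<language path>",
+    # e.g. /subtitles/taxi-driver/english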
+ def _get_subtitle_page_soup(self, path, language):
+ language_path = self._supported_languages_reversed[language]
+ result = self._scraper_call(f"{_BASE_URL}{path}/{language_path}")
+ return bso(result.content, "html.parser")
+
+ def list_subtitles(self, video, languages):
+ is_episode = isinstance(video, Episode)
+
+ if is_episode:
+ result = self._search_tv_show_season(video.series, video.season, video.year)
+ else:
+ result = self._search_movie(video.title, video.year)
+
+ if result is None:
+ logger.debug("No results")
+ return []
+
+ subtitles = []
+
+ for language in languages:
+ if is_episode:
+ subtitles.extend(
+ self._find_episode_subtitles(
+ result, video.season, video.episode, language, video.title
+ )
+ )
+ else:
+ subtitles.extend(self._find_movie_subtitles(result, language))
+
+ return subtitles
+
+ def download_subtitle(self, subtitle):
+ # TODO: add MustGetBlacklisted support
+
+ result = self._scraper_call(subtitle.page_link)
+ soup = bso(result.content, "html.parser")
+ try:
+ download_url = _BASE_URL + str(
+ soup.select_one("a[id='downloadButton']")["href"] # type: ignore
+ )
+ except (AttributeError, KeyError, TypeError):
+ raise APIThrottled(f"Couldn't get download url from {subtitle.page_link}")
+
+ downloaded = self._scraper_call(download_url)
+ archive = get_archive_from_bytes(downloaded.content)
+
+ if archive is None:
+ raise APIThrottled(f"Invalid archive: {subtitle.page_link}")
+
+ subtitle.content = get_subtitle_from_archive(
+ archive,
+ episode=subtitle.episode_number,
+ episode_title=subtitle.episode_title,
+ )
+
+
+@functools.lru_cache(2048)
+def _memoized_episode_guess(content):
+    # Restrict guessit to the listed properties to avoid unnecessary checks
+ return guessit(
+ content,
+ {
+ "type": "episode",
+ # Add codec keys to avoid matching x264, 5.1, etc as episode info
+ "includes": ["season", "episode", "video_codec", "audio_codec"],
+ "enforce_list": True,
+ },
+ )
+
+
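+# Release info is collected from the "a6" cell and the last span of the "a1"
+# cell; the subtitle page link comes from the anchor in "a1".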
+def _get_subtitle_from_item(item, language, episode_number=None):
+ release_infos = []
+
+ try:
+ release_infos.append(item.find("td", {"class": "a6"}).text.strip())
+ except (AttributeError, KeyError):
+ pass
+
+ try:
+ release_infos.append(
+ item.find("td", {"class": "a1"}).find_all("span")[-1].text.strip()
+ )
+ except (AttributeError, KeyError):
+ pass
+
+ release_info = "".join(r_info for r_info in release_infos if r_info)
+
+ try:
+ path = item.find("td", {"class": "a1"}).find("a")["href"]
+ except (AttributeError, KeyError):
+ logger.debug("Couldn't get path: %s", item)
+ return None
+
+ return SubsceneSubtitle(language, _BASE_URL + path, release_info, episode_number)
diff --git a/tests/subliminal_patch/test_subscene.py b/tests/subliminal_patch/test_subscene.py
new file mode 100644
index 000000000..72063aae3
--- /dev/null
+++ b/tests/subliminal_patch/test_subscene.py
@@ -0,0 +1,50 @@
+from subliminal_patch.providers import subscene_cloudscraper as subscene
+
+
+def test_provider_scraper_call():
+ with subscene.SubsceneProvider() as provider:
+ result = provider._scraper_call(
+ "https://subscene.com/subtitles/breaking-bad-fifth-season"
+ )
+ assert result.status_code == 200
+
+
+def test_provider_gen_results():
+ with subscene.SubsceneProvider() as provider:
+ assert list(provider._gen_results("Breaking Bad"))
+
+
+def test_provider_search_movie():
+ with subscene.SubsceneProvider() as provider:
+ result = provider._search_movie("Taxi Driver", 1976)
+ assert result == "/subtitles/taxi-driver"
+
+
+def test_provider_find_movie_subtitles(languages):
+ with subscene.SubsceneProvider() as provider:
+ result = provider._find_movie_subtitles(
+ "/subtitles/taxi-driver", languages["en"]
+ )
+ assert result
+
+
+def test_provider_search_tv_show_season():
+ with subscene.SubsceneProvider() as provider:
+ result = provider._search_tv_show_season("The Wire", 1)
+ assert result == "/subtitles/the-wire--first-season"
+
+
+def test_provider_find_episode_subtitles(languages):
+ with subscene.SubsceneProvider() as provider:
+ result = provider._find_episode_subtitles(
+ "/subtitles/the-wire--first-season", 1, 1, languages["en"]
+ )
+ assert result
+
+
+def test_provider_download_subtitle(languages):
+ path = "https://subscene.com/subtitles/the-wire--first-season/english/115904"
+ subtitle = subscene.SubsceneSubtitle(languages["en"], path, "", 1)
+ with subscene.SubsceneProvider() as provider:
+ provider.download_subtitle(subtitle)
+ assert subtitle.is_valid()