diff options
2 files changed, 461 insertions, 0 deletions
diff --git a/libs/subliminal_patch/providers/ b/libs/subliminal_patch/providers/
new file mode 100644
index 000000000..89418cfb6
--- /dev/null
+++ b/libs/subliminal_patch/providers/
@@ -0,0 +1,411 @@
+# -*- coding: utf-8 -*-
+from difflib import SequenceMatcher
+import functools
+import logging
+import re
+import time
+import urllib.parse
+from bs4 import BeautifulSoup as bso
+import cloudscraper
+from guessit import guessit
+from requests import Session
+from requests.exceptions import HTTPError
+from subliminal_patch.core import Episode
+from subliminal_patch.core import Movie
+from subliminal_patch.exceptions import APIThrottled
+from subliminal_patch.providers import Provider
+from subliminal_patch.providers.utils import get_archive_from_bytes
+from subliminal_patch.providers.utils import get_subtitle_from_archive
+from subliminal_patch.providers.utils import update_matches
+from subliminal_patch.subtitle import Subtitle
+from subzero.language import Language
+from libs.subliminal.exceptions import ProviderError
+logger = logging.getLogger(__name__)
+class SubsceneSubtitle(Subtitle):
+ provider_name = "subscene_cloudscraper"
+ hash_verifiable = False
+ def __init__(self, language, page_link, release_info, episode_number=None):
+ super().__init__(language, page_link=page_link)
+ self.release_info = release_info
+ self.episode_number = episode_number
+ self.episode_title = None
+ self._matches = set(
+ ("title", "year")
+ if episode_number is None
+ else ("title", "series", "year", "season", "episode")
+ )
+ def get_matches(self, video):
+ update_matches(self._matches, video, self.release_info)
+ return self._matches
+ @property
+ def id(self):
+ return self.page_link
+_BASE_URL = ""
+# TODO: add more seasons and languages
+ "First",
+ "Second",
+ "Third",
+ "Fourth",
+ "Fifth",
+ "Sixth",
+ "Seventh",
+ "Eighth",
+ "Ninth",
+ "Tenth",
+ "Eleventh",
+ "Twelfth",
+ "Thirdteenth",
+ "Fourthteenth",
+ "Fifteenth",
+ "Sixteenth",
+ "Seventeenth",
+ "Eightheenth",
+ "Nineteenth",
+ "Tweentieth",
+ "english": "eng",
+ "farsi_persian": "per",
+ "arabic": "ara",
+ "spanish": "spa",
+ "portuguese": "por",
+ "italian": "ita",
+ "dutch": "dut",
+ "hebrew": "heb",
+ "indonesian": "ind",
+ "danish": "dan",
+ "norwegian": "nor",
+ "bengali": "ben",
+ "bulgarian": "bul",
+ "croatian": "hrv",
+ "swedish": "swe",
+ "vietnamese": "vie",
+ "czech": "cze",
+ "finnish": "fin",
+ "french": "fre",
+ "german": "ger",
+ "greek": "gre",
+ "hungarian": "hun",
+ "icelandic": "ice",
+ "japanese": "jpn",
+ "macedonian": "mac",
+ "malay": "may",
+ "polish": "pol",
+ "romanian": "rum",
+ "russian": "rus",
+ "serbian": "srp",
+ "thai": "tha",
+ "turkish": "tur",
+class SubsceneProvider(Provider):
+ provider_name = "subscene_cloudscraper"
+ _movie_title_regex = re.compile(r"^(.+?)( \((\d{4})\))?$")
+ _tv_show_title_regex = re.compile(
+ r"^(.+?) [-\(]\s?(.*?) (season|series)\)?( \((\d{4})\))?$"
+ )
+ _supported_languages = {}
+ _supported_languages["brazillian-portuguese"] = Language("por", "BR")
+ for key, val in _LANGUAGE_MAP.items():
+ _supported_languages[key] = Language.fromalpha3b(val)
+ _supported_languages_reversed = {
+ val: key for key, val in _supported_languages.items()
+ }
+ languages = set(_supported_languages.values())
+ video_types = (Episode, Movie)
+ subtitle_class = SubsceneSubtitle
+ def initialize(self):
+ pass
+ def terminate(self):
+ pass
+ def _scraper_call(self, url, retry=7, method="GET", sleep=5, **kwargs):
+ last_exc = None
+ for n in range(retry):
+ # Creating an instance for every try in order to avoid dropped connections.
+ # This could probably be improved!
+ scraper = cloudscraper.create_scraper()
+ if method == "GET":
+ req = scraper.get(url, **kwargs)
+ elif method == "POST":
+ req =, **kwargs)
+ else:
+ raise NotImplementedError(f"{method} not allowed")
+ try:
+ req.raise_for_status()
+ except HTTPError as error:
+ logger.debug(
+ "'%s' returned. Trying again [%d] in %s", error, n + 1, sleep
+ )
+ last_exc = error
+ time.sleep(sleep)
+ else:
+ return req
+ raise ProviderError("403 Retry count exceeded") from last_exc
+ def _gen_results(self, query):
+ url = (
+ f"{_BASE_URL}/subtitles/searchbytitle?query={urllib.parse.quote(query)}&l="
+ )
+ result = self._scraper_call(url, method="POST")
+ soup = bso(result.content, "html.parser")
+ for title in"li div[class='title'] a"):
+ yield title
+ def _search_movie(self, title, year):
+ title = title.lower()
+ year = str(year)
+ found_movie = None
+ results = []
+ for result in self._gen_results(title):
+ text = result.text.lower()
+ match = self._movie_title_regex.match(text)
+ if not match:
+ continue
+ match_title =
+ match_year =
+ if year == match_year:
+ results.append(
+ {
+ "href": result.get("href"),
+ "similarity": SequenceMatcher(None, title, match_title).ratio(),
+ }
+ )
+ if results:
+ results.sort(key=lambda x: x["similarity"], reverse=True)
+ found_movie = results[0]["href"]
+ logger.debug("Movie found: %s", results[0])
+ return found_movie
+ def _search_tv_show_season(self, title, season, year=None):
+ try:
+ season_str = _SEASONS[season - 1].lower()
+ except IndexError:
+ logger.debug("Season number not supported: %s", season)
+ return None
+ found_tv_show_season = None
+ results = []
+ for result in self._gen_results(title):
+ text = result.text.lower()
+ match = self._tv_show_title_regex.match(text)
+ if not match:
+ logger.debug("Series title not matched: %s", text)
+ continue
+ else:
+ logger.debug("Series title matched: %s", text)
+ match_title =
+ match_season =
+ # Match "complete series" titles as they usually contain season packs
+ if season_str == match_season or "complete" in match_season:
+ plus = 0.1 if year and str(year) in text else 0
+ results.append(
+ {
+ "href": result.get("href"),
+ "similarity": SequenceMatcher(None, title, match_title).ratio()
+ + plus,
+ }
+ )
+ if results:
+ results.sort(key=lambda x: x["similarity"], reverse=True)
+ found_tv_show_season = results[0]["href"]
+ logger.debug("TV Show season found: %s", results[0])
+ return found_tv_show_season
+ def _find_movie_subtitles(self, path, language):
+ soup = self._get_subtitle_page_soup(path, language)
+ subtitles = []
+ for item in"tr"):
+ subtitle = _get_subtitle_from_item(item, language)
+ if subtitle is None:
+ continue
+ logger.debug("Found subtitle: %s", subtitle)
+ subtitles.append(subtitle)
+ return subtitles
+ def _find_episode_subtitles(
+ self, path, season, episode, language, episode_title=None
+ ):
+ soup = self._get_subtitle_page_soup(path, language)
+ subtitles = []
+ for item in"tr"):
+ valid_item = None
+ clean_text = " ".join(item.text.split())
+ if not clean_text:
+ continue
+ # It will return list values
+ guess = _memoized_episode_guess(clean_text)
+ if "season" not in guess:
+ if "complete series" in clean_text.lower():
+ logger.debug("Complete series pack found: %s", clean_text)
+ guess["season"] = [season]
+ else:
+ logger.debug("Nothing guessed from release: %s", clean_text)
+ continue
+ if season in guess["season"] and episode in guess.get("episode", []):
+ logger.debug("Episode match found: %s - %s", guess, clean_text)
+ valid_item = item
+ elif season in guess["season"] and not "episode" in guess:
+ logger.debug("Season pack found: %s", clean_text)
+ valid_item = item
+ if valid_item is None:
+ continue
+ subtitle = _get_subtitle_from_item(item, language, episode)
+ if subtitle is None:
+ continue
+ subtitle.episode_title = episode_title
+ logger.debug("Found subtitle: %s", subtitle)
+ subtitles.append(subtitle)
+ return subtitles
+ def _get_subtitle_page_soup(self, path, language):
+ language_path = self._supported_languages_reversed[language]
+ result = self._scraper_call(f"{_BASE_URL}{path}/{language_path}")
+ return bso(result.content, "html.parser")
+ def list_subtitles(self, video, languages):
+ is_episode = isinstance(video, Episode)
+ if is_episode:
+ result = self._search_tv_show_season(video.series, video.season, video.year)
+ else:
+ result = self._search_movie(video.title, video.year)
+ if result is None:
+ logger.debug("No results")
+ return []
+ subtitles = []
+ for language in languages:
+ if is_episode:
+ subtitles.extend(
+ self._find_episode_subtitles(
+ result, video.season, video.episode, language, video.title
+ )
+ )
+ else:
+ subtitles.extend(self._find_movie_subtitles(result, language))
+ return subtitles
+ def download_subtitle(self, subtitle):
+ # TODO: add MustGetBlacklisted support
+ result = self._scraper_call(subtitle.page_link)
+ soup = bso(result.content, "html.parser")
+ try:
+ download_url = _BASE_URL + str(
+ soup.select_one("a[id='downloadButton']")["href"] # type: ignore
+ )
+ except (AttributeError, KeyError, TypeError):
+ raise APIThrottled(f"Couldn't get download url from {subtitle.page_link}")
+ downloaded = self._scraper_call(download_url)
+ archive = get_archive_from_bytes(downloaded.content)
+ if archive is None:
+ raise APIThrottled(f"Invalid archive: {subtitle.page_link}")
+ subtitle.content = get_subtitle_from_archive(
+ archive,
+ episode=subtitle.episode_number,
+ episode_title=subtitle.episode_title,
+ )
[email protected]_cache(2048)
+def _memoized_episode_guess(content):
+ # Use include to save time from unnecessary checks
+ return guessit(
+ content,
+ {
+ "type": "episode",
+ # Add codec keys to avoid matching x264, 5.1, etc as episode info
+ "includes": ["season", "episode", "video_codec", "audio_codec"],
+ "enforce_list": True,
+ },
+ )
+def _get_subtitle_from_item(item, language, episode_number=None):
+ release_infos = []
+ try:
+ release_infos.append(item.find("td", {"class": "a6"}).text.strip())
+ except (AttributeError, KeyError):
+ pass
+ try:
+ release_infos.append(
+ item.find("td", {"class": "a1"}).find_all("span")[-1].text.strip()
+ )
+ except (AttributeError, KeyError):
+ pass
+ release_info = "".join(r_info for r_info in release_infos if r_info)
+ try:
+ path = item.find("td", {"class": "a1"}).find("a")["href"]
+ except (AttributeError, KeyError):
+ logger.debug("Couldn't get path: %s", item)
+ return None
+ return SubsceneSubtitle(language, _BASE_URL + path, release_info, episode_number)
diff --git a/tests/subliminal_patch/ b/tests/subliminal_patch/
new file mode 100644
index 000000000..72063aae3
--- /dev/null
+++ b/tests/subliminal_patch/
@@ -0,0 +1,50 @@
+from subliminal_patch.providers import subscene_cloudscraper as subscene
+def test_provider_scraper_call():
+ with subscene.SubsceneProvider() as provider:
+ result = provider._scraper_call(
+ ""
+ )
+ assert result.status_code == 200
+def test_provider_gen_results():
+ with subscene.SubsceneProvider() as provider:
+ assert list(provider._gen_results("Breaking Bad"))
+def test_provider_search_movie():
+ with subscene.SubsceneProvider() as provider:
+ result = provider._search_movie("Taxi Driver", 1976)
+ assert result == "/subtitles/taxi-driver"
+def test_provider_find_movie_subtitles(languages):
+ with subscene.SubsceneProvider() as provider:
+ result = provider._find_movie_subtitles(
+ "/subtitles/taxi-driver", languages["en"]
+ )
+ assert result
+def test_provider_search_tv_show_season():
+ with subscene.SubsceneProvider() as provider:
+ result = provider._search_tv_show_season("The Wire", 1)
+ assert result == "/subtitles/the-wire--first-season"
+def test_provider_find_episode_subtitles(languages):
+ with subscene.SubsceneProvider() as provider:
+ result = provider._find_episode_subtitles(
+ "/subtitles/the-wire--first-season", 1, 1, languages["en"]
+ )
+ assert result
+def test_provider_download_subtitle(languages):
+ path = ""
+ subtitle = subscene.SubsceneSubtitle(languages["en"], path, "", 1)
+ with subscene.SubsceneProvider() as provider:
+ provider.download_subtitle(subtitle)
+ assert subtitle.is_valid()