diff options
author | JayZed <[email protected]> | 2024-09-29 21:05:46 -0400 |
---|---|---|
committer | JayZed <[email protected]> | 2024-09-29 21:05:46 -0400 |
commit | c2a1e4d62c1bbb372127a78e4419a96d3c00e81c (patch) | |
tree | af19b97bbb71b514ef39928e55769fa85064e23a /custom_libs/subliminal_patch/providers/legendasnet.py | |
parent | 4cc6806193127f9d6d3f2dab26969471d9bbf159 (diff) | |
parent | 0200bb96d98127ee32b6b66f8d6b9e21d4571a4d (diff) | |
download | bazarr-c2a1e4d62c1bbb372127a78e4419a96d3c00e81c.tar.gz bazarr-c2a1e4d62c1bbb372127a78e4419a96d3c00e81c.zip |
Merge branch 'development' of https://github.com/morpheus65535/bazarr into developmentv1.4.5-beta.7
Diffstat (limited to 'custom_libs/subliminal_patch/providers/legendasnet.py')
-rw-r--r-- | custom_libs/subliminal_patch/providers/legendasnet.py | 264 |
1 files changed, 264 insertions, 0 deletions
diff --git a/custom_libs/subliminal_patch/providers/legendasnet.py b/custom_libs/subliminal_patch/providers/legendasnet.py new file mode 100644 index 000000000..b5c8e0cd9 --- /dev/null +++ b/custom_libs/subliminal_patch/providers/legendasnet.py @@ -0,0 +1,264 @@ +# -*- coding: utf-8 -*- +import logging +import os +import time +import io +import json + +from zipfile import ZipFile, is_zipfile +from urllib.parse import urljoin +from requests import Session + +from subzero.language import Language +from subliminal import Episode, Movie +from subliminal.exceptions import ConfigurationError, ProviderError, DownloadLimitExceeded +from subliminal_patch.exceptions import APIThrottled +from .mixins import ProviderRetryMixin +from subliminal_patch.subtitle import Subtitle +from subliminal.subtitle import fix_line_ending +from subliminal_patch.providers import Provider +from subliminal_patch.providers import utils + +logger = logging.getLogger(__name__) + +retry_amount = 3 +retry_timeout = 5 + + +class LegendasNetSubtitle(Subtitle): + provider_name = 'legendasnet' + hash_verifiable = False + + def __init__(self, language, forced, page_link, download_link, file_id, release_names, uploader, + season=None, episode=None): + super().__init__(language) + language = Language.rebuild(language, forced=forced) + + self.season = season + self.episode = episode + self.releases = release_names + self.release_info = ', '.join(release_names) + self.language = language + self.forced = forced + self.file_id = file_id + self.page_link = page_link + self.download_link = download_link + self.uploader = uploader + self.matches = None + + @property + def id(self): + return self.file_id + + def get_matches(self, video): + matches = set() + + # handle movies and series separately + if isinstance(video, Episode): + # series + matches.add('series') + # season + if video.season == self.season: + matches.add('season') + # episode + if video.episode == self.episode: + matches.add('episode') + # imdb + matches.add('series_imdb_id') + else: + # title + matches.add('title') + # imdb + matches.add('imdb_id') + + utils.update_matches(matches, video, self.release_info) + + self.matches = matches + + return matches + + +class LegendasNetProvider(ProviderRetryMixin, Provider): + """Legendas.Net Provider""" + server_hostname = 'legendas.net/api' + + languages = {Language('por', 'BR')} + video_types = (Episode, Movie) + + def __init__(self, username, password): + self.session = Session() + self.session.headers = {'User-Agent': os.environ.get("SZ_USER_AGENT", "Sub-Zero/2")} + self.username = username + self.password = password + self.access_token = None + self.video = None + self._started = None + self.login() + + def login(self): + headersList = { + "Accept": "*/*", + "User-Agent": self.session.headers['User-Agent'], + "Content-Type": "application/json" + } + + payload = json.dumps({ + "email": self.username, + "password": self.password + }) + + response = self.session.request("POST", self.server_url() + 'login', data=payload, headers=headersList) + if response.status_code != 200: + raise ConfigurationError('Failed to login and retrieve access token') + self.access_token = response.json().get('access_token') + if not self.access_token: + raise ConfigurationError('Access token not found in login response') + self.session.headers.update({'Authorization': f'Bearer {self.access_token}'}) + + def initialize(self): + self._started = time.time() + + def terminate(self): + self.session.close() + + def server_url(self): + return f'https://{self.server_hostname}/v1/' + + def query(self, languages, video): + self.video = video + + # query the server + if isinstance(self.video, Episode): + res = self.retry( + lambda: self.session.get(self.server_url() + 'search/tv', + json={ + 'name': video.series, + 'page': 1, + 'per_page': 25, + 'tv_episode': video.episode, + 'tv_season': video.season, + 'imdb_id': video.series_imdb_id + }, + headers={'Content-Type': 'application/json'}, + timeout=30), + amount=retry_amount, + retry_timeout=retry_timeout + ) + else: + res = self.retry( + lambda: self.session.get(self.server_url() + 'search/movie', + json={ + 'name': video.title, + 'page': 1, + 'per_page': 25, + 'imdb_id': video.imdb_id + }, + headers={'Content-Type': 'application/json'}, + timeout=30), + amount=retry_amount, + retry_timeout=retry_timeout + ) + + if res.status_code == 404: + logger.error(f"Endpoint not found: {res.url}") + raise ProviderError("Endpoint not found") + elif res.status_code == 429: + raise APIThrottled("Too many requests") + elif res.status_code == 403: + raise ConfigurationError("Invalid access token") + elif res.status_code != 200: + res.raise_for_status() + + subtitles = [] + + result = res.json() + + if ('success' in result and not result['success']) or ('status' in result and not result['status']): + logger.debug(result["error"]) + return [] + + if isinstance(self.video, Episode): + if len(result['tv_shows']): + for item in result['tv_shows']: + subtitle = LegendasNetSubtitle( + language=Language('por', 'BR'), + forced=self._is_forced(item), + page_link=f"https://legendas.net/tv_legenda?movie_id={result['tv_shows'][0]['tmdb_id']}&" + f"legenda_id={item['id']}", + download_link=item['path'], + file_id=item['id'], + release_names=[item.get('release_name', '')], + uploader=item['uploader'], + season=item.get('season', ''), + episode=item.get('episode', '') + ) + subtitle.get_matches(self.video) + if subtitle.language in languages: + subtitles.append(subtitle) + else: + if len(result['movies']): + for item in result['movies']: + subtitle = LegendasNetSubtitle( + language=Language('por', 'BR'), + forced=self._is_forced(item), + page_link=f"https://legendas.net/legenda?movie_id={result['movies'][0]['tmdb_id']}&" + f"legenda_id={item['id']}", + download_link=item['path'], + file_id=item['id'], + release_names=[item.get('release_name', '')], + uploader=item['uploader'], + season=None, + episode=None + ) + subtitle.get_matches(self.video) + if subtitle.language in languages: + subtitles.append(subtitle) + + return subtitles + + @staticmethod + def _is_forced(item): + forced_tags = ['forced', 'foreign'] + for tag in forced_tags: + if tag in item.get('comment', '').lower(): + return True + + # nothing match so we consider it as normal subtitles + return False + + def list_subtitles(self, video, languages): + return self.query(languages, video) + + def download_subtitle(self, subtitle): + logger.debug('Downloading subtitle %r', subtitle) + download_link = urljoin("https://legendas.net", subtitle.download_link) + + r = self.retry( + lambda: self.session.get(download_link, timeout=30), + amount=retry_amount, + retry_timeout=retry_timeout + ) + + if r.status_code == 429: + raise DownloadLimitExceeded("Daily download limit exceeded") + elif r.status_code == 403: + raise ConfigurationError("Invalid access token") + elif r.status_code != 200: + r.raise_for_status() + + if not r: + logger.error(f'Could not download subtitle from {download_link}') + subtitle.content = None + return + else: + archive_stream = io.BytesIO(r.content) + if is_zipfile(archive_stream): + archive = ZipFile(archive_stream) + for name in archive.namelist(): + subtitle_content = archive.read(name) + subtitle.content = fix_line_ending(subtitle_content) + return + else: + subtitle_content = r.content + subtitle.content = fix_line_ending(subtitle_content) + return |