summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authormorpheus65535 <[email protected]>2022-06-28 23:31:30 -0400
committermorpheus65535 <[email protected]>2022-06-28 23:31:30 -0400
commitf760d3198b75f5f24eb576ff032f656b18b83c40 (patch)
tree01d9db51bfa06f72be6bcaca6aca1e41d567b169
parent44ffac67b3639ea954e744f97a44e360a341b345 (diff)
parent171747751ad42b75e1dbfacafd98ae6c84017d6e (diff)
downloadbazarr-f760d3198b75f5f24eb576ff032f656b18b83c40.tar.gz
bazarr-f760d3198b75f5f24eb576ff032f656b18b83c40.zip
Merge remote-tracking branch 'origin/development' into development
-rw-r--r--bazarr/app/config.py5
-rw-r--r--bazarr/app/get_providers.py6
-rw-r--r--frontend/src/pages/Settings/Providers/list.ts27
-rwxr-xr-xlibs/fese/__init__.py442
-rwxr-xr-xlibs/fese/container.py238
-rw-r--r--libs/fese/disposition.py68
-rw-r--r--libs/fese/exceptions.py32
-rwxr-xr-xlibs/fese/stream.py162
-rw-r--r--libs/fese/tags.py175
-rw-r--r--libs/subliminal_patch/providers/embeddedsubtitles.py112
-rw-r--r--tests/bazarr/app/test_get_providers.py10
-rw-r--r--tests/subliminal_patch/test_embeddedsubtitles.py76
12 files changed, 790 insertions, 563 deletions
diff --git a/bazarr/app/config.py b/bazarr/app/config.py
index ac4c65eb0..8f001ed52 100644
--- a/bazarr/app/config.py
+++ b/bazarr/app/config.py
@@ -203,10 +203,8 @@ defaults = {
'approved_only': 'False'
},
'embeddedsubtitles': {
- 'include_ass': 'True',
- 'include_srt': 'True',
+ 'included_codecs': '[]',
'hi_fallback': 'False',
- 'mergerfs_mode': 'False',
'timeout': '600',
},
'subsync': {
@@ -261,6 +259,7 @@ raw_keys = ['movie_default_forced', 'serie_default_forced']
array_keys = ['excluded_tags',
'exclude',
+ 'included_codecs',
'subzero_mods',
'excluded_series_types',
'enabled_providers',
diff --git a/bazarr/app/get_providers.py b/bazarr/app/get_providers.py
index 87737f52d..dcfc2588a 100644
--- a/bazarr/app/get_providers.py
+++ b/bazarr/app/get_providers.py
@@ -227,15 +227,13 @@ def get_providers_auth():
'hashed_password': settings.ktuvit.hashed_password,
},
'embeddedsubtitles': {
- 'include_ass': settings.embeddedsubtitles.getboolean('include_ass'),
- 'include_srt': settings.embeddedsubtitles.getboolean('include_srt'),
+ 'included_codecs': get_array_from(settings.embeddedsubtitles.included_codecs),
'hi_fallback': settings.embeddedsubtitles.getboolean('hi_fallback'),
- 'mergerfs_mode': settings.embeddedsubtitles.getboolean('mergerfs_mode'),
'cache_dir': os.path.join(args.config_dir, "cache"),
'ffprobe_path': _FFPROBE_BINARY,
'ffmpeg_path': _FFMPEG_BINARY,
'timeout': settings.embeddedsubtitles.timeout,
- }
+ },
}
diff --git a/frontend/src/pages/Settings/Providers/list.ts b/frontend/src/pages/Settings/Providers/list.ts
index fe5d9b9cf..069d70aeb 100644
--- a/frontend/src/pages/Settings/Providers/list.ts
+++ b/frontend/src/pages/Settings/Providers/list.ts
@@ -87,33 +87,22 @@ export const ProviderList: Readonly<ProviderInfo[]> = [
description: "Embedded Subtitles from your Media Files",
inputs: [
{
- type: "switch",
- key: "include_srt",
- name: "Include SRT",
- defaultValue: true,
+ type: "chips",
+ key: "included_codecs",
+ name: "Allowed codecs (subrip, ass, webvtt, mov_text). Leave empty to allow all.",
+ defaultValue: [],
},
{
- type: "switch",
- key: "include_ass",
- name: "Include ASS (will be converted to SRT)",
- defaultValue: true,
+ type: "text",
+ key: "timeout",
+ defaultValue: 600,
+ name: "Extraction timeout in seconds",
},
{
type: "switch",
key: "hi_fallback",
name: "Use HI subtitles as a fallback (don't enable it if you have a HI language profile)",
},
- {
- type: "switch",
- key: "mergerfs_mode",
- name: "[EXPERIMENTAL] Ignore cloud video files from rclone/mergerfs",
- },
- {
- type: "text",
- key: "timeout",
- defaultValue: 600,
- name: "Extraction timeout in seconds",
- },
],
message:
"Warning for cloud users: this provider needs to read the entire file in order to extract subtitles.",
diff --git a/libs/fese/__init__.py b/libs/fese/__init__.py
index fbe3f44b7..1ce31e010 100755
--- a/libs/fese/__init__.py
+++ b/libs/fese/__init__.py
@@ -1,443 +1,7 @@
# -*- coding: utf-8 -*-
# License: GPL
-from __future__ import annotations
+from .container import FFprobeVideoContainer
+from .stream import FFprobeSubtitleStream
-import json
-import logging
-import os
-import re
-import subprocess
-from typing import List, Optional
-
-from babelfish import Language
-from babelfish.exceptions import LanguageError
-import pysubs2
-
-__version__ = "0.1.4"
-
-logger = logging.getLogger(__name__)
-
-# Paths to executables
-FFPROBE_PATH = os.environ.get("FFPROBE_PATH", "ffprobe")
-FFMPEG_PATH = os.environ.get("FFMPEG_PATH", "ffmpeg")
-
-FFMPEG_STATS = True
-FF_LOG_LEVEL = "quiet"
-
-
-class FeseError(Exception):
- pass
-
-
-class ExtractionError(FeseError):
- pass
-
-
-class InvalidFile(FeseError):
- pass
-
-
-class InvalidStream(FeseError):
- pass
-
-
-class InvalidSource(FeseError):
- pass
-
-
-class ConversionError(FeseError):
- pass
-
-
-class LanguageNotFound(FeseError):
- pass
-
-
-# Extensions
-
-SRT = "srt"
-ASS = "ass"
-
-
-class FFprobeSubtitleDisposition:
- def __init__(self, data: dict):
- self.default = False
- self.generic = False
- self.dub = False
- self.original = False
- self.comment = False
- self.lyrics = False
- self.karaoke = False
- self.forced = False
- self.hearing_impaired = False
- self.visual_impaired = False
- self.clean_effects = False
- self.attached_pic = False
- self.timed_thumbnails = False
- self._content_type = None
-
- for key, val in data.items():
- if hasattr(self, key):
- setattr(self, key, bool(val))
-
- def update_from_tags(self, tags):
- tag_title = tags.get("title")
- if tag_title is None:
- logger.debug("Title not found. Marking as generic")
- self.generic = True
- return None
-
- l_tag_title = tag_title.lower()
-
- for key, val in _content_types.items():
- if val.search(l_tag_title) is not None:
- logger.debug("Found %s: %s", key, l_tag_title)
- self._content_type = key
- setattr(self, key, True)
- return None
-
- logger.debug("Generic disposition title found: %s", l_tag_title)
- self.generic = True
- return None
-
- @property
- def suffix(self):
- if self._content_type is not None:
- return f"-{self._content_type}"
-
- return ""
-
- def __str__(self):
- return self.suffix.lstrip("-").upper() or "GENERIC"
-
-
-class FFprobeSubtitleStream:
- """Base class for FFprobe (FFmpeg) extractable subtitle streams."""
-
- def __init__(self, stream: dict):
- """
- :raises: LanguageNotFound
- """
- self.index = int(stream.get("index", 0))
- self.codec_name = stream.get("codec_name", "Unknown")
- self.extension = _subtitle_extensions.get(self.codec_name, self.codec_name)
- self.r_frame_rate = stream.get("r_frame_rate")
- self.avg_frame_rate = stream.get("avg_frame_rate")
- self.time_base = stream.get("time_base")
- self.tags = stream.get("tags", {})
- self.start_time = float(stream.get("start_time", 0))
- # TODO: separate tags
- self.number_of_frames = int(self.tags.get("NUMBER_OF_FRAMES", 0))
- self.number_of_frames_eng = int(
- self.tags.get("NUMBER_OF_FRAMES-eng", self.number_of_frames)
- )
-
- self.duration, self.duration_ts = 0, 0
-
- # some subtitles streams miss the duration_ts field and only have tags->DURATION field
- # fixme: we still don't know if "DURATION" is a common tag/key
- if "DURATION" in self.tags:
- try:
- h, m, s = [
- ts.replace(",", ".").strip()
- for ts in self.tags["DURATION"].split(":")
- ]
- self.duration = float(s) + float(m) * 60 + float(h) * 60 * 60
- self.duration_ts = int(self.duration * 1000)
- except ValueError as error:
- logger.warning("Couldn't get duration field: %s. Using 0", error)
- else:
- try:
- self.duration = float(stream.get("duration", "0").replace(",", "."))
- self.duration_ts = int(stream.get("duration_ts", self.duration * 1000))
- # some subtitles streams miss a duration completely and has "N/A" as value
- except ValueError as error:
- logger.warning("Couldn't get duration field: %s. Using 0", error)
-
- self.start_pts = int(stream.get("start_pts", 0))
-
- self.disposition = FFprobeSubtitleDisposition(stream.get("disposition", {}))
-
- if self.tags:
- self.disposition.update_from_tags(self.tags)
-
- self.language: Language = self._language()
-
- @property
- def suffix(self):
- lang = self.language.alpha2
- if self.language.country is not None:
- lang = f"{lang}-{self.language.country}"
-
- return f"{lang}{self.disposition.suffix}.{self.extension}"
-
- def _language(self) -> Language:
- og_lang = self.tags.get("language")
- last_exc = None
-
- if og_lang is not None:
- if og_lang in _extra_languages:
- extra = _extra_languages[og_lang]
- title = self.tags.get("title", "n/a").lower()
- if any(possible in title for possible in extra["matches"]):
- logger.debug("Found extra language %s", extra["language_args"])
- return Language(*extra["language_args"])
-
- try:
- lang = Language.fromalpha3b(og_lang)
- # Test for suffix
- assert lang.alpha2
-
- return lang
- except LanguageError as error:
- last_exc = error
- logger.debug("Error with '%s' language: %s", og_lang, error)
-
- raise LanguageNotFound(
- f"Couldn't detect language for stream: {self.tags}"
- ) from last_exc
-
- def __repr__(self) -> str:
- return f"<{self.codec_name.upper()}: {self.language}@{self.disposition}>"
-
-
-class FFprobeVideoContainer:
- def __init__(self, path: str):
- self.path = path
-
- @property
- def extension(self):
- return os.path.splitext(self.path)[-1].lstrip(".")
-
- def get_subtitles(self, timeout: int = 600) -> List[FFprobeSubtitleStream]:
- """Factory function to create subtitle instances from FFprobe.
-
- :param timeout: subprocess timeout in seconds (default: 600)
- :raises: InvalidSource"""
-
- ff_command = [
- FFPROBE_PATH,
- "-v",
- FF_LOG_LEVEL,
- "-print_format",
- "json",
- "-show_format",
- "-show_streams",
- self.path,
- ]
- try:
- result = subprocess.run(
- ff_command, stdout=subprocess.PIPE, check=True, timeout=timeout
- )
- streams = json.loads(result.stdout)["streams"]
- except _ffprobe_exceptions as error:
- raise InvalidSource(
- f"{error} trying to get information from {self.path}"
- ) from error # We want to see the traceback
-
- subs = []
- for stream in streams:
- if stream.get("codec_type", "n/a") != "subtitle":
- continue
- try:
- subs.append(FFprobeSubtitleStream(stream))
- except LanguageNotFound:
- pass
-
- if not subs:
- logger.debug("Source doesn't have any subtitle valid streams")
- return []
-
- logger.debug("Found subtitle streams: %s", subs)
- return subs
-
- def extract_subtitles(
- self,
- subtitles: List[FFprobeSubtitleStream],
- custom_dir=None,
- overwrite=True,
- timeout=600,
- ):
- """Extracts a list of subtitles. Returns a dictionary of the extracted
- filenames by index.
-
- :param subtitles: a list of FFprobeSubtitle instances
- :param custom_dir: a custom directory to save the subtitles. Defaults to
- same directory as the media file
- :param overwrite: overwrite files with the same name (default: True)
- :param timeout: subprocess timeout in seconds (default: 600)
- :raises: ExtractionError, OSError
- """
- extract_command = [FFMPEG_PATH, "-v", FF_LOG_LEVEL]
- if FFMPEG_STATS:
- extract_command.append("-stats")
- extract_command.extend(["-y", "-i", self.path])
-
- if custom_dir is not None:
- # May raise OSError
- os.makedirs(custom_dir, exist_ok=True)
-
- items = {}
- collected_paths = set()
-
- for subtitle in subtitles:
- sub_path = f"{os.path.splitext(self.path)[0]}.{subtitle.suffix}"
- if custom_dir is not None:
- sub_path = os.path.join(custom_dir, os.path.basename(sub_path))
-
- if sub_path in collected_paths:
- sub_path = (
- f"{sub_path.rstrip(f'.{subtitle.suffix}')}"
- f"-{len(collected_paths)}.{subtitle.suffix}"
- )
-
- if not overwrite and os.path.isfile(sub_path):
- logger.debug("Ignoring path (OVERWRITE TRUE): %s", sub_path)
- continue
-
- extract_command.extend(
- ["-map", f"0:{subtitle.index}", "-c", "copy", sub_path]
- )
- logger.debug("Appending subtitle path: %s", sub_path)
-
- collected_paths.add(sub_path)
-
- items[subtitle.index] = sub_path
-
- if not items:
- logger.debug("No subtitles to extract")
- return {}
-
- logger.debug("Extracting subtitle with command %s", " ".join(extract_command))
-
- try:
- subprocess.run(extract_command, timeout=timeout, check=True)
- except (subprocess.SubprocessError, FileNotFoundError) as error:
- raise ExtractionError(f"Error calling ffmpeg: {error}") from error
-
- for path in items.values():
- if not os.path.isfile(path):
- logger.debug("%s was not extracted", path)
-
- return items
-
- def __repr__(self) -> str:
- return f"<FFprobeVideoContainer {self.extension}: {self.path}>"
-
-
-def check_integrity(
- subtitle: FFprobeSubtitleStream, path: str, sec_offset_threshold=900
-):
- """A relative check for the integriy of a file. This can be used to find a failed
- ffmpeg extraction where the final file might not be complete or might be corrupted.
- Currently, only ASS and Subrip are supported.
-
- :param subtitle: FFprobeSubtitle instance
- :param path: the path of the subtitle file (ass or srt)
- :param sec_offset_threshold: the maximum seconds offset to determine if the file is complete
- :raises: InvalidFile
- """
- if subtitle.extension not in (ASS, SRT):
- raise InvalidFile(f"Extension not supported: {subtitle.extension}")
-
- try:
- sub = pysubs2.load(path)
- except (pysubs2.Pysubs2Error, UnicodeError, OSError, FileNotFoundError) as error:
- raise InvalidFile(error) from error
- else:
- # ignore the duration check if the stream has no duration listed at all
- if subtitle.duration_ts:
- off = abs(int(sub[-1].end) - subtitle.duration_ts)
- if off > abs(sec_offset_threshold) * 1000:
- raise InvalidFile(
- f"The last subtitle timestamp ({sub[-1].end/1000} sec) is {off/1000} sec ahead"
- f" from the subtitle stream total duration ({subtitle.duration} sec)"
- )
- logger.debug("Integrity check passed (%d sec offset)", off / 1000)
- else:
- logger.warning(
- "Ignoring duration check, subtitle stream has bad duration values: %s",
- subtitle,
- )
-
-
-def to_srt(
- source: str, output: Optional[str] = None, remove_source: bool = False
-) -> str:
- """Convert a subtitle to SubRip. Currently, only ASS is supported. SubRip
- files will be silently ignored.
-
- raises: ConversionError, OSError"""
- if source.endswith(".srt"):
- return source
-
- split_path = os.path.splitext(source)
-
- if split_path[-1] not in (".ass"):
- raise ConversionError(
- f"No converter found for extension: {split_path[-1]}"
- ) from None
-
- output = output or f"{split_path[0]}.srt"
-
- try:
- parsed = pysubs2.load(source)
- parsed.save(output)
- except (pysubs2.Pysubs2Error, UnicodeError) as error:
- raise ConversionError(f"Exception converting {output}: {error}") from error
-
- logger.debug("Converted: %s", output)
-
- if remove_source and source != output:
- try:
- os.remove(source)
- except OSError as error:
- logger.debug("Can't remove source: %s (%s)", source, error)
-
- return output
-
-
-_subtitle_extensions = {
- "subrip": "srt",
- "ass": "ass",
- "hdmv_pgs_subtitle": "sup",
- "pgs": "sup",
-}
-
-
-_content_types = {
- "hearing_impaired": re.compile(r"sdh|hearing impaired"),
- "forced": re.compile(r"forced"),
- "comment": re.compile(r"comment"),
- "visual_impaired": re.compile(r"signs|visual impair"),
- "karaoke": re.compile(r"karaoke|songs"),
-}
-
-
-_ffprobe_exceptions = (
- subprocess.SubprocessError,
- json.JSONDecodeError,
- FileNotFoundError,
- KeyError,
-)
-
-_extra_languages = {
- "spa": {
- "matches": (
- "es-la",
- "spa-la",
- "spl",
- "mx",
- "latin",
- "mexic",
- "argent",
- "latam",
- ),
- "language_args": ("spa", "MX"),
- },
- "por": {
- "matches": ("pt-br", "pob", "pb", "brazilian", "brasil", "brazil"),
- "language_args": ("por", "BR"),
- },
-}
+__version__ = "0.2"
diff --git a/libs/fese/container.py b/libs/fese/container.py
new file mode 100755
index 000000000..d65aee17f
--- /dev/null
+++ b/libs/fese/container.py
@@ -0,0 +1,238 @@
+# -*- coding: utf-8 -*-
+# License: GPL
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import subprocess
+
+from .exceptions import ExtractionError
+from .exceptions import InvalidSource
+from .exceptions import LanguageNotFound
+from .exceptions import UnsupportedCodec
+from .stream import FFprobeSubtitleStream
+
+logger = logging.getLogger(__name__)
+
+# Paths to executables
+FFPROBE_PATH = os.environ.get("FFPROBE_PATH", "ffprobe")
+FFMPEG_PATH = os.environ.get("FFMPEG_PATH", "ffmpeg")
+
+FFMPEG_STATS = True
+FF_LOG_LEVEL = "quiet"
+
+
+class FFprobeVideoContainer:
+ def __init__(self, path: str):
+ self.path = path
+
+ @property
+ def extension(self):
+ return os.path.splitext(self.path)[-1].lstrip(".")
+
+ def get_subtitles(self, timeout: int = 600):
+ """Factory function to create subtitle (stream) instances from FFprobe.
+
+ :param timeout: subprocess timeout in seconds (default: 600)
+ :raises: InvalidSource"""
+
+ ff_command = [
+ FFPROBE_PATH,
+ "-v",
+ FF_LOG_LEVEL,
+ "-print_format",
+ "json",
+ "-show_format",
+ "-show_streams",
+ self.path,
+ ]
+ try:
+ result = subprocess.run(
+ ff_command, stdout=subprocess.PIPE, check=True, timeout=timeout
+ )
+ streams = json.loads(result.stdout)["streams"]
+ except _ffprobe_exceptions as error:
+ raise InvalidSource(
+ f"{error} trying to get information from {self.path}"
+ ) from error # We want to see the traceback
+
+ subs = []
+ for stream in streams:
+ if stream.get("codec_type", "n/a") != "subtitle":
+ continue
+ try:
+ subs.append(FFprobeSubtitleStream(stream))
+ except (LanguageNotFound, UnsupportedCodec) as error:
+ logger.debug("Ignoring %s: %s", stream.get("codec_name"), error)
+
+ if not subs:
+ logger.debug("Source doesn't have any subtitle valid streams")
+ return []
+
+ logger.debug("Found subtitle streams: %s", subs)
+ return subs
+
+ def extract_subtitles(
+ self,
+ subtitles,
+ custom_dir=None,
+ overwrite=True,
+ timeout=600,
+ convert_format=None,
+ ):
+ """Extracts a list of subtitles converting them. Returns a dictionary of the
+ extracted filenames by index.
+
+ Most bitmap subtitles will raise UnsupportedCodec as they don't support conversion.
+ For such formats use copy instead.
+
+ :param subtitles: a list of FFprobeSubtitle instances
+ :param custom_dir: a custom directory to save the subtitles. Defaults to
+ same directory as the media file
+ :param overwrite: overwrite files with the same name (default: True)
+ :param timeout: subprocess timeout in seconds (default: 600)
+ :param convert_format: format to convert selected subtitles. Defaults to
+ srt
+ :raises: ExtractionError, UnsupportedCodec, OSError
+ """
+ extract_command = [FFMPEG_PATH, "-v", FF_LOG_LEVEL]
+ if FFMPEG_STATS:
+ extract_command.append("-stats")
+ extract_command.extend(["-y", "-i", self.path])
+
+ if custom_dir is not None:
+ # May raise OSError
+ os.makedirs(custom_dir, exist_ok=True)
+
+ items = {}
+ collected_paths = set()
+
+ for subtitle in subtitles:
+ extension_to_use = convert_format or subtitle.convert_default_format
+
+ sub_path = (
+ f"{os.path.splitext(self.path)[0]}.{subtitle.suffix}.{extension_to_use}"
+ )
+ if custom_dir is not None:
+ sub_path = os.path.join(custom_dir, os.path.basename(sub_path))
+
+ if not overwrite and sub_path in collected_paths:
+ sub_path = f"{os.path.splitext(sub_path)[0]}.{len(collected_paths):02}.{extension_to_use}"
+
+ if not overwrite and os.path.isfile(sub_path):
+ logger.debug("Ignoring path (OVERWRITE TRUE): %s", sub_path)
+ continue
+
+ extract_command.extend(subtitle.convert_args(convert_format, sub_path))
+
+ logger.debug("Appending subtitle path: %s", sub_path)
+ collected_paths.add(sub_path)
+
+ items[subtitle.index] = sub_path
+
+ if not items:
+ logger.debug("No subtitles to extract")
+ return {}
+
+ logger.debug("Extracting subtitle with command %s", " ".join(extract_command))
+
+ try:
+ subprocess.run(extract_command, timeout=timeout, check=True)
+ except (subprocess.SubprocessError, FileNotFoundError) as error:
+ raise ExtractionError(f"Error calling ffmpeg: {error}") from error
+
+ for path in items.values():
+ if not os.path.isfile(path):
+ logger.warning("%s was not extracted", path)
+
+ return items
+
+ def copy_subtitles(
+ self,
+ subtitles,
+ custom_dir=None,
+ overwrite=True,
+ timeout=600,
+ fallback_to_convert=True,
+ ):
+ """Extracts a list of subtitles with ffmpeg's copy method. Returns a dictionary
+ of the extracted filenames by index.
+
+ :param subtitles: a list of FFprobeSubtitle instances
+ :param custom_dir: a custom directory to save the subtitles. Defaults to
+ same directory as the media file
+ :param overwrite: overwrite files with the same name (default: True)
+ :param timeout: subprocess timeout in seconds (default: 600)
+ :param fallback_to_convert: fallback to stream's default convert format if it is
+ incompatible with copy
+ :raises: ExtractionError, UnsupportedCodec, OSError
+ """
+ extract_command = [FFMPEG_PATH, "-v", FF_LOG_LEVEL]
+ if FFMPEG_STATS:
+ extract_command.append("-stats")
+ extract_command.extend(["-y", "-i", self.path])
+
+ if custom_dir is not None:
+ # May raise OSError
+ os.makedirs(custom_dir, exist_ok=True)
+
+ items = {}
+ collected_paths = set()
+
+ for subtitle in subtitles:
+ sub_path = f"{os.path.splitext(self.path)[0]}.{subtitle.suffix}.{subtitle.extension}"
+ if custom_dir is not None:
+ sub_path = os.path.join(custom_dir, os.path.basename(sub_path))
+
+ if not overwrite and sub_path in collected_paths:
+ sub_path = f"{os.path.splitext(sub_path)[0]}.{len(collected_paths):02}.{subtitle.extension}"
+
+ if not overwrite and os.path.isfile(sub_path):
+ logger.debug("Ignoring path (OVERWRITE TRUE): %s", sub_path)
+ continue
+
+ try:
+ extract_command.extend(subtitle.copy_args(sub_path))
+ except UnsupportedCodec:
+ if fallback_to_convert:
+ logger.warning(
+ "%s incompatible with copy. Using fallback", subtitle
+ )
+ extract_command.extend(subtitle.convert_args(None, sub_path))
+ else:
+ raise
+
+ logger.debug("Appending subtitle path: %s", sub_path)
+ collected_paths.add(sub_path)
+
+ items[subtitle.index] = sub_path
+
+ if not items:
+ logger.debug("No subtitles to extract")
+ return {}
+
+ logger.debug("Extracting subtitle with command %s", " ".join(extract_command))
+
+ try:
+ subprocess.run(extract_command, timeout=timeout, check=True)
+ except (subprocess.SubprocessError, FileNotFoundError) as error:
+ raise ExtractionError(f"Error calling ffmpeg: {error}") from error
+
+ for path in items.values():
+ if not os.path.isfile(path):
+ logger.warning("%s was not extracted", path)
+
+ return items
+
+ def __repr__(self) -> str:
+ return f"<FFprobeVideoContainer {self.extension}: {self.path}>"
+
+
+_ffprobe_exceptions = (
+ subprocess.SubprocessError,
+ json.JSONDecodeError,
+ FileNotFoundError,
+ KeyError,
+)
diff --git a/libs/fese/disposition.py b/libs/fese/disposition.py
new file mode 100644
index 000000000..d3582fda5
--- /dev/null
+++ b/libs/fese/disposition.py
@@ -0,0 +1,68 @@
+# -*- coding: utf-8 -*-
+
+import logging
+import re
+
+logger = logging.getLogger(__name__)
+
+
+class FFprobeSubtitleDisposition:
+ def __init__(self, data: dict):
+ self.default = False
+ self.generic = False
+ self.dub = False
+ self.original = False
+ self.comment = False
+ self.lyrics = False
+ self.karaoke = False
+ self.forced = False
+ self.hearing_impaired = False
+ self.visual_impaired = False
+ self.clean_effects = False
+ self.attached_pic = False
+ self.timed_thumbnails = False
+ self._content_type = None
+
+ for key, val in data.items():
+ if hasattr(self, key):
+ setattr(self, key, bool(val))
+
+ for key in _content_types.keys():
+ if getattr(self, key, None):
+ self._content_type = key
+
+ def update_from_tags(self, tags):
+ tag_title = tags.get("title")
+ if tag_title is None:
+ logger.debug("Title not found. Marking as generic")
+ self.generic = True
+ return None
+
+ l_tag_title = tag_title.lower()
+
+ for key, val in _content_types.items():
+ if val.search(l_tag_title) is not None:
+ logger.debug("Found %s: %s", key, l_tag_title)
+ self._content_type = key
+ setattr(self, key, True)
+ return None
+
+ logger.debug("Generic disposition title found: %s", l_tag_title)
+ self.generic = True
+ return None
+
+ @property
+ def suffix(self):
+ return self._content_type or ""
+
+ def __str__(self):
+ return self.suffix.upper() or "GENERIC"
+
+
+_content_types = {
+ "hearing_impaired": re.compile(r"sdh|hearing impaired|cc"),
+ "forced": re.compile(r"forced|non[- ]english"),
+ "comment": re.compile(r"comment"),
+ "visual_impaired": re.compile(r"signs|visual impair"),
+ "karaoke": re.compile(r"karaoke|songs"),
+}
diff --git a/libs/fese/exceptions.py b/libs/fese/exceptions.py
new file mode 100644
index 000000000..1496577f5
--- /dev/null
+++ b/libs/fese/exceptions.py
@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+
+class FeseError(Exception):
+ pass
+
+
+class ExtractionError(FeseError):
+ pass
+
+
+class InvalidFile(FeseError):
+ pass
+
+
+class InvalidStream(FeseError):
+ pass
+
+
+class InvalidSource(FeseError):
+ pass
+
+
+class ConversionError(FeseError):
+ pass
+
+
+class LanguageNotFound(FeseError):
+ pass
+
+
+class UnsupportedCodec(FeseError):
+ pass
diff --git a/libs/fese/stream.py b/libs/fese/stream.py
new file mode 100755
index 000000000..3c73b1d3c
--- /dev/null
+++ b/libs/fese/stream.py
@@ -0,0 +1,162 @@
+# -*- coding: utf-8 -*-
+
+from __future__ import annotations
+
+from datetime import timedelta
+import logging
+
+from .disposition import FFprobeSubtitleDisposition
+from .exceptions import UnsupportedCodec
+from .tags import FFprobeGenericSubtitleTags
+
+logger = logging.getLogger(__name__)
+
+
+class FFprobeSubtitleStream:
+ """Base class for FFprobe (FFmpeg) extractable subtitle streams."""
+
+ def __init__(self, stream: dict):
+ """
+ :raises: LanguageNotFound, UnsupportedCodec
+ """
+ self.index = int(stream["index"])
+ self.codec_name = stream["codec_name"]
+
+ try:
+ self._codec = _codecs[self.codec_name]
+ except KeyError:
+ raise UnsupportedCodec(f"{self.codec_name} is not supported")
+
+ self.r_frame_rate = stream.get("r_frame_rate")
+ self.avg_frame_rate = stream.get("avg_frame_rate")
+ self.start_time = timedelta(seconds=float(stream.get("start_time", 0)))
+ self.start_pts = timedelta(milliseconds=int(stream.get("start_pts", 0)))
+ self.duration_ts = timedelta(milliseconds=int(stream.get("duration_ts", 0)))
+ self.duration = timedelta(seconds=float(stream.get("duration", 0)))
+
+ self.tags = FFprobeGenericSubtitleTags.detect_cls_from_data(
+ stream.get("tags", {})
+ )
+ self.disposition = FFprobeSubtitleDisposition(stream.get("disposition", {}))
+
+ if stream.get("tags") is not None:
+ self.disposition.update_from_tags(stream["tags"])
+
+ def convert_args(self, convert_format, outfile):
+ """
+ convert_format: Union[str, None] = the codec format to convert. if None is set, defaults
+ to 'convert_default_format' codec's key
+ outfile: str = output file
+
+ raises UnsupportedCodec if convert_format doesn't exist or if the codec doesn't
+ support conversion
+ """
+ convert_format = convert_format or self._codec["convert_default_format"]
+
+ if convert_format is None or not any(
+ convert_format == item["copy_format"] for item in _codecs.values()
+ ):
+ raise UnsupportedCodec(f"Unknown convert format: {convert_format}")
+
+ if not self._codec["convert"]:
+ raise UnsupportedCodec(
+ f"{self.codec_name} codec doesn't support conversion"
+ )
+
+ return ["-map", f"0:{self.index}", "-f", convert_format, outfile]
+
+ def copy_args(self, outfile):
+ "raises UnsupportedCodec if the codec doesn't support copy"
+ if not self._codec["copy"] or not self._codec["copy_format"]:
+ raise UnsupportedCodec(f"{self.codec_name} doesn't support copy")
+
+ return [
+ "-map",
+ f"0:{self.index}",
+ "-c:s",
+ "copy",
+ "-f",
+ self._codec["copy_format"],
+ outfile,
+ ]
+
+ @property
+ def language(self):
+ # Legacy
+ return self.tags.language
+
+ @property
+ def extension(self):
+ return self._codec["copy_format"] or self._codec["convert_default_format"] or ""
+
+ @property
+ def convert_default_format(self):
+ return self._codec["convert_default_format"]
+
+ @property
+ def type(self):
+ return self._codec["type"]
+
+ @property
+ def suffix(self):
+ return ".".join(
+ item
+ for item in (self.tags.suffix, self.disposition.suffix, self.extension)
+ if item
+ )
+
+ def __repr__(self) -> str:
+ return f"<{self.codec_name.upper()}: {self.tags}@{self.disposition}>"
+
+
+_codecs = {
+ "ass": {
+ "type": "text",
+ "copy": True,
+ "copy_format": "ass",
+ "convert": True,
+ "convert_default_format": "srt",
+ },
+ "subrip": {
+ "type": "text",
+ "copy": True,
+ "copy_format": "srt",
+ "convert": True,
+ "convert_default_format": "srt",
+ },
+ "webvtt": {
+ "type": "text",
+ "copy": True,
+ "copy_format": "webvtt",
+ "convert": True,
+ "convert_default_format": "srt",
+ },
+ "mov_text": {
+ "type": "text",
+ "copy": False,
+ "copy_format": None,
+ "convert": True,
+ "convert_default_format": "srt",
+ },
+ "hdmv_pgs_subtitle": {
+ "type": "bitmap",
+ "copy": True,
+ "copy_format": "sup",
+ "convert": False,
+ "convert_default_format": None,
+ },
+ "dvb_subtitle": {
+ "type": "bitmap",
+ "copy": True,
+ "copy_format": "sup",
+ "convert": False,
+ "convert_default_format": None,
+ },
+ "dvd_subtitle": {
+ "type": "bitmap",
+ "copy": True,
+ "copy_format": "sup",
+ "convert": False,
+ "convert_default_format": None,
+ },
+}
diff --git a/libs/fese/tags.py b/libs/fese/tags.py
new file mode 100644
index 000000000..f31217ace
--- /dev/null
+++ b/libs/fese/tags.py
@@ -0,0 +1,175 @@
+from datetime import timedelta
+import logging
+
+from babelfish import Language
+from babelfish.exceptions import LanguageError
+
+from .exceptions import LanguageNotFound
+
+logger = logging.getLogger(__name__)
+
+
+class FFprobeGenericSubtitleTags:
+ _DETECTABLE_TAGS = None
+
+ def __init__(self, data: dict):
+ self.language = _get_language(data)
+ self._data = data
+
+ @classmethod
+ def detect_cls_from_data(cls, data):
+ for cls_ in (FFprobeMkvSubtitleTags, FFprobeMp4SubtitleTags):
+ if cls_.is_compatible(data):
+ logger.debug("Detected tags class: %s", cls_)
+ return cls_(data)
+
+ logger.debug("Unable to detect tags class. Using generic")
+ return FFprobeGenericSubtitleTags(data)
+
+ @property
+ def suffix(self):
+ lang = self.language.alpha2
+ if self.language.country is not None:
+ lang = f"{lang}-{self.language.country}"
+
+ return str(lang)
+
+ @property
+ def frames(self):
+ return 0
+
+ @classmethod
+ def is_compatible(cls, data):
+ return False
+
+ def __str__(self) -> str:
+ return f"{type(self).__name__}: {self.suffix}"
+
+
+class FFprobeMkvSubtitleTags(FFprobeGenericSubtitleTags):
+ _DETECTABLE_TAGS = (
+ "BPS",
+ "BPS-eng",
+ "DURATION",
+ "DURATION-eng",
+ "NUMBER_OF_FRAMES",
+ "NUMBER_OF_FRAMES-eng",
+ "NUMBER_OF_BYTES",
+ "NUMBER_OF_BYTES-eng",
+ )
+
+ def __init__(self, data: dict):
+ super().__init__(data)
+
+ self.title = data.get("title")
+ self.bps = _safe_int(data.get("BPS"))
+ self.bps_eng = _safe_int(data.get("BPS-eng"))
+ self.duration = _safe_td(data.get("DURATION"))
+ self.duration_eng = _safe_td(data.get("DURATION-eng"))
+ self.number_of_frames = _safe_int(data.get("NUMBER_OF_FRAMES"))
+ self.number_of_frames_eng = _safe_int(data.get("NUMBER_OF_FRAMES-eng"))
+ self.number_of_bytes = _safe_int(data.get("NUMBER_OF_BYTES"))
+ self.number_of_bytes_eng = _safe_int(data.get("NUMBER_OF_BYTES-eng"))
+
+ @property
+ def frames(self):
+ return self.number_of_frames or self.number_of_frames_eng or 0
+
+ @classmethod
+ def is_compatible(cls, data):
+ return any(
+ key
+ in (
+ "BPS",
+ "BPS-eng",
+ "DURATION",
+ "DURATION-eng",
+ "NUMBER_OF_FRAMES",
+ "NUMBER_OF_FRAMES-eng",
+ "NUMBER_OF_BYTES",
+ "NUMBER_OF_BYTES-eng",
+ )
+ for key in data.keys()
+ )
+
+
+class FFprobeMp4SubtitleTags(FFprobeGenericSubtitleTags):
+ _DETECTABLE_TAGS = ("creation_time", "handler_name")
+
+ def __init__(self, data: dict):
+ super().__init__(data)
+ self.creation_time = data.get("creation_time")
+ self.handler_name = data.get("handler_name")
+
+ @classmethod
+ def is_compatible(cls, data):
+ return any(key in ("creation_time", "handler_name") for key in data.keys())
+
+
+def _get_language(tags) -> Language:
+ og_lang = tags.get("language")
+ last_exc = None
+
+ if og_lang is not None:
+ if og_lang in _extra_languages:
+ extra = _extra_languages[og_lang]
+ title = tags.get("title", "n/a").lower()
+ if any(possible in title for possible in extra["matches"]):
+ logger.debug("Found extra language %s", extra["language_args"])
+ return Language(*extra["language_args"])
+
+ try:
+ lang = Language.fromalpha3b(og_lang)
+ # Test for suffix
+ assert lang.alpha2
+
+ return lang
+ except LanguageError as error:
+ last_exc = error
+ logger.debug("Error with '%s' language: %s", og_lang, error)
+
+ raise LanguageNotFound(f"Couldn't detect language from tags: {tags}") from last_exc
+
+
+def _safe_td(value, default=None):
+ if value is None:
+ return default
+
+ try:
+ h, m, s = [float(ts.replace(",", ".").strip()) for ts in value.split(":")]
+ return timedelta(hours=h, minutes=m, seconds=s)
+ except ValueError as error:
+ logger.warning("Couldn't get duration field: %s. Returning %s", error, default)
+ return default
+
+
+def _safe_int(value, default=None):
+ if value is None:
+ return default
+
+ try:
+ return int(value)
+ except ValueError:
+ logger.warning("Couldn't convert to int: %s. Returning %s", value, default)
+ return default
+
+
+_extra_languages = {
+ "spa": {
+ "matches": (
+ "es-la",
+ "spa-la",
+ "spl",
+ "mx",
+ "latin",
+ "mexic",
+ "argent",
+ "latam",
+ ),
+ "language_args": ("spa", "MX"),
+ },
+ "por": {
+ "matches": ("pt-br", "pob", "pb", "brazilian", "brasil", "brazil"),
+ "language_args": ("por", "BR"),
+ },
+}
diff --git a/libs/subliminal_patch/providers/embeddedsubtitles.py b/libs/subliminal_patch/providers/embeddedsubtitles.py
index 0cff01a8e..5482eebbc 100644
--- a/libs/subliminal_patch/providers/embeddedsubtitles.py
+++ b/libs/subliminal_patch/providers/embeddedsubtitles.py
@@ -7,16 +7,14 @@ import shutil
import tempfile
from babelfish import language_converters
-import fese
-from fese import check_integrity
+from fese import tags
+from fese import container
from fese import FFprobeSubtitleStream
from fese import FFprobeVideoContainer
-from fese import InvalidFile
-from fese import to_srt
+from fese.exceptions import InvalidSource
from subliminal.subtitle import fix_line_ending
from subliminal_patch.core import Episode
from subliminal_patch.core import Movie
-from subliminal_patch.exceptions import MustGetBlacklisted
from subliminal_patch.providers import Provider
from subliminal_patch.subtitle import Subtitle
from subzero.language import Language
@@ -24,7 +22,7 @@ from subzero.language import Language
logger = logging.getLogger(__name__)
# Replace Babelfish's Language with Subzero's Language
-fese.Language = Language
+tags.Language = Language
class EmbeddedSubtitle(Subtitle):
@@ -57,6 +55,9 @@ class EmbeddedSubtitle(Subtitle):
return f"{self.container.path}_{self.stream.index}"
+_ALLOWED_CODECS = ("ass", "subrip", "webvtt", "mov_text")
+
+
class EmbeddedSubtitlesProvider(Provider):
provider_name = "embeddedsubtitles"
@@ -72,33 +73,37 @@ class EmbeddedSubtitlesProvider(Provider):
def __init__(
self,
- include_ass=True,
- include_srt=True,
+ included_codecs=None,
cache_dir=None,
ffprobe_path=None,
ffmpeg_path=None,
hi_fallback=False,
- mergerfs_mode=False,
timeout=600,
+ include_ass=None,
+ include_srt=None,
+ mergerfs_mode=None,
):
- self._include_ass = include_ass
- self._include_srt = include_srt
+ self._included_codecs = set(included_codecs or _ALLOWED_CODECS)
+
+ for codec in self._included_codecs:
+ if codec not in _ALLOWED_CODECS:
+ logger.warning("Unallowed codec: %s", codec)
+
self._cache_dir = os.path.join(
cache_dir or tempfile.gettempdir(), self.__class__.__name__.lower()
)
self._hi_fallback = hi_fallback
self._cached_paths = {}
- self._mergerfs_mode = mergerfs_mode
- self._timeout = float(timeout)
+ self._timeout = int(timeout)
- fese.FFPROBE_PATH = ffprobe_path or fese.FFPROBE_PATH
- fese.FFMPEG_PATH = ffmpeg_path or fese.FFMPEG_PATH
+ container.FFPROBE_PATH = ffprobe_path or container.FFPROBE_PATH
+ container.FFMPEG_PATH = ffmpeg_path or container.FFMPEG_PATH
if logger.getEffectiveLevel() == logging.DEBUG:
- fese.FF_LOG_LEVEL = "warning"
+ container.FF_LOG_LEVEL = "warning"
else:
# Default is True
- fese.FFMPEG_STATS = False
+ container.FFMPEG_STATS = False
def initialize(self):
os.makedirs(self._cache_dir, exist_ok=True)
@@ -111,8 +116,8 @@ class EmbeddedSubtitlesProvider(Provider):
video = _get_memoized_video_container(path)
try:
- streams = filter(_check_allowed_extensions, video.get_subtitles())
- except fese.InvalidSource as error:
+ streams = filter(_check_allowed_codecs, video.get_subtitles())
+ except InvalidSource as error:
logger.error("Error trying to get subtitles for %s: %s", video, error)
self._blacklist.add(path)
streams = []
@@ -128,12 +133,12 @@ class EmbeddedSubtitlesProvider(Provider):
allowed_streams = []
for stream in streams:
- if not self._include_ass and stream.extension == "ass":
- logger.debug("Ignoring ASS: %s", stream)
- continue
-
- if not self._include_srt and stream.extension == "srt":
- logger.debug("Ignoring SRT: %s", stream)
+ if stream.codec_name not in self._included_codecs:
+ logger.debug(
+ "Ignoring %s (codec not included in %s)",
+ stream,
+ self._included_codecs,
+ )
continue
if stream.language not in languages:
@@ -188,28 +193,19 @@ class EmbeddedSubtitlesProvider(Provider):
if container.path not in self._cached_paths:
# Extract all subittle streams to avoid reading the entire
# container over and over
- streams = filter(_check_allowed_extensions, container.get_subtitles())
- extracted = container.extract_subtitles(
- list(streams), self._cache_dir, timeout=self._timeout
+ streams = filter(_check_allowed_codecs, container.get_subtitles())
+ extracted = container.copy_subtitles(
+ list(streams),
+ self._cache_dir,
+ timeout=self._timeout,
+ fallback_to_convert=True,
)
# Add the extracted paths to the containter path key
self._cached_paths[container.path] = extracted
cached_path = self._cached_paths[container.path]
# Get the subtitle file by index
- subtitle_path = cached_path[subtitle.stream.index]
-
- try:
- check_integrity(subtitle.stream, subtitle_path)
- except InvalidFile as error:
- raise MustGetBlacklisted(subtitle.id, subtitle.media_type) from error
-
- # Convert to SRT if the subtitle is ASS
- new_subtitle_path = to_srt(subtitle_path, remove_source=True)
- if new_subtitle_path != subtitle_path:
- cached_path[subtitle.stream.index] = new_subtitle_path
-
- return new_subtitle_path
+ return cached_path[subtitle.stream.index]
def _is_path_valid(self, path):
if path in self._blacklist:
@@ -220,10 +216,6 @@ class EmbeddedSubtitlesProvider(Provider):
logger.debug("Inexistent file: %s", path)
return False
- if self._mergerfs_mode and _is_fuse_rclone_mount(path):
- logger.debug("Potential cloud file: %s", path)
- return False
-
return True
@@ -239,8 +231,12 @@ def _get_memoized_video_container(path: str):
return _MemoizedFFprobeVideoContainer(path)
-def _check_allowed_extensions(subtitle: FFprobeSubtitleStream):
- return subtitle.extension in ("ass", "srt")
+def _check_allowed_codecs(subtitle: FFprobeSubtitleStream):
+ if subtitle.codec_name not in _ALLOWED_CODECS:
+ logger.debug("Unallowed codec: %s", subtitle)
+ return False
+
+ return True
def _check_hi_fallback(streams, languages):
@@ -270,10 +266,10 @@ def _check_hi_fallback(streams, languages):
def _discard_possible_incomplete_subtitles(streams):
- """Check number_of_frames attributes from subtitle streams in order to find
+ """Check frame properties from subtitle streams in order to find
supposedly incomplete subtitles"""
try:
- max_frames = max(stream.number_of_frames for stream in streams)
+ max_frames = max(stream.tags.frames for stream in streams)
except ValueError:
return []
@@ -288,11 +284,11 @@ def _discard_possible_incomplete_subtitles(streams):
for stream in streams:
# 500 < 1200
- if stream.number_of_frames < max_frames // 2:
+ if stream.tags.frames < max_frames // 2:
logger.debug(
"Possible bad subtitle found: %s (%s frames - %s frames)",
stream,
- stream.number_of_frames,
+ stream.tags.frames,
max_frames,
)
continue
@@ -302,20 +298,6 @@ def _discard_possible_incomplete_subtitles(streams):
return valid_streams
-def _is_fuse_rclone_mount(path: str):
- # Experimental!
-
- # This function only makes sense if you are combining a rclone mount with a local mount
- # with mergerfs or similar tools. Don't use it otherwise.
-
- # It tries to guess whether a file is a cloud mount by the length
- # of the inode number. See the following links for reference.
-
- # https://forum.rclone.org/t/fuse-inode-number-aufs/215/5
- # https://pkg.go.dev/bazil.org/fuse/fs?utm_source=godoc#GenerateDynamicInode
- return len(str(os.stat(path).st_ino)) > 18
-
-
def _get_pretty_release_name(stream, container):
bname = os.path.basename(container.path)
return f"{os.path.splitext(bname)[0]}.{stream.suffix}"
diff --git a/tests/bazarr/app/test_get_providers.py b/tests/bazarr/app/test_get_providers.py
index d4db1c942..20e1f1a62 100644
--- a/tests/bazarr/app/test_get_providers.py
+++ b/tests/bazarr/app/test_get_providers.py
@@ -24,3 +24,13 @@ def test_get_providers_auth_with_provider_registry():
raise ValueError(f"'{sub_key}' parameter not present in {provider}")
assert sign.parameters[sub_key] is not None
+
+
+def test_get_providers_auth_embeddedsubtitles():
+ item = get_providers.get_providers_auth()["embeddedsubtitles"]
+ assert isinstance(item["included_codecs"], list)
+ assert isinstance(item["hi_fallback"], bool)
+ assert isinstance(item["cache_dir"], str)
+ assert isinstance(item["ffprobe_path"], str)
+ assert isinstance(item["ffmpeg_path"], str)
+ assert isinstance(item["timeout"], str)
diff --git a/tests/subliminal_patch/test_embeddedsubtitles.py b/tests/subliminal_patch/test_embeddedsubtitles.py
index 998964e92..6771cad5d 100644
--- a/tests/subliminal_patch/test_embeddedsubtitles.py
+++ b/tests/subliminal_patch/test_embeddedsubtitles.py
@@ -1,23 +1,21 @@
# -*- coding: utf-8 -*-
import os
-import tempfile
-import fese
from fese import FFprobeSubtitleStream
+from fese import FFprobeVideoContainer
+from fese import tags
import pytest
-import subliminal_patch
from subliminal_patch.core import Episode
from subliminal_patch.core import Movie
-from subliminal_patch.exceptions import MustGetBlacklisted
-from subliminal_patch.providers.embeddedsubtitles import _MemoizedFFprobeVideoContainer
-from subliminal_patch.providers.embeddedsubtitles import EmbeddedSubtitlesProvider
from subliminal_patch.providers.embeddedsubtitles import (
_discard_possible_incomplete_subtitles,
)
+from subliminal_patch.providers.embeddedsubtitles import _get_pretty_release_name
+from subliminal_patch.providers.embeddedsubtitles import _MemoizedFFprobeVideoContainer
+from subliminal_patch.providers.embeddedsubtitles import EmbeddedSubtitlesProvider
from subzero.language import Language
-
-fese.Language = Language
+tags.Language = Language
@pytest.fixture
@@ -46,8 +44,7 @@ def video_multiple_languages(data):
@pytest.fixture
def config(tmpdir):
return {
- "include_ass": True,
- "include_srt": True,
+ "included_codecs": None,
"cache_dir": tmpdir,
"ffprobe_path": None,
"ffmpeg_path": None,
@@ -65,11 +62,25 @@ def video_inexistent(tmpdir):
)
+def test_language_is_subzero_type():
+ assert tags.Language == Language
+
+
def test_init(config):
with EmbeddedSubtitlesProvider(**config) as provider:
assert provider is not None
+def test_init_empty_included_codecs():
+ with EmbeddedSubtitlesProvider(included_codecs=[]) as provider:
+ assert provider._included_codecs == {"ass", "subrip", "webvtt", "mov_text"}
+
+
+def test_init_custom_included_codecs():
+ with EmbeddedSubtitlesProvider(included_codecs=["ass"]) as provider:
+ assert provider._included_codecs == {"ass"}
+
+
def test_inexistent_video(video_inexistent):
with EmbeddedSubtitlesProvider() as provider:
subtitles = provider.list_subtitles(video_inexistent, {})
@@ -124,7 +135,6 @@ def test_list_subtitles_hi_fallback_one_stream(
)
fake = _MemoizedFFprobeVideoContainer.get_subtitles("")[0]
assert fake.disposition.hearing_impaired == True
-
subs = provider.list_subtitles(video_single_language, {language})
assert subs
assert subs[0].hearing_impaired == False
@@ -154,13 +164,17 @@ def test_list_subtitles_hi_fallback_multiple_language_streams(
mocker.patch(
# "fese.FFprobeVideoContainer.get_subtitles",
"subliminal_patch.providers.embeddedsubtitles._MemoizedFFprobeVideoContainer.get_subtitles",
- return_value=[fake_streams["en_hi"], fake_streams["es"], fake_streams["es_hi"]],
+ return_value=[
+ fake_streams["en_hi"],
+ fake_streams["es"],
+ fake_streams["es_hi"],
+ ],
)
subs = provider.list_subtitles(video_single_language, languages)
assert len(subs) == 3
assert subs[0].hearing_impaired == False # English subittle
assert subs[1].hearing_impaired == False # Spanish subtitle
- assert subs[2].hearing_impaired == True # Spanish HI subtitle
+ assert subs[2].hearing_impaired == True # Spanish HI subtitle
def test_list_subtitles_hi_fallback_multiple_hi_streams(
@@ -218,7 +232,7 @@ def test_list_subtitles_multiple_languages(video_multiple_languages):
def test_list_subtitles_wo_ass(video_single_language):
- with EmbeddedSubtitlesProvider(include_ass=False) as provider:
+ with EmbeddedSubtitlesProvider(included_codecs=("srt",)) as provider:
subs = provider.list_subtitles(
video_single_language, {Language.fromalpha2("en")}
)
@@ -226,13 +240,25 @@ def test_list_subtitles_wo_ass(video_single_language):
def test_list_subtitles_wo_srt(video_multiple_languages):
- with EmbeddedSubtitlesProvider(include_srt=False) as provider:
+ with EmbeddedSubtitlesProvider(included_codecs=("ass",)) as provider:
subs = provider.list_subtitles(
video_multiple_languages, {Language.fromalpha2("en")}
)
assert not subs
+def test_get_pretty_release_name():
+ stream = FFprobeSubtitleStream(
+ {
+ "index": 1,
+ "codec_name": "subrip",
+ "tags": {"language": "eng", "title": "forced"},
+ }
+ )
+ container = FFprobeVideoContainer("foo.mkv")
+ assert _get_pretty_release_name(stream, container) == "foo.en.forced.srt"
+
+
def test_download_subtitle_multiple(video_multiple_languages):
with EmbeddedSubtitlesProvider() as provider:
languages = {Language.fromalpha2(code) for code in ("en", "it", "fr")} | {
@@ -242,7 +268,7 @@ def test_download_subtitle_multiple(video_multiple_languages):
subs = provider.list_subtitles(video_multiple_languages, languages)
for sub in subs:
provider.download_subtitle(sub)
- assert sub.content is not None
+ assert sub.is_valid()
def test_download_subtitle_single(video_single_language):
@@ -251,23 +277,7 @@ def test_download_subtitle_single(video_single_language):
video_single_language, {Language.fromalpha2("en")}
)[0]
provider.download_subtitle(subtitle)
- assert subtitle.content is not None
-
-
-def test_download_invalid_subtitle(video_single_language):
- with EmbeddedSubtitlesProvider() as provider:
- subtitle = provider.list_subtitles(
- video_single_language, {Language.fromalpha2("en")}
- )[0]
-
- provider._cached_paths[subtitle.container.path] = {
- subtitle.stream.index: "dummy.srt"
- }
- try:
- provider.download_subtitle(subtitle)
- except MustGetBlacklisted as error:
- assert error.id == subtitle.id
- assert error.media_type == subtitle.media_type
+ assert subtitle.is_valid()
def test_memoized(video_single_language, mocker):