1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
|
# -*- coding: utf-8 -*-
import functools
import logging
import os
import shutil
import tempfile
from babelfish import language_converters
import fese
from fese import check_integrity
from fese import FFprobeSubtitleStream
from fese import FFprobeVideoContainer
from fese import InvalidFile
from fese import to_srt
from subliminal.subtitle import fix_line_ending
from subliminal_patch.core import Episode
from subliminal_patch.core import Movie
from subliminal_patch.exceptions import MustGetBlacklisted
from subliminal_patch.providers import Provider
from subliminal_patch.subtitle import Subtitle
from subzero.language import Language
logger = logging.getLogger(__name__)
# Replace Babelfish's Language with Subzero's Language
fese.Language = Language
class EmbeddedSubtitle(Subtitle):
provider_name = "embeddedsubtitles"
hash_verifiable = False
def __init__(self, stream, container, matches, media_type):
super().__init__(stream.language, stream.disposition.hearing_impaired)
if stream.disposition.forced:
self.language = Language.rebuild(stream.language, forced=True)
self.stream: FFprobeSubtitleStream = stream
self.container: FFprobeVideoContainer = container
self.forced = stream.disposition.forced
self.page_link = self.container.path
self.release_info = _get_pretty_release_name(stream, container)
self.media_type = media_type
self._matches: set = matches
def get_matches(self, video):
if self.hearing_impaired:
self._matches.add("hearing_impaired")
self._matches.add("hash")
return self._matches
@property
def id(self):
return f"{self.container.path}_{self.stream.index}"
class EmbeddedSubtitlesProvider(Provider):
provider_name = "embeddedsubtitles"
languages = {Language("por", "BR"), Language("spa", "MX")} | {
Language.fromalpha2(l) for l in language_converters["alpha2"].codes
}
languages.update(set(Language.rebuild(lang, hi=True) for lang in languages))
languages.update(set(Language.rebuild(lang, forced=True) for lang in languages))
video_types = (Episode, Movie)
subtitle_class = EmbeddedSubtitle
_blacklist = set()
def __init__(
self,
include_ass=True,
include_srt=True,
cache_dir=None,
ffprobe_path=None,
ffmpeg_path=None,
hi_fallback=False,
mergerfs_mode=False,
timeout=600,
):
self._include_ass = include_ass
self._include_srt = include_srt
self._cache_dir = os.path.join(
cache_dir or tempfile.gettempdir(), self.__class__.__name__.lower()
)
self._hi_fallback = hi_fallback
self._cached_paths = {}
self._mergerfs_mode = mergerfs_mode
self._timeout = float(timeout)
fese.FFPROBE_PATH = ffprobe_path or fese.FFPROBE_PATH
fese.FFMPEG_PATH = ffmpeg_path or fese.FFMPEG_PATH
if logger.getEffectiveLevel() == logging.DEBUG:
fese.FF_LOG_LEVEL = "warning"
else:
# Default is True
fese.FFMPEG_STATS = False
def initialize(self):
os.makedirs(self._cache_dir, exist_ok=True)
def terminate(self):
# Remove leftovers
shutil.rmtree(self._cache_dir, ignore_errors=True)
def query(self, path: str, languages, media_type):
video = _get_memoized_video_container(path)
try:
streams = filter(_check_allowed_extensions, video.get_subtitles())
except fese.InvalidSource as error:
logger.error("Error trying to get subtitles for %s: %s", video, error)
self._blacklist.add(path)
streams = []
if not streams:
logger.debug("No subtitles found for container: %s", video)
only_forced = all(lang.forced for lang in languages)
also_forced = any(lang.forced for lang in languages)
allowed_streams = []
for stream in streams:
if not self._include_ass and stream.extension == "ass":
logger.debug("Ignoring ASS: %s", stream)
continue
if not self._include_srt and stream.extension == "srt":
logger.debug("Ignoring SRT: %s", stream)
continue
if stream.language not in languages:
continue
disposition = stream.disposition
if only_forced and not disposition.forced:
continue
if (
disposition.generic
or disposition.hearing_impaired
or (disposition.forced and also_forced)
):
logger.debug("Appending subtitle: %s", stream)
allowed_streams.append(stream)
else:
logger.debug("Ignoring unwanted subtitle: %s", stream)
if self._hi_fallback:
_check_hi_fallback(allowed_streams, languages)
logger.debug("Cache info: %s", _get_memoized_video_container.cache_info())
return [
EmbeddedSubtitle(stream, video, {"hash"}, media_type)
for stream in allowed_streams
]
def list_subtitles(self, video, languages):
if not self._is_path_valid(video.original_path):
logger.debug("Ignoring video: %s", video)
return []
return self.query(
video.original_path,
languages,
"series" if isinstance(video, Episode) else "movie",
)
def download_subtitle(self, subtitle):
path = self._get_subtitle_path(subtitle)
with open(path, "rb") as sub:
content = sub.read()
subtitle.content = fix_line_ending(content)
def _get_subtitle_path(self, subtitle: EmbeddedSubtitle):
container = subtitle.container
# Check if the container is not already in the instance
if container.path not in self._cached_paths:
# Extract all subittle streams to avoid reading the entire
# container over and over
streams = filter(_check_allowed_extensions, container.get_subtitles())
extracted = container.extract_subtitles(
list(streams), self._cache_dir, timeout=self._timeout
)
# Add the extracted paths to the containter path key
self._cached_paths[container.path] = extracted
cached_path = self._cached_paths[container.path]
# Get the subtitle file by index
subtitle_path = cached_path[subtitle.stream.index]
try:
check_integrity(subtitle.stream, subtitle_path)
except InvalidFile as error:
raise MustGetBlacklisted(subtitle.id, subtitle.media_type) from error
# Convert to SRT if the subtitle is ASS
new_subtitle_path = to_srt(subtitle_path, remove_source=True)
if new_subtitle_path != subtitle_path:
cached_path[subtitle.stream.index] = new_subtitle_path
return new_subtitle_path
def _is_path_valid(self, path):
if path in self._blacklist:
logger.debug("Blacklisted path: %s", path)
return False
if not os.path.isfile(path):
logger.debug("Inexistent file: %s", path)
return False
if self._mergerfs_mode and _is_fuse_rclone_mount(path):
logger.debug("Potential cloud file: %s", path)
return False
return True
class _MemoizedFFprobeVideoContainer(FFprobeVideoContainer):
# 128 is the default value for maxsize since Python 3.8. We ste it here for previous versions.
@functools.lru_cache(maxsize=128)
def get_subtitles(self, *args, **kwargs):
return super().get_subtitles(*args, **kwargs)
@functools.lru_cache(maxsize=8096)
def _get_memoized_video_container(path: str):
return _MemoizedFFprobeVideoContainer(path)
def _check_allowed_extensions(subtitle: FFprobeSubtitleStream):
return subtitle.extension in ("ass", "srt")
def _check_hi_fallback(streams, languages):
for language in languages:
compatible_streams = [
stream for stream in streams if stream.language == language
]
if len(compatible_streams) == 1:
stream = compatible_streams[0]
logger.debug("HI fallback: updating %s HI to False", stream)
stream.disposition.hearing_impaired = False
elif all(stream.disposition.hearing_impaired for stream in streams):
for stream in streams:
logger.debug("HI fallback: updating %s HI to False", stream)
stream.disposition.hearing_impaired = False
else:
logger.debug("HI fallback not needed: %s", compatible_streams)
def _is_fuse_rclone_mount(path: str):
# Experimental!
# This function only makes sense if you are combining a rclone mount with a local mount
# with mergerfs or similar tools. Don't use it otherwise.
# It tries to guess whether a file is a cloud mount by the length
# of the inode number. See the following links for reference.
# https://forum.rclone.org/t/fuse-inode-number-aufs/215/5
# https://pkg.go.dev/bazil.org/fuse/fs?utm_source=godoc#GenerateDynamicInode
return len(str(os.stat(path).st_ino)) > 18
def _get_pretty_release_name(stream, container):
bname = os.path.basename(container.path)
return f"{os.path.splitext(bname)[0]}.{stream.suffix}"
|