1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
|
# -*- coding: utf-8 -*-
import enum
import io
import logging
import re
import zipfile
from random import randint
from urllib.parse import urljoin, urlparse, parse_qs, quote
import rarfile
from guessit import guessit
from requests import Session
from requests.adapters import HTTPAdapter
from requests.exceptions import HTTPError
from subliminal.cache import region as cache
from subliminal.exceptions import AuthenticationError, ConfigurationError, DownloadLimitExceeded, ProviderError
from subliminal.providers import ParserBeautifulSoup
from subliminal.subtitle import fix_line_ending
from subliminal.video import Episode, Movie
from subliminal_patch.providers import Provider
from subliminal_patch.providers.mixins import ProviderSubtitleArchiveMixin
from subliminal_patch.subtitle import Subtitle, guess_matches
from dogpile.cache.api import NO_VALUE
from subzero.language import Language
from .utils import FIRST_THOUSAND_OR_SO_USER_AGENTS as AGENT_LIST
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@enum.unique
class SubtitlesType(enum.Enum):
    """Kind of media a subtitle search is being run for."""

    # Explicit values match what ``enum.auto()`` assigns (1, 2, ...).
    EPISODE = 1
    MOVIE = 2
class TitulkySubtitle(Subtitle):
    """A single subtitle entry listed on Titulky.com.

    Besides the language and page link handled by the base ``Subtitle``
    class, an instance stores the site's subtitle ID, the IMDB ID it was
    searched under, the season/episode it was requested for, its release
    string, uploader, approval flag and download link.
    """

    provider_name = 'titulky'

    # Neither file hashes nor the hearing-impaired flag can be verified.
    hash_verifiable = False
    hearing_impaired_verifiable = False

    def __init__(self,
                 sub_id,
                 imdb_id,
                 language,
                 season,
                 episode,
                 release_info,
                 uploader,
                 approved,
                 page_link,
                 download_link,
                 asked_for_episode=None):
        """Store the scraped metadata; ``matches`` stays ``None`` until
        :meth:`get_matches` is called."""
        super().__init__(language, page_link=page_link)

        self.sub_id = sub_id
        self.imdb_id = imdb_id
        self.season = season
        self.episode = episode
        # The release string is kept both as a one-element list and as-is.
        self.releases = [release_info]
        self.release_info = release_info
        self.language = language
        self.approved = approved
        self.page_link = page_link
        self.uploader = uploader
        self.download_link = download_link
        self.asked_for_episode = asked_for_episode
        self.matches = None

    @property
    def id(self):
        """The site's subtitle ID."""
        return self.sub_id

    def get_matches(self, video):
        """Compute, store on ``self.matches`` and return the set of matches.

        Anything that is not a ``Movie`` is treated as an episode. IMDB ID
        and season/episode equality contribute fixed match names; the rest
        comes from ``guess_matches`` applied to the guessit parse of the
        release string.
        """
        is_movie = isinstance(video, Movie)
        media_type = 'movie' if is_movie else 'episode'
        found = set()

        if is_movie:
            # Same IMDB ID as the movie.
            if video.imdb_id and video.imdb_id == self.imdb_id:
                found.update(('imdb_id', 'title', 'year'))
        else:
            # Same IMDB ID as the series.
            if video.series_imdb_id and video.series_imdb_id == self.imdb_id:
                found.update(('series_imdb_id', 'series', 'year'))

            # Season / episode numbers.
            if self.season and self.season == video.season:
                found.add('season')
            if self.episode and self.episode == video.episode:
                found.add('episode')

        guessed = guessit(self.release_info, {"type": media_type})
        found |= guess_matches(video, guessed)

        self.matches = found
        return found
class TitulkyProvider(Provider, ProviderSubtitleArchiveMixin):
    """Subtitle provider for premium.titulky.com (Czech and Slovak).

    Logs in with a username and password, keeps the session cookies and the
    chosen User-Agent in the shared cache region, and finds subtitles by
    browsing the site's per-season listing for an IMDB ID.
    """

    languages = {Language(code) for code in ['ces', 'slk']}
    video_types = (Episode, Movie)
    hash_verifiable = False
    hearing_impaired_verifiable = False

    server_url = 'https://premium.titulky.com'
    login_url = server_url
    logout_url = f"{server_url}?action=logout"
    download_url = f"{server_url}/download.php?id="

    timeout = 30
    max_threads = 5

    subtitle_class = TitulkySubtitle

    def __init__(self,
                 username=None,
                 password=None,
                 approved_only=None):
        """Validate and store the provider configuration.

        Raises:
            ConfigurationError: if ``username`` or ``password`` is falsy, or
                if ``approved_only`` is not a ``bool`` (which includes the
                default ``None``).
        """
        if not all([username, password]):
            raise ConfigurationError("Username and password must be specified!")

        if not isinstance(approved_only, bool):
            raise ConfigurationError(f"Approved_only {approved_only} must be a boolean!")

        self.username = username
        self.password = password
        self.approved_only = approved_only

        self.session = None

    def initialize(self):
        """Create the HTTP session, set browser-like headers and log in.

        The User-Agent is picked at random from ``AGENT_LIST`` the first
        time and then reused from the cache.
        """
        self.session = Session()

        user_agent = cache.get('titulky_user_agent')
        if user_agent == NO_VALUE:
            user_agent = AGENT_LIST[randint(0, len(AGENT_LIST) - 1)]
            cache.set('titulky_user_agent', user_agent)

        self.session.headers.update({
            'User-Agent': user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'sk,cz,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Cache-Control': 'max-age=0',
        })

        self.login()

    def terminate(self):
        """Close the HTTP session."""
        self.session.close()

    @staticmethod
    def _location_params(res):
        """Return the parsed query string of the ``Location`` header of *res*.

        A response without a ``Location`` header yields an empty dict
        instead of raising ``KeyError``.
        """
        return parse_qs(urlparse(res.headers.get('Location', '')).query)

    @staticmethod
    def _first_value(params, key):
        """Return the first value of *key* in a ``parse_qs`` dict, or ``''``."""
        values = params.get(key)
        return values[0] if values else ''

    def login(self, bypass_cache=False):
        """Log in, or reuse cached cookies unless *bypass_cache* is true.

        Returns:
            ``True`` on success.

        Raises:
            AuthenticationError: if the login response is not a 302 redirect
                whose ``msg_type`` parameter is ``'i'``, or if the redirect
                message contains ``'omezené'`` (reported as a missing V.I.P.
                account).
        """
        # Reuse all cookies if found in cache and skip login.
        cached_cookiejar = cache.get('titulky_cookiejar')
        if not bypass_cache and cached_cookiejar != NO_VALUE:
            logger.info("Titulky.com: Reusing cached cookies.")
            self.session.cookies.update(cached_cookiejar)
            return True

        logger.debug("Titulky.com: Logging in...")

        data = {'LoginName': self.username, 'LoginPassword': self.password}
        res = self.session.post(self.server_url,
                                data,
                                allow_redirects=False,
                                timeout=self.timeout,
                                headers={'Referer': self.server_url})

        location_qs = self._location_params(res)

        # Only a redirect that does not point to an error message page means
        # we are logged in. A missing Location header or msg_type parameter
        # is a failed login, not a KeyError.
        if res.status_code != 302 or self._first_value(location_qs, 'msg_type') != 'i':
            raise AuthenticationError("Login failed")

        if 'omezené' in self._first_value(location_qs, 'msg').lower():
            raise AuthenticationError("V.I.P. account is required for this provider to work!")

        logger.info("Titulky.com: Successfully logged in, caching cookies for future connections...")
        cache.set('titulky_cookiejar', self.session.cookies.copy())
        return True

    def logout(self):
        """Log out and drop the cached cookies and User-Agent.

        The cache is cleared whether or not the server confirms the logout.

        Returns:
            ``True`` if the response is a redirect with ``msg_type == 'i'``.

        Raises:
            AuthenticationError: otherwise.
        """
        logger.info("Titulky.com: Logging out")

        res = self.session.get(self.logout_url,
                               allow_redirects=False,
                               timeout=self.timeout,
                               headers={'Referer': self.server_url})

        location_qs = self._location_params(res)

        logger.info("Titulky.com: Clearing cache...")
        cache.delete('titulky_cookiejar')
        cache.delete('titulky_user_agent')

        # A redirect that does not point to an error message page means we
        # are logged out.
        if res.is_redirect and self._first_value(location_qs, 'msg_type') == 'i':
            return True

        raise AuthenticationError("Logout failed.")

    def get_request(self, url, ref=server_url, allow_redirects=False, _recursion=0):
        """GET *url* through the session, handling redirects manually.

        Redirects are never followed by ``requests`` itself. When
        *allow_redirects* is ``True`` they are followed here unless the
        target carries ``msg_type=e`` (an error page). If such an error
        message contains ``'přihlašte'`` the login cookies are considered
        expired: the provider logs in again, bypassing the cache, and
        repeats the original request.

        Args:
            url: address to fetch.
            ref: value for the ``Referer`` header; a falsy value omits it.
            allow_redirects: follow non-error redirects when ``True``.
            _recursion: internal depth counter for followed redirects/retries.

        Returns:
            The final ``requests`` response.

        Raises:
            AuthenticationError: after 10 nested redirects/retries.
        """
        # Give up rather than recurse forever or spam the server.
        if _recursion >= 10:
            raise AuthenticationError("Got into a redirect loop! Oops.")

        logger.debug("Titulky.com: Fetching url: %s", url)

        # NOTE(review): quote() with its default safe='/' also percent-encodes
        # ':', '?', '=' and '&' in the referer; kept as-is because the
        # server's expectations are not visible from here.
        res = self.session.get(
            url,
            timeout=self.timeout,
            allow_redirects=False,
            headers={'Referer': quote(ref) if ref else None})

        # Not a redirect, or the caller wants the raw redirect response.
        if not res.is_redirect or allow_redirects is False:
            return res

        location_qs = self._location_params(res)

        # Follow the Location header unless it points to an error page
        # (msg_type == 'e'); an absent msg_type counts as "no error".
        if allow_redirects is True and self._first_value(location_qs, 'msg_type') != 'e':
            # Responses normally carry no Origin header, so it must be read
            # with .get() for the server_url fallback to be reachable.
            base_url = res.headers.get('Origin') or self.server_url
            return self.get_request(urljoin(base_url, res.headers['Location']),
                                    ref=url,
                                    allow_redirects=True,
                                    _recursion=_recursion + 1)

        # Check if we got redirected because login cookies expired.
        if "přihlašte" in self._first_value(location_qs, 'msg').lower():
            logger.info("Titulky.com: Login cookies expired.")
            self.login(True)
            return self.get_request(url,
                                    ref=ref,
                                    allow_redirects=True,
                                    _recursion=_recursion + 1)

        return res

    def fetch_page(self, url, ref=server_url, allow_redirects=False):
        """Fetch *url* with :meth:`get_request` and return the response text.

        Raises:
            HTTPError: if the final status code is not 200.
            ProviderError: if the response body is empty.
        """
        res = self.get_request(url, ref=ref, allow_redirects=allow_redirects)

        if res.status_code != 200:
            raise HTTPError(f"Fetch failed with status code {res.status_code}")
        if not res.text:
            raise ProviderError("No response returned from the provider")

        return res.text

    def build_url(self, params):
        """Build ``<server_url>/?k1=v1&k2=v2...`` from the *params* mapping.

        Values are inserted as-is; only spaces are replaced with ``+``.
        An empty mapping yields ``<server_url>/``.
        """
        query_string = '&'.join(f'{key}={value}' for key, value in params.items())
        if query_string:
            result = f"{self.server_url}/?{query_string}"
        else:
            result = f"{self.server_url}/"

        return result.replace(' ', '+')

    def query(self, languages,
              media_type,
              imdb_id,
              season=0,
              episode=0):
        """Return the subtitles listed for one episode (or movie) of an IMDB ID.

        There are multiple ways to find subtitles on Titulky.com; this one
        uses the page that lists all available subtitles for all episodes of
        a season. The server treats movies as a TV series with season "0"
        and episode "0".

        BROWSE subtitles by IMDB ID:
          - subtitles are categorised by seasons and episodes
          - URL: https://premium.titulky.com/?action=serial&step=<SEASON>&id=<IMDB ID>
          - the URL redirects to a page with the site's own internal ID, so
            redirects are allowed for this request

        Args:
            languages: languages to keep; rows in other languages are skipped.
            media_type: a ``SubtitlesType`` member.
            imdb_id: IMDB ID; a leading ``tt`` is removed for the request.
            season: season number (0 for movies).
            episode: episode number (0 for movies).

        Returns:
            A list of ``subtitle_class`` instances, possibly empty.

        Raises:
            ProviderError: if the listing cannot be parsed.
        """
        params = {
            'action': 'serial',
            # If browsing subtitles for a movie, the step parameter is 0.
            'step': season,
            # Remove the "tt" prefix, but only when it is actually there.
            'id': imdb_id[2:] if imdb_id.startswith('tt') else imdb_id,
        }
        browse_url = self.build_url(params)
        html_src = self.fetch_page(browse_url, allow_redirects=True)

        browse_page_soup = ParserBeautifulSoup(html_src, ['lxml', 'html.parser'])
        try:
            # Element holding the subtitle rows; None if the series was not
            # found or similar.
            container = browse_page_soup.find('form', class_='cloudForm')
            if not container:
                logger.info("Titulky.com: Could not find container element. No subtitles found.")
                return []

            # All rows: subtitle rows, episode number rows, useless rows...
            all_rows = container.find_all('div', class_='row')
            episodes_dict = self._parse_browse_rows(all_rows, languages)
        finally:
            # Destroy the parse tree on every exit path, including the early
            # return and parse errors.
            browse_page_soup.decompose()

        sub_infos = episodes_dict.get(episode)
        if not sub_infos:
            # Well, we got nothing, that happens!
            logger.info("Titulky.com: No subtitles found")
            return []

        is_episode = media_type is SubtitlesType.EPISODE
        return [
            self.subtitle_class(
                sub_info['id'],
                imdb_id,
                sub_info['language'],
                season if is_episode else None,
                episode if is_episode else None,
                sub_info['release_info'],
                sub_info['uploader'],
                sub_info['approved'],
                sub_info['details_link'],
                sub_info['download_link'],
                asked_for_episode=is_episode,
            )
            for sub_info in sub_infos
        ]

    def _parse_browse_rows(self, rows, languages):
        """Group the subtitle rows of a season listing by episode number.

        Rows with an ``<h5>`` give the episode number for the subtitle rows
        that follow; rows with class ``pbl1``/``pbl0`` and a link are
        subtitle rows; anything else is ignored.

        Returns:
            dict mapping episode number to a list of subtitle info dicts.

        Raises:
            ProviderError: if an episode number is not an integer, or a
                subtitle row appears before any episode number row.
        """
        episodes_dict = {}
        last_ep_num = None

        for row in rows:
            # This element holds the episode number of the following row(s)
            # of subtitles, e.g.: 1., 2., 3., 4.
            number_container = row.find('h5')
            if number_container is not None:
                # Remove the period at the end; an empty string means 0.
                number_str = number_container.text.strip().rstrip('.')
                try:
                    last_ep_num = int(number_str) if number_str else 0
                except ValueError as err:
                    raise ProviderError("Could not parse episode number!") from err
                continue

            row_classes = row.get('class') or []
            if 'pbl1' not in row_classes and 'pbl0' not in row_classes:
                continue

            # Link to the subtitle details page.
            anchor = row.find('a')
            if anchor is None:
                continue

            if last_ep_num is None:
                raise ProviderError("Previous episode number missing, can't parse.")

            sub_info = self._parse_subtitle_row(row, anchor, languages)
            if sub_info is not None:
                episodes_dict.setdefault(last_ep_num, []).append(sub_info)

        return episodes_dict

    def _parse_subtitle_row(self, row, anchor, languages):
        """Turn one subtitle row into an info dict, or ``None`` to skip it.

        A row is skipped when its ID cannot be parsed, its language cannot
        be determined or is not in *languages*, or it is unapproved while
        ``approved_only`` is set.
        """
        release_info = anchor.get_text(strip=True)
        if release_info == '???':
            release_info = ''

        # The first character of the href is dropped before it is appended
        # to the server URL.
        details_link = f"{self.server_url}{anchor.get('href')[1:]}"

        id_match = re.search(r'id=(\d+)', details_link)
        if id_match is None:
            # Without an ID the download link would be unusable.
            logger.debug("Titulky.com: Could not parse subtitle ID, skipping...")
            return None
        sub_id = id_match.group(1)
        download_link = f"{self.download_url}{sub_id}"

        # Approved subtitles have a pbl1 class for their row, others pbl0.
        approved = 'pbl1' in row.get('class')

        # NOTE(review): relies on the uploader being the sixth child node of
        # the row; confirm against the live markup.
        uploader = row.contents[5].get_text(strip=True)

        # Determine the language from the flag image; exactly one of the two
        # flags must be present.
        czech_flag = row.select('img[src*=\'flag-CZ\']')
        slovak_flag = row.select('img[src*=\'flag-SK\']')
        if czech_flag and not slovak_flag:
            sub_language = Language('ces')
        elif slovak_flag and not czech_flag:
            sub_language = Language('slk')
        else:
            logger.debug("Titulky.com: Unknown language while parsing subtitles!")
            return None

        # The subtitle language was not requested.
        if sub_language not in languages:
            logger.debug("Titulky.com: Language not in desired languages, skipping...")
            return None

        # Skip unapproved subtitles if turned on in settings.
        if self.approved_only and not approved:
            logger.debug("Titulky.com: Approved only, skipping...")
            return None

        return {
            'id': sub_id,
            'release_info': release_info,
            'approved': approved,
            'language': sub_language,
            'uploader': uploader,
            'details_link': details_link,
            'download_link': download_link,
        }

    def list_subtitles(self, video, languages):
        """Return the subtitles found for *video* in *languages*.

        Videos without an IMDB ID (series IMDB ID for episodes) are skipped
        and yield an empty list.
        """
        subtitles = []

        if isinstance(video, Episode):
            if video.series_imdb_id:
                logger.info("Titulky.com: Searching subtitles for a TV series episode")
                subtitles = self.query(languages, SubtitlesType.EPISODE,
                                       imdb_id=video.series_imdb_id,
                                       season=video.season,
                                       episode=video.episode)
            else:
                logger.info("Titulky.com: Skipping %s! No IMDB ID found.", video)
        elif isinstance(video, Movie):
            if video.imdb_id:
                logger.info("Titulky.com: Searching subtitles for a movie")
                subtitles = self.query(languages, SubtitlesType.MOVIE, imdb_id=video.imdb_id)
            else:
                logger.info("Titulky.com: Skipping %s! No IMDB ID found.", video)

        return subtitles

    def download_subtitle(self, subtitle):
        """Download *subtitle* and store its content on ``subtitle.content``.

        The response is unpacked if it is a rar or zip archive; otherwise
        its body is used directly after line-ending normalisation.

        Raises:
            HTTPError: if the download response has an error status.
            DownloadLimitExceeded: if a single-entry archive yields no
                subtitle.
            ProviderError: if a non-archive response yields no content.
        """
        res = self.get_request(subtitle.download_link, ref=subtitle.page_link)

        try:
            res.raise_for_status()
        except HTTPError as err:
            raise HTTPError(
                f"An error occured during the download request to {subtitle.download_link}") from err

        archive_stream = io.BytesIO(res.content)
        archive = None
        if rarfile.is_rarfile(archive_stream):
            logger.debug("Titulky.com: Identified rar archive")
            archive = rarfile.RarFile(archive_stream)
            subtitle_content = self.get_subtitle_from_archive(subtitle, archive)
        elif zipfile.is_zipfile(archive_stream):
            logger.debug("Titulky.com: Identified zip archive")
            archive = zipfile.ZipFile(archive_stream)
            subtitle_content = self.get_subtitle_from_archive(subtitle, archive)
        else:
            subtitle_content = fix_line_ending(res.content)

        if not subtitle_content:
            if archive is None:
                raise ProviderError("No subtitles provided from titulky")

            entry_count = len(archive.infolist())
            if entry_count > 1:
                logger.info("Titulky.com: Couldn't find a proper subtitle file in the downloaded archive.")
            elif entry_count == 1:
                raise DownloadLimitExceeded("Subtitles download limit has been exceeded")
            else:
                # Archive with no entries at all.
                raise ProviderError("No subtitles provided from titulky")

        subtitle.content = subtitle_content
|