1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
|
# coding=utf-8
from __future__ import absolute_import
import logging
import rarfile
import os
from subliminal.exceptions import ConfigurationError
from subliminal.providers.legendastv import LegendasTVSubtitle as _LegendasTVSubtitle, \
LegendasTVProvider as _LegendasTVProvider, Episode, Movie, guessit, sanitize, region, type_map, \
raise_for_status, json, SHOW_EXPIRATION_TIME, title_re, season_re, datetime, pytz, NO_VALUE, releases_key, \
SUBTITLE_EXTENSIONS, language_converters, ServiceUnavailable
from requests.exceptions import RequestException
from subliminal_patch.providers import reinitialize_on_error
from subliminal_patch.subtitle import guess_matches
from subzero.language import Language
# Module-level logger, named after this module so records can be filtered per provider.
logger = logging.getLogger(__name__)
class LegendasTVSubtitle(_LegendasTVSubtitle):
    """LegendasTV subtitle carrying release info and a link to its archive page.

    On top of the parent class this sets ``release_info`` (derived from the
    file name inside the archive) and ``page_link`` (the archive link), and
    it never keeps the archive content attached to the instance.
    """

    def __init__(self, language, type, title, year, imdb_id, season, archive, name):
        """Build the subtitle and derive ``release_info`` / ``page_link``.

        :param language: language of the subtitle.
        :param type: ``'episode'`` or ``'movie'`` (compared as such in :meth:`get_matches`).
        :param title: title the subtitle belongs to.
        :param year: year of the title, or ``None``.
        :param imdb_id: IMDB id of the title, or ``None``.
        :param season: season of the title, or ``None``.
        :param archive: archive object the subtitle comes from; its ``link`` is read here.
        :param str name: path of the subtitle file inside the archive.
        """
        super(LegendasTVSubtitle, self).__init__(language, type, title, year, imdb_id, season, archive, name)

        # Detach the archive payload right away.
        # NOTE(review): presumably because the opened archive cannot be pickled
        # (see make_picklable) -- confirm.
        self.archive.content = None

        # Release info is the file's base name without a trailing ".srt".
        # str.rstrip('.srt') must not be used for this: it strips any trailing
        # run of the characters '.', 's', 'r', 't' (e.g. "Parts.srt" -> "Pa").
        release = name.split('/')[-1]
        if release.lower().endswith('.srt'):
            release = release[:-len('.srt')]
        self.release_info = release

        self.page_link = archive.link

    def make_picklable(self):
        """Drop the archive content and return ``self``."""
        self.archive.content = None
        return self

    def get_matches(self, video, hearing_impaired=False):
        """Compute the set of properties this subtitle has in common with `video`.

        :param video: the video to compare against (``Episode`` or ``Movie``).
        :param bool hearing_impaired: accepted for interface compatibility; not used here.
        :return: names of the matched properties.
        :rtype: set
        """
        matches = set()

        # episode
        if isinstance(video, Episode) and self.type == 'episode':
            # series
            if video.series and (sanitize(self.title) in (
                    sanitize(name) for name in [video.series] + video.alternative_series)):
                matches.add('series')

            # year: either both sides have no year (original series), or the years agree
            if (video.original_series and self.year is None) or (video.year and video.year == self.year):
                matches.add('year')

            # imdb_id
            if video.series_imdb_id and self.imdb_id == video.series_imdb_id:
                matches.add('series_imdb_id')

        # movie
        elif isinstance(video, Movie) and self.type == 'movie':
            # title
            if video.title and (sanitize(self.title) in (
                    sanitize(name) for name in [video.title] + video.alternative_titles)):
                matches.add('title')

            # year
            if video.year and self.year == video.year:
                matches.add('year')

            # imdb_id
            if video.imdb_id and self.imdb_id == video.imdb_id:
                matches.add('imdb_id')

        # name: let guessit parse the file name and match on whatever it finds
        matches |= guess_matches(video, guessit(self.name, {'type': self.type}))

        return matches
class LegendasTVProvider(_LegendasTVProvider):
    """LegendasTV provider with IMDB-aware title matching and a featured-only filter.

    Produces :class:`LegendasTVSubtitle` instances. A RAR extraction tool must
    be available, otherwise the constructor raises ``ConfigurationError``.
    """

    languages = {Language(*l) for l in language_converters['legendastv'].to_legendastv.keys()}
    video_types = (Episode, Movie)
    subtitle_class = LegendasTVSubtitle

    def __init__(self, username=None, password=None, featured_only=False):
        """Validate the environment and store the provider settings.

        :param username: account user name (optional, but required together with `password`).
        :param password: account password (optional, but required together with `username`).
        :param bool featured_only: when true, :meth:`query` skips archives whose
            ``featured`` attribute is ``False``.
        :raise ConfigurationError: if no RAR tool is available, or if only one of
            `username` / `password` is given.
        """
        # Provider needs UNRAR installed. If not available raise ConfigurationError
        try:
            rarfile.tool_setup()
        except rarfile.RarCannotExec:
            raise ConfigurationError('RAR extraction tool not available')

        # credentials are optional, but they only make sense as a pair
        if any((username, password)) and not all((username, password)):
            raise ConfigurationError('Username and password must be specified')

        self.username = username
        self.password = password
        self.logged_in = False
        self.session = None
        self.featured_only = featured_only

    @staticmethod
    def is_valid_title(title, title_id, sanitized_title, season, year, imdb_id):
        """Check if is a valid title.

        A title is accepted when its IMDB id equals `imdb_id`, or when its
        secondary title (``title2``) sanitizes to `sanitized_title`; otherwise
        the decision is delegated to the parent provider's check.

        :param dict title: title description with at least ``imdb_id`` and ``title2`` keys.
        :param int title_id: id of the title.
        :param str sanitized_title: sanitized title that was searched for.
        :param season: requested season, if any.
        :param year: requested year, if any.
        :param imdb_id: requested IMDB id, if any.
        :return: whether the title is valid.
        """
        if title["imdb_id"] and title["imdb_id"] == imdb_id:
            logger.debug(u'Matched title "%s" as IMDB ID %s', sanitized_title, title["imdb_id"])
            return True

        if title["title2"] and sanitize(title['title2']) == sanitized_title:
            logger.debug(u'Matched title "%s" as "%s"', sanitized_title, title["title2"])
            return True

        return _LegendasTVProvider.is_valid_title(title, title_id, sanitized_title, season, year)

    # should_cache_fn returns the value itself, so an empty (falsy) result is not cached
    @region.cache_on_arguments(expiration_time=SHOW_EXPIRATION_TIME, should_cache_fn=lambda value: value)
    def search_titles(self, titles, season, title_year, imdb_id):
        """Search for titles matching any of `titles`.

        For episodes, each season has it own title.

        :param titles: the candidate titles to search for (iterable of str).
        :param int season: season of the title (falsy for movies).
        :param int title_year: year of the title.
        :param imdb_id: IMDB id of the title, used as an additional way to validate results.
        :return: found titles, keyed by title id; each value is a dict with the keys
            ``type``, ``title``, ``title2``, ``imdb_id`` and, when known, ``season`` and ``year``.
        :rtype: dict
        """
        titles_found = {}

        for query_title in titles:
            # always search the plain sanitized title; when the title contains quotes
            # or dots, also search a variant that keeps those characters
            sanitized_titles = [sanitize(query_title)]
            ignore_characters = {'\'', '.'}
            if any(c in query_title for c in ignore_characters):
                sanitized_titles.append(sanitize(query_title, ignore_characters=ignore_characters))

            for sanitized_title in sanitized_titles:
                # make the query
                if season:
                    logger.info('Searching episode title %r for season %r', sanitized_title, season)
                else:
                    logger.info('Searching movie title %r', sanitized_title)

                r = self.session.get(self.server_url + 'legenda/sugestao/{}'.format(sanitized_title), timeout=10)
                raise_for_status(r)
                results = json.loads(r.text)

                # loop over results
                for result in results:
                    source = result['_source']

                    # extract id
                    title_id = int(source['id_filme'])

                    # extract type
                    title = {'type': type_map[source['tipo']], 'title2': None, 'imdb_id': None}

                    # extract title, year and country
                    name, year, country = title_re.match(source['dsc_nome']).groups()
                    title['title'] = name

                    # the secondary name is optional
                    if "dsc_nome_br" in source:
                        title['title2'] = title_re.match(source['dsc_nome_br']).groups()[0]

                    # extract imdb_id, normalising bare numbers to the "tt0000000" form
                    if source['id_imdb'] != '0':
                        if not source['id_imdb'].startswith('tt'):
                            title['imdb_id'] = 'tt' + source['id_imdb'].zfill(7)
                        else:
                            title['imdb_id'] = source['id_imdb']

                    # extract season
                    if title['type'] == 'episode':
                        if source['temporada'] and source['temporada'].isdigit():
                            title['season'] = int(source['temporada'])
                        else:
                            # 'dsc_nome_br' may be absent (see the membership test above),
                            # so fall back to an empty string instead of raising KeyError
                            match = season_re.search(source.get('dsc_nome_br') or '')
                            if match:
                                title['season'] = int(match.group('season'))
                            else:
                                logger.debug('No season detected for title %d (%s)', title_id, name)

                    # extract year
                    if year:
                        title['year'] = int(year)
                    elif source['dsc_data_lancamento'] and source['dsc_data_lancamento'].isdigit():
                        # year is based on season air date hence the adjustment
                        title['year'] = int(source['dsc_data_lancamento']) - title.get('season', 1) + 1

                    # add title only if is valid
                    # Check against title without ignored chars
                    if self.is_valid_title(title, title_id, sanitized_titles[0], season, title_year, imdb_id):
                        logger.debug(u'Found title: %s', title)
                        titles_found[title_id] = title

        logger.debug('Found %d titles', len(titles_found))

        return titles_found

    @reinitialize_on_error((RequestException, ServiceUnavailable), attempts=1)
    def query(self, language, titles, season=None, episode=None, year=None, imdb_id=None):
        """Collect the subtitles available for the given titles in one language.

        :param language: language to search for; its ``legendastv`` attribute is
            passed to :meth:`get_archives`.
        :param titles: candidate titles, forwarded to :meth:`search_titles`.
        :param season: season number for episodes, ``None`` for movies.
        :param episode: episode number for episodes, ``None`` for movies.
        :param year: year of the title, if known.
        :param imdb_id: IMDB id of the title, if known.
        :return: one subtitle per subtitle file found in the matching archives.
        :rtype: list
        """
        # search for titles
        titles_found = self.search_titles(titles, season, year, imdb_id)

        subtitles = []
        # iterate over titles
        for title_id, t in titles_found.items():
            # Skip episodes or movies if it's not what was requested
            if (season and t['type'] == 'movie') or (not season and t['type'] == 'episode'):
                continue

            # Skip if season isn't matching
            if season and season != t.get('season'):
                continue

            # Skip if season wasn't provided (not an episode) but one is returned by provider (wrong type)
            if not season and t.get('season'):
                continue

            logger.info('Getting archives for title %d and language %d', title_id, language.legendastv)
            archives = self.get_archives(title_id, language.legendastv, t['type'], season, episode)
            if not archives:
                logger.info('No archives found for title %d and language %d', title_id, language.legendastv)
                # nothing to iterate over (also guards against a None result)
                continue

            # iterate over title's archives
            for archive in archives:
                # Check if featured. The explicit comparison is kept on purpose so that
                # only a literal False (not None) marks an archive as non-featured.
                if self.featured_only and archive.featured == False:
                    logger.info('Subtitle is not featured, skipping')
                    continue

                # compute an expiration time based on the archive timestamp
                expiration_time = (datetime.utcnow().replace(tzinfo=pytz.utc) - archive.timestamp).total_seconds()

                # attempt to get the releases from the cache
                cache_key = str(archive.id + "|" + archive.name)
                releases = region.get(cache_key, expiration_time=expiration_time)

                # the releases are not in cache or cache is expired
                if releases == NO_VALUE:
                    logger.info('Releases not found in cache')

                    # download archive
                    self.download_archive(archive)

                    # extract the releases
                    releases = []
                    for name in archive.content.namelist():
                        # discard the legendastv file
                        if name.startswith('Legendas.tv'):
                            continue

                        # discard hidden files
                        if os.path.split(name)[-1].startswith('.'):
                            continue

                        # discard non-subtitle files
                        if not name.lower().endswith(SUBTITLE_EXTENSIONS):
                            continue

                        releases.append(name)

                    # cache the releases
                    region.set(cache_key, releases)

                # iterate over releases
                for release_name in releases:
                    subtitle = self.subtitle_class(language, t['type'], t['title'], t.get('year'), t.get('imdb_id'),
                                                   t.get('season'), archive, release_name)
                    logger.debug('Found subtitle %r', subtitle)
                    subtitles.append(subtitle)

        return subtitles

    def list_subtitles(self, video, languages):
        """List the subtitles available for `video` in every requested language.

        :param video: the ``Episode`` or ``Movie`` to search subtitles for.
        :param languages: languages to search for.
        :return: the subtitles found (possibly empty).
        :rtype: list
        """
        season = episode = None
        if isinstance(video, Episode):
            titles = [video.series] + video.alternative_series
            season = video.season
            episode = video.episode
            imdb = video.series_imdb_id
        else:
            titles = [video.title] + video.alternative_titles
            imdb = video.imdb_id

        # the comprehension always yields a list, so no separate empty-result branch is needed
        return [s for l in languages for s in
                self.query(l, titles, season=season, episode=episode, year=video.year, imdb_id=imdb)]

    def download_subtitle(self, subtitle):
        """Download the subtitle through the parent provider, then drop the archive content."""
        super(LegendasTVProvider, self).download_subtitle(subtitle)
        subtitle.archive.content = None

    def get_archives(self, title_id, language_code, title_type, season, episode):
        """Return the archives for a title by calling the parent's ``get_archives.original``.

        NOTE(review): ``.original`` is presumably the undecorated function behind the
        parent's caching decorator, so this bypasses that cache -- confirm.
        """
        return super(LegendasTVProvider, self).get_archives.original(self, title_id, language_code, title_type,
                                                                    season, episode)
|