summaryrefslogtreecommitdiffhomepage
path: root/libs/subliminal_patch/providers/legendastv.py
blob: 3c3476b53842b4cd3469958ad44144c5e1bb8b8f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
# coding=utf-8
from __future__ import absolute_import
import logging
import rarfile
import os
from subliminal.exceptions import ConfigurationError

from subliminal.providers.legendastv import LegendasTVSubtitle as _LegendasTVSubtitle, \
    LegendasTVProvider as _LegendasTVProvider, Episode, Movie, guessit, sanitize, region, type_map, \
    raise_for_status, json, SHOW_EXPIRATION_TIME, title_re, season_re, datetime, pytz, NO_VALUE, releases_key, \
    SUBTITLE_EXTENSIONS, language_converters, ServiceUnavailable

from requests.exceptions import RequestException
from subliminal_patch.providers import reinitialize_on_error
from subliminal_patch.subtitle import guess_matches
from subzero.language import Language

logger = logging.getLogger(__name__)


class LegendasTVSubtitle(_LegendasTVSubtitle):
    def __init__(self, language, type, title, year, imdb_id, season, archive, name):
        super(LegendasTVSubtitle, self).__init__(language, type, title, year, imdb_id, season, archive, name)
        self.archive.content = None
        self.release_info = name.rstrip('.srt').split('/')[-1]
        self.page_link = archive.link

    def make_picklable(self):
        self.archive.content = None
        return self

    def get_matches(self, video, hearing_impaired=False):
        matches = set()

        # episode
        if isinstance(video, Episode) and self.type == 'episode':
            # series
            if video.series and (sanitize(self.title) in (
                    sanitize(name) for name in [video.series] + video.alternative_series)):
                matches.add('series')

            # year
            if video.original_series and self.year is None or video.year and video.year == self.year:
                matches.add('year')

            # imdb_id
            if video.series_imdb_id and self.imdb_id == video.series_imdb_id:
                matches.add('series_imdb_id')

        # movie
        elif isinstance(video, Movie) and self.type == 'movie':
            # title
            if video.title and (sanitize(self.title) in (
                    sanitize(name) for name in [video.title] + video.alternative_titles)):
                matches.add('title')

            # year
            if video.year and self.year == video.year:
                matches.add('year')

            # imdb_id
            if video.imdb_id and self.imdb_id == video.imdb_id:
                matches.add('imdb_id')

        # name
        matches |= guess_matches(video, guessit(self.name, {'type': self.type}))

        return matches


class LegendasTVProvider(_LegendasTVProvider):
    languages = {Language(*l) for l in language_converters['legendastv'].to_legendastv.keys()}
    video_types = (Episode, Movie)
    subtitle_class = LegendasTVSubtitle

    def __init__(self, username=None, password=None, featured_only=False):

        # Provider needs UNRAR installed. If not available raise ConfigurationError
        try:
            rarfile.tool_setup()
        except rarfile.RarCannotExec:
            raise ConfigurationError('RAR extraction tool not available')

        if any((username, password)) and not all((username, password)):
            raise ConfigurationError('Username and password must be specified')

        self.username = username
        self.password = password
        self.logged_in = False
        self.session = None
        self.featured_only = featured_only

    @staticmethod
    def is_valid_title(title, title_id, sanitized_title, season, year, imdb_id):
        """Check if is a valid title."""
        if title["imdb_id"] and title["imdb_id"] == imdb_id:
            logger.debug(u'Matched title "%s" as IMDB ID %s', sanitized_title, title["imdb_id"])
            return True

        if title["title2"] and sanitize(title['title2']) == sanitized_title:
            logger.debug(u'Matched title "%s" as "%s"', sanitized_title, title["title2"])
            return True

        return _LegendasTVProvider.is_valid_title(title, title_id, sanitized_title, season, year)

    @region.cache_on_arguments(expiration_time=SHOW_EXPIRATION_TIME, should_cache_fn=lambda value: value)
    def search_titles(self, titles, season, title_year, imdb_id):
        """Search for titles matching the `title`.

        For episodes, each season has it own title
        :param str titles: the titles to search for.
        :param int season: season of the title
        :param int title_year: year of the title
        :return: found titles.
        :rtype: dict
        """
        titles_found = {}

        for title in titles:
            sanitized_titles = [sanitize(title)]
            ignore_characters = {'\'', '.'}
            if any(c in title for c in ignore_characters):
                sanitized_titles.append(sanitize(title, ignore_characters=ignore_characters))

            for sanitized_title in sanitized_titles:
                # make the query
                if season:
                    logger.info('Searching episode title %r for season %r', sanitized_title, season)
                else:
                    logger.info('Searching movie title %r', sanitized_title)

                r = self.session.get(self.server_url + 'legenda/sugestao/{}'.format(sanitized_title), timeout=10)
                raise_for_status(r)
                results = json.loads(r.text)

                # loop over results
                for result in results:
                    source = result['_source']

                    # extract id
                    title_id = int(source['id_filme'])

                    # extract type
                    title = {'type': type_map[source['tipo']], 'title2': None, 'imdb_id': None}

                    # extract title, year and country
                    name, year, country = title_re.match(source['dsc_nome']).groups()
                    title['title'] = name

                    if "dsc_nome_br" in source:
                        name2, ommit1, ommit2 = title_re.match(source['dsc_nome_br']).groups()
                        title['title2'] = name2

                    # extract imdb_id
                    if source['id_imdb'] != '0':
                        if not source['id_imdb'].startswith('tt'):
                            title['imdb_id'] = 'tt' + source['id_imdb'].zfill(7)
                        else:
                            title['imdb_id'] = source['id_imdb']

                    # extract season
                    if title['type'] == 'episode':
                        if source['temporada'] and source['temporada'].isdigit():
                            title['season'] = int(source['temporada'])
                        else:
                            match = season_re.search(source['dsc_nome_br'])
                            if match:
                                title['season'] = int(match.group('season'))
                            else:
                                logger.debug('No season detected for title %d (%s)', title_id, name)

                    # extract year
                    if year:
                        title['year'] = int(year)
                    elif source['dsc_data_lancamento'] and source['dsc_data_lancamento'].isdigit():
                        # year is based on season air date hence the adjustment
                        title['year'] = int(source['dsc_data_lancamento']) - title.get('season', 1) + 1

                    # add title only if is valid
                    # Check against title without ignored chars
                    if self.is_valid_title(title, title_id, sanitized_titles[0], season, title_year, imdb_id):
                        logger.debug(u'Found title: %s', title)
                        titles_found[title_id] = title

            logger.debug('Found %d titles', len(titles_found))

        return titles_found

    @reinitialize_on_error((RequestException, ServiceUnavailable), attempts=1)
    def query(self, language, titles, season=None, episode=None, year=None, imdb_id=None):
        # search for titles
        titles_found = self.search_titles(titles, season, year, imdb_id)

        subtitles = []
        # iterate over titles
        for title_id, t in titles_found.items():
            # Skip episodes or movies if it's not what was requested
            if (season and t['type'] == 'movie') or (not season and t['type'] == 'episode'):
                continue

            # Skip if season isn't matching
            if season and season != t.get('season'):
                continue

            # Skip if season wasn't provided (not an episode) but one is returned by provider (wrong type)
            if not season and t.get('season'):
                continue

            logger.info('Getting archives for title %d and language %d', title_id, language.legendastv)
            archives = self.get_archives(title_id, language.legendastv, t['type'], season, episode)
            if not archives:
                logger.info('No archives found for title %d and language %d', title_id, language.legendastv)

            # iterate over title's archives
            for a in archives:

                # Check if featured
                if self.featured_only and a.featured == False:
                    logger.info('Subtitle is not featured, skipping')
                    continue

                # compute an expiration time based on the archive timestamp
                expiration_time = (datetime.utcnow().replace(tzinfo=pytz.utc) - a.timestamp).total_seconds()

                # attempt to get the releases from the cache
                cache_key = str(a.id + "|" + a.name)
                releases = region.get(cache_key, expiration_time=expiration_time)

                # the releases are not in cache or cache is expired
                if releases == NO_VALUE:
                    logger.info('Releases not found in cache')

                    # download archive
                    self.download_archive(a)

                    # extract the releases
                    releases = []
                    for name in a.content.namelist():
                        # discard the legendastv file
                        if name.startswith('Legendas.tv'):
                            continue

                        # discard hidden files
                        if os.path.split(name)[-1].startswith('.'):
                            continue

                        # discard non-subtitle files
                        if not name.lower().endswith(SUBTITLE_EXTENSIONS):
                            continue

                        releases.append(name)

                    # cache the releases
                    region.set(cache_key, releases)

                # iterate over releases
                for r in releases:
                    subtitle = self.subtitle_class(language, t['type'], t['title'], t.get('year'), t.get('imdb_id'),
                                                   t.get('season'), a, r)
                    logger.debug('Found subtitle %r', subtitle)
                    subtitles.append(subtitle)

        return subtitles

    def list_subtitles(self, video, languages):
        season = episode = None
        if isinstance(video, Episode):
            titles = [video.series] + video.alternative_series
            season = video.season
            episode = video.episode
            imdb = video.series_imdb_id
        else:
            titles = [video.title] + video.alternative_titles
            imdb = video.imdb_id

        subtitles = [s for l in languages for s in
                     self.query(l, titles, season=season, episode=episode, year=video.year, imdb_id=imdb)]
        if subtitles:
            return subtitles
        else:
            return []

    def download_subtitle(self, subtitle):
        super(LegendasTVProvider, self).download_subtitle(subtitle)
        subtitle.archive.content = None

    def get_archives(self, title_id, language_code, title_type, season, episode):
        return super(LegendasTVProvider, self).get_archives.original(self, title_id, language_code, title_type,
                                                                     season, episode)