aboutsummaryrefslogtreecommitdiffhomepage
path: root/libs/guessit/rules/processors.py
blob: 5b018140c8ee027052e65e232fbac1c1d541ff67 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Processors
"""
from collections import defaultdict
import copy

import six

from rebulk import Rebulk, Rule, CustomRule, POST_PROCESS, PRE_PROCESS, AppendMatch, RemoveMatch

from .common import seps_no_groups
from .common.formatters import cleanup
from .common.comparators import marker_sorted
from .common.date import valid_year
from .common.words import iter_words


class EnlargeGroupMatches(CustomRule):
    """
    Enlarge matches that are starting and/or ending group to include brackets in their span.
    """
    priority = PRE_PROCESS

    def when(self, matches, context):
        starting = []
        ending = []

        for group in matches.markers.named('group'):
            for match in matches.starting(group.start + 1):
                starting.append(match)

            for match in matches.ending(group.end - 1):
                ending.append(match)

        if starting or ending:
            return starting, ending
        return False

    def then(self, matches, when_response, context):
        starting, ending = when_response
        for match in starting:
            matches.remove(match)
            match.start -= 1
            match.raw_start += 1
            matches.append(match)

        for match in ending:
            matches.remove(match)
            match.end += 1
            match.raw_end -= 1
            matches.append(match)


class EquivalentHoles(Rule):
    """
    Creates equivalent matches for holes that have same values than existing (case insensitive)
    """
    priority = POST_PROCESS
    consequence = AppendMatch

    def when(self, matches, context):
        new_matches = []

        for filepath in marker_sorted(matches.markers.named('path'), matches):
            holes = matches.holes(start=filepath.start, end=filepath.end, formatter=cleanup)
            for name in matches.names:
                for hole in list(holes):
                    for current_match in matches.named(name):
                        if isinstance(current_match.value, six.string_types) and \
                                        hole.value.lower() == current_match.value.lower():
                            if 'equivalent-ignore' in current_match.tags:
                                continue
                            new_value = _preferred_string(hole.value, current_match.value)
                            if hole.value != new_value:
                                hole.value = new_value
                            if current_match.value != new_value:
                                current_match.value = new_value
                            hole.name = name
                            hole.tags = ['equivalent']
                            new_matches.append(hole)
                            if hole in holes:
                                holes.remove(hole)

        return new_matches


class RemoveAmbiguous(Rule):
    """
    If multiple matches are found with same name and different values, keep the one in the most valuable filepart.
    Also keep others match with same name and values than those kept ones.
    """

    priority = POST_PROCESS
    consequence = RemoveMatch

    def __init__(self, sort_function=marker_sorted, predicate=None):
        super(RemoveAmbiguous, self).__init__()
        self.sort_function = sort_function
        self.predicate = predicate

    def when(self, matches, context):
        fileparts = self.sort_function(matches.markers.named('path'), matches)

        previous_fileparts_names = set()
        values = defaultdict(list)

        to_remove = []
        for filepart in fileparts:
            filepart_matches = matches.range(filepart.start, filepart.end, predicate=self.predicate)

            filepart_names = set()
            for match in filepart_matches:
                filepart_names.add(match.name)
                if match.name in previous_fileparts_names:
                    if match.value not in values[match.name]:
                        to_remove.append(match)
                else:
                    if match.value not in values[match.name]:
                        values[match.name].append(match.value)

            previous_fileparts_names.update(filepart_names)

        return to_remove


class RemoveLessSpecificSeasonEpisode(RemoveAmbiguous):
    """
    If multiple season/episodes matches are found with different values,
    keep the one tagged as 'SxxExx' or in the rightmost filepart.
    """
    def __init__(self, name):
        super(RemoveLessSpecificSeasonEpisode, self).__init__(
            sort_function=(lambda markers, matches:
                           marker_sorted(list(reversed(markers)), matches,
                                         lambda match: match.name == name and 'SxxExx' in match.tags)),
            predicate=lambda match: match.name == name)


def _preferred_string(value1, value2):  # pylint:disable=too-many-return-statements
    """
    Retrieves preferred title from both values.
    :param value1:
    :type value1: str
    :param value2:
    :type value2: str
    :return: The preferred title
    :rtype: str
    """
    if value1 == value2:
        return value1
    if value1.istitle() and not value2.istitle():
        return value1
    if not value1.isupper() and value2.isupper():
        return value1
    if not value1.isupper() and value1[0].isupper() and not value2[0].isupper():
        return value1
    if _count_title_words(value1) > _count_title_words(value2):
        return value1
    return value2


def _count_title_words(value):
    """
    Count only many words are titles in value.
    :param value:
    :type value:
    :return:
    :rtype:
    """
    ret = 0
    for word in iter_words(value):
        if word.value.istitle():
            ret += 1
    return ret


class SeasonYear(Rule):
    """
    If a season is a valid year and no year was found, create an match with year.
    """
    priority = POST_PROCESS
    consequence = AppendMatch

    def when(self, matches, context):
        ret = []
        if not matches.named('year'):
            for season in matches.named('season'):
                if valid_year(season.value):
                    year = copy.copy(season)
                    year.name = 'year'
                    ret.append(year)
        return ret


class YearSeason(Rule):
    """
    If a year is found, no season found, and episode is found, create an match with season.
    """
    priority = POST_PROCESS
    consequence = AppendMatch

    def when(self, matches, context):
        ret = []
        if not matches.named('season') and matches.named('episode'):
            for year in matches.named('year'):
                season = copy.copy(year)
                season.name = 'season'
                ret.append(season)
        return ret


class Processors(CustomRule):
    """
    Empty rule for ordering post_processing properly.
    """
    priority = POST_PROCESS

    def when(self, matches, context):
        pass

    def then(self, matches, when_response, context):  # pragma: no cover
        pass


class StripSeparators(CustomRule):
    """
    Strip separators from matches. Keep separators if they are from acronyms, like in ".S.H.I.E.L.D."
    """
    priority = POST_PROCESS

    def when(self, matches, context):
        return matches

    def then(self, matches, when_response, context):  # pragma: no cover
        for match in matches:
            for _ in range(0, len(match.span)):
                if match.raw[0] in seps_no_groups and (len(match.raw) < 3 or match.raw[2] not in seps_no_groups):
                    match.raw_start += 1

            for _ in reversed(range(0, len(match.span))):
                if match.raw[-1] in seps_no_groups and (len(match.raw) < 3 or match.raw[-3] not in seps_no_groups):
                    match.raw_end -= 1


def processors(config):  # pylint:disable=unused-argument
    """
    Builder for rebulk object.

    :param config: rule configuration
    :type config: dict
    :return: Created Rebulk object
    :rtype: Rebulk
    """
    return Rebulk().rules(EnlargeGroupMatches, EquivalentHoles,
                          RemoveLessSpecificSeasonEpisode('season'),
                          RemoveLessSpecificSeasonEpisode('episode'),
                          RemoveAmbiguous, SeasonYear, YearSeason, Processors, StripSeparators)