summaryrefslogtreecommitdiffhomepage
path: root/libs/subliminal_patch/providers/utils.py
blob: f06f75e34cb8484adfa2765cbfee49708345e810 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
from collections import namedtuple
from difflib import SequenceMatcher
import io
import logging
import os
import re
import tempfile
from typing import Iterable, Union
import zipfile

from guessit import guessit
import pysubs2
import rarfile
from subliminal.subtitle import fix_line_ending
from subliminal_patch.core import Episode
from subliminal_patch.subtitle import guess_matches

from ._agent_list import FIRST_THOUSAND_OR_SO_USER_AGENTS

logger = logging.getLogger(__name__)


# Candidate subtitle picked from a list of file names: the file name (`file`),
# a numeric `priority` (candidates are sorted by it, highest first) and a short
# human-readable `context` string describing why the file matched.
_MatchingSub = namedtuple("_MatchingSub", ("file", "priority", "context"))


def _get_matching_sub(
    sub_names, forced=False, episode=None, episode_title=None, **kwargs
):
    """Return the best matching subtitle file name from ``sub_names``.

    Args:
        sub_names: iterable of subtitle file names to choose from.
        forced: when False, names whose stem ends with "forced" are skipped.
        episode: wanted episode number; when given, guessit is forced to the
            "episode" type.
        episode_title: wanted episode title; when given, the file names are
            also compared against it.
        **kwargs: accepted and ignored so callers can forward extra options.

    Returns:
        The file name of the highest-priority candidate, or None when nothing
        matched. When neither ``episode`` nor ``episode_title`` is given the
        input is treated as a movie and the first non-skipped name is returned.
    """
    guess_options = {"single_value": True}
    if episode is not None:
        guess_options["type"] = "episode"  # type: ignore

    matching_subs = []

    for sub_name in sub_names:
        if not forced and os.path.splitext(sub_name.lower())[0].endswith("forced"):
            logger.debug("Ignoring forced subtitle: %s", sub_name)
            continue

        # If it's a movie then get the first subtitle
        if episode is None and episode_title is None:
            logger.debug("Movie subtitle found: %s", sub_name)
            matching_subs.append(_MatchingSub(sub_name, 2, "Movie subtitle"))
            break

        guess = guessit(sub_name, options=guess_options)

        matched_episode_num = guess.get("episode")
        # Only report a missing number when guessit really found none; an
        # explicit None check keeps episode 0 from being reported as missing.
        if matched_episode_num is None:
            logger.debug("No episode number found in file: %s", sub_name)

        if episode_title is not None:
            from_name = _analize_sub_name(sub_name, episode_title)
            if from_name is not None:
                matching_subs.append(from_name)

        # NOTE(review): when `episode` is None (title-only search) and the file
        # has no episode number either, this compares None == None and counts
        # as a match -- confirm whether that fallback is intended.
        if episode == matched_episode_num:
            logger.debug("Episode matched from number: %s", sub_name)
            matching_subs.append(_MatchingSub(sub_name, 2, "Episode number matched"))

    if matching_subs:
        # Stable sort: among equal priorities the earliest candidate wins.
        matching_subs.sort(key=lambda x: x.priority, reverse=True)
        logger.debug("Matches: %s", matching_subs)
        return matching_subs[0].file
    else:
        logger.debug("Nothing matched")
        return None


def _analize_sub_name(sub_name: str, title_):
    """Look for ``title_`` among the dot/dash separated parts of ``sub_name``.

    The extension is dropped, the remainder is split on "." and "-", and each
    stripped part is compared with ``title_`` through ``SequenceMatcher``. The
    first part whose ratio is above 0.85 decides the result.

    Returns:
        A ``_MatchingSub`` with priority 3 for a near-perfect ratio on a title
        longer than four characters, priority 1 for any other accepted ratio,
        or None when no part is similar enough.
    """
    stem, _extension = os.path.splitext(sub_name)

    for chunk in re.split(r"[.-]", stem):
        candidate = chunk.strip()
        similarity = SequenceMatcher(None, candidate, title_).ratio()
        if similarity <= 0.85:
            continue

        logger.debug(
            "Episode title matched: '%s' -> '%s' [%s]", candidate, sub_name, similarity
        )

        # Avoid false positives with short titles
        near_perfect = len(title_) > 4 and similarity >= 0.98
        if near_perfect:
            return _MatchingSub(sub_name, 3, "Perfect title ratio")

        return _MatchingSub(sub_name, 1, "Normal title ratio")

    logger.debug("No episode title matched from file: %s", sub_name)
    return None


def get_subtitle_from_archive(
    archive, forced=False, episode=None, get_first_subtitle=False, **kwargs
):
    """Get subtitle from Rarfile/Zipfile object. Return None if nothing is found.

    Args:
        archive: object exposing ``namelist()`` and ``read(name)``.
        forced: forwarded to ``_get_matching_sub``; when False, "forced"
            subtitles are skipped during matching.
        episode: wanted episode number, forwarded to ``_get_matching_sub``.
        get_first_subtitle: when True, return the first subtitle found without
            trying to match anything.
        **kwargs: forwarded to ``_get_matching_sub`` (e.g. ``episode_title``).

    Returns:
        The content of the selected member passed through ``fix_line_ending``,
        or None when the archive holds no (matching) subtitle.
    """
    subtitle_extensions = (".srt", ".sub", ".ssa", ".ass")
    # Compare on the lowercased name so members such as "Movie.SRT" are not
    # silently ignored; the original member name is kept for reading.
    subs_in_archive = [
        name
        for name in archive.namelist()
        if name.lower().endswith(subtitle_extensions)
    ]

    if not subs_in_archive:
        logger.info("No subtitles found in archive")
        return None

    logger.debug("Subtitles in archive: %s", subs_in_archive)

    if len(subs_in_archive) == 1 or get_first_subtitle:
        logger.debug("Getting first subtitle in archive: %s", subs_in_archive)
        return fix_line_ending(archive.read(subs_in_archive[0]))

    matching_sub = _get_matching_sub(subs_in_archive, forced, episode, **kwargs)

    if matching_sub is not None:
        logger.info("Using %s from archive", matching_sub)
        return fix_line_ending(archive.read(matching_sub))

    logger.debug("No subtitle found in archive")
    return None


def is_episode(content):
    """Return True if guessit, forced to the episode type, finds an episode."""
    guessed = guessit(content, {"type": "episode"})
    return "episode" in guessed


# Encodings tried, in this order, when loading a raw subtitle file with pysubs2.
# NOTE(review): iso-8859-1 can decode any byte sequence, so the entries listed
# after it are presumably only reached when loading fails for a reason other
# than decoding -- confirm the intended order.
_ENCS = ("utf-8", "ascii", "iso-8859-1", "iso-8859-2", "iso-8859-5", "cp1252")


def _zip_from_subtitle_file(content):
    """Wrap raw subtitle bytes in an in-memory ZipFile.

    The bytes are written to a temporary ".srt" file and loaded with pysubs2,
    trying each encoding in ``_ENCS`` until one succeeds.

    Args:
        content: raw bytes of the presumed subtitle file.

    Returns:
        A ``zipfile.ZipFile`` backed by a ``BytesIO`` holding the temporary
        file as its only member, or None if pysubs2 could not load the content
        with any of the encodings.
    """
    # delete=False plus explicit removal: the file has to be closed (and thus
    # fully flushed to disk) before it is re-opened by name below. An open
    # NamedTemporaryFile cannot be re-opened by name on Windows, and unflushed
    # buffered data would make small files look empty on any platform.
    tmp_f = tempfile.NamedTemporaryFile(prefix="spsub", suffix=".srt", delete=False)
    try:
        with tmp_f:
            tmp_f.write(content)

        sub = None
        for enc in _ENCS:
            try:
                logger.debug("Trying %s encoding", enc)
                sub = pysubs2.load(tmp_f.name, encoding=enc)
            except Exception as error:
                # Best effort: any failure just means "try the next encoding".
                logger.debug("%s: %s", type(error).__name__, error)
                continue
            else:
                break

        if sub is not None:
            logger.debug("Identified subtitle file: %s", sub)
            zip_obj = zipfile.ZipFile(io.BytesIO(), mode="x")
            zip_obj.write(tmp_f.name, os.path.basename(tmp_f.name))
            return zip_obj

        logger.debug("Couldn't load subtitle file")
        return None
    finally:
        try:
            os.remove(tmp_f.name)
        except OSError as error:
            logger.debug("Couldn't remove temporary file %s: %s", tmp_f.name, error)


def get_archive_from_bytes(content: bytes):
    """Build a RarFile/ZipFile object out of raw bytes.

    The bytes are probed as a rar archive first, then as a zip archive. If
    neither format is detected the content is handed to
    ``_zip_from_subtitle_file``, which returns a ZipFile for subtitle-like
    content and None for anything else.
    """
    stream = io.BytesIO(content)

    # (detector, opener, log message), probed in this order.
    known_formats = (
        (rarfile.is_rarfile, rarfile.RarFile, "Identified rar archive"),
        (zipfile.is_zipfile, zipfile.ZipFile, "Identified zip archive"),
    )

    for looks_like, open_archive, message in known_formats:
        if looks_like(stream):
            logger.debug(message)
            return open_archive(stream)

    logger.debug("No compression format found. Trying with subtitle-like files")
    return _zip_from_subtitle_file(content)


def update_matches(
    matches,
    video,
    release_info: Union[str, Iterable[str]],
    split="\n",
    **guessit_options
):
    """Update matches set from release info string or Iterable.

    Use the split parameter to iterate over the set delimiter; set None to avoid split.

    Args:
        matches: set of matches; it is updated in place and also returned.
        video: video the releases are compared against. The guessit "type"
            option is set to "episode" for ``Episode`` instances and to
            "movie" otherwise, overriding any "type" given by the caller.
        release_info: a single release string or an iterable of them.
        split: delimiter used to break every release string into pieces; with
            None each release string is used whole.
        **guessit_options: extra options passed to guessit.

    Returns:
        The updated ``matches`` set.
    """

    guessit_options["type"] = "episode" if isinstance(video, Episode) else "movie"

    logger.debug("Guessit options to update matches: %s", guessit_options)

    if isinstance(release_info, str):
        release_info = [release_info]

    for release in release_info:
        # str.split(None) would split on whitespace, which is not "no split":
        # honour the documented contract by keeping the release in one piece.
        pieces = [release] if split is None else release.split(split)
        for release_split in pieces:
            logger.debug("Updating matches from release info: %s", release)
            matches |= guess_matches(
                video, guessit(release_split.strip(), guessit_options)
            )
            logger.debug("New matches: %s", matches)

    return matches