summaryrefslogtreecommitdiffhomepage
path: root/libs/subliminal_patch/providers/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'libs/subliminal_patch/providers/utils.py')
-rw-r--r--libs/subliminal_patch/providers/utils.py55
1 files changed, 45 insertions, 10 deletions
diff --git a/libs/subliminal_patch/providers/utils.py b/libs/subliminal_patch/providers/utils.py
index bf3aa856b..ea8e411e6 100644
--- a/libs/subliminal_patch/providers/utils.py
+++ b/libs/subliminal_patch/providers/utils.py
@@ -4,9 +4,12 @@ import io
import logging
import os
import re
+import tempfile
+from typing import Iterable, Union
import zipfile
from guessit import guessit
+import pysubs2
import rarfile
from subliminal.subtitle import fix_line_ending
from subliminal_patch.core import Episode
@@ -119,10 +122,10 @@ def is_episode(content):
def get_archive_from_bytes(content: bytes):
- """Get RarFile/ZipFile object from bytes. Return None is something else
- is found."""
- # open the archive
+ """Get RarFile/ZipFile object from bytes. A ZipFile instance will be returned
+ if a subtitle-like stream is found. Return None if something else is found."""
archive_stream = io.BytesIO(content)
+
if rarfile.is_rarfile(archive_stream):
logger.debug("Identified rar archive")
return rarfile.RarFile(archive_stream)
@@ -130,18 +133,50 @@ def get_archive_from_bytes(content: bytes):
logger.debug("Identified zip archive")
return zipfile.ZipFile(archive_stream)
- logger.debug("Unknown compression format")
+ logger.debug("No compression format found. Trying with subtitle-like files")
+
+ # If the file is a subtitle-like file
+ with tempfile.NamedTemporaryFile(prefix="spsub", suffix=".srt") as tmp_f:
+ try:
+ tmp_f.write(content)
+ sub = pysubs2.load(tmp_f.name)
+ except Exception as error:
+ logger.debug("Couldn't load file: '%s'", error)
+ else:
+ if sub is not None:
+ logger.debug("Identified subtitle file: %s", sub)
+ zip_obj = zipfile.ZipFile(io.BytesIO(), mode="x")
+ zip_obj.write(tmp_f.name, os.path.basename(tmp_f.name))
+ return zip_obj
+
+ logger.debug("Nothing found")
return None
-def update_matches(matches, video, release_info: str, **guessit_options):
- "Update matches set from release info string. New lines are iterated."
+def update_matches(
+ matches,
+ video,
+ release_info: Union[str, Iterable[str]],
+ split="\n",
+ **guessit_options
+):
+ """Update matches set from release info string or Iterable.
+
+ Use the split parameter to iterate over the set delimiter; set None to avoid split."""
+
guessit_options["type"] = "episode" if isinstance(video, Episode) else "movie"
+
logger.debug("Guessit options to update matches: %s", guessit_options)
- for release in release_info.split("\n"):
- logger.debug("Updating matches from release info: %s", release)
- matches |= guess_matches(video, guessit(release.strip(), guessit_options))
- logger.debug("New matches: %s", matches)
+ if isinstance(release_info, str):
+ release_info = release_info.split(split)
+
+ for release in release_info:
+ for release_split in release.split(split):
+ logger.debug("Updating matches from release info: %s", release)
+ matches |= guess_matches(
+ video, guessit(release_split.strip(), guessit_options)
+ )
+ logger.debug("New matches: %s", matches)
return matches