diff options
author | panni <[email protected]> | 2018-10-31 17:08:29 +0100 |
---|---|---|
committer | panni <[email protected]> | 2018-10-31 17:08:29 +0100 |
commit | 8f584143f8afc46a75a83dab5243739772e3562b (patch) | |
tree | c7dae21e993880af8bee71ad7b5a63f2977db577 /libs/enzyme | |
parent | 4beaeaa99e84bbe1ed87d0466a55a22ba25c8437 (diff) | |
download | bazarr-8f584143f8afc46a75a83dab5243739772e3562b.tar.gz bazarr-8f584143f8afc46a75a83dab5243739772e3562b.zip |
update deps
Diffstat (limited to 'libs/enzyme')
-rw-r--r-- | libs/enzyme/HISTORY.rst | 33 | ||||
-rw-r--r-- | libs/enzyme/LICENSE | 13 | ||||
-rw-r--r-- | libs/enzyme/README.rst | 27 | ||||
-rw-r--r-- | libs/enzyme/__init__.py | 1 | ||||
-rw-r--r-- | libs/enzyme/mkv.py | 65 | ||||
-rw-r--r-- | libs/enzyme/parsers/ebml/core.py | 64 | ||||
-rw-r--r-- | libs/enzyme/subtitle.py | 185 | ||||
-rw-r--r-- | libs/enzyme/tests/__init__.py | 6 | ||||
-rw-r--r-- | libs/enzyme/tests/test_mkv.py | 42 | ||||
-rw-r--r-- | libs/enzyme/tests/test_parsers.py | 15 | ||||
-rw-r--r-- | libs/enzyme/tests/test_subtitle.py | 86 |
11 files changed, 461 insertions, 76 deletions
diff --git a/libs/enzyme/HISTORY.rst b/libs/enzyme/HISTORY.rst new file mode 100644 index 000000000..5cb844eb4 --- /dev/null +++ b/libs/enzyme/HISTORY.rst @@ -0,0 +1,33 @@ +Changelog +========= + +0.4.1 +----- +**release date:** 2013-11-05 + +* Fix parsing nested SeekHead elements +* Make parsing nested SeekHead elements optional + + +0.4.0 +----- +**release date:** 2013-10-30 + +* Import exceptions under enzyme namespace +* Change repr format +* Rename base exception +* Remove test file + + +0.3.1 +----- +**release date:** 2013-10-20 + +* Fix package distribution + + +0.3 +--- +**release date:** 2013-05-18 + +* Complete refactoring, for the old enzyme see https://github.com/Diaoul/enzyme-old diff --git a/libs/enzyme/LICENSE b/libs/enzyme/LICENSE new file mode 100644 index 000000000..32c6448c1 --- /dev/null +++ b/libs/enzyme/LICENSE @@ -0,0 +1,13 @@ +Copyright 2013 Antoine Bertin + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/libs/enzyme/README.rst b/libs/enzyme/README.rst new file mode 100644 index 000000000..6d4fc21e3 --- /dev/null +++ b/libs/enzyme/README.rst @@ -0,0 +1,27 @@ +Enzyme +====== + +Enzyme is a Python module to parse video metadata. + +.. image:: https://travis-ci.org/Diaoul/enzyme.png?branch=master + :target: https://travis-ci.org/Diaoul/enzyme + + +Usage +----- +Parse a MKV file:: + + >>> with open('How.I.Met.Your.Mother.S08E21.720p.HDTV.X264-DIMENSION.mkv', 'rb') as f: + ... mkv = enzyme.MKV(f) + ... + >>> mkv.info + <Info [title=None, duration=0:20:56.005000, date=2013-04-15 14:06:50]> + >>> mkv.video_tracks + [<VideoTrack [1, 1280x720, V_MPEG4/ISO/AVC, name=None, language=eng]>] + >>> mkv.audio_tracks + [<AudioTrack [2, 6 channel(s), 48000Hz, A_AC3, name=None, language=und]>] + + +License +------- +Apache2 diff --git a/libs/enzyme/__init__.py b/libs/enzyme/__init__.py index 3bd89f336..04f616c80 100644 --- a/libs/enzyme/__init__.py +++ b/libs/enzyme/__init__.py @@ -8,6 +8,7 @@ __copyright__ = 'Copyright 2013 Antoine Bertin' import logging from .exceptions import * from .mkv import * +from .subtitle import * logging.getLogger(__name__).addHandler(logging.NullHandler()) diff --git a/libs/enzyme/mkv.py b/libs/enzyme/mkv.py index f90c043f4..1670a6920 100644 --- a/libs/enzyme/mkv.py +++ b/libs/enzyme/mkv.py @@ -65,30 +65,53 @@ class MKV(object): continue if element_name == 'Info': logger.info('Processing element %s from SeekHead at position %d', element_name, element_position) - stream.seek(element_position) - self.info = Info.fromelement(ebml.parse_element(stream, specs, True, ignore_element_names=['Void', 'CRC-32'])) + element = self._load_element(stream, specs, element_position) + self.info = Info.fromelement(element) elif element_name == 'Tracks': logger.info('Processing element %s from SeekHead at position %d', element_name, element_position) - stream.seek(element_position) - tracks = ebml.parse_element(stream, specs, True, ignore_element_names=['Void', 'CRC-32']) + tracks = self._load_element(stream, specs, element_position) self.video_tracks.extend([VideoTrack.fromelement(t) for t in tracks if t['TrackType'].data == VIDEO_TRACK]) self.audio_tracks.extend([AudioTrack.fromelement(t) for t in tracks if t['TrackType'].data == AUDIO_TRACK]) self.subtitle_tracks.extend([SubtitleTrack.fromelement(t) for t in tracks if t['TrackType'].data == SUBTITLE_TRACK]) elif element_name == 'Chapters': logger.info('Processing element %s from SeekHead at position %d', element_name, element_position) - stream.seek(element_position) - self.chapters.extend([Chapter.fromelement(c) for c in ebml.parse_element(stream, specs, True, ignore_element_names=['Void', 'CRC-32'])[0] if c.name == 'ChapterAtom']) + element = self._load_element(stream, specs, element_position) + self.chapters.extend([Chapter.fromelement(c) for c in element[0] if c.name == 'ChapterAtom']) elif element_name == 'Tags': logger.info('Processing element %s from SeekHead at position %d', element_name, element_position) - stream.seek(element_position) - self.tags.extend([Tag.fromelement(t) for t in ebml.parse_element(stream, specs, True, ignore_element_names=['Void', 'CRC-32'])]) + element = self._load_element(stream, specs, element_position) + self.tags.extend([Tag.fromelement(t) for t in element]) elif element_name == 'SeekHead' and self.recurse_seek_head: logger.info('Processing element %s from SeekHead at position %d', element_name, element_position) - stream.seek(element_position) - self._parse_seekhead(ebml.parse_element(stream, specs, True, ignore_element_names=['Void', 'CRC-32']), segment, stream, specs) + element = self._load_element(stream, specs, element_position) + self._parse_seekhead(element, segment, stream, specs) else: logger.debug('Element %s ignored', element_name) self._parsed_positions.add(element_position) + + def _load_element(self,stream, specs, position): + stream.seek(position) + element = ebml.parse_element(stream,specs) + element.load(stream, specs, ignore_element_names=['Void', 'CRC-32']) + return element + + def get_srt_subtitles_track_by_language(self): + """get a dictionary of the SRT subtitles track id's indexed by language""" + + subtitles = dict() + for track in self.subtitle_tracks: + + logger.info("Found subtitle language %s, with codec %s and lacing %s", + track.language,track.codec_id,track.lacing) + + if not track.is_srt(): + logger.debug("Ignoring subtitle language %s with codec %s",track.language,track.codec_id) + elif track.lacing: + logger.info("Ignoring subtitle language %s with lacing %s",track.language,track.lacing) + else: + subtitles[track.language] = track + + return subtitles def to_dict(self): return {'info': self.info.__dict__, 'video_tracks': [t.__dict__ for t in self.video_tracks], @@ -103,6 +126,7 @@ class Info(object): """Object for the Info EBML element""" def __init__(self, title=None, duration=None, date_utc=None, timecode_scale=None, muxing_app=None, writing_app=None): self.title = title + self.timecode_scale = timecode_scale self.duration = timedelta(microseconds=duration * (timecode_scale or 1000000) // 1000) if duration else None self.date_utc = date_utc self.muxing_app = muxing_app @@ -119,7 +143,7 @@ class Info(object): title = element.get('Title') duration = element.get('Duration') date_utc = element.get('DateUTC') - timecode_scale = element.get('TimecodeScale') + timecode_scale = element.get('TimecodeScale',1000000) muxing_app = element.get('MuxingApp') writing_app = element.get('WritingApp') return cls(title, duration, date_utc, timecode_scale, muxing_app, writing_app) @@ -133,7 +157,7 @@ class Info(object): class Track(object): """Base object for the Tracks EBML element""" - def __init__(self, type=None, number=None, name=None, language=None, enabled=None, default=None, forced=None, lacing=None, # @ReservedAssignment + def __init__(self, type=None, number=None, name=None, language=None, enabled=None, default=None, forced=None, lacing=None, codec_id=None, codec_name=None): self.type = type self.number = number @@ -154,10 +178,10 @@ class Track(object): :type element: :class:`~enzyme.parsers.ebml.Element` """ - type = element.get('TrackType') # @ReservedAssignment + type = element.get('TrackType') number = element.get('TrackNumber', 0) name = element.get('Name') - language = element.get('Language', 'eng') + language = element.get('Language','eng') enabled = bool(element.get('FlagEnabled', 1)) default = bool(element.get('FlagDefault', 1)) forced = bool(element.get('FlagForced', 0)) @@ -201,7 +225,7 @@ class VideoTrack(Track): videotrack.width = element['Video'].get('PixelWidth', 0) videotrack.height = element['Video'].get('PixelHeight', 0) videotrack.interlaced = bool(element['Video'].get('FlagInterlaced', False)) - videotrack.stereo_mode = element['Video'].get('StereoMode', 0) + videotrack.stereo_mode = element['Video'].get('StereoMode') videotrack.crop = {} if 'PixelCropTop' in element['Video']: videotrack.crop['top'] = element['Video']['PixelCropTop'] @@ -211,10 +235,10 @@ class VideoTrack(Track): videotrack.crop['left'] = element['Video']['PixelCropLeft'] if 'PixelCropRight' in element['Video']: videotrack.crop['right'] = element['Video']['PixelCropRight'] - videotrack.display_unit = element['Video'].get('DisplayUnit') videotrack.display_width = element['Video'].get('DisplayWidth') videotrack.display_height = element['Video'].get('DisplayHeight') - videotrack.aspect_ratio_type = element['Video'].get('AspectRatioType', 0) + videotrack.display_unit = element['Video'].get('DisplayUnit') + videotrack.aspect_ratio_type = element['Video'].get('AspectRatioType') return videotrack def __repr__(self): @@ -245,7 +269,7 @@ class AudioTrack(Track): audiotrack = super(AudioTrack, cls).fromelement(element) audiotrack.sampling_frequency = element['Audio'].get('SamplingFrequency', 8000.0) audiotrack.channels = element['Audio'].get('Channels', 1) - audiotrack.output_sampling_frequency = element['Audio'].get('OutputSamplingFrequency', audiotrack.sampling_frequency) + audiotrack.output_sampling_frequency = element['Audio'].get('OutputSamplingFrequency') audiotrack.bit_depth = element['Audio'].get('BitDepth') return audiotrack @@ -256,8 +280,9 @@ class AudioTrack(Track): class SubtitleTrack(Track): """Object for the Tracks EBML element with :data:`SUBTITLE_TRACK` TrackType""" - pass - + + def is_srt(self): + return self.codec_id == 'S_TEXT/UTF8' class Tag(object): """Object for the Tag EBML element""" diff --git a/libs/enzyme/parsers/ebml/core.py b/libs/enzyme/parsers/ebml/core.py index ae025ac73..ef2dec8b2 100644 --- a/libs/enzyme/parsers/ebml/core.py +++ b/libs/enzyme/parsers/ebml/core.py @@ -38,8 +38,15 @@ READERS = { BINARY: read_element_binary } +class BaseElement(object): -class Element(object): + def __init__(self, id=None, position=None, size=None, data=None): + self.id = id + self.position = position + self.size = size + self.data = data + +class Element(BaseElement): """Base object of EBML :param int id: id of the element, best represented as hexadecimal (0x18538067 for Matroska Segment element) @@ -52,14 +59,11 @@ class Element(object): :param data: data as read by the corresponding :data:`READERS` """ - def __init__(self, id=None, type=None, name=None, level=None, position=None, size=None, data=None): # @ReservedAssignment - self.id = id + def __init__(self, id=None, type=None, name=None, level=None, position=None, size=None, data=None): + super(Element, self).__init__(id, position, size, data) self.type = type self.name = name self.level = level - self.position = position - self.size = size - self.data = data def __repr__(self): return '<%s [%s, %r]>' % (self.__class__.__name__, self.name, self.data) @@ -89,7 +93,7 @@ class MasterElement(Element): Element(DocType, u'matroska') """ - def __init__(self, id=None, name=None, level=None, position=None, size=None, data=None): # @ReservedAssignment + def __init__(self, id=None, name=None, level=None, position=None, size=None, data=None): super(MasterElement, self).__init__(id, MASTER, name, level, position, size, data) def load(self, stream, specs, ignore_element_types=None, ignore_element_names=None, max_level=None): @@ -137,8 +141,7 @@ class MasterElement(Element): def __iter__(self): return iter(self.data) - -def parse(stream, specs, size=None, ignore_element_types=None, ignore_element_names=None, max_level=None): +def parse(stream, specs, size=None, ignore_element_types=None, ignore_element_names=None, max_level=None, include_element_names=None): """Parse a stream for `size` bytes according to the `specs` :param stream: file-like object from which to read @@ -148,6 +151,7 @@ def parse(stream, specs, size=None, ignore_element_types=None, ignore_element_na :param list ignore_element_types: list of element types to ignore :param list ignore_element_names: list of element names to ignore :param int max_level: maximum level of elements + :param list include_element_names: list of element names to include exclusively, so ignoring all other element names :return: parsed data as a tree of :class:`~enzyme.parsers.ebml.core.Element` :rtype: list @@ -158,26 +162,36 @@ def parse(stream, specs, size=None, ignore_element_types=None, ignore_element_na """ ignore_element_types = ignore_element_types if ignore_element_types is not None else [] ignore_element_names = ignore_element_names if ignore_element_names is not None else [] + include_element_names = include_element_names if include_element_names is not None else [] start = stream.tell() elements = [] while size is None or stream.tell() - start < size: try: element = parse_element(stream, specs) - if element is None: + if not element or not hasattr(element, "type"): + stream.seek(element.size, 1) continue - logger.debug('%s %s parsed', element.__class__.__name__, element.name) - if element.type in ignore_element_types or element.name in ignore_element_names: - logger.info('%s %s ignored', element.__class__.__name__, element.name) - if element.type == MASTER: - stream.seek(element.size, 1) + + if element.type is None: + logger.error('Element with id 0x%x is not in the specs' % element.id) + stream.seek(element.size, 1) + continue + elif element.type in ignore_element_types or element.name in ignore_element_names: + logger.info('%s %s %s ignored', element.__class__.__name__, element.name, element.type) + stream.seek(element.size, 1) continue - if element.type == MASTER: + elif len(include_element_names) > 0 and element.name not in include_element_names: + stream.seek(element.size, 1) + continue + elif element.type == MASTER: if max_level is not None and element.level >= max_level: logger.info('Maximum level %d reached for children of %s %s', max_level, element.__class__.__name__, element.name) stream.seek(element.size, 1) else: logger.debug('Loading child elements for %s %s with size %d', element.__class__.__name__, element.name, element.size) - element.data = parse(stream, specs, element.size, ignore_element_types, ignore_element_names, max_level) + element.data = parse(stream, specs, element.size, ignore_element_types, ignore_element_names, max_level,include_element_names) + else: + element.data = READERS[element.type](stream, element.size) elements.append(element) except ReadError: if size is not None: @@ -186,21 +200,15 @@ def parse(stream, specs, size=None, ignore_element_types=None, ignore_element_na return elements -def parse_element(stream, specs, load_children=False, ignore_element_types=None, ignore_element_names=None, max_level=None): +def parse_element(stream, specs): """Extract a single :class:`Element` from the `stream` according to the `specs` :param stream: file-like object from which to read :param dict specs: see :ref:`specs` - :param bool load_children: load children elements if the parsed element is a :class:`MasterElement` - :param list ignore_element_types: list of element types to ignore - :param list ignore_element_names: list of element names to ignore - :param int max_level: maximum level for children elements :return: the parsed element :rtype: :class:`Element` """ - ignore_element_types = ignore_element_types if ignore_element_types is not None else [] - ignore_element_names = ignore_element_names if ignore_element_names is not None else [] element_id = read_element_id(stream) if element_id is None: raise ReadError('Cannot read element id') @@ -208,20 +216,14 @@ def parse_element(stream, specs, load_children=False, ignore_element_types=None, if element_size is None: raise ReadError('Cannot read element size') if element_id not in specs: - logger.error('Element with id 0x%x is not in the specs' % element_id) - stream.seek(element_size, 1) - return None + return BaseElement(element_id,stream.tell(),element_size) element_type, element_name, element_level = specs[element_id] if element_type == MASTER: element = MasterElement(element_id, element_name, element_level, stream.tell(), element_size) - if load_children: - element.data = parse(stream, specs, element.size, ignore_element_types, ignore_element_names, max_level) else: element = Element(element_id, element_type, element_name, element_level, stream.tell(), element_size) - element.data = READERS[element_type](stream, element_size) return element - def get_matroska_specs(webm_only=False): """Get the Matroska specs diff --git a/libs/enzyme/subtitle.py b/libs/enzyme/subtitle.py new file mode 100644 index 000000000..f33690858 --- /dev/null +++ b/libs/enzyme/subtitle.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- +from .exceptions import ReadError +from .parsers import ebml +from .mkv import MKV +from .parsers import ebml +import logging +import codecs +import os +import io + +__all__ = ['Subtitle'] +logger = logging.getLogger(__name__) + +class Subtitle(object): + """Subtitle extractor for Matroska Video File. + + Currently only SRT subtitles stored without lacing are supported + """ + + def __init__(self, stream): + """Read the available subtitles from a MKV file-like object""" + self._stream = stream + #Use the MKV class to parse the META information + mkv = MKV(stream) + self._timecode_scale = mkv.info.timecode_scale + self._subtitles = mkv.get_srt_subtitles_track_by_language() + + def has_subtitle(self, language): + return language in self._subtitles + + def write_subtitle_to_stream(self, language): + """Write a single subtitle to stream or return None if language not available""" + if language in self._subtitles: + subtitle = self._subtitles[language] + return _write_track_to_srt_stream(self._stream,subtitle.number,self._timecode_scale) + logger.info("Writing subtitle for language %s to stream",language) + else: + logger.info("Subtitle for language %s not found",language) + + def write_subtitles_to_stream(self): + """Write all available subtitles as streams to a dictionary with language as the key""" + subtitles = dict() + for language in self._subtitles: + subtitles[language] = self.write_subtitle_to_stream(language) + return subtitles + +def _write_track_to_srt_stream(mkv_stream, track, timecode_scale): + + srt_stream = io.StringIO() + index = 0 + for cluster in _parse_segment(mkv_stream,track): + for blockgroup in cluster.blockgroups: + index = index + 1 + timeRange = _print_time_range(timecode_scale,cluster.timecode,blockgroup.block.timecode,blockgroup.duration) + srt_stream.write(str(index) + '\n') + srt_stream.write(timeRange + '\n') + srt_stream.write(codecs.decode(blockgroup.block.data.read(),'utf-8') + '\n') + srt_stream.write('\n') + return srt_stream + +def _parse_segment(stream,track): + + stream.seek(0) + specs = ebml.get_matroska_specs() + + # Find all level 1 Cluster elements and its subelements. Speed up this process by excluding all other currently known level 1 elements + try: + segments = ebml.parse(stream, specs,include_element_names=['Segment','Cluster','BlockGroup','Timecode','Block','BlockDuration',],max_level=3) + except ReadError: + pass + + clusters = [] + for cluster in segments[0].data: + _parse_cluster(track, clusters, cluster) + return clusters + +def _parse_cluster(track, clusters, cluster): + + blockgroups = [] + timecode = None + for child in cluster.data: + if child.name == 'BlockGroup': + _parse_blockgroup(track, blockgroups, child) + elif child.name == 'Timecode': + timecode = child.data + + if len(blockgroups) > 0 and timecode != None: + clusters.append(Cluster(timecode, blockgroups)) + +def _parse_blockgroup(track, blockgroups, blockgroup): + + block = None + duration = None + for child in blockgroup.data: + if child.name == 'Block': + block = Block.fromelement(child) + if block.track != track: + block = None + elif child.name == 'BlockDuration': + duration = child.data + + if duration != None and block != None: + blockgroups.append(BlockGroup(block, duration)) + +def _print_time_range(timecode_scale,clusterTimecode,blockTimecode,duration): + + timecode_scale_ms = timecode_scale / 1000000 #Timecode + rawTimecode = clusterTimecode + blockTimecode + startTimeMilleSeconds = (rawTimecode) * timecode_scale_ms + endTimeMilleSeconds = (rawTimecode + duration) * timecode_scale_ms + + return _print_time(startTimeMilleSeconds) + " --> " + _print_time(endTimeMilleSeconds) + +def _print_time(timeInMilleSeconds): + + timeInSeconds, milleSeconds = divmod(timeInMilleSeconds, 1000) + timeInMinutes, seconds = divmod(timeInSeconds, 60) + hours, minutes = divmod(timeInMinutes, 60) + + return '%d:%02d:%02d,%d' % (hours,minutes,seconds,milleSeconds) + +class Cluster(object): + + def __init__(self,timecode=None, blockgroups=[]): + self.timecode = timecode + self.blockgroups = blockgroups + +class BlockGroup(object): + + def __init__(self,block=None,duration=None): + self.block = block + self.duration = duration + +class Block(object): + + def __init__(self, track=None, timecode=None, invisible=False, lacing=None, flags=None, data=None): + self.track = track + self.timecode = timecode + self.invisible = invisible + self.lacing = lacing + self.flags = flags + self.data = data + + @classmethod + def fromelement(cls,element): + stream = element.data + track = ebml.read_element_size(stream) + timecode = ebml.read_element_integer(stream,2) + flags = ord(stream.read(1)) + + invisible = bool(flags & 0x8) + + if (flags & 0x6): + lacing = 'EBML' + elif (flags & 0x4): + lacing = 'fixed-size' + elif (flags & 0x2): + lacing = 'Xiph' + else: + lacing = None + + if lacing: + raise ReadError('Laced blocks are not implemented yet') + + data = ebml.read_element_binary(stream, element.size - stream.tell()) + return cls(track,timecode,invisible,lacing,flags,data) + + def __repr__(self): + return '<%s track=%d, timecode=%d, invisible=%d, lacing=%s>' % (self.__class__.__name__, self.track,self.timecode,self.invisible,self.lacing) + +class SimpleBlock(Block): + + def __init__(self, track=None, timecode=None, keyframe=False, invisible=False, lacing=None, flags=None, data=None, discardable=False): + super(SimpleBlock,self).__init__(track,timecode,invisible,lacing,flags,data) + self.keyframe = keyframe + self.discardable = discardable + + def fromelement(cls,element): + simpleblock = super(SimpleBlock, cls).fromelement(element) + simpleblock.keyframe = bool(simpleblock.flags & 0x80) + simpleblock.discardable = bool(simpleblock.flags & 0x1) + return simpleblock + + def __repr__(self): + return '<%s track=%d, timecode=%d, keyframe=%d, invisible=%d, lacing=%s, discardable=%d>' % (self.__class__.__name__, self.track,self.timecode,self.keyframe,self.invisible,self.lacing,self.discardable)
\ No newline at end of file diff --git a/libs/enzyme/tests/__init__.py b/libs/enzyme/tests/__init__.py index 426d3598f..739abe747 100644 --- a/libs/enzyme/tests/__init__.py +++ b/libs/enzyme/tests/__init__.py @@ -1,9 +1,11 @@ # -*- coding: utf-8 -*- -from . import test_mkv, test_parsers +from . import test_mkv, test_parsers, test_subtitle import unittest -suite = unittest.TestSuite([test_mkv.suite(), test_parsers.suite()]) +suite = unittest.TestSuite([test_mkv.suite(), test_parsers.suite(), test_subtitle.suite()]) + + if __name__ == '__main__': diff --git a/libs/enzyme/tests/test_mkv.py b/libs/enzyme/tests/test_mkv.py index ac4617fa3..71aceee30 100644 --- a/libs/enzyme/tests/test_mkv.py +++ b/libs/enzyme/tests/test_mkv.py @@ -44,12 +44,12 @@ class MKVTestCase(unittest.TestCase): self.assertTrue(mkv.video_tracks[0].width == 854) self.assertTrue(mkv.video_tracks[0].height == 480) self.assertTrue(mkv.video_tracks[0].interlaced == False) - self.assertTrue(mkv.video_tracks[0].stereo_mode == 0) + self.assertTrue(mkv.video_tracks[0].stereo_mode is None) self.assertTrue(mkv.video_tracks[0].crop == {}) self.assertTrue(mkv.video_tracks[0].display_width is None) self.assertTrue(mkv.video_tracks[0].display_height is None) self.assertTrue(mkv.video_tracks[0].display_unit is None) - self.assertTrue(mkv.video_tracks[0].aspect_ratio_type == 0) + self.assertTrue(mkv.video_tracks[0].aspect_ratio_type is None) # audio track self.assertTrue(len(mkv.audio_tracks) == 1) self.assertTrue(mkv.audio_tracks[0].type == AUDIO_TRACK) @@ -64,7 +64,7 @@ class MKVTestCase(unittest.TestCase): self.assertTrue(mkv.audio_tracks[0].codec_name is None) self.assertTrue(mkv.audio_tracks[0].sampling_frequency == 48000.0) self.assertTrue(mkv.audio_tracks[0].channels == 2) - self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency == 48000.0) + self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency is None) self.assertTrue(mkv.audio_tracks[0].bit_depth is None) # subtitle track self.assertTrue(len(mkv.subtitle_tracks) == 0) @@ -113,12 +113,12 @@ class MKVTestCase(unittest.TestCase): self.assertTrue(mkv.video_tracks[0].width == 1024) self.assertTrue(mkv.video_tracks[0].height == 576) self.assertTrue(mkv.video_tracks[0].interlaced == False) - self.assertTrue(mkv.video_tracks[0].stereo_mode == 0) + self.assertTrue(mkv.video_tracks[0].stereo_mode is None) self.assertTrue(mkv.video_tracks[0].crop == {}) self.assertTrue(mkv.video_tracks[0].display_width == 1354) self.assertTrue(mkv.video_tracks[0].display_height is None) self.assertTrue(mkv.video_tracks[0].display_unit is None) - self.assertTrue(mkv.video_tracks[0].aspect_ratio_type == 0) + self.assertTrue(mkv.video_tracks[0].aspect_ratio_type is None) # audio track self.assertTrue(len(mkv.audio_tracks) == 1) self.assertTrue(mkv.audio_tracks[0].type == AUDIO_TRACK) @@ -133,7 +133,7 @@ class MKVTestCase(unittest.TestCase): self.assertTrue(mkv.audio_tracks[0].codec_name is None) self.assertTrue(mkv.audio_tracks[0].sampling_frequency == 48000.0) self.assertTrue(mkv.audio_tracks[0].channels == 2) - self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency == 48000.0) + self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency is None) self.assertTrue(mkv.audio_tracks[0].bit_depth is None) # subtitle track self.assertTrue(len(mkv.subtitle_tracks) == 0) @@ -182,12 +182,12 @@ class MKVTestCase(unittest.TestCase): self.assertTrue(mkv.video_tracks[0].width == 1024) self.assertTrue(mkv.video_tracks[0].height == 576) self.assertTrue(mkv.video_tracks[0].interlaced == False) - self.assertTrue(mkv.video_tracks[0].stereo_mode == 0) + self.assertTrue(mkv.video_tracks[0].stereo_mode is None) self.assertTrue(mkv.video_tracks[0].crop == {}) self.assertTrue(mkv.video_tracks[0].display_width is None) self.assertTrue(mkv.video_tracks[0].display_height is None) self.assertTrue(mkv.video_tracks[0].display_unit is None) - self.assertTrue(mkv.video_tracks[0].aspect_ratio_type == 0) + self.assertTrue(mkv.video_tracks[0].aspect_ratio_type is None) # audio track self.assertTrue(len(mkv.audio_tracks) == 1) self.assertTrue(mkv.audio_tracks[0].type == AUDIO_TRACK) @@ -202,7 +202,7 @@ class MKVTestCase(unittest.TestCase): self.assertTrue(mkv.audio_tracks[0].codec_name is None) self.assertTrue(mkv.audio_tracks[0].sampling_frequency == 48000.0) self.assertTrue(mkv.audio_tracks[0].channels == 2) - self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency == 48000.0) + self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency is None) self.assertTrue(mkv.audio_tracks[0].bit_depth is None) # subtitle track self.assertTrue(len(mkv.subtitle_tracks) == 0) @@ -251,12 +251,12 @@ class MKVTestCase(unittest.TestCase): self.assertTrue(mkv.video_tracks[0].width == 1024) self.assertTrue(mkv.video_tracks[0].height == 576) self.assertTrue(mkv.video_tracks[0].interlaced == False) - self.assertTrue(mkv.video_tracks[0].stereo_mode == 0) + self.assertTrue(mkv.video_tracks[0].stereo_mode is None) self.assertTrue(mkv.video_tracks[0].crop == {}) self.assertTrue(mkv.video_tracks[0].display_width == 1024) self.assertTrue(mkv.video_tracks[0].display_height == 576) self.assertTrue(mkv.video_tracks[0].display_unit is None) - self.assertTrue(mkv.video_tracks[0].aspect_ratio_type == 0) + self.assertTrue(mkv.video_tracks[0].aspect_ratio_type is None) # audio tracks self.assertTrue(len(mkv.audio_tracks) == 2) self.assertTrue(mkv.audio_tracks[0].type == AUDIO_TRACK) @@ -271,7 +271,7 @@ class MKVTestCase(unittest.TestCase): self.assertTrue(mkv.audio_tracks[0].codec_name is None) self.assertTrue(mkv.audio_tracks[0].sampling_frequency == 48000.0) self.assertTrue(mkv.audio_tracks[0].channels == 2) - self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency == 48000.0) + self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency is None) self.assertTrue(mkv.audio_tracks[0].bit_depth is None) self.assertTrue(mkv.audio_tracks[1].type == AUDIO_TRACK) self.assertTrue(mkv.audio_tracks[1].number == 10) @@ -414,12 +414,12 @@ class MKVTestCase(unittest.TestCase): self.assertTrue(mkv.video_tracks[0].width == 854) self.assertTrue(mkv.video_tracks[0].height == 480) self.assertTrue(mkv.video_tracks[0].interlaced == False) - self.assertTrue(mkv.video_tracks[0].stereo_mode == 0) + self.assertTrue(mkv.video_tracks[0].stereo_mode is None) self.assertTrue(mkv.video_tracks[0].crop == {}) self.assertTrue(mkv.video_tracks[0].display_width is None) self.assertTrue(mkv.video_tracks[0].display_height is None) self.assertTrue(mkv.video_tracks[0].display_unit is None) - self.assertTrue(mkv.video_tracks[0].aspect_ratio_type == 0) + self.assertTrue(mkv.video_tracks[0].aspect_ratio_type is None) # audio track self.assertTrue(len(mkv.audio_tracks) == 1) self.assertTrue(mkv.audio_tracks[0].type == AUDIO_TRACK) @@ -434,7 +434,7 @@ class MKVTestCase(unittest.TestCase): self.assertTrue(mkv.audio_tracks[0].codec_name is None) self.assertTrue(mkv.audio_tracks[0].sampling_frequency == 48000.0) self.assertTrue(mkv.audio_tracks[0].channels == 2) - self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency == 48000.0) + self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency is None) self.assertTrue(mkv.audio_tracks[0].bit_depth is None) # subtitle track self.assertTrue(len(mkv.subtitle_tracks) == 0) @@ -483,12 +483,12 @@ class MKVTestCase(unittest.TestCase): self.assertTrue(mkv.video_tracks[0].width == 1024) self.assertTrue(mkv.video_tracks[0].height == 576) self.assertTrue(mkv.video_tracks[0].interlaced == False) - self.assertTrue(mkv.video_tracks[0].stereo_mode == 0) + self.assertTrue(mkv.video_tracks[0].stereo_mode is None) self.assertTrue(mkv.video_tracks[0].crop == {}) self.assertTrue(mkv.video_tracks[0].display_width is None) self.assertTrue(mkv.video_tracks[0].display_height is None) self.assertTrue(mkv.video_tracks[0].display_unit is None) - self.assertTrue(mkv.video_tracks[0].aspect_ratio_type == 0) + self.assertTrue(mkv.video_tracks[0].aspect_ratio_type is None) # audio track self.assertTrue(len(mkv.audio_tracks) == 1) self.assertTrue(mkv.audio_tracks[0].type == AUDIO_TRACK) @@ -503,7 +503,7 @@ class MKVTestCase(unittest.TestCase): self.assertTrue(mkv.audio_tracks[0].codec_name is None) self.assertTrue(mkv.audio_tracks[0].sampling_frequency == 48000.0) self.assertTrue(mkv.audio_tracks[0].channels == 2) - self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency == 48000.0) + self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency is None) self.assertTrue(mkv.audio_tracks[0].bit_depth is None) # subtitle track self.assertTrue(len(mkv.subtitle_tracks) == 0) @@ -552,12 +552,12 @@ class MKVTestCase(unittest.TestCase): self.assertTrue(mkv.video_tracks[0].width == 1024) self.assertTrue(mkv.video_tracks[0].height == 576) self.assertTrue(mkv.video_tracks[0].interlaced == False) - self.assertTrue(mkv.video_tracks[0].stereo_mode == 0) + self.assertTrue(mkv.video_tracks[0].stereo_mode is None) self.assertTrue(mkv.video_tracks[0].crop == {}) self.assertTrue(mkv.video_tracks[0].display_width is None) self.assertTrue(mkv.video_tracks[0].display_height is None) self.assertTrue(mkv.video_tracks[0].display_unit is None) - self.assertTrue(mkv.video_tracks[0].aspect_ratio_type == 0) + self.assertTrue(mkv.video_tracks[0].aspect_ratio_type is None) # audio track self.assertTrue(len(mkv.audio_tracks) == 1) self.assertTrue(mkv.audio_tracks[0].type == AUDIO_TRACK) @@ -572,7 +572,7 @@ class MKVTestCase(unittest.TestCase): self.assertTrue(mkv.audio_tracks[0].codec_name is None) self.assertTrue(mkv.audio_tracks[0].sampling_frequency == 48000.0) self.assertTrue(mkv.audio_tracks[0].channels == 2) - self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency == 48000.0) + self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency is None) self.assertTrue(mkv.audio_tracks[0].bit_depth is None) # subtitle track self.assertTrue(len(mkv.subtitle_tracks) == 0) diff --git a/libs/enzyme/tests/test_parsers.py b/libs/enzyme/tests/test_parsers.py index 0fa320ce0..611aa9bef 100644 --- a/libs/enzyme/tests/test_parsers.py +++ b/libs/enzyme/tests/test_parsers.py @@ -33,7 +33,7 @@ class EBMLTestCase(unittest.TestCase): self.stream.close() def check_element(self, element_id, element_type, element_name, element_level, element_position, element_size, element_data, element, - ignore_element_types=None, ignore_element_names=None, max_level=None): + ignore_element_types=None, ignore_element_names=None, max_level=None, include_element_names=None): """Recursively check an element""" # base self.assertTrue(element.id == element_id) @@ -53,6 +53,8 @@ class EBMLTestCase(unittest.TestCase): element_data = [e for e in element_data if e[1] not in ignore_element_types] if ignore_element_names is not None: # filter validation on element names element_data = [e for e in element_data if e[2] not in ignore_element_names] + if include_element_names is not None: # filter validation on element names + element_data = [e for e in element_data if e[2] in include_element_names] if element.level == max_level: # special check when maximum level is reached self.assertTrue(element.data is None) return @@ -60,7 +62,7 @@ class EBMLTestCase(unittest.TestCase): for i in range(len(element.data)): self.check_element(element_data[i][0], element_data[i][1], element_data[i][2], element_data[i][3], element_data[i][4], element_data[i][5], element_data[i][6], element.data[i], ignore_element_types, - ignore_element_names, max_level) + ignore_element_names, max_level,include_element_names) def test_parse_full(self): result = ebml.parse(self.stream, self.specs) @@ -87,6 +89,15 @@ class EBMLTestCase(unittest.TestCase): self.check_element(self.validation[i][0], self.validation[i][1], self.validation[i][2], self.validation[i][3], self.validation[i][4], self.validation[i][5], self.validation[i][6], result[i], ignore_element_names=ignore_element_names) + def test_parse_include_element_names(self): + include_element_names = ['Segment','Cluster'] + result = ebml.parse(self.stream, self.specs, include_element_names=include_element_names) + self.validation = [e for e in self.validation if e[2] in include_element_names] + self.assertTrue(len(result) == len(self.validation)) + for i in range(len(self.validation)): + self.check_element(self.validation[i][0], self.validation[i][1], self.validation[i][2], self.validation[i][3], + self.validation[i][4], self.validation[i][5], self.validation[i][6], result[i], include_element_names=include_element_names) + def test_parse_max_level(self): max_level = 3 result = ebml.parse(self.stream, self.specs, max_level=max_level) diff --git a/libs/enzyme/tests/test_subtitle.py b/libs/enzyme/tests/test_subtitle.py new file mode 100644 index 000000000..a8fa0e7b2 --- /dev/null +++ b/libs/enzyme/tests/test_subtitle.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +from enzyme.subtitle import Subtitle, _print_time_range, _print_time +import unittest +import os +import io +import requests +import zipfile +import glob + +# Test directory +TEST_DIR = os.path.join(os.path.dirname(__file__), os.path.splitext(__file__)[0]) + +def setUpModule(): + if not os.path.exists(TEST_DIR): + r = requests.get('http://downloads.sourceforge.net/project/matroska/test_files/matroska_test_w1_1.zip') + with zipfile.ZipFile(io.BytesIO(r.content), 'r') as f: + f.extractall(TEST_DIR) + +class SubtitleTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls): + + file = 'test5.mkv' + stream = io.open(os.path.join(TEST_DIR, file), 'rb') + cls.subtitle = Subtitle(stream) + + def test_subtitles_found(self): + + subtitles = self.subtitle._subtitles + self.assertTrue('eng' in subtitles) + self.assertTrue('hun' in subtitles) + self.assertTrue('ger' in subtitles) + self.assertTrue('fre' in subtitles) + self.assertTrue('spa' in subtitles) + self.assertTrue('ita' in subtitles) + self.assertTrue('jpn' in subtitles) + self.assertTrue('und' in subtitles) + + def test_write_subtitle_to_stream(self): + + subtitle_stream = self.subtitle.write_subtitle_to_stream("eng") + self.assertIsInstance(subtitle_stream,io.StringIO,"Expecting a StringIO stream") + + def test_write_subtitle_to_stream(self): + + subtitle_streams = self.subtitle.write_subtitles_to_stream() + + self.assertIn("eng", subtitle_streams, "Expecting a subtitle stream for language eng") + self.assertIsInstance(subtitle_streams["eng"],io.StringIO,"Expecting a StringIO stream") + self.assertIn("hun", subtitle_streams, "Expecting a subtitle stream for language hun") + self.assertIsInstance(subtitle_streams["hun"],io.StringIO,"Expecting a StringIO stream") + self.assertIn("ger", subtitle_streams, "Expecting a subtitle stream for language ger") + self.assertIsInstance(subtitle_streams["ger"],io.StringIO,"Expecting a StringIO stream") + self.assertIn("fre", subtitle_streams, "Expecting a subtitle stream for language fre") + self.assertIsInstance(subtitle_streams["fre"],io.StringIO,"Expecting a StringIO stream") + self.assertIn("spa", subtitle_streams, "Expecting a subtitle stream for language spa") + self.assertIsInstance(subtitle_streams["spa"],io.StringIO,"Expecting a StringIO stream") + self.assertIn("ita", subtitle_streams, "Expecting a subtitle stream for language ita") + self.assertIsInstance(subtitle_streams["ita"],io.StringIO,"Expecting a StringIO stream") + self.assertIn("jpn", subtitle_streams, "Expecting a subtitle stream for language jpn") + self.assertIsInstance(subtitle_streams["jpn"],io.StringIO,"Expecting a StringIO stream") + + def test_print_time(self): + + self.assertEqual('0:00:00,0',_print_time(0)) + self.assertEqual('0:00:00,1',_print_time(1)) + self.assertEqual('0:00:00,999',_print_time(999)) + self.assertEqual('0:00:01,0',_print_time(1000)) + self.assertEqual('0:00:59,999',_print_time(1000*60-1)) + self.assertEqual('0:01:00,0',_print_time(1000*60)) + self.assertEqual('0:59:59,999',_print_time(1000*60*60-1)) + self.assertEqual('1:00:00,0',_print_time(1000*60*60)) + + def test_print_time_range(self): + + self.assertEqual('0:00:00,0 --> 0:00:00,0',_print_time_range(1000000,0,0,0)) + self.assertEqual('0:01:00,0 --> 0:01:01,0',_print_time_range(1000000,0,60000,1000)) + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.TestLoader().loadTestsFromTestCase(SubtitleTestCase)) + return suite + +if __name__ == '__main__': + unittest.TextTestRunner().run(suite()) |