summaryrefslogtreecommitdiffhomepage
path: root/libs/enzyme
diff options
context:
space:
mode:
authorpanni <[email protected]>2018-10-31 17:08:29 +0100
committerpanni <[email protected]>2018-10-31 17:08:29 +0100
commit8f584143f8afc46a75a83dab5243739772e3562b (patch)
treec7dae21e993880af8bee71ad7b5a63f2977db577 /libs/enzyme
parent4beaeaa99e84bbe1ed87d0466a55a22ba25c8437 (diff)
downloadbazarr-8f584143f8afc46a75a83dab5243739772e3562b.tar.gz
bazarr-8f584143f8afc46a75a83dab5243739772e3562b.zip
update deps
Diffstat (limited to 'libs/enzyme')
-rw-r--r--libs/enzyme/HISTORY.rst33
-rw-r--r--libs/enzyme/LICENSE13
-rw-r--r--libs/enzyme/README.rst27
-rw-r--r--libs/enzyme/__init__.py1
-rw-r--r--libs/enzyme/mkv.py65
-rw-r--r--libs/enzyme/parsers/ebml/core.py64
-rw-r--r--libs/enzyme/subtitle.py185
-rw-r--r--libs/enzyme/tests/__init__.py6
-rw-r--r--libs/enzyme/tests/test_mkv.py42
-rw-r--r--libs/enzyme/tests/test_parsers.py15
-rw-r--r--libs/enzyme/tests/test_subtitle.py86
11 files changed, 461 insertions, 76 deletions
diff --git a/libs/enzyme/HISTORY.rst b/libs/enzyme/HISTORY.rst
new file mode 100644
index 000000000..5cb844eb4
--- /dev/null
+++ b/libs/enzyme/HISTORY.rst
@@ -0,0 +1,33 @@
+Changelog
+=========
+
+0.4.1
+-----
+**release date:** 2013-11-05
+
+* Fix parsing nested SeekHead elements
+* Make parsing nested SeekHead elements optional
+
+
+0.4.0
+-----
+**release date:** 2013-10-30
+
+* Import exceptions under enzyme namespace
+* Change repr format
+* Rename base exception
+* Remove test file
+
+
+0.3.1
+-----
+**release date:** 2013-10-20
+
+* Fix package distribution
+
+
+0.3
+---
+**release date:** 2013-05-18
+
+* Complete refactoring, for the old enzyme see https://github.com/Diaoul/enzyme-old
diff --git a/libs/enzyme/LICENSE b/libs/enzyme/LICENSE
new file mode 100644
index 000000000..32c6448c1
--- /dev/null
+++ b/libs/enzyme/LICENSE
@@ -0,0 +1,13 @@
+Copyright 2013 Antoine Bertin
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/libs/enzyme/README.rst b/libs/enzyme/README.rst
new file mode 100644
index 000000000..6d4fc21e3
--- /dev/null
+++ b/libs/enzyme/README.rst
@@ -0,0 +1,27 @@
+Enzyme
+======
+
+Enzyme is a Python module to parse video metadata.
+
+.. image:: https://travis-ci.org/Diaoul/enzyme.png?branch=master
+ :target: https://travis-ci.org/Diaoul/enzyme
+
+
+Usage
+-----
+Parse a MKV file::
+
+ >>> with open('How.I.Met.Your.Mother.S08E21.720p.HDTV.X264-DIMENSION.mkv', 'rb') as f:
+ ... mkv = enzyme.MKV(f)
+ ...
+ >>> mkv.info
+ <Info [title=None, duration=0:20:56.005000, date=2013-04-15 14:06:50]>
+ >>> mkv.video_tracks
+ [<VideoTrack [1, 1280x720, V_MPEG4/ISO/AVC, name=None, language=eng]>]
+ >>> mkv.audio_tracks
+ [<AudioTrack [2, 6 channel(s), 48000Hz, A_AC3, name=None, language=und]>]
+
+
+License
+-------
+Apache2
diff --git a/libs/enzyme/__init__.py b/libs/enzyme/__init__.py
index 3bd89f336..04f616c80 100644
--- a/libs/enzyme/__init__.py
+++ b/libs/enzyme/__init__.py
@@ -8,6 +8,7 @@ __copyright__ = 'Copyright 2013 Antoine Bertin'
import logging
from .exceptions import *
from .mkv import *
+from .subtitle import *
logging.getLogger(__name__).addHandler(logging.NullHandler())
diff --git a/libs/enzyme/mkv.py b/libs/enzyme/mkv.py
index f90c043f4..1670a6920 100644
--- a/libs/enzyme/mkv.py
+++ b/libs/enzyme/mkv.py
@@ -65,30 +65,53 @@ class MKV(object):
continue
if element_name == 'Info':
logger.info('Processing element %s from SeekHead at position %d', element_name, element_position)
- stream.seek(element_position)
- self.info = Info.fromelement(ebml.parse_element(stream, specs, True, ignore_element_names=['Void', 'CRC-32']))
+ element = self._load_element(stream, specs, element_position)
+ self.info = Info.fromelement(element)
elif element_name == 'Tracks':
logger.info('Processing element %s from SeekHead at position %d', element_name, element_position)
- stream.seek(element_position)
- tracks = ebml.parse_element(stream, specs, True, ignore_element_names=['Void', 'CRC-32'])
+ tracks = self._load_element(stream, specs, element_position)
self.video_tracks.extend([VideoTrack.fromelement(t) for t in tracks if t['TrackType'].data == VIDEO_TRACK])
self.audio_tracks.extend([AudioTrack.fromelement(t) for t in tracks if t['TrackType'].data == AUDIO_TRACK])
self.subtitle_tracks.extend([SubtitleTrack.fromelement(t) for t in tracks if t['TrackType'].data == SUBTITLE_TRACK])
elif element_name == 'Chapters':
logger.info('Processing element %s from SeekHead at position %d', element_name, element_position)
- stream.seek(element_position)
- self.chapters.extend([Chapter.fromelement(c) for c in ebml.parse_element(stream, specs, True, ignore_element_names=['Void', 'CRC-32'])[0] if c.name == 'ChapterAtom'])
+ element = self._load_element(stream, specs, element_position)
+ self.chapters.extend([Chapter.fromelement(c) for c in element[0] if c.name == 'ChapterAtom'])
elif element_name == 'Tags':
logger.info('Processing element %s from SeekHead at position %d', element_name, element_position)
- stream.seek(element_position)
- self.tags.extend([Tag.fromelement(t) for t in ebml.parse_element(stream, specs, True, ignore_element_names=['Void', 'CRC-32'])])
+ element = self._load_element(stream, specs, element_position)
+ self.tags.extend([Tag.fromelement(t) for t in element])
elif element_name == 'SeekHead' and self.recurse_seek_head:
logger.info('Processing element %s from SeekHead at position %d', element_name, element_position)
- stream.seek(element_position)
- self._parse_seekhead(ebml.parse_element(stream, specs, True, ignore_element_names=['Void', 'CRC-32']), segment, stream, specs)
+ element = self._load_element(stream, specs, element_position)
+ self._parse_seekhead(element, segment, stream, specs)
else:
logger.debug('Element %s ignored', element_name)
self._parsed_positions.add(element_position)
+
+ def _load_element(self,stream, specs, position):
+ stream.seek(position)
+ element = ebml.parse_element(stream,specs)
+ element.load(stream, specs, ignore_element_names=['Void', 'CRC-32'])
+ return element
+
+ def get_srt_subtitles_track_by_language(self):
+ """get a dictionary of the SRT subtitles track id's indexed by language"""
+
+ subtitles = dict()
+ for track in self.subtitle_tracks:
+
+ logger.info("Found subtitle language %s, with codec %s and lacing %s",
+ track.language,track.codec_id,track.lacing)
+
+ if not track.is_srt():
+ logger.debug("Ignoring subtitle language %s with codec %s",track.language,track.codec_id)
+ elif track.lacing:
+ logger.info("Ignoring subtitle language %s with lacing %s",track.language,track.lacing)
+ else:
+ subtitles[track.language] = track
+
+ return subtitles
def to_dict(self):
return {'info': self.info.__dict__, 'video_tracks': [t.__dict__ for t in self.video_tracks],
@@ -103,6 +126,7 @@ class Info(object):
"""Object for the Info EBML element"""
def __init__(self, title=None, duration=None, date_utc=None, timecode_scale=None, muxing_app=None, writing_app=None):
self.title = title
+ self.timecode_scale = timecode_scale
self.duration = timedelta(microseconds=duration * (timecode_scale or 1000000) // 1000) if duration else None
self.date_utc = date_utc
self.muxing_app = muxing_app
@@ -119,7 +143,7 @@ class Info(object):
title = element.get('Title')
duration = element.get('Duration')
date_utc = element.get('DateUTC')
- timecode_scale = element.get('TimecodeScale')
+ timecode_scale = element.get('TimecodeScale',1000000)
muxing_app = element.get('MuxingApp')
writing_app = element.get('WritingApp')
return cls(title, duration, date_utc, timecode_scale, muxing_app, writing_app)
@@ -133,7 +157,7 @@ class Info(object):
class Track(object):
"""Base object for the Tracks EBML element"""
- def __init__(self, type=None, number=None, name=None, language=None, enabled=None, default=None, forced=None, lacing=None, # @ReservedAssignment
+ def __init__(self, type=None, number=None, name=None, language=None, enabled=None, default=None, forced=None, lacing=None,
codec_id=None, codec_name=None):
self.type = type
self.number = number
@@ -154,10 +178,10 @@ class Track(object):
:type element: :class:`~enzyme.parsers.ebml.Element`
"""
- type = element.get('TrackType') # @ReservedAssignment
+ type = element.get('TrackType')
number = element.get('TrackNumber', 0)
name = element.get('Name')
- language = element.get('Language', 'eng')
+ language = element.get('Language','eng')
enabled = bool(element.get('FlagEnabled', 1))
default = bool(element.get('FlagDefault', 1))
forced = bool(element.get('FlagForced', 0))
@@ -201,7 +225,7 @@ class VideoTrack(Track):
videotrack.width = element['Video'].get('PixelWidth', 0)
videotrack.height = element['Video'].get('PixelHeight', 0)
videotrack.interlaced = bool(element['Video'].get('FlagInterlaced', False))
- videotrack.stereo_mode = element['Video'].get('StereoMode', 0)
+ videotrack.stereo_mode = element['Video'].get('StereoMode')
videotrack.crop = {}
if 'PixelCropTop' in element['Video']:
videotrack.crop['top'] = element['Video']['PixelCropTop']
@@ -211,10 +235,10 @@ class VideoTrack(Track):
videotrack.crop['left'] = element['Video']['PixelCropLeft']
if 'PixelCropRight' in element['Video']:
videotrack.crop['right'] = element['Video']['PixelCropRight']
- videotrack.display_unit = element['Video'].get('DisplayUnit')
videotrack.display_width = element['Video'].get('DisplayWidth')
videotrack.display_height = element['Video'].get('DisplayHeight')
- videotrack.aspect_ratio_type = element['Video'].get('AspectRatioType', 0)
+ videotrack.display_unit = element['Video'].get('DisplayUnit')
+ videotrack.aspect_ratio_type = element['Video'].get('AspectRatioType')
return videotrack
def __repr__(self):
@@ -245,7 +269,7 @@ class AudioTrack(Track):
audiotrack = super(AudioTrack, cls).fromelement(element)
audiotrack.sampling_frequency = element['Audio'].get('SamplingFrequency', 8000.0)
audiotrack.channels = element['Audio'].get('Channels', 1)
- audiotrack.output_sampling_frequency = element['Audio'].get('OutputSamplingFrequency', audiotrack.sampling_frequency)
+ audiotrack.output_sampling_frequency = element['Audio'].get('OutputSamplingFrequency')
audiotrack.bit_depth = element['Audio'].get('BitDepth')
return audiotrack
@@ -256,8 +280,9 @@ class AudioTrack(Track):
class SubtitleTrack(Track):
"""Object for the Tracks EBML element with :data:`SUBTITLE_TRACK` TrackType"""
- pass
-
+
+ def is_srt(self):
+ return self.codec_id == 'S_TEXT/UTF8'
class Tag(object):
"""Object for the Tag EBML element"""
diff --git a/libs/enzyme/parsers/ebml/core.py b/libs/enzyme/parsers/ebml/core.py
index ae025ac73..ef2dec8b2 100644
--- a/libs/enzyme/parsers/ebml/core.py
+++ b/libs/enzyme/parsers/ebml/core.py
@@ -38,8 +38,15 @@ READERS = {
BINARY: read_element_binary
}
+class BaseElement(object):
-class Element(object):
+ def __init__(self, id=None, position=None, size=None, data=None):
+ self.id = id
+ self.position = position
+ self.size = size
+ self.data = data
+
+class Element(BaseElement):
"""Base object of EBML
:param int id: id of the element, best represented as hexadecimal (0x18538067 for Matroska Segment element)
@@ -52,14 +59,11 @@ class Element(object):
:param data: data as read by the corresponding :data:`READERS`
"""
- def __init__(self, id=None, type=None, name=None, level=None, position=None, size=None, data=None): # @ReservedAssignment
- self.id = id
+ def __init__(self, id=None, type=None, name=None, level=None, position=None, size=None, data=None):
+ super(Element, self).__init__(id, position, size, data)
self.type = type
self.name = name
self.level = level
- self.position = position
- self.size = size
- self.data = data
def __repr__(self):
return '<%s [%s, %r]>' % (self.__class__.__name__, self.name, self.data)
@@ -89,7 +93,7 @@ class MasterElement(Element):
Element(DocType, u'matroska')
"""
- def __init__(self, id=None, name=None, level=None, position=None, size=None, data=None): # @ReservedAssignment
+ def __init__(self, id=None, name=None, level=None, position=None, size=None, data=None):
super(MasterElement, self).__init__(id, MASTER, name, level, position, size, data)
def load(self, stream, specs, ignore_element_types=None, ignore_element_names=None, max_level=None):
@@ -137,8 +141,7 @@ class MasterElement(Element):
def __iter__(self):
return iter(self.data)
-
-def parse(stream, specs, size=None, ignore_element_types=None, ignore_element_names=None, max_level=None):
+def parse(stream, specs, size=None, ignore_element_types=None, ignore_element_names=None, max_level=None, include_element_names=None):
"""Parse a stream for `size` bytes according to the `specs`
:param stream: file-like object from which to read
@@ -148,6 +151,7 @@ def parse(stream, specs, size=None, ignore_element_types=None, ignore_element_na
:param list ignore_element_types: list of element types to ignore
:param list ignore_element_names: list of element names to ignore
:param int max_level: maximum level of elements
+ :param list include_element_names: list of element names to include exclusively, so ignoring all other element names
:return: parsed data as a tree of :class:`~enzyme.parsers.ebml.core.Element`
:rtype: list
@@ -158,26 +162,36 @@ def parse(stream, specs, size=None, ignore_element_types=None, ignore_element_na
"""
ignore_element_types = ignore_element_types if ignore_element_types is not None else []
ignore_element_names = ignore_element_names if ignore_element_names is not None else []
+ include_element_names = include_element_names if include_element_names is not None else []
start = stream.tell()
elements = []
while size is None or stream.tell() - start < size:
try:
element = parse_element(stream, specs)
- if element is None:
+ if not element or not hasattr(element, "type"):
+ stream.seek(element.size, 1)
continue
- logger.debug('%s %s parsed', element.__class__.__name__, element.name)
- if element.type in ignore_element_types or element.name in ignore_element_names:
- logger.info('%s %s ignored', element.__class__.__name__, element.name)
- if element.type == MASTER:
- stream.seek(element.size, 1)
+
+ if element.type is None:
+ logger.error('Element with id 0x%x is not in the specs' % element.id)
+ stream.seek(element.size, 1)
+ continue
+ elif element.type in ignore_element_types or element.name in ignore_element_names:
+ logger.info('%s %s %s ignored', element.__class__.__name__, element.name, element.type)
+ stream.seek(element.size, 1)
continue
- if element.type == MASTER:
+ elif len(include_element_names) > 0 and element.name not in include_element_names:
+ stream.seek(element.size, 1)
+ continue
+ elif element.type == MASTER:
if max_level is not None and element.level >= max_level:
logger.info('Maximum level %d reached for children of %s %s', max_level, element.__class__.__name__, element.name)
stream.seek(element.size, 1)
else:
logger.debug('Loading child elements for %s %s with size %d', element.__class__.__name__, element.name, element.size)
- element.data = parse(stream, specs, element.size, ignore_element_types, ignore_element_names, max_level)
+ element.data = parse(stream, specs, element.size, ignore_element_types, ignore_element_names, max_level,include_element_names)
+ else:
+ element.data = READERS[element.type](stream, element.size)
elements.append(element)
except ReadError:
if size is not None:
@@ -186,21 +200,15 @@ def parse(stream, specs, size=None, ignore_element_types=None, ignore_element_na
return elements
-def parse_element(stream, specs, load_children=False, ignore_element_types=None, ignore_element_names=None, max_level=None):
+def parse_element(stream, specs):
"""Extract a single :class:`Element` from the `stream` according to the `specs`
:param stream: file-like object from which to read
:param dict specs: see :ref:`specs`
- :param bool load_children: load children elements if the parsed element is a :class:`MasterElement`
- :param list ignore_element_types: list of element types to ignore
- :param list ignore_element_names: list of element names to ignore
- :param int max_level: maximum level for children elements
:return: the parsed element
:rtype: :class:`Element`
"""
- ignore_element_types = ignore_element_types if ignore_element_types is not None else []
- ignore_element_names = ignore_element_names if ignore_element_names is not None else []
element_id = read_element_id(stream)
if element_id is None:
raise ReadError('Cannot read element id')
@@ -208,20 +216,14 @@ def parse_element(stream, specs, load_children=False, ignore_element_types=None,
if element_size is None:
raise ReadError('Cannot read element size')
if element_id not in specs:
- logger.error('Element with id 0x%x is not in the specs' % element_id)
- stream.seek(element_size, 1)
- return None
+ return BaseElement(element_id,stream.tell(),element_size)
element_type, element_name, element_level = specs[element_id]
if element_type == MASTER:
element = MasterElement(element_id, element_name, element_level, stream.tell(), element_size)
- if load_children:
- element.data = parse(stream, specs, element.size, ignore_element_types, ignore_element_names, max_level)
else:
element = Element(element_id, element_type, element_name, element_level, stream.tell(), element_size)
- element.data = READERS[element_type](stream, element_size)
return element
-
def get_matroska_specs(webm_only=False):
"""Get the Matroska specs
diff --git a/libs/enzyme/subtitle.py b/libs/enzyme/subtitle.py
new file mode 100644
index 000000000..f33690858
--- /dev/null
+++ b/libs/enzyme/subtitle.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+from .exceptions import ReadError
+from .parsers import ebml
+from .mkv import MKV
+from .parsers import ebml
+import logging
+import codecs
+import os
+import io
+
+__all__ = ['Subtitle']
+logger = logging.getLogger(__name__)
+
+class Subtitle(object):
+ """Subtitle extractor for Matroska Video File.
+
+ Currently only SRT subtitles stored without lacing are supported
+ """
+
+ def __init__(self, stream):
+ """Read the available subtitles from a MKV file-like object"""
+ self._stream = stream
+ #Use the MKV class to parse the META information
+ mkv = MKV(stream)
+ self._timecode_scale = mkv.info.timecode_scale
+ self._subtitles = mkv.get_srt_subtitles_track_by_language()
+
+ def has_subtitle(self, language):
+ return language in self._subtitles
+
+ def write_subtitle_to_stream(self, language):
+ """Write a single subtitle to stream or return None if language not available"""
+ if language in self._subtitles:
+ subtitle = self._subtitles[language]
+ return _write_track_to_srt_stream(self._stream,subtitle.number,self._timecode_scale)
+ logger.info("Writing subtitle for language %s to stream",language)
+ else:
+ logger.info("Subtitle for language %s not found",language)
+
+ def write_subtitles_to_stream(self):
+ """Write all available subtitles as streams to a dictionary with language as the key"""
+ subtitles = dict()
+ for language in self._subtitles:
+ subtitles[language] = self.write_subtitle_to_stream(language)
+ return subtitles
+
+def _write_track_to_srt_stream(mkv_stream, track, timecode_scale):
+
+ srt_stream = io.StringIO()
+ index = 0
+ for cluster in _parse_segment(mkv_stream,track):
+ for blockgroup in cluster.blockgroups:
+ index = index + 1
+ timeRange = _print_time_range(timecode_scale,cluster.timecode,blockgroup.block.timecode,blockgroup.duration)
+ srt_stream.write(str(index) + '\n')
+ srt_stream.write(timeRange + '\n')
+ srt_stream.write(codecs.decode(blockgroup.block.data.read(),'utf-8') + '\n')
+ srt_stream.write('\n')
+ return srt_stream
+
+def _parse_segment(stream,track):
+
+ stream.seek(0)
+ specs = ebml.get_matroska_specs()
+
+ # Find all level 1 Cluster elements and its subelements. Speed up this process by excluding all other currently known level 1 elements
+ try:
+ segments = ebml.parse(stream, specs,include_element_names=['Segment','Cluster','BlockGroup','Timecode','Block','BlockDuration',],max_level=3)
+ except ReadError:
+ pass
+
+ clusters = []
+ for cluster in segments[0].data:
+ _parse_cluster(track, clusters, cluster)
+ return clusters
+
+def _parse_cluster(track, clusters, cluster):
+
+ blockgroups = []
+ timecode = None
+ for child in cluster.data:
+ if child.name == 'BlockGroup':
+ _parse_blockgroup(track, blockgroups, child)
+ elif child.name == 'Timecode':
+ timecode = child.data
+
+ if len(blockgroups) > 0 and timecode != None:
+ clusters.append(Cluster(timecode, blockgroups))
+
+def _parse_blockgroup(track, blockgroups, blockgroup):
+
+ block = None
+ duration = None
+ for child in blockgroup.data:
+ if child.name == 'Block':
+ block = Block.fromelement(child)
+ if block.track != track:
+ block = None
+ elif child.name == 'BlockDuration':
+ duration = child.data
+
+ if duration != None and block != None:
+ blockgroups.append(BlockGroup(block, duration))
+
+def _print_time_range(timecode_scale,clusterTimecode,blockTimecode,duration):
+
+ timecode_scale_ms = timecode_scale / 1000000 #Timecode
+ rawTimecode = clusterTimecode + blockTimecode
+ startTimeMilleSeconds = (rawTimecode) * timecode_scale_ms
+ endTimeMilleSeconds = (rawTimecode + duration) * timecode_scale_ms
+
+ return _print_time(startTimeMilleSeconds) + " --> " + _print_time(endTimeMilleSeconds)
+
+def _print_time(timeInMilleSeconds):
+
+ timeInSeconds, milleSeconds = divmod(timeInMilleSeconds, 1000)
+ timeInMinutes, seconds = divmod(timeInSeconds, 60)
+ hours, minutes = divmod(timeInMinutes, 60)
+
+ return '%d:%02d:%02d,%d' % (hours,minutes,seconds,milleSeconds)
+
+class Cluster(object):
+
+ def __init__(self,timecode=None, blockgroups=[]):
+ self.timecode = timecode
+ self.blockgroups = blockgroups
+
+class BlockGroup(object):
+
+ def __init__(self,block=None,duration=None):
+ self.block = block
+ self.duration = duration
+
+class Block(object):
+
+ def __init__(self, track=None, timecode=None, invisible=False, lacing=None, flags=None, data=None):
+ self.track = track
+ self.timecode = timecode
+ self.invisible = invisible
+ self.lacing = lacing
+ self.flags = flags
+ self.data = data
+
+ @classmethod
+ def fromelement(cls,element):
+ stream = element.data
+ track = ebml.read_element_size(stream)
+ timecode = ebml.read_element_integer(stream,2)
+ flags = ord(stream.read(1))
+
+ invisible = bool(flags & 0x8)
+
+ if (flags & 0x6):
+ lacing = 'EBML'
+ elif (flags & 0x4):
+ lacing = 'fixed-size'
+ elif (flags & 0x2):
+ lacing = 'Xiph'
+ else:
+ lacing = None
+
+ if lacing:
+ raise ReadError('Laced blocks are not implemented yet')
+
+ data = ebml.read_element_binary(stream, element.size - stream.tell())
+ return cls(track,timecode,invisible,lacing,flags,data)
+
+ def __repr__(self):
+ return '<%s track=%d, timecode=%d, invisible=%d, lacing=%s>' % (self.__class__.__name__, self.track,self.timecode,self.invisible,self.lacing)
+
+class SimpleBlock(Block):
+
+ def __init__(self, track=None, timecode=None, keyframe=False, invisible=False, lacing=None, flags=None, data=None, discardable=False):
+ super(SimpleBlock,self).__init__(track,timecode,invisible,lacing,flags,data)
+ self.keyframe = keyframe
+ self.discardable = discardable
+
+ def fromelement(cls,element):
+ simpleblock = super(SimpleBlock, cls).fromelement(element)
+ simpleblock.keyframe = bool(simpleblock.flags & 0x80)
+ simpleblock.discardable = bool(simpleblock.flags & 0x1)
+ return simpleblock
+
+ def __repr__(self):
+ return '<%s track=%d, timecode=%d, keyframe=%d, invisible=%d, lacing=%s, discardable=%d>' % (self.__class__.__name__, self.track,self.timecode,self.keyframe,self.invisible,self.lacing,self.discardable) \ No newline at end of file
diff --git a/libs/enzyme/tests/__init__.py b/libs/enzyme/tests/__init__.py
index 426d3598f..739abe747 100644
--- a/libs/enzyme/tests/__init__.py
+++ b/libs/enzyme/tests/__init__.py
@@ -1,9 +1,11 @@
# -*- coding: utf-8 -*-
-from . import test_mkv, test_parsers
+from . import test_mkv, test_parsers, test_subtitle
import unittest
-suite = unittest.TestSuite([test_mkv.suite(), test_parsers.suite()])
+suite = unittest.TestSuite([test_mkv.suite(), test_parsers.suite(), test_subtitle.suite()])
+
+
if __name__ == '__main__':
diff --git a/libs/enzyme/tests/test_mkv.py b/libs/enzyme/tests/test_mkv.py
index ac4617fa3..71aceee30 100644
--- a/libs/enzyme/tests/test_mkv.py
+++ b/libs/enzyme/tests/test_mkv.py
@@ -44,12 +44,12 @@ class MKVTestCase(unittest.TestCase):
self.assertTrue(mkv.video_tracks[0].width == 854)
self.assertTrue(mkv.video_tracks[0].height == 480)
self.assertTrue(mkv.video_tracks[0].interlaced == False)
- self.assertTrue(mkv.video_tracks[0].stereo_mode == 0)
+ self.assertTrue(mkv.video_tracks[0].stereo_mode is None)
self.assertTrue(mkv.video_tracks[0].crop == {})
self.assertTrue(mkv.video_tracks[0].display_width is None)
self.assertTrue(mkv.video_tracks[0].display_height is None)
self.assertTrue(mkv.video_tracks[0].display_unit is None)
- self.assertTrue(mkv.video_tracks[0].aspect_ratio_type == 0)
+ self.assertTrue(mkv.video_tracks[0].aspect_ratio_type is None)
# audio track
self.assertTrue(len(mkv.audio_tracks) == 1)
self.assertTrue(mkv.audio_tracks[0].type == AUDIO_TRACK)
@@ -64,7 +64,7 @@ class MKVTestCase(unittest.TestCase):
self.assertTrue(mkv.audio_tracks[0].codec_name is None)
self.assertTrue(mkv.audio_tracks[0].sampling_frequency == 48000.0)
self.assertTrue(mkv.audio_tracks[0].channels == 2)
- self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency == 48000.0)
+ self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency is None)
self.assertTrue(mkv.audio_tracks[0].bit_depth is None)
# subtitle track
self.assertTrue(len(mkv.subtitle_tracks) == 0)
@@ -113,12 +113,12 @@ class MKVTestCase(unittest.TestCase):
self.assertTrue(mkv.video_tracks[0].width == 1024)
self.assertTrue(mkv.video_tracks[0].height == 576)
self.assertTrue(mkv.video_tracks[0].interlaced == False)
- self.assertTrue(mkv.video_tracks[0].stereo_mode == 0)
+ self.assertTrue(mkv.video_tracks[0].stereo_mode is None)
self.assertTrue(mkv.video_tracks[0].crop == {})
self.assertTrue(mkv.video_tracks[0].display_width == 1354)
self.assertTrue(mkv.video_tracks[0].display_height is None)
self.assertTrue(mkv.video_tracks[0].display_unit is None)
- self.assertTrue(mkv.video_tracks[0].aspect_ratio_type == 0)
+ self.assertTrue(mkv.video_tracks[0].aspect_ratio_type is None)
# audio track
self.assertTrue(len(mkv.audio_tracks) == 1)
self.assertTrue(mkv.audio_tracks[0].type == AUDIO_TRACK)
@@ -133,7 +133,7 @@ class MKVTestCase(unittest.TestCase):
self.assertTrue(mkv.audio_tracks[0].codec_name is None)
self.assertTrue(mkv.audio_tracks[0].sampling_frequency == 48000.0)
self.assertTrue(mkv.audio_tracks[0].channels == 2)
- self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency == 48000.0)
+ self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency is None)
self.assertTrue(mkv.audio_tracks[0].bit_depth is None)
# subtitle track
self.assertTrue(len(mkv.subtitle_tracks) == 0)
@@ -182,12 +182,12 @@ class MKVTestCase(unittest.TestCase):
self.assertTrue(mkv.video_tracks[0].width == 1024)
self.assertTrue(mkv.video_tracks[0].height == 576)
self.assertTrue(mkv.video_tracks[0].interlaced == False)
- self.assertTrue(mkv.video_tracks[0].stereo_mode == 0)
+ self.assertTrue(mkv.video_tracks[0].stereo_mode is None)
self.assertTrue(mkv.video_tracks[0].crop == {})
self.assertTrue(mkv.video_tracks[0].display_width is None)
self.assertTrue(mkv.video_tracks[0].display_height is None)
self.assertTrue(mkv.video_tracks[0].display_unit is None)
- self.assertTrue(mkv.video_tracks[0].aspect_ratio_type == 0)
+ self.assertTrue(mkv.video_tracks[0].aspect_ratio_type is None)
# audio track
self.assertTrue(len(mkv.audio_tracks) == 1)
self.assertTrue(mkv.audio_tracks[0].type == AUDIO_TRACK)
@@ -202,7 +202,7 @@ class MKVTestCase(unittest.TestCase):
self.assertTrue(mkv.audio_tracks[0].codec_name is None)
self.assertTrue(mkv.audio_tracks[0].sampling_frequency == 48000.0)
self.assertTrue(mkv.audio_tracks[0].channels == 2)
- self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency == 48000.0)
+ self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency is None)
self.assertTrue(mkv.audio_tracks[0].bit_depth is None)
# subtitle track
self.assertTrue(len(mkv.subtitle_tracks) == 0)
@@ -251,12 +251,12 @@ class MKVTestCase(unittest.TestCase):
self.assertTrue(mkv.video_tracks[0].width == 1024)
self.assertTrue(mkv.video_tracks[0].height == 576)
self.assertTrue(mkv.video_tracks[0].interlaced == False)
- self.assertTrue(mkv.video_tracks[0].stereo_mode == 0)
+ self.assertTrue(mkv.video_tracks[0].stereo_mode is None)
self.assertTrue(mkv.video_tracks[0].crop == {})
self.assertTrue(mkv.video_tracks[0].display_width == 1024)
self.assertTrue(mkv.video_tracks[0].display_height == 576)
self.assertTrue(mkv.video_tracks[0].display_unit is None)
- self.assertTrue(mkv.video_tracks[0].aspect_ratio_type == 0)
+ self.assertTrue(mkv.video_tracks[0].aspect_ratio_type is None)
# audio tracks
self.assertTrue(len(mkv.audio_tracks) == 2)
self.assertTrue(mkv.audio_tracks[0].type == AUDIO_TRACK)
@@ -271,7 +271,7 @@ class MKVTestCase(unittest.TestCase):
self.assertTrue(mkv.audio_tracks[0].codec_name is None)
self.assertTrue(mkv.audio_tracks[0].sampling_frequency == 48000.0)
self.assertTrue(mkv.audio_tracks[0].channels == 2)
- self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency == 48000.0)
+ self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency is None)
self.assertTrue(mkv.audio_tracks[0].bit_depth is None)
self.assertTrue(mkv.audio_tracks[1].type == AUDIO_TRACK)
self.assertTrue(mkv.audio_tracks[1].number == 10)
@@ -414,12 +414,12 @@ class MKVTestCase(unittest.TestCase):
self.assertTrue(mkv.video_tracks[0].width == 854)
self.assertTrue(mkv.video_tracks[0].height == 480)
self.assertTrue(mkv.video_tracks[0].interlaced == False)
- self.assertTrue(mkv.video_tracks[0].stereo_mode == 0)
+ self.assertTrue(mkv.video_tracks[0].stereo_mode is None)
self.assertTrue(mkv.video_tracks[0].crop == {})
self.assertTrue(mkv.video_tracks[0].display_width is None)
self.assertTrue(mkv.video_tracks[0].display_height is None)
self.assertTrue(mkv.video_tracks[0].display_unit is None)
- self.assertTrue(mkv.video_tracks[0].aspect_ratio_type == 0)
+ self.assertTrue(mkv.video_tracks[0].aspect_ratio_type is None)
# audio track
self.assertTrue(len(mkv.audio_tracks) == 1)
self.assertTrue(mkv.audio_tracks[0].type == AUDIO_TRACK)
@@ -434,7 +434,7 @@ class MKVTestCase(unittest.TestCase):
self.assertTrue(mkv.audio_tracks[0].codec_name is None)
self.assertTrue(mkv.audio_tracks[0].sampling_frequency == 48000.0)
self.assertTrue(mkv.audio_tracks[0].channels == 2)
- self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency == 48000.0)
+ self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency is None)
self.assertTrue(mkv.audio_tracks[0].bit_depth is None)
# subtitle track
self.assertTrue(len(mkv.subtitle_tracks) == 0)
@@ -483,12 +483,12 @@ class MKVTestCase(unittest.TestCase):
self.assertTrue(mkv.video_tracks[0].width == 1024)
self.assertTrue(mkv.video_tracks[0].height == 576)
self.assertTrue(mkv.video_tracks[0].interlaced == False)
- self.assertTrue(mkv.video_tracks[0].stereo_mode == 0)
+ self.assertTrue(mkv.video_tracks[0].stereo_mode is None)
self.assertTrue(mkv.video_tracks[0].crop == {})
self.assertTrue(mkv.video_tracks[0].display_width is None)
self.assertTrue(mkv.video_tracks[0].display_height is None)
self.assertTrue(mkv.video_tracks[0].display_unit is None)
- self.assertTrue(mkv.video_tracks[0].aspect_ratio_type == 0)
+ self.assertTrue(mkv.video_tracks[0].aspect_ratio_type is None)
# audio track
self.assertTrue(len(mkv.audio_tracks) == 1)
self.assertTrue(mkv.audio_tracks[0].type == AUDIO_TRACK)
@@ -503,7 +503,7 @@ class MKVTestCase(unittest.TestCase):
self.assertTrue(mkv.audio_tracks[0].codec_name is None)
self.assertTrue(mkv.audio_tracks[0].sampling_frequency == 48000.0)
self.assertTrue(mkv.audio_tracks[0].channels == 2)
- self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency == 48000.0)
+ self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency is None)
self.assertTrue(mkv.audio_tracks[0].bit_depth is None)
# subtitle track
self.assertTrue(len(mkv.subtitle_tracks) == 0)
@@ -552,12 +552,12 @@ class MKVTestCase(unittest.TestCase):
self.assertTrue(mkv.video_tracks[0].width == 1024)
self.assertTrue(mkv.video_tracks[0].height == 576)
self.assertTrue(mkv.video_tracks[0].interlaced == False)
- self.assertTrue(mkv.video_tracks[0].stereo_mode == 0)
+ self.assertTrue(mkv.video_tracks[0].stereo_mode is None)
self.assertTrue(mkv.video_tracks[0].crop == {})
self.assertTrue(mkv.video_tracks[0].display_width is None)
self.assertTrue(mkv.video_tracks[0].display_height is None)
self.assertTrue(mkv.video_tracks[0].display_unit is None)
- self.assertTrue(mkv.video_tracks[0].aspect_ratio_type == 0)
+ self.assertTrue(mkv.video_tracks[0].aspect_ratio_type is None)
# audio track
self.assertTrue(len(mkv.audio_tracks) == 1)
self.assertTrue(mkv.audio_tracks[0].type == AUDIO_TRACK)
@@ -572,7 +572,7 @@ class MKVTestCase(unittest.TestCase):
self.assertTrue(mkv.audio_tracks[0].codec_name is None)
self.assertTrue(mkv.audio_tracks[0].sampling_frequency == 48000.0)
self.assertTrue(mkv.audio_tracks[0].channels == 2)
- self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency == 48000.0)
+ self.assertTrue(mkv.audio_tracks[0].output_sampling_frequency is None)
self.assertTrue(mkv.audio_tracks[0].bit_depth is None)
# subtitle track
self.assertTrue(len(mkv.subtitle_tracks) == 0)
diff --git a/libs/enzyme/tests/test_parsers.py b/libs/enzyme/tests/test_parsers.py
index 0fa320ce0..611aa9bef 100644
--- a/libs/enzyme/tests/test_parsers.py
+++ b/libs/enzyme/tests/test_parsers.py
@@ -33,7 +33,7 @@ class EBMLTestCase(unittest.TestCase):
self.stream.close()
def check_element(self, element_id, element_type, element_name, element_level, element_position, element_size, element_data, element,
- ignore_element_types=None, ignore_element_names=None, max_level=None):
+ ignore_element_types=None, ignore_element_names=None, max_level=None, include_element_names=None):
"""Recursively check an element"""
# base
self.assertTrue(element.id == element_id)
@@ -53,6 +53,8 @@ class EBMLTestCase(unittest.TestCase):
element_data = [e for e in element_data if e[1] not in ignore_element_types]
if ignore_element_names is not None: # filter validation on element names
element_data = [e for e in element_data if e[2] not in ignore_element_names]
+ if include_element_names is not None: # filter validation on element names
+ element_data = [e for e in element_data if e[2] in include_element_names]
if element.level == max_level: # special check when maximum level is reached
self.assertTrue(element.data is None)
return
@@ -60,7 +62,7 @@ class EBMLTestCase(unittest.TestCase):
for i in range(len(element.data)):
self.check_element(element_data[i][0], element_data[i][1], element_data[i][2], element_data[i][3],
element_data[i][4], element_data[i][5], element_data[i][6], element.data[i], ignore_element_types,
- ignore_element_names, max_level)
+ ignore_element_names, max_level,include_element_names)
def test_parse_full(self):
result = ebml.parse(self.stream, self.specs)
@@ -87,6 +89,15 @@ class EBMLTestCase(unittest.TestCase):
self.check_element(self.validation[i][0], self.validation[i][1], self.validation[i][2], self.validation[i][3],
self.validation[i][4], self.validation[i][5], self.validation[i][6], result[i], ignore_element_names=ignore_element_names)
+ def test_parse_include_element_names(self):
+ include_element_names = ['Segment','Cluster']
+ result = ebml.parse(self.stream, self.specs, include_element_names=include_element_names)
+ self.validation = [e for e in self.validation if e[2] in include_element_names]
+ self.assertTrue(len(result) == len(self.validation))
+ for i in range(len(self.validation)):
+ self.check_element(self.validation[i][0], self.validation[i][1], self.validation[i][2], self.validation[i][3],
+ self.validation[i][4], self.validation[i][5], self.validation[i][6], result[i], include_element_names=include_element_names)
+
def test_parse_max_level(self):
max_level = 3
result = ebml.parse(self.stream, self.specs, max_level=max_level)
diff --git a/libs/enzyme/tests/test_subtitle.py b/libs/enzyme/tests/test_subtitle.py
new file mode 100644
index 000000000..a8fa0e7b2
--- /dev/null
+++ b/libs/enzyme/tests/test_subtitle.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+from enzyme.subtitle import Subtitle, _print_time_range, _print_time
+import unittest
+import os
+import io
+import requests
+import zipfile
+import glob
+
+# Test directory
+TEST_DIR = os.path.join(os.path.dirname(__file__), os.path.splitext(__file__)[0])
+
+def setUpModule():
+ if not os.path.exists(TEST_DIR):
+ r = requests.get('http://downloads.sourceforge.net/project/matroska/test_files/matroska_test_w1_1.zip')
+ with zipfile.ZipFile(io.BytesIO(r.content), 'r') as f:
+ f.extractall(TEST_DIR)
+
+class SubtitleTestCase(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+
+ file = 'test5.mkv'
+ stream = io.open(os.path.join(TEST_DIR, file), 'rb')
+ cls.subtitle = Subtitle(stream)
+
+ def test_subtitles_found(self):
+
+ subtitles = self.subtitle._subtitles
+ self.assertTrue('eng' in subtitles)
+ self.assertTrue('hun' in subtitles)
+ self.assertTrue('ger' in subtitles)
+ self.assertTrue('fre' in subtitles)
+ self.assertTrue('spa' in subtitles)
+ self.assertTrue('ita' in subtitles)
+ self.assertTrue('jpn' in subtitles)
+ self.assertTrue('und' in subtitles)
+
+ def test_write_subtitle_to_stream(self):
+
+ subtitle_stream = self.subtitle.write_subtitle_to_stream("eng")
+ self.assertIsInstance(subtitle_stream,io.StringIO,"Expecting a StringIO stream")
+
+ def test_write_subtitle_to_stream(self):
+
+ subtitle_streams = self.subtitle.write_subtitles_to_stream()
+
+ self.assertIn("eng", subtitle_streams, "Expecting a subtitle stream for language eng")
+ self.assertIsInstance(subtitle_streams["eng"],io.StringIO,"Expecting a StringIO stream")
+ self.assertIn("hun", subtitle_streams, "Expecting a subtitle stream for language hun")
+ self.assertIsInstance(subtitle_streams["hun"],io.StringIO,"Expecting a StringIO stream")
+ self.assertIn("ger", subtitle_streams, "Expecting a subtitle stream for language ger")
+ self.assertIsInstance(subtitle_streams["ger"],io.StringIO,"Expecting a StringIO stream")
+ self.assertIn("fre", subtitle_streams, "Expecting a subtitle stream for language fre")
+ self.assertIsInstance(subtitle_streams["fre"],io.StringIO,"Expecting a StringIO stream")
+ self.assertIn("spa", subtitle_streams, "Expecting a subtitle stream for language spa")
+ self.assertIsInstance(subtitle_streams["spa"],io.StringIO,"Expecting a StringIO stream")
+ self.assertIn("ita", subtitle_streams, "Expecting a subtitle stream for language ita")
+ self.assertIsInstance(subtitle_streams["ita"],io.StringIO,"Expecting a StringIO stream")
+ self.assertIn("jpn", subtitle_streams, "Expecting a subtitle stream for language jpn")
+ self.assertIsInstance(subtitle_streams["jpn"],io.StringIO,"Expecting a StringIO stream")
+
+ def test_print_time(self):
+
+ self.assertEqual('0:00:00,0',_print_time(0))
+ self.assertEqual('0:00:00,1',_print_time(1))
+ self.assertEqual('0:00:00,999',_print_time(999))
+ self.assertEqual('0:00:01,0',_print_time(1000))
+ self.assertEqual('0:00:59,999',_print_time(1000*60-1))
+ self.assertEqual('0:01:00,0',_print_time(1000*60))
+ self.assertEqual('0:59:59,999',_print_time(1000*60*60-1))
+ self.assertEqual('1:00:00,0',_print_time(1000*60*60))
+
+ def test_print_time_range(self):
+
+ self.assertEqual('0:00:00,0 --> 0:00:00,0',_print_time_range(1000000,0,0,0))
+ self.assertEqual('0:01:00,0 --> 0:01:01,0',_print_time_range(1000000,0,60000,1000))
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.TestLoader().loadTestsFromTestCase(SubtitleTestCase))
+ return suite
+
+if __name__ == '__main__':
+ unittest.TextTestRunner().run(suite())