diff options
Diffstat (limited to 'libs/py7zr/compression.py')
-rw-r--r-- | libs/py7zr/compression.py | 384 |
1 files changed, 384 insertions, 0 deletions
#!/usr/bin/python -u
#
# p7zr library
#
# Copyright (c) 2019 Hiroshi Miura <[email protected]>
# Copyright (c) 2004-2015 by Joachim Bauch, [email protected]
# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import bz2
import io
import lzma
import os
import queue
import sys
import threading
from typing import IO, Any, BinaryIO, Dict, List, Optional, Union

from py7zr import UnsupportedCompressionMethodError
from py7zr.extra import CopyDecompressor, DeflateDecompressor, ISevenZipDecompressor, ZstdDecompressor
from py7zr.helpers import MemIO, NullIO, calculate_crc32, readlink
from py7zr.properties import READ_BLOCKSIZE, ArchivePassword, CompressionMethod

if sys.version_info < (3, 6):
    import pathlib2 as pathlib
else:
    import pathlib
try:
    import zstandard as Zstd  # type: ignore
except ImportError:
    Zstd = None


class Worker:
    """Drive extraction and archiving of the files that belong to one archive.

    A worker holds the list of archive members, the offset at which packed
    data starts in the archive file and the parsed header.  Output targets are
    registered per file id with :meth:`register_filelike`; members without a
    registered target are still decompressed (into a ``NullIO`` sink) so that
    the solid stream stays in sync and the CRC can be checked.
    """

    def __init__(self, files, src_start: int, header) -> None:
        """Remember the member list, packed-data start offset and header.

        :parameter files: iterable/indexable collection of archive members.
        :parameter src_start: offset of the packed streams in the archive file.
        :parameter header: parsed archive header object.
        """
        self.target_filepath = {}  # type: Dict[int, Union[MemIO, pathlib.Path, None]]
        self.files = files
        self.src_start = src_start
        self.header = header

    def extract(self, fp: BinaryIO, parallel: bool, q=None) -> None:
        """Extract every 7zip folder of the archive, decompressing each file.

        When the header has no ``main_streams`` only empty members are
        processed.  With a single folder everything is handled by one
        :meth:`extract_single` call.  With several folders they are processed
        either one after another on ``fp`` or, when ``parallel`` is true, by
        one thread per folder, each thread re-opening the archive by name.

        :parameter fp: archive source file object.
        :parameter parallel: extract folders concurrently; ``fp`` must then
                             expose a ``name`` that can be passed to ``open``.
        :parameter q: optional queue that receives progress tuples.
        """
        if hasattr(self.header, 'main_streams') and self.header.main_streams is not None:
            src_end = self.src_start + self.header.main_streams.packinfo.packpositions[-1]
            numfolders = self.header.main_streams.unpackinfo.numfolders
            if numfolders == 1:
                self.extract_single(fp, self.files, self.src_start, src_end, q)
            else:
                folders = self.header.main_streams.unpackinfo.folders
                positions = self.header.main_streams.packinfo.packpositions
                empty_files = [f for f in self.files if f.emptystream]
                if not parallel:
                    self.extract_single(fp, empty_files, 0, 0, q)
                    for i in range(numfolders):
                        self.extract_single(fp, folders[i].files, self.src_start + positions[i],
                                            self.src_start + positions[i + 1], q)
                else:
                    filename = getattr(fp, 'name', None)
                    # Use a private handle for the empty members and make sure
                    # it is closed again; previously it was leaked.
                    with open(filename, 'rb') as empty_fp:
                        self.extract_single(empty_fp, empty_files, 0, 0, q)
                    extract_threads = []
                    for i in range(numfolders):
                        # Each thread gets the file *name*, so extract_single
                        # opens an independent handle with its own position.
                        p = threading.Thread(target=self.extract_single,
                                             args=(filename, folders[i].files,
                                                   self.src_start + positions[i],
                                                   self.src_start + positions[i + 1], q))
                        p.start()
                        extract_threads.append(p)
                    for p in extract_threads:
                        p.join()
        else:
            empty_files = [f for f in self.files if f.emptystream]
            self.extract_single(fp, empty_files, 0, 0, q)

    def extract_single(self, fp: Union[BinaryIO, str], files, src_start: int, src_end: int,
                       q: Optional[queue.Queue]) -> None:
        """Extract the given files of a single 7zip folder on the calling thread.

        :parameter fp: archive source file object, or a path string which is
                       opened here in binary mode and closed before returning.
        :parameter files: members to process in order; ``None`` is a no-op.
        :parameter src_start: position to seek to before reading packed data.
        :parameter src_end: end position of the folder's packed data.
        :parameter q: optional queue; receives ``('s', filename, compressed)``
                      before and ``('e', filename, uncompressed)`` after each
                      file, all values converted to ``str``.
        """
        if files is None:
            return
        opened_here = isinstance(fp, str)
        src = open(fp, 'rb') if opened_here else fp  # type: Any
        try:
            src.seek(src_start)
            for f in files:
                if q is not None:
                    q.put(('s', str(f.filename), str(f.compressed) if f.compressed is not None else '0'))
                fileish = self.target_filepath.get(f.id, None)
                if fileish is not None:
                    fileish.parent.mkdir(parents=True, exist_ok=True)
                    with fileish.open(mode='wb') as ofp:
                        # For an empty stream, opening the target is enough to
                        # create the empty file.
                        if not f.emptystream:
                            self.decompress(src, f.folder, ofp, f.uncompressed[-1], f.compressed, src_end)
                            ofp.seek(0)
                elif not f.emptystream:
                    # No target registered: read and discard the data, but
                    # still run it through the decompressor (CRC check).
                    with NullIO() as ofp:
                        self.decompress(src, f.folder, ofp, f.uncompressed[-1], f.compressed, src_end)
                if q is not None:
                    q.put(('e', str(f.filename), str(f.uncompressed[-1])))
        finally:
            # Only close handles this method opened itself; a caller-supplied
            # file object stays under the caller's control.
            if opened_here:
                src.close()

    def decompress(self, fp: BinaryIO, folder, fq: IO[Any],
                   size: int, compressed_size: Optional[int], src_end: int) -> None:
        """Decompressor wrapper called from the extract methods.

        Reads packed data from ``fp`` in blocks of at most ``READ_BLOCKSIZE``
        and writes decompressed output to ``fq`` until ``size`` bytes have
        been produced.

        :parameter fp: archive source file pointer
        :parameter folder: Folder object that provides the decompressor object.
        :parameter fq: writable output file object
        :parameter size: uncompressed size of target file.
        :parameter compressed_size: compressed size of target file.
        :parameter src_end: end position of the folder
        :returns None
        :raises Exception: when the source is exhausted and the decompressor
                           yields no further output.
        """
        assert folder is not None
        out_remaining = size
        decompressor = folder.get_decompressor(compressed_size)
        while out_remaining > 0:
            max_length = min(out_remaining, io.DEFAULT_BUFFER_SIZE)
            rest_size = src_end - fp.tell()
            read_size = min(READ_BLOCKSIZE, rest_size)
            if read_size == 0:
                # Source exhausted: drain what the decompressor still buffers.
                tmp = decompressor.decompress(b'', max_length)
                if len(tmp) == 0:
                    raise Exception("decompression get wrong: no output data.")
            else:
                inp = fp.read(read_size)
                tmp = decompressor.decompress(inp, max_length)
            if len(tmp) > 0 and out_remaining >= len(tmp):
                out_remaining -= len(tmp)
                fq.write(tmp)
            if out_remaining <= 0:
                break
        if fp.tell() >= src_end:
            # NOTE(review): a CRC mismatch is only reported on stdout, not
            # raised; kept as-is because callers may rely on best-effort
            # extraction.
            if decompressor.crc is not None and not decompressor.check_crc():
                print('\nCRC error! expected: {}, real: {}'.format(decompressor.crc, decompressor.digest))

    def _find_link_target(self, target):
        """Find the target member of a symlink or hardlink member in the archive.

        :parameter target: path object of the link itself.
        :returns: the link target relative to the link's directory when the
                  target is one of the archived files, otherwise the link
                  target as read from the file system (posix style).
        """
        targetname = target.as_posix()  # type: str
        linkname = readlink(targetname)
        # Check windows full path symlinks
        if linkname.startswith("\\\\?\\"):
            linkname = linkname[4:]
        # normalize as posix style
        linkname = pathlib.Path(linkname).as_posix()  # type: str
        member = None
        # Index access is kept on purpose: this runs nested inside archive()'s
        # iteration over self.files, and the container's iterator is not
        # visible here -- TODO confirm it is re-entrant before switching.
        for j in range(len(self.files)):
            if linkname == self.files[j].origin.as_posix():
                # FIXME: when API user specify arcname, it will break
                member = os.path.relpath(linkname, os.path.dirname(targetname))
                break
        if member is None:
            member = linkname
        return member

    def archive(self, fp: BinaryIO, folder, deref=False):
        """Run archive task for specified 7zip folder.

        Compresses every member of ``self.files`` into ``fp`` with the
        folder's compressor and records per-file CRCs and sizes in the header.
        Symlinks are stored as their UTF-8 encoded link target unless
        ``deref`` is true.

        :parameter fp: writable archive file object.
        :parameter folder: Folder object that provides ``get_compressor()``
                           and receives the total ``unpacksizes``.
        :parameter deref: store the content a symlink points to instead of
                          the link itself.
        """
        compressor = folder.get_compressor()
        outsize = 0
        self.header.main_streams.packinfo.numstreams = 1
        num_unpack_streams = 0
        self.header.main_streams.substreamsinfo.digests = []
        self.header.main_streams.substreamsinfo.digestsdefined = []
        last_file_index = 0
        foutsize = 0
        for i, f in enumerate(self.files):
            file_info = f.file_properties()
            self.header.files_info.files.append(file_info)
            self.header.files_info.emptyfiles.append(f.emptystream)
            foutsize = 0
            if f.is_symlink and not deref:
                last_file_index = i
                num_unpack_streams += 1
                link_target = self._find_link_target(f.origin)  # type: str
                tgt = link_target.encode('utf-8')  # type: bytes
                insize = len(tgt)
                crc = calculate_crc32(tgt, 0)  # type: int
                out = compressor.compress(tgt)
                outsize += len(out)
                foutsize += len(out)
                fp.write(out)
                self.header.main_streams.substreamsinfo.digests.append(crc)
                self.header.main_streams.substreamsinfo.digestsdefined.append(True)
                self.header.main_streams.substreamsinfo.unpacksizes.append(insize)
                self.header.files_info.files[i]['maxsize'] = foutsize
            elif not f.emptystream:
                last_file_index = i
                num_unpack_streams += 1
                insize = 0
                crc = 0
                with f.origin.open(mode='rb') as fd:
                    data = fd.read(READ_BLOCKSIZE)
                    insize += len(data)
                    while data:
                        crc = calculate_crc32(data, crc)
                        out = compressor.compress(data)
                        outsize += len(out)
                        foutsize += len(out)
                        fp.write(out)
                        data = fd.read(READ_BLOCKSIZE)
                        insize += len(data)
                self.header.main_streams.substreamsinfo.digests.append(crc)
                self.header.main_streams.substreamsinfo.digestsdefined.append(True)
                self.header.files_info.files[i]['maxsize'] = foutsize
                self.header.main_streams.substreamsinfo.unpacksizes.append(insize)
        # The loop never breaks, so the former ``for ... else`` clause always
        # ran; it is written as plain post-loop code for clarity.  The
        # compressor is flushed exactly once, after all members.
        out = compressor.flush()
        outsize += len(out)
        foutsize += len(out)
        fp.write(out)
        if len(self.files) > 0:
            # NOTE(review): foutsize was reset by the last loop iteration, so
            # if trailing members are empty this overwrites the last data
            # file's 'maxsize' with the flush length only -- confirm intent.
            self.header.files_info.files[last_file_index]['maxsize'] = foutsize
        # Update size data in header
        self.header.main_streams.packinfo.packsizes = [outsize]
        folder.unpacksizes = [sum(self.header.main_streams.substreamsinfo.unpacksizes)]
        self.header.main_streams.substreamsinfo.num_unpackstreams_folders = [num_unpack_streams]

    def register_filelike(self, id: int, fileish: Union[MemIO, pathlib.Path, None]) -> None:
        """Register the output target (file-ish object or ``None``) for a file id."""
        self.target_filepath[id] = fileish


class SevenZipDecompressor:
    """Main decompressor object which is configured for and bound to one 7zip folder,
    because each 7zip folder can have its own compression method.

    Coder chains made only of methods listed in ``lzma_methods_map`` are
    handled by a raw ``lzma.LZMADecompressor``; otherwise the first coder
    selects one of the alternative decompressors in ``alt_methods_map``.
    """

    lzma_methods_map = {
        CompressionMethod.LZMA: lzma.FILTER_LZMA1,
        CompressionMethod.LZMA2: lzma.FILTER_LZMA2,
        CompressionMethod.DELTA: lzma.FILTER_DELTA,
        CompressionMethod.P7Z_BCJ: lzma.FILTER_X86,
        CompressionMethod.BCJ_ARM: lzma.FILTER_ARM,
        CompressionMethod.BCJ_ARMT: lzma.FILTER_ARMTHUMB,
        CompressionMethod.BCJ_IA64: lzma.FILTER_IA64,
        CompressionMethod.BCJ_PPC: lzma.FILTER_POWERPC,
        CompressionMethod.BCJ_SPARC: lzma.FILTER_SPARC,
    }

    # Local ids for the non-lzma methods; only used as dispatch keys below.
    FILTER_BZIP2 = 0x31
    FILTER_ZIP = 0x32
    FILTER_COPY = 0x33
    FILTER_AES = 0x34
    FILTER_ZSTD = 0x35
    alt_methods_map = {
        CompressionMethod.MISC_BZIP2: FILTER_BZIP2,
        CompressionMethod.MISC_DEFLATE: FILTER_ZIP,
        CompressionMethod.COPY: FILTER_COPY,
        CompressionMethod.CRYPT_AES256_SHA256: FILTER_AES,
        CompressionMethod.MISC_ZSTD: FILTER_ZSTD,
    }

    def __init__(self, coders: List[Dict[str, Any]], size: int, crc: Optional[int]) -> None:
        """Select and create the underlying decompressor for ``coders``.

        :parameter coders: coder descriptions; each has a ``'method'`` key.
        :parameter size: input size, stored as ``input_size``.
        :parameter crc: expected CRC32 of the decompressed data, or ``None``
                        to skip CRC calculation.
        :raises UnsupportedCompressionMethodError: for unsupported coders.
        """
        self.input_size = size
        self.consumed = 0  # type: int
        self.crc = crc
        self.digest = None  # type: Optional[int]
        if self._check_lzma_coders(coders):
            self._set_lzma_decompressor(coders)
        else:
            self._set_alternative_decompressor(coders)

    def _check_lzma_coders(self, coders: List[Dict[str, Any]]) -> bool:
        """Return True when every coder's method has an entry in ``lzma_methods_map``."""
        return all(self.lzma_methods_map.get(coder['method'], None) is not None for coder in coders)

    def _set_lzma_decompressor(self, coders: List[Dict[str, Any]]) -> None:
        """Build a raw LZMA decompressor whose filter chain mirrors ``coders``.

        :raises UnsupportedCompressionMethodError: when a coder has more than
            one input or output stream, or its method is not an lzma filter.
        """
        filters = []  # type: List[Dict[str, Any]]
        for coder in coders:
            if coder['numinstreams'] != 1 or coder['numoutstreams'] != 1:
                raise UnsupportedCompressionMethodError('Only a simple compression method is currently supported.')
            filter_id = self.lzma_methods_map.get(coder['method'], None)
            if filter_id is None:
                raise UnsupportedCompressionMethodError
            properties = coder.get('properties', None)
            # Each filter is inserted at the front, so the resulting chain is
            # in the reverse order of ``coders``.
            if properties is not None:
                filters[:0] = [lzma._decode_filter_properties(filter_id, properties)]  # type: ignore
            else:
                filters[:0] = [{'id': filter_id}]
        self.decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_RAW, filters=filters)  # type: Union[bz2.BZ2Decompressor, lzma.LZMADecompressor, ISevenZipDecompressor]  # noqa

    def _set_alternative_decompressor(self, coders: List[Dict[str, Any]]) -> None:
        """Create a non-lzma decompressor chosen by the first coder's method.

        :raises UnsupportedCompressionMethodError: when the method is unknown,
            is AES (no decompressor is wired up here), or is Zstandard while
            the ``zstandard`` module could not be imported.
        """
        filter_id = self.alt_methods_map.get(coders[0]['method'], None)
        if filter_id == self.FILTER_BZIP2:
            self.decompressor = bz2.BZ2Decompressor()
        elif filter_id == self.FILTER_ZIP:
            self.decompressor = DeflateDecompressor()
        elif filter_id == self.FILTER_COPY:
            self.decompressor = CopyDecompressor()
        elif filter_id == self.FILTER_ZSTD and Zstd:
            self.decompressor = ZstdDecompressor()
        else:
            raise UnsupportedCompressionMethodError

    def decompress(self, data: bytes, max_length: Optional[int] = None) -> bytes:
        """Decompress ``data`` and update the running CRC of the output.

        :parameter data: packed input bytes (may be empty).
        :parameter max_length: upper bound for the returned data, forwarded to
                               the underlying decompressor when not ``None``.
        :returns: the decompressed bytes.
        """
        self.consumed += len(data)
        if max_length is not None:
            folder_data = self.decompressor.decompress(data, max_length=max_length)
        else:
            folder_data = self.decompressor.decompress(data)
        # calculate CRC with uncompressed data
        if self.crc is not None:
            self.digest = calculate_crc32(folder_data, self.digest)
        return folder_data

    def check_crc(self):
        """Return True when the CRC computed so far equals the expected CRC."""
        return self.crc == self.digest


class SevenZipCompressor:

    """Main compressor object configured for one 7zip folder.

    Wraps a raw ``lzma.LZMACompressor`` and exposes in ``coders`` the coder
    descriptions that correspond to the filter chain in use.
    """

    __slots__ = ['filters', 'compressor', 'coders']

    lzma_methods_map_r = {
        lzma.FILTER_LZMA2: CompressionMethod.LZMA2,
        lzma.FILTER_DELTA: CompressionMethod.DELTA,
        lzma.FILTER_X86: CompressionMethod.P7Z_BCJ,
    }

    def __init__(self, filters=None):
        """Create the compressor.

        :parameter filters: lzma filter chain specification; defaults to a
                            single LZMA2 filter with preset
                            ``7 | lzma.PRESET_EXTREME``.
        :raises KeyError: when a filter id has no entry in
                          ``lzma_methods_map_r``.
        """
        if filters is None:
            self.filters = [{"id": lzma.FILTER_LZMA2, "preset": 7 | lzma.PRESET_EXTREME}, ]
        else:
            self.filters = filters
        self.compressor = lzma.LZMACompressor(format=lzma.FORMAT_RAW, filters=self.filters)
        self.coders = []
        # Loop variable is not called ``filter`` to avoid shadowing the builtin.
        for filter_spec in self.filters:
            if filter_spec is None:
                break
            method = self.lzma_methods_map_r[filter_spec['id']]
            properties = lzma._encode_filter_properties(filter_spec)
            self.coders.append({'method': method, 'properties': properties, 'numinstreams': 1, 'numoutstreams': 1})

    def compress(self, data):
        """Feed ``data`` to the compressor and return the bytes produced so far."""
        return self.compressor.compress(data)

    def flush(self):
        """Finish compression and return the remaining output bytes."""
        return self.compressor.flush()


def get_methods_names(coders: List[dict]) -> List[str]:
    """Return human readable method names for specified coders.

    :parameter coders: coder descriptions; each has a ``'method'`` key.
    :raises UnsupportedCompressionMethodError: when a method has no name here.
    """
    # NOTE(review): the methods in SevenZipDecompressor.alt_methods_map other
    # than AES (bzip2, deflate, copy, zstd) have no name here and therefore
    # raise -- confirm whether that is intended.
    methods_name_map = {
        CompressionMethod.LZMA2: "LZMA2",
        CompressionMethod.LZMA: "LZMA",
        CompressionMethod.DELTA: "delta",
        CompressionMethod.P7Z_BCJ: "BCJ",
        CompressionMethod.BCJ_ARM: "BCJ(ARM)",
        CompressionMethod.BCJ_ARMT: "BCJ(ARMT)",
        CompressionMethod.BCJ_IA64: "BCJ(IA64)",
        CompressionMethod.BCJ_PPC: "BCJ(POWERPC)",
        CompressionMethod.BCJ_SPARC: "BCJ(SPARC)",
        CompressionMethod.CRYPT_AES256_SHA256: "7zAES",
    }
    methods_names = []  # type: List[str]
    for coder in coders:
        method = coder['method']
        name = methods_name_map.get(method)
        if name is None:
            raise UnsupportedCompressionMethodError("Unknown method {}".format(method))
        methods_names.append(name)
    return methods_names