aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorjosdion <[email protected]>2020-05-26 19:52:22 +0300
committerjosdion <[email protected]>2020-05-26 19:52:22 +0300
commit8a981553bfaae0bf0197bf45671bfa4bee7fe0f5 (patch)
tree3ee601da9c3f38db4439d4514019b453f2cde3cc
parent99a98a564afbe8c1009cb792a892404a1b2acb8e (diff)
downloadbazarr-8a981553bfaae0bf0197bf45671bfa4bee7fe0f5.tar.gz
bazarr-8a981553bfaae0bf0197bf45671bfa4bee7fe0f5.zip
add py7zr library version 0.7.0
-rw-r--r--libs/py7zr/__init__.py29
-rw-r--r--libs/py7zr/archiveinfo.py1103
-rw-r--r--libs/py7zr/callbacks.py61
-rw-r--r--libs/py7zr/compression.py384
-rw-r--r--libs/py7zr/exceptions.py42
-rw-r--r--libs/py7zr/extra.py122
-rw-r--r--libs/py7zr/helpers.py362
-rw-r--r--libs/py7zr/properties.py155
-rw-r--r--libs/py7zr/py7zr.py974
-rw-r--r--libs/py7zr/win32compat.py174
10 files changed, 3406 insertions, 0 deletions
diff --git a/libs/py7zr/__init__.py b/libs/py7zr/__init__.py
new file mode 100644
index 000000000..b01a37e57
--- /dev/null
+++ b/libs/py7zr/__init__.py
@@ -0,0 +1,29 @@
+#!/usr/bin/env python
+#
+# Pure python p7zr implementation
+# Copyright (C) 2019 Hiroshi Miura
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+from py7zr.exceptions import Bad7zFile, DecompressionError, UnsupportedCompressionMethodError
+from py7zr.py7zr import ArchiveInfo, FileInfo, SevenZipFile, is_7zfile, pack_7zarchive, unpack_7zarchive
+
+__copyright__ = 'Copyright (C) 2019 Hiroshi Miura'
+__version__ = "0.7.0"
+
+__all__ = ['__version__', 'ArchiveInfo', 'FileInfo', 'SevenZipFile', 'is_7zfile',
+ 'UnsupportedCompressionMethodError', 'Bad7zFile', 'DecompressionError',
+ 'pack_7zarchive', 'unpack_7zarchive']
+
diff --git a/libs/py7zr/archiveinfo.py b/libs/py7zr/archiveinfo.py
new file mode 100644
index 000000000..cbd42381d
--- /dev/null
+++ b/libs/py7zr/archiveinfo.py
@@ -0,0 +1,1103 @@
+#!/usr/bin/python -u
+#
+# p7zr library
+#
+# Copyright (c) 2019,2020 Hiroshi Miura <[email protected]>
+# Copyright (c) 2004-2015 by Joachim Bauch, [email protected]
+# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
+# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+import functools
+import io
+import os
+import struct
+from binascii import unhexlify
+from functools import reduce
+from io import BytesIO
+from operator import and_, or_
+from struct import pack, unpack
+from typing import Any, BinaryIO, Dict, List, Optional, Tuple
+
+from py7zr.compression import SevenZipCompressor, SevenZipDecompressor
+from py7zr.exceptions import Bad7zFile, UnsupportedCompressionMethodError
+from py7zr.helpers import ArchiveTimestamp, calculate_crc32
+from py7zr.properties import MAGIC_7Z, CompressionMethod, Property
+
+MAX_LENGTH = 65536
+P7ZIP_MAJOR_VERSION = b'\x00'
+P7ZIP_MINOR_VERSION = b'\x04'
+
+
+def read_crcs(file: BinaryIO, count: int) -> List[int]:
+ data = file.read(4 * count)
+ return [unpack('<L', data[i * 4:i * 4 + 4])[0] for i in range(count)]
+
+
+def write_crcs(file: BinaryIO, crcs):
+ for crc in crcs:
+ write_uint32(file, crc)
+
+
+def read_bytes(file: BinaryIO, length: int) -> Tuple[bytes, ...]:
+ return unpack(b'B' * length, file.read(length))
+
+
+def read_byte(file: BinaryIO) -> int:
+ return ord(file.read(1))
+
+
+def write_bytes(file: BinaryIO, data: bytes):
+ return file.write(data)
+
+
+def write_byte(file: BinaryIO, data):
+ assert len(data) == 1
+ return write_bytes(file, data)
+
+
+def read_real_uint64(file: BinaryIO) -> Tuple[int, bytes]:
+ """read 8 bytes, return unpacked value as a little endian unsigned long long, and raw data."""
+ res = file.read(8)
+ a = unpack('<Q', res)[0]
+ return a, res
+
+
+def read_uint32(file: BinaryIO) -> Tuple[int, bytes]:
+ """read 4 bytes, return unpacked value as a little endian unsigned long, and raw data."""
+ res = file.read(4)
+ a = unpack('<L', res)[0]
+ return a, res
+
+
+def write_uint32(file: BinaryIO, value):
+ """write uint32 value in 4 bytes."""
+ b = pack('<L', value)
+ file.write(b)
+
+
+def read_uint64(file: BinaryIO) -> int:
+ """read UINT64, definition show in write_uint64()"""
+ b = ord(file.read(1))
+ if b == 255:
+ return read_real_uint64(file)[0]
+ blen = [(0b01111111, 0), (0b10111111, 1), (0b11011111, 2), (0b11101111, 3),
+ (0b11110111, 4), (0b11111011, 5), (0b11111101, 6), (0b11111110, 7)]
+ mask = 0x80
+ vlen = 8
+ for v, l in blen:
+ if b <= v:
+ vlen = l
+ break
+ mask >>= 1
+ if vlen == 0:
+ return b & (mask - 1)
+ val = file.read(vlen)
+ value = int.from_bytes(val, byteorder='little')
+ highpart = b & (mask - 1)
+ return value + (highpart << (vlen * 8))
+
+
+def write_real_uint64(file: BinaryIO, value: int):
+ """write 8 bytes, as an unsigned long long."""
+ file.write(pack('<Q', value))
+
+
+def write_uint64(file: BinaryIO, value: int):
+ """
+ UINT64 means real UINT64 encoded with the following scheme:
+
+ | Size of encoding sequence depends from first byte:
+ | First_Byte Extra_Bytes Value
+ | (binary)
+ | 0xxxxxxx : ( xxxxxxx )
+ | 10xxxxxx BYTE y[1] : ( xxxxxx << (8 * 1)) + y
+ | 110xxxxx BYTE y[2] : ( xxxxx << (8 * 2)) + y
+ | ...
+ | 1111110x BYTE y[6] : ( x << (8 * 6)) + y
+ | 11111110 BYTE y[7] : y
+ | 11111111 BYTE y[8] : y
+ """
+ if value < 0x80:
+ file.write(pack('B', value))
+ return
+ if value > 0x01ffffffffffffff:
+ file.write(b'\xff')
+ file.write(value.to_bytes(8, 'little'))
+ return
+ byte_length = (value.bit_length() + 7) // 8
+ ba = bytearray(value.to_bytes(byte_length, 'little'))
+ high_byte = int(ba[-1])
+ if high_byte < 2 << (8 - byte_length - 1):
+ for x in range(byte_length - 1):
+ high_byte |= 0x80 >> x
+ file.write(pack('B', high_byte))
+ file.write(ba[:byte_length - 1])
+ else:
+ mask = 0x80
+ for x in range(byte_length):
+ mask |= 0x80 >> x
+ file.write(pack('B', mask))
+ file.write(ba)
+
+
+def read_boolean(file: BinaryIO, count: int, checkall: bool = False) -> List[bool]:
+ if checkall:
+ all_defined = file.read(1)
+ if all_defined != unhexlify('00'):
+ return [True] * count
+ result = []
+ b = 0
+ mask = 0
+ for i in range(count):
+ if mask == 0:
+ b = ord(file.read(1))
+ mask = 0x80
+ result.append(b & mask != 0)
+ mask >>= 1
+ return result
+
+
+def write_boolean(file: BinaryIO, booleans: List[bool], all_defined: bool = False):
+ if all_defined and reduce(and_, booleans, True):
+ file.write(b'\x01')
+ return
+ elif all_defined:
+ file.write(b'\x00')
+ o = bytearray(-(-len(booleans) // 8))
+ for i, b in enumerate(booleans):
+ if b:
+ o[i // 8] |= 1 << (7 - i % 8)
+ file.write(o)
+
+
+def read_utf16(file: BinaryIO) -> str:
+ """read a utf-16 string from file"""
+ val = ''
+ for _ in range(MAX_LENGTH):
+ ch = file.read(2)
+ if ch == unhexlify('0000'):
+ break
+ val += ch.decode('utf-16LE')
+ return val
+
+
+def write_utf16(file: BinaryIO, val: str):
+ """write a utf-16 string to file"""
+ for c in val:
+ file.write(c.encode('utf-16LE'))
+ file.write(b'\x00\x00')
+
+
+def bits_to_bytes(bit_length: int) -> int:
+ return - (-bit_length // 8)
+
+
+class ArchiveProperties:
+
+ __slots__ = ['property_data']
+
+ def __init__(self):
+ self.property_data = []
+
+ @classmethod
+ def retrieve(cls, file):
+ return cls()._read(file)
+
+ def _read(self, file):
+ pid = file.read(1)
+ if pid == Property.ARCHIVE_PROPERTIES:
+ while True:
+ ptype = file.read(1)
+ if ptype == Property.END:
+ break
+ size = read_uint64(file)
+ props = read_bytes(file, size)
+ self.property_data.append(props)
+ return self
+
+ def write(self, file):
+ if len(self.property_data) > 0:
+ write_byte(file, Property.ARCHIVE_PROPERTIES)
+ for data in self.property_data:
+ write_uint64(file, len(data))
+ write_bytes(file, data)
+ write_byte(file, Property.END)
+
+
+class PackInfo:
+ """ information about packed streams """
+
+ __slots__ = ['packpos', 'numstreams', 'packsizes', 'packpositions', 'crcs']
+
+ def __init__(self) -> None:
+ self.packpos = 0 # type: int
+ self.numstreams = 0 # type: int
+ self.packsizes = [] # type: List[int]
+ self.crcs = None # type: Optional[List[int]]
+
+ @classmethod
+ def retrieve(cls, file: BinaryIO):
+ return cls()._read(file)
+
+ def _read(self, file: BinaryIO):
+ self.packpos = read_uint64(file)
+ self.numstreams = read_uint64(file)
+ pid = file.read(1)
+ if pid == Property.SIZE:
+ self.packsizes = [read_uint64(file) for _ in range(self.numstreams)]
+ pid = file.read(1)
+ if pid == Property.CRC:
+ self.crcs = [read_uint64(file) for _ in range(self.numstreams)]
+ pid = file.read(1)
+ if pid != Property.END:
+ raise Bad7zFile('end id expected but %s found' % repr(pid))
+ self.packpositions = [sum(self.packsizes[:i]) for i in range(self.numstreams + 1)] # type: List[int]
+ return self
+
+ def write(self, file: BinaryIO):
+ assert self.packpos is not None
+ numstreams = len(self.packsizes)
+ assert self.crcs is None or len(self.crcs) == numstreams
+ write_byte(file, Property.PACK_INFO)
+ write_uint64(file, self.packpos)
+ write_uint64(file, numstreams)
+ write_byte(file, Property.SIZE)
+ for size in self.packsizes:
+ write_uint64(file, size)
+ if self.crcs is not None:
+ write_bytes(file, Property.CRC)
+ for crc in self.crcs:
+ write_uint64(file, crc)
+ write_byte(file, Property.END)
+
+
+class Folder:
+ """ a "Folder" represents a stream of compressed data.
+ coders: list of coder
+ num_coders: length of coders
+ coder: hash list
+ keys of coders: method, numinstreams, numoutstreams, properties
+ unpacksizes: uncompressed sizes of outstreams
+ """
+
+ __slots__ = ['unpacksizes', 'solid', 'coders', 'digestdefined', 'totalin', 'totalout',
+ 'bindpairs', 'packed_indices', 'crc', 'decompressor', 'compressor', 'files']
+
+ def __init__(self) -> None:
+ self.unpacksizes = None # type: Optional[List[int]]
+ self.coders = [] # type: List[Dict[str, Any]]
+ self.bindpairs = [] # type: List[Any]
+ self.packed_indices = [] # type: List[int]
+ # calculated values
+ self.totalin = 0 # type: int
+ self.totalout = 0 # type: int
+ # internal values
+ self.solid = False # type: bool
+ self.digestdefined = False # type: bool
+ self.crc = None # type: Optional[int]
+ # compress/decompress objects
+ self.decompressor = None # type: Optional[SevenZipDecompressor]
+ self.compressor = None # type: Optional[SevenZipCompressor]
+ self.files = None
+
+ @classmethod
+ def retrieve(cls, file: BinaryIO):
+ obj = cls()
+ obj._read(file)
+ return obj
+
+ def _read(self, file: BinaryIO) -> None:
+ num_coders = read_uint64(file)
+ for _ in range(num_coders):
+ b = read_byte(file)
+ methodsize = b & 0xf
+ iscomplex = b & 0x10 == 0x10
+ hasattributes = b & 0x20 == 0x20
+ c = {'method': file.read(methodsize)} # type: Dict[str, Any]
+ if iscomplex:
+ c['numinstreams'] = read_uint64(file)
+ c['numoutstreams'] = read_uint64(file)
+ else:
+ c['numinstreams'] = 1
+ c['numoutstreams'] = 1
+ self.totalin += c['numinstreams']
+ self.totalout += c['numoutstreams']
+ if hasattributes:
+ proplen = read_uint64(file)
+ c['properties'] = file.read(proplen)
+ self.coders.append(c)
+ num_bindpairs = self.totalout - 1
+ for i in range(num_bindpairs):
+ self.bindpairs.append((read_uint64(file), read_uint64(file),))
+ num_packedstreams = self.totalin - num_bindpairs
+ if num_packedstreams == 1:
+ for i in range(self.totalin):
+ if self._find_in_bin_pair(i) < 0: # there is no in_bin_pair
+ self.packed_indices.append(i)
+ elif num_packedstreams > 1:
+ for i in range(num_packedstreams):
+ self.packed_indices.append(read_uint64(file))
+
+ def write(self, file: BinaryIO):
+ num_coders = len(self.coders)
+ assert num_coders > 0
+ write_uint64(file, num_coders)
+ for i, c in enumerate(self.coders):
+ id = c['method'] # type: bytes
+ id_size = len(id) & 0x0f
+ iscomplex = 0x10 if not self.is_simple(c) else 0x00
+ hasattributes = 0x20 if c['properties'] is not None else 0x00
+ flag = struct.pack('B', id_size | iscomplex | hasattributes)
+ write_byte(file, flag)
+ write_bytes(file, id[:id_size])
+ if not self.is_simple(c):
+ write_uint64(file, c['numinstreams'])
+ assert c['numoutstreams'] == 1
+ write_uint64(file, c['numoutstreams'])
+ if c['properties'] is not None:
+ write_uint64(file, len(c['properties']))
+ write_bytes(file, c['properties'])
+ num_bindpairs = self.totalout - 1
+ assert len(self.bindpairs) == num_bindpairs
+ num_packedstreams = self.totalin - num_bindpairs
+ for bp in self.bindpairs:
+ write_uint64(file, bp[0])
+ write_uint64(file, bp[1])
+ if num_packedstreams > 1:
+ for pi in self.packed_indices:
+ write_uint64(file, pi)
+
+ def is_simple(self, coder):
+ return coder['numinstreams'] == 1 and coder['numoutstreams'] == 1
+
+ def get_decompressor(self, size: int, reset: bool = False) -> SevenZipDecompressor:
+ if self.decompressor is not None and not reset:
+ return self.decompressor
+ else:
+ try:
+ self.decompressor = SevenZipDecompressor(self.coders, size, self.crc)
+ except Exception as e:
+ raise e
+ if self.decompressor is not None:
+ return self.decompressor
+ else:
+ raise
+
+ def get_compressor(self) -> SevenZipCompressor:
+ if self.compressor is not None:
+ return self.compressor
+ else:
+ try:
+ # FIXME: set filters
+ self.compressor = SevenZipCompressor()
+ self.coders = self.compressor.coders
+ return self.compressor
+ except Exception as e:
+ raise e
+
+ def get_unpack_size(self) -> int:
+ if self.unpacksizes is None:
+ return 0
+ for i in range(len(self.unpacksizes) - 1, -1, -1):
+ if self._find_out_bin_pair(i):
+ return self.unpacksizes[i]
+ raise TypeError('not found')
+
+ def _find_in_bin_pair(self, index: int) -> int:
+ for idx, (a, b) in enumerate(self.bindpairs):
+ if a == index:
+ return idx
+ return -1
+
+ def _find_out_bin_pair(self, index: int) -> int:
+ for idx, (a, b) in enumerate(self.bindpairs):
+ if b == index:
+ return idx
+ return -1
+
+ def is_encrypted(self) -> bool:
+ return CompressionMethod.CRYPT_AES256_SHA256 in [x['method'] for x in self.coders]
+
+
+class UnpackInfo:
+ """ combines multiple folders """
+
+ __slots__ = ['numfolders', 'folders', 'datastreamidx']
+
+ @classmethod
+ def retrieve(cls, file: BinaryIO):
+ obj = cls()
+ obj._read(file)
+ return obj
+
+ def __init__(self):
+ self.numfolders = None
+ self.folders = []
+ self.datastreamidx = None
+
+ def _read(self, file: BinaryIO):
+ pid = file.read(1)
+ if pid != Property.FOLDER:
+ raise Bad7zFile('folder id expected but %s found' % repr(pid))
+ self.numfolders = read_uint64(file)
+ self.folders = []
+ external = read_byte(file)
+ if external == 0x00:
+ self.folders = [Folder.retrieve(file) for _ in range(self.numfolders)]
+ else:
+ datastreamidx = read_uint64(file)
+ current_pos = file.tell()
+ file.seek(datastreamidx, 0)
+ self.folders = [Folder.retrieve(file) for _ in range(self.numfolders)]
+ file.seek(current_pos, 0)
+ self._retrieve_coders_info(file)
+
+ def _retrieve_coders_info(self, file: BinaryIO):
+ pid = file.read(1)
+ if pid != Property.CODERS_UNPACK_SIZE:
+ raise Bad7zFile('coders unpack size id expected but %s found' % repr(pid))
+ for folder in self.folders:
+ folder.unpacksizes = [read_uint64(file) for _ in range(folder.totalout)]
+ pid = file.read(1)
+ if pid == Property.CRC:
+ defined = read_boolean(file, self.numfolders, checkall=True)
+ crcs = read_crcs(file, self.numfolders)
+ for idx, folder in enumerate(self.folders):
+ folder.digestdefined = defined[idx]
+ folder.crc = crcs[idx]
+ pid = file.read(1)
+ if pid != Property.END:
+ raise Bad7zFile('end id expected but %s found at %d' % (repr(pid), file.tell()))
+
+ def write(self, file: BinaryIO):
+ assert self.numfolders is not None
+ assert self.folders is not None
+ assert self.numfolders == len(self.folders)
+ file.write(Property.UNPACK_INFO)
+ file.write(Property.FOLDER)
+ write_uint64(file, self.numfolders)
+ write_byte(file, b'\x00')
+ for folder in self.folders:
+ folder.write(file)
+ # If support external entity, we may write
+ # self.datastreamidx here.
+ # folder data will be written in another place.
+ # write_byte(file, b'\x01')
+ # assert self.datastreamidx is not None
+ # write_uint64(file, self.datastreamidx)
+ write_byte(file, Property.CODERS_UNPACK_SIZE)
+ for folder in self.folders:
+ for i in range(folder.totalout):
+ write_uint64(file, folder.unpacksizes[i])
+ write_byte(file, Property.END)
+
+
+class SubstreamsInfo:
+ """ defines the substreams of a folder """
+
+ __slots__ = ['digests', 'digestsdefined', 'unpacksizes', 'num_unpackstreams_folders']
+
+ def __init__(self):
+ self.digests = [] # type: List[int]
+ self.digestsdefined = [] # type: List[bool]
+ self.unpacksizes = None # type: Optional[List[int]]
+ self.num_unpackstreams_folders = [] # type: List[int]
+
+ @classmethod
+ def retrieve(cls, file: BinaryIO, numfolders: int, folders: List[Folder]):
+ obj = cls()
+ obj._read(file, numfolders, folders)
+ return obj
+
+ def _read(self, file: BinaryIO, numfolders: int, folders: List[Folder]):
+ pid = file.read(1)
+ if pid == Property.NUM_UNPACK_STREAM:
+ self.num_unpackstreams_folders = [read_uint64(file) for _ in range(numfolders)]
+ pid = file.read(1)
+ else:
+ self.num_unpackstreams_folders = [1] * numfolders
+ if pid == Property.SIZE:
+ self.unpacksizes = []
+ for i in range(len(self.num_unpackstreams_folders)):
+ totalsize = 0 # type: int
+ for j in range(1, self.num_unpackstreams_folders[i]):
+ size = read_uint64(file)
+ self.unpacksizes.append(size)
+ totalsize += size
+ self.unpacksizes.append(folders[i].get_unpack_size() - totalsize)
+ pid = file.read(1)
+ num_digests = 0
+ num_digests_total = 0
+ for i in range(numfolders):
+ numsubstreams = self.num_unpackstreams_folders[i]
+ if numsubstreams != 1 or not folders[i].digestdefined:
+ num_digests += numsubstreams
+ num_digests_total += numsubstreams
+ if pid == Property.CRC:
+ defined = read_boolean(file, num_digests, checkall=True)
+ crcs = read_crcs(file, num_digests)
+ didx = 0
+ for i in range(numfolders):
+ folder = folders[i]
+ numsubstreams = self.num_unpackstreams_folders[i]
+ if numsubstreams == 1 and folder.digestdefined and folder.crc is not None:
+ self.digestsdefined.append(True)
+ self.digests.append(folder.crc)
+ else:
+ for j in range(numsubstreams):
+ self.digestsdefined.append(defined[didx])
+ self.digests.append(crcs[didx])
+ didx += 1
+ pid = file.read(1)
+ if pid != Property.END:
+ raise Bad7zFile('end id expected but %r found' % pid)
+ if not self.digestsdefined:
+ self.digestsdefined = [False] * num_digests_total
+ self.digests = [0] * num_digests_total
+
+ def write(self, file: BinaryIO, numfolders: int):
+ assert self.num_unpackstreams_folders is not None
+ if len(self.num_unpackstreams_folders) == 0:
+ # nothing to write
+ return
+ if self.unpacksizes is None:
+ raise ValueError
+ write_byte(file, Property.SUBSTREAMS_INFO)
+ if not functools.reduce(lambda x, y: x and (y == 1), self.num_unpackstreams_folders, True):
+ write_byte(file, Property.NUM_UNPACK_STREAM)
+ for n in self.num_unpackstreams_folders:
+ write_uint64(file, n)
+ write_byte(file, Property.SIZE)
+ idx = 0
+ for i in range(numfolders):
+ for j in range(1, self.num_unpackstreams_folders[i]):
+ size = self.unpacksizes[idx]
+ write_uint64(file, size)
+ idx += 1
+ idx += 1
+ if functools.reduce(lambda x, y: x or y, self.digestsdefined, False):
+ write_byte(file, Property.CRC)
+ write_boolean(file, self.digestsdefined, all_defined=True)
+ write_crcs(file, self.digests)
+ write_byte(file, Property.END)
+
+
+class StreamsInfo:
+ """ information about compressed streams """
+
+ __slots__ = ['packinfo', 'unpackinfo', 'substreamsinfo']
+
+ def __init__(self):
+ self.packinfo = None # type: PackInfo
+ self.unpackinfo = None # type: UnpackInfo
+ self.substreamsinfo = None # type: Optional[SubstreamsInfo]
+
+ @classmethod
+ def retrieve(cls, file: BinaryIO):
+ obj = cls()
+ obj.read(file)
+ return obj
+
+ def read(self, file: BinaryIO) -> None:
+ pid = file.read(1)
+ if pid == Property.PACK_INFO:
+ self.packinfo = PackInfo.retrieve(file)
+ pid = file.read(1)
+ if pid == Property.UNPACK_INFO:
+ self.unpackinfo = UnpackInfo.retrieve(file)
+ pid = file.read(1)
+ if pid == Property.SUBSTREAMS_INFO:
+ self.substreamsinfo = SubstreamsInfo.retrieve(file, self.unpackinfo.numfolders, self.unpackinfo.folders)
+ pid = file.read(1)
+ if pid != Property.END:
+ raise Bad7zFile('end id expected but %s found' % repr(pid))
+
+ def write(self, file: BinaryIO):
+ write_byte(file, Property.MAIN_STREAMS_INFO)
+ self._write(file)
+
+ def _write(self, file: BinaryIO):
+ if self.packinfo is not None:
+ self.packinfo.write(file)
+ if self.unpackinfo is not None:
+ self.unpackinfo.write(file)
+ if self.substreamsinfo is not None:
+ self.substreamsinfo.write(file, self.unpackinfo.numfolders)
+ write_byte(file, Property.END)
+
+
+class HeaderStreamsInfo(StreamsInfo):
+
+ def __init__(self):
+ super().__init__()
+ self.packinfo = PackInfo()
+ self.unpackinfo = UnpackInfo()
+ folder = Folder()
+ folder.compressor = SevenZipCompressor()
+ folder.coders = folder.compressor.coders
+ folder.solid = False
+ folder.digestdefined = False
+ folder.bindpairs = []
+ folder.totalin = 1
+ folder.totalout = 1
+ folder.digestdefined = [True]
+ self.unpackinfo.numfolders = 1
+ self.unpackinfo.folders = [folder]
+
+ def write(self, file: BinaryIO):
+ self._write(file)
+
+
+class FilesInfo:
+ """ holds file properties """
+
+ __slots__ = ['files', 'emptyfiles', 'antifiles']
+
+ def __init__(self):
+ self.files = [] # type: List[Dict[str, Any]]
+ self.emptyfiles = [] # type: List[bool]
+ self.antifiles = None
+
+ @classmethod
+ def retrieve(cls, file: BinaryIO):
+ obj = cls()
+ obj._read(file)
+ return obj
+
+ def _read(self, fp: BinaryIO):
+ numfiles = read_uint64(fp)
+ self.files = [{'emptystream': False} for _ in range(numfiles)]
+ numemptystreams = 0
+ while True:
+ prop = fp.read(1)
+ if prop == Property.END:
+ break
+ size = read_uint64(fp)
+ if prop == Property.DUMMY:
+ # Added by newer versions of 7z to adjust padding.
+ fp.seek(size, os.SEEK_CUR)
+ continue
+ buffer = io.BytesIO(fp.read(size))
+ if prop == Property.EMPTY_STREAM:
+ isempty = read_boolean(buffer, numfiles, checkall=False)
+ list(map(lambda x, y: x.update({'emptystream': y}), self.files, isempty)) # type: ignore
+ numemptystreams += isempty.count(True)
+ elif prop == Property.EMPTY_FILE:
+ self.emptyfiles = read_boolean(buffer, numemptystreams, checkall=False)
+ elif prop == Property.ANTI:
+ self.antifiles = read_boolean(buffer, numemptystreams, checkall=False)
+ elif prop == Property.NAME:
+ external = buffer.read(1)
+ if external == b'\x00':
+ self._read_name(buffer)
+ else:
+ dataindex = read_uint64(buffer)
+ current_pos = fp.tell()
+ fp.seek(dataindex, 0)
+ self._read_name(fp)
+ fp.seek(current_pos, 0)
+ elif prop == Property.CREATION_TIME:
+ self._read_times(buffer, 'creationtime')
+ elif prop == Property.LAST_ACCESS_TIME:
+ self._read_times(buffer, 'lastaccesstime')
+ elif prop == Property.LAST_WRITE_TIME:
+ self._read_times(buffer, 'lastwritetime')
+ elif prop == Property.ATTRIBUTES:
+ defined = read_boolean(buffer, numfiles, checkall=True)
+ external = buffer.read(1)
+ if external == b'\x00':
+ self._read_attributes(buffer, defined)
+ else:
+ dataindex = read_uint64(buffer)
+ # try to read external data
+ current_pos = fp.tell()
+ fp.seek(dataindex, 0)
+ self._read_attributes(fp, defined)
+ fp.seek(current_pos, 0)
+ elif prop == Property.START_POS:
+ self._read_start_pos(buffer)
+ else:
+ raise Bad7zFile('invalid type %r' % prop)
+
+ def _read_name(self, buffer: BinaryIO) -> None:
+ for f in self.files:
+ f['filename'] = read_utf16(buffer).replace('\\', '/')
+
+ def _read_attributes(self, buffer: BinaryIO, defined: List[bool]) -> None:
+ for idx, f in enumerate(self.files):
+ f['attributes'] = read_uint32(buffer)[0] if defined[idx] else None
+
+ def _read_times(self, fp: BinaryIO, name: str) -> None:
+ defined = read_boolean(fp, len(self.files), checkall=True)
+ # NOTE: the "external" flag is currently ignored, should be 0x00
+ external = fp.read(1)
+ assert external == b'\x00'
+ for i, f in enumerate(self.files):
+ f[name] = ArchiveTimestamp(read_real_uint64(fp)[0]) if defined[i] else None
+
+ def _read_start_pos(self, fp: BinaryIO) -> None:
+ defined = read_boolean(fp, len(self.files), checkall=True)
+ # NOTE: the "external" flag is currently ignored, should be 0x00
+ external = fp.read(1)
+ assert external == 0x00
+ for i, f in enumerate(self.files):
+ f['startpos'] = read_real_uint64(fp)[0] if defined[i] else None
+
+ def _write_times(self, fp: BinaryIO, propid, name: str) -> None:
+ write_byte(fp, propid)
+ defined = [] # type: List[bool]
+ num_defined = 0 # type: int
+ for f in self.files:
+ if name in f.keys():
+ if f[name] is not None:
+ defined.append(True)
+ num_defined += 1
+ size = num_defined * 8 + 2
+ if not reduce(and_, defined, True):
+ size += bits_to_bytes(num_defined)
+ write_uint64(fp, size)
+ write_boolean(fp, defined, all_defined=True)
+ write_byte(fp, b'\x00')
+ for i, file in enumerate(self.files):
+ if defined[i]:
+ write_real_uint64(fp, ArchiveTimestamp.from_datetime(file[name]))
+ else:
+ pass
+
+ def _write_prop_bool_vector(self, fp: BinaryIO, propid, vector) -> None:
+ write_byte(fp, propid)
+ write_boolean(fp, vector, all_defined=True)
+
+ @staticmethod
+ def _are_there(vector) -> bool:
+ if vector is not None:
+ if functools.reduce(or_, vector, False):
+ return True
+ return False
+
+ def _write_names(self, file: BinaryIO):
+ name_defined = 0
+ names = []
+ name_size = 0
+ for f in self.files:
+ if f.get('filename', None) is not None:
+ name_defined += 1
+ names.append(f['filename'])
+ name_size += len(f['filename'].encode('utf-16LE')) + 2 # len(str + NULL_WORD)
+ if name_defined > 0:
+ write_byte(file, Property.NAME)
+ write_uint64(file, name_size + 1)
+ write_byte(file, b'\x00')
+ for n in names:
+ write_utf16(file, n)
+
+ def _write_attributes(self, file):
+ defined = [] # type: List[bool]
+ num_defined = 0
+ for f in self.files:
+ if 'attributes' in f.keys() and f['attributes'] is not None:
+ defined.append(True)
+ num_defined += 1
+ else:
+ defined.append(False)
+ size = num_defined * 4 + 2
+ if num_defined != len(defined):
+ size += bits_to_bytes(num_defined)
+ write_byte(file, Property.ATTRIBUTES)
+ write_uint64(file, size)
+ write_boolean(file, defined, all_defined=True)
+ write_byte(file, b'\x00')
+ for i, f in enumerate(self.files):
+ if defined[i]:
+ write_uint32(file, f['attributes'])
+
+ def write(self, file: BinaryIO):
+ assert self.files is not None
+ write_byte(file, Property.FILES_INFO)
+ numfiles = len(self.files)
+ write_uint64(file, numfiles)
+ emptystreams = [] # List[bool]
+ for f in self.files:
+ emptystreams.append(f['emptystream'])
+ if self._are_there(emptystreams):
+ write_byte(file, Property.EMPTY_STREAM)
+ write_uint64(file, bits_to_bytes(numfiles))
+ write_boolean(file, emptystreams, all_defined=False)
+ else:
+ if self._are_there(self.emptyfiles):
+ self._write_prop_bool_vector(file, Property.EMPTY_FILE, self.emptyfiles)
+ if self._are_there(self.antifiles):
+ self._write_prop_bool_vector(file, Property.ANTI, self.antifiles)
+ # Name
+ self._write_names(file)
+ # timestamps
+ self._write_times(file, Property.CREATION_TIME, 'creationtime')
+ self._write_times(file, Property.LAST_ACCESS_TIME, 'lastaccesstime')
+ self._write_times(file, Property.LAST_WRITE_TIME, 'lastwritetime')
+ # start_pos
+ # FIXME: TBD
+ # attribute
+ self._write_attributes(file)
+ write_byte(file, Property.END)
+
+
+class Header:
+ """ the archive header """
+
+ __slot__ = ['solid', 'properties', 'additional_streams', 'main_streams', 'files_info',
+ 'size', '_start_pos']
+
+ def __init__(self) -> None:
+ self.solid = False
+ self.properties = None
+ self.additional_streams = None
+ self.main_streams = None
+ self.files_info = None
+ self.size = 0 # fixme. Not implemented yet
+ self._start_pos = 0
+
+ @classmethod
+ def retrieve(cls, fp: BinaryIO, buffer: BytesIO, start_pos: int):
+ obj = cls()
+ obj._read(fp, buffer, start_pos)
+ return obj
+
+ def _read(self, fp: BinaryIO, buffer: BytesIO, start_pos: int) -> None:
+ self._start_pos = start_pos
+ fp.seek(self._start_pos)
+ self._decode_header(fp, buffer)
+
+ def _decode_header(self, fp: BinaryIO, buffer: BytesIO) -> None:
+ """
+ Decode header data or encoded header data from buffer.
+ When buffer consist of encoded buffer, it get stream data
+ from it and call itself recursively
+ """
+ pid = buffer.read(1)
+ if not pid:
+ # empty archive
+ return
+ elif pid == Property.HEADER:
+ self._extract_header_info(buffer)
+ return
+ elif pid != Property.ENCODED_HEADER:
+ raise TypeError('Unknown field: %r' % id)
+ # get from encoded header
+ streams = HeaderStreamsInfo.retrieve(buffer)
+ self._decode_header(fp, self._get_headerdata_from_streams(fp, streams))
+
+ def _get_headerdata_from_streams(self, fp: BinaryIO, streams: StreamsInfo) -> BytesIO:
+ """get header data from given streams.unpackinfo and packinfo.
+ folder data are stored in raw data positioned in afterheader."""
+ buffer = io.BytesIO()
+ src_start = self._start_pos
+ for folder in streams.unpackinfo.folders:
+ if folder.is_encrypted():
+ raise UnsupportedCompressionMethodError()
+
+ uncompressed = folder.unpacksizes
+ if not isinstance(uncompressed, (list, tuple)):
+ uncompressed = [uncompressed] * len(folder.coders)
+ compressed_size = streams.packinfo.packsizes[0]
+ uncompressed_size = uncompressed[-1]
+
+ src_start += streams.packinfo.packpos
+ fp.seek(src_start, 0)
+ decompressor = folder.get_decompressor(compressed_size)
+ folder_data = decompressor.decompress(fp.read(compressed_size))[:uncompressed_size]
+ src_start += uncompressed_size
+ if folder.digestdefined:
+ if folder.crc != calculate_crc32(folder_data):
+ raise Bad7zFile('invalid block data')
+ buffer.write(folder_data)
+ buffer.seek(0, 0)
+ return buffer
+
+ def _encode_header(self, file: BinaryIO, afterheader: int):
+ startpos = file.tell()
+ packpos = startpos - afterheader
+ buf = io.BytesIO()
+ _, raw_header_len, raw_crc = self.write(buf, 0, False)
+ streams = HeaderStreamsInfo()
+ streams.packinfo.packpos = packpos
+ folder = streams.unpackinfo.folders[0]
+ folder.crc = [raw_crc]
+ folder.unpacksizes = [raw_header_len]
+ compressed_len = 0
+ buf.seek(0, 0)
+ data = buf.read(io.DEFAULT_BUFFER_SIZE)
+ while data:
+ out = folder.compressor.compress(data)
+ compressed_len += len(out)
+ file.write(out)
+ data = buf.read(io.DEFAULT_BUFFER_SIZE)
+ out = folder.compressor.flush()
+ compressed_len += len(out)
+ file.write(out)
+ #
+ streams.packinfo.packsizes = [compressed_len]
+ # actual header start position
+ startpos = file.tell()
+ write_byte(file, Property.ENCODED_HEADER)
+ streams.write(file)
+ write_byte(file, Property.END)
+ return startpos
+
+ def write(self, file: BinaryIO, afterheader: int, encoded: bool = True):
+ startpos = file.tell()
+ if encoded:
+ startpos = self._encode_header(file, afterheader)
+ else:
+ write_byte(file, Property.HEADER)
+ # Archive properties
+ if self.main_streams is not None:
+ self.main_streams.write(file)
+ # Files Info
+ if self.files_info is not None:
+ self.files_info.write(file)
+ if self.properties is not None:
+ self.properties.write(file)
+ # AdditionalStreams
+ if self.additional_streams is not None:
+ self.additional_streams.write(file)
+ write_byte(file, Property.END)
+ endpos = file.tell()
+ header_len = endpos - startpos
+ file.seek(startpos, io.SEEK_SET)
+ crc = calculate_crc32(file.read(header_len))
+ file.seek(endpos, io.SEEK_SET)
+ return startpos, header_len, crc
+
+ def _extract_header_info(self, fp: BinaryIO) -> None:
+ pid = fp.read(1)
+ if pid == Property.ARCHIVE_PROPERTIES:
+ self.properties = ArchiveProperties.retrieve(fp)
+ pid = fp.read(1)
+ if pid == Property.ADDITIONAL_STREAMS_INFO:
+ self.additional_streams = StreamsInfo.retrieve(fp)
+ pid = fp.read(1)
+ if pid == Property.MAIN_STREAMS_INFO:
+ self.main_streams = StreamsInfo.retrieve(fp)
+ pid = fp.read(1)
+ if pid == Property.FILES_INFO:
+ self.files_info = FilesInfo.retrieve(fp)
+ pid = fp.read(1)
+ if pid != Property.END:
+ raise Bad7zFile('end id expected but %s found' % (repr(pid)))
+
+ @staticmethod
+ def build_header(folders):
+ header = Header()
+ header.files_info = FilesInfo()
+ header.main_streams = StreamsInfo()
+ header.main_streams.packinfo = PackInfo()
+ header.main_streams.packinfo.numstreams = 0
+ header.main_streams.packinfo.packpos = 0
+ header.main_streams.unpackinfo = UnpackInfo()
+ header.main_streams.unpackinfo.numfolders = len(folders)
+ header.main_streams.unpackinfo.folders = folders
+ header.main_streams.substreamsinfo = SubstreamsInfo()
+ header.main_streams.substreamsinfo.num_unpackstreams_folders = [len(folders)]
+ header.main_streams.substreamsinfo.unpacksizes = []
+ return header
+
+
+class SignatureHeader:
+ """The SignatureHeader class hold information of a signature header of archive."""
+
+ __slots__ = ['version', 'startheadercrc', 'nextheaderofs', 'nextheadersize', 'nextheadercrc']
+
+ def __init__(self) -> None:
+ self.version = (P7ZIP_MAJOR_VERSION, P7ZIP_MINOR_VERSION) # type: Tuple[bytes, ...]
+ self.startheadercrc = None # type: Optional[int]
+ self.nextheaderofs = None # type: Optional[int]
+ self.nextheadersize = None # type: Optional[int]
+ self.nextheadercrc = None # type: Optional[int]
+
+ @classmethod
+ def retrieve(cls, file: BinaryIO):
+ obj = cls()
+ obj._read(file)
+ return obj
+
+ def _read(self, file: BinaryIO) -> None:
+ file.seek(len(MAGIC_7Z), 0)
+ self.version = read_bytes(file, 2)
+ self.startheadercrc, _ = read_uint32(file)
+ self.nextheaderofs, data = read_real_uint64(file)
+ crc = calculate_crc32(data)
+ self.nextheadersize, data = read_real_uint64(file)
+ crc = calculate_crc32(data, crc)
+ self.nextheadercrc, data = read_uint32(file)
+ crc = calculate_crc32(data, crc)
+ if crc != self.startheadercrc:
+ raise Bad7zFile('invalid header data')
+
+ def calccrc(self, length: int, header_crc: int):
+ self.nextheadersize = length
+ self.nextheadercrc = header_crc
+ assert self.nextheaderofs is not None
+ buf = io.BytesIO()
+ write_real_uint64(buf, self.nextheaderofs)
+ write_real_uint64(buf, self.nextheadersize)
+ write_uint32(buf, self.nextheadercrc)
+ startdata = buf.getvalue()
+ self.startheadercrc = calculate_crc32(startdata)
+
+ def write(self, file: BinaryIO):
+ assert self.startheadercrc is not None
+ assert self.nextheadercrc is not None
+ assert self.nextheaderofs is not None
+ assert self.nextheadersize is not None
+ file.seek(0, 0)
+ write_bytes(file, MAGIC_7Z)
+ write_byte(file, self.version[0])
+ write_byte(file, self.version[1])
+ write_uint32(file, self.startheadercrc)
+ write_real_uint64(file, self.nextheaderofs)
+ write_real_uint64(file, self.nextheadersize)
+ write_uint32(file, self.nextheadercrc)
+
+ def _write_skelton(self, file: BinaryIO):
+ file.seek(0, 0)
+ write_bytes(file, MAGIC_7Z)
+ write_byte(file, self.version[0])
+ write_byte(file, self.version[1])
+ write_uint32(file, 1)
+ write_real_uint64(file, 2)
+ write_real_uint64(file, 3)
+ write_uint32(file, 4)
+
+
+class FinishHeader():
+ """Finish header for multi-volume 7z file."""
+
+ def __init__(self):
+ self.archive_start_offset = None # data offset from end of the finish header
+ self.additional_start_block_size = None # start signature & start header size
+ self.finish_header_size = 20 + 16
+
+ @classmethod
+ def retrieve(cls, file):
+ obj = cls()
+ obj._read(file)
+ return obj
+
+ def _read(self, file):
+ self.archive_start_offset = read_uint64(file)
+ self.additional_start_block_size = read_uint64(file)
diff --git a/libs/py7zr/callbacks.py b/libs/py7zr/callbacks.py
new file mode 100644
index 000000000..6b2c08383
--- /dev/null
+++ b/libs/py7zr/callbacks.py
@@ -0,0 +1,61 @@
+#!/usr/bin/python -u
+#
+# p7zr library
+#
+# Copyright (c) 2020 Hiroshi Miura <[email protected]>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+from abc import ABC, abstractmethod
+
+
+class Callback(ABC):
+ """Abstrat base class for progress callbacks."""
+
+ @abstractmethod
+ def report_start_preparation(self):
+ """report a start of preparation event such as making list of files and looking into its properties."""
+ pass
+
+ @abstractmethod
+ def report_start(self, processing_file_path, processing_bytes):
+ """report a start event of specified archive file and its input bytes."""
+ pass
+
+ @abstractmethod
+ def report_end(self, processing_file_path, wrote_bytes):
+ """report an end event of specified archive file and its output bytes."""
+ pass
+
+ @abstractmethod
+ def report_warning(self, message):
+ """report an warning event with its message"""
+ pass
+
+ @abstractmethod
+ def report_postprocess(self):
+ """report a start of post processing event such as set file properties and permissions or creating symlinks."""
+ pass
+
+
+class ExtractCallback(Callback):
+ """Abstrat base class for extraction progress callbacks."""
+ pass
+
+
+class ArchiveCallback(Callback):
+ """Abstrat base class for progress callbacks."""
+ pass
diff --git a/libs/py7zr/compression.py b/libs/py7zr/compression.py
new file mode 100644
index 000000000..25d5726ac
--- /dev/null
+++ b/libs/py7zr/compression.py
@@ -0,0 +1,384 @@
+#!/usr/bin/python -u
+#
+# p7zr library
+#
+# Copyright (c) 2019 Hiroshi Miura <[email protected]>
+# Copyright (c) 2004-2015 by Joachim Bauch, [email protected]
+# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
+# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+import bz2
+import io
+import lzma
+import os
+import queue
+import sys
+import threading
+from typing import IO, Any, BinaryIO, Dict, List, Optional, Union
+
+from py7zr import UnsupportedCompressionMethodError
+from py7zr.extra import CopyDecompressor, DeflateDecompressor, ISevenZipDecompressor, ZstdDecompressor
+from py7zr.helpers import MemIO, NullIO, calculate_crc32, readlink
+from py7zr.properties import READ_BLOCKSIZE, ArchivePassword, CompressionMethod
+
+if sys.version_info < (3, 6):
+ import pathlib2 as pathlib
+else:
+ import pathlib
+try:
+ import zstandard as Zstd # type: ignore
+except ImportError:
+ Zstd = None
+
+
+class Worker:
+ """Extract worker class to invoke handler"""
+
+ def __init__(self, files, src_start: int, header) -> None:
+ self.target_filepath = {} # type: Dict[int, Union[MemIO, pathlib.Path, None]]
+ self.files = files
+ self.src_start = src_start
+ self.header = header
+
+ def extract(self, fp: BinaryIO, parallel: bool, q=None) -> None:
+ """Extract worker method to handle 7zip folder and decompress each files."""
+ if hasattr(self.header, 'main_streams') and self.header.main_streams is not None:
+ src_end = self.src_start + self.header.main_streams.packinfo.packpositions[-1]
+ numfolders = self.header.main_streams.unpackinfo.numfolders
+ if numfolders == 1:
+ self.extract_single(fp, self.files, self.src_start, src_end, q)
+ else:
+ folders = self.header.main_streams.unpackinfo.folders
+ positions = self.header.main_streams.packinfo.packpositions
+ empty_files = [f for f in self.files if f.emptystream]
+ if not parallel:
+ self.extract_single(fp, empty_files, 0, 0, q)
+ for i in range(numfolders):
+ self.extract_single(fp, folders[i].files, self.src_start + positions[i],
+ self.src_start + positions[i + 1], q)
+ else:
+ filename = getattr(fp, 'name', None)
+ self.extract_single(open(filename, 'rb'), empty_files, 0, 0, q)
+ extract_threads = []
+ for i in range(numfolders):
+ p = threading.Thread(target=self.extract_single,
+ args=(filename, folders[i].files,
+ self.src_start + positions[i], self.src_start + positions[i + 1], q))
+ p.start()
+ extract_threads.append((p))
+ for p in extract_threads:
+ p.join()
+ else:
+ empty_files = [f for f in self.files if f.emptystream]
+ self.extract_single(fp, empty_files, 0, 0, q)
+
+ def extract_single(self, fp: Union[BinaryIO, str], files, src_start: int, src_end: int,
+ q: Optional[queue.Queue]) -> None:
+ """Single thread extractor that takes file lists in single 7zip folder."""
+ if files is None:
+ return
+ if isinstance(fp, str):
+ fp = open(fp, 'rb')
+ fp.seek(src_start)
+ for f in files:
+ if q is not None:
+ q.put(('s', str(f.filename), str(f.compressed) if f.compressed is not None else '0'))
+ fileish = self.target_filepath.get(f.id, None)
+ if fileish is not None:
+ fileish.parent.mkdir(parents=True, exist_ok=True)
+ with fileish.open(mode='wb') as ofp:
+ if not f.emptystream:
+ # extract to file
+ self.decompress(fp, f.folder, ofp, f.uncompressed[-1], f.compressed, src_end)
+ ofp.seek(0)
+ else:
+ pass # just create empty file
+ elif not f.emptystream:
+ # read and bin off a data but check crc
+ with NullIO() as ofp:
+ self.decompress(fp, f.folder, ofp, f.uncompressed[-1], f.compressed, src_end)
+ if q is not None:
+ q.put(('e', str(f.filename), str(f.uncompressed[-1])))
+
+ def decompress(self, fp: BinaryIO, folder, fq: IO[Any],
+ size: int, compressed_size: Optional[int], src_end: int) -> None:
+ """decompressor wrapper called from extract method.
+
+ :parameter fp: archive source file pointer
+ :parameter folder: Folder object that have decompressor object.
+ :parameter fq: output file pathlib.Path
+ :parameter size: uncompressed size of target file.
+ :parameter compressed_size: compressed size of target file.
+ :parameter src_end: end position of the folder
+ :returns None
+ """
+ assert folder is not None
+ out_remaining = size
+ decompressor = folder.get_decompressor(compressed_size)
+ while out_remaining > 0:
+ max_length = min(out_remaining, io.DEFAULT_BUFFER_SIZE)
+ rest_size = src_end - fp.tell()
+ read_size = min(READ_BLOCKSIZE, rest_size)
+ if read_size == 0:
+ tmp = decompressor.decompress(b'', max_length)
+ if len(tmp) == 0:
+ raise Exception("decompression get wrong: no output data.")
+ else:
+ inp = fp.read(read_size)
+ tmp = decompressor.decompress(inp, max_length)
+ if len(tmp) > 0 and out_remaining >= len(tmp):
+ out_remaining -= len(tmp)
+ fq.write(tmp)
+ if out_remaining <= 0:
+ break
+ if fp.tell() >= src_end:
+ if decompressor.crc is not None and not decompressor.check_crc():
+ print('\nCRC error! expected: {}, real: {}'.format(decompressor.crc, decompressor.digest))
+ return
+
+ def _find_link_target(self, target):
+ """Find the target member of a symlink or hardlink member in the archive.
+ """
+ targetname = target.as_posix() # type: str
+ linkname = readlink(targetname)
+ # Check windows full path symlinks
+ if linkname.startswith("\\\\?\\"):
+ linkname = linkname[4:]
+ # normalize as posix style
+ linkname = pathlib.Path(linkname).as_posix() # type: str
+ member = None
+ for j in range(len(self.files)):
+ if linkname == self.files[j].origin.as_posix():
+ # FIXME: when API user specify arcname, it will break
+ member = os.path.relpath(linkname, os.path.dirname(targetname))
+ break
+ if member is None:
+ member = linkname
+ return member
+
+ def archive(self, fp: BinaryIO, folder, deref=False):
+ """Run archive task for specified 7zip folder."""
+ compressor = folder.get_compressor()
+ outsize = 0
+ self.header.main_streams.packinfo.numstreams = 1
+ num_unpack_streams = 0
+ self.header.main_streams.substreamsinfo.digests = []
+ self.header.main_streams.substreamsinfo.digestsdefined = []
+ last_file_index = 0
+ foutsize = 0
+ for i, f in enumerate(self.files):
+ file_info = f.file_properties()
+ self.header.files_info.files.append(file_info)
+ self.header.files_info.emptyfiles.append(f.emptystream)
+ foutsize = 0
+ if f.is_symlink and not deref:
+ last_file_index = i
+ num_unpack_streams += 1
+ link_target = self._find_link_target(f.origin) # type: str
+ tgt = link_target.encode('utf-8') # type: bytes
+ insize = len(tgt)
+ crc = calculate_crc32(tgt, 0) # type: int
+ out = compressor.compress(tgt)
+ outsize += len(out)
+ foutsize += len(out)
+ fp.write(out)
+ self.header.main_streams.substreamsinfo.digests.append(crc)
+ self.header.main_streams.substreamsinfo.digestsdefined.append(True)
+ self.header.main_streams.substreamsinfo.unpacksizes.append(insize)
+ self.header.files_info.files[i]['maxsize'] = foutsize
+ elif not f.emptystream:
+ last_file_index = i
+ num_unpack_streams += 1
+ insize = 0
+ with f.origin.open(mode='rb') as fd:
+ data = fd.read(READ_BLOCKSIZE)
+ insize += len(data)
+ crc = 0
+ while data:
+ crc = calculate_crc32(data, crc)
+ out = compressor.compress(data)
+ outsize += len(out)
+ foutsize += len(out)
+ fp.write(out)
+ data = fd.read(READ_BLOCKSIZE)
+ insize += len(data)
+ self.header.main_streams.substreamsinfo.digests.append(crc)
+ self.header.main_streams.substreamsinfo.digestsdefined.append(True)
+ self.header.files_info.files[i]['maxsize'] = foutsize
+ self.header.main_streams.substreamsinfo.unpacksizes.append(insize)
+ else:
+ out = compressor.flush()
+ outsize += len(out)
+ foutsize += len(out)
+ fp.write(out)
+ if len(self.files) > 0:
+ self.header.files_info.files[last_file_index]['maxsize'] = foutsize
+ # Update size data in header
+ self.header.main_streams.packinfo.packsizes = [outsize]
+ folder.unpacksizes = [sum(self.header.main_streams.substreamsinfo.unpacksizes)]
+ self.header.main_streams.substreamsinfo.num_unpackstreams_folders = [num_unpack_streams]
+
+ def register_filelike(self, id: int, fileish: Union[MemIO, pathlib.Path, None]) -> None:
+ """register file-ish to worker."""
+ self.target_filepath[id] = fileish
+
+
+class SevenZipDecompressor:
+ """Main decompressor object which is properly configured and bind to each 7zip folder.
+ because 7zip folder can have a custom compression method"""
+
+ lzma_methods_map = {
+ CompressionMethod.LZMA: lzma.FILTER_LZMA1,
+ CompressionMethod.LZMA2: lzma.FILTER_LZMA2,
+ CompressionMethod.DELTA: lzma.FILTER_DELTA,
+ CompressionMethod.P7Z_BCJ: lzma.FILTER_X86,
+ CompressionMethod.BCJ_ARM: lzma.FILTER_ARM,
+ CompressionMethod.BCJ_ARMT: lzma.FILTER_ARMTHUMB,
+ CompressionMethod.BCJ_IA64: lzma.FILTER_IA64,
+ CompressionMethod.BCJ_PPC: lzma.FILTER_POWERPC,
+ CompressionMethod.BCJ_SPARC: lzma.FILTER_SPARC,
+ }
+
+ FILTER_BZIP2 = 0x31
+ FILTER_ZIP = 0x32
+ FILTER_COPY = 0x33
+ FILTER_AES = 0x34
+ FILTER_ZSTD = 0x35
+ alt_methods_map = {
+ CompressionMethod.MISC_BZIP2: FILTER_BZIP2,
+ CompressionMethod.MISC_DEFLATE: FILTER_ZIP,
+ CompressionMethod.COPY: FILTER_COPY,
+ CompressionMethod.CRYPT_AES256_SHA256: FILTER_AES,
+ CompressionMethod.MISC_ZSTD: FILTER_ZSTD,
+ }
+
+ def __init__(self, coders: List[Dict[str, Any]], size: int, crc: Optional[int]) -> None:
+ # Get password which was set when creation of py7zr.SevenZipFile object.
+ self.input_size = size
+ self.consumed = 0 # type: int
+ self.crc = crc
+ self.digest = None # type: Optional[int]
+ if self._check_lzma_coders(coders):
+ self._set_lzma_decompressor(coders)
+ else:
+ self._set_alternative_decompressor(coders)
+
+ def _check_lzma_coders(self, coders: List[Dict[str, Any]]) -> bool:
+ res = True
+ for coder in coders:
+ if self.lzma_methods_map.get(coder['method'], None) is None:
+ res = False
+ break
+ return res
+
+ def _set_lzma_decompressor(self, coders: List[Dict[str, Any]]) -> None:
+ filters = [] # type: List[Dict[str, Any]]
+ for coder in coders:
+ if coder['numinstreams'] != 1 or coder['numoutstreams'] != 1:
+ raise UnsupportedCompressionMethodError('Only a simple compression method is currently supported.')
+ filter_id = self.lzma_methods_map.get(coder['method'], None)
+ if filter_id is None:
+ raise UnsupportedCompressionMethodError
+ properties = coder.get('properties', None)
+ if properties is not None:
+ filters[:0] = [lzma._decode_filter_properties(filter_id, properties)] # type: ignore
+ else:
+ filters[:0] = [{'id': filter_id}]
+ self.decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_RAW, filters=filters) # type: Union[bz2.BZ2Decompressor, lzma.LZMADecompressor, ISevenZipDecompressor] # noqa
+
+ def _set_alternative_decompressor(self, coders: List[Dict[str, Any]]) -> None:
+ filter_id = self.alt_methods_map.get(coders[0]['method'], None)
+ if filter_id == self.FILTER_BZIP2:
+ self.decompressor = bz2.BZ2Decompressor()
+ elif filter_id == self.FILTER_ZIP:
+ self.decompressor = DeflateDecompressor()
+ elif filter_id == self.FILTER_COPY:
+ self.decompressor = CopyDecompressor()
+ elif filter_id == self.FILTER_ZSTD and Zstd:
+ self.decompressor = ZstdDecompressor()
+ else:
+ raise UnsupportedCompressionMethodError
+
+ def decompress(self, data: bytes, max_length: Optional[int] = None) -> bytes:
+ self.consumed += len(data)
+ if max_length is not None:
+ folder_data = self.decompressor.decompress(data, max_length=max_length)
+ else:
+ folder_data = self.decompressor.decompress(data)
+ # calculate CRC with uncompressed data
+ if self.crc is not None:
+ self.digest = calculate_crc32(folder_data, self.digest)
+ return folder_data
+
+ def check_crc(self):
+ return self.crc == self.digest
+
+
+class SevenZipCompressor:
+
+ """Main compressor object to configured for each 7zip folder."""
+
+ __slots__ = ['filters', 'compressor', 'coders']
+
+ lzma_methods_map_r = {
+ lzma.FILTER_LZMA2: CompressionMethod.LZMA2,
+ lzma.FILTER_DELTA: CompressionMethod.DELTA,
+ lzma.FILTER_X86: CompressionMethod.P7Z_BCJ,
+ }
+
+ def __init__(self, filters=None):
+ if filters is None:
+ self.filters = [{"id": lzma.FILTER_LZMA2, "preset": 7 | lzma.PRESET_EXTREME}, ]
+ else:
+ self.filters = filters
+ self.compressor = lzma.LZMACompressor(format=lzma.FORMAT_RAW, filters=self.filters)
+ self.coders = []
+ for filter in self.filters:
+ if filter is None:
+ break
+ method = self.lzma_methods_map_r[filter['id']]
+ properties = lzma._encode_filter_properties(filter)
+ self.coders.append({'method': method, 'properties': properties, 'numinstreams': 1, 'numoutstreams': 1})
+
+ def compress(self, data):
+ return self.compressor.compress(data)
+
+ def flush(self):
+ return self.compressor.flush()
+
+
+def get_methods_names(coders: List[dict]) -> List[str]:
+ """Return human readable method names for specified coders"""
+ methods_name_map = {
+ CompressionMethod.LZMA2: "LZMA2",
+ CompressionMethod.LZMA: "LZMA",
+ CompressionMethod.DELTA: "delta",
+ CompressionMethod.P7Z_BCJ: "BCJ",
+ CompressionMethod.BCJ_ARM: "BCJ(ARM)",
+ CompressionMethod.BCJ_ARMT: "BCJ(ARMT)",
+ CompressionMethod.BCJ_IA64: "BCJ(IA64)",
+ CompressionMethod.BCJ_PPC: "BCJ(POWERPC)",
+ CompressionMethod.BCJ_SPARC: "BCJ(SPARC)",
+ CompressionMethod.CRYPT_AES256_SHA256: "7zAES",
+ }
+ methods_names = [] # type: List[str]
+ for coder in coders:
+ try:
+ methods_names.append(methods_name_map[coder['method']])
+ except KeyError:
+ raise UnsupportedCompressionMethodError("Unknown method {}".format(coder['method']))
+ return methods_names
diff --git a/libs/py7zr/exceptions.py b/libs/py7zr/exceptions.py
new file mode 100644
index 000000000..1a25e2089
--- /dev/null
+++ b/libs/py7zr/exceptions.py
@@ -0,0 +1,42 @@
+#
+# p7zr library
+#
+# Copyright (c) 2019 Hiroshi Miura <[email protected]>
+# Copyright (c) 2004-2015 by Joachim Bauch, [email protected]
+# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
+# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+
+class ArchiveError(Exception):
+ pass
+
+
+class Bad7zFile(ArchiveError):
+ pass
+
+
+class UnsupportedCompressionMethodError(ArchiveError):
+ pass
+
+
+class DecompressionError(ArchiveError):
+ pass
+
+
+class InternalError(ArchiveError):
+ pass
diff --git a/libs/py7zr/extra.py b/libs/py7zr/extra.py
new file mode 100644
index 000000000..48cc840a5
--- /dev/null
+++ b/libs/py7zr/extra.py
@@ -0,0 +1,122 @@
+#!/usr/bin/python -u
+#
+# p7zr library
+#
+# Copyright (c) 2019 Hiroshi Miura <[email protected]>
+# Copyright (c) 2004-2015 by Joachim Bauch, [email protected]
+# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
+# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+import lzma
+import zlib
+from abc import ABC, abstractmethod
+from typing import Any, Dict, List, Union
+
+from py7zr import UnsupportedCompressionMethodError
+from py7zr.helpers import Buffer, calculate_key
+from py7zr.properties import READ_BLOCKSIZE, CompressionMethod
+
+try:
+ import zstandard as Zstd # type: ignore
+except ImportError:
+ Zstd = None
+
+
+class ISevenZipCompressor(ABC):
+ @abstractmethod
+ def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
+ pass
+
+ @abstractmethod
+ def flush(self) -> bytes:
+ pass
+
+
+class ISevenZipDecompressor(ABC):
+ @abstractmethod
+ def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
+ pass
+
+
+class DeflateDecompressor(ISevenZipDecompressor):
+ def __init__(self):
+ self.buf = b''
+ self._decompressor = zlib.decompressobj(-15)
+
+ def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1):
+ if max_length < 0:
+ res = self.buf + self._decompressor.decompress(data)
+ self.buf = b''
+ else:
+ tmp = self.buf + self._decompressor.decompress(data)
+ res = tmp[:max_length]
+ self.buf = tmp[max_length:]
+ return res
+
+
+class CopyDecompressor(ISevenZipDecompressor):
+
+ def __init__(self):
+ self._buf = bytes()
+
+ def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
+ if max_length < 0:
+ length = len(data)
+ else:
+ length = min(len(data), max_length)
+ buflen = len(self._buf)
+ if length > buflen:
+ res = self._buf + data[:length - buflen]
+ self._buf = data[length - buflen:]
+ else:
+ res = self._buf[:length]
+ self._buf = self._buf[length:] + data
+ return res
+
+
+class ZstdDecompressor(ISevenZipDecompressor):
+
+ def __init__(self):
+ if Zstd is None:
+ raise UnsupportedCompressionMethodError
+ self.buf = b'' # type: bytes
+ self._ctc = Zstd.ZstdDecompressor() # type: ignore
+
+ def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
+ dobj = self._ctc.decompressobj() # type: ignore
+ if max_length < 0:
+ res = self.buf + dobj.decompress(data)
+ self.buf = b''
+ else:
+ tmp = self.buf + dobj.decompress(data)
+ res = tmp[:max_length]
+ self.buf = tmp[max_length:]
+ return res
+
+
+class ZstdCompressor(ISevenZipCompressor):
+
+ def __init__(self):
+ if Zstd is None:
+ raise UnsupportedCompressionMethodError
+ self._ctc = Zstd.ZstdCompressor() # type: ignore
+
+ def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
+ return self._ctc.compress(data) # type: ignore
+
+ def flush(self):
+ pass
diff --git a/libs/py7zr/helpers.py b/libs/py7zr/helpers.py
new file mode 100644
index 000000000..1f84417b8
--- /dev/null
+++ b/libs/py7zr/helpers.py
@@ -0,0 +1,362 @@
+#!/usr/bin/python -u
+#
+# p7zr library
+#
+# Copyright (c) 2019 Hiroshi Miura <[email protected]>
+# Copyright (c) 2004-2015 by Joachim Bauch, [email protected]
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+#
+
+import _hashlib # type: ignore # noqa
+import ctypes
+import os
+import pathlib
+import platform
+import sys
+import time as _time
+import zlib
+from datetime import datetime, timedelta, timezone, tzinfo
+from typing import BinaryIO, Optional, Union
+
+import py7zr.win32compat
+
+
+def calculate_crc32(data: bytes, value: Optional[int] = None, blocksize: int = 1024 * 1024) -> int:
+ """Calculate CRC32 of strings with arbitrary lengths."""
+ length = len(data)
+ pos = blocksize
+ if value:
+ value = zlib.crc32(data[:pos], value)
+ else:
+ value = zlib.crc32(data[:pos])
+ while pos < length:
+ value = zlib.crc32(data[pos:pos + blocksize], value)
+ pos += blocksize
+
+ return value & 0xffffffff
+
+
+def _calculate_key1(password: bytes, cycles: int, salt: bytes, digest: str) -> bytes:
+ """Calculate 7zip AES encryption key."""
+ if digest not in ('sha256'):
+ raise ValueError('Unknown digest method for password protection.')
+ assert cycles <= 0x3f
+ if cycles == 0x3f:
+ ba = bytearray(salt + password + bytes(32))
+ key = bytes(ba[:32]) # type: bytes
+ else:
+ rounds = 1 << cycles
+ m = _hashlib.new(digest)
+ for round in range(rounds):
+ m.update(salt + password + round.to_bytes(8, byteorder='little', signed=False))
+ key = m.digest()[:32]
+ return key
+
+
+def _calculate_key2(password: bytes, cycles: int, salt: bytes, digest: str):
+ """Calculate 7zip AES encryption key.
+ It utilize ctypes and memoryview buffer and zero-copy technology on Python."""
+ if digest not in ('sha256'):
+ raise ValueError('Unknown digest method for password protection.')
+ assert cycles <= 0x3f
+ if cycles == 0x3f:
+ key = bytes(bytearray(salt + password + bytes(32))[:32]) # type: bytes
+ else:
+ rounds = 1 << cycles
+ m = _hashlib.new(digest)
+ length = len(salt) + len(password)
+
+ class RoundBuf(ctypes.LittleEndianStructure):
+ _pack_ = 1
+ _fields_ = [
+ ('saltpassword', ctypes.c_ubyte * length),
+ ('round', ctypes.c_uint64)
+ ]
+
+ buf = RoundBuf()
+ for i, c in enumerate(salt + password):
+ buf.saltpassword[i] = c
+ buf.round = 0
+ mv = memoryview(buf) # type: ignore # noqa
+ while buf.round < rounds:
+ m.update(mv)
+ buf.round += 1
+ key = m.digest()[:32]
+ return key
+
+
+if platform.python_implementation() == "PyPy":
+ calculate_key = _calculate_key1 # Avoid https://foss.heptapod.net/pypy/pypy/issues/3209
+else:
+ calculate_key = _calculate_key2 # ver2 is 1.7-2.0 times faster than ver1
+
+
+def filetime_to_dt(ft):
+ """Convert Windows NTFS file time into python datetime object."""
+ EPOCH_AS_FILETIME = 116444736000000000
+ us = (ft - EPOCH_AS_FILETIME) // 10
+ return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(microseconds=us)
+
+
+ZERO = timedelta(0)
+HOUR = timedelta(hours=1)
+SECOND = timedelta(seconds=1)
+
+# A class capturing the platform's idea of local time.
+# (May result in wrong values on historical times in
+# timezones where UTC offset and/or the DST rules had
+# changed in the past.)
+
+STDOFFSET = timedelta(seconds=-_time.timezone)
+if _time.daylight:
+ DSTOFFSET = timedelta(seconds=-_time.altzone)
+else:
+ DSTOFFSET = STDOFFSET
+
+DSTDIFF = DSTOFFSET - STDOFFSET
+
+
+class LocalTimezone(tzinfo):
+
+ def fromutc(self, dt):
+ assert dt.tzinfo is self
+ stamp = (dt - datetime(1970, 1, 1, tzinfo=self)) // SECOND
+ args = _time.localtime(stamp)[:6]
+ dst_diff = DSTDIFF // SECOND
+ # Detect fold
+ fold = (args == _time.localtime(stamp - dst_diff))
+ return datetime(*args, microsecond=dt.microsecond, tzinfo=self)
+
+ def utcoffset(self, dt):
+ if self._isdst(dt):
+ return DSTOFFSET
+ else:
+ return STDOFFSET
+
+ def dst(self, dt):
+ if self._isdst(dt):
+ return DSTDIFF
+ else:
+ return ZERO
+
+ def tzname(self, dt):
+ return _time.tzname[self._isdst(dt)]
+
+ def _isdst(self, dt):
+ tt = (dt.year, dt.month, dt.day,
+ dt.hour, dt.minute, dt.second,
+ dt.weekday(), 0, 0)
+ stamp = _time.mktime(tt)
+ tt = _time.localtime(stamp)
+ return tt.tm_isdst > 0
+
+
+Local = LocalTimezone()
+TIMESTAMP_ADJUST = -11644473600
+
+
+class UTC(tzinfo):
+ """UTC"""
+
+ def utcoffset(self, dt):
+ return ZERO
+
+ def tzname(self, dt):
+ return "UTC"
+
+ def dst(self, dt):
+ return ZERO
+
+ def _call__(self):
+ return self
+
+
+class ArchiveTimestamp(int):
+ """Windows FILETIME timestamp."""
+
+ def __repr__(self):
+ return '%s(%d)' % (type(self).__name__, self)
+
+ def totimestamp(self) -> float:
+ """Convert 7z FILETIME to Python timestamp."""
+ # FILETIME is 100-nanosecond intervals since 1601/01/01 (UTC)
+ return (self / 10000000.0) + TIMESTAMP_ADJUST
+
+ def as_datetime(self):
+ """Convert FILETIME to Python datetime object."""
+ return datetime.fromtimestamp(self.totimestamp(), UTC())
+
+ @staticmethod
+ def from_datetime(val):
+ return ArchiveTimestamp((val - TIMESTAMP_ADJUST) * 10000000.0)
+
+
+def islink(path):
+ """
+ Cross-platform islink implementation.
+ Supports Windows NT symbolic links and reparse points.
+ """
+ is_symlink = os.path.islink(path)
+ if sys.version_info >= (3, 8) or sys.platform != "win32" or sys.getwindowsversion()[0] < 6:
+ return is_symlink
+ # special check for directory junctions which py38 does.
+ if is_symlink:
+ if py7zr.win32compat.is_reparse_point(path):
+ is_symlink = False
+ return is_symlink
+
+
+def readlink(path: Union[str, pathlib.Path], *, dir_fd=None) -> Union[str, pathlib.Path]:
+ """
+ Cross-platform compat implementation of os.readlink and Path.readlink().
+ Supports Windows NT symbolic links and reparse points.
+ When called with path argument as pathlike(str), return result as a pathlike(str).
+ When called with Path object, return also Path object.
+ When called with path argument as bytes, return result as a bytes.
+ """
+ is_path_pathlib = isinstance(path, pathlib.Path)
+ if sys.version_info >= (3, 9):
+ if is_path_pathlib and dir_fd is None:
+ return path.readlink()
+ else:
+ return os.readlink(path, dir_fd=dir_fd)
+ elif sys.version_info >= (3, 8) or sys.platform != "win32":
+ res = os.readlink(path, dir_fd=dir_fd)
+ # Hack to handle a wrong type of results
+ if isinstance(res, bytes):
+ res = os.fsdecode(res)
+ if is_path_pathlib:
+ return pathlib.Path(res)
+ else:
+ return res
+ elif not os.path.exists(str(path)):
+ raise OSError(22, 'Invalid argument', path)
+ return py7zr.win32compat.readlink(path)
+
+
+class MemIO:
+ """pathlib.Path-like IO class to write memory(io.Bytes)"""
+ def __init__(self, buf: BinaryIO):
+ self._buf = buf
+
+ def write(self, data: bytes) -> int:
+ return self._buf.write(data)
+
+ def read(self, length: Optional[int] = None) -> bytes:
+ if length is not None:
+ return self._buf.read(length)
+ else:
+ return self._buf.read()
+
+ def close(self) -> None:
+ self._buf.seek(0)
+
+ def flush(self) -> None:
+ pass
+
+ def seek(self, position: int) -> None:
+ self._buf.seek(position)
+
+ def open(self, mode=None):
+ return self
+
+ @property
+ def parent(self):
+ return self
+
+ def mkdir(self, parents=None, exist_ok=False):
+ return None
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+
+class NullIO:
+ """pathlib.Path-like IO class of /dev/null"""
+
+ def __init__(self):
+ pass
+
+ def write(self, data):
+ return len(data)
+
+ def read(self, length=None):
+ if length is not None:
+ return bytes(length)
+ else:
+ return b''
+
+ def close(self):
+ pass
+
+ def flush(self):
+ pass
+
+ def open(self, mode=None):
+ return self
+
+ @property
+ def parent(self):
+ return self
+
+ def mkdir(self):
+ return None
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+
+class BufferOverflow(Exception):
+ pass
+
+
+class Buffer:
+
+ def __init__(self, size: int = 16):
+ self._size = size
+ self._buf = bytearray(size)
+ self._buflen = 0
+ self.view = memoryview(self._buf[0:0])
+
+ def add(self, data: Union[bytes, bytearray, memoryview]):
+ length = len(data)
+ if length + self._buflen > self._size:
+ raise BufferOverflow()
+ self._buf[self._buflen:self._buflen + length] = data
+ self._buflen += length
+ self.view = memoryview(self._buf[0:self._buflen])
+
+ def reset(self) -> None:
+ self._buflen = 0
+ self.view = memoryview(self._buf[0:0])
+
+ def set(self, data: Union[bytes, bytearray, memoryview]) -> None:
+ length = len(data)
+ if length > self._size:
+ raise BufferOverflow()
+ self._buf[0:length] = data
+ self._buflen = length
+ self.view = memoryview(self._buf[0:length])
+
+ def __len__(self) -> int:
+ return self._buflen
diff --git a/libs/py7zr/properties.py b/libs/py7zr/properties.py
new file mode 100644
index 000000000..38cfbe8f5
--- /dev/null
+++ b/libs/py7zr/properties.py
@@ -0,0 +1,155 @@
+#
+# p7zr library
+#
+# Copyright (c) 2019 Hiroshi Miura <[email protected]>
+# Copyright (c) 2004-2015 by Joachim Bauch, [email protected]
+# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
+# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+import binascii
+from enum import Enum
+from typing import Optional
+
+MAGIC_7Z = binascii.unhexlify('377abcaf271c')
+FINISH_7Z = binascii.unhexlify('377abcaf271d')
+READ_BLOCKSIZE = 32248
+QUEUELEN = READ_BLOCKSIZE * 2
+
+READ_BLOCKSIZE = 32248
+
+
+class ByteEnum(bytes, Enum):
+ pass
+
+
+class Property(ByteEnum):
+ """Hold 7zip property fixed values."""
+ END = binascii.unhexlify('00')
+ HEADER = binascii.unhexlify('01')
+ ARCHIVE_PROPERTIES = binascii.unhexlify('02')
+ ADDITIONAL_STREAMS_INFO = binascii.unhexlify('03')
+ MAIN_STREAMS_INFO = binascii.unhexlify('04')
+ FILES_INFO = binascii.unhexlify('05')
+ PACK_INFO = binascii.unhexlify('06')
+ UNPACK_INFO = binascii.unhexlify('07')
+ SUBSTREAMS_INFO = binascii.unhexlify('08')
+ SIZE = binascii.unhexlify('09')
+ CRC = binascii.unhexlify('0a')
+ FOLDER = binascii.unhexlify('0b')
+ CODERS_UNPACK_SIZE = binascii.unhexlify('0c')
+ NUM_UNPACK_STREAM = binascii.unhexlify('0d')
+ EMPTY_STREAM = binascii.unhexlify('0e')
+ EMPTY_FILE = binascii.unhexlify('0f')
+ ANTI = binascii.unhexlify('10')
+ NAME = binascii.unhexlify('11')
+ CREATION_TIME = binascii.unhexlify('12')
+ LAST_ACCESS_TIME = binascii.unhexlify('13')
+ LAST_WRITE_TIME = binascii.unhexlify('14')
+ ATTRIBUTES = binascii.unhexlify('15')
+ COMMENT = binascii.unhexlify('16')
+ ENCODED_HEADER = binascii.unhexlify('17')
+ START_POS = binascii.unhexlify('18')
+ DUMMY = binascii.unhexlify('19')
+
+
+class CompressionMethod(ByteEnum):
+ """Hold fixed values for method parameter."""
+ COPY = binascii.unhexlify('00')
+ DELTA = binascii.unhexlify('03')
+ BCJ = binascii.unhexlify('04')
+ PPC = binascii.unhexlify('05')
+ IA64 = binascii.unhexlify('06')
+ ARM = binascii.unhexlify('07')
+ ARMT = binascii.unhexlify('08')
+ SPARC = binascii.unhexlify('09')
+ # SWAP = 02..
+ SWAP2 = binascii.unhexlify('020302')
+ SWAP4 = binascii.unhexlify('020304')
+ # 7Z = 03..
+ LZMA = binascii.unhexlify('030101')
+ PPMD = binascii.unhexlify('030401')
+ P7Z_BCJ = binascii.unhexlify('03030103')
+ P7Z_BCJ2 = binascii.unhexlify('0303011B')
+ BCJ_PPC = binascii.unhexlify('03030205')
+ BCJ_IA64 = binascii.unhexlify('03030401')
+ BCJ_ARM = binascii.unhexlify('03030501')
+ BCJ_ARMT = binascii.unhexlify('03030701')
+ BCJ_SPARC = binascii.unhexlify('03030805')
+ LZMA2 = binascii.unhexlify('21')
+ # MISC : 04..
+ MISC_ZIP = binascii.unhexlify('0401')
+ MISC_BZIP2 = binascii.unhexlify('040202')
+ MISC_DEFLATE = binascii.unhexlify('040108')
+ MISC_DEFLATE64 = binascii.unhexlify('040109')
+ MISC_Z = binascii.unhexlify('0405')
+ MISC_LZH = binascii.unhexlify('0406')
+ NSIS_DEFLATE = binascii.unhexlify('040901')
+ NSIS_BZIP2 = binascii.unhexlify('040902')
+ #
+ MISC_ZSTD = binascii.unhexlify('04f71101')
+ MISC_BROTLI = binascii.unhexlify('04f71102')
+ MISC_LZ4 = binascii.unhexlify('04f71104')
+ MISC_LZS = binascii.unhexlify('04f71105')
+ MISC_LIZARD = binascii.unhexlify('04f71106')
+ # CRYPTO 06..
+ CRYPT_ZIPCRYPT = binascii.unhexlify('06f10101')
+ CRYPT_RAR29AES = binascii.unhexlify('06f10303')
+ CRYPT_AES256_SHA256 = binascii.unhexlify('06f10701')
+
+
+class SupportedMethods:
+ """Hold list of methods which python3 can support."""
+ formats = [{'name': "7z", 'magic': MAGIC_7Z}]
+ codecs = [{'id': CompressionMethod.LZMA, 'name': "LZMA"},
+ {'id': CompressionMethod.LZMA2, 'name': "LZMA2"},
+ {'id': CompressionMethod.DELTA, 'name': "DELTA"},
+ {'id': CompressionMethod.P7Z_BCJ, 'name': "BCJ"},
+ {'id': CompressionMethod.BCJ_PPC, 'name': 'PPC'},
+ {'id': CompressionMethod.BCJ_IA64, 'name': 'IA64'},
+ {'id': CompressionMethod.BCJ_ARM, 'name': "ARM"},
+ {'id': CompressionMethod.BCJ_ARMT, 'name': "ARMT"},
+ {'id': CompressionMethod.BCJ_SPARC, 'name': 'SPARC'}
+ ]
+
+
+# this class is Borg/Singleton
+class ArchivePassword:
+
+ _shared_state = {
+ '_password': None,
+ }
+
+ def __init__(self, password: Optional[str] = None):
+ self.__dict__ = self._shared_state
+ if password is not None:
+ self._password = password
+
+ def set(self, password):
+ self._password = password
+
+ def get(self):
+ if self._password is not None:
+ return self._password
+ else:
+ return ''
+
+ def __str__(self):
+ if self._password is not None:
+ return self._password
+ else:
+ return ''
diff --git a/libs/py7zr/py7zr.py b/libs/py7zr/py7zr.py
new file mode 100644
index 000000000..466ae6274
--- /dev/null
+++ b/libs/py7zr/py7zr.py
@@ -0,0 +1,974 @@
+#!/usr/bin/python -u
+#
+# p7zr library
+#
+# Copyright (c) 2019,2020 Hiroshi Miura <[email protected]>
+# Copyright (c) 2004-2015 by Joachim Bauch, [email protected]
+# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
+# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+#
+"""Read 7zip format archives."""
+import collections.abc
+import datetime
+import errno
+import functools
+import io
+import operator
+import os
+import queue
+import stat
+import sys
+import threading
+from io import BytesIO
+from typing import IO, Any, BinaryIO, Dict, List, Optional, Tuple, Union
+
+from py7zr.archiveinfo import Folder, Header, SignatureHeader
+from py7zr.callbacks import ExtractCallback
+from py7zr.compression import SevenZipCompressor, Worker, get_methods_names
+from py7zr.exceptions import Bad7zFile, InternalError
+from py7zr.helpers import ArchiveTimestamp, MemIO, calculate_crc32, filetime_to_dt
+from py7zr.properties import MAGIC_7Z, READ_BLOCKSIZE, ArchivePassword
+
+if sys.version_info < (3, 6):
+ import contextlib2 as contextlib
+ import pathlib2 as pathlib
+else:
+ import contextlib
+ import pathlib
+
+if sys.platform.startswith('win'):
+ import _winapi
+
+FILE_ATTRIBUTE_UNIX_EXTENSION = 0x8000
+FILE_ATTRIBUTE_WINDOWS_MASK = 0x04fff
+
+
+class ArchiveFile:
+ """Represent each files metadata inside archive file.
+ It holds file properties; filename, permissions, and type whether
+ it is directory, link or normal file.
+
+ Instances of the :class:`ArchiveFile` class are returned by iterating :attr:`files_list` of
+ :class:`SevenZipFile` objects.
+ Each object stores information about a single member of the 7z archive. Most of users use :meth:`extractall()`.
+
+ The class also hold an archive parameter where file is exist in
+ archive file folder(container)."""
+ def __init__(self, id: int, file_info: Dict[str, Any]) -> None:
+ self.id = id
+ self._file_info = file_info
+
+ def file_properties(self) -> Dict[str, Any]:
+ """Return file properties as a hash object. Following keys are included: ‘readonly’, ‘is_directory’,
+ ‘posix_mode’, ‘archivable’, ‘emptystream’, ‘filename’, ‘creationtime’, ‘lastaccesstime’,
+ ‘lastwritetime’, ‘attributes’
+ """
+ properties = self._file_info
+ if properties is not None:
+ properties['readonly'] = self.readonly
+ properties['posix_mode'] = self.posix_mode
+ properties['archivable'] = self.archivable
+ properties['is_directory'] = self.is_directory
+ return properties
+
+ def _get_property(self, key: str) -> Any:
+ try:
+ return self._file_info[key]
+ except KeyError:
+ return None
+
+ @property
+ def origin(self) -> pathlib.Path:
+ return self._get_property('origin')
+
+ @property
+ def folder(self) -> Folder:
+ return self._get_property('folder')
+
+ @property
+ def filename(self) -> str:
+ """return filename of archive file."""
+ return self._get_property('filename')
+
+ @property
+ def emptystream(self) -> bool:
+ """True if file is empty(0-byte file), otherwise False"""
+ return self._get_property('emptystream')
+
+ @property
+ def uncompressed(self) -> List[int]:
+ return self._get_property('uncompressed')
+
+ @property
+ def uncompressed_size(self) -> int:
+ """Uncompressed file size."""
+ return functools.reduce(operator.add, self.uncompressed)
+
+ @property
+ def compressed(self) -> Optional[int]:
+ """Compressed size"""
+ return self._get_property('compressed')
+
+ def _test_attribute(self, target_bit: int) -> bool:
+ attributes = self._get_property('attributes')
+ if attributes is None:
+ return False
+ return attributes & target_bit == target_bit
+
+ @property
+ def archivable(self) -> bool:
+ """File has a Windows `archive` flag."""
+ return self._test_attribute(stat.FILE_ATTRIBUTE_ARCHIVE) # type: ignore # noqa
+
+ @property
+ def is_directory(self) -> bool:
+ """True if file is a directory, otherwise False."""
+ return self._test_attribute(stat.FILE_ATTRIBUTE_DIRECTORY) # type: ignore # noqa
+
+ @property
+ def readonly(self) -> bool:
+ """True if file is readonly, otherwise False."""
+ return self._test_attribute(stat.FILE_ATTRIBUTE_READONLY) # type: ignore # noqa
+
+ def _get_unix_extension(self) -> Optional[int]:
+ attributes = self._get_property('attributes')
+ if self._test_attribute(FILE_ATTRIBUTE_UNIX_EXTENSION):
+ return attributes >> 16
+ return None
+
+ @property
+ def is_symlink(self) -> bool:
+ """True if file is a symbolic link, otherwise False."""
+ e = self._get_unix_extension()
+ if e is not None:
+ return stat.S_ISLNK(e)
+ return self._test_attribute(stat.FILE_ATTRIBUTE_REPARSE_POINT) # type: ignore # noqa
+
+ @property
+ def is_junction(self) -> bool:
+ """True if file is a junction/reparse point on windows, otherwise False."""
+ return self._test_attribute(stat.FILE_ATTRIBUTE_REPARSE_POINT | # type: ignore # noqa
+ stat.FILE_ATTRIBUTE_DIRECTORY) # type: ignore # noqa
+
+ @property
+ def is_socket(self) -> bool:
+ """True if file is a socket, otherwise False."""
+ e = self._get_unix_extension()
+ if e is not None:
+ return stat.S_ISSOCK(e)
+ return False
+
+ @property
+ def lastwritetime(self) -> Optional[ArchiveTimestamp]:
+ """Return last written timestamp of a file."""
+ return self._get_property('lastwritetime')
+
+ @property
+ def posix_mode(self) -> Optional[int]:
+ """
+ posix mode when a member has a unix extension property, or None
+ :return: Return file stat mode can be set by os.chmod()
+ """
+ e = self._get_unix_extension()
+ if e is not None:
+ return stat.S_IMODE(e)
+ return None
+
+ @property
+ def st_fmt(self) -> Optional[int]:
+ """
+ :return: Return the portion of the file mode that describes the file type
+ """
+ e = self._get_unix_extension()
+ if e is not None:
+ return stat.S_IFMT(e)
+ return None
+
+
+class ArchiveFileList(collections.abc.Iterable):
+ """Iteratable container of ArchiveFile."""
+
+ def __init__(self, offset: int = 0):
+ self.files_list = [] # type: List[dict]
+ self.index = 0
+ self.offset = offset
+
+ def append(self, file_info: Dict[str, Any]) -> None:
+ self.files_list.append(file_info)
+
+ def __len__(self) -> int:
+ return len(self.files_list)
+
+ def __iter__(self) -> 'ArchiveFileListIterator':
+ return ArchiveFileListIterator(self)
+
+ def __getitem__(self, index):
+ if index > len(self.files_list):
+ raise IndexError
+ if index < 0:
+ raise IndexError
+ res = ArchiveFile(index + self.offset, self.files_list[index])
+ return res
+
+
+class ArchiveFileListIterator(collections.abc.Iterator):
+
+ def __init__(self, archive_file_list):
+ self._archive_file_list = archive_file_list
+ self._index = 0
+
+ def __next__(self) -> ArchiveFile:
+ if self._index == len(self._archive_file_list):
+ raise StopIteration
+ res = self._archive_file_list[self._index]
+ self._index += 1
+ return res
+
+
+# ------------------
+# Exported Classes
+# ------------------
+class ArchiveInfo:
+ """Hold archive information"""
+
+ def __init__(self, filename, size, header_size, method_names, solid, blocks, uncompressed):
+ self.filename = filename
+ self.size = size
+ self.header_size = header_size
+ self.method_names = method_names
+ self.solid = solid
+ self.blocks = blocks
+ self.uncompressed = uncompressed
+
+
+class FileInfo:
+ """Hold archived file information."""
+
+ def __init__(self, filename, compressed, uncompressed, archivable, is_directory, creationtime):
+ self.filename = filename
+ self.compressed = compressed
+ self.uncompressed = uncompressed
+ self.archivable = archivable
+ self.is_directory = is_directory
+ self.creationtime = creationtime
+
+
+class SevenZipFile(contextlib.AbstractContextManager):
+ """The SevenZipFile Class provides an interface to 7z archives."""
+
+ def __init__(self, file: Union[BinaryIO, str, pathlib.Path], mode: str = 'r',
+ *, filters: Optional[str] = None, dereference=False, password: Optional[str] = None) -> None:
+ if mode not in ('r', 'w', 'x', 'a'):
+ raise ValueError("ZipFile requires mode 'r', 'w', 'x', or 'a'")
+ if password is not None:
+ if mode not in ('r'):
+ raise NotImplementedError("It has not been implemented to create archive with password.")
+ ArchivePassword(password)
+ self.password_protected = True
+ else:
+ self.password_protected = False
+ # Check if we were passed a file-like object or not
+ if isinstance(file, str):
+ self._filePassed = False # type: bool
+ self.filename = file # type: str
+ if mode == 'r':
+ self.fp = open(file, 'rb') # type: BinaryIO
+ elif mode == 'w':
+ self.fp = open(file, 'w+b')
+ elif mode == 'x':
+ self.fp = open(file, 'x+b')
+ elif mode == 'a':
+ self.fp = open(file, 'r+b')
+ else:
+ raise ValueError("File open error.")
+ self.mode = mode
+ elif isinstance(file, pathlib.Path):
+ self._filePassed = False
+ self.filename = str(file)
+ if mode == 'r':
+ self.fp = file.open(mode='rb') # type: ignore # noqa # typeshed issue: 2911
+ elif mode == 'w':
+ self.fp = file.open(mode='w+b') # type: ignore # noqa
+ elif mode == 'x':
+ self.fp = file.open(mode='x+b') # type: ignore # noqa
+ elif mode == 'a':
+ self.fp = file.open(mode='r+b') # type: ignore # noqa
+ else:
+ raise ValueError("File open error.")
+ self.mode = mode
+ elif isinstance(file, io.IOBase):
+ self._filePassed = True
+ self.fp = file
+ self.filename = getattr(file, 'name', None)
+ self.mode = mode # type: ignore #noqa
+ else:
+ raise TypeError("invalid file: {}".format(type(file)))
+ self._fileRefCnt = 1
+ try:
+ if mode == "r":
+ self._real_get_contents(self.fp)
+ self._reset_worker()
+ elif mode in 'w':
+ # FIXME: check filters here
+ self.folder = self._create_folder(filters)
+ self.files = ArchiveFileList()
+ self._prepare_write()
+ self._reset_worker()
+ elif mode in 'x':
+ raise NotImplementedError
+ elif mode == 'a':
+ raise NotImplementedError
+ else:
+ raise ValueError("Mode must be 'r', 'w', 'x', or 'a'")
+ except Exception as e:
+ self._fpclose()
+ raise e
+ self.encoded_header_mode = False
+ self._dict = {} # type: Dict[str, IO[Any]]
+ self.dereference = dereference
+ self.reporterd = None # type: Optional[threading.Thread]
+ self.q = queue.Queue() # type: queue.Queue[Any]
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ def _create_folder(self, filters):
+ folder = Folder()
+ folder.compressor = SevenZipCompressor(filters)
+ folder.coders = folder.compressor.coders
+ folder.solid = True
+ folder.digestdefined = False
+ folder.bindpairs = []
+ folder.totalin = 1
+ folder.totalout = 1
+ return folder
+
+ def _fpclose(self) -> None:
+ assert self._fileRefCnt > 0
+ self._fileRefCnt -= 1
+ if not self._fileRefCnt and not self._filePassed:
+ self.fp.close()
+
+ def _real_get_contents(self, fp: BinaryIO) -> None:
+ if not self._check_7zfile(fp):
+ raise Bad7zFile('not a 7z file')
+ self.sig_header = SignatureHeader.retrieve(self.fp)
+ self.afterheader = self.fp.tell()
+ buffer = self._read_header_data()
+ header = Header.retrieve(self.fp, buffer, self.afterheader)
+ if header is None:
+ return
+ self.header = header
+ buffer.close()
+ self.files = ArchiveFileList()
+ if getattr(self.header, 'files_info', None) is not None:
+ self._filelist_retrieve()
+
+ def _read_header_data(self) -> BytesIO:
+ self.fp.seek(self.sig_header.nextheaderofs, os.SEEK_CUR)
+ buffer = io.BytesIO(self.fp.read(self.sig_header.nextheadersize))
+ if self.sig_header.nextheadercrc != calculate_crc32(buffer.getvalue()):
+ raise Bad7zFile('invalid header data')
+ return buffer
+
+ class ParseStatus:
+ def __init__(self, src_pos=0):
+ self.src_pos = src_pos
+ self.folder = 0 # 7zip folder where target stored
+ self.outstreams = 0 # output stream count
+ self.input = 0 # unpack stream count in each folder
+ self.stream = 0 # target input stream position
+
+ def _gen_filename(self) -> str:
+ # compressed file is stored without a name, generate one
+ try:
+ basefilename = self.filename
+ except AttributeError:
+ # 7z archive file doesn't have a name
+ return 'contents'
+ else:
+ if basefilename is not None:
+ fn, ext = os.path.splitext(os.path.basename(basefilename))
+ return fn
+ else:
+ return 'contents'
+
+ def _get_fileinfo_sizes(self, pstat, subinfo, packinfo, folder, packsizes, unpacksizes, file_in_solid, numinstreams):
+ if pstat.input == 0:
+ folder.solid = subinfo.num_unpackstreams_folders[pstat.folder] > 1
+ maxsize = (folder.solid and packinfo.packsizes[pstat.stream]) or None
+ uncompressed = unpacksizes[pstat.outstreams]
+ if not isinstance(uncompressed, (list, tuple)):
+ uncompressed = [uncompressed] * len(folder.coders)
+ if file_in_solid > 0:
+ compressed = None
+ elif pstat.stream < len(packsizes): # file is compressed
+ compressed = packsizes[pstat.stream]
+ else: # file is not compressed
+ compressed = uncompressed
+ packsize = packsizes[pstat.stream:pstat.stream + numinstreams]
+ return maxsize, compressed, uncompressed, packsize, folder.solid
+
+ def _filelist_retrieve(self) -> None:
+ # Initialize references for convenience
+ if hasattr(self.header, 'main_streams') and self.header.main_streams is not None:
+ folders = self.header.main_streams.unpackinfo.folders
+ packinfo = self.header.main_streams.packinfo
+ subinfo = self.header.main_streams.substreamsinfo
+ packsizes = packinfo.packsizes
+ unpacksizes = subinfo.unpacksizes if subinfo.unpacksizes is not None else [x.unpacksizes for x in folders]
+ else:
+ subinfo = None
+ folders = None
+ packinfo = None
+ packsizes = []
+ unpacksizes = [0]
+
+ pstat = self.ParseStatus()
+ pstat.src_pos = self.afterheader
+ file_in_solid = 0
+
+ for file_id, file_info in enumerate(self.header.files_info.files):
+ if not file_info['emptystream'] and folders is not None:
+ folder = folders[pstat.folder]
+ numinstreams = max([coder.get('numinstreams', 1) for coder in folder.coders])
+ (maxsize, compressed, uncompressed,
+ packsize, solid) = self._get_fileinfo_sizes(pstat, subinfo, packinfo, folder, packsizes,
+ unpacksizes, file_in_solid, numinstreams)
+ pstat.input += 1
+ folder.solid = solid
+ file_info['folder'] = folder
+ file_info['maxsize'] = maxsize
+ file_info['compressed'] = compressed
+ file_info['uncompressed'] = uncompressed
+ file_info['packsizes'] = packsize
+ if subinfo.digestsdefined[pstat.outstreams]:
+ file_info['digest'] = subinfo.digests[pstat.outstreams]
+ if folder is None:
+ pstat.src_pos += file_info['compressed']
+ else:
+ if folder.solid:
+ file_in_solid += 1
+ pstat.outstreams += 1
+ if folder.files is None:
+ folder.files = ArchiveFileList(offset=file_id)
+ folder.files.append(file_info)
+ if pstat.input >= subinfo.num_unpackstreams_folders[pstat.folder]:
+ file_in_solid = 0
+ pstat.src_pos += sum(packinfo.packsizes[pstat.stream:pstat.stream + numinstreams])
+ pstat.folder += 1
+ pstat.stream += numinstreams
+ pstat.input = 0
+ else:
+ file_info['folder'] = None
+ file_info['maxsize'] = 0
+ file_info['compressed'] = 0
+ file_info['uncompressed'] = [0]
+ file_info['packsizes'] = [0]
+
+ if 'filename' not in file_info:
+ file_info['filename'] = self._gen_filename()
+ self.files.append(file_info)
+
+ def _num_files(self) -> int:
+ if getattr(self.header, 'files_info', None) is not None:
+ return len(self.header.files_info.files)
+ return 0
+
+ def _set_file_property(self, outfilename: pathlib.Path, properties: Dict[str, Any]) -> None:
+ # creation time
+ creationtime = ArchiveTimestamp(properties['lastwritetime']).totimestamp()
+ if creationtime is not None:
+ os.utime(str(outfilename), times=(creationtime, creationtime))
+ if os.name == 'posix':
+ st_mode = properties['posix_mode']
+ if st_mode is not None:
+ outfilename.chmod(st_mode)
+ return
+ # fallback: only set readonly if specified
+ if properties['readonly'] and not properties['is_directory']:
+ ro_mask = 0o777 ^ (stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)
+ outfilename.chmod(outfilename.stat().st_mode & ro_mask)
+
+ def _reset_decompressor(self) -> None:
+ if self.header.main_streams is not None and self.header.main_streams.unpackinfo.numfolders > 0:
+ for i, folder in enumerate(self.header.main_streams.unpackinfo.folders):
+ folder.decompressor = None
+
+ def _reset_worker(self) -> None:
+ """Seek to where archive data start in archive and recreate new worker."""
+ self.fp.seek(self.afterheader)
+ self.worker = Worker(self.files, self.afterheader, self.header)
+
+ def set_encoded_header_mode(self, mode: bool) -> None:
+ self.encoded_header_mode = mode
+
+ @staticmethod
+ def _check_7zfile(fp: Union[BinaryIO, io.BufferedReader]) -> bool:
+ result = MAGIC_7Z == fp.read(len(MAGIC_7Z))[:len(MAGIC_7Z)]
+ fp.seek(-len(MAGIC_7Z), 1)
+ return result
+
+ def _get_method_names(self) -> str:
+ methods_names = [] # type: List[str]
+ for folder in self.header.main_streams.unpackinfo.folders:
+ methods_names += get_methods_names(folder.coders)
+ return ', '.join(x for x in methods_names)
+
+ def _test_digest_raw(self, pos: int, size: int, crc: int) -> bool:
+ self.fp.seek(pos)
+ remaining_size = size
+ digest = None
+ while remaining_size > 0:
+ block = min(READ_BLOCKSIZE, remaining_size)
+ digest = calculate_crc32(self.fp.read(block), digest)
+ remaining_size -= block
+ return digest == crc
+
+ def _test_pack_digest(self) -> bool:
+ self._reset_worker()
+ crcs = self.header.main_streams.packinfo.crcs
+ if crcs is not None and len(crcs) > 0:
+ # check packed stream's crc
+ for i, p in enumerate(self.header.main_streams.packinfo.packpositions):
+ if not self._test_digest_raw(p, self.header.main_streams.packinfo.packsizes[i], crcs[i]):
+ return False
+ return True
+
+ def _test_unpack_digest(self) -> bool:
+ self._reset_worker()
+ for f in self.files:
+ self.worker.register_filelike(f.id, None)
+ try:
+ self.worker.extract(self.fp, parallel=(not self.password_protected)) # TODO: print progress
+ except Bad7zFile:
+ return False
+ else:
+ return True
+
+ def _test_digests(self) -> bool:
+ if self._test_pack_digest():
+ if self._test_unpack_digest():
+ return True
+ return False
+
+ def _prepare_write(self) -> None:
+ self.sig_header = SignatureHeader()
+ self.sig_header._write_skelton(self.fp)
+ self.afterheader = self.fp.tell()
+ self.folder.totalin = 1
+ self.folder.totalout = 1
+ self.folder.bindpairs = []
+ self.folder.unpacksizes = []
+ self.header = Header.build_header([self.folder])
+
+ def _write_archive(self):
+ self.worker.archive(self.fp, self.folder, deref=self.dereference)
+ # Write header and update signature header
+ (header_pos, header_len, header_crc) = self.header.write(self.fp, self.afterheader,
+ encoded=self.encoded_header_mode)
+ self.sig_header.nextheaderofs = header_pos - self.afterheader
+ self.sig_header.calccrc(header_len, header_crc)
+ self.sig_header.write(self.fp)
+ return
+
+ def _is_solid(self):
+ for f in self.header.main_streams.substreamsinfo.num_unpackstreams_folders:
+ if f > 1:
+ return True
+ return False
+
+ def _var_release(self):
+ self._dict = None
+ self.files = None
+ self.folder = None
+ self.header = None
+ self.worker = None
+ self.sig_header = None
+
+ @staticmethod
+ def _make_file_info(target: pathlib.Path, arcname: Optional[str] = None, dereference=False) -> Dict[str, Any]:
+ f = {} # type: Dict[str, Any]
+ f['origin'] = target
+ if arcname is not None:
+ f['filename'] = pathlib.Path(arcname).as_posix()
+ else:
+ f['filename'] = target.as_posix()
+ if os.name == 'nt':
+ fstat = target.lstat()
+ if target.is_symlink():
+ if dereference:
+ fstat = target.stat()
+ if stat.S_ISDIR(fstat.st_mode):
+ f['emptystream'] = True
+ f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK # type: ignore # noqa
+ else:
+ f['emptystream'] = False
+ f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE # type: ignore # noqa
+ f['uncompressed'] = fstat.st_size
+ else:
+ f['emptystream'] = False
+ f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK # type: ignore # noqa
+ # f['attributes'] |= stat.FILE_ATTRIBUTE_REPARSE_POINT # type: ignore # noqa
+ elif target.is_dir():
+ f['emptystream'] = True
+ f['attributes'] = fstat.st_file_attributes & FILE_ATTRIBUTE_WINDOWS_MASK # type: ignore # noqa
+ elif target.is_file():
+ f['emptystream'] = False
+ f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE # type: ignore # noqa
+ f['uncompressed'] = fstat.st_size
+ else:
+ fstat = target.lstat()
+ if target.is_symlink():
+ if dereference:
+ fstat = target.stat()
+ if stat.S_ISDIR(fstat.st_mode):
+ f['emptystream'] = True
+ f['attributes'] = stat.FILE_ATTRIBUTE_DIRECTORY # type: ignore # noqa
+ f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFDIR << 16)
+ f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16)
+ else:
+ f['emptystream'] = False
+ f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE # type: ignore # noqa
+ f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IMODE(fstat.st_mode) << 16)
+ else:
+ f['emptystream'] = False
+ f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE | stat.FILE_ATTRIBUTE_REPARSE_POINT # type: ignore # noqa
+ f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFLNK << 16)
+ f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16)
+ elif target.is_dir():
+ f['emptystream'] = True
+ f['attributes'] = stat.FILE_ATTRIBUTE_DIRECTORY # type: ignore # noqa
+ f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IFDIR << 16)
+ f['attributes'] |= (stat.S_IMODE(fstat.st_mode) << 16)
+ elif target.is_file():
+ f['emptystream'] = False
+ f['uncompressed'] = fstat.st_size
+ f['attributes'] = stat.FILE_ATTRIBUTE_ARCHIVE # type: ignore # noqa
+ f['attributes'] |= FILE_ATTRIBUTE_UNIX_EXTENSION | (stat.S_IMODE(fstat.st_mode) << 16)
+
+ f['creationtime'] = fstat.st_ctime
+ f['lastwritetime'] = fstat.st_mtime
+ f['lastaccesstime'] = fstat.st_atime
+ return f
+
+ # --------------------------------------------------------------------------
+ # The public methods which SevenZipFile provides:
+ def getnames(self) -> List[str]:
+ """Return the members of the archive as a list of their names. It has
+ the same order as the list returned by getmembers().
+ """
+ return list(map(lambda x: x.filename, self.files))
+
+ def archiveinfo(self) -> ArchiveInfo:
+ fstat = os.stat(self.filename)
+ uncompressed = 0
+ for f in self.files:
+ uncompressed += f.uncompressed_size
+ return ArchiveInfo(self.filename, fstat.st_size, self.header.size, self._get_method_names(),
+ self._is_solid(), len(self.header.main_streams.unpackinfo.folders),
+ uncompressed)
+
+ def list(self) -> List[FileInfo]:
+ """Returns contents information """
+ alist = [] # type: List[FileInfo]
+ creationtime = None # type: Optional[datetime.datetime]
+ for f in self.files:
+ if f.lastwritetime is not None:
+ creationtime = filetime_to_dt(f.lastwritetime)
+ alist.append(FileInfo(f.filename, f.compressed, f.uncompressed_size, f.archivable, f.is_directory,
+ creationtime))
+ return alist
+
+ def test(self) -> bool:
+ """Test archive using CRC digests."""
+ return self._test_digests()
+
+ def readall(self) -> Optional[Dict[str, IO[Any]]]:
+ return self._extract(path=None, return_dict=True)
+
+ def extractall(self, path: Optional[Any] = None, callback: Optional[ExtractCallback] = None) -> None:
+ """Extract all members from the archive to the current working
+ directory and set owner, modification time and permissions on
+ directories afterwards. `path' specifies a different directory
+ to extract to.
+ """
+ self._extract(path=path, return_dict=False, callback=callback)
+
+ def read(self, targets: Optional[List[str]] = None) -> Optional[Dict[str, IO[Any]]]:
+ return self._extract(path=None, targets=targets, return_dict=True)
+
+ def extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None) -> None:
+ self._extract(path, targets, return_dict=False)
+
+ def _extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None,
+ return_dict: bool = False, callback: Optional[ExtractCallback] = None) -> Optional[Dict[str, IO[Any]]]:
+ if callback is not None and not isinstance(callback, ExtractCallback):
+ raise ValueError('Callback specified is not a subclass of py7zr.callbacks.ExtractCallback class')
+ elif callback is not None:
+ self.reporterd = threading.Thread(target=self.reporter, args=(callback,), daemon=True)
+ self.reporterd.start()
+ target_junction = [] # type: List[pathlib.Path]
+ target_sym = [] # type: List[pathlib.Path]
+ target_files = [] # type: List[Tuple[pathlib.Path, Dict[str, Any]]]
+ target_dirs = [] # type: List[pathlib.Path]
+ if path is not None:
+ if isinstance(path, str):
+ path = pathlib.Path(path)
+ try:
+ if not path.exists():
+ path.mkdir(parents=True)
+ else:
+ pass
+ except OSError as e:
+ if e.errno == errno.EEXIST and path.is_dir():
+ pass
+ else:
+ raise e
+ fnames = [] # type: List[str] # check duplicated filename in one archive?
+ self.q.put(('pre', None, None))
+ for f in self.files:
+ # TODO: sanity check
+ # check whether f.filename with invalid characters: '../'
+ if f.filename.startswith('../'):
+ raise Bad7zFile
+ # When archive has a multiple files which have same name
+ # To guarantee order of archive, multi-thread decompression becomes off.
+ # Currently always overwrite by latter archives.
+ # TODO: provide option to select overwrite or skip.
+ if f.filename not in fnames:
+ outname = f.filename
+ else:
+ i = 0
+ while True:
+ outname = f.filename + '_%d' % i
+ if outname not in fnames:
+ break
+ fnames.append(outname)
+ if path is not None:
+ outfilename = path.joinpath(outname)
+ else:
+ outfilename = pathlib.Path(outname)
+ if os.name == 'nt':
+ if outfilename.is_absolute():
+ # hack for microsoft windows path length limit < 255
+ outfilename = pathlib.WindowsPath('\\\\?\\' + str(outfilename))
+ if targets is not None and f.filename not in targets:
+ self.worker.register_filelike(f.id, None)
+ continue
+ if f.is_directory:
+ if not outfilename.exists():
+ target_dirs.append(outfilename)
+ target_files.append((outfilename, f.file_properties()))
+ else:
+ pass
+ elif f.is_socket:
+ pass
+ elif return_dict:
+ fname = outfilename.as_posix()
+ _buf = io.BytesIO()
+ self._dict[fname] = _buf
+ self.worker.register_filelike(f.id, MemIO(_buf))
+ elif f.is_symlink:
+ target_sym.append(outfilename)
+ try:
+ if outfilename.exists():
+ outfilename.unlink()
+ except OSError as ose:
+ if ose.errno not in [errno.ENOENT]:
+ raise
+ self.worker.register_filelike(f.id, outfilename)
+ elif f.is_junction:
+ target_junction.append(outfilename)
+ self.worker.register_filelike(f.id, outfilename)
+ else:
+ self.worker.register_filelike(f.id, outfilename)
+ target_files.append((outfilename, f.file_properties()))
+ for target_dir in sorted(target_dirs):
+ try:
+ target_dir.mkdir()
+ except FileExistsError:
+ if target_dir.is_dir():
+ # skip rare case
+ pass
+ elif target_dir.is_file():
+ raise Exception("Directory name is existed as a normal file.")
+ else:
+ raise Exception("Directory making fails on unknown condition.")
+
+ if callback is not None:
+ self.worker.extract(self.fp, parallel=(not self.password_protected and not self._filePassed), q=self.q)
+ else:
+ self.worker.extract(self.fp, parallel=(not self.password_protected and not self._filePassed))
+
+ self.q.put(('post', None, None))
+ if return_dict:
+ return self._dict
+ else:
+ # create symbolic links on target path as a working directory.
+ # if path is None, work on current working directory.
+ for t in target_sym:
+ sym_dst = t.resolve()
+ with sym_dst.open('rb') as b:
+ sym_src = b.read().decode(encoding='utf-8') # symlink target name stored in utf-8
+ sym_dst.unlink() # unlink after close().
+ sym_dst.symlink_to(pathlib.Path(sym_src))
+ # create junction point only on windows platform
+ if sys.platform.startswith('win'):
+ for t in target_junction:
+ junction_dst = t.resolve()
+ with junction_dst.open('rb') as b:
+ junction_target = pathlib.Path(b.read().decode(encoding='utf-8'))
+ junction_dst.unlink()
+ _winapi.CreateJunction(junction_target, str(junction_dst)) # type: ignore # noqa
+ # set file properties
+ for o, p in target_files:
+ self._set_file_property(o, p)
+ return None
+
+ def reporter(self, callback: ExtractCallback):
+ while True:
+ try:
+ item: Optional[Tuple[str, str, str]] = self.q.get(timeout=1)
+ except queue.Empty:
+ pass
+ else:
+ if item is None:
+ break
+ elif item[0] == 's':
+ callback.report_start(item[1], item[2])
+ elif item[0] == 'e':
+ callback.report_end(item[1], item[2])
+ elif item[0] == 'pre':
+ callback.report_start_preparation()
+ elif item[0] == 'post':
+ callback.report_postprocess()
+ elif item[0] == 'w':
+ callback.report_warning(item[1])
+ else:
+ pass
+ self.q.task_done()
+
+ def writeall(self, path: Union[pathlib.Path, str], arcname: Optional[str] = None):
+ """Write files in target path into archive."""
+ if isinstance(path, str):
+ path = pathlib.Path(path)
+ if not path.exists():
+ raise ValueError("specified path does not exist.")
+ if path.is_dir() or path.is_file():
+ self._writeall(path, arcname)
+ else:
+ raise ValueError("specified path is not a directory or a file")
+
+ def _writeall(self, path, arcname):
+ try:
+ if path.is_symlink() and not self.dereference:
+ self.write(path, arcname)
+ elif path.is_file():
+ self.write(path, arcname)
+ elif path.is_dir():
+ if not path.samefile('.'):
+ self.write(path, arcname)
+ for nm in sorted(os.listdir(str(path))):
+ arc = os.path.join(arcname, nm) if arcname is not None else None
+ self._writeall(path.joinpath(nm), arc)
+ else:
+ return # pathlib ignores ELOOP and return False for is_*().
+ except OSError as ose:
+ if self.dereference and ose.errno in [errno.ELOOP]:
+ return # ignore ELOOP here, this resulted to stop looped symlink reference.
+ elif self.dereference and sys.platform == 'win32' and ose.errno in [errno.ENOENT]:
+ return # ignore ENOENT which is happened when a case of ELOOP on windows.
+ else:
+ raise
+
+ def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None):
+ """Write single target file into archive(Not implemented yet)."""
+ if isinstance(file, str):
+ path = pathlib.Path(file)
+ elif isinstance(file, pathlib.Path):
+ path = file
+ else:
+ raise ValueError("Unsupported file type.")
+ file_info = self._make_file_info(path, arcname, self.dereference)
+ self.files.append(file_info)
+
+ def close(self):
+ """Flush all the data into archive and close it.
+ When close py7zr start reading target and writing actual archive file.
+ """
+ if 'w' in self.mode:
+ self._write_archive()
+ if 'r' in self.mode:
+ if self.reporterd is not None:
+ self.q.put_nowait(None)
+ self.reporterd.join(1)
+ if self.reporterd.is_alive():
+ raise InternalError("Progress report thread terminate error.")
+ self.reporterd = None
+ self._fpclose()
+ self._var_release()
+
+ def reset(self) -> None:
+ """When read mode, it reset file pointer, decompress worker and decompressor"""
+ if self.mode == 'r':
+ self._reset_worker()
+ self._reset_decompressor()
+
+
+# --------------------
+# exported functions
+# --------------------
+def is_7zfile(file: Union[BinaryIO, str, pathlib.Path]) -> bool:
+ """Quickly see if a file is a 7Z file by checking the magic number.
+ The file argument may be a filename or file-like object too.
+ """
+ result = False
+ try:
+ if isinstance(file, io.IOBase) and hasattr(file, "read"):
+ result = SevenZipFile._check_7zfile(file) # type: ignore # noqa
+ elif isinstance(file, str):
+ with open(file, 'rb') as fp:
+ result = SevenZipFile._check_7zfile(fp)
+ elif isinstance(file, pathlib.Path) or isinstance(file, pathlib.PosixPath) or \
+ isinstance(file, pathlib.WindowsPath):
+ with file.open(mode='rb') as fp: # type: ignore # noqa
+ result = SevenZipFile._check_7zfile(fp)
+ else:
+ raise TypeError('invalid type: file should be str, pathlib.Path or BinaryIO, but {}'.format(type(file)))
+ except OSError:
+ pass
+ return result
+
+
+def unpack_7zarchive(archive, path, extra=None):
+ """Function for registering with shutil.register_unpack_format()"""
+ arc = SevenZipFile(archive)
+ arc.extractall(path)
+ arc.close()
+
+
+def pack_7zarchive(base_name, base_dir, owner=None, group=None, dry_run=None, logger=None):
+ """Function for registering with shutil.register_archive_format()"""
+ target_name = '{}.7z'.format(base_name)
+ archive = SevenZipFile(target_name, mode='w')
+ archive.writeall(path=base_dir)
+ archive.close()
diff --git a/libs/py7zr/win32compat.py b/libs/py7zr/win32compat.py
new file mode 100644
index 000000000..dc72bfdf3
--- /dev/null
+++ b/libs/py7zr/win32compat.py
@@ -0,0 +1,174 @@
+import pathlib
+import stat
+import sys
+from logging import getLogger
+from typing import Union
+
+if sys.platform == "win32":
+ import ctypes
+ from ctypes.wintypes import BOOL, DWORD, HANDLE, LPCWSTR, LPDWORD, LPVOID, LPWSTR
+
+ _stdcall_libraries = {}
+ _stdcall_libraries['kernel32'] = ctypes.WinDLL('kernel32')
+ CloseHandle = _stdcall_libraries['kernel32'].CloseHandle
+ CreateFileW = _stdcall_libraries['kernel32'].CreateFileW
+ DeviceIoControl = _stdcall_libraries['kernel32'].DeviceIoControl
+ GetFileAttributesW = _stdcall_libraries['kernel32'].GetFileAttributesW
+ OPEN_EXISTING = 3
+ GENERIC_READ = 2147483648
+ FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000
+ FSCTL_GET_REPARSE_POINT = 0x000900A8
+ FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
+ IO_REPARSE_TAG_MOUNT_POINT = 0xA0000003
+ IO_REPARSE_TAG_SYMLINK = 0xA000000C
+ MAXIMUM_REPARSE_DATA_BUFFER_SIZE = 16 * 1024
+
+ def _check_bit(val: int, flag: int) -> bool:
+ return bool(val & flag == flag)
+
+ class SymbolicLinkReparseBuffer(ctypes.Structure):
+ """ Implementing the below in Python:
+
+ typedef struct _REPARSE_DATA_BUFFER {
+ ULONG ReparseTag;
+ USHORT ReparseDataLength;
+ USHORT Reserved;
+ union {
+ struct {
+ USHORT SubstituteNameOffset;
+ USHORT SubstituteNameLength;
+ USHORT PrintNameOffset;
+ USHORT PrintNameLength;
+ ULONG Flags;
+ WCHAR PathBuffer[1];
+ } SymbolicLinkReparseBuffer;
+ struct {
+ USHORT SubstituteNameOffset;
+ USHORT SubstituteNameLength;
+ USHORT PrintNameOffset;
+ USHORT PrintNameLength;
+ WCHAR PathBuffer[1];
+ } MountPointReparseBuffer;
+ struct {
+ UCHAR DataBuffer[1];
+ } GenericReparseBuffer;
+ } DUMMYUNIONNAME;
+ } REPARSE_DATA_BUFFER, *PREPARSE_DATA_BUFFER;
+ """
+ # See https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/content/ntifs/ns-ntifs-_reparse_data_buffer
+ _fields_ = [
+ ('flags', ctypes.c_ulong),
+ ('path_buffer', ctypes.c_byte * (MAXIMUM_REPARSE_DATA_BUFFER_SIZE - 20))
+ ]
+
+ class MountReparseBuffer(ctypes.Structure):
+ _fields_ = [
+ ('path_buffer', ctypes.c_byte * (MAXIMUM_REPARSE_DATA_BUFFER_SIZE - 16)),
+ ]
+
+ class ReparseBufferField(ctypes.Union):
+ _fields_ = [
+ ('symlink', SymbolicLinkReparseBuffer),
+ ('mount', MountReparseBuffer)
+ ]
+
+ class ReparseBuffer(ctypes.Structure):
+ _anonymous_ = ("u",)
+ _fields_ = [
+ ('reparse_tag', ctypes.c_ulong),
+ ('reparse_data_length', ctypes.c_ushort),
+ ('reserved', ctypes.c_ushort),
+ ('substitute_name_offset', ctypes.c_ushort),
+ ('substitute_name_length', ctypes.c_ushort),
+ ('print_name_offset', ctypes.c_ushort),
+ ('print_name_length', ctypes.c_ushort),
+ ('u', ReparseBufferField)
+ ]
+
+ def is_reparse_point(path: Union[str, pathlib.Path]) -> bool:
+ GetFileAttributesW.argtypes = [LPCWSTR]
+ GetFileAttributesW.restype = DWORD
+ return _check_bit(GetFileAttributesW(str(path)), stat.FILE_ATTRIBUTE_REPARSE_POINT)
+
+ def readlink(path: Union[str, pathlib.Path]) -> Union[str, pathlib.WindowsPath]:
+ # FILE_FLAG_OPEN_REPARSE_POINT alone is not enough if 'path'
+ # is a symbolic link to a directory or a NTFS junction.
+ # We need to set FILE_FLAG_BACKUP_SEMANTICS as well.
+ # See https://docs.microsoft.com/en-us/windows/desktop/api/fileapi/nf-fileapi-createfilea
+
+ # description from _winapi.c:601
+ # /* REPARSE_DATA_BUFFER usage is heavily under-documented, especially for
+ # junction points. Here's what I've learned along the way:
+ # - A junction point has two components: a print name and a substitute
+ # name. They both describe the link target, but the substitute name is
+ # the physical target and the print name is shown in directory listings.
+ # - The print name must be a native name, prefixed with "\??\".
+ # - Both names are stored after each other in the same buffer (the
+ # PathBuffer) and both must be NUL-terminated.
+ # - There are four members defining their respective offset and length
+ # inside PathBuffer: SubstituteNameOffset, SubstituteNameLength,
+ # PrintNameOffset and PrintNameLength.
+ # - The total size we need to allocate for the REPARSE_DATA_BUFFER, thus,
+ # is the sum of:
+ # - the fixed header size (REPARSE_DATA_BUFFER_HEADER_SIZE)
+ # - the size of the MountPointReparseBuffer member without the PathBuffer
+ # - the size of the prefix ("\??\") in bytes
+ # - the size of the print name in bytes
+ # - the size of the substitute name in bytes
+ # - the size of two NUL terminators in bytes */
+
+ target_is_path = isinstance(path, pathlib.Path)
+ if target_is_path:
+ target = str(path)
+ else:
+ target = path
+ CreateFileW.argtypes = [LPWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE]
+ CreateFileW.restype = HANDLE
+ DeviceIoControl.argtypes = [HANDLE, DWORD, LPVOID, DWORD, LPVOID, DWORD, LPDWORD, LPVOID]
+ DeviceIoControl.restype = BOOL
+ handle = HANDLE(CreateFileW(target, GENERIC_READ, 0, None, OPEN_EXISTING,
+ FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OPEN_REPARSE_POINT, 0))
+ buf = ReparseBuffer()
+ ret = DWORD(0)
+ status = DeviceIoControl(handle, FSCTL_GET_REPARSE_POINT, None, 0, ctypes.byref(buf),
+ MAXIMUM_REPARSE_DATA_BUFFER_SIZE, ctypes.byref(ret), None)
+ CloseHandle(handle)
+ if not status:
+ logger = getLogger(__file__)
+ logger.error("Failed IOCTL access to REPARSE_POINT {})".format(target))
+ raise ValueError("not a symbolic link or access permission violation")
+
+ if buf.reparse_tag == IO_REPARSE_TAG_SYMLINK:
+ offset = buf.substitute_name_offset
+ ending = offset + buf.substitute_name_length
+ rpath = bytearray(buf.symlink.path_buffer)[offset:ending].decode('UTF-16-LE')
+ elif buf.reparse_tag == IO_REPARSE_TAG_MOUNT_POINT:
+ offset = buf.substitute_name_offset
+ ending = offset + buf.substitute_name_length
+ rpath = bytearray(buf.mount.path_buffer)[offset:ending].decode('UTF-16-LE')
+ else:
+ raise ValueError("not a symbolic link")
+ # on posixmodule.c:7859 in py38, we do that
+ # ```
+ # else if (rdb->ReparseTag == IO_REPARSE_TAG_MOUNT_POINT)
+ # {
+ # name = (wchar_t *)((char*)rdb->MountPointReparseBuffer.PathBuffer +
+ # rdb->MountPointReparseBuffer.SubstituteNameOffset);
+ # nameLen = rdb->MountPointReparseBuffer.SubstituteNameLength / sizeof(wchar_t);
+ # }
+ # else
+ # {
+ # PyErr_SetString(PyExc_ValueError, "not a symbolic link");
+ # }
+ # if (nameLen > 4 && wcsncmp(name, L"\\??\\", 4) == 0) {
+ # /* Our buffer is mutable, so this is okay */
+ # name[1] = L'\\';
+ # }
+ # ```
+ # so substitute prefix here.
+ if rpath.startswith('\\??\\'):
+ rpath = '\\\\' + rpath[2:]
+ if target_is_path:
+ return pathlib.WindowsPath(rpath)
+ else:
+ return rpath