Source code for pyquickhelper.filehelper.winzipfile

"""
Fix a bug: see https://bugs.python.org/issue6839.


:githublink:`%|py|5`
"""
import sys
import struct
from zipfile import ZipFile, ZipInfo, ZipExtFile, _ZipDecrypter, BadZipFile
from zipfile import _FH_EXTRA_FIELD_LENGTH, _FH_FILENAME_LENGTH, _FH_SIGNATURE
from zipfile import stringFileHeader, structFileHeader, sizeFileHeader, _SharedFile


class WinZipFile(ZipFile):
    """
    Overwrite method :epkg:`*py:zipfile:ZipFile:open`.

    Issue `6839 <https://bugs.python.org/issue6839>`_ happens when a zip
    file is created on Windows. The created zip may contain full paths
    with backslashes (``\\``) in the local file headers while the file
    list only contains forward slashes (``/``). This raises exception
    ``BadZipFile`` with the following message:
    *File name in directory ... and header ... differ*
    due to the mismatch between the two kinds of slashes.

    This class overwrites method :epkg:`*py:zipfile:ZipFile:open` to fix
    the line which checks that names are consistent in the file list and
    in the compressed content.

    :githublink:`%|py|24`
    """

    def open(self, name, mode="r", pwd=None, *, force_zip64=False):
        """
        Returns file-like object for 'name'.

        :param name: is a string for the file name within the ZIP file,
            or a ZipInfo object.
        :param mode: should be 'r' to read a file already in the ZIP file,
            or 'w' to write to a file newly added to the archive.
        :param pwd: is the password to decrypt files (only used for
            reading), it must be *bytes*.
        :param force_zip64: forwarded to ``_open_to_write`` when
            *mode* is ``'w'``.
        :return: the object returned by ``_open_to_write`` in write mode,
            a ``ZipExtFile`` in read mode
        :raises ValueError: if *mode* is not ``'r'`` or ``'w'``, if *pwd*
            is given in write mode, if the archive is already closed, or
            if a writing handle is still open while reading
        :raises TypeError: if *pwd* is not *bytes*
        :raises BadZipFile: if the local file header is truncated, has a
            bad signature, or holds a name which differs from the one
            stored in the file list
        :raises NotImplementedError: for compressed patched data
            (flag bit 5) or strong encryption (flag bit 6)
        :raises RuntimeError: if the member is encrypted and the password
            is missing or wrong

        When writing, if the file size is not known in advance but may
        exceed 2 GiB, pass force_zip64 to use the ZIP64 format, which can
        handle large files. If the size is known in advance, it is best to
        pass a ZipInfo instance for name, with zinfo.file_size set.

        :githublink:`%|py|40`
        """
        if mode not in {"r", "w"}:
            raise ValueError('open() requires mode "r" or "w"')
        if pwd and not isinstance(pwd, bytes):
            raise TypeError("pwd: expected bytes, got %s" % type(pwd).__name__)
        if pwd and (mode == "w"):
            raise ValueError("pwd is only supported for reading files")
        if not self.fp:
            raise ValueError(
                "Attempt to use ZIP archive that was already closed")

        # Make sure we have an info object
        if isinstance(name, ZipInfo):
            # 'name' is already an info object
            zinfo = name
        elif mode == 'w':
            zinfo = ZipInfo(name)
            zinfo.compress_type = self.compression
        else:
            # Get info object for name
            zinfo = self.getinfo(name)

        if mode == 'w':
            return self._open_to_write(zinfo, force_zip64=force_zip64)

        # The attribute may be missing (see the version check below),
        # hence getattr with a default instead of a direct access.
        if getattr(self, "_writing", False):
            raise ValueError("Can't read from the ZIP file while there "
                             "is an open writing handle on it. "
                             "Close the writing handle before trying to read.")

        # Open for reading:
        self._fileRefCnt += 1
        if sys.version_info[:2] <= (3, 5):
            # Older signature: no 'writing' callback. Only one of the two
            # constructors must run, otherwise the second call fails on
            # these versions after the reference count was incremented.
            zef_file = _SharedFile(  # pylint: disable=E1120
                self.fp, zinfo.header_offset, self._fpclose, self._lock)
        else:
            zef_file = _SharedFile(
                self.fp, zinfo.header_offset, self._fpclose, self._lock,
                lambda: getattr(self, "_writing", False))
        try:
            # Skip the file header:
            fheader = zef_file.read(sizeFileHeader)
            if len(fheader) != sizeFileHeader:
                raise BadZipFile("Truncated file header")
            fheader = struct.unpack(structFileHeader, fheader)
            if fheader[_FH_SIGNATURE] != stringFileHeader:
                raise BadZipFile("Bad magic number for file header")

            fname = zef_file.read(fheader[_FH_FILENAME_LENGTH])
            if fheader[_FH_EXTRA_FIELD_LENGTH]:
                zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH])

            if zinfo.flag_bits & 0x20:
                # Zip 2.7: compressed patched data
                raise NotImplementedError(
                    "compressed patched data (flag bit 5)")

            if zinfo.flag_bits & 0x40:
                # strong encryption
                raise NotImplementedError("strong encryption (flag bit 6)")

            if zinfo.flag_bits & 0x800:
                # UTF-8 filename
                fname_str = fname.decode("utf-8")
            else:
                fname_str = fname.decode("cp437")

            if sys.platform.startswith("win"):
                # This is the purpose of the class: on Windows, both names
                # are compared after normalizing the backslashes.
                header_name = fname_str.replace("\\", "/")
                listed_name = zinfo.orig_filename.replace("\\", "/")
            else:
                header_name = fname_str
                listed_name = zinfo.orig_filename
            if header_name != listed_name:
                raise BadZipFile(
                    'File name in directory %r and header %r differ.'
                    % (zinfo.orig_filename, fname))

            # check for encrypted flag & handle password
            is_encrypted = zinfo.flag_bits & 0x1
            zd = None
            if is_encrypted:
                if not pwd:
                    pwd = self.pwd
                if not pwd:
                    raise RuntimeError("File %r is encrypted, password "
                                       "required for extraction" % name)

                # NOTE(review): _ZipDecrypter is a private helper of module
                # zipfile; it is called here one byte at a time and the
                # result is handed to ZipExtFile as its 4th argument. Both
                # contracts appear to have changed in recent Python
                # versions - confirm the encrypted path on every supported
                # interpreter.
                zd = _ZipDecrypter(pwd)
                # The first 12 bytes in the cypher stream is an encryption
                # header used to strengthen the algorithm. The first 11
                # bytes are completely random, while the 12th contains the
                # MSB of the CRC, or the MSB of the file time depending on
                # the header type and is used to check the correctness of
                # the password.
                header = zef_file.read(12)
                h = list(map(zd, header[0:12]))
                if zinfo.flag_bits & 0x8:
                    # compare against the file type from extended
                    # local headers
                    check_byte = (zinfo._raw_time >> 8) & 0xff
                else:
                    # compare against the CRC otherwise
                    check_byte = (zinfo.CRC >> 24) & 0xff
                if h[11] != check_byte:
                    raise RuntimeError("Bad password for file %r" % name)

            return ZipExtFile(zef_file, mode, zinfo, zd, True)
        except Exception:
            # Release the shared handle before propagating the error.
            zef_file.close()
            raise