Source code for pyquickhelper.filehelper.compression_helper

"""
Functions about compressing files.


:githublink:`%|py|5`
"""

import os
import zipfile
import datetime
import gzip
import sys
import warnings
import tarfile
from io import BytesIO

from ..loghelper.flog import noLOG, run_cmd
from .fexceptions import FileException
from ..texthelper.diacritic_helper import remove_diacritics
from .synchelper import explore_folder


[docs]def zip_files(filename, file_set, root=None, fLOG=noLOG): """ Zips all files from an iterator. :param filename: final zip file (can be None) :param file_set: iterator on file to add :param root: if not None, all path are relative to this path :param fLOG: logging function :return: number of added files (or content if filename is None) *filename* can be None, the function compresses into bytes without saving the results. :githublink:`%|py|33` """ nb = 0 a1980 = datetime.datetime(1980, 1, 1) if filename is None: filename = BytesIO() with zipfile.ZipFile(filename, 'w') as myzip: for file in file_set: if not os.path.exists(file): continue if fLOG: fLOG("[zip_files] '{0}'".format(file)) st = os.stat(file) atime = datetime.datetime.fromtimestamp(st.st_atime) mtime = datetime.datetime.fromtimestamp(st.st_mtime) if atime < a1980 or mtime < a1980: # pragma: no cover new_mtime = st.st_mtime + (4 * 3600) # new modification time while datetime.datetime.fromtimestamp(new_mtime) < a1980: new_mtime += (4 * 3600) # new modification time fLOG( "[zip_files] changing time timestamp for file '{0}'".format(file)) os.utime(file, (st.st_atime, new_mtime)) arcname = os.path.relpath(file, root) if root else None myzip.write(file, arcname=arcname) nb += 1 return filename.getvalue() if isinstance(filename, BytesIO) else nb
[docs]def unzip_files(zipf, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True, fail_if_error=True): """ Unzips files from a zip archive. :param zipf: archive (or bytes or BytesIO) :param where_to: destination folder (can be None, the result is a list of tuple) :param fLOG: logging function :param fvalid: function which takes two paths (zip name, local name) and return True if the file must be unzipped, False otherwise, if None, the default answer is True :param remove_space: remove spaces in created local path (+ ``',()``) :param fail_if_error: fails if an error is encountered (typically a weird character in a filename), otherwise a warning is thrown. :return: list of unzipped files :githublink:`%|py|77` """ if isinstance(zipf, bytes): zipf = BytesIO(zipf) try: with zipfile.ZipFile(zipf, "r"): pass except zipfile.BadZipFile as e: # pragma: no cover if isinstance(zipf, BytesIO): raise e raise IOError("Unable to read file '{0}'".format(zipf)) from e files = [] with zipfile.ZipFile(zipf, "r") as file: for info in file.infolist(): if fLOG: fLOG("[unzip_files] unzip '{0}'".format(info.filename)) if where_to is None: try: content = file.read(info.filename) except zipfile.BadZipFile as e: # pragma: no cover if fail_if_error: raise zipfile.BadZipFile( "Unable to extract '{0}' due to {1}".format(info.filename, e)) from e warnings.warn( "Unable to extract '{0}' due to {1}".format(info.filename, e), UserWarning) continue files.append((info.filename, content)) else: clean = remove_diacritics(info.filename) if remove_space: clean = clean.replace(" ", "").replace("'", "").replace(",", "_") \ .replace("(", "_").replace(")", "_") tos = os.path.join(where_to, clean) if not os.path.exists(tos): if fvalid and not fvalid(info.filename, tos): fLOG("[unzip_files] skipping", info.filename) continue try: data = file.read(info.filename) except zipfile.BadZipFile as e: # pragma: no cover if fail_if_error: raise zipfile.BadZipFile( "Unable to extract '{0}' due to {1}".format(info.filename, e)) from e warnings.warn( "Unable to extract '{0}' due to {1}".format(info.filename, e), UserWarning) continue # check encoding to avoid characters not allowed in paths if not os.path.exists(tos): if sys.platform.startswith("win"): tos = tos.replace("/", "\\") finalfolder = os.path.split(tos)[0] if not os.path.exists(finalfolder): fLOG("[unzip_files] creating folder (zip)", os.path.abspath(finalfolder)) try: os.makedirs(finalfolder) except FileNotFoundError as e: mes = "Unexpected error\ninfo.filename={0}\ntos={1}\nfinalfolder={2}\nlen(nfinalfolder)={3}".format( info.filename, tos, finalfolder, len(finalfolder)) raise FileNotFoundError(mes) from e if not info.filename.endswith("/"): try: with open(tos, "wb") as u: u.write(data) except FileNotFoundError as e: # pragma: no cover # probably an issue in the path name # the next lines are just here to distinguish # between the two cases if not os.path.exists(finalfolder): raise e newname = info.filename.replace( " ", "_").replace(",", "_") if sys.platform.startswith("win"): newname = newname.replace("/", "\\") tos = os.path.join(where_to, newname) finalfolder = os.path.split(tos)[0] if not os.path.exists(finalfolder): fLOG("[unzip_files] creating folder (zip)", os.path.abspath(finalfolder)) os.makedirs(finalfolder) with open(tos, "wb") as u: u.write(data) files.append(tos) fLOG("[unzip_files] unzipped ", info.filename, " to ", tos) elif not tos.endswith("/"): files.append(tos) elif not info.filename.endswith("/"): files.append(tos) return files
[docs]def gzip_files(filename, file_set, encoding=None, fLOG=noLOG): """ Compresses all files from an iterator in a zip file and then in a gzip file. :param filename: final gzip file (double compression, extension should something like .zip.gz) :param file_set: iterator on file to add :param encoding: encoding of input files (no double compression then) :param fLOG: logging function :return: bytes (if filename is None) or None :githublink:`%|py|180` """ if filename is None: filename = BytesIO() if encoding is None: content = zip_files(None, file_set, fLOG=fLOG) f = gzip.open(filename, 'wb') f.write(content) f.close() return filename.getvalue() if isinstance(filename, BytesIO) else None f = gzip.open(filename, 'wt', encoding="utf-8") for name in file_set: with open(name, "r", encoding="utf-8") as ft: content = ft.read() f.write(content) f.close() return filename.getvalue() if isinstance(filename, BytesIO) else None
[docs]def ungzip_files(filename, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True, unzip=True, encoding=None): """ Uncompresses files from a gzip file. :param filename: final gzip file (double compression, extension should something like .zip.gz) :param where_to: destination folder (can be None, the result is a list of tuple) :param fLOG: logging function :param fvalid: function which takes two paths (zip name, local name) and return True if the file must be unzipped, False otherwise, if None, the default answer is True :param remove_space: remove spaces in created local path (+ ``',()``) :param unzip: unzip file after gzip :param encoding: encoding :return: list of unzipped files :githublink:`%|py|212` """ if isinstance(filename, bytes): is_file = False filename = BytesIO(filename) else: is_file = True if encoding is None: f = gzip.open(filename, 'rb') content = f.read() f.close() if unzip: try: return unzip_files(content, where_to=where_to, fLOG=fLOG) except Exception as e: # pragma: no cover raise IOError( "Unable to unzip file '{0}'".format(filename)) from e elif where_to is not None: filename = os.path.split(filename)[-1].replace(".gz", "") filename = os.path.join(where_to, filename) with open(filename, "wb") as f: f.write(content) return filename return content else: f = gzip.open(filename, 'rt', encoding="utf-8") content = f.read() f.close() if is_file: filename = filename.replace(".gz", "") with open(filename, "wb") as f: f.write(content) return filename return content
[docs]def zip7_files(filename_7z, file_set, fLOG=noLOG, temp_folder="."): """ If :epkg:`7z` is installed, the function uses it to compress file into 7z format. The file *filename_7z* must not exist. :param filename_7z: final destination :param file_set: list of files to compress :param fLOG: logging function :param temp_folder: the function stores the list of files in a file in the folder *temp_folder*, it will be removed afterwords :return: number of added files .. faqref:: :title: Why module pylzma does not work? :lid: faq-pylzma-ref The module :epkg:`pylzma` failed to decompress the file produced by the latest version of :epkg:`7z` (2016-09-23). The compression was changed by tweaking the command line. LZMA is used instead LZMA2. The current version does not include this `commit <https://github.com/fancycode/pylzma/commit/b5c3c2bd4ab7abfb65de772861ecc600fe37394b>`_. Or you can clone the package `sdpython.pylzma <https://github.com/sdpython/pylzma>`_ and build it yourself with ``python setup.py bdist_wheel``. :githublink:`%|py|273` """ if sys.platform.startswith("win"): # pragma: no cover exe = r"C:\Program Files\7-Zip\7z.exe" if not os.path.exists(exe): raise FileNotFoundError("unable to find: {0}".format(exe)) elif sys.platform.startswith("darwin"): exe = "7za" else: exe = "7z" if os.path.exists(filename_7z): raise FileException( # pragma: no cover "'{0}' already exists".format(filename_7z)) notxist = [fn for fn in file_set if not os.path.exists(fn)] if len(notxist) > 0: raise FileNotFoundError( # pragma: no cover "unable to compress unexisting files:\n{0}".format("\n".join(notxist))) flist = os.path.join(temp_folder, "listfiles7z.txt") with open(flist, "w", encoding="utf8") as f: f.write("\n".join(file_set)) cmd = '"{0}" -m0=lzma -mfb=64 a "{1}" "@{2}"'.format( exe, filename_7z, flist) out, err = run_cmd(cmd, wait=True) if "Error:" in out or not os.path.exists(filename_7z): raise FileException( # pragma: no cover "An error occurred with cmd: '{0}'\n" "--OUT--\n{1}\n--ERR--\n{2}\n----".format( cmd, out, err)) return len(file_set)
[docs]def un7zip_files(zipf, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True, cmd_line=False): """ Unzips files from a zip archive compress with :epkg:`7z`. :param zipf: archive (or bytes or BytesIO) :param where_to: destination folder (can be None, the result is a list of tuple) :param fLOG: logging function :param fvalid: function which takes two paths (zip name, local name) and return True if the file must be unzipped, False otherwise, if None, the default answer is True :param remove_space: remove spaces in created local path (+ ``',()``) :param cmd_line: use command line instead of module :epkg:`pylzma` :return: list of unzipped files The function requires module :epkg:`pylzma`. See :ref:`Why module pylzma does not work? <faq-pylzma-ref>`. :githublink:`%|py|323` """ if cmd_line: if not isinstance(zipf, str # unicode ): raise TypeError("Cannot use command line unless zipf is a file.") if remove_space: warnings.warn( '[un7zip_files] remove_space and cmd_line are incompatible options.', UserWarning) if fvalid: warnings.warn( 'fvalid and cmd_line are incompatible options.', UserWarning) if sys.platform.startswith("win"): # pragma: no cover exe = r"C:\Program Files\7-Zip\7z.exe" if not os.path.exists(exe): raise FileNotFoundError("unable to find: {0}".format(exe)) if where_to is None: where_to = os.path.abspath(".") elif sys.platform.startswith("darwin"): exe = "7za" else: exe = "7z" cmd = '"{0}" x "{1}" -o{2}'.format(exe, zipf, where_to) out, err = run_cmd(cmd, wait=True, fLOG=fLOG) if len(err) > 0 or "Error:" in out: raise FileException( # pragma: no cover "Unable to un-7zip file '{0}'\n--CMD--\n{3}\n--OUT--\n{1}\n--ERR--\n{2}".format(zipf, out, err, cmd)) return explore_folder(where_to)[1] else: from py7zlib import Archive7z, FormatError file_zipf = None if not isinstance(zipf, BytesIO): file_zipf = zipf if isinstance(zipf, bytes): zipf = BytesIO(zipf) else: zipf = open(zipf, "rb") files = [] try: file = Archive7z(zipf) except FormatError as e: raise FileException( # pragma: no cover "You should use a modified version available at https://github.com/sdpython/pylzma") from e for info in file.files: if where_to is None: files.append((info.filename, info.read())) else: clean = remove_diacritics(info.filename) if remove_space: clean = clean.replace(" ", "").replace("'", "") \ .replace(",", "_").replace("(", "_") \ .replace(")", "_") tos = os.path.join(where_to, clean) if not os.path.exists(tos): if fvalid and not fvalid(info.filename, tos): fLOG("[un7zip_files] skipping", info.filename) continue try: data = info.read() except NotImplementedError as e: # pragma: no cover # You should use command line. if file_zipf is None: raise TypeError( "Cannot switch to command line unless zipf is a file.") warnings.warn( "[un7zip_files] '{0}' --> Unavailable format. Use command line.".format(zipf), UserWarning) return un7zip_files(file_zipf, where_to=where_to, fLOG=fLOG, fvalid=fvalid, remove_space=remove_space, cmd_line=True) except Exception as e: # pragma: no cover raise FileException("Unable to unzip file '{0}' from '{1}'".format( info.filename, zipf)) from e # check encoding to avoid characters not allowed in paths if not os.path.exists(tos): if sys.platform.startswith("win"): tos = tos.replace("/", "\\") finalfolder = os.path.split(tos)[0] if not os.path.exists(finalfolder): fLOG("[un7zip_files] creating folder (7z)", os.path.abspath(finalfolder)) try: os.makedirs(finalfolder) except FileNotFoundError as e: mes = "Unexpected error\ninfo.filename={0}\ntos={1}\nfinalfolder={2}\nlen(nfinalfolder)={3}".format( info.filename, tos, finalfolder, len(finalfolder)) raise FileNotFoundError(mes) from e if not info.filename.endswith("/"): try: with open(tos, "wb") as u: u.write(data) except FileNotFoundError as e: # probably an issue in the path name # the next lines are just here to distinguish # between the two cases if not os.path.exists(finalfolder): raise e newname = info.filename.replace( " ", "_").replace(",", "_") if sys.platform.startswith("win"): newname = newname.replace("/", "\\") tos = os.path.join(where_to, newname) finalfolder = os.path.split(tos)[0] if not os.path.exists(finalfolder): fLOG("[un7zip_files] creating folder (7z)", os.path.abspath(finalfolder)) os.makedirs(finalfolder) with open(tos, "wb") as u: u.write(data) files.append(tos) fLOG("[un7zip_files] unzipped ", info.filename, " to ", tos) elif not tos.endswith("/"): files.append(tos) elif not info.filename.endswith("/"): files.append(tos) return files
[docs]def unrar_files(zipf, where_to=None, fLOG=noLOG, fvalid=None, remove_space=True): """ Uncompresses files from a rar archive compress with :epkg:`7z` on Window or *unrar* on linux. :param zipf: archive (or bytes or BytesIO) :param where_to: destination folder (can be None, the result is a list of tuple) :param fLOG: logging function :param fvalid: function which takes two paths (zip name, local name) and return True if the file must be unzipped, False otherwise, if None, the default answer is True :param remove_space: remove spaces in created local path (+ ``',()``) :return: list of unzipped files :githublink:`%|py|455` """ if sys.platform.startswith("win"): # pragma: no cover exe = r"C:\Program Files\7-Zip\7z.exe" if not os.path.exists(exe): raise FileNotFoundError("unable to find: {0}".format(exe)) if where_to is None: where_to = os.path.abspath(".") cmd = '"{0}" x "{1}" "-o{2}"'.format(exe, zipf, where_to) out, err = run_cmd(cmd, wait=True, fLOG=fLOG) if len(err) > 0 or "Error:" in out: raise FileException( "Unable to unrar file '{0}'\n" "--OUT--\n{1}\n--ERR--\n{2}".format( zipf, out, err)) return explore_folder(where_to)[1] else: exe = "unrar" if where_to is None: where_to = os.path.abspath(".") cmd = '"{0}" x "{1}"'.format(exe, zipf) out, err = run_cmd(cmd, wait=True, fLOG=fLOG, change_path=where_to) if len(err) > 0: raise FileException( "Unable to unrar file '{0}'\n--CMD--\n{3}\n--OUT--\n{1}\n--ERR--\n{2}".format(zipf, out, err, cmd)) return explore_folder(where_to)[1]
[docs]def untar_files(filename, where_to=None, fLOG=noLOG, encoding=None): """ Uncompresses files from a tar file. :param filename: final tar file (double compression, extension should something like .zip.gz) :param where_to: destination folder (can be None, the result is a list of tuple) :param fLOG: logging function :param encoding: encoding :return: list of unzipped files :githublink:`%|py|495` """ if isinstance(filename, bytes): fileobj = filename name = None targz = True else: name = filename fileobj = None targz = name.endswith(".tar.gz") if targz: tar = tarfile.open(name=name, fileobj=fileobj, mode="r:gz", encoding=encoding) names = tar.getnames() tar.extractall(where_to) tar.close() else: tar = tarfile.open(name=name, fileobj=fileobj, mode="r:", encoding=encoding) names = tar.getnames() tar.extractall(where_to) tar.close() if where_to is not None: return [os.path.join(where_to, name) for name in names] return names