Source code for pyensae.filehelper.decompress_helper

"""
Various functions to decompress files


:githublink:`%|py|5`
"""
import bz2
import copy
import gzip
import os
import shutil
import tarfile
import warnings
import zipfile
from tarfile import ExtractError

from pyquickhelper.loghelper import noLOG


def decompress_zip(filename, whereTo=".", fLOG=noLOG):
    """
    Unzips a :epkg:`zip` file.

    Entries which already exist on disk are not extracted again but are
    still reported in the returned list. Folder entries (names ending
    with ``/``) are created when needed and never reported.

    :param filename: file to process
    :param whereTo: location of the result
    :param fLOG: logging function
    :return: return the list of decompressed files
    :raises RuntimeError: if *filename* is not a valid zip archive
    """
    try:
        archive = zipfile.ZipFile(filename, "r")
    except zipfile.BadZipFile as e:  # pragma: no cover
        raise RuntimeError("Unable to unzip '{}'.".format(
            filename)) from e

    files = []
    # The context manager guarantees the archive handle is released even
    # if an entry fails to extract.
    with archive:
        for info in archive.infolist():
            is_folder = info.filename.endswith("/")

            # Historical behaviour kept for compatibility: an entry whose
            # name already exists relative to the current working
            # directory is not extracted and is reported under that name.
            if os.path.exists(info.filename):
                if not is_folder:
                    files.append(info.filename)
                continue

            # NOTE(review): entry names are joined to whereTo as is; an
            # archive containing '..' or absolute names can write outside
            # whereTo. Only use on trusted archives.
            tos = os.path.join(whereTo, info.filename)
            if os.path.exists(tos):
                if not tos.endswith("/"):
                    files.append(tos)
                continue

            finalfolder = os.path.split(tos)[0]
            # finalfolder is empty when tos has no directory part;
            # os.makedirs('') would fail.
            if finalfolder and not os.path.exists(finalfolder):
                fLOG(  # pragma: no cover
                    "[decompress_zip] creating folder '{0}'".format(
                        finalfolder))
                os.makedirs(finalfolder)  # pragma: no cover

            if is_folder:
                continue

            # Stream the entry to disk instead of loading it in memory.
            with archive.open(info.filename) as src, open(tos, "wb") as dst:
                shutil.copyfileobj(src, dst)
            files.append(tos)
            fLOG("[decompress_zip] unzipped '{0}' to '{1}'".format(
                info.filename, tos))
    return files
def extractall_silent(self, path=".", members=None, *, numeric_owner=False, silent=False):
    """
    Extracts every member of a tar archive into *path* and, once all
    members are written, applies owner, modification time and
    permissions to the extracted directories.

    Same function as `TarFile.extractall
    <https://github.com/python/cpython/blob/master/Lib/tarfile.py>`_
    except that, when *silent* is True, a ``FileNotFoundError`` raised
    while extracting a member is turned into a warning.

    :param self: an opened ``tarfile.TarFile``
    :param path: directory to extract to
    :param members: optional iterable of members to extract, it must be
        a subset of the list returned by ``getmembers()``,
        all members of the archive if None
    :param numeric_owner: if True, only the numbers for user/group names
        are used and not the names
    :param silent: warn instead of failing on ``FileNotFoundError``
    """
    if members is None:
        members = self

    postponed = []
    for member in members:
        is_directory = member.isdir()
        to_extract = member
        if is_directory:
            # Directories are first created with a safe mode; their real
            # attributes are applied in the second pass below.
            postponed.append(member)
            to_extract = copy.copy(member)
            to_extract.mode = 0o700

        if not silent:
            self.extract(to_extract, path, set_attrs=not is_directory,
                         numeric_owner=numeric_owner)
            continue

        try:  # pragma: no cover
            self.extract(to_extract, path, set_attrs=not is_directory,
                         numeric_owner=numeric_owner)
        except FileNotFoundError as e:
            warnings.warn(
                "[TarFile.extractall_silent] issue with '{0}' - {1}".format(path, e))

    # Walk the directories in reverse name order and set the correct
    # owner, mtime and filemode on each of them.
    for member in reversed(sorted(postponed, key=lambda d: d.name)):
        dirpath = os.path.join(path, member.name)
        try:
            self.chown(member, dirpath, numeric_owner=numeric_owner)
            self.utime(member, dirpath)
            self.chmod(member, dirpath)
        except ExtractError as e:  # pragma: no cover
            if self.errorlevel > 1:
                raise
            self._dbg(1, "tarfile: %s" % e)
def decompress_targz(filename, whereTo=".", silent=True, fLOG=noLOG):
    """
    Decompresses a :epkg:`tar.gz` file.

    :param filename: file to process
    :param whereTo: location of the result
    :param silent: raise a warning instead of an error
    :param fLOG: logging function (not used by this function)
    :return: return the list of decompressed files
        (members which are regular files on disk after the extraction)
    """
    # The context manager closes the archive even if extraction fails.
    with tarfile.open(filename, 'r:gz') as tfile:
        members = tfile.getmembers()
        extractall_silent(tfile, whereTo, silent=silent)
    extracted = [os.path.join(whereTo, member.name) for member in members]
    # Directories, and members which could not be extracted, are dropped.
    return [name for name in extracted if os.path.isfile(name)]
def decompress_gz(filename, whereTo=".", fLOG=noLOG):
    """
    Decompresses a single gzip-compressed file (``.gz``).

    The result is written in *whereTo* under the base name of
    *filename* without its ``.gz`` extension.

    :param filename: file to process
    :param whereTo: location of the result
    :param fLOG: logging function (not used by this function)
    :return: return the list of decompressed files (only one)
    :raises NameError: if *filename* does not end with ``.gz``
    """
    if not filename.endswith(".gz"):
        raise NameError(  # pragma: no cover
            "the file should end with .gz: %r" % filename)
    # Only the base name is kept so that the result always lands in
    # whereTo, whatever folder the archive is stored in.
    dest = os.path.join(whereTo, os.path.split(filename)[-1][:-3])
    # Stream the content instead of loading it all in memory.
    with gzip.open(filename, 'rb') as src, open(dest, "wb") as dst:
        shutil.copyfileobj(src, dst)
    return [dest]
def decompress_bz2(filename, whereTo=".", fLOG=noLOG):
    """
    Decompresses a :epkg:`bz2` file.

    The result is written in *whereTo* under the base name of
    *filename* without its ``.bz2`` extension.

    :param filename: file to process
    :param whereTo: location of the result
    :param fLOG: logging function (not used by this function)
    :return: return the list of decompressed files (only one)
    :raises NameError: if *filename* does not end with ``.bz2``
    """
    if not filename.endswith(".bz2"):
        raise NameError(  # pragma: no cover
            "the file should end with .bz2 not '{0}'".format(filename))
    dest = os.path.join(whereTo, os.path.split(filename)[-1][:-4])
    # Stream the content instead of loading it all in memory.
    with bz2.open(filename, 'rb') as src, open(dest, "wb") as dst:
        shutil.copyfileobj(src, dst)
    return [dest]