Source code for pyquickhelper.filehelper.file_tree_node

# -*- coding: utf-8 -*-
"""
a node which contains a file or a folder


:githublink:`%|py|6`
"""
import os
import re
import datetime
import time
import shutil
import hashlib
import warnings
from ..loghelper.pqh_exception import PQHException
from ..loghelper.flog import noLOG
from ..loghelper.pyrepo_helper import SourceRepository


[docs]class FileTreeNode: """ Defines a node for a folder or a tree. Example: :: def example (p1, p2, hash_size = 1024**2*2, svn1 = True, svn2 = False) : extout = re.compile (FileTreeNode.build_expression ("dvi bbl blg ilg ind old out pyc pyd " \\ "bak idx obj log aux pdb sbr ncb res idb suo dep " \\ "ogm manifest dsp dsz user ilk bsc exp eps".split ())) extfou = re.compile ("(exeinterpreter[/\\\\].*[.]dll)|([/\\\\]upgradereport)|" \\ "(thumbs[.]db)|([.]svn)|(temp[_/\\\\].*)") def filter (root, path, f, d) : root = root.lower () path = path.lower () f = f.lower () if extout.search (f) : if not d and not f.endswith(".pyc"): print("rejected (o1)", path, f) return False fu = os.path.join (path, f) if extfou.search (fu) : if not d and not f.endswith(".pyc"): print("rejected (o2)", path, f) return False return True f1 = p1 f2 = p2 node1 = FileTreeNode(f1, filter = filter, repository = svn1) node2 = FileTreeNode(f2, filter = filter, repository = svn2) print(len(node1), node1.max_date()) print(len(node2), node2.max_date()) res = node1.difference(node2, hash_size=hash_size) return res print(__file__, "synchro", OutputPrint = __name__ == "__main__") res = example (p1, p2) :githublink:`%|py|61` """ _default_not_ext = "bbl out pyc log lib ind pdb opt".split() _default_out = re.compile("([.]svn)|(hal.*[.]((exe)|(dll)|(so)|(sln)|(vcproj)))" + "|".join(["(.*[.]%s$)" % e for e in _default_not_ext]))
[docs] @staticmethod def build_expression(ext): """ Builds a regular expression validating a list of extension. :param ext: list of extension (with no points) :return: pattern (string) :githublink:`%|py|74` """ return ".*[.]" + "|".join(["(%s$)" % e for e in ext])
[docs] def __init__(self, root, file=None, filter=None, level=0, parent=None, repository=False, log=False, log1=False, fLOG=noLOG): """ Defines a file, relative to a root. :param root: root (it must exist) :param file: file, if None, fill _children :param filter: function (root, path, f, dir) --> True or False if this is a string, it will be converted into a regular expression (using re), and it will look into subfolders :param level: hierarchy level :param parent: link to the parent :param repository: use SVN or GIT if True :param log: log every explored folder :param log1: intermediate logs (first level) :param fLOG: logging function to use :githublink:`%|py|93` """ if root is None: raise ValueError("root cannot be None") self._root = root self._file = None if file is None else file self._children = [] self._type = None self._date = None self._size = None self._level = level self._parent = parent self._log = log self._log1 = log1 self.module = None self.fLOG = fLOG if not os.path.exists(root): raise PQHException("path '%s' does not exist" % root) if not os.path.isdir(root): raise PQHException("path '%s' is not a folder" % root) if self._file is not None: if not self.exists(): raise PQHException( "%s does not exist [%s,%s]" % (self.get_fullname(), root, file)) self._fillstat() if self.isdir(): if isinstance(filter, str): # it assumes it is a regular expression instead of a function exp = re.compile(filter) def fil(root, path, f, dir, e=exp): "local function" return dir or (e.search(f) is not None) self._fill(fil, repository=repository) else: self._fill(filter, repository=repository)
@property def name(self): """ Returns the file name from the root. :githublink:`%|py|137` """ return self._file @property def root(self): """ Returns the root directory, the one used as a root for a synchronization. :githublink:`%|py|144` """ return self._root @property def size(self): """ Returns the size. :githublink:`%|py|151` """ return self._size @property def date(self): """ Returns the modification date. :githublink:`%|py|158` """ return self._date @property def type(self): """ Returns the file type (``file`` or ``folder``). :githublink:`%|py|165` """ return self._type @property def fullname(self): """ Returns the full name. :githublink:`%|py|172` """ return self.get_fullname()
[docs] def hash_md5_readfile(self): """ Computes a hash of a file. :return: string :githublink:`%|py|180` """ filename = self.get_fullname() f = open(filename, 'rb') m = hashlib.md5() readBytes = 1024 ** 2 # read 1024 bytes per time totalBytes = 0 while readBytes: readString = f.read(readBytes) m.update(readString) readBytes = len(readString) totalBytes += readBytes f.close() return m.hexdigest()
[docs] def get_content(self, encoding="utf8"): """ Returns the content of a text file. :param encoding: encoding :return: content as a string :githublink:`%|py|200` """ with open(self.fullname, "r", encoding=encoding) as f: return f.read()
[docs] def get_fullname(self): """ :return: the full name :githublink:`%|py|207` """ if self._file is None: return self._root else: return os.path.join(self._root, self._file)
[docs] def exists(self): """ say if it does exist or not :return: boolean :githublink:`%|py|218` """ return os.path.exists(self.get_fullname())
[docs] def _fillstat(self): """ private: fill _type, _size :githublink:`%|py|224` """ full = self.get_fullname() if os.path.isfile(full): self._type = "file" else: self._type = "folder" stat = os.stat(self.get_fullname()) self._size = stat.st_size temp = datetime.datetime.utcfromtimestamp(stat.st_mtime) self._date = temp
[docs] def isdir(self): """ is it a folder? :return: boolean :githublink:`%|py|241` """ return os.path.isdir(self.get_fullname())
[docs] def isfile(self): """ is it a file? :return: boolean :githublink:`%|py|249` """ return os.path.isfile(self.get_fullname())
[docs] def __str__(self): """ usual :githublink:`%|py|255` """ line = [self._root] if self._level == 0 else [] fi = "" if self._file is None else self._file fi = os.path.split(fi)[-1] if len(fi) > 0: line.append(" " * self._level + fi) for c in self._children: r = str(c) line.append(r) return "\n".join(line)
[docs] def repo_ls(self, path): """ call ls of an instance of :class:`SourceRepository <pyquickhelper.loghelper.pyrepo_helper.SourceRepository>` :githublink:`%|py|269` """ if "_repo_" not in self.__dict__: self._repo_ = SourceRepository(True) return self._repo_.ls(path)
[docs] def _fill(self, filter, repository): """ look for subfolders :param filter: boolean function :param repository: use svn or git :githublink:`%|py|278` """ if not self.isdir(): raise PQHException( "unable to look into a file %s full %s" % (self._file, self.get_fullname())) if repository: opt = "repo_ls" full = self.get_fullname() fi = "" if self._file is None else self._file entry = self.repo_ls(full) temp = [os.path.relpath(p.name, full) for p in entry] all = [] for s in temp: all.append(s) else: opt = "listdir" full = self.get_fullname() fi = "" if self._file is None else self._file all = [a for a in os.listdir(full) if a not in [".", ".."]] all.sort() self._children = [] for a in all: fu = os.path.join(full, a) isd = os.path.isdir(fu) if self._log and isd: self.fLOG("[FileTreeNode], entering", a) elif self._log1 and self._level <= 0: self.fLOG("[FileTreeNode], entering", a) if filter is None or filter(self._root, fi, a, isd): try: n = FileTreeNode(self._root, os.path.join(fi, a), filter, level=self._level + 1, parent=self, repository=repository, log=self._log, log1=self._log1 or self._log, fLOG=self.fLOG) except PQHException as e: if "does not exist" in str(e): self.fLOG( "a folder should exist, but is it is not, it continues [opt=%s]" % opt) self.fLOG(e) continue if n.isdir() and len(n._children) == 0: continue else: self._children.append(n)
[docs] def get(self): """ return a dictionary with some values which describe the file :return: dict :githublink:`%|py|328` """ res = {"name": "" if self._file is None else self._file, "root___": self._root, "time": str(self._date), "size": self._size, "type___": self._type} return res
[docs] def __getitem__(self, i): """ returns the element i :param i: element :return: element :githublink:`%|py|340` """ return self._children[i]
[docs] def nb_children(self): """ return the number of children :return: int :githublink:`%|py|348` """ return len(self._children)
[docs] def __iter__(self): """ iterator on the element :return: iterator on all contained files :githublink:`%|py|356` """ yield self for c in self._children: for t in c: yield t
[docs] def max_date(self): """ return the more recent date :githublink:`%|py|364` """ return max([node._date for node in self])
[docs] def __len__(self): """ Returns the number of elements in this folder and in the subfolders. :githublink:`%|py|371` """ n = 0 for _ in self: n += 1 return n
[docs] def get_dict(self, lower=False): """ Returns a dictionary ``{ self._file : node }``. :param lower: if True, every filename is converted into lower case :githublink:`%|py|381` """ res = {} if lower: for node in self: if node._file is not None: res[node._file.lower()] = node else: for node in self: if node._file is not None: res[node._file] = node return res
[docs] def sign(self, node, hash_size): """ Returns ``==``, ``<`` or ``>`` according the dates if the size is not too big, if the sign is ``<`` or ``>``, applies the hash method. :githublink:`%|py|398` """ if self._date == node._date: return "==" elif self._date < node._date: if self.isdir( ) or self._size != node._size or node._size > hash_size: return "<" else: h1 = self.hash_md5_readfile() h2 = node.hash_md5_readfile() if h1 != h2: return "<" else: return "==" else: if self.isdir( ) or self._size != node._size or node._size > hash_size: return ">" else: h1 = self.hash_md5_readfile() h2 = node.hash_md5_readfile() if h1 != h2: return ">" else: return "=="
[docs] def difference(self, node, hash_size=1024 ** 2 * 2, lower=False): """ Returns the differences with another folder. :param node: other node :param hash_size: above this size, it does not compute the hash key :param lower: if True, every filename is converted into lower case :return: list of [ (``?``, self._file, node (in self), node (in node)) ], see below for the choice of ``?`` The question mark ``?`` means: - ``==`` no change - ``>`` more recent in self - ``<`` more recent in node - ``>+`` absent in node - ``<+`` absent in self :githublink:`%|py|440` """ ti = time.perf_counter() d1 = self.get_dict(lower=lower) d2 = node.get_dict(lower=lower) res = [] nb = 0 for k, v in d1.items(): ti2 = time.perf_counter() if ti2 - ti > 10: self.fLOG("FileTreeNode.difference: processed files", nb) ti = ti2 if k not in d2: res.append((k, ">+", v, None)) else: res.append((k, v.sign(d2[k], hash_size), v, d2[k])) nb += 1 for k, v in d2.items(): ti2 = time.perf_counter() if ti2 - ti > 10: self.fLOG("FileTreeNode.difference: processed files", nb) ti = ti2 if k not in d1: res.append((k, "<+", None, v)) nb += 1 res.sort() zoo = [(v[1], v[0]) + v[2:] for v in res] return zoo
[docs] def remove(self): """ Removes the file. :githublink:`%|py|474` """ full = self.get_fullname() self.fLOG("removing ", full) try: os.remove(full) except OSError as e: self.fLOG( "unable to remove ", full, " --- ", str(e).replace("\n", " ")) self.fLOG("[pyqerror] ", e)
[docs] def copy_to(self, path, exc=True): """ Copies the file to *path*. :param path: path :param exc: catch exception when possible, warning otherwise If the new path doe nots exist, it will be created. .. warning:: If a file already exists at the new location, it checks the dates. The file is copied only if the new file is older. :githublink:`%|py|496` """ if not os.path.exists(path): raise PQHException("this path does not exist: '{0}'".format(path)) if self.isdir(): raise PQHException( "this node represents a folder " + self.get_fullname()) full = self.get_fullname() temp = os.path.split(self._file)[0] dest = os.path.join(path, temp) fina = dest # os.path.split (dest) [0] if not os.path.exists(fina): self.fLOG("creating directory: ", fina) os.makedirs(fina) try: # if 1 : self.fLOG("+ copy ", full, " to ", dest) shutil.copy(full, dest) cop = os.path.join(dest, os.path.split(full)[1]) if not os.path.exists(cop): raise PQHException("Unable to copy '%s'." % cop) st1 = os.stat(full) st2 = os.stat(cop) t1 = datetime.datetime.utcfromtimestamp(st1.st_mtime) t2 = datetime.datetime.utcfromtimestamp(st2.st_mtime) if t1 >= t2: mes = "t1={0} for file '{1}' >= t2={2} for file '{3}'".format( t1, full, t2, cop) if t1 > t2 and exc: raise PQHException(mes) warnings.warn(mes, RuntimeWarning) except OSError as e: # else : self.fLOG("unable to copy file ", full, " to ", path) self.fLOG("[pyqerror]", e)