Coverage for pyquickhelper/filehelper/file_tree_node.py: 82%
244 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief a node which contains a file or a folder
5"""
6import os
7import re
8import datetime
9import time
10import shutil
11import hashlib
12import warnings
13from ..loghelper.pqh_exception import PQHException
14from ..loghelper.flog import noLOG
15from ..loghelper.pyrepo_helper import SourceRepository
18class FileTreeNode:
20 """
21 Defines a node for a folder or a tree.
22 Example:
24 ::
26 def example (p1, p2, hash_size = 1024**2*2, svn1 = True, svn2 = False) :
27 extout = re.compile (FileTreeNode.build_expression ("dvi bbl blg ilg ind old out pyc pyd " \\
28 "bak idx obj log aux pdb sbr ncb res idb suo dep " \\
29 "ogm manifest dsp dsz user ilk bsc exp eps".split ()))
30 extfou = re.compile ("(exeinterpreter[/\\\\].*[.]dll)|([/\\\\]upgradereport)|" \\
31 "(thumbs[.]db)|([.]svn)|(temp[_/\\\\].*)")
33 def filter (root, path, f, d) :
34 root = root.lower ()
35 path = path.lower ()
36 f = f.lower ()
37 if extout.search (f) :
38 if not d and not f.endswith(".pyc"):
39 print("rejected (o1)", path, f)
40 return False
41 fu = os.path.join (path, f)
42 if extfou.search (fu) :
43 if not d and not f.endswith(".pyc"):
44 print("rejected (o2)", path, f)
45 return False
46 return True
48 f1 = p1
49 f2 = p2
51 node1 = FileTreeNode(f1, filter = filter, repository = svn1)
52 node2 = FileTreeNode(f2, filter = filter, repository = svn2)
53 print(len(node1), node1.max_date())
54 print(len(node2), node2.max_date())
56 res = node1.difference(node2, hash_size=hash_size)
57 return res
59 print(__file__, "synchro", OutputPrint = __name__ == "__main__")
60 res = example (p1, p2)
61 """
63 _default_not_ext = "bbl out pyc log lib ind pdb opt".split()
64 _default_out = re.compile("([.]svn)|(hal.*[.]((exe)|(dll)|(so)|(sln)|(vcproj)))" +
65 "|".join([f"(.*[.]{e}$)" for e in _default_not_ext]))
67 @staticmethod
68 def build_expression(ext):
69 """
70 Builds a regular expression validating a list of extension.
72 @param ext list of extension (with no points)
73 @return pattern (string)
74 """
75 return ".*[.]" + "|".join([f"({e}$)" for e in ext])
77 def __init__(self, root, file=None, filter=None, level=0, parent=None,
78 repository=False, log=False, log1=False, fLOG=noLOG):
79 """
80 Defines a file, relative to a root.
81 @param root root (it must exist)
82 @param file file, if None, fill _children
83 @param filter function (root, path, f, dir) --> True or False
84 if this is a string, it will be converted into a
85 regular expression (using re), and it will
86 look into subfolders
87 @param level hierarchy level
88 @param parent link to the parent
89 @param repository use SVN or GIT if True
90 @param log log every explored folder
91 @param log1 intermediate logs (first level)
92 @param fLOG logging function to use
93 """
94 if root is None:
95 raise ValueError( # pragma: no cover
96 "root cannot be None.")
97 self._root = root
98 self._file = None if file is None else file
99 self._children = []
100 self._type = None
101 self._date = None
102 self._size = None
103 self._level = level
104 self._parent = parent
105 self._log = log
106 self._log1 = log1
107 self.module = None
108 self.fLOG = fLOG
110 if not os.path.exists(root):
111 raise PQHException(f"path '{root}' does not exist")
112 if not os.path.isdir(root):
113 raise PQHException( # pragma: no cover
114 f"path '{root}' is not a folder")
116 if self._file is not None:
117 if not self.exists():
118 raise PQHException( # pragma: no cover
119 f"{self.get_fullname()} does not exist [{root},{file}]")
121 self._fillstat()
122 if self.isdir():
123 if isinstance(filter, str):
124 # it assumes it is a regular expression instead of a function
125 exp = re.compile(filter)
127 def fil(root, path, f, dir, e=exp):
128 "local function"
129 return dir or (e.search(f) is not None)
131 self._fill(fil, repository=repository)
132 else:
133 self._fill(filter, repository=repository)
135 @property
136 def name(self):
137 """
138 Returns the file name from the root.
139 """
140 return self._file
142 @property
143 def root(self):
144 """
145 Returns the root directory, the one used as a root for a synchronization.
146 """
147 return self._root
149 @property
150 def size(self):
151 """
152 Returns the size.
153 """
154 return self._size
156 @property
157 def date(self):
158 """
159 Returns the modification date.
160 """
161 return self._date
163 @property
164 def type(self):
165 """
166 Returns the file type (``file`` or ``folder``).
167 """
168 return self._type
170 @property
171 def fullname(self):
172 """
173 Returns the full name.
174 """
175 return self.get_fullname()
177 def hash_md5_readfile(self):
178 """
179 Computes a hash of a file.
181 @return string
182 """
183 filename = self.get_fullname()
184 f = open(filename, 'rb')
185 m = hashlib.md5()
186 readBytes = 1024 ** 2 # read 1024 bytes per time
187 totalBytes = 0
188 while readBytes:
189 readString = f.read(readBytes)
190 m.update(readString)
191 readBytes = len(readString)
192 totalBytes += readBytes
193 f.close()
194 return m.hexdigest()
196 def get_content(self, encoding="utf8"):
197 """
198 Returns the content of a text file.
200 @param encoding encoding
201 @return content as a string
202 """
203 with open(self.fullname, "r", encoding=encoding) as f:
204 return f.read()
206 def get_fullname(self):
207 """
208 @return the full name
209 """
210 if self._file is None:
211 return self._root
212 else:
213 return os.path.join(self._root, self._file)
215 def exists(self):
216 """
217 say if it does exist or not
219 @return boolean
220 """
221 return os.path.exists(self.get_fullname())
223 def _fillstat(self):
224 """
225 private: fill _type, _size
226 """
227 full = self.get_fullname()
228 if os.path.isfile(full):
229 self._type = "file"
230 else:
231 self._type = "folder"
233 stat = os.stat(self.get_fullname())
234 self._size = stat.st_size
235 temp = datetime.datetime.utcfromtimestamp(stat.st_mtime)
236 self._date = temp
238 def isdir(self):
239 """
240 is it a folder?
242 @return boolean
243 """
244 return os.path.isdir(self.get_fullname())
246 def isfile(self):
247 """
248 is it a file?
250 @return boolean
251 """
252 return os.path.isfile(self.get_fullname())
254 def __str__(self):
255 """
256 usual
257 """
258 line = [self._root] if self._level == 0 else []
259 fi = "" if self._file is None else self._file
260 fi = os.path.split(fi)[-1]
261 if len(fi) > 0:
262 line.append(" " * self._level + fi)
263 for c in self._children:
264 r = str(c)
265 line.append(r)
266 return "\n".join(line)
268 def repo_ls(self, path):
269 """
270 call ls of an instance of @see cl SourceRepository
271 """
272 if "_repo_" not in self.__dict__:
273 self._repo_ = SourceRepository(True)
274 return self._repo_.ls(path)
276 def _fill(self, filter, repository):
277 """look for subfolders
278 @param filter boolean function
279 @param repository use svn or git
280 """
281 if not self.isdir():
282 raise PQHException( # pragma: no cover
283 "Unable to look into a file %r full %r." % (
284 self._file, self.get_fullname()))
286 if repository:
287 opt = "repo_ls"
288 full = self.get_fullname()
289 fi = "" if self._file is None else self._file
290 entry = self.repo_ls(full)
291 temp = [os.path.relpath(p.name, full) for p in entry]
292 all = []
293 for s in temp:
294 all.append(s)
295 else:
296 opt = "listdir"
297 full = self.get_fullname()
298 fi = "" if self._file is None else self._file
299 all = [a for a in os.listdir(full) if a not in [".", ".."]]
301 all.sort()
302 self._children = []
303 for a in all:
304 fu = os.path.join(full, a)
305 isd = os.path.isdir(fu)
306 if self._log and isd:
307 self.fLOG("[FileTreeNode], entering", a)
308 elif self._log1 and self._level <= 0:
309 self.fLOG("[FileTreeNode], entering", a)
310 if filter is None or filter(self._root, fi, a, isd):
311 try:
312 n = FileTreeNode(self._root, os.path.join(fi, a), filter, level=self._level + 1,
313 parent=self, repository=repository, log=self._log,
314 log1=self._log1 or self._log, fLOG=self.fLOG)
315 except PQHException as e: # pragma: no cover
316 if "does not exist" in str(e):
317 self.fLOG(
318 f"a folder should exist, but is it is not, it continues [opt={opt}]")
319 self.fLOG(e)
320 continue
321 if n.isdir() and len(n._children) == 0:
322 continue
323 self._children.append(n)
325 def get(self):
326 """
327 return a dictionary with some values which describe the file
329 @return dict
330 """
331 res = {"name": "" if self._file is None else self._file,
332 "root___": self._root,
333 "time": str(self._date),
334 "size": self._size,
335 "type___": self._type}
336 return res
338 def __getitem__(self, i):
339 """returns the element i
340 @param i element
341 @return element
342 """
343 return self._children[i]
345 def nb_children(self):
346 """
347 return the number of children
349 @return int
350 """
351 return len(self._children)
353 def __iter__(self):
354 """
355 iterator on the element
357 @return iterator on all contained files
358 """
359 yield self
360 for c in self._children:
361 for t in c:
362 yield t
364 def max_date(self):
365 """return the more recent date
366 """
367 return max([node._date for node in self])
369 def __len__(self):
370 """
371 Returns the number of elements in this folder and
372 in the subfolders.
373 """
374 n = 0
375 for _ in self:
376 n += 1
377 return n
379 def get_dict(self, lower=False):
380 """
381 Returns a dictionary ``{ self._file : node }``.
382 @param lower if True, every filename is converted into lower case
383 """
384 res = {}
385 if lower:
386 for node in self:
387 if node._file is not None:
388 res[node._file.lower()] = node
389 else:
390 for node in self:
391 if node._file is not None:
392 res[node._file] = node
393 return res
395 def sign(self, node, hash_size):
396 """
397 Returns ``==``, ``<`` or ``>`` according the dates
398 if the size is not too big, if the sign is ``<`` or ``>``,
399 applies the hash method.
400 """
401 if self._date == node._date:
402 return "=="
403 elif self._date < node._date:
404 if self.isdir(
405 ) or self._size != node._size or node._size > hash_size:
406 return "<"
407 else:
408 h1 = self.hash_md5_readfile()
409 h2 = node.hash_md5_readfile()
410 if h1 != h2:
411 return "<"
412 else:
413 return "=="
414 else:
415 if self.isdir(
416 ) or self._size != node._size or node._size > hash_size:
417 return ">"
418 else:
419 h1 = self.hash_md5_readfile()
420 h2 = node.hash_md5_readfile()
421 if h1 != h2:
422 return ">"
423 else:
424 return "=="
426 def difference(self, node, hash_size=1024 ** 2 * 2, lower=False):
427 """
428 Returns the differences with another folder.
430 @param node other node
431 @param hash_size above this size, it does not compute the hash key
432 @param lower if True, every filename is converted into lower case
433 @return list of [ (``?``, self._file, node (in self), node (in node)) ], see below for the choice of ``?``
435 The question mark ``?`` means:
436 - ``==`` no change
437 - ``>`` more recent in self
438 - ``<`` more recent in node
439 - ``>+`` absent in node
440 - ``<+`` absent in self
442 """
443 ti = time.perf_counter()
444 d1 = self.get_dict(lower=lower)
445 d2 = node.get_dict(lower=lower)
446 res = []
447 nb = 0
448 for k, v in d1.items():
449 ti2 = time.perf_counter()
450 if ti2 - ti > 10:
451 self.fLOG("FileTreeNode.difference: processed files", nb)
452 ti = ti2
453 if k not in d2:
454 res.append((k, ">+", v, None))
455 else:
456 res.append((k, v.sign(d2[k], hash_size), v, d2[k]))
457 nb += 1
459 for k, v in d2.items():
460 ti2 = time.perf_counter()
461 if ti2 - ti > 10:
462 self.fLOG("FileTreeNode.difference: processed files", nb)
463 ti = ti2
464 if k not in d1:
465 res.append((k, "<+", None, v))
466 nb += 1
468 res.sort()
469 zoo = [(v[1], v[0]) + v[2:] for v in res]
471 return zoo
473 def remove(self):
474 """
475 Removes the file.
476 """
477 full = self.get_fullname()
478 self.fLOG("removing ", full)
479 try:
480 os.remove(full)
481 except OSError as e: # pragma: no cover
482 self.fLOG(
483 "unable to remove ", full, " --- ", str(e).replace("\n", " "))
484 self.fLOG("[pyqerror] ", e)
486 def copy_to(self, path, exc=True):
487 """
488 Copies the file to *path*.
490 @param path path
491 @param exc catch exception when possible, warning otherwise
493 If the new path doe nots exist, it will be created.
495 @warning If a file already exists at the new location,
496 it checks the dates. The file is copied only if
497 the new file is older.
498 """
499 if not os.path.exists(path):
500 raise PQHException( # pragma: no cover
501 f"This path does not exist: '{path}'.")
502 if self.isdir():
503 raise PQHException( # pragma: no cover
504 f"This node represents a folder {self.get_fullname()!r}.")
505 full = self.get_fullname()
506 temp = os.path.split(self._file)[0]
507 dest = os.path.join(path, temp)
508 fina = dest # os.path.split (dest) [0]
509 if not os.path.exists(fina):
510 self.fLOG("creating directory: ", fina)
511 os.makedirs(fina)
512 try:
513 # if 1 :
514 self.fLOG("+ copy ", full, " to ", dest)
515 shutil.copy(full, dest)
516 cop = os.path.join(dest, os.path.split(full)[1])
517 if not os.path.exists(cop):
518 raise PQHException(f"Unable to copy '{cop}'.")
519 st1 = os.stat(full)
520 st2 = os.stat(cop)
521 t1 = datetime.datetime.utcfromtimestamp(st1.st_mtime)
522 t2 = datetime.datetime.utcfromtimestamp(st2.st_mtime)
523 if t1 >= t2:
524 mes = f"t1={t1} for file '{full}' >= t2={t2} for file '{cop}'"
525 if t1 > t2 and exc:
526 raise PQHException(mes)
527 warnings.warn(mes, RuntimeWarning)
528 except OSError as e: # pragma: no cover
529 # else :
530 self.fLOG("unable to copy file ", full, " to ", path)
531 self.fLOG("[pyqerror]", e)