Coverage for pyquickhelper/filehelper/file_tree_node.py: 82%

244 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief a node which contains a file or a folder 

5""" 

6import os 

7import re 

8import datetime 

9import time 

10import shutil 

11import hashlib 

12import warnings 

13from ..loghelper.pqh_exception import PQHException 

14from ..loghelper.flog import noLOG 

15from ..loghelper.pyrepo_helper import SourceRepository 

16 

17 

18class FileTreeNode: 

19 

20 """ 

21 Defines a node of a file tree: it stands for a file or a folder. 

22 Example: 

23 

24 :: 

25 

26 def example (p1, p2, hash_size = 1024**2*2, svn1 = True, svn2 = False) : 

27 extout = re.compile (FileTreeNode.build_expression ("dvi bbl blg ilg ind old out pyc pyd " \\ 

28 "bak idx obj log aux pdb sbr ncb res idb suo dep " \\ 

29 "ogm manifest dsp dsz user ilk bsc exp eps".split ())) 

30 extfou = re.compile ("(exeinterpreter[/\\\\].*[.]dll)|([/\\\\]upgradereport)|" \\ 

31 "(thumbs[.]db)|([.]svn)|(temp[_/\\\\].*)") 

32 

33 def filter (root, path, f, d) : 

34 root = root.lower () 

35 path = path.lower () 

36 f = f.lower () 

37 if extout.search (f) : 

38 if not d and not f.endswith(".pyc"): 

39 print("rejected (o1)", path, f) 

40 return False 

41 fu = os.path.join (path, f) 

42 if extfou.search (fu) : 

43 if not d and not f.endswith(".pyc"): 

44 print("rejected (o2)", path, f) 

45 return False 

46 return True 

47 

48 f1 = p1 

49 f2 = p2 

50 

51 node1 = FileTreeNode(f1, filter = filter, repository = svn1) 

52 node2 = FileTreeNode(f2, filter = filter, repository = svn2) 

53 print(len(node1), node1.max_date()) 

54 print(len(node2), node2.max_date()) 

55 

56 res = node1.difference(node2, hash_size=hash_size) 

57 return res 

58 

59 print(__file__, "synchro", OutputPrint = __name__ == "__main__") 

60 res = example (p1, p2) 

61 """ 

62 

# Extensions (without the leading dot) of the files ignored by default.
_default_not_ext = "bbl out pyc log lib ind pdb opt".split()
# Pattern matching the paths ignored by default: ``.svn`` entries, ``hal*``
# binaries or project files, and every file ending with one of the
# extensions above. Each alternative must be separated by ``|``: without
# the separator in front of the joined extensions, the first extension is
# glued to the ``hal...`` alternative and never matches on its own.
_default_out = re.compile(
    "([.]svn)|(hal.*[.]((exe)|(dll)|(so)|(sln)|(vcproj)))|" +
    "|".join([f"(.*[.]{e}$)" for e in _default_not_ext]))

66 

@staticmethod
def build_expression(ext):
    """
    Builds a regular expression validating a list of extensions.

    @param      ext     list of extensions (without the leading dot)
    @return             pattern (string)

    The alternatives are wrapped into a non-capturing group so that the
    prefix ``.*[.]`` applies to every extension and not only to the first
    one (``|`` has the lowest precedence in a regular expression).
    There is one capturing group per extension, in the order of *ext*.
    """
    return ".*[.](?:" + "|".join(f"({e}$)" for e in ext) + ")"

76 

def __init__(self, root, file=None, filter=None, level=0, parent=None,
             repository=False, log=False, log1=False, fLOG=noLOG):
    """
    Builds the node describing *file*, a path relative to *root*.

    @param      root        root folder (it must exist and be a folder)
    @param      file        file or folder relative to *root*,
                            if None, the node stands for the root itself
    @param      filter      function (root, path, f, dir) --> True or False,
                            if it is a string, it is compiled as a regular
                            expression: folders are always explored and a
                            file is kept when the expression is found in
                            its name
    @param      level       hierarchy level
    @param      parent      link to the parent
    @param      repository  use SVN or GIT if True
    @param      log         log every explored folder
    @param      log1        intermediate logs (first level)
    @param      fLOG        logging function to use

    When the node is a folder, its children are built right away.
    """
    if root is None:
        raise ValueError(  # pragma: no cover
            "root cannot be None.")

    self._root, self._file = root, file
    self._children = []
    self._type = self._date = self._size = None
    self._level, self._parent = level, parent
    self._log, self._log1 = log, log1
    self.module = None
    self.fLOG = fLOG

    if not os.path.exists(root):
        raise PQHException(f"path '{root}' does not exist")
    if not os.path.isdir(root):
        raise PQHException(  # pragma: no cover
            f"path '{root}' is not a folder")

    if self._file is not None and not self.exists():
        raise PQHException(  # pragma: no cover
            f"{self.get_fullname()} does not exist [{root},{file}]")

    self._fillstat()
    if not self.isdir():
        # a file has no child to look for
        return

    selector = filter
    if isinstance(filter, str):
        # a string is assumed to be a regular expression, not a function
        regex = re.compile(filter)

        def selector(root, path, f, dir, e=regex):
            "local function"
            return dir or (e.search(f) is not None)

    self._fill(selector, repository=repository)

134 

@property
def name(self):
    """
    File name of the node, relative to the root
    (None if the node was built without any file).
    """
    return self._file

141 

@property
def root(self):
    """
    Root directory of the node, the folder every file name is relative to
    (the one used as a root for a synchronization).
    """
    return self._root

148 

@property
def size(self):
    """
    Size of the node as stored in ``_size``.
    """
    return self._size

155 

@property
def date(self):
    """
    Modification date of the node as stored in ``_date``.
    """
    return self._date

162 

@property
def type(self):
    """
    Kind of node: ``file`` or ``folder``.
    """
    return self._type

169 

@property
def fullname(self):
    """
    Full name of the node, shortcut for ``get_fullname()``.
    """
    full = self.get_fullname()
    return full

176 

def hash_md5_readfile(self):
    """
    Computes the MD5 hash of the file content.

    The file is read by chunks of 1 MiB so that a big file is never
    entirely loaded in memory. It is opened in a ``with`` statement:
    the handle is closed even if reading fails.

    @return         hexadecimal digest (string)
    """
    chunk_size = 1024 ** 2  # 1 MiB per read
    digest = hashlib.md5()
    with open(self.get_fullname(), 'rb') as f:
        # read() returns b"" at the end of the file, which stops the loop
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

195 

def get_content(self, encoding="utf8"):
    """
    Reads the file as text and returns everything it contains.

    @param      encoding        encoding used to decode the file
    @return                     content as a string
    """
    with open(self.fullname, "r", encoding=encoding) as stream:
        text = stream.read()
    return text

205 

def get_fullname(self):
    """
    @return     the full name: the root joined with the file name,
                or the root alone if the node has no file name
    """
    if self._file is not None:
        return os.path.join(self._root, self._file)
    return self._root

214 

def exists(self):
    """
    Tells if the full name of the node exists on disk.

    @return         boolean
    """
    full_path = self.get_fullname()
    return os.path.exists(full_path)

222 

def _fillstat(self):
    """
    private: fills ``_type``, ``_size`` and ``_date``

    ``_type`` is ``file`` if the full name is a regular file,
    ``folder`` otherwise. ``_size`` and ``_date`` come from ``os.stat``,
    ``_date`` is the modification time as a naive datetime in UTC.
    """
    full = self.get_fullname()
    self._type = "file" if os.path.isfile(full) else "folder"

    stat = os.stat(full)
    self._size = stat.st_size
    # Same naive UTC value as datetime.utcfromtimestamp, which is deprecated:
    # the conversion is done with an explicit UTC timezone, then the
    # timezone is dropped to remain comparable with other naive dates.
    self._date = datetime.datetime.fromtimestamp(
        stat.st_mtime, datetime.timezone.utc).replace(tzinfo=None)

237 

def isdir(self):
    """
    Tells if the full name of the node is a folder.

    @return         boolean
    """
    full_path = self.get_fullname()
    return os.path.isdir(full_path)

245 

def isfile(self):
    """
    Tells if the full name of the node is a regular file.

    @return         boolean
    """
    full_path = self.get_fullname()
    return os.path.isfile(full_path)

253 

def __str__(self):
    """
    Renders the node and its children as text, one line per node:
    the root on the first line (level 0 only), then the last part of
    each file name, indented according to the level of the node.
    """
    rows = []
    if self._level == 0:
        rows.append(self._root)
    relative = self._file if self._file is not None else ""
    basename = os.path.split(relative)[-1]
    if basename:
        rows.append(" " * self._level + basename)
    rows.extend(str(child) for child in self._children)
    return "\n".join(rows)

267 

def repo_ls(self, path):
    """
    Calls ``ls`` of an instance of @see cl SourceRepository.
    The instance is created the first time and kept in ``_repo_``.
    """
    try:
        repo = self.__dict__["_repo_"]
    except KeyError:
        repo = self._repo_ = SourceRepository(True)
    return repo.ls(path)

275 

def _fill(self, filter, repository):
    """
    Looks for files and subfolders and stores them into ``_children``.

    @param      filter          boolean function (root, path, f, dir),
                                None to keep everything
    @param      repository      use svn or git to list the content
                                instead of ``os.listdir``

    The names are processed in sorted order. A subfolder which ends up
    with no child is not kept. A child which disappeared between the
    listing and its creation is logged and skipped, any other
    @see cl PQHException is raised again.
    """
    if not self.isdir():
        raise PQHException(  # pragma: no cover
            "Unable to look into a file %r full %r." % (
                self._file, self.get_fullname()))

    full = self.get_fullname()
    fi = "" if self._file is None else self._file
    if repository:
        opt = "repo_ls"
        names = [os.path.relpath(p.name, full) for p in self.repo_ls(full)]
    else:
        opt = "listdir"
        names = [a for a in os.listdir(full) if a not in (".", "..")]
    names.sort()

    self._children = []
    for a in names:
        fu = os.path.join(full, a)
        isd = os.path.isdir(fu)
        if (self._log and isd) or (self._log1 and self._level <= 0):
            self.fLOG("[FileTreeNode], entering", a)
        if filter is not None and not filter(self._root, fi, a, isd):
            continue
        try:
            n = FileTreeNode(self._root, os.path.join(fi, a), filter, level=self._level + 1,
                             parent=self, repository=repository, log=self._log,
                             log1=self._log1 or self._log, fLOG=self.fLOG)
        except PQHException as e:  # pragma: no cover
            if "does not exist" not in str(e):
                # unexpected failure: going on would use a node which is
                # either undefined or left over from the previous name
                raise
            self.fLOG(
                f"a folder should exist, but is it is not, it continues [opt={opt}]")
            self.fLOG(e)
            continue
        if n.isdir() and len(n._children) == 0:
            # empty folders are not kept
            continue
        self._children.append(n)

324 

def get(self):
    """
    Describes the node with a dictionary: file name (empty string if
    there is none), root, modification date converted into a string,
    size and type.

    @return         dict
    """
    description = {}
    description["name"] = self._file if self._file is not None else ""
    description["root___"] = self._root
    description["time"] = str(self._date)
    description["size"] = self._size
    description["type___"] = self._type
    return description

337 

def __getitem__(self, i):
    """
    Gives access to the direct children of the node.

    @param      i       index (or slice) in the list of children
    @return             child (or list of children)
    """
    children = self._children
    return children[i]

344 

def nb_children(self):
    """
    Counts the direct children of the node.

    @return         int
    """
    children = self._children
    return len(children)

352 

def __iter__(self):
    """
    Walks through the tree: the node itself comes first, then every
    node below each child, in the order of the children.

    @return         iterator on all contained nodes
    """
    yield self
    for child in self._children:
        yield from child

363 

def max_date(self):
    """
    Returns the most recent date among the node and all the nodes below.
    """
    dates = (node._date for node in self)
    return max(dates)

368 

def __len__(self):
    """
    Counts the nodes produced by the iteration on this node:
    the node itself and everything below it.
    """
    return sum(1 for _ in self)

378 

def get_dict(self, lower=False):
    """
    Maps every file name of the tree to its node:
    ``{ node._file : node }``. Nodes without any file name are skipped.

    @param      lower       if True, every filename is converted into lower case
    @return                 dictionary
    """
    named = (node for node in self if node._file is not None)
    if lower:
        return {node._file.lower(): node for node in named}
    return {node._file: node for node in named}

394 

def sign(self, node, hash_size):
    """
    Compares this node with another one and returns ``==``, ``<`` or
    ``>`` according to the dates. When the dates differ, the contents
    are hashed to detect identical files, unless this node is a folder,
    the sizes differ or the size of *node* is above *hash_size*.

    @param      node        other node
    @param      hash_size   above this size, no hash is computed
    @return                 ``==``, ``<`` or ``>``
    """
    if self._date == node._date:
        return "=="
    symbol = "<" if self._date < node._date else ">"

    # cases where the dates are enough to decide
    if self.isdir() or self._size != node._size or node._size > hash_size:
        return symbol

    # same size, different dates: only the content can tell
    if self.hash_md5_readfile() != node.hash_md5_readfile():
        return symbol
    return "=="

425 

def difference(self, node, hash_size=1024 ** 2 * 2, lower=False):
    """
    Compares this tree with another one, file name by file name.

    @param      node        other node
    @param      hash_size   above this size, it does not compute the hash key
    @param      lower       if True, every filename is converted into lower case
    @return                 list of [ (``?``, self._file, node (in self), node (in node)) ],
                            sorted by file name, see below for the choice of ``?``

    The question mark ``?`` means:
    - ``==`` no change
    - ``>`` more recent in self
    - ``<`` more recent in node
    - ``>+`` absent in node
    - ``<+`` absent in self

    The number of processed files is logged if more than ten seconds
    elapsed since the previous message.
    """
    last_log = time.perf_counter()
    mine = self.get_dict(lower=lower)
    theirs = node.get_dict(lower=lower)
    rows = []
    processed = 0

    # files of this tree: compared if present in both, ">+" otherwise
    for key, left in mine.items():
        now = time.perf_counter()
        if now - last_log > 10:
            self.fLOG("FileTreeNode.difference: processed files", processed)
            last_log = now
        if key in theirs:
            right = theirs[key]
            rows.append((key, left.sign(right, hash_size), left, right))
        else:
            rows.append((key, ">+", left, None))
        processed += 1

    # files which only exist in the other tree
    for key, right in theirs.items():
        now = time.perf_counter()
        if now - last_log > 10:
            self.fLOG("FileTreeNode.difference: processed files", processed)
            last_log = now
        if key not in mine:
            rows.append((key, "<+", None, right))
        processed += 1

    # sorted by file name, then the status is moved to the first position
    rows.sort()
    return [(status, key, left, right) for key, status, left, right in rows]

472 

def remove(self):
    """
    Deletes the file from the disk. A failure (``OSError``) is logged
    with the logging function and is not propagated.
    """
    target = self.get_fullname()
    self.fLOG("removing ", target)
    try:
        os.remove(target)
    except OSError as e:  # pragma: no cover
        reason = str(e).replace("\n", " ")
        self.fLOG("unable to remove ", target, " --- ", reason)
        self.fLOG("[pyqerror] ", e)

485 

def copy_to(self, path, exc=True):
    """
    Copies the file to *path*.

    @param      path        destination folder (it must exist)
    @param      exc         if True, raises an exception when the source
                            is strictly more recent than its copy,
                            otherwise a warning is issued

    The file keeps its relative location: the subfolder of the file is
    created below *path* if it does not exist.

    @warning The file is always copied. Once it is done, the modification
             dates are compared: a copy which is not more recent than the
             source is unexpected, it triggers a ``RuntimeWarning`` or
             an exception if the source is strictly more recent and
             *exc* is True. An ``OSError`` raised during the copy is
             logged and is not propagated.
    """
    if not os.path.exists(path):
        raise PQHException(  # pragma: no cover
            f"This path does not exist: '{path}'.")
    if self.isdir():
        raise PQHException(  # pragma: no cover
            f"This node represents a folder {self.get_fullname()!r}.")
    full = self.get_fullname()
    dest = os.path.join(path, os.path.split(self._file)[0])
    if not os.path.exists(dest):
        self.fLOG("creating directory: ", dest)
        os.makedirs(dest)
    try:
        self.fLOG("+ copy ", full, " to ", dest)
        shutil.copy(full, dest)
        cop = os.path.join(dest, os.path.split(full)[1])
        if not os.path.exists(cop):
            raise PQHException(f"Unable to copy '{cop}'.")
        # Naive UTC dates, same values as the deprecated
        # datetime.utcfromtimestamp would give.
        utc = datetime.timezone.utc
        t1 = datetime.datetime.fromtimestamp(
            os.stat(full).st_mtime, utc).replace(tzinfo=None)
        t2 = datetime.datetime.fromtimestamp(
            os.stat(cop).st_mtime, utc).replace(tzinfo=None)
        if t1 >= t2:
            mes = f"t1={t1} for file '{full}' >= t2={t2} for file '{cop}'"
            if t1 > t2 and exc:
                raise PQHException(mes)
            warnings.warn(mes, RuntimeWarning)
    except OSError as e:  # pragma: no cover
        self.fLOG("unable to copy file ", full, " to ", path)
        self.fLOG("[pyqerror]", e)