Coverage for pyquickhelper/loghelper/flog.py: 86%

452 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief logging functionalities 

5 

6The function fLOG is used to log everything into a log file. 

7 

8:: 

9 

10 from pyquickhelper.loghelper.flog import fLOG 

11 fLOG(OutputPrint = True) # the logs are also displayed in the output stream 

12 fLOG(LogPath = "...") # changes the path returned by GetPath 

13 fLOG("un", "deux", 4, ["gt"]) # log everything in a log file 

14 

15 from pyquickhelper.loghelper.flog import GetPath () 

16 print GetPath() # return the log path (file temp_log.txt) 

17 

18 fLOG(LogPath = "c:/temp/log_path") # change the log path, creates it if it does not exist 

19 

20@warning This module inserts static variable in module :epkg:`*py:sys`. 

21 It was done to deal with several instances of the same module 

22 in earlier versions of :epkg:`python`. 

23""" 

24import copy 

25import datetime 

26import decimal 

27import math 

28import os 

29import pprint 

30import random 

31import sys 

32import time 

33import re 

34import zipfile 

35import urllib.request as urllib_request 

36from .flog_fake_classes import FlogStatic, LogFakeFileStream, LogFileStream, PQHException 

37from .run_cmd import run_cmd 

38 

39 

# Module-level state shared by the logging functions of this module, which
# read and write its ``store_log_values`` mapping (log path, file name, stream,
# display flag).
flog_static = FlogStatic()

41 

42 

def init(path=None, filename=None, create=True, path_add=None):
    """
    Sets the folder and the file name used by the logs.

    @param      path        new path, if path==*"###"*, then uses ``d:\\temp\\log_pyquickhelper``
                            if ``d:\\temp`` exists or ``c:\\temp\\log_pyquickhelper`` if not
                            (``/tmp/log_pyquickhelper`` on platforms other than Windows),
                            if None, the path currently stored is kept
    @param      filename    new filename, if None, the one currently stored is kept
    @param      create      creates the folder if it does not exist,
                            otherwise an exception is raised when it is missing
    @param      path_add    subfolder (or list of subfolders) appended to *path*,
                            extensions are removed from each of them

    The currently opened log stream is closed when the path or the
    file name changes.
    This function is also called when LogPath is specified while calling function fLOG.
    """
    store = flog_static.store_log_values

    if path_add is None:
        path_add = []  # pragma: no cover
    if path is None:
        path = store["__log_path"]

    if path == "###":
        if sys.platform.startswith("win"):  # pragma: no cover
            root = "d:\\temp" if os.path.exists("d:\\temp") else "c:\\temp"
        else:
            root = "/tmp"
        path = os.path.join(root, "log_pyquickhelper")

    if len(path_add) > 0:  # pragma: no cover
        if not isinstance(path_add, list):
            path_add = [path_add]
        path = os.path.join(
            path, *[os.path.splitext(p)[0] for p in path_add])

    if filename is None:
        filename = store["__log_file_name"]

    # a different destination invalidates the stream opened so far
    target_changed = (store["__log_path"] != path or
                      store["__log_file_name"] != filename)
    if target_changed and store["__log_file"] is not None:
        store["__log_file"].close()
        store["__log_file"] = None
    store["__log_path"] = path
    store["__log_file_name"] = filename

    if not os.path.exists(store["__log_path"]):
        if create:
            os.makedirs(store["__log_path"])
        else:  # pragma: no cover
            raise PQHException(
                "unable to find path " + store["__log_path"])

93 

94 

def GetSepLine():
    """
    Returns the line separator used by the logs, always ``\\n``.

    @return     ``\\n``
    """
    separator = "\n"
    return separator  # pragma: no cover

100 

101 

def GetPath():
    """
    Returns the folder where the log file is stored.

    @return         value stored under ``__log_path``
    """
    store = flog_static.store_log_values
    return store["__log_path"]

108 

109 

def Print(redirect=True):
    """
    Enables or disables the display of the logs on the standard output.

    @param      redirect    new value of the display flag

    The flag is left unchanged when the option ``Lock`` was stored
    with a true value.
    """
    store = flog_static.store_log_values
    if store.get("Lock", False):
        return
    store["__log_display"] = redirect

117 

118 

def GetLogFile(physical=False, filename=None):
    """
    Returns the stream receiving the logs, creates it at the first call.

    :param physical: use a physical file or not, if False,
        a ``LogFakeFileStream`` is created
    :param filename: file name given to ``LogFileStream``
        when a physical file is requested
    :return: the object stored as the current log stream
    :raises PQHException: if no file name is stored and the log folder does not exist
    """
    store = flog_static.store_log_values

    # the stream already exists, nothing to create
    if store["__log_file"] is not None:
        return store["__log_file"]

    if not physical:
        store["__log_file"] = LogFakeFileStream()
        return store["__log_file"]

    path = GetPath()
    if store["__log_file_name"] is None:
        if not os.path.exists(path):
            raise PQHException(  # pragma: no cover
                "unable to create a log file in folder " + path)
        store["__log_file_name"] = os.path.join(path, store["__log_const"])

    if isinstance(store["__log_file_name"], str):
        store["__log_file"] = LogFileStream(filename=filename)
    else:
        # anything which is not a string is used directly as the stream
        store["__log_file"] = store["__log_file_name"]
    return store["__log_file"]

150 

151 

def noLOG(*args, **kwargs):
    """
    Logging function which logs nothing.

    @param      args        ignored except for the first one
    @param      kwargs      ignored
    @return                 the first positional argument if any, None otherwise
    """
    return args[0] if len(args) > 0 else None

159 

160 

def fLOG(*args, **kwargs):
    """
    Builds a message on a single line with the date, it deals with encoding issues.

    @param      args        list of fields
    @param      kwargs      dictionary of fields (see below)
    @return                 the first positional argument if any, None otherwise
    @exception  OSError     When the log file cannot be created.

    About parameter *kwargs*:

    - if it contains *OutputPrint*, call ``Print(OutputPrint)``
    - if it contains *LogPath*, it calls ``init(v)``
    - if it contains *LogFile*, it changes the log file name
      (it creates a new one, the previous is closed).
    - if it contains *LogPathAdd*, it adds this path to the temporary file
      (ignored when both *LogPath* and *LogFile* are given)
    - if it contains *Lock*, the value is stored, a true value
      locks option *OutputPrint*, a false one unlocks it
    - if it contains *_pp*, it uses :epkg:`*py:pprint`
    - if it contains *OutputStream*, the message is written on
      this stream instead of being printed

    Example:

    ::

        fLOG (LogPath = "###", LogPathAdd = __file__, OutputPrint = True)

    .. faqref::
        :title: How to activate the logs?

        The following instruction will do:

        ::

            fLOG(OutputPrint=True)

        To log everything into a file:

        ::

            fLOG(OutputPrint=True, LogFile="log_file.txt")

        Parameter *OutputStream* allows to print
        the message on a different stream.
    """
    path_add = kwargs.get("LogPathAdd", [])
    # pop (and not get) so that the stream never ends up in the formatted
    # message, even when the caller explicitly passes OutputStream=None
    outstream = kwargs.pop('OutputStream', None)

    lock = kwargs.get("Lock", None)
    if lock is not None:
        flog_static.store_log_values["Lock"] = lock

    if "LogFile" in kwargs and "LogPath" in kwargs:
        init(kwargs["LogPath"], kwargs["LogFile"])
    elif "LogFile" in kwargs:
        init(filename=kwargs["LogFile"], path_add=path_add)
    elif "LogPath" in kwargs:
        init(path=kwargs["LogPath"], path_add=path_add)

    def myprint(s):
        if outstream is not None:
            outstream.write(s + '\n')
        else:
            print(s)  # pragma: no cover

    if "OutputPrint" in kwargs:
        Print(kwargs["OutputPrint"])

    if "LogFile" in kwargs:
        GetLogFile(True, filename=kwargs["LogFile"])

    message = fLOGFormat(flog_static.store_log_values["__log_file_sep"],
                         *args, **kwargs)
    GetLogFile().write(message)
    if flog_static.store_log_values["__log_display"]:
        try:
            myprint(message.strip("\r\n"))
        except UnicodeEncodeError:  # pragma: no cover
            # the output stream cannot encode the message,
            # falls back on its escaped representation
            mes = "\n".join(repr(message.strip("\r\n")).split("\\n"))
            try:
                myprint(mes)
            except UnicodeEncodeError:
                mes2 = mes.encode("utf-8").decode("cp1252", errors="ignore")
                myprint(mes2)

    GetLogFile().flush()
    if len(args) > 0:
        return args[0]
    return None

250 

251 

def fLOGFormat(sep, *args, **kwargs):
    """
    Formats a message: the current date and time (without microseconds)
    followed by the positional arguments, then one line per named argument.

    @param      sep     line separator
    @param      args    list of anything, bytes are decoded with utf8
    @param      kwargs  dictionary of anything, *OutputPrint* and *_pp*
                        are skipped when their value is true
    @return             string

    if *_pp* is True, the function uses :epkg:`*py:pprint`.
    """
    use_pprint = kwargs.get('_pp', False)
    stamp = str(datetime.datetime.now()).split(".", maxsplit=1)[0]

    def _as_text(value):
        if isinstance(value, str):
            return pprint.pformat(value) if use_pprint else value
        if isinstance(value, bytes):
            return value.decode("utf8")
        try:
            return pprint.pformat(value) if use_pprint else str(value)
        except Exception as e:  # pragma: no cover
            raise RuntimeError(  # pragma: no cover
                "unable to convert s into string: type(s)=" + str(type(value))) from e

    if len(args) > 0:
        head = stamp + " " + " ".join([_as_text(a) for a in args]) + sep
    else:
        head = stamp + " "
    indent = " "

    lines = [head]
    for key, value in kwargs.items():
        if key in ("OutputPrint", '_pp') and value:
            continue
        lines.append(indent + f"{str(key)} = {str(value)}{sep}")
    return sep.join(lines)

297 

298 

def _this_fLOG(*args, **kwargs):
    """
    Private alias of function fLOG, forwards every argument to it
    and returns nothing.
    """
    fLOG(*args, **kwargs)  # pragma: no cover
    return None

304 

305 

def get_relative_path(folder, file, exists=True, absolute=True):
    """
    private function, builds the path leading from a folder to a file,
    see also `relpath <https://docs.python.org/3/library/os.path.html#os.path.relpath>`_

    @param      folder      folder
    @param      file        file
    @param      exists      check existence of both *folder* and *file*
    @param      absolute    if True, the result starts with the components of *folder*,
                            otherwise it only contains the ``..`` and the
                            components of *file* which are not shared with *folder*
    @return                 path
    @rtype                  str
    """
    if exists:
        for location in (folder, file):
            if not os.path.exists(location):
                raise PQHException(location + " does not exist.")  # pragma: no cover

    folder_parts = os.path.normpath(folder).replace("\\", "/").split("/")
    file_parts = os.path.normpath(file).replace("\\", "/").split("/")

    # number of leading components shared by both paths
    common = 0
    for left, right in zip(folder_parts, file_parts):
        if left != right:
            break
        common += 1

    pieces = list(folder_parts) if absolute else []
    # one step up for every component of folder which is not shared
    pieces.extend([".."] * (len(folder_parts) - common))
    pieces.extend(file_parts[common:])
    return os.path.join(*pieces)

342 

343 

def download(httpfile, path_unzip=None, outfile=None, flatten=True, fLOG=None):
    """
    Download a file to the folder path_unzip if not present, if the downloading is interrupted,
    the next time, it will start from where it stopped. Before downloading, the function creates a temporary file,
    which means the downloading has begun. If the connection is lost, an exception is raised and the program stopped.
    Next time, the program will detect the existence of the temporary file and will start downloading from where it previously stopped.
    After it ends, the temporary file is removed.

    @param      httpfile        (str) url
    @param      path_unzip      (str) path where to unzip the file, if None, choose GetPath ()
    @param      outfile         (str) if None, the function will assign a filename unless this parameter is specified
    @param      flatten         (bool) put all files in the same folder (forget subfolders)
    @param      fLOG            logging function, None to disable the logs,
                                the string ``"fLOG"`` to use the function *fLOG* of this module
    @return                     local file name
    """
    if fLOG is None:
        fLOG = noLOG
    elif fLOG == "fLOG":
        # the parameter shadows the module function of the same name,
        # it must be retrieved from the module namespace
        fLOG = globals()["fLOG"]
    if path_unzip is None:
        path_unzip = GetPath()
    return _check_source(httpfile, path_unzip=path_unzip,
                         outfile=outfile, flatten=flatten, fLOG=fLOG)

368 

369 

def unzip(file, path_unzip=None, outfile=None, flatten=True, fLOG=noLOG):
    """
    Unzips a file into the temporary folder,
    the function expects to have only one zipped file.

    @param      file            (str) zip files
    @param      path_unzip      (str) where to unzip the file, if None, choose GetPath ()
    @param      outfile         (str) if None, the function will assign a filename unless this parameter is specified
    @param      flatten         (bool) put all files in the same folder (forget subfolders)
    @param      fLOG            logging function
    @return                     expanded file name

    The function waits up to ten times half a second for the
    expanded file to appear and raises *FileNotFoundError* if
    it is still missing.
    """
    if path_unzip is None:
        path_unzip = GetPath()
    fLOG("[loghelper.flog] unzip file", file)
    file = _check_source(file, path_unzip=path_unzip,
                         outfile=outfile, flatten=flatten, fLOG=fLOG)

    for _ in range(10):
        if os.path.exists(file):
            break
        time.sleep(0.5)

    if not os.path.exists(file):
        raise FileNotFoundError(file)  # pragma: no cover
    return file

396 

397 

def _get_file_url(url, path):
    """
    Builds a local filename from an url.

    @param      url     url
    @param      path    where to download the file
    @return             filename

    Characters ``/ : . = ?`` of the url are replaced, the last
    ``-`` is turned back into a dot when it is followed
    by a known extension.
    """
    flat = url
    for old, new in (("/", "!"), (":", ""), (".", "-"), ("=", "_"), ("?", "_")):
        flat = flat.replace(old, new)
    path = path + "/" + flat

    head, dash, tail = path.rpartition("-")
    if dash:
        ext = tail.lower()
        # NOTE(review): because of the length test, 'jpeg', 'html' and 'h'
        # can never be selected
        if 2 <= len(ext) <= 3 and ext in [
                "png", "jpeg", "jpg", "zip", "txt", "gif", "py", "cpp",
                "gz", "pdf", "tif", "py", "html", "h", "hpp", "cc"]:
            path = head + "." + ext
    return path

422 

423 

def _get_file_txt(zipname):
    """
    Builds the name of a text file from the name of an archive:
    the folder is dropped and every occurrence of
    ``.zip``, ``.ZIP``, ``.gz``, ``.GZ`` becomes ``.txt``.

    @param      zipname     filename of the zip
    @return                 filename
    """
    name = os.path.split(zipname)[1]
    for suffix in (".zip", ".ZIP", ".gz", ".GZ"):
        name = name.replace(suffix, ".txt")
    return name

436 

437 

def _check_zip_file(filename, path_unzip, outfile, flatten=True, fLOG=noLOG):
    """
    This function tests if a file is an archive (extension ``.gz`` or ``.zip``),
    if it is the case, it expands it and returns the new name,
    if the expanded file already exists and is more recent than the archive,
    the file is not expanded a second time.

    @param      filename        any filename (.zip, .gz or not), other extensions have no effect
    @param      path_unzip      folder receiving the expanded files, cannot be None
    @param      outfile         if None, the function will assign a filename unless this parameter is specified
    @param      flatten         unzip all files into the same directory
    @param      fLOG            logging function
    @return                     the expanded file, the folder *path_unzip* for an archive
                                holding several files, or *filename* if the format is not an archive
    @exception  ValueError      if *path_unzip* is None
    @exception  PQHException    if *outfile* is specified for an archive which does not
                                hold exactly one file
    """
    if path_unzip is None:
        raise ValueError("path_unzip cannot be None")  # pragma: no cover
    file, ext = os.path.splitext(filename)
    ext = ext.lower()
    if ext == ".gz":

        import gzip

        if outfile is None:
            # default name: last part of filename after '!' with its
            # extension replaced by .txt, next to the archive
            dest = filename.split("!")
            dest = dest[len(dest) - 1]
            ext = os.path.splitext(dest)[1]
            dest = dest.replace(ext, ".txt")
            path = os.path.split(filename)
            path = "/".join(path[:len(path) - 1])
            dest = path + "/" + dest
        else:
            dest = outfile

        if not os.path.exists(dest):
            file = gzip.GzipFile(filename, "r")
            if outfile is None:
                # the expanded file goes to path_unzip
                dest = os.path.split(dest)[1]
                dest = os.path.join(path_unzip, dest)

            if os.path.exists(dest):
                # skips the expansion if the existing file is more recent
                # than the archive
                st1 = datetime.datetime.utcfromtimestamp(
                    os.stat(filename).st_mtime)
                st2 = datetime.datetime.utcfromtimestamp(
                    os.stat(dest).st_mtime)
                if st2 > st1:
                    fLOG("[loghelper.flog] ungzipping file (already done)", dest)
                    return dest

            fLOG("[loghelper.flog] ungzipping file", dest)
            # NOTE(review): f and file are not closed if an exception
            # happens while copying
            f = open(dest, "w")
            data = file.read(2 ** 27)
            size = 0
            while len(data) > 0:
                size += len(data)
                fLOG("[loghelper.flog] ungzipping ", size, "bytes")
                if isinstance(data, bytes):
                    f.write(bytes.decode(data))
                else:
                    f.write(data)
                data = file.read(2 ** 27)
            f.close()
            file.close()

        return dest

    if ext == ".zip":

        try:
            file = zipfile.ZipFile(filename, "r")
        except Exception as e:  # pragma: no cover
            fLOG("[loghelper.flog] problem with ", filename)
            raise e

        if len(file.infolist()) != 1:
            # archive with zero or several members
            if outfile is not None:
                raise PQHException(  # pragma: no cover
                    "the archive contains %d files and not one as you expected "
                    "by filling outfile" % len(file.infolist()))
            fLOG("[loghelper.flog] unzip file (multiple) ", filename)
            #    message = "\n".join ([ fi.filename for fi in file.infolist() ] )
            #    raise RuntimeError.YstException("ColumnInfoSet.load_from_file: file %s contains no file or more than one file\n" + message)
            folder = os.path.split(filename)[0]
            todo = 0
            _zip7_path = r"c:\Program Files\7-Zip"
            # 7-Zip is only considered when subfolders must be kept
            zip7 = not flatten and os.path.exists(_zip7_path)
            if zip7:
                fLOG("[loghelper.flog] using ", _zip7_path)  # pragma: no cover
            wait = []
            for info in file.infolist():
                # equivalent to is_dir (Python 3.6+)
                if info.filename[-1] == '/':
                    continue
                fileinside = info.filename
                dest = os.path.join(folder, fileinside)
                if not os.path.exists(dest):
                    fol = os.path.split(dest)[0]
                    if not os.path.exists(fol):
                        os.makedirs(fol)
                if os.path.exists(dest):
                    # member already expanded after the archive was written
                    st1 = datetime.datetime.utcfromtimestamp(
                        os.stat(filename).st_mtime)
                    st2 = datetime.datetime.utcfromtimestamp(
                        os.stat(dest).st_mtime)
                    if st2 > st1:
                        continue

                if not sys.platform.startswith("win") or not zip7:
                    data = file.read(fileinside)
                    if flatten:
                        dest2 = os.path.split(dest)[1]
                        dest2 = os.path.join(path_unzip, dest2)
                    else:
                        dest2 = dest
                    fLOG("[loghelper.flog] unzipping file", dest2)
                    wait.append(dest2)
                    f = open(dest2, "wb" if isinstance(
                        data, bytes) else "w")
                    f.write(data)
                    f.close()
                else:
                    # left to 7-Zip, see below
                    todo += 1

            if todo > 0 and zip7:  # pragma: no cover
                dest = os.path.realpath(path_unzip)
                cmd = '"' + _zip7_path + \
                      f'\\7z.exe" x -y -r -o"{dest}" "{os.path.realpath(filename)}"'
                out, err = run_cmd(cmd, wait=True)
                if len(err) > 0:
                    raise PQHException(
                        f"command {cmd} failed\n{err}")
                if "Error" in out:
                    raise PQHException(
                        f"command {cmd} failed\n{out}")
            else:
                dest = path_unzip

            file.close()

            # waits until every file written above is visible on disk
            ch = False
            while not ch:
                ch = True
                for a in wait:
                    if not os.path.exists(a):  # pragma: no cover
                        ch = False
                        break
                time.sleep(0.5)

            return dest

        else:
            # the archive holds exactly one member, the loop only
            # retrieves its name
            for info in file.infolist():
                fileinside = info.filename

            path = os.path.split(filename)
            dest = outfile if outfile is not None else path[
                0] + "/" + fileinside
            if not os.path.exists(dest):
                data = file.read(fileinside)
                if outfile is None:
                    if flatten:
                        dest = os.path.split(dest)[1]
                        dest = os.path.join(path_unzip, dest)
                    else:
                        dest = os.path.join(path_unzip, dest)

                if os.path.exists(dest):
                    # skips the expansion if the existing file is more
                    # recent than the archive
                    st1 = datetime.datetime.utcfromtimestamp(
                        os.stat(filename).st_mtime)
                    st2 = datetime.datetime.utcfromtimestamp(
                        os.stat(dest).st_mtime)
                    if st2 > st1:
                        fLOG(
                            "[loghelper.flog] unzipping one file (already done)", dest)
                        return dest

                fLOG("[loghelper.flog] unzipping one file", dest)
                if isinstance(data, bytes):
                    f = open(dest, "wb")
                    f.write(data)
                else:
                    f = open(dest, "w")
                    f.write(data)
                f.close()
            file.close()
            return dest

    # neither .gz nor .zip: the file is returned as is
    return filename

624 

625 

def _first_more_recent(f1, path):
    """
    Checks if the first file (opened url)
    is more recent than the second file (path).

    @param      f1      opened url, its headers are obtained with ``f1.info()``
    @param      path    path name
    @return             boolean, True as well when one of the two dates
                        cannot be extracted

    The date of the url comes from the header ``Last-Modified``,
    the date of the file from ``time.ctime`` applied to its modification time.
    """
    headers = str(f1.info())
    found = re.compile("Last[-]Modified: (.+) GMT").search(headers)
    if found is None:
        return True

    # pragma: no cover
    stamp = found.groups()[0]
    parsed = re.compile(
        "[\\w, ]* ([ \\d]{2}) ([\\w]{3}) ([\\d]{4}) "
        "([\\d]{2}):([\\d]{2}):([\\d]{2})").search(stamp)
    if parsed is None:
        return True
    day, month, year, hour, minute, second = parsed.groups()
    remote = datetime.datetime(
        int(year),
        flog_static.store_log_values["month_date"][month.lower()],
        int(day), int(hour), int(minute), int(second))

    local_text = time.ctime(os.path.getmtime(path))
    parsed = re.compile(
        "[\\w, ]* ([\\w]{3}) ([ \\d]{2}) ([\\d]{2}):([\\d]{2}):"
        "([\\d]{2}) ([\\d]{4})").search(local_text)
    if parsed is None:
        return True
    month, day, hour, minute, second, year = parsed.groups()
    local = datetime.datetime(
        int(year),
        flog_static.store_log_values["month_date"][month.lower()],
        int(day), int(hour), int(minute), int(second))

    return remote > local

666 

667 

def _check_url_file(url, path_download, outfile, fLOG=noLOG):
    """If *url* is an url, downloads the file and returns the downloaded file name,
    if it has already been downloaded, it is not downloaded again
    unless the remote file is more recent.

    @param      url             url
    @param      path_download   download the file here
    @param      outfile         if None, the function will assign a filename unless this parameter is specified
    @param      fLOG            logging function
    @return                     the filename, *url* itself if it does not
                                contain ``http://`` or ``https://``

    An empty file ``<destination>.notyet`` exists while the download is
    running. If it is still there at the next call, the download resumes
    from the size of the partial file.
    """
    urll = url.lower()
    if "http://" not in urll and "https://" not in urll:
        return url

    dest = outfile if outfile is not None else _get_file_url(
        url, path_download)
    nyet = dest + ".notyet"
    newdate = False

    if os.path.exists(dest) and not os.path.exists(nyet):
        # the file is there and complete, checks the remote date
        try:  # pragma: no cover
            fLOG("[loghelper.flog] trying to connect", url)
            with urllib_request.urlopen(url) as f1:
                down = _first_more_recent(f1, dest)
            newdate = down
        except IOError:  # pragma: no cover
            fLOG(
                "unable to connect Internet, working offline for url", url)
            down = False
    else:
        down = True

    if down:
        if newdate:
            fLOG(  # pragma: no cover
                "[loghelper.flog] downloading (updated) ", url)
        else:
            fLOG("[loghelper.flog] downloading ", url)

        # The response is read as bytes, so the destination is always written
        # in binary mode. The previous extension test compared the single
        # character url[-4] with whole extensions and never selected text mode.
        fLOG(
            f"[loghelper.flog] creating binary file '{dest}'")

        if os.path.exists(nyet):  # pragma: no cover
            size = os.stat(dest).st_size
            fLOG("[loghelper.flog] resume downloading (stop at",
                 size, f") from '{url}'")
            request = urllib_request.Request(url)
            request.add_header("Range", "bytes=%d-" % size)
            fu = urllib_request.urlopen(request)
            mode = "ab"
        else:
            fLOG("[loghelper.flog] downloading ", url)
            fu = urllib_request.urlopen(url)
            mode = "wb"

        # both handles are closed even if the connection is lost,
        # the marker is left in place in that case to allow resuming
        with fu, open(dest, mode) as f:
            open(nyet, "w").close()
            size = 0
            c = fu.read(2 ** 21)
            while len(c) > 0:
                size += len(c)
                fLOG("[loghelper.flog] size", size)
                f.write(c)
                f.flush()
                c = fu.read(2 ** 21)
            fLOG("[loghelper.flog] end downloading")
        os.remove(nyet)

    return dest

747 

748 

def _check_source(fileurl, path_unzip, outfile, flatten=True, fLOG=noLOG):
    """
    Checks the existence of a file, downloads it and expands it if needed.

    @param      fileurl     can be an url, a zip file, a text file
    @param      path_unzip  folder receiving the downloaded and the expanded files
    @param      outfile     if None, the function will assign a filename unless this parameter is specified
    @param      flatten     extract all files into the same directory
    @param      fLOG        logging function
    @return                 a file name

    When *outfile* has the same extension as *fileurl* (case insensitive),
    the file is only downloaded as *outfile*. Otherwise the file is downloaded
    then given to *_check_zip_file* and an exception is raised
    if the result does not exist.
    """
    same_extension = outfile is not None and (
        os.path.splitext(outfile)[1].lower() ==
        os.path.splitext(fileurl)[1].lower())
    if same_extension:
        return _check_url_file(
            fileurl, path_download=path_unzip, outfile=outfile, fLOG=fLOG)

    downloaded = _check_url_file(
        fileurl, path_download=path_unzip, outfile=None, fLOG=fLOG)
    txt = _check_zip_file(
        downloaded, path_unzip=path_unzip, outfile=outfile, fLOG=fLOG, flatten=flatten)
    if os.path.exists(txt):
        return txt
    # pragma: no cover
    raise PQHException(
        "_check_source: unable to find file '" + txt + "' source '" + fileurl + "'")

782 

783 

def get_prefix():
    """
    Returns a prefix for a file located in the log folder, it is built
    from the current time and a random integer.

    @return     ``<GetPath()>/temp_<time>_<random>_``
    """
    stamp = str(datetime.datetime.now())
    for char in (":", "/", " "):
        stamp = stamp.replace(char, "_")
    suffix = str(random.randint(0, 1000000))
    return os.path.join(GetPath(), "temp_" + stamp + "_" + suffix + "_")

793 

794 

def removedirs(folder, silent=False, use_command_line=False):
    """
    Removes all files and folders in *folder*, then *folder* itself.

    @param      folder              folder
    @param      silent              if True, files or folders which cannot be removed
                                    are collected instead of raising an exception
    @param      use_command_line    see below
    @return                         list of not removed files or folders

    Sometimes it fails due to PermissionError exception,
    in that case, you can try to remove the folder through the command
    line ``rmdir /q /s + <folder>``. In that case, the function
    does not return the list of removed files but the output of
    the command line

    Folders above *folder* are never removed even if they become empty.
    """
    if use_command_line:
        if sys.platform.startswith("win"):  # pragma: no cover
            out, err = run_cmd("rmdir /s /q " + folder, wait=True)
        else:
            out, err = run_cmd("rm -Rf " + folder, wait=True)
        if len(err) > 0:  # pragma: no cover
            raise RuntimeError(f"Unable to remove '{folder}'\n{err}")
        return out

    files, folders = [], []
    for root, dirnames, filenames in os.walk(folder):
        folders.extend(os.path.join(root, name) for name in dirnames)
        files.extend(os.path.join(root, name) for name in filenames)

    impos = []
    files.sort()
    # reverse order: a subfolder always comes before its parent
    folders.sort(reverse=True)

    for name in files:
        try:
            if os.path.exists(name):
                os.remove(name)
        except Exception as e:  # pragma: no cover
            fLOG(
                "Unable to remove file '{0}' --- {1}".format(name, str(e).replace("\n", " ")))
            if silent:
                impos.append(name)
            else:
                raise

    for name in folders:
        try:
            if os.path.exists(name):
                # os.rmdir and not os.removedirs: the latter also removes
                # every empty parent, including folders above *folder*
                os.rmdir(name)
        except Exception as e:  # pragma: no cover
            fLOG(
                "Unable to remove folder '{0}' --- {1}".format(name, str(e).replace("\n", " ")))
            if silent:
                impos.append(name)
            else:
                raise

    if os.path.exists(folder):
        try:
            os.rmdir(folder)
        except Exception:  # pragma: no cover
            impos.append(folder)
    return impos

859 

860 

def guess_type_value(x, none=None):
    """
    Guesses the type of a value.

    @param      x           value, usually a string, or None
    @param      none        if True, None and empty values return None
                            instead of str
    @return                 type (int, float, str) or None

    @warning if an integer starts with a zero, then it is a string,
             an integer with nine characters or more is a string as well
    """
    if x is None:
        # int(None) raises TypeError, None is handled before any conversion
        return None if none else str
    try:
        int(x)
        if x[0] == '0' and len(x) > 1:
            return str
        return int if len(x) < 9 else str
    except ValueError:
        try:
            x = float(x)
            return float  # pragma: no cover
        except ValueError:
            if none:
                try:
                    if len(x) > 0:
                        return str  # pragma: no cover
                    return None
                except Exception:  # pragma: no cover
                    return None
            return str

891 

892 

def guess_type_value_type(none=True):
    """
    Lists the types function guess_type_value may return.

    @param      none        if True, None is the first element of the list
    @return                 ``[None, str, int, float]`` or ``[str, int, float]``
    """
    known = [str, int, float]
    if none:
        return [None] + known
    return known

900 

901 

def get_default_value_type(ty, none=True):
    """
    Returns a default value for a type.

    @param      ty          type among str, int, decimal.Decimal, float, or None
    @param      none        if True, None is accepted for *ty* and None is returned
    @return                 a default value for this type
    @exception  PQHException    if the type is not one of the expected ones
    """
    if ty is None and none:
        return None
    defaults = ((str, ""), (int, 0),
                (decimal.Decimal, decimal.Decimal(0)), (float, 0.0))
    for candidate, value in defaults:
        if ty == candidate:
            return value
    raise PQHException(  # pragma: no cover
        "type expected in " + str(guess_type_value_type()))

920 

921 

def guess_type_list(args, tolerance=0.01, none=True):
    """
    Guesses the type of a list of values.

    @param      args        list
    @param      tolerance   let's denote m as the frequency of the most representative type,
                            and m2 the second one, if m2 > m * tolerance --> str
    @param      none        if True and all values are empty, return None
    @return                 type, length (order of preference (int, float, str))
                            the parameter length has a meaning only for str result,
                            it is then twice the first power of two not below
                            the longest value, 0 otherwise
    @exception  PQHException    if *args* is a type or None instead of a list
    """
    defa = None if none else str
    length = 0
    if args in [str, float, int, None, decimal.Decimal]:
        raise PQHException("this case is unexpected %s" %
                           str(args))  # pragma: no cover

    if len(args) == 0:
        res = defa

    elif len(args) == 1:
        res = guess_type_value(args[0], none)
        if res == str:
            length = len(args[0])
    else:
        count = {}
        for x in args:
            t = guess_type_value(x, none)
            length = max(length, len(x))
            count[t] = count.get(t, 0) + 1

        # sorted on the frequency only: comparing the types themselves
        # (needed by a plain tuple sort on ties) raises TypeError
        val = sorted(((v, k) for k, v in count.items()),
                     key=lambda pair: pair[0], reverse=True)
        if len(val) == 1:
            res = val[0][1]
        elif val[0][0] * tolerance < val[1][0]:
            res = str
        else:
            res = val[0][1]

    if res != str:
        olength = 0
    elif length > 0:
        x = math.log(length) / math.log(2) + 0.99999
        x = int(x)
        olength = math.exp(x * math.log(2)) + 0.9999
        olength = int(olength) * 2
    else:
        olength = length

    return res, olength

977 

978 

def guess_machine_parameter():
    """
    Determines many parameters on this machine from
    the environment variables:
    - machine name
    - user name
    - domain...

    @return         dictionary { name : value }, the value is None if the
                    variable is not defined, ``PATH`` is split into a list
                    (empty if not defined), ``TEMP`` defaults to ``/tmp``
                    on platforms other than Windows
    """
    names = ["COMPUTERNAME", "NUMBER_OF_PROCESSORS", "OS",
             "PATH", "USERDOMAIN", "USERNAME", "USERPROFILE",
             "windir", "TEMP", "USER"]
    sep = ";" if sys.platform.startswith("win") else ":"
    res = {}
    for name in names:
        value = os.getenv(name)
        if name == "PATH":
            # PATH may be missing from the environment
            value = value.split(sep) if value is not None else []
        res[name] = value

    if not sys.platform.startswith("win"):
        if res.get("TEMP") is None:
            res["TEMP"] = "/tmp"

    return res

1005 

1006 

def IsEmptyString(s):
    """
    Tells if a string is None or has no character.

    :param s: any string (str, None)
    :return: True if *s* is None or empty
    :rtype: bool
    :raises PQHException: when *s* is neither None nor a string
    """
    if s is None:
        return True
    if not isinstance(s, str):
        raise PQHException(  # pragma: no cover
            f"the type is unexpected {type(s)}")
    return len(s) == 0

1022 

1023 

def load_content_file_with_encoding(filename):
    """
    Loads the content of a text file, tries encodings utf8, latin1
    then None (default encoding) in that order and stops at the first
    one which works.

    @param      filename        filename
    @return                     couple (content, encoding)

    The exception raised by the last attempt is raised again
    if every attempt fails.
    """
    last_error = None
    for encoding in ("utf8", "latin1", None):
        try:
            with open(filename, "r", encoding=encoding) as stream:
                return stream.read(), encoding
        except Exception as exc:  # pragma: no cover
            last_error = exc
    raise last_error  # pragma: no cover