Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief logging functionalities
6The function fLOG (or fLOG) is used to logged everything into a log file.
8::
10 from pyquickhelper.loghelper.flog import fLOG
11 fLOG(OutputPrint = True) # the logs are also displayed in the output stream
12 fLOG(LogPath = "...") # chanages the path returned by GetPath
13 fLOG("un", "deux", 4, ["gt"]) # log everything in a log file
15 from pyquickhelper.loghelper.flog import GetPath ()
16 print GetPath() # return the log path (file temp_log.txt)
18 fLOG(LogPath = "c:/temp/log_path") # change the log path, creates it if it does not exist
20@warning This module inserts static variable in module :epkg:`*py:sys`.
21 I was done to deal with several instances of the same module
22 in earlier versions of :epkg:`python`.
23"""
24import copy
25import datetime
26import decimal
27import math
28import os
29import pprint
30import random
31import sys
32import time
33import re
34import zipfile
35import urllib.request as urllib_request
36from .flog_fake_classes import FlogStatic, LogFakeFileStream, LogFileStream, PQHException
37from .run_cmd import run_cmd
40flog_static = FlogStatic()
43def init(path=None, filename=None, create=True, path_add=None):
44 """
45 initialisation
46 @param path new path, if path==*"###"*, then uses ``d:\\temp\\log_pyquickhelper``
47 if it exists or ``c:\\temp\\log_pyquickhelper`` if not
48 @param filename new filename
49 @param create force the creation
50 @param path_add subfolder to append to the current folder
52 This function is also called when LogPath is specified while calling function fLOG.
53 """
54 if path_add is None:
55 path_add = [] # pragma: no cover
56 if path is None:
57 path = flog_static.store_log_values["__log_path"]
59 if path == "###":
60 if sys.platform.startswith("win"): # pragma: no cover
61 path = "d:\\temp" if os.path.exists("d:\\temp") else "c:\\temp"
62 path = os.path.join(path, "log_pyquickhelper")
63 else:
64 path = "/tmp"
65 path = os.path.join(path, "log_pyquickhelper")
67 if len(path_add) > 0: # pragma: no cover
68 if not isinstance(path_add, list):
69 path_add = [path_add]
70 temp = []
71 for p in path_add:
72 spl = os.path.splitext(p)
73 temp.append(spl[0])
74 path = os.path.join(path, *temp)
76 if filename is None:
77 filename = flog_static.store_log_values["__log_file_name"]
79 if (flog_static.store_log_values["__log_path"] != path or flog_static.store_log_values["__log_file_name"] != filename) \
80 and flog_static.store_log_values["__log_file"] is not None:
81 flog_static.store_log_values["__log_file"].close()
82 flog_static.store_log_values["__log_file"] = None
83 flog_static.store_log_values["__log_path"] = path
84 flog_static.store_log_values["__log_file_name"] = filename
86 if create:
87 if not os.path.exists(flog_static.store_log_values["__log_path"]):
88 os.makedirs(flog_static.store_log_values["__log_path"])
89 else: # pragma: no cover
90 if not os.path.exists(flog_static.store_log_values["__log_path"]):
91 raise PQHException(
92 "unable to find path " + flog_static.store_log_values["__log_path"])
95def GetSepLine():
96 """
97 Always returns ``\\n``
98 """
99 return "\n" # pragma: no cover
102def GetPath():
103 """
104 returns a path where the log file is stored.
105 @return path to the logs
106 """
107 return flog_static.store_log_values["__log_path"]
110def Print(redirect=True):
111 """
112 if True, redirect everything which is displayed to the standard output
113 """
114 lock = flog_static.store_log_values.get("Lock", False)
115 if not lock:
116 flog_static.store_log_values["__log_display"] = redirect
119def GetLogFile(physical=False, filename=None):
120 """
121 Returns a file name containing the log
123 :param physical: use a physical file or not
124 :param filename: file name (if physical is True, default value is ``temp_log.txt``)
125 :return: a pointer to a log file
126 :rtype: str
127 :raises OSError: if this file cannot be created
128 """
129 if flog_static.store_log_values["__log_file"] is None:
130 if physical:
131 path = GetPath()
132 if flog_static.store_log_values["__log_file_name"] is None:
133 if os.path.exists(path):
134 flog_static.store_log_values["__log_file_name"] = os.path.join(
135 path, flog_static.store_log_values["__log_const"])
136 else:
137 raise PQHException( # pragma: no cover
138 "unable to create a log file in folder " + path)
140 if not isinstance(flog_static.store_log_values["__log_file_name"], str):
141 flog_static.store_log_values["__log_file"] = flog_static.store_log_values[
142 "__log_file_name"]
143 else:
144 flog_static.store_log_values[
145 "__log_file"] = LogFileStream(filename=filename)
146 else:
147 flog_static.store_log_values["__log_file"] = LogFakeFileStream()
149 return flog_static.store_log_values["__log_file"]
152def noLOG(*args, **kwargs):
153 """
154 does nothing
155 """
156 if len(args) > 0:
157 return args[0]
158 return None # pragma: no cover
161def fLOG(*args, **kwargs):
162 """
163 Builds a message on a single line with the date, it deals with encoding issues.
165 @param args list of fields
166 @param kwargs dictionary of fields (see below)
167 @exception OSError When the log file cannot be created.
169 About parameter *p*:
171 - if *p* contains *OutputPrint*, call ``Print(OutputPrint)``
172 - if *p* contains *LogPath*, it calls ``init(v)``
173 - if *p* contains *LogFile*, it changes the log file name
174 (it creates a new one, the previous is closed).
175 - if *p* contains *LogPathAdd*, it adds this path to the temporary file
176 - if *p* contains *Lock*, it locks option *OutputPrint*
177 - if *p* contains *UnLock*, it unlocks option *OutputPrint*
178 - if *p* contains *_pp*, it uses :epkg:`*py:pprint`
180 Example:
182 ::
184 fLOG (LogPath = "###", LogPathAdd = __file__, OutputPrint = True)
186 .. faqref::
187 :title: How to activate the logs?
189 The following instruction will do:
191 ::
193 fLOG(OutputPrint=True)
195 To log everything into a file:
197 ::
199 fLOG(OutputPrint=True, LogFile="log_file.txt")
201 Parameter *OutputStream* allows to print
202 the message on a different stream.
203 """
204 path_add = kwargs.get("LogPathAdd", [])
205 outstream = kwargs.get('OutputStream', None)
206 if outstream is not None:
207 del kwargs['OutputStream']
209 lock = kwargs.get("Lock", None)
210 if lock is not None:
211 flog_static.store_log_values["Lock"] = lock
213 if "LogFile" in kwargs and "LogPath" in kwargs:
214 init(kwargs["LogPath"], kwargs["LogFile"])
215 elif "LogFile" in kwargs:
216 init(filename=kwargs["LogFile"], path_add=path_add)
217 elif "LogPath" in kwargs:
218 init(path=kwargs["LogPath"], path_add=path_add)
220 def myprint(s):
221 if outstream is not None:
222 outstream.write(s + '\n')
223 else:
224 print(s) # pragma: no cover
226 if "OutputPrint" in kwargs:
227 Print(kwargs["OutputPrint"])
229 if "LogFile" in kwargs:
230 GetLogFile(True, filename=kwargs["LogFile"])
232 message = fLOGFormat(flog_static.store_log_values["__log_file_sep"],
233 *args, **kwargs)
234 GetLogFile().write(message)
235 if flog_static.store_log_values["__log_display"]:
236 try:
237 myprint(message.strip("\r\n"))
238 except UnicodeEncodeError: # pragma: no cover
239 mes = "\n".join(repr(message.strip("\r\n")).split("\\n"))
240 try:
241 myprint(mes)
242 except UnicodeEncodeError:
243 mes2 = mes.encode("utf-8").decode("cp1252", errors="ignore")
244 myprint(mes2)
246 GetLogFile().flush()
247 if len(args) > 0:
248 return args[0]
249 return None
252def fLOGFormat(sep, *args, **kwargs):
253 """
254 Formats a message.
256 @param sep line separator
257 @param args list of anything
258 @param kwargs dictioanry of anything
259 @return string
261 if *_pp* is True, the function uses :epkg:`*py:pprint`.
262 """
263 upp = kwargs.get('_pp', False)
264 dt = datetime.datetime(2009, 1, 1).now()
265 typstr = str
266 if len(args) > 0:
267 def _str_process(s):
268 if isinstance(s, str):
269 if upp:
270 return pprint.pformat(s)
271 return s
272 if isinstance(s, bytes):
273 return s.decode("utf8")
274 try:
275 if upp:
276 return pprint.pformat(s)
277 return typstr(s)
278 except Exception as e: # pragma: no cover
279 raise Exception( # pragma: no cover
280 "unable to convert s into string: type(s)=" + str(type(s))) from e
282 message = (str(dt).split(".", maxsplit=1)[0] + " " +
283 " ".join([_str_process(s) for s in args]) + sep)
284 st = " "
285 else:
286 message = typstr(dt).split(".", maxsplit=1)[0] + " "
287 st = " "
289 messages = [message]
291 for k, v in kwargs.items():
292 if k in ("OutputPrint", '_pp') and v:
293 continue
294 message = st + "%s = %s%s" % (typstr(k), typstr(v), sep)
295 messages.append(message)
296 return sep.join(messages)
299def _this_fLOG(*args, **kwargs):
300 """
301 Other name private to this module.
302 """
303 fLOG(*args, **kwargs) # pragma: no cover
306def get_relative_path(folder, file, exists=True, absolute=True):
307 """
308 private function, return the relative path or absolute between a folder and a file,
309 use `relpath <https://docs.python.org/3/library/os.path.html#os.path.relpath>`_
311 @param folder folder
312 @param file file
313 @param exists check existence
314 @param absolute if True return a path which starts from the root
315 @return relative path
316 @rtype str
317 """
318 if exists:
319 if not os.path.exists(folder):
320 raise PQHException(folder + " does not exist.") # pragma: no cover
321 if not os.path.exists(file):
322 raise PQHException(file + " does not exist.") # pragma: no cover
323 sd = os.path.normpath(folder).replace("\\", "/").split("/")
324 sf = os.path.normpath(file).replace("\\", "/").split("/")
325 i = 0
326 while i < len(sd):
327 if i >= len(sf):
328 break
329 if sf[i] != sd[i]:
330 break
331 i += 1
332 if absolute:
333 res = copy.copy(sd)
334 else:
335 res = []
336 j = i
337 while i < len(sd):
338 i += 1
339 res.append("..") # pragma: no cover
340 res.extend(sf[j:])
341 return os.path.join(*res)
344def download(httpfile, path_unzip=None, outfile=None, flatten=True, fLOG=None):
345 """
346 Download a file to the folder path_unzip if not present, if the downloading is interrupted,
347 the next time, it will start from where it stopped. Before downloading, the function creates a temporary file,
348 which means the downloading has began. If the connection is lost, an exception is raised and the program stopped.
349 Next time, the program will detect the existence of the temporary file and will start downloading from where it previously stopped.
350 After it ends, the temporary file is removed.
352 @param httpfile (str) url
353 @param path_unzip (str) path where to unzip the file, if None, choose GetPath ()
354 @param outfile (str) if None, the function will assign a filename unless this parameter is specified
355 @param flatten (bool) put all files in the same folder (forget subfolders)
356 @param fLOG (str) logging function
357 @return local file name
358 """
359 if fLOG is None:
360 fLOG = noLOG
361 if fLOG == "fLOG":
362 fLOG = fLOG # pylint: disable=W0127
363 if path_unzip is None:
364 path_unzip = GetPath()
365 file = _check_source(httpfile, path_unzip=path_unzip,
366 outfile=outfile, flatten=flatten, fLOG=fLOG)
367 return file
370def unzip(file, path_unzip=None, outfile=None, flatten=True, fLOG=noLOG):
371 """
372 Unzips a file into the temporary folder,
373 the function expects to have only one zipped file.
375 @param file (str) zip files
376 @param path_unzip (str) where to unzip the file, if None, choose GetPath ()
377 @param outfile (str) if None, the function will assign a filename unless this parameter is specified
378 @param flatten (bool) put all files in the same folder (forget subfolders)
379 @return expanded file name
380 """
381 if path_unzip is None:
382 path_unzip = GetPath()
383 fLOG("[loghelper.flog] unzip file", file)
384 file = _check_source(file, path_unzip=path_unzip,
385 outfile=outfile, flatten=flatten, fLOG=fLOG)
387 nb = 0
388 while not os.path.exists(file) and nb < 10:
389 time.sleep(0.5)
390 nb += 1
392 if not os.path.exists(file):
393 raise FileNotFoundError(file) # pragma: no cover
395 return file
398def _get_file_url(url, path):
399 """
400 build a filename knowing an url
402 @param url url
403 @param path where to download the file
404 @return filename
405 """
406 path = path + "/" + \
407 url.replace("/", "!") \
408 .replace(":", "") \
409 .replace(".", "-") \
410 .replace("=", "_") \
411 .replace("?", "_")
412 spl = path.split("-")
413 if len(spl) >= 2:
414 ext = spl[len(spl) - 1].lower()
415 if 2 <= len(ext) <= 3 and ext in [
416 "png", "jpeg", "jpg", "zip", "txt", "gif", "py", "cpp",
417 "gz", "pdf", "tif", "py", "html", "h", "hpp", "cc"]:
418 spl = path.split("-")
419 spl = spl[:len(spl) - 1]
420 path = "-".join(spl) + "." + ext
421 return path
424def _get_file_txt(zipname):
425 """
426 build a filename knowing an url, same name but in default_path
427 @param zipname filename of the zip
428 @return filename
429 """
430 file = os.path.split(zipname)[1]
431 file = file.replace(".zip", ".txt")
432 file = file.replace(".ZIP", ".txt")
433 file = file.replace(".gz", ".txt")
434 file = file.replace(".GZ", ".txt")
435 return file
438def _check_zip_file(filename, path_unzip, outfile, flatten=True, fLOG=noLOG):
439 """
440 This function tests if a file is a zip file (extension zip),
441 if it is the case, it unzips it into another file and return the new name,
442 if the unzipped file already exists, the file is not unzipped a second time.
444 @param filename any filename (.zip or not), if txt, it has no effect
445 @param path_unzip if None, unzip it where it stands, otherwise, place it into path
446 @param outfile if None, the function will assign a filename unless this parameter is specified
447 @param flatten unzip all files into the same directory
448 @param fLOG logging function
449 @return the unzipped file or filename if the format was not zip
450 """
451 if path_unzip is None:
452 raise ValueError("path_unzip cannot be None") # pragma: no cover
453 file, ext = os.path.splitext(filename)
454 ext = ext.lower()
455 if ext == ".gz":
457 import gzip
459 if outfile is None:
460 dest = filename.split("!")
461 dest = dest[len(dest) - 1]
462 ext = os.path.splitext(dest)[1]
463 dest = dest.replace(ext, ".txt")
464 path = os.path.split(filename)
465 path = "/".join(path[:len(path) - 1])
466 dest = path + "/" + dest
467 else:
468 dest = outfile
470 if not os.path.exists(dest):
471 file = gzip.GzipFile(filename, "r")
472 if outfile is None:
473 dest = os.path.split(dest)[1]
474 dest = os.path.join(path_unzip, dest)
476 if os.path.exists(dest):
477 st1 = datetime.datetime.utcfromtimestamp(
478 os.stat(filename).st_mtime)
479 st2 = datetime.datetime.utcfromtimestamp(
480 os.stat(dest).st_mtime)
481 if st2 > st1:
482 fLOG("[loghelper.flog] ungzipping file (already done)", dest)
483 return dest
485 fLOG("[loghelper.flog] ungzipping file", dest)
486 f = open(dest, "w")
487 data = file.read(2 ** 27)
488 size = 0
489 while len(data) > 0:
490 size += len(data)
491 fLOG("[loghelper.flog] ungzipping ", size, "bytes")
492 if isinstance(data, bytes):
493 f.write(bytes.decode(data))
494 else:
495 f.write(data)
496 data = file.read(2 ** 27)
497 f.close()
498 file.close()
500 return dest
502 if ext == ".zip":
504 try:
505 file = zipfile.ZipFile(filename, "r")
506 except Exception as e: # pragma: no cover
507 fLOG("[loghelper.flog] problem with ", filename)
508 raise e
510 if len(file.infolist()) != 1:
511 if outfile is not None:
512 raise PQHException( # pragma: no cover
513 "the archive contains %d files and not one as you expected "
514 "by filling outfile" % len(file.infolist()))
515 fLOG("[loghelper.flog] unzip file (multiple) ", filename)
516 #message = "\n".join ([ fi.filename for fi in file.infolist() ] )
517 #raise Exception.YstException("ColumnInfoSet.load_from_file: file %s contains no file or more than one file\n" + message)
518 folder = os.path.split(filename)[0]
519 todo = 0
520 _zip7_path = r"c:\Program Files\7-Zip"
521 zip7 = not flatten and os.path.exists(_zip7_path)
522 if zip7:
523 fLOG("[loghelper.flog] using ", _zip7_path) # pragma: no cover
524 wait = []
525 for info in file.infolist():
526 # equivalent to is_dir (Python 3.6+)
527 if info.filename[-1] == '/':
528 continue
529 fileinside = info.filename
530 dest = os.path.join(folder, fileinside)
531 if not os.path.exists(dest):
532 fol = os.path.split(dest)[0]
533 if not os.path.exists(fol):
534 os.makedirs(fol)
535 if os.path.exists(dest):
536 st1 = datetime.datetime.utcfromtimestamp(
537 os.stat(filename).st_mtime)
538 st2 = datetime.datetime.utcfromtimestamp(
539 os.stat(dest).st_mtime)
540 if st2 > st1:
541 continue
543 if not sys.platform.startswith("win") or not zip7:
544 data = file.read(fileinside)
545 if flatten:
546 dest2 = os.path.split(dest)[1]
547 dest2 = os.path.join(path_unzip, dest2)
548 else:
549 dest2 = dest
550 fLOG("[loghelper.flog] unzipping file", dest2)
551 wait.append(dest2)
552 f = open(dest2, "wb" if isinstance(
553 data, bytes) else "w")
554 f.write(data)
555 f.close()
556 else:
557 todo += 1
559 if todo > 0 and zip7: # pragma: no cover
560 dest = os.path.realpath(path_unzip)
561 cmd = '"' + _zip7_path + \
562 '\\7z.exe" x -y -r -o"%s" "%s"' % (dest,
563 os.path.realpath(filename))
564 out, err = run_cmd(cmd, wait=True)
565 if len(err) > 0:
566 raise PQHException(
567 "command {0} failed\n{1}".format(cmd, err))
568 if "Error" in out:
569 raise PQHException(
570 "command {0} failed\n{1}".format(cmd, out))
571 else:
572 dest = path_unzip
574 file.close()
576 ch = False
577 while not ch:
578 ch = True
579 for a in wait:
580 if not os.path.exists(a): # pragma: no cover
581 ch = False
582 break
583 time.sleep(0.5)
585 return dest
587 else:
588 for info in file.infolist():
589 fileinside = info.filename
591 path = os.path.split(filename)
592 dest = outfile if outfile is not None else path[
593 0] + "/" + fileinside
594 if not os.path.exists(dest):
595 data = file.read(fileinside)
596 if outfile is None:
597 if flatten:
598 dest = os.path.split(dest)[1]
599 dest = os.path.join(path_unzip, dest)
600 else:
601 dest = os.path.join(path_unzip, dest)
603 if os.path.exists(dest):
604 st1 = datetime.datetime.utcfromtimestamp(
605 os.stat(filename).st_mtime)
606 st2 = datetime.datetime.utcfromtimestamp(
607 os.stat(dest).st_mtime)
608 if st2 > st1:
609 fLOG(
610 "[loghelper.flog] unzipping one file (already done)", dest)
611 return dest
613 fLOG("[loghelper.flog] unzipping one file", dest)
614 if isinstance(data, bytes):
615 f = open(dest, "wb")
616 f.write(data)
617 else:
618 f = open(dest, "w")
619 f.write(data)
620 f.close()
621 file.close()
622 return dest
624 return filename
627def _first_more_recent(f1, path):
628 """
629 Checks if the first file (opened url)
630 is more recent of the second file (path).
631 @param f1 opened url
632 @param path path name
633 @return boolean
634 """
635 typstr = str
636 s = typstr(f1.info())
637 da = re.compile("Last[-]Modified: (.+) GMT").search(s)
638 if da is None:
639 return True
640 else: # pragma: no cover
641 da = da.groups()[0]
642 gr = re.compile(
643 "[\\w, ]* ([ \\d]{2}) ([\\w]{3}) ([\\d]{4}) "
644 "([\\d]{2}):([\\d]{2}):([\\d]{2})").search(da)
645 if gr is None:
646 return True
647 gr = gr.groups()
648 dau = datetime.datetime(
649 int(gr[2]),
650 flog_static.store_log_values["month_date"][gr[1].lower()],
651 int(gr[0]), int(gr[3]), int(gr[4]), int(gr[5]))
653 p = time.ctime(os.path.getmtime(path))
654 gr = re.compile(
655 "[\\w, ]* ([\\w]{3}) ([ \\d]{2}) ([\\d]{2}):([\\d]{2}):"
656 "([\\d]{2}) ([\\d]{4})").search(p)
657 if gr is None:
658 return True
659 gr = gr.groups()
660 da = datetime.datetime(
661 int(gr[5]),
662 flog_static.store_log_values["month_date"][gr[0].lower()],
663 int(gr[1]), int(gr[2]), int(gr[3]), int(gr[4]))
665 file = da
666 return dau > file
669def _check_url_file(url, path_download, outfile, fLOG=noLOG):
670 """If *url* is an url, download the file and return the downloaded
671 if it has already been downloaded, it is not downloaded again.
672 @param url url
673 @param path_download download the file here
674 @param outfile if None, the function will assign a filename unless this parameter is specified
675 @param fLOG logging function
676 @return the filename
677 """
678 urll = url.lower()
679 if "http://" in urll or "https://" in urll:
680 dest = outfile if outfile is not None else _get_file_url(
681 url, path_download)
682 down = False
683 nyet = dest + ".notyet"
685 if os.path.exists(dest) and not os.path.exists(nyet):
686 try: # pragma: no cover
687 fLOG("[loghelper.flog] trying to connect", url)
688 f1 = urllib_request.urlopen(url)
689 down = _first_more_recent(f1, dest)
690 newdate = down
691 f1.close()
692 except IOError: # pragma: no cover
693 fLOG(
694 "unable to connect Internet, working offline for url", url)
695 down = False
696 else:
697 down = True
698 newdate = False
700 if down:
701 if newdate:
702 fLOG( # pragma: no cover
703 "[loghelper.flog] downloading (updated) ", url)
704 else:
705 fLOG("[loghelper.flog] downloading ", url)
707 if (len(url) > 4 and
708 url[-4].lower() in [".txt", ".csv", ".tsv", ".log", '.tmpl']):
709 fLOG( # pragma: no cover
710 "[loghelper.flog] creating text file '{0}'".format(dest))
711 formatopen = "w" # pragma: no cover
712 else:
713 fLOG(
714 "[loghelper.flog] creating binary file '{0}'".format(dest))
715 formatopen = "wb"
717 if os.path.exists(nyet): # pragma: no cover
718 size = os.stat(dest).st_size
719 fLOG("[loghelper.flog] resume downloading (stop at",
720 size, ") from '{0}'".format(url))
721 request = urllib_request.Request(url)
722 request.add_header("Range", "bytes=%d-" % size)
723 fu = urllib_request.urlopen(request)
724 f = open(dest, formatopen.replace( # pylint: disable=W1501
725 "w", "a")) # pylint: disable=W1501
726 else:
727 fLOG("[loghelper.flog] downloading ", url)
728 request = urllib_request.Request(url)
729 fu = urllib_request.urlopen(url)
730 f = open(dest, formatopen)
732 open(nyet, "w").close()
733 c = fu.read(2 ** 21)
734 size = 0
735 while len(c) > 0:
736 size += len(c)
737 fLOG("[loghelper.flog] size", size)
738 f.write(c)
739 f.flush()
740 c = fu.read(2 ** 21)
741 fLOG("[loghelper.flog] end downloading")
742 f.close()
743 fu.close()
744 os.remove(nyet)
746 url = dest
747 return url
750def _check_source(fileurl, path_unzip, outfile, flatten=True, fLOG=noLOG):
751 """
752 Check the existence of a file, downloads it if not existing.
754 @param fileurl can be an url, a zip file, a text file
755 @param path_unzip if None, unzip the file where it stands, otherwise, put it in path
756 @param outfile if None, the function will assign a filename unless this parameter is specified
757 @param flatten extract all files into the same directory
758 @param fLOG logging function
759 @return a text file name
761 if it is:
762 - an url: download it and copy it into default_path
763 - a zipfile: beside the true file
764 - a text file: do nothing
766 If the file has already been downloaded and unzipped, it is not done twice.
767 """
768 if outfile is not None and os.path.splitext(
769 outfile)[1].lower() == os.path.splitext(fileurl)[1].lower():
770 file = _check_url_file(
771 fileurl, path_download=path_unzip, outfile=outfile, fLOG=fLOG)
772 return file
773 else:
774 file = _check_url_file(
775 fileurl, path_download=path_unzip, outfile=None, fLOG=fLOG)
776 txt = _check_zip_file(
777 file, path_unzip=path_unzip, outfile=outfile, fLOG=fLOG, flatten=flatten)
778 if not os.path.exists(txt): # pragma: no cover
779 message = "_check_source: unable to find file '" + \
780 txt + "' source '" + fileurl + "'"
781 raise PQHException(message)
782 return txt
785def get_prefix():
786 """
787 Returns a prefix for a file based on time.
788 """
789 typstr = str
790 t = datetime.datetime(2010, 1, 1).now()
791 t = typstr(t).replace(":", "_").replace("/", "_").replace(" ", "_")
792 t += "_" + typstr(random.randint(0, 1000000)) + "_"
793 return os.path.join(GetPath(), "temp_" + t)
796def removedirs(folder, silent=False, use_command_line=False):
797 """
798 Removes all files and folders in *folder*.
800 @param folder folder
801 @param silent silent mode or not
802 @param use_command_line see below
803 @return list of not remove files or folders
805 Sometimes it fails due to PermissionError exception,
806 in that case, you can try to remove the folder through the command
807 line ``rmdir /q /s + <folder>``. In that case, the function
808 does not return the list of removed files but the output of
809 the command line
810 """
811 if use_command_line:
812 if sys.platform.startswith("win"): # pragma: no cover
813 out, err = run_cmd("rmdir /s /q " + folder, wait=True)
814 else:
815 out, err = run_cmd("rm -Rf " + folder, wait=True)
816 if len(err) > 0: # pragma: no cover
817 raise Exception("Unable to remove '{0}'\n{1}".format(folder, err))
818 return out
820 file, rep = [], []
821 for r, d, f in os.walk(folder):
822 for a in d:
823 rep.append(os.path.join(r, a))
824 for a in f:
825 file.append(os.path.join(r, a))
826 impos = []
827 file.sort()
828 rep.sort(reverse=True)
829 for f in file:
830 try:
831 if os.path.exists(f):
832 os.remove(f)
833 except Exception as e: # pragma: no cover
834 typstr = str
835 fLOG(
836 "Unable to remove file '{0}' --- {1}".format(f, typstr(e).replace("\n", " ")))
837 if silent:
838 impos.append(f)
839 else:
840 raise
841 for f in rep:
842 try:
843 if os.path.exists(f):
844 os.removedirs(f)
845 except Exception as e: # pragma: no cover
846 typstr = str
847 fLOG(
848 "Unable to remove folder '{0}' --- {1}".format(f, typstr(e).replace("\n", " ")))
849 if silent:
850 impos.append(f)
851 else:
852 raise
854 if os.path.exists(folder):
855 try:
856 os.rmdir(folder)
857 except Exception as e: # pragma: no cover
858 impos.append(folder)
859 return impos
862def guess_type_value(x, none=None):
863 """
864 Guessees the type of a value.
866 @param x type
867 @param none if True and all values are empty, return None
868 @return type
870 @warning if an integer starts with a zero, then it is a string
871 """
872 try:
873 int(x)
874 if x[0] == '0' and len(x) > 1:
875 return str
876 return int if len(x) < 9 else str
877 except ValueError:
878 try:
879 x = float(x)
880 return float # pragma: no cover
881 except ValueError:
882 if none:
883 if x is None:
884 return None # pragma: no cover
885 try:
886 if len(x) > 0:
887 return str # pragma: no cover
888 return None
889 except Exception: # pragma: no cover
890 return None
891 return str
894def guess_type_value_type(none=True):
895 """
896 @param none if True and all values are empty, return None
897 @return the list of types recognized by guess_type_value
898 """
899 typstr = str
900 return [None, typstr, int, float] if none else [typstr, int, float]
903def get_default_value_type(ty, none=True):
904 """
905 @param ty type in guess_type_value_type
906 @param none if True and all values are empty, return None
907 @return a default value for this type
908 """
909 if ty is None and none:
910 return None
911 if ty == str:
912 return ""
913 if ty == int:
914 return 0
915 if ty == decimal.Decimal:
916 return decimal.Decimal(0)
917 if ty == float:
918 return 0.0
919 raise PQHException( # pragma: no cover
920 "type expected in " + str(guess_type_value_type()))
923def guess_type_list(args, tolerance=0.01, none=True):
924 """
925 guess the type of a list
926 @param args list
927 @param tolerance let's denote m as the frequency of the most representative type,
928 and m2 the second one, if m2 > m * tolerance --> str
929 @param none if True and all values are empty, return None
930 @return type, length (order of preference (int, float, str))
931 the parameter length has a meaning only for str result
932 """
933 defa = None if none else str
934 length = 0
935 typstr = str
936 if args in [typstr, float, int, None, decimal.Decimal]:
937 raise PQHException("this case is unexpected %s" %
938 typstr(args)) # pragma: no cover
940 if len(args) == 0:
941 res = defa
943 elif len(args) == 1:
944 res = guess_type_value(args[0], none)
945 if res == typstr:
946 length = len(args[0])
947 else:
948 count = {}
949 for x in args:
950 t = guess_type_value(x, none)
951 length = max(length, len(x))
952 if t in count:
953 count[t] += 1
954 else:
955 count[t] = 1
957 val = [(v, k) for k, v in count.items()]
958 val.sort(reverse=True)
959 if len(val) == 1:
960 res = val[0][1]
961 elif val[0][0] * tolerance < val[1][0]:
962 res = str
963 else:
964 res = val[0][1]
966 if res != typstr:
967 olength = 0
968 else:
969 if length > 0:
970 x = math.log(length) / math.log(2) + 0.99999
971 x = int(x)
972 olength = math.exp(x * math.log(2)) + 0.9999
973 olength = int(olength) * 2
974 else:
975 olength = length
977 return res, olength
980def guess_machine_parameter():
981 """
982 Determines many parameters on this machine:
983 - machine name
984 - user name
985 - domain...
986 @return dictionary { name : value }
987 """
988 val = ["COMPUTERNAME", "NUMBER_OF_PROCESSORS", "OS",
989 "PATH", "USERDOMAIN", "USERNAME", "USERPROFILE",
990 "windir", "TEMP", "USER"]
991 res = {}
992 sep = ";" if sys.platform.startswith("win") else ":"
993 for v in val:
994 if v == "PATH":
995 x = os.getenv(v)
996 x = x.split(sep)
997 res[v] = x
998 else:
999 res[v] = os.getenv(v)
1001 if not sys.platform.startswith("win"):
1002 if "TEMP" not in res or res["TEMP"] is None:
1003 res["TEMP"] = "/tmp"
1005 return res
1008def IsEmptyString(s):
1009 """
1010 Empty string or not?
1012 :param s: any string (str, None)
1013 :return: is it empty or not?
1014 :rtype: bool
1015 :raises PQHException: when a type is unexpected
1016 """
1017 if s is None:
1018 return True
1019 if isinstance(s, str):
1020 return len(s) == 0
1021 raise PQHException( # pragma: no cover
1022 "the type is unexpected {0}".format(type(s)))
1025def load_content_file_with_encoding(filename):
1026 """
1027 Tries different encoding to load a file, tries utf8, latin1 and None.
1028 @param filename filename
1029 @return couple (content, encoding)
1030 """
1031 error = None
1032 for enc in ["utf8", "latin1", None]:
1033 try:
1034 with open(filename, "r", encoding=enc) as f:
1035 content = f.read()
1036 return content, enc
1037 except Exception as e: # pragma: no cover
1038 error = e
1039 raise error # pragma: no cover