# Coverage for pyquickhelper/pycode/utils_tests_private.py: 90% (315 statements)
# Report generated by coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
1"""
2@file
3@brief This extension contains various functionalities to help unittesting.
4"""
5import os
6import sys
7import glob
8import re
9import unittest
10import warnings
11from io import StringIO
12from .utils_tests_stringio import StringIOAndFile
13from .default_filter_warning import default_filter_warning
14from ..filehelper.synchelper import remove_folder
15from ..loghelper.flog import run_cmd, noLOG
def get_test_file(filter, folder=None, no_subfolder=False, fLOG=noLOG, root=None):
    """
    Returns the list of test files.

    @param      filter          only select files matching the pattern (ex: test*)
    @param      folder          path to look (or paths to look if it is a list)
    @param      no_subfolder    the function investigates the folder *folder* and does not try any subfolder in
                                ``{"_nrt", "_unittest", "_unittests"}``
    @param      fLOG            logging function
    @param      root            root or folder which contains the project,
                                rules applying on folder name will not apply on it
    @return                     a list of test files

    Every inspected folder is appended to ``sys.path`` while the function
    runs; ``sys.path`` is put back to its initial content before the function
    returns or raises. Subfolders are explored recursively. When *filter* is
    exactly ``"temp_*"``, the files returned by ``glob`` are kept as is,
    otherwise only names which look like test scripts are kept.
    """
    default_subfolders = {"_nrt", "_unittest", "_unittests"}
    if no_subfolder:
        dirs = [folder]
        expected = set()
    else:
        expected = default_subfolders
        if folder is None:
            path = os.path.split(__file__)[0]
            dirs = [os.path.join(path, "..", "..", d) for d in expected]
        elif isinstance(folder, str):
            if not os.path.exists(folder):
                raise FileNotFoundError(folder)  # pragma: no cover
            last = os.path.split(folder)[-1]
            if last in expected:
                dirs = [folder]
            else:
                dirs = [os.path.join(folder, d) for d in expected]
        else:
            # an explicit list of folders: every one of them must exist
            dirs = folder
            for d in dirs:
                if not os.path.exists(d):
                    raise FileNotFoundError(d)

    def simplify(folds):
        # shortens the paths for logging: keeps the path from the parent
        # of the first marker folder found
        names = expected if len(expected) > 0 else default_subfolders
        res = []
        for fold in folds:
            nn = fold
            nf = fold.replace("\\", "/").split('/')
            for nr in names:
                if nr in nf:
                    # max(..., 0): the marker may be the first component,
                    # a negative index would keep only the last component
                    i = max(nf.index(nr) - 1, 0)
                    nn = "/".join(nf[i:])
            res.append(nn)
        return res

    def remove_root(p):
        # the name rules below must not apply on the root part of the path
        if root is not None and p.startswith(root):
            return p[len(root):]
        return p

    required = ("test_", ".py")
    rejected = (".py.err", ".py.out", ".py.warn", "test_main", "temp_",
                "temp2_", ".pyo", "out.test_copyfile.py.2.txt", ".pyc",
                ".pyd", ".so", ".py~")

    def is_test_script(name):
        return (all(token in name for token in required) and
                not any(token in name for token in rejected))

    copypaths = list(sys.path)

    li = []
    try:
        for fold in dirs:
            if "__pycache__" in fold or "site-packages" in fold:
                continue
            if not os.path.exists(fold):
                continue
            if fold not in sys.path and fold != ".":
                sys.path.append(fold)
            content = glob.glob(fold + "/" + filter)
            if filter != "temp_*":
                content = [il for il in content
                           if is_test_script(remove_root(il))]
            li.extend(content)
            fLOG("[get_test_file], inspecting", simplify(dirs))

            for il in glob.glob(fold + "/*"):
                if os.path.isdir(il):
                    li.extend(get_test_file(
                        filter, il, no_subfolder=True, fLOG=fLOG, root=root))
    finally:
        # we restore sys.path, even if a subfolder raised an exception
        sys.path = copypaths

    return li
def get_estimation_time(file):
    """
    Return an estimation of the processing time,
    it extracts the number in ``(time=5s)`` for example.

    @param      file        filename
    @return                 int, the number found in the first
                            ``(time=<n>s)`` marker, 0 if the file has no
                            such marker, 10 if the file cannot be read
                            (a ``UserWarning`` is emitted in that case)
    """
    try:
        # the context manager closes the file even if the reading fails
        with open(file, "r", errors="ignore") as f:
            content = f.read()
    except Exception as e:  # pragma: no cover
        # best effort: an unreadable file gets a default estimation
        warnings.warn(f"Issue with '{file}'\n{type(e)}\n{e}", UserWarning)
        return 10
    found = re.search("[(]time=([0-9]+)s[)]", content)
    if found is None:
        return 0
    return int(found.group(1))
def import_files(li, additional_ut_path=None, fLOG=noLOG):
    """
    Imports every test file in list ``li`` and builds one test suite
    per class whose name starts with ``Test``.

    @param      li                      list of files (python scripts)
    @param      additional_ut_path      additional paths to add when running the unit tests,
                                        every item is a path or a tuple ``(path, first)``,
                                        the path is inserted at the beginning
                                        of ``sys.path`` if *first* is true, appended otherwise
    @param      fLOG                    logging function (not used by this function)
    @return                             list of tests [ ( testsuite, file) ]

    Files whose name contains ``test_do_not_include`` are ignored.
    The function raises ``ImportError`` if a file cannot be imported and
    ``RuntimeError`` if a test class still has a method starting with
    ``_test`` or if a test case cannot be instantiated.
    ``sys.path`` is restored after every import, successful or not.
    """
    allsuite = []
    for le in li:
        sdir, tl = os.path.split(le)
        fi = tl.replace(".py", "")
        if "test_do_not_include" in fi:
            # checked before sys.path is touched, nothing to restore
            continue

        copypath = list(sys.path)
        try:
            if sdir not in sys.path:
                sys.path.append(sdir)
            for p in additional_ut_path or ():
                if isinstance(p, tuple):
                    if p[1]:
                        sys.path.insert(0, p[0])
                    else:
                        sys.path.append(p[0])
                else:
                    sys.path.append(p)

            try:
                mo = __import__(fi)
            except Exception as e:  # pragma: no cover
                raise ImportError(
                    "Unable to import '{}' due to {}.\nsys.path=\n{}".format(
                        fi, e, "\n".join(sys.path))) from e
        finally:
            # some tests can mess up with the import path
            sys.path = copypath

        for c in dir(mo):
            if len(c) < 5 or c[:4] != "Test":
                continue
            # test class c
            test_class = getattr(mo, c)
            testsuite = unittest.TestSuite()
            for d in dir(test_class):
                if len(d) >= 6 and d[:5] == "_test":
                    raise RuntimeError(  # pragma: no cover
                        f"a function _test is still deactivated {d} in {c}")
                if len(d) < 5 or d[:4] != "test":
                    continue
                # method d.c
                try:
                    t = test_class(d)
                except Exception as e:  # pragma: no cover
                    code = f't = mo.{c}("{d}")'
                    raise RuntimeError(
                        f"Unable to execute code '{code}'") from e
                testsuite.addTest(t)

            allsuite.append((testsuite, le))

    return allsuite
def clean(folder=None, fLOG=noLOG):
    """
    Does the cleaning: removes the log files and everything matching
    ``temp_*`` among the files returned by @see fn get_test_file.

    @param      folder      directory, given to @see fn get_test_file
                            for every search (log files and ``temp_*``)
    @param      fLOG        logging function

    A file or a folder which cannot be removed is logged with *fLOG*
    and does not stop the cleaning.
    """
    def remove_files(names):
        for el in names:
            try:
                if os.path.isfile(el):
                    os.remove(el)
            except Exception as e:  # pragma: no cover
                # best effort, the cleaning continues with the next file
                fLOG("[clean] unable to remove file '{}' due to {}".format(
                    el, str(e).replace("\n", " ")))

    # do not use SVN here just in case some files are not checked in.
    for log_file in ["temp_hal_log.txt", "temp_hal_log2.txt",
                     "temp_hal_log_.txt", "temp_log.txt", "temp_log2.txt", ]:
        remove_files(get_test_file(log_file, folder=folder))

    # the same folder is used for the temporary files and folders
    li = get_test_file("temp_*", folder=folder)
    remove_files(li)
    for el in li:
        try:
            if os.path.isdir(el):
                remove_folder(el)
        except Exception as e:  # pragma: no cover
            fLOG("[clean] unable to remove dir '{}' due to {}".format(
                el, str(e).replace("\n", " ")))
def main_run_test(runner, path_test=None, limit_max=1e9, log=False, skip=-1, skip_list=None,
                  on_stderr=False, processes=False, skip_function=None,
                  additional_ut_path=None, stdout=None, stderr=None, filter_warning=None,
                  fLOG=noLOG):
    """
    Runs all unit tests,
    the function looks into the folder _unittest and extract from all files
    beginning by `test_` all methods starting by `test_`.
    Each files should mention an execution time.
    Tests are sorted by increasing order.

    @param      runner              unittest Runner
    @param      path_test           path to look, if None, looks for defaults path related to this project
    @param      limit_max           avoid running tests longer than limit seconds
    @param      log                 if True, enables intermediate files
                                    (not used by the code of this function)
    @param      skip                if skip != -1, skip the first "skip" test files
    @param      skip_list           skip unit test id in this list (by index, starting by 1)
    @param      skip_function       *function(filename,content,duration) --> boolean* to skip a unit test
    @param      on_stderr           if True, publish everything on stderr at the end
    @param      processes           to run the unit test in a separate process (with function @see fn run_cmd),
                                    however, to make that happen, you need to specify
                                    ``exit=False`` for each test file, see `unittest.main
                                    <https://docs.python.org/3/library/unittest.html#unittest.main>`_
    @param      additional_ut_path  additional paths to add when running the unit tests
    @param      stdout              if not None, use this stream instead of *sys.stdout*
    @param      stderr              if not None, use this stream instead of *sys.stderr*
    @param      filter_warning      function which removes some warnings in the final output,
                                    if None, the function filters out some recurrent warnings
                                    in jupyter (signature: ``def filter_warning(w: warning) -> bool``),
                                    @see fn default_filter_warning
    @param      fLOG                logging function
    @return                         dictionary: ``{ "err": err, "tests":list of couple (file, test results),
                                    "failed": dictionary { short file name: error text } }``

    The function raises ``FileNotFoundError`` if no test file is found,
    ``RuntimeError`` if two test files share the same short name
    (``folder/file``) or if no unit test was run.
    """
    if os.environ.get('PYTHONPATH', '') == 'src':
        # a relative PYTHONPATH is replaced by its absolute version
        full_src = os.path.abspath('src')
        if not os.path.exists(full_src):
            raise FileNotFoundError(
                f"Unable to interpret path {'src'!r} - {full_src!r}.")
        os.environ['PYTHONPATH'] = full_src
    skip_list = set() if skip_list is None else set(skip_list)
    if filter_warning is None:
        filter_warning = default_filter_warning

    # checking that the module does not belong to the installed modules
    if path_test is not None:
        path_module = os.path.join(sys.executable, "Lib", "site-packages")
        paths = [os.path.join(path_module, "src"), ]
        for path in paths:
            if os.path.exists(path):
                raise FileExistsError(  # pragma: no cover
                    f"This path should not exist '{path}'.")

    def short_name(el):
        # last folder + file name, used as the identifier of a test file
        head, tail = os.path.split(el)
        return os.path.split(head)[-1] + "/" + tail

    def safe_write(stream, text):
        # writes text, falls back on its ascii version if the stream
        # cannot handle some characters
        try:
            stream.write(text)
        except UnicodeError:
            stream.write(text.encode("ascii", errors="ignore").decode(
                "ascii", errors="ignore"))

    # sort the test by increasing expected time
    fLOG(f"[main_run_test] path_test {path_test!r}")
    li = get_test_file("test*", folder=path_test, fLOG=fLOG, root=path_test)
    if len(li) == 0:
        raise FileNotFoundError(  # pragma: no cover
            f"No test files in {path_test!r}.")
    est = [get_estimation_time(el) for el in li]
    co = [(e, short_name(el), el) for e, el in zip(est, li)]
    co.sort()

    # we check we do not run twice the same file
    done = set()
    duplicate = []
    for _, cut, lc in co:
        if cut in done:
            duplicate.append((cut, lc))
        done.add(cut)

    if len(duplicate) > 0:  # pragma: no cover
        s = list(set(duplicate))
        s.sort()
        mes = "\n".join(str(_) for _ in s)
        raise RuntimeError("Duplicated test files were detected:\n" + mes)

    if skip != -1:
        fLOG(f"[main_run_test] found {len(co)} test files skipping.")
    else:
        fLOG(f"[main_run_test] found {len(co)} test files.")

    # extract the test classes, too long tests are ignored
    cco = []
    duration = {}
    for e, cut, l in co:
        if e > limit_max:
            continue  # pragma: no cover
        cco.append((e, l))
        duration[cut] = e

    def is_skipped(i, s, cut):
        # same rules for the display and for the execution of the tests
        if skip >= 0 and i < skip:
            return True  # pragma: no cover
        if i + 1 in skip_list:
            return True  # pragma: no cover
        if skip_function is not None:
            with open(s[1], "r") as f:
                content = f.read()
            if skip_function(s[1], content, duration.get(cut, None)):
                return True
        return False

    exp = re.compile("Ran ([0-9]+) tests? in ([.0-9]+)s")

    # run the test
    li = [a[1] for a in cco]
    suite = import_files(li, additional_ut_path=additional_ut_path, fLOG=fLOG)
    lis = [os.path.split(name)[-1] for _, name in suite]
    keep = []

    # redirect standard output, error
    fLOG("[main_run_test] redirect stdout, stderr")
    memo_stdout = sys.stdout
    memout = sys.stdout if stdout is None else stdout
    fail = 0
    allwarn = []

    memo_stderr = sys.stderr
    memerr = sys.stderr if stderr is None else stderr
    fullstderr = StringIO()

    # displays
    memout.write("[main_run_test] ---- JENKINS BEGIN UNIT TESTS ----")
    memout.write(f"[main_run_test] ---- BEGIN UNIT TEST for {path_test!r}")

    # display all tests
    for i, s in enumerate(suite):
        cut = short_name(s[1])
        if is_skipped(i, s, cut):
            continue

        if cut not in duration:
            raise RuntimeError("[{0}] not found in\n{1}".format(
                cut, "\n".join(sorted(duration.keys()))))
        dur = duration[cut]
        zzz = "\ntest % 3d (%04ds), %s" % (i + 1, dur, cut)
        memout.write(zzz)
    memout.write("\n")

    # displays
    memout.write("[main_run_test] ---- RUN UT\n")
    memout.write(
        "[main_run_test] ---- len(suite)=%d len(skip_list)=%d skip=%d\n" % (
            len(suite), len(skip_list), skip))
    original_stream = runner.stream.stream if isinstance(
        runner.stream.stream, StringIOAndFile) else None

    # run all tests
    failed_test = {}
    n_runs = 0
    for i, s in enumerate(suite):
        cut = short_name(s[1])
        if is_skipped(i, s, cut):
            continue

        zzz = "running test % 3d, %s" % (i + 1, cut)
        zzz += (60 - len(zzz)) * " "
        memout.write(zzz)

        # the errors are logged into a file just beside the test file
        newstdr = StringIOAndFile(s[1] + ".err")
        keepstdr = sys.stderr
        sys.stderr = newstdr
        list_warn = []

        try:
            if processes:
                # s[1] is the file of this suite, the list of files and
                # the list of suites do not share the same indices
                # NOTE(review): this branch never assigns ``r`` which is
                # used below (``r.wasSuccessful()``, ``keep``),
                # to be confirmed and fixed with the semantics of run_cmd
                cmd = sys.executable.replace("w.exe", ".exe") + " " + s[1]
                out, err = run_cmd(cmd, wait=True)
                if len(err) > 0:
                    sys.stderr.write(err)  # pragma: no cover
            else:
                with warnings.catch_warnings(record=True) as w:
                    warnings.simplefilter("always")
                    if original_stream is not None:
                        original_stream.begin_test(s[1])
                    r = runner.run(s[0])
                    out = r.stream.getvalue()
                    if original_stream is not None:
                        original_stream.end_test(s[1])
                    for ww in w:
                        list_warn.append((ww, s))
                    n_runs += 1

            ti = exp.findall(out)[-1]
            # don't modify it, PyCharm does not get it right (ti is a tuple)
            add = " ran %s tests in %ss" % ti
        finally:
            # stderr is restored even if the runner raised an exception
            sys.stderr = keepstdr

        memout.write(add)

        # only the warnings accepted by filter_warning are reported
        list_warn = [(wa, su) for wa, su in list_warn if filter_warning(wa)]

        if not r.wasSuccessful():  # pragma: no cover
            err = out.split("===========")
            err = err[-1]
            memout.write("\n")
            failed_test[cut] = err
            safe_write(memout, err)

            # stores the output in case of an error
            with open(s[1] + ".err", "w", encoding="utf-8", errors="ignore") as f:
                f.write(out)

            fail += 1

            fullstderr.write("\n#-----" + lis[i] + "\n")
            fullstderr.write("OUT:\n")
            fullstderr.write(out)

            if err:
                fullstderr.write("[pyqerror]o:\n")
                safe_write(fullstderr, err)

            if len(list_warn) > 0:
                fullstderr.write("*[pyqwarning]:\n")
                warndone = set()
                for wa, _ in list_warn:
                    sw = str(wa)
                    if sw not in warndone:
                        # we display only one time the same warning
                        fullstderr.write(f"w{i}: {sw}\n")
                        warndone.add(sw)
            serr = newstdr.getvalue()
            if serr.strip(" \n\r\t"):
                fullstderr.write("ERRs:\n")
                fullstderr.write(serr)
        else:
            allwarn.append((lis[i], list_warn))
            val = newstdr.getvalue()
            if val.strip(" \n\r\t"):
                # Remove most of the Sphinx warnings (sphinx < 1.8)
                lines = val.strip(" \n\r\t").split("\n")
                lines = [
                    _ for _ in lines if _ and "is already registered, it will be overridden" not in _]
                val = "\n".join(lines)
            if len(val) > 0 and is_valid_error(val):  # pragma: no cover
                fullstderr.write("\n*-----" + lis[i] + "\n")
                if len(list_warn) > 0:
                    fullstderr.write("[main_run_test] +WARN:\n")
                    for wa, _ in list_warn:
                        fullstderr.write(
                            f"[in:{cut}] w{i}: {str(wa)}\n")
                if val.strip(" \n\r\t"):
                    fullstderr.write(f"[in:{cut}] ERRv:\n")
                    fullstderr.write(val)

        memout.write("\n")
        keep.append((s[1], r))

    # displays
    memout.write("[main_run_test] ---- END UT\n")
    memout.write("[main_run_test] ---- JENKINS END UNIT TESTS ----\n")
    if n_runs == 0:
        raise RuntimeError(  # pragma: no cover
            "No unit tests was run.")

    fLOG("[main_run_test] restore stdout, stderr")

    # end, catch standard output and err
    sys.stderr = memo_stderr
    sys.stdout = memo_stdout
    val = fullstderr.getvalue()

    if len(val) > 0:  # pragma: no cover
        fLOG("[main_run_test] -- STDERR (from unittests) on STDOUT")
        fLOG(val)
        fLOG("[main_run_test] -- end STDERR on STDOUT")

        if on_stderr:
            memerr.write(
                "[main_run_test] ##### STDERR (from unittests) #####\n")
            memerr.write(val)
            memerr.write("[main_run_test] ##### end STDERR #####\n")

    if fail == 0:
        clean(fLOG=fLOG)

    fLOG("[main_run_test] printing warnings")

    for fi, lw in allwarn:
        if len(lw) > 0:
            memout.write(f"[main_run_test] -WARN: {fi}\n")
            wdone = {}
            for i, (w, s) in enumerate(lw):
                sw = str(w)
                if sw in wdone:
                    continue
                wdone[sw] = w
                try:
                    sw = f"  w{i}: {w}\n"
                except UnicodeEncodeError:  # pragma: no cover
                    sw = "  w{0}: Unable to convert a warnings of type {1} into a string (1)".format(
                        i, type(w))
                try:
                    memout.write(sw)
                except UnicodeEncodeError:  # pragma: no cover
                    sw = "  w{0}: Unable to convert a warnings of type {1} into a string (2)".format(
                        i, type(w))
                    memout.write(sw)

    fLOG("[main_run_test] END of unit tests")
    memout.write("[main_run_test] END of unit tests\n")
    return dict(err=val, tests=keep, failed=failed_test)
def is_valid_error(error):
    """
    Tells if a text written on stderr looks like an error.
    A local server can push logs on this stream, the function
    searches for keywords such as ``Exception``,
    ``Error``, ``TraceBack``... without considering the case.

    @param      error       text
    @return                 boolean, always False if the first line
                            is ``--- Logging error ---``
    """
    rows = error.strip("\n\r").replace("\r", "").split('\n')
    if rows[0] == "--- Logging error ---":
        return False
    # lines about this specific numpy import issue are not considered
    ignored = "No module named 'numpy.core._multiarray_umath'"
    text = "\n".join(row for row in rows if ignored not in row).lower()
    keys = ["Exception", "Error", "TraceBack", "invalid", " line "]
    return any(key.lower() in text for key in keys)
def default_skip_function(name, code, duration):
    """
    Default skip function for function @see fn main_wrapper_tests.
    Only the name of the test file is used to take the decision.

    @param      name        name of the test file
    @param      code        code of the test file (ignored)
    @param      duration    estimated duration of the tests (specified in the file documentation),
                            ignored
    @return                 True if skipped, False otherwise
    """
    markers = ("test_SKIP_", "test_LONG_", "test_GUI_")
    return any(marker in name for marker in markers)