Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2@file
3@brief This extension contains various functionalities to help unittesting.
4"""
5import os
6import sys
7import glob
8import re
9import unittest
10import warnings
11from io import StringIO
12from .utils_tests_stringio import StringIOAndFile
13from .default_filter_warning import default_filter_warning
14from ..filehelper.synchelper import remove_folder
15from ..loghelper.flog import run_cmd, noLOG
def get_test_file(filter, folder=None, no_subfolder=False, fLOG=noLOG, root=None):
    """
    Returns the list of test files.

    @param      filter          only select files matching the pattern (ex: test*)
    @param      folder          path to look (or paths to look if it is a list)
    @param      no_subfolder    the function investigates the folder *folder* and does not try any subfolder in
                                ``{"_nrt", "_unittest", "_unittests"}``
    @param      fLOG            logging function
    @param      root            root or folder which contains the project,
                                rules applying on folder name will not apply on it
    @return                     a list of test files

    The function walks every selected folder and all of its subfolders.
    Unless *filter* is ``"temp_*"``, a file is only kept if its path
    (without the prefix *root*) contains ``test_`` and ``.py`` and none of
    the excluded markers (temporary files, compiled files, outputs...).
    ``sys.path`` is extended while folders are inspected and always restored
    before the function returns, even if an exception is raised.
    """
    default_subfolders = {"_nrt", "_unittest", "_unittests"}
    # substrings which disqualify a path from being a test file
    excluded_markers = (
        ".py.err", ".py.out", ".py.warn", "test_main", "temp_", "temp2_",
        ".pyo", "out.test_copyfile.py.2.txt", ".pyc", ".pyd", ".so", ".py~")

    if no_subfolder:
        dirs = [folder]
        expected = set()
    else:
        expected = default_subfolders
        if folder is None:
            path = os.path.split(__file__)[0]
            dirs = [os.path.join(path, "..", "..", d) for d in expected]
        elif isinstance(folder, str):
            if not os.path.exists(folder):
                raise FileNotFoundError(folder)  # pragma: no cover
            last = os.path.split(folder)[-1]
            if last in expected:
                dirs = [folder]
            else:
                dirs = [os.path.join(folder, d) for d in expected]
        else:
            dirs = folder
            for d in dirs:
                if not os.path.exists(d):
                    raise FileNotFoundError(d)

    def simplify(folds):
        # shortens every path to '<parent>/<test folder>/...' for logging
        expe = expected or default_subfolders
        res = []
        for fold in folds:
            nn = fold
            nf = fold.replace("\\", "/").split('/')
            for nr in expe:
                if nr in nf:
                    # the marker may be the first component: a negative
                    # index would slice from the end of the path
                    i = max(nf.index(nr) - 1, 0)
                    nn = "/".join(nf[i:])
            res.append(nn)
        return res

    def remove_root(p):
        # the rules on names must not apply on the project root itself
        if root is not None and p.startswith(root):
            return p[len(root):]
        return p

    def is_test_file(name):
        if "test_" not in name or ".py" not in name:
            return False
        return not any(marker in name for marker in excluded_markers)

    copypaths = list(sys.path)

    li = []
    try:
        for fold in dirs:
            if "__pycache__" in fold or "site-packages" in fold:
                continue
            if not os.path.exists(fold):
                continue
            if fold not in sys.path and fold != ".":
                sys.path.append(fold)
            content = glob.glob(fold + "/" + filter)
            if filter != "temp_*":
                content = [il for il in content if is_test_file(remove_root(il))]
            li.extend(content)
            fLOG("[get_test_file], inspecting", simplify(dirs))

            # subfolders are inspected as they are (no_subfolder=True)
            for il in glob.glob(fold + "/*"):
                if os.path.isdir(il):
                    temp = get_test_file(
                        filter, il, no_subfolder=True, fLOG=fLOG, root=root)
                    li.extend(temp)
    finally:
        # we restore sys.path
        sys.path = copypaths

    return li
def get_estimation_time(file):
    """
    Returns an estimation of the processing time,
    it extracts the number in ``(time=5s)`` for example.

    @param      file        filename
    @return                 int, the number found in the first pattern
                            ``(time=<number>s)``, 0 if the file does not
                            contain it, 10 if the file cannot be read
                            (a warning is emitted in that case)
    """
    try:
        # the context manager closes the file even if the reading fails,
        # undecodable characters are ignored
        with open(file, "r", errors="ignore") as f:
            content = f.read()
    except Exception as e:  # pragma: no cover
        # deliberate best effort: an unreadable file gets a default duration
        warnings.warn("Issue with '{0}'\n{1}\n{2}".format(
            file, type(e), e), UserWarning)
        return 10
    found = re.compile("[(]time=([0-9]+)s[)]").search(content)
    if found is None:
        return 0
    return int(found.groups()[0])
def import_files(li, additional_ut_path=None, fLOG=noLOG):
    """
    Imports every test file in list ``li`` and builds the test suites,
    one per test class (a class whose name starts with ``Test``).

    @param      li                      list of files (python scripts)
    @param      additional_ut_path      additional paths to add when importing the unit tests,
                                        each item is a path or a tuple *(path, insert_first)*
    @param      fLOG                    logging function
    @return                             list of tests [ ( testsuite, file) ]

    Files whose name contains ``test_do_not_include`` are skipped.
    ``sys.path`` is restored after each import, whether it succeeds or not.
    The function raises ``ImportError`` if a file cannot be imported and
    ``RuntimeError`` if a test class still contains a method starting with
    ``_test`` (a deactivated test).
    """
    allsuite = []
    for le in li:
        sdir, tl = os.path.split(le)
        fi = tl.replace(".py", "")
        # checked before touching sys.path so that nothing needs to be undone
        if "test_do_not_include" in fi:
            continue

        copypath = list(sys.path)
        try:
            if sdir not in sys.path:
                sys.path.append(sdir)
            if additional_ut_path:
                for p in additional_ut_path:
                    if isinstance(p, tuple):
                        if p[1]:
                            sys.path.insert(0, p[0])
                        else:
                            sys.path.append(p[0])
                    else:
                        sys.path.append(p)

            try:
                mo = __import__(fi)
            except Exception as e:  # pragma: no cover
                raise ImportError(
                    "Unable to import '{}' due to {}.\nsys.path=\n{}".format(
                        fi, e, "\n".join(sys.path))) from e
        finally:
            # some tests can mess up with the import path
            sys.path = copypath

        for c in dir(mo):
            if len(c) < 5 or c[:4] != "Test":
                continue
            # test class c
            test_class = getattr(mo, c)
            testsuite = unittest.TestSuite()
            for d in dir(test_class):
                if len(d) >= 6 and d[:5] == "_test":
                    raise RuntimeError(  # pragma: no cover
                        "a function _test is still deactivated %s in %s" % (d, c))
                if len(d) < 5 or d[:4] != "test":
                    continue
                # method d.c
                try:
                    t = test_class(d)
                except Exception as e:  # pragma: no cover
                    code = "t = mo." + c + "(\"" + d + "\")"
                    raise Exception(
                        "Unable to execute code '{0}'".format(code)) from e
                testsuite.addTest(t)

            allsuite.append((testsuite, le))

    return allsuite
def clean(folder=None, fLOG=noLOG):
    """
    Does the cleaning: removes the log files and every file or folder
    whose name matches ``temp_*`` found by @see fn get_test_file.

    @param      folder      directory to clean, None for the default test folders
                            of @see fn get_test_file
    @param      fLOG        logging function

    A file or a folder which cannot be removed is logged with *fLOG*,
    the function does not raise an exception in that case.
    """
    def remove_all(paths, is_kind, remove, kind):
        # best effort: a failure is logged and the cleaning goes on
        for el in paths:
            try:
                if is_kind(el):
                    remove(el)
            except Exception as e:  # pragma: no cover
                fLOG("[clean] unable to remove {} '{}' due to {}".format(
                    kind, el, str(e).replace("\n", " ")))

    # do not use SVN here just in case some files are not checked in.
    for log_file in ["temp_hal_log.txt", "temp_hal_log2.txt",
                     "temp_hal_log_.txt", "temp_log.txt", "temp_log2.txt", ]:
        li = get_test_file(log_file, folder=folder)
        remove_all(li, os.path.isfile, os.remove, "file")

    # the temporary files and folders are searched in the same folder
    li = get_test_file("temp_*", folder=folder)
    # files first, folders afterwards
    remove_all(li, os.path.isfile, os.remove, "file")
    remove_all(li, os.path.isdir, remove_folder, "dir")
def main_run_test(runner, path_test=None, limit_max=1e9, log=False, skip=-1, skip_list=None,
                  on_stderr=False, processes=False, skip_function=None,
                  additional_ut_path=None, stdout=None, stderr=None, filter_warning=None,
                  fLOG=noLOG):
    """
    Runs all unit tests,
    the function looks into the folder _unittest and extract from all files
    beginning by `test_` all methods starting by `test_`.
    Each files should mention an execution time.
    Tests are sorted by increasing order.

    @param      runner              unittest Runner
    @param      path_test           path to look, if None, looks for defaults path related to this project
    @param      limit_max           avoid running tests longer than limit seconds
    @param      log                 if True, enables intermediate files (not used by this function)
    @param      skip                if skip != -1, skip the first "skip" test files
    @param      skip_list           skip unit test id in this list (by index, starting by 1)
    @param      skip_function       *function(filename,content,duration) --> boolean* to skip a unit test
    @param      on_stderr           if True, publish everything on stderr at the end
    @param      processes           to run the unit test in a separate process (with function @see fn run_cmd),
                                    however, to make that happen, you need to specify
                                    ``exit=False`` for each test file, see `unittest.main
                                    <https://docs.python.org/3/library/unittest.html#unittest.main>`_
    @param      additional_ut_path  additional paths to add when running the unit tests
    @param      stdout              if not None, use this stream instead of *sys.stdout*
    @param      stderr              if not None, use this stream instead of *sys.stderr*
    @param      filter_warning      function which removes some warnings in the final output,
                                    if None, the function filters out some recurrent warnings
                                    in jupyter (signature: ``def filter_warning(w: warning) -> bool``),
                                    @see fn default_filter_warning
    @param      fLOG                logging function
    @return                         dictionary: ``{ "err": err, "tests":list of couple (file, test results) }``

    The function raises ``FileNotFoundError`` if no test file is found,
    an exception if two test files share the same short name
    ``folder/file`` and ``RuntimeError`` if no test was run.
    While a test runs, *sys.stderr* is redirected to a file ``<test file>.err``
    and always restored afterwards.
    """
    if os.environ.get('PYTHONPATH', '') == 'src':
        # a relative PYTHONPATH is replaced by its absolute version
        full_src = os.path.abspath('src')
        if not os.path.exists(full_src):
            raise FileNotFoundError(
                "Unable to interpret path %r - %r." % ('src', full_src))
        os.environ['PYTHONPATH'] = full_src
    if skip_list is None:
        skip_list = set()
    else:
        skip_list = set(skip_list)
    if filter_warning is None:
        filter_warning = default_filter_warning

    # checking that the module does not belong to the installed modules
    if path_test is not None:
        path_module = os.path.join(sys.executable, "Lib", "site-packages")
        paths = [os.path.join(path_module, "src"), ]
        for path in paths:
            if os.path.exists(path):
                raise FileExistsError(  # pragma: no cover
                    "This path should not exist '{}'.".format(path))

    def short_name(el):
        # 'parent folder/file name', used as the key of a test file
        cut = os.path.split(el)
        cut = os.path.split(cut[0])[-1] + "/" + cut[-1]
        return cut

    # sort the test by increasing expected time
    fLOG("[main_run_test] path_test %r" % path_test)
    li = get_test_file("test*", folder=path_test, fLOG=fLOG, root=path_test)
    if len(li) == 0:
        raise FileNotFoundError(  # pragma: no cover
            "No test files in %r." % path_test)
    est = [get_estimation_time(el) for el in li]
    co = [(e, short_name(el), el) for e, el in zip(est, li)]
    co.sort()

    # we check we do not run twice the same file
    done = {}
    duplicate = []
    for _, cut, lc in co:
        if cut in done:
            duplicate.append((cut, lc))
        done[cut] = True

    if len(duplicate) > 0:  # pragma: no cover
        s = list(set(duplicate))
        s.sort()
        mes = "\n".join(str(_) for _ in s)
        raise Exception("Duplicated test files were detected:\n" + mes)

    # check existing
    if len(co) == 0:
        raise FileNotFoundError(  # pragma: no cover
            "Unable to find any test files in '{0}'.".format(path_test))

    if skip != -1:
        fLOG("[main_run_test] found %d test files skipping." % len(co))
    else:
        fLOG("[main_run_test] found %d test files." % len(co))

    # extract the test classes
    cco = []
    duration = {}
    for e, cut, l in co:
        if e > limit_max:
            continue  # pragma: no cover
        cco.append((e, l))
        duration[short_name(l)] = e

    exp = re.compile("Ran ([0-9]+) tests? in ([.0-9]+)s")

    # run the test
    li = [a[1] for a in cco]
    suite = import_files(li, additional_ut_path=additional_ut_path, fLOG=fLOG)
    lis = [os.path.split(name)[-1] for _, name in suite]
    keep = []

    # redirect standard output, error
    fLOG("[main_run_test] redirect stdout, stderr")
    memo_stdout = sys.stdout
    memout = sys.stdout if stdout is None else stdout
    fail = 0
    allwarn = []

    memo_stderr = sys.stderr
    memerr = sys.stderr if stderr is None else stderr
    fullstderr = StringIO()

    # displays
    memout.write("[main_run_test] ---- JENKINS BEGIN UNIT TESTS ----")
    memout.write(
        "[main_run_test] ---- BEGIN UNIT TEST for '{0}'".format(path_test))

    # display all tests
    for i, s in enumerate(suite):
        if skip >= 0 and i < skip:
            continue  # pragma: no cover
        if i + 1 in skip_list:
            continue  # pragma: no cover
        cut = short_name(s[1])
        if skip_function is not None:
            with open(s[1], "r") as f:
                content = f.read()
            if skip_function(s[1], content, duration.get(cut, None)):
                continue

        if cut not in duration:
            raise Exception("[{0}] not found in\n{1}".format(
                cut, "\n".join(sorted(duration.keys()))))
        dur = duration[cut]
        zzz = "\ntest % 3d (%04ds), %s" % (i + 1, dur, cut)
        memout.write(zzz)
    memout.write("\n")

    # displays
    memout.write("[main_run_test] ---- RUN UT\n")
    memout.write(
        "[main_run_test] ---- len(suite)=%d len(skip_list)=%d skip=%d\n" % (
            len(suite), len(skip_list), skip))
    original_stream = runner.stream.stream if isinstance(
        runner.stream.stream, StringIOAndFile) else None

    # run all tests
    n_runs = 0
    last_s = None
    for i, s in enumerate(suite):
        last_s = s
        if skip >= 0 and i < skip:
            continue  # pragma: no cover
        if i + 1 in skip_list:
            continue  # pragma: no cover
        cut = short_name(s[1])
        if skip_function is not None:
            with open(s[1], "r") as f:
                content = f.read()
            if skip_function(s[1], content, duration.get(cut, None)):
                continue

        zzz = "running test % 3d, %s" % (i + 1, cut)
        zzz += (60 - len(zzz)) * " "
        memout.write(zzz)

        # the errors are logged into a file just beside the test file
        newstdr = StringIOAndFile(s[1] + ".err")
        keepstdr = sys.stderr
        sys.stderr = newstdr
        list_warn = []

        try:
            if processes:
                # NOTE(review): ``r`` is only assigned in the other branch,
                # with processes=True the call to ``r.wasSuccessful()`` below
                # fails with NameError; this mode needs a result object.
                # The suite holds one item per test class, the file is taken
                # from the suite item and not from the list of files.
                cmd = sys.executable.replace("w.exe", ".exe") + " " + s[1]
                out, err = run_cmd(cmd, wait=True)
                if len(err) > 0:
                    sys.stderr.write(err)  # pragma: no cover
            else:
                with warnings.catch_warnings(record=True) as w:
                    warnings.simplefilter("always")
                    if original_stream is not None:
                        original_stream.begin_test(s[1])
                    r = runner.run(s[0])
                    out = r.stream.getvalue()
                    if original_stream is not None:
                        original_stream.end_test(s[1])
                for ww in w:
                    list_warn.append((ww, s))
                n_runs += 1

            ti = exp.findall(out)[-1]
            # don't modify it, PyCharm does not get it right (ti is a tuple)
            add = " ran %s tests in %ss" % ti
        finally:
            # stderr is restored even if the run or the parsing fails
            sys.stderr = keepstdr

        memout.write(add)

        if not r.wasSuccessful():  # pragma: no cover
            err = out.split("===========")
            err = err[-1]
            memout.write("\n")
            try:
                memout.write(err)
            except UnicodeDecodeError:
                err_e = err.decode("ascii", errors="ignore")
                memout.write(err_e)
            except UnicodeEncodeError:
                try:
                    err_e = err.encode("ascii", errors="ignore")
                    memout.write(err_e)
                except TypeError:
                    # the stream expects text, not bytes
                    err_e = err.encode("ascii", errors="ignore").decode(
                        'ascii', errors='ignore')
                    memout.write(err_e)

            # stores the output in case of an error
            with open(s[1] + ".err", "w", encoding="utf-8", errors="ignore") as f:
                f.write(out)

            fail += 1

            fullstderr.write("\n#-----" + lis[i] + "\n")
            fullstderr.write("OUT:\n")
            fullstderr.write(out)

            if err:
                fullstderr.write("[pyqerror]o:\n")
                try:
                    fullstderr.write(err)
                except UnicodeDecodeError:
                    err_e = err.decode("ascii", errors="ignore")
                    fullstderr.write(err_e)
                except UnicodeEncodeError:
                    err_e = err.encode("ascii", errors="ignore")
                    fullstderr.write(err_e)

            list_warn = [(w, s) for w, s in list_warn if filter_warning(w)]
            if len(list_warn) > 0:
                fullstderr.write("*[pyqwarning]:\n")
                warndone = set()
                for w, _ in list_warn:
                    # the warning itself is displayed, not the test suite
                    sw = str(w)
                    if sw not in warndone:
                        # we display only one time the same warning
                        fullstderr.write("w{0}: {1}\n".format(i, sw))
                        warndone.add(sw)
            serr = newstdr.getvalue()
            if serr.strip(" \n\r\t"):
                fullstderr.write("ERRs:\n")
                fullstderr.write(serr)
        else:
            list_warn = [(w, s) for w, s in list_warn if filter_warning(w)]
            allwarn.append((lis[i], list_warn))
            val = newstdr.getvalue()
            if val.strip(" \n\r\t"):
                # Remove most of the Sphinx warnings (sphinx < 1.8)
                lines = val.strip(" \n\r\t").split("\n")
                lines = [
                    _ for _ in lines if _ and "is already registered, it will be overridden" not in _]
                val = "\n".join(lines)
            if len(val) > 0 and is_valid_error(val):  # pragma: no cover
                fullstderr.write("\n*-----" + lis[i] + "\n")
                if len(list_warn) > 0:
                    fullstderr.write("[main_run_test] +WARN:\n")
                    for w, _ in list_warn:
                        fullstderr.write(
                            "[in:{2}] w{0}: {1}\n".format(i, str(w), cut))
                if val.strip(" \n\r\t"):
                    fullstderr.write("[in:{0}] ERRv:\n".format(cut))
                    fullstderr.write(val)

        memout.write("\n")
        keep.append((last_s[1], r))

    # displays
    memout.write("[main_run_test] ---- END UT\n")
    memout.write("[main_run_test] ---- JENKINS END UNIT TESTS ----\n")
    if n_runs == 0:
        raise RuntimeError(  # pragma: no cover
            "No unit tests was run.")

    fLOG("[main_run_test] restore stdout, stderr")

    # end, catch standard output and err
    sys.stderr = memo_stderr
    sys.stdout = memo_stdout
    val = fullstderr.getvalue()

    if len(val) > 0:  # pragma: no cover
        fLOG("[main_run_test] -- STDERR (from unittests) on STDOUT")
        fLOG(val)
        fLOG("[main_run_test] -- end STDERR on STDOUT")

        if on_stderr:
            memerr.write(
                "[main_run_test] ##### STDERR (from unittests) #####\n")
            memerr.write(val)
            memerr.write("[main_run_test] ##### end STDERR #####\n")

    # temporary files are only removed when every test passed
    if fail == 0:
        clean(fLOG=fLOG)

    fLOG("[main_run_test] printing warnings")

    for fi, lw in allwarn:
        if len(lw) > 0:
            memout.write("[main_run_test] -WARN: {0}\n".format(fi))
            wdone = {}
            for i, (w, s) in enumerate(lw):
                sw = str(w)
                if sw in wdone:
                    continue
                wdone[sw] = w
                try:
                    sw = " w{0}: {1}\n".format(i, w)
                except UnicodeEncodeError:  # pragma: no cover
                    sw = " w{0}: Unable to convert a warnings of type {1} into a string (1)".format(
                        i, type(w))
                try:
                    memout.write(sw)
                except UnicodeEncodeError:  # pragma: no cover
                    sw = " w{0}: Unable to convert a warnings of type {1} into a string (2)".format(
                        i, type(w))
                    memout.write(sw)

    fLOG("[main_run_test] END of unit tests")
    memout.write("[main_run_test] END of unit tests\n")

    return dict(err=val, tests=keep)
def is_valid_error(error):
    """
    Tells if the text written on stderr looks like an error,
    a local server can push logs on this stream,
    the function searches (case insensitive) for keywords such as
    ``Exception``, ``Error``, ``TraceBack``...
    Lines mentioning the missing module
    ``numpy.core._multiarray_umath`` are ignored.

    @param      error       text
    @return                 boolean
    """
    ignored = "No module named 'numpy.core._multiarray_umath'"
    kept = [row for row in error.split('\n') if ignored not in row]
    lowered = "\n".join(kept).lower()
    markers = ["Exception", "Error", "TraceBack", "invalid", " line "]
    return any(marker.lower() in lowered for marker in markers)
def default_skip_function(name, code, duration):
    """
    Default skip function for function @see fn main_wrapper_tests,
    a test file is skipped if its name contains
    ``test_SKIP_``, ``test_LONG_`` or ``test_GUI_``.

    @param      name        name of the test file
    @param      code        code of the test file (not used)
    @param      duration    estimated duration of the tests (specified in the file documentation), not used
    @return                 True if skipped, False otherwise
    """
    skipped_markers = ("test_SKIP_", "test_LONG_", "test_GUI_")
    return any(marker in name for marker in skipped_markers)