Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2@file 

3@brief This extension contains various functionalities to help unittesting. 

4""" 

5import os 

6import sys 

7import glob 

8import re 

9import unittest 

10import warnings 

11from io import StringIO 

12from .utils_tests_stringio import StringIOAndFile 

13from .default_filter_warning import default_filter_warning 

14from ..filehelper.synchelper import remove_folder 

15from ..loghelper.flog import run_cmd, noLOG 

16 

17 

def get_test_file(filter, folder=None, no_subfolder=False, fLOG=noLOG, root=None):
    """
    Returns the list of test files.

    @param      filter          only select files matching the pattern (ex: test*)
    @param      folder          path to look (or paths to look if it is a list)
    @param      no_subfolder    the function investigates the folder *folder*
                                and does not try any subfolder in
                                ``{"_nrt", "_unittest", "_unittests"}``
    @param      fLOG            logging function
    @param      root            root or folder which contains the project,
                                rules applying on folder name will not apply on it
    @return                     a list of test files

    Every inspected folder is scanned recursively. Unless *filter* is
    ``"temp_*"``, only names containing ``test_`` and ``.py`` which do not
    look like an output, a temporary or a compiled file are kept.
    ``sys.path`` is modified during the search and always restored before
    the function returns, even if an exception is raised.
    """
    test_dirs = {"_nrt", "_unittest", "_unittests"}

    if no_subfolder:
        dirs = [folder]
    elif folder is None:
        path = os.path.split(__file__)[0]
        dirs = [os.path.join(path, "..", "..", d) for d in test_dirs]
    elif isinstance(folder, str):
        if not os.path.exists(folder):
            raise FileNotFoundError(folder)  # pragma: no cover
        last = os.path.split(folder)[-1]
        if last in test_dirs:
            dirs = [folder]
        else:
            dirs = [os.path.join(folder, d) for d in test_dirs]
    else:
        # an explicit list of folders: every one of them must exist
        dirs = folder
        for d in dirs:
            if not os.path.exists(d):
                raise FileNotFoundError(d)

    def simplify(folds):
        # shortens the paths for logging: keeps the test folder
        # and the folder just above it
        res = []
        for fold in folds:
            short = fold
            parts = fold.replace("\\", "/").split('/')
            for name in test_dirs:
                if name in parts:
                    # clamped to 0: a negative start would silently
                    # keep only the last segment of the path
                    start = max(parts.index(name) - 1, 0)
                    short = "/".join(parts[start:])
            res.append(short)
        return res

    def remove_root(p):
        # the filtering rules must not apply on the root prefix
        if root is not None and p.startswith(root):
            return p[len(root):]
        return p

    rejected = (".py.err", ".py.out", ".py.warn", "test_main", "temp_",
                "temp2_", ".pyo", "out.test_copyfile.py.2.txt",
                ".pyc", ".pyd", ".so", ".py~")

    def is_test_file(name):
        if "test_" not in name or ".py" not in name:
            return False
        return not any(pattern in name for pattern in rejected)

    copypaths = list(sys.path)

    li = []
    try:
        for fold in dirs:
            if "__pycache__" in fold or "site-packages" in fold:
                continue
            if not os.path.exists(fold):
                continue
            if fold not in sys.path and fold != ".":
                sys.path.append(fold)
            content = glob.glob(fold + "/" + filter)
            if filter != "temp_*":
                content = [full for full in content
                           if is_test_file(remove_root(full))]
            li.extend(content)
            fLOG("[get_test_file], inspecting", simplify(dirs))

            # subfolders are explored without applying the folder name rules
            for il in glob.glob(fold + "/*"):
                if os.path.isdir(il):
                    li.extend(get_test_file(
                        filter, il, no_subfolder=True, fLOG=fLOG, root=root))
    finally:
        # we restore sys.path
        sys.path = copypaths

    return li

115 

116 

def get_estimation_time(file):
    """
    Return an estimation of the processing time,
    it extracts the number in ``(time=5s)`` for example.

    @param      file        filename
    @return                 int, the first number found in a pattern
                            ``(time=<n>s)``, 0 if the file does not
                            contain this pattern, 10 if the file
                            cannot be read (a warning is emitted)
    """
    try:
        # the context manager closes the file even if the reading fails,
        # undecodable characters are dropped (errors="ignore")
        with open(file, "r", errors="ignore") as f:
            content = f.read()
    except Exception as e:  # pragma: no cover
        warnings.warn("Issue with '{0}'\n{1}\n{2}".format(
            file, type(e), e), UserWarning)
        return 10
    found = re.search("[(]time=([0-9]+)s[)]", content)
    if found is None:
        return 0
    return int(found.group(1))

144 

145 

def import_files(li, additional_ut_path=None, fLOG=noLOG):
    """
    Imports every test file in ``li`` and builds one test suite
    for every test class they contain.

    @param      li                      list of files (python scripts)
    @param      additional_ut_path      additional paths to add when running the unit tests,
                                        each item is a path (appended to ``sys.path``)
                                        or a tuple ``(path, first)``, the path is inserted
                                        at the first position if *first* is true
    @param      fLOG                    logging function
    @return                             list of tests [ ( testsuite, file) ]

    A test class is any attribute of the module whose name starts with ``Test``,
    a test is any attribute of this class whose name starts with ``test``.
    Files whose name contains ``test_do_not_include`` are ignored.
    The function raises *ImportError* if a file cannot be imported,
    *RuntimeError* if a class still contains an attribute starting with ``_test``.
    ``sys.path`` is restored after every import, whether it succeeds or not.
    """
    allsuite = []
    for le in li:
        sdir, tl = os.path.split(le)
        fi = tl.replace(".py", "")
        # checked before sys.path is touched so that nothing needs to be undone
        if "test_do_not_include" in fi:
            continue

        copypath = list(sys.path)
        try:
            if sdir not in sys.path:
                sys.path.append(sdir)
            if additional_ut_path:
                for p in additional_ut_path:
                    if isinstance(p, tuple):
                        if p[1]:
                            sys.path.insert(0, p[0])
                        else:
                            sys.path.append(p[0])
                    else:
                        sys.path.append(p)

            try:
                mo = __import__(fi)
            except Exception as e:  # pragma: no cover
                raise ImportError(
                    "Unable to import '{}' due to {}.\nsys.path=\n{}".format(
                        fi, e, "\n".join(sys.path))) from e
        finally:
            # some tests can mess up with the import path
            sys.path = copypath

        for c in dir(mo):
            if len(c) < 5 or c[:4] != "Test":
                continue
            # test class c
            klass = getattr(mo, c)
            testsuite = unittest.TestSuite()
            for d in dir(klass):
                if len(d) >= 6 and d[:5] == "_test":
                    raise RuntimeError(  # pragma: no cover
                        "a function _test is still deactivated %s in %s" % (d, c))
                if len(d) < 5 or d[:4] != "test":
                    continue
                # method d.c
                try:
                    t = klass(d)
                except Exception as e:  # pragma: no cover
                    code = "t = mo." + c + "(\"" + d + "\")"
                    raise Exception(
                        "Unable to execute code '{0}'".format(code)) from e
                testsuite.addTest(t)

            allsuite.append((testsuite, le))

    return allsuite

218 

219 

def clean(folder=None, fLOG=noLOG):
    """
    Does the cleaning: removes the log files and every file or folder
    whose name starts with ``temp_`` found by @see fn get_test_file.

    @param      folder      directory to clean, if None, @see fn get_test_file
                            looks into its default folders
    @param      fLOG        logging function

    A file or a folder which cannot be removed is logged with *fLOG*
    and does not interrupt the cleaning.
    """
    def remove(path, exists, delete, kind):
        # best effort: a failure is reported, never raised
        try:
            if exists(path):
                delete(path)
        except Exception as e:  # pragma: no cover
            fLOG("[clean] unable to remove {} '{}' due to {}".format(
                kind, path, str(e).replace("\n", " ")))

    # do not use SVN here just in case some files are not checked in.
    for log_file in ["temp_hal_log.txt", "temp_hal_log2.txt",
                     "temp_hal_log_.txt", "temp_log.txt", "temp_log2.txt", ]:
        for el in get_test_file(log_file, folder=folder):
            remove(el, os.path.isfile, os.remove, "file")

    # the same folder is used for the temporary files and folders
    li = get_test_file("temp_*", folder=folder)
    # files first, folders afterwards
    for el in li:
        remove(el, os.path.isfile, os.remove, "file")
    for el in li:
        remove(el, os.path.isdir, remove_folder, "dir")

254 

255 

def main_run_test(runner, path_test=None, limit_max=1e9, log=False, skip=-1, skip_list=None,
                  on_stderr=False, processes=False, skip_function=None,
                  additional_ut_path=None, stdout=None, stderr=None, filter_warning=None,
                  fLOG=noLOG):
    """
    Runs all unit tests,
    the function looks into the folder _unittest and extract from all files
    beginning by `test_` all methods starting by `test_`.
    Each files should mention an execution time.
    Tests are sorted by increasing order.

    @param      runner              unittest Runner
    @param      path_test           path to look, if None, looks for defaults path related to this project
    @param      limit_max           avoid running tests longer than limit seconds
    @param      log                 if True, enables intermediate files
                                    (not used by the current implementation)
    @param      skip                if skip != -1, skip the first "skip" test files
    @param      skip_list           skip unit test id in this list (by index, starting by 1)
    @param      skip_function       *function(filename,content,duration) --> boolean* to skip a unit test
    @param      on_stderr           if True, publish everything on stderr at the end
    @param      processes           to run the unit test in a separate process (with function @see fn run_cmd),
                                    however, to make that happen, you need to specify
                                    ``exit=False`` for each test file, see `unittest.main
                                    <https://docs.python.org/3/library/unittest.html#unittest.main>`_
    @param      additional_ut_path  additional paths to add when running the unit tests
    @param      stdout              if not None, use this stream instead of *sys.stdout*
    @param      stderr              if not None, use this stream instead of *sys.stderr*
    @param      filter_warning      function which removes some warnings in the final output,
                                    if None, the function filters out some recurrent warnings
                                    in jupyter (signature: ``def filter_warning(w: warning) -> bool``),
                                    @see fn default_filter_warning
    @param      fLOG                logging function
    @return                         dictionary: ``{ "err": err, "tests":list of couple (file, test results) }``

    The function raises *FileNotFoundError* if no test file is found,
    an exception if two test files share the same short name
    ``<folder>/<file>``, *RuntimeError* if no test was run.
    """
    if os.environ.get('PYTHONPATH', '') == 'src':
        # a relative PYTHONPATH is replaced by its absolute location
        full_src = os.path.abspath('src')
        if not os.path.exists(full_src):
            raise FileNotFoundError(
                "Unable to interpret path %r - %r." % ('src', full_src))
        os.environ['PYTHONPATH'] = full_src
    if skip_list is None:
        skip_list = set()
    else:
        skip_list = set(skip_list)
    if filter_warning is None:
        filter_warning = default_filter_warning

    # checking that the module does not belong to the installed modules
    if path_test is not None:
        # NOTE(review): sys.executable is the interpreter itself, not a folder,
        # this path presumably never exists - confirm the intent
        path_module = os.path.join(sys.executable, "Lib", "site-packages")
        paths = [os.path.join(path_module, "src"), ]
        for path in paths:
            if os.path.exists(path):
                raise FileExistsError(  # pragma: no cover
                    "This path should not exist '{}'.".format(path))

    def short_name(el):
        # <parent folder>/<file name>, used as the key of a test file
        cut = os.path.split(el)
        cut = os.path.split(cut[0])[-1] + "/" + cut[-1]
        return cut

    # sort the test by increasing expected time
    fLOG("[main_run_test] path_test %r" % path_test)
    li = get_test_file("test*", folder=path_test, fLOG=fLOG, root=path_test)
    if len(li) == 0:
        raise FileNotFoundError(  # pragma: no cover
            "No test files in %r." % path_test)
    est = [get_estimation_time(el) for el in li]
    co = [(e, short_name(el), el) for e, el in zip(est, li)]
    co.sort()

    # we check we do not run twice the same file
    done = set()
    duplicate = []
    for _, cut, lc in co:
        if cut in done:
            duplicate.append((cut, lc))
        done.add(cut)

    if len(duplicate) > 0:  # pragma: no cover
        s = list(set(duplicate))
        s.sort()
        mes = "\n".join(str(_) for _ in s)
        raise Exception("Duplicated test files were detected:\n" + mes)

    # check existing
    if len(co) == 0:
        raise FileNotFoundError(  # pragma: no cover
            "Unable to find any test files in '{0}'.".format(path_test))

    if skip != -1:
        fLOG("[main_run_test] found %d test files skipping." % len(co))
    else:
        fLOG("[main_run_test] found %d test files." % len(co))

    # extract the test classes
    cco = []
    duration = {}
    for e, cut, l in co:
        if e > limit_max:
            continue  # pragma: no cover
        cco.append((e, l))
        duration[cut] = e

    exp = re.compile("Ran ([0-9]+) tests? in ([.0-9]+)s")

    # run the test
    li = [a[1] for a in cco]
    suite = import_files(li, additional_ut_path=additional_ut_path, fLOG=fLOG)
    lis = [os.path.split(name)[-1] for _, name in suite]
    keep = []

    # redirect standard output, error
    fLOG("[main_run_test] redirect stdout, stderr")
    memo_stdout = sys.stdout
    memout = sys.stdout if stdout is None else stdout
    fail = 0
    allwarn = []

    memo_stderr = sys.stderr
    memerr = sys.stderr if stderr is None else stderr
    fullstderr = StringIO()

    def is_skipped(i, filename, cut):
        # a test is skipped because of its position or
        # because the user defined function says so
        if skip >= 0 and i < skip:
            return True  # pragma: no cover
        if i + 1 in skip_list:
            return True  # pragma: no cover
        if skip_function is not None:
            with open(filename, "r") as f:
                content = f.read()
            if skip_function(filename, content, duration.get(cut, None)):
                return True
        return False

    # displays
    memout.write("[main_run_test] ---- JENKINS BEGIN UNIT TESTS ----")
    memout.write(
        "[main_run_test] ---- BEGIN UNIT TEST for '{0}'".format(path_test))

    # display all tests
    for i, s in enumerate(suite):
        cut = short_name(s[1])
        if is_skipped(i, s[1], cut):
            continue

        if cut not in duration:
            raise Exception("[{0}] not found in\n{1}".format(
                cut, "\n".join(sorted(duration.keys()))))
        dur = duration[cut]
        zzz = "\ntest % 3d (%04ds), %s" % (i + 1, dur, cut)
        memout.write(zzz)
    memout.write("\n")

    # displays
    memout.write("[main_run_test] ---- RUN UT\n")
    memout.write(
        "[main_run_test] ---- len(suite)=%d len(skip_list)=%d skip=%d\n" % (
            len(suite), len(skip_list), skip))
    original_stream = runner.stream.stream if isinstance(
        runner.stream.stream, StringIOAndFile) else None

    # run all tests
    n_runs = 0
    for i, s in enumerate(suite):
        cut = short_name(s[1])
        if is_skipped(i, s[1], cut):
            continue

        zzz = "running test % 3d, %s" % (i + 1, cut)
        zzz += (60 - len(zzz)) * " "
        memout.write(zzz)

        # the errors are logged into a file just beside the test file
        newstdr = StringIOAndFile(s[1] + ".err")
        keepstdr = sys.stderr
        sys.stderr = newstdr
        list_warn = []

        try:
            if processes:
                # NOTE(review): this branch does not define ``r`` which is
                # used below, it also relies on li[i] matching suite[i]
                # - confirm this mode is still supported
                cmd = sys.executable.replace("w.exe", ".exe") + " " + li[i]
                out, err = run_cmd(cmd, wait=True)
                if len(err) > 0:
                    sys.stderr.write(err)  # pragma: no cover
            else:
                with warnings.catch_warnings(record=True) as w:
                    warnings.simplefilter("always")
                    if original_stream is not None:
                        original_stream.begin_test(s[1])
                    r = runner.run(s[0])
                    out = r.stream.getvalue()
                    if original_stream is not None:
                        original_stream.end_test(s[1])
                for ww in w:
                    list_warn.append((ww, s))
                n_runs += 1

            ti = exp.findall(out)[-1]
            # don't modify it, PyCharm does not get it right (ti is a tuple)
            add = " ran %s tests in %ss" % ti
        finally:
            # stderr is restored even if the test or the parsing fails
            sys.stderr = keepstdr

        memout.write(add)

        if not r.wasSuccessful():  # pragma: no cover
            err = out.split("===========")
            err = err[-1]
            memout.write("\n")
            try:
                memout.write(err)
            except UnicodeDecodeError:
                err_e = err.decode("ascii", errors="ignore")
                memout.write(err_e)
            except UnicodeEncodeError:
                try:
                    err_e = err.encode("ascii", errors="ignore")
                    memout.write(err_e)
                except TypeError:
                    err_e = err.encode("ascii", errors="ignore").decode(
                        'ascii', errors='ignore')
                    memout.write(err_e)

            # stores the output in case of an error
            with open(s[1] + ".err", "w", encoding="utf-8", errors="ignore") as f:
                f.write(out)

            fail += 1

            fullstderr.write("\n#-----" + lis[i] + "\n")
            fullstderr.write("OUT:\n")
            fullstderr.write(out)

            if err:
                fullstderr.write("[pyqerror]o:\n")
                try:
                    fullstderr.write(err)
                except UnicodeDecodeError:
                    err_e = err.decode("ascii", errors="ignore")
                    fullstderr.write(err_e)
                except UnicodeEncodeError:
                    err_e = err.encode("ascii", errors="ignore")
                    fullstderr.write(err_e)

            list_warn = [(w, s) for w, s in list_warn if filter_warning(w)]
            if len(list_warn) > 0:
                fullstderr.write("*[pyqwarning]:\n")
                warndone = set()
                for w, _ in list_warn:
                    # the warning itself is displayed (not the test suite)
                    sw = str(w)
                    if sw not in warndone:
                        # we display only one time the same warning
                        fullstderr.write("w{0}: {1}\n".format(i, sw))
                        warndone.add(sw)
            serr = newstdr.getvalue()
            if serr.strip(" \n\r\t"):
                fullstderr.write("ERRs:\n")
                fullstderr.write(serr)
        else:
            list_warn = [(w, s) for w, s in list_warn if filter_warning(w)]
            allwarn.append((lis[i], list_warn))
            val = newstdr.getvalue()
            if val.strip(" \n\r\t"):
                # Remove most of the Sphinx warnings (sphinx < 1.8)
                lines = val.strip(" \n\r\t").split("\n")
                lines = [
                    _ for _ in lines if _ and "is already registered, it will be overridden" not in _]
                val = "\n".join(lines)
            if len(val) > 0 and is_valid_error(val):  # pragma: no cover
                fullstderr.write("\n*-----" + lis[i] + "\n")
                if len(list_warn) > 0:
                    fullstderr.write("[main_run_test] +WARN:\n")
                    for w, _ in list_warn:
                        fullstderr.write(
                            "[in:{2}] w{0}: {1}\n".format(i, str(w), cut))
                if val.strip(" \n\r\t"):
                    fullstderr.write("[in:{0}] ERRv:\n".format(cut))
                    fullstderr.write(val)

        memout.write("\n")
        keep.append((s[1], r))

    # displays
    memout.write("[main_run_test] ---- END UT\n")
    memout.write("[main_run_test] ---- JENKINS END UNIT TESTS ----\n")
    if n_runs == 0:
        raise RuntimeError(  # pragma: no cover
            "No unit tests was run.")

    fLOG("[main_run_test] restore stdout, stderr")

    # end, catch standard output and err
    sys.stderr = memo_stderr
    sys.stdout = memo_stdout
    val = fullstderr.getvalue()

    if len(val) > 0:  # pragma: no cover
        fLOG("[main_run_test] -- STDERR (from unittests) on STDOUT")
        fLOG(val)
        fLOG("[main_run_test] -- end STDERR on STDOUT")

        if on_stderr:
            memerr.write(
                "[main_run_test] ##### STDERR (from unittests) #####\n")
            memerr.write(val)
            memerr.write("[main_run_test] ##### end STDERR #####\n")

    # temporary files are kept if a test failed
    if fail == 0:
        clean(fLOG=fLOG)

    fLOG("[main_run_test] printing warnings")

    for fi, lw in allwarn:
        if len(lw) > 0:
            memout.write("[main_run_test] -WARN: {0}\n".format(fi))
            wdone = {}
            for i, (w, s) in enumerate(lw):
                sw = str(w)
                if sw in wdone:
                    continue
                wdone[sw] = w
                try:
                    sw = " w{0}: {1}\n".format(i, w)
                except UnicodeEncodeError:  # pragma: no cover
                    sw = " w{0}: Unable to convert a warnings of type {1} into a string (1)".format(
                        i, type(w))
                try:
                    memout.write(sw)
                except UnicodeEncodeError:  # pragma: no cover
                    sw = " w{0}: Unable to convert a warnings of type {1} into a string (2)".format(
                        i, type(w))
                    memout.write(sw)

    fLOG("[main_run_test] END of unit tests")
    memout.write("[main_run_test] END of unit tests\n")

    return dict(err=val, tests=keep)

602 

603 

def is_valid_error(error):
    """
    Tells if a text written on stderr looks like an error.
    A local server can push logs on this stream,
    the function searches (case insensitive) for keywords such as
    ``Exception``, ``Error``, ``TraceBack``...
    Lines mentioning the missing module
    ``numpy.core._multiarray_umath`` are ignored.

    @param      error       text
    @return                 boolean
    """
    ignored = "No module named 'numpy.core._multiarray_umath'"
    kept = "\n".join(
        row for row in error.split('\n') if ignored not in row)
    lowered = kept.lower()
    markers = ("Exception", "Error", "TraceBack", "invalid", " line ")
    return any(marker.lower() in lowered for marker in markers)

624 

625 

def default_skip_function(name, code, duration):
    """
    Default skip function for function @see fn main_wrapper_tests,
    the decision only depends on the name of the file.

    @param      name        name of the test file
    @param      code        code of the test file (not used)
    @param      duration    estimated duration of the tests
                            (specified in the file documentation, not used)
    @return                 True if skipped (the name contains ``test_SKIP_``,
                            ``test_LONG_`` or ``test_GUI_``), False otherwise
    """
    markers = ("test_SKIP_", "test_LONG_", "test_GUI_")
    return any(marker in name for marker in markers)