Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

"""
@file
@brief This extension contains various functionalities to help unittesting.
"""

import os
import sqlite3
import sys
import unittest
import warnings
from contextlib import closing
from datetime import datetime

from .code_exceptions import (
    CoverageException, SetupHookException)
from .coverage_helper import (
    publish_coverage_on_codecov, find_coverage_report,
    coverage_combine)
from .utils_tests_stringio import StringIOAndFile

17 

18 

class TestWrappedException(Exception):
    """
    Raised by @see fn main_wrapper_tests when the error output
    collected from the test run is not empty.
    """

22 

23 

def _modifies_coverage_report(name, bsrcp, bproj):
    """
    Rewrites the paths stored in table *file* of a coverage
    report saved in :epkg:`sqlite3` format.

    @param      name        filename of the sqlite3 database
    @param      bsrcp       list of substrings to look for in every path
    @param      bproj       string which replaces every substring of *bsrcp*

    Every occurrence of an element of *bsrcp* is replaced by *bproj*,
    then backslashes are converted into ``/``. The rows are updated with
    a parameterized statement so that a path containing a quote cannot
    break the query, and the connection is closed even if a
    statement fails.
    """
    with closing(sqlite3.connect(name)) as conn:
        updates = []
        # All rows are read before any update is issued.
        for file_id, path in conn.execute(
                "select id, path from file").fetchall():
            for prefix in bsrcp:
                path = path.replace(prefix, bproj)
            updates.append((path.replace('\\', '/'), file_id))

        conn.executemany("UPDATE file SET path=? WHERE id=?", updates)
        conn.commit()

40 

41 

def main_wrapper_tests(logfile, skip_list=None, processes=False, add_coverage=False,
                       report_folder=None, skip_function=None, setup_params=None,
                       only_setup_hook=False, coverage_options=None,
                       coverage_exclude_lines=None, additional_ut_path=None,
                       covtoken=None, hook_print=True, stdout=None, stderr=None,
                       filter_warning=None, dump_coverage=None,
                       add_coverage_folder=None, coverage_root="src", fLOG=None):
    """
    Calls function :func:`main <pyquickhelper.unittests.utils_tests.main>`
    and throws an exception if it fails.

    @param      logfile                 location of a logfile
    @param      skip_list               to skip a list of unit tests (by index, starting by 1)
    @param      processes               to run the unit test in a separate process (with function @see fn run_cmd),
                                        however, to make that happen, you need to specify
                                        ``exit=False`` for each test file, see `unittest.main
                                        <https://docs.python.org/3/library/unittest.html#unittest.main>`_
    @param      add_coverage            (bool) run the unit tests and measure the coverage at the same time
    @param      report_folder           (str) folder where the coverage report will be stored
    @param      skip_function           *function(filename,content,duration) --> boolean* to skip a unit test
    @param      setup_params            parameters sent to @see fn call_setup_hook
    @param      only_setup_hook         calls only @see fn call_setup_hook, do not run the unit test
    @param      coverage_options        (dict) options for module coverage as a dictionary, see below, default is None
    @param      coverage_exclude_lines  (list) options for module coverage, lines to exclude from the coverage report, default is None
    @param      additional_ut_path      (list) additional paths to add when running the unit tests
    @param      covtoken                (str|tuple(str, str)) token used when publishing coverage report to `codecov <https://codecov.io/>`_
                                        or None to not publish
    @param      hook_print              enable print display when calling *_setup_hook*
    @param      stdout                  if not None, write output on this stream instead of *sys.stdout*
    @param      stderr                  if not None, write errors on this stream instead of *sys.stderr*
    @param      filter_warning          function which removes some warnings in the final output,
                                        if None, the function filters out some recurrent warnings
                                        in jupyter (signature: ``def filter_warning(w: warning) -> bool``),
                                        @see fn default_filter_warning
    @param      dump_coverage           dump or copy the coverage at this location
    @param      add_coverage_folder     additional coverage folder reports
    @param      coverage_root           subfolder for the coverage
    @param      fLOG                    ``function(*l, **p)``, logging function

    *covtoken* can be a string ``<token>`` or a
    tuple ``(<token>, <condition>)``. The condition is evaluated
    by the python interpreter and determines whether or not the coverage
    needs to be published.

    The function returns nothing. It raises @see cl TestWrappedException
    if the test run produced a non empty error output,
    *SetupHookException* if *_setup_hook* wrote unexpected errors,
    *CoverageException* if *covtoken* is set while *add_coverage* is False
    and *FileNotFoundError* if the project layout cannot be guessed
    from *logfile*.

    .. faqref::
        :title: How to build pyquickhelper with Jenkins?
        :index: Jenkins

        :epkg:`Jenkins` is a task scheduler for continuous integration.
        You can easily schedule batch command to build and run unit tests for a specific project.
        To build pyquickhelper, you need to install :epkg:`python`,
        :epkg:`pymyinstall`,
        :epkg:`miktex`, :epkg:`pandoc`,
        :epkg:`sphinx`.

        Once Jenkins is installed, the command to schedule is::

            set PATH=%PATH%;%USERPROFILE%\\AppData\\Local\\Pandoc
            build_setup_help_on_windows.bat

        This works if you installed Jenkins with your credentials.
        Otherwise, the path to ``pandoc.exe`` needs to be changed.
        And you can also read `Schedule builds with Jenkins
        <http://www.xavierdupre.fr/blog/2014-12-06_nojs.html>`_.
        :epkg:`node.js` might be required if a notebooks contain javascript.

    Parameters *add_coverage* and *report_folder* are used to compute the coverage
    using the module `coverage <http://nedbatchelder.com/code/coverage/>`_.
    The function does something about the following error:

    ::

        _tkinter.TclError: no display name and no $DISPLAY environment variable

    It is due to :epkg:`matplotlib`.
    See `Generating matplotlib graphs without a running X server
    <http://stackoverflow.com/questions/4931376/
    generating-matplotlib-graphs-without-a-running-x-server>`_.
    If the skip function is None, it will replace it by
    the function @see fn default_skip_function.
    Calls function @see fn _setup_hook if it is available
    in the unit tested module. Parameter *tested_module* was added,
    the function then checks the presence of
    function @see fn _setup_hook, it is the case, it runs it.

    Parameter *setup_params*: a mechanism was put in place
    to let the module to test a possibility to run some preprocessing steps
    in a separate process. They are described in @see fn _setup_hook
    which must be found in the main file ``__init__.py``.
    Parameter *only_setup_hook*:
    saves the report in XML format, binary format,
    replace full paths by relative path.

    Parameters *coverage_options*, *coverage_exclude_lines*, *additional_ut_path*:
    see class `Coverage <https://coverage.readthedocs.io/en/coverage-5.5/api_coverage.html>`_
    and `Configuration files <https://coverage.readthedocs.io/en/coverage-4.0b1/config.html>`_
    to specify those options. If both values are left to None, this function will
    compute the code coverage for all files in this module. The function
    now exports the coverage options which were used.
    For example, to exclude files from the coverage report::

        coverage_options=dict(omit=["*exclude*.py"])

    The dictionary given in *coverage_options* is copied,
    the caller's object is never modified.

    Parameter *covtoken*: used to post the coverage report to
    `codecov <https://codecov.io/>`_.
    """
    # delayed import
    from ..loghelper.os_helper import get_user

    if skip_function is None:  # pragma: no cover
        from .utils_tests_private import default_skip_function
        skip_function = default_skip_function

    if fLOG is None:  # pragma: no cover
        from ..loghelper.flog import noLOG
        fLOG = noLOG

    whole_output = StringIOAndFile(logfile)
    runner = unittest.TextTestRunner(verbosity=0, stream=whole_output)
    path = os.path.abspath(os.path.join(os.path.split(logfile)[0]))
    stdout_this = stdout if stdout else sys.stdout
    datetime_begin = datetime.now()

    def _find_source(fold):  # pragma: no cover
        # Walks up from *fold* until a folder containing .gitignore is found.
        fold0 = fold
        while not os.path.exists(os.path.join(fold, ".gitignore")):
            parent = os.path.split(fold)[0]
            # parent == fold happens on a drive root such as 'C:\\',
            # without this check the loop would never end.
            if len(fold) < 2 or parent == fold:
                raise FileNotFoundError(
                    "Unable to guess source from '{0}'.".format(fold0))
            fold = parent
        return os.path.normpath(os.path.abspath(fold))

    def run_main():
        # delayed import to speed up import of pycode
        from .utils_tests_private import main_run_test
        return main_run_test(
            runner, path_test=path, skip=-1, skip_list=skip_list,
            processes=processes, skip_function=skip_function,
            additional_ut_path=additional_ut_path, stdout=stdout, stderr=stderr,
            filter_warning=filter_warning, fLOG=fLOG)

    # issue detected with travis
    # _tkinter.TclError: no display name and
    # no $DISPLAY environment variable
    # Setting os.environ["DISPLAY"] = "localhost:0" was disabled,
    # the issue is handled by fix_tkinter_issues_virtualenv below.

    # other coverage reports
    if (add_coverage_folder is not None and
            dump_coverage is not None):  # pragma: no cover
        sub = os.path.split(dump_coverage)[0]
        sub = os.path.split(sub)[-1]
        other_cov_folders = find_coverage_report(
            add_coverage_folder, exclude=sub)
        mes = "[main_wrapper_tests] other_cov_folders...sub='{0}'".format(sub)
        stdout_this.write(mes + "\n")
        for k, v in sorted(other_cov_folders.items()):
            mes = "[main_wrapper_tests]     k='{0}' v={1}".format(k, v)
            stdout_this.write(mes + "\n")
        if len(other_cov_folders) == 0:
            other_cov_folders = None
    else:
        other_cov_folders = None

    # to deal with: _tkinter.TclError: no display name and no $DISPLAY
    # environment variable
    from .tkinter_helper import fix_tkinter_issues_virtualenv, _first_execution
    fLOG("[main_wrapper_tests] MODULES (1): matplotlib already imported",
         "matplotlib" in sys.modules, "first execution", _first_execution)
    r = fix_tkinter_issues_virtualenv(fLOG=fLOG)
    fLOG("[main_wrapper_tests] MODULES (2): matplotlib imported",
         "matplotlib" in sys.modules, "first execution", _first_execution)
    fLOG("[main_wrapper_tests] fix_tkinter_issues_virtualenv", r)

    def tested_module(folder, project_var_name, setup_params):
        # Runs _setup_hook of the tested module and checks its error output.
        # delayed import
        from .call_setup_hook import call_setup_hook
        if setup_params is None:
            setup_params = {}
        out, err = call_setup_hook(
            folder, project_var_name, fLOG=fLOG, use_print=hook_print, **setup_params)
        if len(err) > 0 and err != "no _setup_hook":  # pragma: no cover
            # fix introduced because pip 8.0 displays annoying warnings
            # RuntimeWarning: Config variable 'Py_DEBUG' is unset, Python ABI tag may be incorrect
            # RuntimeWarning: Config variable 'WITH_PYMALLOC' is unset, Python
            # ABI tag may be incorrect
            keep = []
            for line in err.split("\n"):
                line = line.rstrip("\r\t ")
                if (line and not line.startswith(" ") and
                        "RuntimeWarning: Config variable" not in line):
                    keep.append(line)
            if len(keep) > 0:
                raise SetupHookException(
                    "Unable to run _setup_hook\n**OUT:\n{0}\n**[pyqerror]"
                    "\n{1}\n**FOLDER:\n{2}\n**NAME:\n{3}\n**KEEP:\n{4}\n**"
                    "".format(out, err, folder, project_var_name,
                              "\n".join(keep)))
            # Only ignorable warnings are left, they move to the output.
            out += "\nWARNINGS:\n" + err
            err = None
        return out, err

    # project_var_name
    folder = os.path.normpath(
        os.path.join(os.path.dirname(logfile), "..", "src"))
    if not os.path.exists(folder):
        folder = os.path.normpath(
            os.path.join(os.path.dirname(logfile), ".."))
    if not os.path.exists(folder):
        raise FileNotFoundError(folder)  # pragma: no cover

    def selec_name(folder, name):
        # True if *name* is a subfolder which looks like the tested package.
        if name.startswith('_') or name.startswith('.'):
            return False
        if name in ('bin', 'dist', 'build'):
            return False  # pragma: no cover
        if '.egg' in name or 'dist_module27' in name:
            return False
        fold = os.path.join(folder, name)
        if not os.path.isdir(fold):
            return False
        init = os.path.join(fold, '__init__.py')
        if not os.path.exists(init):
            return False  # pragma: no cover
        return True

    content = [_ for _ in os.listdir(folder) if selec_name(folder, _)]
    if len(content) != 1:
        raise FileNotFoundError(  # pragma: no cover
            "Unable to guess the project name in '{0}', content=\n{1}\n---"
            "\n{2}\n---".format(folder, "\n".join(content),
                                "\n".join(os.listdir(folder))))

    project_var_name = content[0]
    src_abs = os.path.normpath(os.path.abspath(
        os.path.join(os.path.dirname(logfile), "..")))

    root_src = os.path.join(src_abs, "src", project_var_name)
    if not os.path.exists(root_src):
        root_src = os.path.join(src_abs, project_var_name)
    if not os.path.exists(root_src):
        raise FileNotFoundError(  # pragma: no cover
            "Unable to find '{}'.".format(root_src))
    srcp = os.path.relpath(root_src, os.getcwd())

    if get_user() in srcp:
        raise FileNotFoundError(  # pragma: no cover
            "The location of the source should not contain "
            "'{0}': {1}".format(get_user(), srcp))

    if only_setup_hook:
        # No test is run, there is no result to summarize.
        tested_module(src_abs, project_var_name, setup_params)
        return

    # coverage
    if add_coverage:  # pragma: no cover
        stdout_this.write("[main_wrapper_tests] --- COVERAGE BEGIN ---\n")
        if report_folder is None:
            report_folder = os.path.join(
                os.path.abspath(os.path.dirname(logfile)), "..", "_doc",
                "sphinxdoc", "source", "coverage")

        fLOG("[main_wrapper_tests] call _setup_hook",
             src_abs, "name=", project_var_name)
        tested_module(src_abs, project_var_name, setup_params)
        fLOG("[main_wrapper_tests] end _setup_hook")

        fLOG("[main_wrapper_tests] current folder", os.getcwd())
        fLOG("[main_wrapper_tests] enabling coverage", srcp)
        dfile = os.path.join(report_folder, ".coverage")

        # we clean previous report, subfolders are left untouched
        # (os.remove fails on a directory)
        if os.path.exists(report_folder):
            for afile in os.listdir(report_folder):
                full = os.path.join(report_folder, afile)
                if os.path.isfile(full):
                    os.remove(full)

        # we run the coverage, the options are copied to avoid
        # modifying the dictionary and the list owned by the caller
        coverage_options = (
            {} if coverage_options is None else dict(coverage_options))
        coverage_options["source"] = (
            list(coverage_options.get("source", ())) + [srcp])
        coverage_options.setdefault("data_file", dfile)

        from coverage import coverage
        cov = coverage(**coverage_options)
        if coverage_exclude_lines is not None:
            for line in coverage_exclude_lines:
                cov.exclude(line)
        else:
            cov.exclude("raise NotImplementedError")
        stdout_this.write("[main_wrapper_tests] ENABLE COVERAGE\n")
        cov.start()

        res = run_main()

        cov.stop()
        stdout_this.write(
            "[main_wrapper_tests] STOP COVERAGE + REPORT into '{0}'"
            "\n".format(report_folder))

        from coverage.misc import CoverageException as RawCoverageException
        try:
            cov.html_report(directory=report_folder)
        except RawCoverageException as e:
            raise RuntimeError(
                "Unable to publish the coverage report into '{}',"
                "\nsource='{}'\ndata='{}'".format(
                    report_folder, coverage_options["source"],
                    coverage_options.get("data_file", ''))) from e
        outfile = os.path.join(report_folder, "coverage_report.xml")
        cov.xml_report(outfile=outfile)
        cov.save()
        srcp_s = []

        # we clean absolute path from the produced files
        def clean_absolute_path():
            fLOG("[main_wrapper_tests] replace ",
                 srcp, ' by ', project_var_name)
            srcp_s.clear()
            srcp_s.extend([os.path.abspath(os.path.normpath(srcp)),
                           os.path.normpath(srcp)])
            bsrcp = [bytes(b, encoding="utf-8") for b in srcp_s]
            bproj = bytes(project_var_name, encoding="utf-8")
            for afile in os.listdir(report_folder):
                full = os.path.join(report_folder, afile)
                if not os.path.isfile(full):
                    # a subfolder cannot be opened as a file
                    continue
                if '.coverage' in afile:
                    # sqlite3 format
                    _modifies_coverage_report(
                        full, srcp_s, project_var_name)
                else:
                    with open(full, "rb") as f:
                        content = f.read()
                    for b in bsrcp:
                        content = content.replace(b, bproj)
                    with open(full, "wb") as f:
                        f.write(content)

        clean_absolute_path()

        # we print debug information for the coverage
        def write_covlog(covs):
            fLOG("[main_wrapper_tests] add debug information")
            outcov = os.path.join(report_folder, "covlog.txt")
            rows = ["COVERAGE OPTIONS"]
            for k, v in sorted(coverage_options.items()):
                rows.append("{0}={1}".format(k, v))
            rows.append("")
            rows.append("EXCLUDE LINES")
            rows.extend(cov.get_exclude_list())
            rows.append("")
            rows.append("OPTIONS")
            for option_spec in sorted(cov.config.CONFIG_FILE_OPTIONS):
                attr = option_spec[0]
                if attr == "sort":
                    # we skip, it raises an exception with coverage 4.2
                    continue
                v = getattr(cov.config, attr)
                rows.append("{0}={1}".format(attr, v))
            rows.append("")

            if covs is not None:
                for add in sorted(covs):
                    rows.append("MERGE='{0}'".format(add))

            content = "\n".join(rows)

            # the root of the project is removed from the log
            # whatever separator is used
            reps = []
            for location in srcp_s[:1]:
                root = os.path.normpath(
                    os.path.join(location, "..", "..", ".."))
                root += "/"
                reps.append(root)
                reps.append(root.replace("/", "\\"))
                reps.append(root.replace("/", "\\\\"))
                reps.append(root.replace("\\", "\\\\"))

            for s in reps:
                content = content.replace(s, "")

            with open(outcov, "w", encoding="utf8") as f:
                f.write(content)

        write_covlog(None)

        if dump_coverage is not None:
            # delayed import
            from ..filehelper import synchronize_folder
            src = os.path.dirname(outfile)
            stdout_this.write(
                "[main_wrapper_tests] dump coverage from '{1}' to '{0}'"
                "\n".format(dump_coverage, outfile))
            synchronize_folder(src, dump_coverage,
                               copy_1to2=True, fLOG=fLOG)

            if other_cov_folders is not None:
                source = _find_source(src)
                if not source:
                    raise FileNotFoundError(
                        "Unable to find source '{0}' from '{1}'".format(
                            source, src))
                if coverage_root:
                    source_src = os.path.join(source, coverage_root)
                    if os.path.exists(source_src):
                        source = source_src
                stdout_this.write(
                    "[main_wrapper_tests] ADD COVERAGE for source='{0}'"
                    "\n".format(source))
                covs = [_[0] for _ in other_cov_folders.values()]
                covs.append(os.path.abspath(
                    os.path.normpath(os.path.join(src, '.coverage'))))
                stdout_this.write(
                    "[main_wrapper_tests] ADD COVERAGE COMBINE={0}"
                    "\n".format(covs))
                stdout_this.write(
                    "[main_wrapper_tests] DUMP INTO='{0}'\n".format(src))
                try:
                    coverage_combine(covs, src, source)
                    write_covlog(covs)
                except Exception as e:
                    # merging the reports is a best effort,
                    # a failure becomes a warning
                    warnings.warn("[main_wrapper_tests] {}".format(
                        str(e).replace("\n", " ")))

        if covtoken:
            if isinstance(covtoken, tuple):
                # NOTE(review): the condition is evaluated with eval,
                # covtoken must come from a trusted source.
                if eval(covtoken[1]):
                    # publishing token
                    mes = (
                        "[main_wrapper_tests] PUBLISH COVERAGE to "
                        "codecov '{0}' EVAL ({1})".format(
                            covtoken[0], covtoken[1]))
                    # stdout_this already is stdout when stdout is given,
                    # the message is written only once
                    stdout_this.write(mes + '\n')
                    fLOG(mes)
                    publish_coverage_on_codecov(
                        token=covtoken[0], path=outfile, fLOG=fLOG)
                else:
                    fLOG(
                        "[main_wrapper_tests] skip publishing "
                        "coverage to codecov due to False:",
                        covtoken[1])
            else:
                # publishing token
                fLOG(
                    "[main_wrapper_tests] publishing coverage to "
                    "codecov %r." % covtoken)
                publish_coverage_on_codecov(
                    token=covtoken, path=outfile, fLOG=fLOG)
        else:
            stdout_this.write(
                "[main_wrapper_tests] NO PUBLISHING {}.\n".format(covtoken))
        stdout_this.write("[main_wrapper_tests] --- COVERAGE END ---\n")
    else:
        stdout_this.write(
            "[main_wrapper_tests] --- NO COVERAGE BEGIN ---\n")
        # NOTE(review): eval on covtoken[1], see the remark in the
        # coverage branch.
        if covtoken and (not isinstance(covtoken, tuple) or eval(covtoken[1])):
            raise CoverageException(  # pragma: no cover
                "covtoken is not null but add_coverage is not True, coverage cannot be published")
        tested_module(src_abs, project_var_name, setup_params)
        res = run_main()
        stdout_this.write("[main_wrapper_tests] --- NO COVERAGE END ---\n")

    fLOG("[main_wrapper_tests] SUMMARY -------------------------")
    for r in res["tests"]:
        k = str(r[1])
        if "errors=0" not in k or "failures=0" not in k:
            fLOG("*", r[1], r[0])  # pragma: no cover

    fLOG("[main_wrapper_tests] CHECK EXCEPTION -----------------")
    err = res.get("err", "")
    if len(err) > 0:  # pragma: no cover
        # Remove most of the Sphinx warnings (sphinx < 1.8)
        lines = [
            _ for _ in err.split("\n")
            if _ and "is already registered, it will be overridden" not in _]
        err = "\n".join(lines)
    if len(err) > 0:
        raise TestWrappedException(err)  # pragma: no cover

    datetime_end = datetime.now()

    rows = ["[main_wrapper_tests] END",
            "[main_wrapper_tests] begin time {0}".format(datetime_begin),
            "[main_wrapper_tests] end time {0}".format(datetime_end),
            "[main_wrapper_tests] duration {0}".format(datetime_end - datetime_begin)]
    for row in rows:
        fLOG(row)
        stdout_this.write(row + "\n")