Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2@file 

3@brief This extension contains various functionalities to help unittesting. 

4""" 

5import os 

6import sys 

7import unittest 

8from datetime import datetime 

9import warnings 

10import sqlite3 

11from .code_exceptions import CoverageException, SetupHookException 

12from .coverage_helper import publish_coverage_on_codecov, find_coverage_report, coverage_combine 

13from .utils_tests_stringio import StringIOAndFile 

14 

15 

16class TestWrappedException(Exception): 

17 "Raised by @see fn main_wrapper_tests" 

18 pass 

19 

20 

21def _modifies_coverage_report(name, bsrcp, bproj): 

22 conn = sqlite3.connect(name) 

23 sql = [] 

24 for row in conn.execute("select * from file"): 

25 name = row[1] 

26 for b in bsrcp: 

27 name = name.replace(b, bproj) 

28 name = name.replace('\\', '/') 

29 s = "UPDATE file SET path='{}' WHERE id={};".format(name, row[0]) 

30 sql.append(s) 

31 

32 c = conn.cursor() 

33 for s in sql: 

34 c.execute(s) 

35 conn.commit() 

36 conn.close() 

37 

38 

39def main_wrapper_tests(logfile, skip_list=None, processes=False, add_coverage=False, report_folder=None, 

40 skip_function=None, setup_params=None, only_setup_hook=False, 

41 coverage_options=None, coverage_exclude_lines=None, additional_ut_path=None, 

42 covtoken=None, hook_print=True, stdout=None, stderr=None, filter_warning=None, 

43 dump_coverage=None, add_coverage_folder=None, coverage_root="src", fLOG=None): 

44 """ 

45 Calls function :func:`main <pyquickhelper.unittests.utils_tests.main>` 

46 and throws an exception if it fails. 

47 

48 @param logfile locatio of a logfile 

49 @param skip_list to skip a list of unit tests (by index, starting by 1) 

50 @param processes to run the unit test in a separate process (with function @see fn run_cmd), 

51 however, to make that happen, you need to specify 

52 ``exit=False`` for each test file, see `unittest.main 

53 <https://docs.python.org/3/library/unittest.html#unittest.main>`_ 

54 @param add_coverage (bool) run the unit tests and measure the coverage at the same time 

55 @param report_folder (str) folder where the coverage report will be stored 

56 @param skip_function *function(filename,content,duration) --> boolean* to skip a unit test 

57 @param setup_params parameters sent to @see fn call_setup_hook 

58 @param only_setup_hook calls only @see fn call_setup_hook, do not run the unit test 

59 @param coverage_options (dict) options for module coverage as a dictionary, see below, default is None 

60 @param coverage_exclude_lines (list) options for module coverage, lines to exclude from the coverage report, defaul is None 

61 @param additional_ut_path (list) additional paths to add when running the unit tests 

62 @param covtoken (str|tuple(str, str)) token used when publishing coverage report to `codecov <https://codecov.io/>`_ 

63 or None to not publish 

64 @param hook_print enable print display when calling *_setup_hook* 

65 @param stdout if not None, write output on this stream instead of *sys.stdout* 

66 @param stderr if not None, write errors on this stream instead of *sys.stderr* 

67 @param filter_warning function which removes some warnings in the final output, 

68 if None, the function filters out some recurrent warnings 

69 in jupyter (signature: ``def filter_warning(w: warning) -> bool``), 

70 @see fn default_filter_warning 

71 @param dump_coverage dump or copy the coverage at this location 

72 @param add_coverage_folder additional coverage folder reports 

73 @param coverage_root subfolder for the coverage 

74 @param fLOG ``function(*l, **p)``, logging function 

75 

76 *covtoken* can be a string ``<token>`` or a 

77 tuple ``(<token>, <condition>)``. The condition is evaluated 

78 by the python interpreter and determines whether or not the coverage 

79 needs to be published. 

80 

81 .. faqref:: 

82 :title: How to build pyquickhelper with Jenkins? 

83 :index: Jenkins 

84 

85 :epkg:`Jenkins` is a task scheduler for continuous integration. 

86 You can easily schedule batch command to build and run unit tests for a specific project. 

87 To build pyquickhelper, you need to install :epkg:`python`, 

88 :epkg:`pymyinstall`, 

89 :epkg:`miktex`, :epkg:`pandoc`, 

90 :epkg:`sphinx`. 

91 

92 Once Jenkins is installed, the command to schedule is:: 

93 

94 set PATH=%PATH%;%USERPOFILE%\\AppData\\Local\\Pandoc 

95 build_setup_help_on_windows.bat 

96 

97 This works if you installed Jenkins with your credentials. 

98 Otherwise, the path to ``pandoc.exe`` needs to be changed. 

99 And you can also read `Schedule builds with Jenkins 

100 <http://www.xavierdupre.fr/blog/2014-12-06_nojs.html>`_. 

101 :epkg:`node.js` might be required if a notebooks contain javascript. 

102 

103 Parameters *add_coverage* and *report_folder* are used to compute the coverage 

104 using the module `coverage <http://nedbatchelder.com/code/coverage/>`_. 

105 The function does something about the following error: 

106 

107 _tkinter.TclError: no display name and no $DISPLAY environment variable 

108 

109 It is due to :epkg:`matplotlib`. 

110 See `Generating matplotlib graphs without a running X server 

111 <http://stackoverflow.com/questions/4931376/generating-matplotlib-graphs-without-a-running-x-server>`_. 

112 If the skip function is None, it will replace it by the function @see fn default_skip_function. 

113 Calls function @see fn _setup_hook if it is available in the unit tested module. 

114 Parameter *tested_module* was added, the function then checks the presence of 

115 function @see fn _setup_hook, it is the case, it runs it. 

116 

117 Parameter *setup_params*: a mechanism was put in place 

118 to let the module to test a possibility to run some preprocessing steps 

119 in a separate process. They are described in @see fn _setup_hook 

120 which must be found in the main file ``__init__.py``. 

121 Parameter *only_setup_hook*: 

122 saves the report in XML format, binary format, replace full paths by relative path. 

123 

124 Parameters *coverage_options*, *coverage_exclude_lines*, *additional_ut_path*: 

125 see class `Coverage <http://coverage.readthedocs.org/en/coverage-4.0b1/api_coverage.html?highlight=coverage#coverage.Coverage.__init__>`_ 

126 and `Configuration files <http://coverage.readthedocs.org/en/coverage-4.0b1/config.html>`_ 

127 to specify those options. If both values are left to None, this function will 

128 compute the code coverage for all files in this module. The function 

129 now exports the coverage options which were used. 

130 For example, to exclude files from the coverage report:: 

131 

132 coverage_options=dict(omit=["*exclude*.py"]) 

133 

134 Parameter *covtoken*: used to post the coverage report to 

135 `codecov <https://codecov.io/>`_. 

136 

137 .. versionchanged:: 1.8 

138 Parameter *coverage_root* was added. 

139 """ 

140 # delayed import 

141 from ..loghelper.os_helper import get_user 

142 

143 if skip_function is None: 

144 from .utils_tests_private import default_skip_function 

145 skip_function = default_skip_function 

146 

147 if fLOG is None: 

148 from ..loghelper.flog import noLOG 

149 fLOG = noLOG 

150 

151 whole_ouput = StringIOAndFile(logfile) 

152 runner = unittest.TextTestRunner(verbosity=0, stream=whole_ouput) 

153 path = os.path.abspath(os.path.join(os.path.split(logfile)[0])) 

154 stdout_this = stdout if stdout else sys.stdout 

155 datetime_begin = datetime.now() 

156 

157 def _find_source(fold): 

158 fold0 = fold 

159 exists = os.path.exists(os.path.join(fold, ".gitignore")) 

160 while not exists: 

161 if len(fold) < 2: 

162 raise FileNotFoundError( 

163 "Unable to guess source from '{0}'.".format(fold0)) 

164 fold = os.path.split(fold)[0] 

165 exists = os.path.exists(os.path.join(fold, ".gitignore")) 

166 return os.path.normpath(os.path.abspath(fold)) 

167 

168 def run_main(): 

169 # delayed import to speed up import of pycode 

170 from .utils_tests_private import main_run_test 

171 res = main_run_test(runner, path_test=path, skip=-1, skip_list=skip_list, 

172 processes=processes, skip_function=skip_function, 

173 additional_ut_path=additional_ut_path, stdout=stdout, stderr=stderr, 

174 filter_warning=filter_warning, fLOG=fLOG) 

175 return res 

176 

177 if "win" not in sys.platform and "DISPLAY" not in os.environ: 

178 # issue detected with travis 

179 # _tkinter.TclError: no display name and no $DISPLAY environment variable 

180 #os.environ["DISPLAY"] = "localhost:0" 

181 pass 

182 

183 # other coverage reports 

184 if add_coverage_folder is not None and dump_coverage is not None: 

185 sub = os.path.split(dump_coverage)[0] 

186 sub = os.path.split(sub)[-1] 

187 other_cov_folders = find_coverage_report( 

188 add_coverage_folder, exclude=sub) 

189 mes = "[main_wrapper_tests] other_cov_folders...sub='{0}'".format(sub) 

190 stdout_this.write(mes + "\n") 

191 for k, v in sorted(other_cov_folders.items()): 

192 mes = "[main_wrapper_tests] k='{0}' v={1}".format(k, v) 

193 stdout_this.write(mes + "\n") 

194 if len(other_cov_folders) == 0: 

195 other_cov_folders = None 

196 else: 

197 other_cov_folders = None 

198 

199 # to deal with: _tkinter.TclError: no display name and no $DISPLAY 

200 # environment variable 

201 from .tkinter_helper import fix_tkinter_issues_virtualenv, _first_execution 

202 fLOG("[main_wrapper_tests] MODULES (1): matplotlib already imported", 

203 "matplotlib" in sys.modules, "first execution", _first_execution) 

204 r = fix_tkinter_issues_virtualenv(fLOG=fLOG) 

205 fLOG("[main_wrapper_tests] MODULES (2): matplotlib imported", 

206 "matplotlib" in sys.modules, "first execution", _first_execution) 

207 fLOG("[main_wrapper_tests] fix_tkinter_issues_virtualenv", r) 

208 

209 def tested_module(folder, project_var_name, setup_params): 

210 # module mod 

211 # delayed import 

212 from .call_setup_hook import call_setup_hook 

213 if setup_params is None: 

214 setup_params = {} 

215 out, err = call_setup_hook( 

216 folder, project_var_name, fLOG=fLOG, use_print=hook_print, **setup_params) 

217 if len(err) > 0 and err != "no _setup_hook": # pragma: no cover 

218 # fix introduced because pip 8.0 displays annoying warnings 

219 # RuntimeWarning: Config variable 'Py_DEBUG' is unset, Python ABI tag may be incorrect 

220 # RuntimeWarning: Config variable 'WITH_PYMALLOC' is unset, Python 

221 # ABI tag may be incorrect 

222 lines = err.split("\n") 

223 keep = [] 

224 for line in lines: 

225 line = line.rstrip("\r\t ") 

226 if line and not line.startswith(" ") and "RuntimeWarning: Config variable" not in line: 

227 keep.append(line) 

228 if len(keep) > 0: 

229 raise SetupHookException( 

230 "unable to run _setup_hook\n**OUT:\n{0}\n**[pyqerror]\n{1}\n**FOLDER:\n{2}\n**NAME:\n{3}\n**KEEP:\n{4}\n**" 

231 .format(out, err, folder, project_var_name, "\n".join(keep))) 

232 out += "\nWARNINGS:\n" + err 

233 err = None 

234 return out, err 

235 

236 # project_var_name 

237 folder = os.path.normpath( 

238 os.path.join(os.path.dirname(logfile), "..", "src")) 

239 if not os.path.exists(folder): 

240 folder = os.path.normpath( 

241 os.path.join(os.path.dirname(logfile), "..")) 

242 if not os.path.exists(folder): 

243 raise FileNotFoundError(folder) # pragma: no cover 

244 

245 def selec_name(folder, name): 

246 if name.startswith('_') or name.startswith('.'): 

247 return False 

248 if name in ('bin', 'dist', 'build'): 

249 return False 

250 if '.egg' in name or 'dist_module27' in name: 

251 return False 

252 fold = os.path.join(folder, name) 

253 if not os.path.isdir(fold): 

254 return False 

255 init = os.path.join(fold, '__init__.py') 

256 if not os.path.exists(init): 

257 return False 

258 return True 

259 

260 content = [_ for _ in os.listdir(folder) if selec_name(folder, _)] 

261 if len(content) != 1: 

262 raise FileNotFoundError( # pragma: no cover 

263 "Unable to guess the project name in '{0}', content=\n{1}\n---\n{2}\n---".format( 

264 folder, "\n".join(content), "\n".join(os.listdir(folder)))) 

265 

266 project_var_name = content[0] 

267 src_abs = os.path.normpath(os.path.abspath( 

268 os.path.join(os.path.dirname(logfile), ".."))) 

269 

270 root_src = os.path.join(src_abs, "src", project_var_name) 

271 if not os.path.exists(root_src): 

272 root_src = os.path.join(src_abs, project_var_name) 

273 if not os.path.exists(root_src): 

274 raise FileNotFoundError("Unable to find '{}'.".format(root_src)) 

275 srcp = os.path.relpath(root_src, os.getcwd()) 

276 

277 if get_user() in srcp: 

278 raise Exception( 

279 "The location of the source should not contain '{0}': {1}".format(get_user(), srcp)) 

280 

281 if only_setup_hook: 

282 tested_module(src_abs, project_var_name, setup_params) 

283 

284 else: 

285 # coverage 

286 if add_coverage: # pragma: no cover 

287 stdout_this.write("[main_wrapper_tests] --- COVERAGE BEGIN ---\n") 

288 if report_folder is None: 

289 report_folder = os.path.join( 

290 os.path.abspath(os.path.dirname(logfile)), "..", "_doc", "sphinxdoc", "source", "coverage") 

291 

292 fLOG("[main_wrapper_tests] call _setup_hook", 

293 src_abs, "name=", project_var_name) 

294 tested_module(src_abs, project_var_name, setup_params) 

295 fLOG("[main_wrapper_tests] end _setup_hook") 

296 

297 fLOG("[main_wrapper_tests] current folder", os.getcwd()) 

298 fLOG("[main_wrapper_tests] enabling coverage", srcp) 

299 dfile = os.path.join(report_folder, ".coverage") 

300 

301 # we clean previous report or we create an empty folder 

302 if os.path.exists(report_folder): 

303 for afile in os.listdir(report_folder): 

304 full = os.path.join(report_folder, afile) 

305 os.remove(full) 

306 

307 # we run the coverage 

308 if coverage_options is None: 

309 coverage_options = {} 

310 if "source" in coverage_options: 

311 coverage_options["source"].append(srcp) 

312 else: 

313 coverage_options["source"] = [srcp] 

314 if "data_file" not in coverage_options: 

315 coverage_options["data_file"] = dfile 

316 

317 from coverage import coverage 

318 cov = coverage(**coverage_options) 

319 if coverage_exclude_lines is not None: 

320 for line in coverage_exclude_lines: 

321 cov.exclude(line) 

322 else: 

323 cov.exclude("raise NotImplementedError") 

324 stdout_this.write("[main_wrapper_tests] ENABLE COVERAGE\n") 

325 cov.start() 

326 

327 res = run_main() 

328 

329 cov.stop() 

330 stdout_this.write( 

331 "[main_wrapper_tests] STOP COVERAGE + REPORT into '{0}\n'".format(report_folder)) 

332 

333 from coverage.misc import CoverageException as RawCoverageException 

334 try: 

335 cov.html_report(directory=report_folder) 

336 except RawCoverageException as e: 

337 raise RuntimeError("Unable to publish the coverage repot into '{}'," 

338 "\nsource='{}'\ndata='{}'".format( 

339 report_folder, coverage_options["source"], 

340 coverage_options.get("data_file", ''))) from e 

341 outfile = os.path.join(report_folder, "coverage_report.xml") 

342 cov.xml_report(outfile=outfile) 

343 cov.save() 

344 srcp_s = [] 

345 

346 # we clean absolute path from the produced files 

347 def clean_absolute_path(): 

348 fLOG("[main_wrapper_tests] replace ", 

349 srcp, ' by ', project_var_name) 

350 srcp_s.clear() 

351 srcp_s.extend([os.path.abspath(os.path.normpath(srcp)), 

352 os.path.normpath(srcp)]) 

353 bsrcp = [bytes(b, encoding="utf-8") for b in srcp_s] 

354 bproj = bytes(project_var_name, encoding="utf-8") 

355 for afile in os.listdir(report_folder): 

356 full = os.path.join(report_folder, afile) 

357 if '.coverage' in afile: 

358 # sqlite3 format 

359 _modifies_coverage_report( 

360 full, srcp_s, project_var_name) 

361 else: 

362 with open(full, "rb") as f: 

363 content = f.read() 

364 for b in bsrcp: 

365 content = content.replace(b, bproj) 

366 with open(full, "wb") as f: 

367 f.write(content) 

368 

369 clean_absolute_path() 

370 

371 # we print debug information for the coverage 

372 def write_covlog(covs): 

373 fLOG("[main_wrapper_tests] add debug information") 

374 outcov = os.path.join(report_folder, "covlog.txt") 

375 rows = [] 

376 rows.append("COVERAGE OPTIONS") 

377 for k, v in sorted(coverage_options.items()): 

378 rows.append("{0}={1}".format(k, v)) 

379 rows.append("") 

380 rows.append("EXCLUDE LINES") 

381 for k in cov.get_exclude_list(): 

382 rows.append(k) 

383 rows.append("") 

384 rows.append("OPTIONS") 

385 for option_spec in sorted(cov.config.CONFIG_FILE_OPTIONS): 

386 attr = option_spec[0] 

387 if attr == "sort": 

388 # we skip, it raises an exception with coverage 4.2 

389 continue 

390 v = getattr(cov.config, attr) 

391 st = "{0}={1}".format(attr, v) 

392 rows.append(st) 

393 rows.append("") 

394 

395 if covs is not None: 

396 for add in sorted(covs): 

397 rows.append("MERGE='{0}'".format(add)) 

398 

399 content = "\n".join(rows) 

400 

401 reps = [] 

402 for _ in srcp_s[:1]: 

403 __ = os.path.normpath(os.path.join(_, "..", "..", "..")) 

404 __ += "/" 

405 reps.append(__) 

406 reps.append(__.replace("/", "\\")) 

407 reps.append(__.replace("/", "\\\\")) 

408 reps.append(__.replace("\\", "\\\\")) 

409 

410 for s in reps: 

411 content = content.replace(s, "") 

412 

413 with open(outcov, "w", encoding="utf8") as f: 

414 f.write(content) 

415 

416 write_covlog(None) 

417 

418 if dump_coverage is not None: 

419 # delayed import 

420 from ..filehelper import synchronize_folder 

421 src = os.path.dirname(outfile) 

422 stdout_this.write("[main_wrapper_tests] dump coverage from '{1}' to '{0}'\n".format( 

423 dump_coverage, outfile)) 

424 synchronize_folder(src, dump_coverage, 

425 copy_1to2=True, fLOG=fLOG) 

426 

427 if other_cov_folders is not None: 

428 source = _find_source(src) 

429 if not source: 

430 raise FileNotFoundError( 

431 "Unable to find source '{0}' from '{1}'".format(source, src)) 

432 if coverage_root: 

433 source_src = os.path.join(source, coverage_root) 

434 if os.path.exists(source_src): 

435 source = source_src 

436 stdout_this.write( 

437 "[main_wrapper_tests] ADD COVERAGE for source='{0}'\n".format(source)) 

438 covs = list(_[0] for _ in other_cov_folders.values()) 

439 covs.append(os.path.abspath( 

440 os.path.normpath(os.path.join(src, '.coverage')))) 

441 stdout_this.write( 

442 "[main_wrapper_tests] ADD COVERAGE COMBINE={0}\n".format(covs)) 

443 stdout_this.write( 

444 "[main_wrapper_tests] DUMP INTO='{0}'\n".format(src)) 

445 try: 

446 coverage_combine(covs, src, source) 

447 write_covlog(covs) 

448 except Exception as e: 

449 warnings.warn("[main_wrapper_tests] {}".format( 

450 str(e).replace("\n", " "))) 

451 

452 if covtoken: 

453 if isinstance(covtoken, tuple): 

454 if eval(covtoken[1]): 

455 # publishing token 

456 mes = "[main_wrapper_tests] PUBLISH COVERAGE to codecov '{0}' EVAL ({1})".format( 

457 covtoken[0], covtoken[1]) 

458 if stdout is not None: 

459 stdout.write(mes) 

460 stdout_this.write(mes + '\n') 

461 fLOG(mes) 

462 publish_coverage_on_codecov( 

463 token=covtoken[0], path=outfile, fLOG=fLOG) 

464 else: 

465 fLOG( 

466 "[main_wrapper_tests] skip publishing coverage to codecov due to False:", covtoken[1]) 

467 else: 

468 # publishing token 

469 fLOG( 

470 "[main_wrapper_tests] publishing coverage to codecov", covtoken) 

471 publish_coverage_on_codecov( 

472 token=covtoken, path=outfile, fLOG=fLOG) 

473 stdout_this.write("[main_wrapper_tests] --- COVERAGE END ---\n") 

474 else: 

475 stdout_this.write( 

476 "[main_wrapper_tests] --- NO COVERAGE BEGIN ---\n") 

477 if covtoken and (not isinstance(covtoken, tuple) or eval(covtoken[1])): 

478 raise CoverageException( # pragma: no cover 

479 "covtoken is not null but add_coverage is not True, coverage cannot be published") 

480 tested_module(src_abs, project_var_name, setup_params) 

481 res = run_main() 

482 stdout_this.write("[main_wrapper_tests] --- NO COVERAGE END ---\n") 

483 

484 fLOG("[main_wrapper_tests] SUMMARY -------------------------") 

485 for r in res["tests"]: 

486 k = str(r[1]) 

487 if "errors=0" not in k or "failures=0" not in k: 

488 fLOG("*", r[1], r[0]) 

489 

490 fLOG("[main_wrapper_tests] CHECK EXCEPTION -----------------") 

491 err = res.get("err", "") 

492 if len(err) > 0: 

493 # Remove most of the Sphinx warnings (sphinx < 1.8) 

494 lines = err.split("\n") 

495 lines = [ 

496 _ for _ in lines if _ and "is already registered, it will be overridden" not in _] 

497 err = "\n".join(lines) 

498 if len(err) > 0: 

499 raise TestWrappedException(err) # pragma: no cover 

500 

501 datetime_end = datetime.now() 

502 

503 rows = ["[main_wrapper_tests] END", 

504 "[main_wrapper_tests] begin time {0}".format(datetime_begin), 

505 "[main_wrapper_tests] end time {0}".format(datetime_end), 

506 "[main_wrapper_tests] duration {0}".format(datetime_end - datetime_begin)] 

507 for row in rows: 

508 fLOG(row) 

509 stdout_this.write(row + "\n")