Coverage for pyquickhelper/pycode/coverage_helper.py: 82%

130 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1""" 

2@file 

3@brief Publishing coverage 

4""" 

5import os 

6import re 

7from collections import Counter 

8import shutil 

9import pprint 

10import sqlite3 

11from contextlib import redirect_stderr, redirect_stdout 

12from io import StringIO 

13 

14 

def _attr_(var, name1, name2):
    """
    Returns attribute *name1* of *var* if it exists,
    otherwise attribute *name2*.

    @param      var     object to inspect
    @param      name1   preferred attribute name
    @param      name2   fallback attribute name, only looked up
                        when *name1* is missing
    @return             value of the first attribute found

    An ``AttributeError`` listing every attribute of *var* is raised
    if neither name exists.
    """
    # The fallback must be evaluated lazily: a nested
    # ``getattr(var, name1, getattr(var, name2))`` would evaluate the
    # fallback first and fail whenever *name2* is missing,
    # even if *name1* exists.
    try:
        return getattr(var, name1)
    except AttributeError:
        pass
    try:
        return getattr(var, name2)
    except AttributeError:  # pragma: no cover
        raise AttributeError(
            "Unable to find '{}' or '{}' ({}) in \n{}\n--".format(
                name1, name2, type(var),
                "\n".join(sorted(dir(var)))))

23 

24 

def get_source(cov):
    """
    Returns the ``source`` setting held by the configuration
    of a coverage object.

    @param      cov     object exposing ``config.source``
    @return             value of ``cov.config.source``
    """
    config = cov.config
    return config.source

27 

28 

def publish_coverage_on_codecov(path, token, commandline=True, fLOG=None):
    """
    Publishes the coverage report on `codecov <https://codecov.io/>`_.
    See blog post :ref:`blogpost_coverage_codecov`.

    @param      path            either the XML report itself (an existing file
                                or any name ending with ``.xml``) or a folder
                                in which the report is expected at
                                ``_doc/sphinxdoc/source/coverage/coverage_report.xml``
    @param      token           token on codecov, nothing is uploaded if None
    @param      commandline     see @see cl SourceRepository
    @param      fLOG            logging function
    @return                     ``(out, err)``, the standard output and error
                                captured while running ``codecov.main``, or
                                the list of arguments which would have been
                                used if *token* is None

    The project root is assumed to be four levels above the folder
    holding the report. A ``FileNotFoundError`` is raised if the report
    does not exist, a ``RuntimeError`` if *codecov* wrote anything on the
    standard error.
    """
    if fLOG is None:
        from ..loghelper import noLOG
        fLOG = noLOG
    # delayed import to speed up import of pycode
    from ..loghelper import SourceRepository

    if os.path.isfile(path) or path.endswith(".xml"):
        xml_report = path
    else:
        xml_report = os.path.join(
            path, "_doc", "sphinxdoc", "source", "coverage",
            "coverage_report.xml")
    xml_report = os.path.normpath(xml_report)

    if not os.path.exists(xml_report):
        raise FileNotFoundError(  # pragma: no cover
            f"Unable to find '{xml_report}'.")

    # The report lives in <root>/_doc/sphinxdoc/source/coverage.
    up_four = [".."] * 4
    project_root = os.path.normpath(
        os.path.join(os.path.dirname(xml_report), *up_four))

    repository = SourceRepository(commandline=commandline)
    commit_hash = repository.get_last_commit_hash(project_root)

    # NOTE(review): '--root=... -X gcov' is a single argument, it looks like
    # it was meant to be split into several ones -- confirm before changing,
    # callers may rely on the returned list.
    cmd = [f"--token={token}", f"--file={xml_report}",
           f"--commit={commit_hash}", f"--root={project_root} -X gcov"]

    if token is not None:  # pragma: no cover
        import codecov
        captured_out = StringIO()
        captured_err = StringIO()
        with redirect_stdout(captured_out), redirect_stderr(captured_err):
            codecov.main(*cmd)
        out = captured_out.getvalue()
        err = captured_err.getvalue()
        if err:
            raise RuntimeError(  # pragma: no cover
                f"Unable to run:\nCMD:\n{cmd}\nOUT:\n{out}\n[pyqerror]\n{err}")
        return out, err
    return cmd

77 

78 

def find_coverage_report(folder, exclude=None, filter_out='.*conda.*'):  # pragma: no cover
    """
    Looks for coverage reports in every subfolder of a folder.

    @param      folder      folder whose direct subfolders are explored
    @param      exclude     subfolders to skip (names, not paths)
    @param      filter_out  regular expression, a subfolder is dropped if
                            the data file name found in its ``covlog.txt``
                            matches it, nothing is filtered if empty or None
    @return                 dictionary ``{ subfolder: (file, name, cov) }``
                            where *file* is the selected ``.coverage`` file,
                            *name* the value following ``data_file=`` in
                            ``covlog.txt`` (or None) and *cov* the percentage
                            read from ``index.html`` as a string (or None)

    The structure is supposed to be:

    ::

        folder
          +- hash1
          |    +- date1
          |    |    +- .coverage - not selected
          |    +- date2
          |         +- .coverage - selected
          +- hash2
               +- date
                    +- .coverage - selected

    Subfolders without any ``.coverage`` file are ignored.
    """
    # delayed import to speed up import of pycode
    from ..filehelper import explore_folder_iterfile

    log_pattern = re.compile('data_file=([0-9a-zA-Z_]+)')
    # The HTML page is matched after spaces and line breaks are removed.
    html_pattern = re.compile(
        '<h1>Coveragereport:<spanclass=.?pc_cov.?>([0-9]+)%</span>')
    skip_pattern = re.compile(filter_out) if filter_out else None

    found = {}
    for sub in os.listdir(folder):
        if exclude is not None and sub in exclude:
            continue
        full = os.path.join(folder, sub)
        candidates = []
        data_name = None
        percent = None

        for filename in explore_folder_iterfile(full):
            base = os.path.split(filename)[-1]
            # NOTE(review): the timestamp is the one of the subfolder, not of
            # the file, so every candidate of a subfolder usually shares it
            # and the greatest path wins -- confirm this is intended.
            stamp = os.stat(full).st_mtime

            if base == 'index.html':
                with open(filename, 'r') as f:
                    html = f.read()
                for blank in ('\n', '\r', ' '):
                    html = html.replace(blank, '')
                matches = html_pattern.findall(html)
                if matches:
                    percent = matches[0]
            elif base == 'covlog.txt':
                with open(filename, 'r') as f:
                    log = f.read()
                matches = log_pattern.findall(log)
                if matches:
                    data_name = matches[0]
            elif base == '.coverage':
                candidates.append((stamp, filename))

        if not candidates:
            continue
        best = max(candidates)
        filtered = (isinstance(data_name, str) and
                    skip_pattern is not None and
                    skip_pattern.search(data_name))
        if filtered:
            continue
        found[sub] = (best[-1], data_name, percent)
    return found

141 

142 

def coverage_combine(data_files, output_path, source, process=None):
    """
    Merges multiple coverage reports.

    @param      data_files      report files (``.coverage``)
    @param      output_path     output path, it receives the merged
                                ``.coverage`` file, one rewritten copy
                                ``.coverage<i>`` per input file, the HTML
                                report and ``coverage_report.xml``
    @param      source          source directory
    @param      process         function which processes the coverage report
                                (not used by the current implementation)
    @return                     coverage report

    The function *process* should have the signature:

    ::

        def process(content):
            # ...
            return content

    On :epkg:`Windows`, file name have to have the right case.
    If not, coverage reports an empty coverage and raises an exception.

    Every input file is copied and the paths stored in its table ``file``
    are rewritten so that they start with *source* before the files are
    combined. A ``RuntimeError`` is raised if the reports cannot be produced
    or if the XML report is empty.
    """
    def raise_exc(exc, content, ex, ex2, outfile, destcov, source,
                  dests, inter, cov, infos):  # pragma: no cover
        # Builds a detailed diagnostic and raises it as a RuntimeError
        # chained to *exc*. It must never fail by itself otherwise the
        # original error would be hidden.

        def shorten(t):
            # Data files are read in binary mode, they are decoded
            # leniently so that they can be joined with text.
            if isinstance(t, bytes):
                t = t.decode("utf-8", errors="replace")
            if len(t) > 2000:
                return t[:2000] + "\n..."
            return t

        content = shorten(content)
        ex = "\n-\n".join(shorten(_) for _ in ex)
        ex2 = "\n-\n".join(shorten(_) for _ in ex2)
        rows = ['-----------------',
                f"destcov='{destcov}'",
                f"outfile='{outfile}'",
                f"source='{source}'",
                f"cov.source={get_source(cov)}",
                f"dests='{';'.join(dests)}'",
                f"inter={inter}"]
        for ii, info in enumerate(infos):
            rows.append(f'----------------- {ii}/{len(infos)}')
            for k, v in sorted(info.items()):
                rows.append(f"{k}='{v}'")
        rows.append('-----------------')

        # The internal attributes are not available in every version
        # of coverage, the section is skipped if they are missing.
        data = None
        if cov is not None:
            try:
                data = _attr_(cov, '_data', 'data')
            except AttributeError:
                data = None
        lines = getattr(data, '_lines', None)
        if lines is not None:
            rows.append("##### LINES")
            for k, v in sorted(lines.items())[:5]:
                rows.append(f' {k}:{v}')
            rows.append("----- RUNS")
            runs = getattr(data, '_runs', None) or []
            for k in runs[:5]:
                rows.append(f' {k}')
            rows.append("----- END")

        mes = "{5}. In '{0}'.\n{1}\n{2}\n---AFTER---\n{3}\n---BEGIN---\n{4}"
        raise RuntimeError(mes.format(output_path, "\n".join(
            rows), content, ex, ex2, exc, cov)) from exc

    # We copy the origin coverage if the report is produced
    # in a folder part of the merge.
    destcov = os.path.join(output_path, '.coverage')
    destcov2 = None
    if os.path.exists(destcov):
        destcov2 = destcov + '_old'
        shutil.copy(destcov, destcov2)

    # Starts merging coverage.
    from coverage import Coverage
    cov = Coverage(data_file=destcov, source=[source])
    cov._init()
    cov.get_data()
    if not get_source(cov):
        raise_exc(FileNotFoundError(f"Probably unable to find '{source}'"),
                  "", [], [], "", destcov, source, [], [], cov, [])

    inter = []

    def find_longest_common_root(names, begin):
        # Returns the most frequent path prefix among *names*, prefixes are
        # built component by component and stop before a component 'src'.
        # The result is only stored for diagnostics, an empty string is
        # returned if no prefix can be built.
        counts = Counter()
        for name in names:
            spl = name.split(begin)
            for i in range(1, len(spl) + 1):
                if spl[i - 1] == 'src':
                    break
                counts[begin.join(spl[:i])] += 1
        item = max(((v, k) for k, v in counts.items()), default=(0, ""))
        return item[1]

    def copy_replace(source, dest, root_source, keep_infos):
        # Copies the data file *source* to *dest* and rewrites the paths
        # stored in table 'file' of the copy so that they start with
        # *root_source*. *keep_infos* receives details used in error messages.
        shutil.copy(source, dest)

        co = Counter(root_source)
        slash = co.get('/', 0) >= co.get('\\', 0)
        if slash:
            begin = "/"
            root_source_dup = root_source.replace('\\', '/').replace('//', '/')
        else:
            begin = "\\"
            root_source_dup = root_source.replace("\\", "\\\\")

        keep_infos["slash"] = slash
        keep_infos["begin"] = begin
        keep_infos["root_source_dup"] = root_source_dup
        keep_infos["root_source"] = root_source
        keep_infos["source"] = source
        keep_infos["dest"] = dest

        conn = sqlite3.connect(dest)
        try:
            updates = []
            names = []
            for file_id, path in conn.execute("select id, path from file"):
                names.append(path)
                name = path.replace('/', begin)
                if not name.startswith(root_source):
                    name = root_source + begin + name
                updates.append((name, file_id))

            keep_infos['root_common'] = find_longest_common_root(names, begin)

            # Placeholders keep the statement valid whatever the path
            # contains (a quote for example).
            conn.executemany("UPDATE file SET path=? WHERE id=?", updates)
            conn.commit()
        finally:
            conn.close()

    # We modify the root in every coverage file.
    dests = [os.path.join(output_path, f'.coverage{i}')
             for i in range(len(data_files))]
    infos = []
    for fi, de in zip(data_files, dests):
        keep_infos = {}
        copy_replace(fi, de, source, keep_infos)
        infos.append(keep_infos)
        shutil.copy(de, de + "~")

    # Keeping information (for exception).
    ex = []
    for d in dests:
        with open(d, "rb") as f:
            ex.append(f.read())
    ex2 = []
    for d in data_files:
        with open(d, "rb") as f:
            ex2.append(f.read())

    # We replace destcov by destcov2 if found in dests.
    if destcov2 is not None and destcov in dests:
        dests[dests.index(destcov)] = destcov2

    # Let's combine.
    cov.combine(dests)  # dest
    cov.save()
    report = True

    try:
        from coverage.exceptions import CoverageException
    except ImportError:
        # older version of coverage
        from coverage.misc import CoverageException
    try:
        from coverage.exceptions import NoSource
    except ImportError:
        # older version of coverage
        from coverage.misc import NoSource
    try:
        cov.html_report(directory=output_path,
                        ignore_errors=True)
    except NoSource as e:
        raise_exc(e, "", ex, ex2, "", destcov, source,
                  dests, inter, cov, infos)
    except CoverageException as e:
        if "No data to report" in str(e):
            # issue with path
            report = False
        else:
            msg = pprint.pformat(infos)
            raise RuntimeError(  # pragma: no cover
                f"Unable to process report in '{output_path}'.\n----\n{msg}") from e

    if report:
        outfile = os.path.join(output_path, "coverage_report.xml")
        cov.xml_report(outfile=outfile)
        cov.save()

        # Verifications
        with open(outfile, "r", encoding="utf-8") as f:
            content = f.read()
        if len(content) == 0:
            raise RuntimeError("No report was generated.")

    return cov