Coverage for pyquickhelper/loghelper/history_helper.py: 70%

155 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1""" 

2@file 

3@helper Build history for a module. 

4""" 

5from datetime import datetime, timedelta 

6import re 

7import warnings 

8import requests 

9from jinja2 import Template 

10from .github_api import call_github_api 

11from .pypi_helper import enumerate_pypi_versions_date 

12 

13 

14def enumerate_closed_issues(owner, repo, since=None, issues=None, 

15 url=None, max_issue=None): 

16 """ 

17 Enumerates github issues for a repo and an owner 

18 since a given date. 

19 

20 @param owner repo owner 

21 @param repo repository 

22 @param since not older than that date, if None, 

23 do not go beyond a year 

24 @param issues to bypass @see fn call_github_api 

25 @param url if available, something like 

26 ``https://api.github.com/repos/sdpython/pyquickhelper/issues/{0}`` 

27 @param max_issue max number of issues 

28 @return iterator on issues ``(number, date, title)`` 

29 """ 

30 if since is None: 

31 since = datetime.now() - timedelta(365) 

32 if issues is None and url is not None and max_issue is not None: 

33 issues = [dict(url=url.format(k)) for k in range(max_issue, 0, -1)] 

34 elif issues is None: 

35 issues = call_github_api(owner, repo, 'issues?state=closed') 

36 if len(issues) == 0: 

37 raise ValueError("No issue found.") 

38 for issue in issues: 

39 if 'title' not in issue: 

40 url = issue['url'] 

41 response = requests.get(url, timeout=10) 

42 content = response.json() 

43 if 'API rate limit exceeded' in content.get('message', ''): 

44 warnings.warn('API rate limit exceeded', ResourceWarning) 

45 break 

46 else: 

47 content = issue 

48 closed = content.get('closed_at', None) 

49 if closed is None: 

50 continue 

51 title = content['title'] 

52 closed = datetime.strptime(closed.strip('Z'), "%Y-%m-%dT%H:%M:%S") 

53 number = content['number'] 

54 if closed < since: 

55 break 

56 if ("[WIP]" not in title and 

57 "[remove]" not in title.lower() and 

58 "[removed]" not in title.lower() and 

59 "[DEL]" not in title and 

60 "[WONT]" not in title and 

61 "[SKIP]" not in title and 

62 "[won't fix]" not in title.lower() and 

63 "[WONTDO]" not in title): 

64 yield number, closed, title 

65 

66 

67def build_history(owner, repo, name=None, since=None, issues=None, url=None, 

68 max_issue=None, releases=None, unpublished=False, 

69 existing_history=None, skip_issues=None, fLOG=None): 

70 """ 

71 Returns an history of a module. 

72 

73 @param owner repo owner 

74 @param repo repository 

75 @param name None if ``name == repo`` 

76 @param since not older than that date, if None, 

77 do not go beyond a year 

78 @param issues see @see fn call_github_api (unit test) 

79 @param url see @see fn call_github_api (unit test) 

80 @param max_issue see @see fn call_github_api (unit test) 

81 @param releases bypass :epkg:`pypi` (unit test) 

82 @param unpublished keep unpublished released 

83 @param existing_history existing history, retrieves existing issues stored 

84 in that file 

85 @param skip_issues skip a given list of issues when building the history 

86 @param fLOG logging function 

87 @return iterator on issues ``(number, date, title)`` 

88 """ 

89 if since is None: 

90 since = datetime.now() - timedelta(730) 

91 if name is None: 

92 name = repo 

93 

94 kept_issues = [] 

95 if existing_history is not None: 

96 res = extract_issue_from_history(existing_history) 

97 for k, v in sorted(res.items()): 

98 if skip_issues is not None and k in skip_issues: 

99 continue 

100 kept_issues.append((k, v[0], v[1])) 

101 

102 for issue in enumerate_closed_issues(owner, repo, since, issues=issues, 

103 url=url, max_issue=max_issue): 

104 if skip_issues is not None and issue[0] in skip_issues: 

105 continue 

106 kept_issues.append(issue) 

107 if fLOG: 

108 fLOG("[build_history] ", name, issue[:2]) 

109 if len(kept_issues) == 0: 

110 raise ValueError("No issue found.") 

111 

112 # remove duplicates 

113 current = kept_issues 

114 kept_issues = [] 

115 done = set() 

116 for nb, dt, desc in current: 

117 if nb not in done: 

118 kept_issues.append((nb, dt, desc)) 

119 done.add(nb) 

120 kept_issues.sort() 

121 

122 if releases is None: 

123 versions = [] 

124 for date, version, size in enumerate_pypi_versions_date(name): 

125 if date < since: 

126 break 

127 if fLOG: 

128 fLOG("[build_history] ", name, version, date) 

129 versions.append((date, version, size)) 

130 else: 

131 versions = releases 

132 if len(versions) == 0: 

133 versions = [(datetime.now(), '0.0.0', 0)] 

134 

135 # merge 

136 dates = [(v[0], "v", v) for v in versions] 

137 dates.extend((i[1], "i", i) for i in kept_issues) 

138 dates.sort(reverse=True) 

139 

140 merged = [] 

141 current = None 

142 if unpublished: 

143 current = dict(release="current", size=0, 

144 date=datetime.now(), issues=[]) 

145 for _, v, obj in dates: 

146 if v == 'v': 

147 if current is not None: 

148 merged.append(current) 

149 current = dict(release=obj[1], size=obj[2], date=obj[0], issues=[]) 

150 elif v == 'i': 

151 if current is not None: 

152 issue = dict(title=obj[2], date=obj[1], number=obj[0]) 

153 current['issues'].append(issue) 

154 

155 if current is not None: 

156 merged.append(current) 

157 return merged 

158 

159 

160_template = """ 

161 

162.. _l-HISTORY: 

163 

164======= 

165History 

166======= 

167{% for release in releases %} 

168{{ release['release'] }} - {{ release['date'].strftime("%Y-%m-%d") }} - {{ '%1.2fMb' % (release['size'] * 2**(-20)) }} 

169{{ '=' * (len(release['release']) + 22) }} 

170{% for issue in release['issues'] %} 

171* #{{issue['number']}}: {{issue['title']}} ({{issue['date'].strftime("%Y-%m-%d")}}){% endfor %} 

172{% endfor %} 

173""" 

174 

175 

176def compile_history(releases, template=None): 

177 """ 

178 Compile history and produces a :epkg:`rst` file. 

179 

180 @param releases output of @see fn build_history 

181 @param template :epkg:`jinja2` template (None means default one) 

182 @return output 

183 """ 

184 if template is None: 

185 global _template 

186 template = _template 

187 tmpl = Template(template) 

188 return tmpl.render(releases=releases, len=len) 

189 

190 

191class open_stream_file: 

192 """ 

193 Opens a stream or a filename. 

194 It works with keyword ``with``. 

195 

196 .. runpython:: 

197 :showcode: 

198 

199 from pyquickhelper.loghelper.history_helper import open_stream_file 

200 from io import StringIO 

201 st = StringIO("a\\nb") 

202 with open_stream_file(st) as f: 

203 for line in f.readlines(): 

204 print(line) 

205 """ 

206 

207 def __init__(self, name, mode="r", encoding="utf-8"): 

208 """ 

209 @param name stream or filename 

210 @param mode open mode, works only if filename 

211 @param encoding encoding, works only if filename 

212 """ 

213 self.name = name 

214 self.mode = mode 

215 self.encoding = encoding 

216 

217 def __enter__(self): 

218 """ 

219 Opens the stream or the file. 

220 """ 

221 if hasattr(self, '_content'): 

222 del self._content 

223 if hasattr(self.name, "read"): 

224 self.st = self.name 

225 else: 

226 self.st = open(self.name, self.mode, encoding=self.encoding) 

227 return self 

228 

229 def __exit__(self, exception_type, exception_value, traceback): 

230 """ 

231 Leaves the stream or the filename. 

232 """ 

233 if hasattr(self.name, "read"): 

234 pass 

235 else: 

236 self.st.close() 

237 if hasattr(self, '_content'): 

238 del self._content 

239 

240 def read(self, size=None): 

241 """ 

242 Reads some bytes. 

243 

244 @param size number of bytes or characters to read 

245 @return content 

246 """ 

247 return self.st.read(size=size) 

248 

249 def readline(self): 

250 """ 

251 Basic implementation. 

252 

253 @return next line 

254 """ 

255 if hasattr(self.st, "readline"): 

256 return self.st.readline() 

257 else: 

258 if hasattr(self, '_content'): 

259 self._content = self.read().split('\n') 

260 self._pos = 0 

261 if self._pos >= len(self._content): 

262 return None 

263 res = self._content[self._pos] 

264 self._pos += 1 

265 return res 

266 

267 def readlines(self): 

268 """ 

269 Basic implementation. 

270 

271 @return all text lines 

272 """ 

273 if hasattr(self.st, "readlines"): 

274 return self.st.readlines() 

275 else: 

276 line = self.readline() 

277 lines = [] 

278 while line: 

279 lines.append(line) 

280 line = self.readline() 

281 return lines 

282 

283 

284def extract_issue_from_history(filename_or_stream): 

285 """ 

286 Extracts issues from exsiting history stored 

287 in ``HISTORY.rst``. The pattern must extract 

288 from the following lines: 

289 

290 :: 

291 

292 * `133`: add a collapsible container, adapt it for runpython (2018-04-22) 

293 

294 or 

295 

296 * #133: add a collapsible container, adapt it for runpython (2018-04-22) 

297 

298 @param filename stream or filename 

299 @return ancient history, dictionary *{issue: (date, description)}* 

300 """ 

301 with open_stream_file(filename_or_stream, mode='r', encoding='utf-8') as f: 

302 lines = f.readlines() 

303 reg = re.compile('((`([0-9]+)`:)|([#]([0-9]+):))(.*?)[(]([-0-9]{10})') 

304 res = {} 

305 for line in lines: 

306 match = reg.search(line) 

307 if match: 

308 gr = match.groups() 

309 issue = gr[2] 

310 if issue is None or len(issue) == 0: 

311 issue = gr[4] 

312 desc = gr[5].strip() 

313 date = datetime.strptime(gr[6], '%Y-%m-%d') 

314 res[int(issue)] = (date, desc) 

315 return res