"""
@helper Build history for a module.
.. versionadded:: 1.7
:githublink:`%|py|7`
"""
from datetime import datetime, timedelta
import re
import warnings
import requests
from jinja2 import Template
from .github_api import call_github_api
from .pypi_helper import enumerate_pypi_versions_date
[docs]def enumerate_closed_issues(owner, repo, since=None, issues=None,
url=None, max_issue=None):
"""
Enumerates github issues for a repo and an owner
since a given date.
:param owner: repo owner
:param repo: repository
:param since: not older than that date, if None,
do not go beyond a year
:param issues: to bypass :func:`call_github_api <pyquickhelper.loghelper.github_api.call_github_api>`
:param url: if available, something like
``https://api.github.com/repos/sdpython/pyquickhelper/issues/{0}``
:param max_issue: max number of issues
:return: iterator on issues ``(number, date, title)``
:githublink:`%|py|31`
"""
if since is None:
since = datetime.now() - timedelta(365)
if issues is None and url is not None and max_issue is not None:
issues = [dict(url=url.format(k)) for k in range(max_issue, 0, -1)]
elif issues is None:
issues = call_github_api(owner, repo, 'issues?state=closed')
if len(issues) == 0:
raise ValueError("No issue found.")
for issue in issues:
if 'title' not in issue:
url = issue['url']
response = requests.get(url)
content = response.json()
if 'API rate limit exceeded' in content.get('message', ''):
warnings.warn('API rate limit exceeded', ResourceWarning)
break
else:
content = issue
closed = content.get('closed_at', None)
if closed is None:
continue
title = content['title']
closed = datetime.strptime(closed.strip('Z'), "%Y-%m-%dT%H:%M:%S")
number = content['number']
if closed < since:
break
if ("[WIP]" not in title and
"[remove]" not in title.lower() and
"[removed]" not in title.lower() and
"[DEL]" not in title and
"[WONT]" not in title and
"[SKIP]" not in title and
"[won't fix]" not in title.lower() and
"[WONTDO]" not in title):
yield number, closed, title
[docs]def build_history(owner, repo, name=None, since=None, issues=None, url=None,
max_issue=None, releases=None, unpublished=False,
existing_history=None, skip_issues=None, fLOG=None):
"""
Returns an history of a module.
:param owner: repo owner
:param repo: repository
:param name: None if ``name == repo``
:param since: not older than that date, if None,
do not go beyond a year
:param issues: see :func:`call_github_api <pyquickhelper.loghelper.github_api.call_github_api>` (unit test)
:param url: see :func:`call_github_api <pyquickhelper.loghelper.github_api.call_github_api>` (unit test)
:param max_issue: see :func:`call_github_api <pyquickhelper.loghelper.github_api.call_github_api>` (unit test)
:param releases: bypass :epkg:`pypi` (unit test)
:param unpublished: keep unpublished released
:param existing_history: existing history, retrieves existing issues stored
in that file
:param skip_issues: skip a given list of issues when building the history
:param fLOG: logging function
:return: iterator on issues ``(number, date, title)``
:githublink:`%|py|90`
"""
if since is None:
since = datetime.now() - timedelta(730)
if name is None:
name = repo
kept_issues = []
if existing_history is not None:
res = extract_issue_from_history(existing_history)
for k, v in sorted(res.items()):
if skip_issues is not None and k in skip_issues:
continue
kept_issues.append((k, v[0], v[1]))
for issue in enumerate_closed_issues(owner, repo, since, issues=issues,
url=url, max_issue=max_issue):
if skip_issues is not None and issue[0] in skip_issues:
continue
kept_issues.append(issue)
if fLOG:
fLOG("[build_history] ", name, issue[:2])
if len(kept_issues) == 0:
raise ValueError("No issue found.")
# remove duplicates
current = kept_issues
kept_issues = []
done = set()
for nb, dt, desc in current:
if nb not in done:
kept_issues.append((nb, dt, desc))
done.add(nb)
kept_issues.sort()
if releases is None:
versions = []
for date, version, size in enumerate_pypi_versions_date(name):
if date < since:
break
if fLOG:
fLOG("[build_history] ", name, version, date)
versions.append((date, version, size))
else:
versions = releases
if len(versions) == 0:
versions = [(datetime.now(), '0.0.0', 0)]
# merge
dates = [(v[0], "v", v) for v in versions]
dates.extend((i[1], "i", i) for i in kept_issues)
dates.sort(reverse=True)
merged = []
current = None
if unpublished:
current = dict(release="current", size=0,
date=datetime.now(), issues=[])
for _, v, obj in dates:
if v == 'v':
if current is not None:
merged.append(current)
current = dict(release=obj[1], size=obj[2], date=obj[0], issues=[])
elif v == 'i':
if current is not None:
issue = dict(title=obj[2], date=obj[1], number=obj[0])
current['issues'].append(issue)
if current is not None:
merged.append(current)
return merged
_template = """
.. _l-HISTORY:
=======
History
=======
{% for release in releases %}
{{ release['release'] }} - {{ release['date'].strftime("%Y-%m-%d") }} - {{ '%1.2fMb' % (release['size'] * 2**(-20)) }}
{{ '=' * (len(release['release']) + 22) }}
{% for issue in release['issues'] %}
* `{{issue['number']}}`: {{issue['title']}} ({{issue['date'].strftime("%Y-%m-%d")}}){% endfor %}
{% endfor %}
"""
[docs]def compile_history(releases, template=None):
"""
Compile history and produces a :epkg:`rst` file.
:param releases: output of :func:`build_history <pyquickhelper.loghelper.history_helper.build_history>`
:param template: :epkg:`jinja2` template (None means default one)
:return: output
:githublink:`%|py|185`
"""
if template is None:
global _template
template = _template
tmpl = Template(template)
return tmpl.render(releases=releases, len=len)
[docs]class open_stream_file:
"""
Opens a stream or a filename.
It works with keyword ``with``.
.. runpython::
:showcode:
from pyquickhelper.loghelper.history_helper import open_stream_file
from io import StringIO
st = StringIO("a\\nb")
with open_stream_file(st) as f:
for line in f.readlines():
print(line)
:githublink:`%|py|207`
"""
[docs] def __init__(self, name, mode="r", encoding="utf-8"):
"""
:param name: stream or filename
:param mode: open mode, works only if filename
:param encoding: encoding, works only if filename
:githublink:`%|py|214`
"""
self.name = name
self.mode = mode
self.encoding = encoding
[docs] def __enter__(self):
"""
Opens the stream or the file.
:githublink:`%|py|222`
"""
if hasattr(self, '_content'):
del self._content
if hasattr(self.name, "read"):
self.st = self.name
else:
self.st = open(self.name, self.mode, encoding=self.encoding)
return self
[docs] def __exit__(self, exception_type, exception_value, traceback):
"""
Leaves the stream or the filename.
:githublink:`%|py|234`
"""
if hasattr(self.name, "read"):
pass
else:
self.st.close()
if hasattr(self, '_content'):
del self._content
[docs] def read(self, size=None):
"""
Reads some bytes.
:param size: number of bytes or characters to read
:return: content
:githublink:`%|py|248`
"""
return self.st.read(size=size)
[docs] def readline(self):
"""
Basic implementation.
:return: next line
:githublink:`%|py|256`
"""
if hasattr(self.st, "readline"):
return self.st.readline()
else:
if hasattr(self, '_content'):
self._content = self.read().split('\n')
self._pos = 0
if self._pos >= len(self._content):
return None
res = self._content[self._pos]
self._pos += 1
return res
[docs] def readlines(self):
"""
Basic implementation.
:return: all text lines
:githublink:`%|py|274`
"""
if hasattr(self.st, "readlines"):
return self.st.readlines()
else:
line = self.readline()
lines = []
while line:
lines.append(line)
line = self.readline()
return lines