# -*- coding: utf-8 -*-
"""
Uses git to get version number.
:githublink:`%|py|6`
"""
import os
import sys
import datetime
import warnings
import xml.etree.ElementTree as ET
import re
from xml.sax.saxutils import escape
from ..flog import fLOG, run_cmd
[docs]class GitException(Exception):
"""
Exception raised by this module.
:githublink:`%|py|21`
"""
pass
[docs]def my_date_conversion(sdate):
"""
Converts a date into a datetime.
:param sdate: string
:return: date
:githublink:`%|py|31`
"""
first = sdate.split(" ")[0]
trois = first.replace(".", "-").replace("/", "-").split("-")
return datetime.datetime(int(trois[0]), int(trois[1]), int(trois[2]))
[docs]def IsRepo(location, commandline=True):
"""
Says if it a repository :epkg:`GIT`.
:param location: (str) location
:param commandline: (bool) use commandline or not
:return: bool
:githublink:`%|py|44`
"""
if location is None:
location = os.path.normpath(os.path.abspath(
os.path.join(os.path.split(__file__)[0], "..", "..", "..", "..")))
try:
get_repo_version(location, commandline, log=False)
return True
except Exception:
return False
[docs]class RepoFile:
"""
Mimic a :epkg:`GIT` file.
:githublink:`%|py|60`
"""
[docs] def __init__(self, **args):
"""
:param args: list of members to add
:githublink:`%|py|65`
"""
for k, v in args.items():
self.__dict__[k] = v
if hasattr(self, "name"):
if '"' in self.name: # pylint: disable=E0203
#defa = sys.stdout.encoding if sys.stdout != None else "utf8"
self.name = self.name.replace('"', "")
#self.name = self.name.encode(defa).decode("utf-8")
if "\\303" in self.name or "\\302" in self.name or "\\342" in self.name:
# don't know yet how to avoid that
name0 = self.name
# see http://www.utf8-chartable.de/unicode-utf8-table.pl?utf8=oct
# far from perfect
self.name = self.name.replace(r"\302\240", chr(160)) \
.replace(r"\302\246", "¦") \
.replace(r"\302\256", "®") \
.replace(r"\302\251", "©") \
.replace(r"\302\260", "°") \
.replace(r"\302\267", "·") \
.replace(r"\303\203", "Ã") \
.replace(r"\303\207", "Ç") \
.replace(r"\303\211", "e") \
.replace(r"\303\232", "Ú") \
.replace(r"\303\240", "à") \
.replace(r"\303\242", "â") \
.replace(r"\303\244", "ä") \
.replace(r"\303\246", "æ") \
.replace(r"\303\247", chr(231)) \
.replace(r"\303\250", chr(232)) \
.replace(r"\303\251", chr(233)) \
.replace(r"\303\252", "ê") \
.replace(r"\303\253", "ë") \
.replace(r"\303\256", "î") \
.replace(r"\303\257", "ï") \
.replace(r"\303\264", "ô") \
.replace(r"\303\266", "ö") \
.replace(r"\303\273", "û") \
.replace(r"\303\274", "ü") \
.replace(r"a\314\200", "à") \
.replace(r"e\314\201", "é") \
.replace(r"\342\200\231", "’")
if not os.path.exists(self.name):
try:
ex = os.path.exists(name0)
except ValueError as e:
ex = str(e)
warnings.warn(
"The modification did not work\n'{0}'\nINTO\n'{1}'\n[{2}\nexists: {3}]".format(
name0, self.name, [self.name], ex))
[docs] def __str__(self):
"""
usual
:githublink:`%|py|119`
"""
return self.name
[docs]def get_cmd_git():
"""
Gets the command line used to run :epkg:`git`.
:return: string
:githublink:`%|py|128`
"""
if sys.platform.startswith("win32"): # pragma: no cover
cmd = r'"C:\Program Files\Git\bin\git.exe"'
if not os.path.exists(cmd):
cmd = r'"C:\Program Files (x86)\Git\bin\git.exe"'
if not os.path.exists(cmd):
# hoping git path is included in environment variable PATH
cmd = "git"
else:
cmd = 'git'
return cmd
[docs]def repo_ls(full, commandline=True):
"""
Runs ``ls`` on a path.
:param full: full path
:param commandline: use command line instead of pysvn
:return: output of client.ls
:githublink:`%|py|148`
"""
if not commandline: # pragma: no cover
try:
raise NotImplementedError()
except Exception:
return repo_ls(full, True)
else:
cmd = get_cmd_git()
cmd += " ls-tree -r HEAD \"%s\"" % full
out, err = run_cmd(cmd,
wait=True,
encerror="strict",
encoding=sys.stdout.encoding if sys.stdout is not None else "utf8",
change_path=os.path.split(
full)[0] if os.path.isfile(full) else full,
shell=sys.platform.startswith("win32"))
if len(err) > 0:
raise GitException( # pragma: no cover
"Issue with path '{0}'\n[OUT]\n{1}\n[ERR]\n{2}".format(full, out, err))
res = [RepoFile(name=os.path.join(full, _.strip().split("\t")[-1]))
for _ in out.split("\n") if len(_) > 0]
return res
[docs]def __get_version_from_version_txt(path):
"""
Private function, tries to find a file ``version.txt`` which should
contains the version number (if :epkg:`svn` is not present).
:param path: folder to look, it will look to the the path of this file,
some parents directories and finally this path
:return: the version number
.. warning:: If ``version.txt`` was not found, it throws an exception.
:githublink:`%|py|184`
"""
file = os.path.split(__file__)[0]
paths = [file,
os.path.join(file, ".."),
os.path.join(file, "..", ".."),
os.path.join(file, "..", "..", ".."),
path]
for p in paths:
fp = os.path.join(p, "version.txt")
if os.path.exists(fp):
with open(fp, "r") as f:
return int(f.read().strip(" \n\r\t"))
raise FileNotFoundError(
"unable to find version.txt in\n" + "\n".join(paths))
_reg_insertion = re.compile("([1-9][0-9]*) insertion")
_reg_deletion = re.compile("([1-9][0-9]*) deletion")
_reg_bytes = re.compile("([1-9][0-9]*) bytes")
[docs]def get_file_details(name, path=None, commandline=True):
"""
Returns information about a file.
:param name: name of the file
:param path: path to repo
:param commandline: if True, use the command line to get the version number, otherwise it uses pysvn
:return: list of tuples
The result is a list of tuple:
* commit
* name
* added
* inserted
* bytes
:githublink:`%|py|221`
"""
if not commandline: # pragma: no cover
try:
raise NotImplementedError()
except Exception:
return get_file_details(name, path, True)
else:
cmd = get_cmd_git()
if sys.platform.startswith("win"):
cmd += ' log --stat "' + os.path.join(path, name) + '"'
else:
cmd = [cmd, 'log', "--stat", os.path.join(path, name)]
enc = sys.stdout.encoding if sys.stdout is not None else "utf8"
out, err = run_cmd(cmd,
wait=True,
encerror="strict",
encoding=enc,
change_path=os.path.split(
path)[0] if os.path.isfile(path) else path,
shell=sys.platform.startswith("win32"),
preprocess=False)
if len(err) > 0: # pragma: no cover
mes = "Problem with file '{0}'".format(os.path.join(path, name))
raise GitException(
mes + "\n" +
err + "\nCMD:\n" + cmd + "\nOUT:\n" + out + "\n[giterror]\n" +
err + "\nCMD:\n" + cmd)
master = get_master_location(path, commandline)
if master.endswith(".git"):
master = master[:-4]
if enc != "utf8" and enc is not None:
by = out.encode(enc)
out = by.decode("utf8")
# We split into commits.
commits = []
current = []
for line in out.split("\n"):
if line.startswith("commit"):
if len(current) > 0:
commits.append("\n".join(current))
current = [line]
else:
current.append(line)
if len(current) > 0:
commits.append("\n".join(current))
# We analyze each commit.
rows = []
for commit in commits:
se = _reg_insertion.findall(commit)
if len(se) > 1:
raise Exception( # pragma: no cover
"A commit is wrong \n{0}".format(commit))
inser = int(se[0]) if len(se) == 1 else 0
de = _reg_deletion.findall(commit)
if len(de) > 1:
raise Exception( # pragma: no cover
"A commit is wrong \n{0}".format(commit))
delet = int(de[0]) if len(de) == 1 else 0
bi = _reg_bytes.findall(commit)
if len(bi) > 1:
raise Exception( # pragma: no cover
"A commit is wrong \n{0}".format(commit))
bite = int(bi[0]) if len(bi) == 1 else 0
com = commit.split("\n")[0].split()[1]
rows.append((com, name.strip(), inser, delet, bite))
return rows
_reg_stat_net = re.compile("(.+) *[|] +([1-9][0-9]*)")
_reg_stat_bytes = re.compile(
"(.+) *[|] Bin ([0-9]+) [-][>] ([0-9]+) bytes")
[docs]def get_file_details_all(path=None, commandline=True):
"""
Returns information about all files
:param path: path to repo
:param commandline: if True, use the command line to get the version number, otherwise it uses pysvn
:return: list of tuples
The result is a list of tuple:
* commit
* name
* net
* bytes
:githublink:`%|py|314`
"""
if not commandline: # pragma: no cover
try:
raise NotImplementedError()
except Exception:
return get_file_details_all(path, True)
else:
cmd = get_cmd_git()
if sys.platform.startswith("win"):
cmd += ' --no-pager log --stat'
else:
cmd = [cmd, '--no-pager', 'log', "--stat"]
enc = sys.stdout.encoding if sys.stdout is not None else "utf8"
out, err = run_cmd(cmd,
wait=True,
encerror="strict",
encoding=enc,
change_path=os.path.split(
path)[0] if os.path.isfile(path) else path,
shell=sys.platform.startswith("win32"),
preprocess=False)
if len(err) > 0: # pragma: no cover
mes = "Problem with '{0}'".format(path)
raise GitException(
mes + "\n" +
err + "\nCMD:\n" + cmd + "\nOUT:\n" + out + "\n[giterror]\n" +
err + "\nCMD:\n" + cmd)
master = get_master_location(path, commandline)
if master.endswith(".git"):
master = master[:-4]
if enc != "utf8" and enc is not None:
by = out.encode(enc)
out = by.decode("utf8")
# We split into commits.
commits = []
current = []
for line in out.split("\n"):
if line.startswith("commit"):
if len(current) > 0:
commits.append("\n".join(current))
current = [line]
else:
current.append(line)
if len(current) > 0:
commits.append("\n".join(current))
# We analyze each commit.
rows = []
for commit in commits:
com = commit.split("\n")[0].split()[1]
lines = commit.split("\n")
for line in lines:
r1 = _reg_stat_net.search(line)
if r1:
name = r1.groups()[0].strip()
net = int(r1.groups()[1])
delta = 0
else:
net = 0
r2 = _reg_stat_bytes.search(line)
if r2:
name = r2.groups()[0].strip()
fr = int(r2.groups()[1])
to = int(r2.groups()[2])
delta = to - fr
else:
continue
rows.append((com, name, net, delta))
return rows
[docs]def get_repo_log(path=None, file_detail=False, commandline=True, subset=None):
"""
Gets the latest changes operated on a file in a folder or a subfolder.
:param path: path to look
:param file_detail: if True, add impacted files
:param commandline: if True, use the command line to get the version number, otherwise it uses pysvn
:param subset: only provide file details for a subset of files
:return: list of changes, each change is a list of tuple (see below)
The return results is a list of tuple with the following fields:
- author
- commit hash [:6]
- date (datetime)
- comment$
- full commit hash
- link to commit (if the repository is http://...)
The function use a command line if an error occurred.
It uses the xml format:
::
<logentry revision="161">
<author>xavier dupre</author>
<date>2013-03-23T15:02:50.311828Z</date>
<msg>pyquickhelper: first version</msg>
<hash>full commit hash</hash>
</logentry>
Add link:
::
https://github.com/sdpython/pyquickhelper/commit/8d5351d1edd4a8997f358be39da80c72b06c2272
More: `git pretty format <http://opensource.apple.com/source/Git/Git-19/src/git-htmldocs/pretty-formats.txt>`_
See also `pretty format <https://www.kernel.org/pub/software/scm/git/docs/git-log.html#_pretty_formats>`_ (html).
To get details about one file and all the commit.
::
git log --stat -- _unittests/ut_loghelper/data/sample_zip.zip
For some reason, the call to :func:`str2datetime <pyquickhelper.loghelper.convert_helper.str2datetime>` seemed to cause exception such as::
File "<frozen importlib._bootstrap>", line 2212, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 321, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 2254, in _gcd_import
File "<frozen importlib._bootstrap>", line 2237, in _find_and_load
File "<frozen importlib._bootstrap>", line 2224, in _find_and_load_unlocked
when it was used to generate documentation for others modules than *pyquickhelper*.
Not using this function helps. The cause still remains obscure.
:githublink:`%|py|445`
"""
if file_detail:
if subset is None:
res = get_file_details_all(path, commandline=commandline)
details = {}
for commit in res:
com = commit[0]
if com not in details:
details[com] = []
details[com].append(commit[1:])
else:
files = subset
details = {}
for i, name in enumerate(files):
res = get_file_details(name.name if isinstance(name, RepoFile) else name,
path, commandline=commandline)
for commit in res:
com = commit[0]
if com not in details:
details[com] = []
details[com].append(commit[1:])
logs = get_repo_log(path=path, file_detail=False,
commandline=commandline)
final = []
for log in logs:
com = log[4]
if com not in details:
continue
det = details[com]
for d in det:
final.append(tuple(log) + d)
return final
if path is None:
path = os.path.normpath(
os.path.abspath(os.path.join(os.path.split(__file__)[0], "..", "..", "..")))
if not commandline: # pragma: no cover
try:
raise NotImplementedError()
except Exception:
return get_repo_log(path, file_detail, True)
else:
cmd = get_cmd_git()
if sys.platform.startswith("win"): # pragma: no cover
cmd += ' log --pretty=format:"<logentry revision=\\"%h\\">' + \
'<author>%an</author><date>%ci</date><hash>%H</hash><msg>%s</msg></logentry>" ' + \
path
else:
cmd_tmp = '--pretty=format:<logentry revision="%h"><author>%an</author><date>%ci' + \
'</date><hash>%H</hash><msg>%s</msg></logentry>'
cmd = [cmd, 'log', cmd_tmp, path]
enc = sys.stdout.encoding if sys.stdout is not None else "utf8"
out, err = run_cmd(cmd, wait=True, encerror="strict", encoding=enc,
change_path=os.path.split(
path)[0] if os.path.isfile(path) else path,
shell=sys.platform.startswith("win32"), preprocess=False)
if len(err) > 0: # pragma: no cover
mes = "Problem with file '{0}'".format(path)
raise GitException(mes + "\n" +
err + "\nCMD:\n" + cmd + "\nOUT:\n" + out +
"\n[giterror]\n" + err + "\nCMD:\n" + cmd)
master = get_master_location(path, commandline)
if master.endswith(".git"):
master = master[:-4]
if enc != "utf8" and enc is not None:
by = out.encode(enc)
out = by.decode("utf8")
out = out.replace("\n\n", "\n")
out = "<xml>\n%s\n</xml>" % out
try:
root = ET.fromstring(out)
except ET.ParseError:
# it might be due to character such as << >>
lines = out.split("\n")
out = []
suffix = "</msg></logentry>"
for line in lines:
if line.endswith(suffix):
pos = line.find("<msg>")
if pos == -1:
out.append(line)
continue
begin = line[:pos + 5]
body = line[pos + 5:-len(suffix)]
msg = escape(body)
line = begin + msg + suffix
out.append(line)
out = "\n".join(out)
try:
root = ET.fromstring(out)
except ET.ParseError as eee: # pragma: no cover
raise GitException(
"Unable to parse:\n{0}".format(out)) from eee
res = []
for i in root.iter('logentry'):
revision = i.attrib['revision'].strip()
author = i.find("author").text.strip()
t = i.find("msg").text
hash = i.find("hash").text
msg = t.strip() if t is not None else "-"
sdate = i.find("date").text.strip()
dt = my_date_conversion(sdate.replace("T", " ").strip("Z "))
row = [author, revision, dt, msg, hash]
if master.startswith("http"):
row.append(master + "/commit/" + hash)
else:
row.append("{0}//{1}".format(master, hash))
res.append(row)
return res
[docs]def get_repo_version(path=None, commandline=True, usedate=False, log=False):
"""
Gets the latest check for a specific path or version number
based on the date (if *usedate* is True).
If *usedate* is False, it returns a mini hash (a string then).
:param path: path to look
:param commandline: if True, use the command line to get the version number, otherwise it uses pysvn
:param usedate: if True, it uses the date to return a minor version number (1.1.thisone)
:param log: if True, returns the output instead of a boolean
:return: integer)
:githublink:`%|py|574`
"""
if not usedate:
last = get_nb_commits(path, commandline)
return last
else: # pragma: no cover
if path is None:
path = os.path.normpath(
os.path.abspath(os.path.join(os.path.split(__file__)[0], "..", "..", "..")))
if not commandline:
try:
raise NotImplementedError()
except Exception:
return get_repo_version(path, True)
else:
cmd = get_cmd_git()
cmd += ' git log --format="%h---%ci"'
if path is not None:
cmd += " \"%s\"" % path
try:
out, err = run_cmd(cmd, wait=True, encerror="strict",
encoding=sys.stdout.encoding if sys.stdout is not None else "utf8",
change_path=os.path.split(
path)[0] if os.path.isfile(path) else path,
log_error=False, shell=sys.platform.startswith("win32"))
except Exception as e:
raise GitException(
"Problem with subprocess. Path is '{0}'\n[OUT]\n{1}\n[ERR]\n{2}".format(path, out, err)) from e
if len(err) > 0:
if log:
fLOG("Problem with file ", path, err)
if log:
return "OUT\n{0}\n[giterror]{1}\nCMD:\n{2}".format(out, err, cmd)
else:
raise GitException(
"OUT\n{0}\n[giterror]{1}\nCMD:\n{2}".format(out, err, cmd))
lines = out.split("\n")
lines = [_.split("---") for _ in lines if len(_) > 0]
temp = lines[0]
if usedate:
dt = my_date_conversion(temp[1].replace("T", " ").strip("Z "))
dt0 = datetime.datetime(dt.year, 1, 1, 0, 0, 0)
res = "%d" % (dt - dt0).days
else:
res = temp[0]
if len(res) == 0:
raise GitException(
"The command 'git help' should return something.")
return res
[docs]def get_master_location(path=None, commandline=True):
"""
Gets the remote master location.
:param path: path to look
:param commandline: if True, use the command line to get the version number, otherwise it uses pysvn
:return: integer (check in number)
:githublink:`%|py|638`
"""
if path is None:
path = os.path.normpath(
os.path.abspath(os.path.join(os.path.split(__file__)[0], "..", "..", "..")))
if not commandline: # pragma: no cover
try:
raise NotImplementedError()
except Exception:
return get_master_location(path, True)
else:
cmd = get_cmd_git()
cmd += " config --get remote.origin.url"
try:
out, err = run_cmd(cmd, wait=True, encerror="strict",
encoding=sys.stdout.encoding if sys.stdout is not None else "utf8",
change_path=os.path.split(
path)[0] if os.path.isfile(path) else path,
log_error=False, shell=sys.platform.startswith("win32"))
except Exception as e: # pragma: no cover
raise GitException(
"Problem with subprocess. Path is '{0}'\n[OUT]\n{1}\n[ERR]\n{2}".format(path, out, err)) from e
if len(err) > 0:
raise GitException( # pragma: no cover
"Problem with path '{0}'\n[OUT]\n{1}\n[ERR]\n{2}".format(path, out, err))
lines = out.split("\n")
lines = [_ for _ in lines if len(_) > 0]
res = lines[0]
if len(res) == 0:
raise GitException( # pragma: no cover
"The command 'git help' should return something.")
return res
[docs]def get_nb_commits(path=None, commandline=True):
"""
Returns the number of commit.
:param path: path to look
:param commandline: if True, use the command line to get the version number, otherwise it uses pysvn
:return: integer
:githublink:`%|py|683`
"""
if path is None:
path = os.path.normpath(
os.path.abspath(os.path.join(os.path.split(__file__)[0], "..", "..", "..")))
if not commandline: # pragma: no cover
try:
raise NotImplementedError()
except Exception:
return get_nb_commits(path, True)
else:
cmd = get_cmd_git()
cmd += ' rev-list HEAD --count'
if path is not None:
cmd += " \"%s\"" % path
out, err = run_cmd(cmd,
wait=True,
encerror="strict",
encoding=sys.stdout.encoding if sys.stdout is not None else "utf8",
change_path=os.path.split(
path)[0] if os.path.isfile(path) else path,
log_error=False,
shell=sys.platform.startswith("win32"))
if len(err) > 0:
raise GitException( # pragma: no cover
"Unable to get commit number from path {0}\n[giterror]\n{1}\nCMD:\n{2}".format(path, err, cmd))
lines = out.strip()
try:
nb = int(lines)
except ValueError as e:
raise ValueError( # pragma: no cover
"unable to parse: " + lines + "\nCMD:\n" + cmd) from e
return nb
[docs]def get_file_last_modification(path, commandline=True):
"""
Returns the last modification of a file.
:param path: path to look
:param commandline: if True, use the command line to get the version number, otherwise it uses pysvn
:return: integer
:githublink:`%|py|729`
"""
if path is None:
path = os.path.normpath(
os.path.abspath(os.path.join(os.path.split(__file__)[0], "..", "..", "..")))
if not commandline: # pragma: no cover
try:
raise NotImplementedError()
except Exception:
return get_file_last_modification(path, True)
else:
cmd = get_cmd_git()
cmd += ' log -1 --format="%ad" --'
cmd += " \"%s\"" % path
out, err = run_cmd(cmd,
wait=True,
encerror="strict",
encoding=sys.stdout.encoding if sys.stdout is not None else "utf8",
change_path=os.path.split(
path)[0] if os.path.isfile(path) else path,
log_error=False,
shell=sys.platform.startswith("win32"))
if len(err) > 0:
raise GitException( # pragma: no cover
"Unable to get commit number from path {0}\n[giterror]\n{1}\nCMD:\n{2}".format(path, err, cmd))
lines = out.strip("\n\r ")
return lines
[docs]def clone(location, srv, group, project, username=None, password=None, fLOG=None):
"""
Clones a :epkg:`git` repository.
:param location: location of the clone
:param srv: git server
:param group: group
:param project: project name
:param username: username
:param password: password
:param fLOG: logging function
:return: output, error
See `How to provide username and password when run "git clone git@remote.git"?
<http://stackoverflow.com/questions/10054318/how-to-provide-username-and-password-when-run-git-clone-gitremote-git>`_
.. exref::
:title: Clone a git repository
::
clone("local_folder", "github.com", "sdpython", "pyquickhelper")
:githublink:`%|py|783`
"""
if username is not None:
address = "https://{0}:{1}@{2}/{3}/{4}.git".format(username,
password, srv, group, project)
else:
address = "https://{0}/{1}/{2}.git".format(srv, group, project)
cmd = get_cmd_git()
cmd += " clone " + address + " " + location
out, err = run_cmd(cmd, wait=True, fLOG=fLOG)
if len(err) > 0 and "Cloning into" not in err and "Clonage dans" not in err:
raise GitException( # pragma: no cover
"Unable to clone {0}\n[giterror]\n{1}\nCMD:\n{2}".format(address, err, cmd))
return out, err
[docs]def rebase(location, srv, group, project, username=None, password=None, fLOG=None):
"""
Runs ``git pull -rebase`` on a repository.
:param location: location of the clone
:param srv: git server
:param group: group
:param project: project name
:param username: username
:param password: password
:param fLOG: logging function
:return: output, error
:githublink:`%|py|811`
"""
if username is not None:
address = "https://{0}:{1}@{2}/{3}/{4}.git".format(username,
password, srv, group, project)
else:
address = "https://{0}/{1}/{2}.git".format(srv, group, project)
cwd = os.getcwd()
os.chdir(location)
cmd = get_cmd_git()
cmd += " pull --rebase " + address
out, err = run_cmd(cmd, wait=True, fLOG=fLOG)
os.chdir(cwd)
if len(err) > 0 and "-> FETCH_HEAD" not in err:
raise GitException( # pragma: no cover
"Unable to rebase {0}\n[giterror]\n{1}\nCMD:\n{2}".format(address, err, cmd))
return out, err