"""
Some automation helpers to grab mails from students about their projects.
:githublink:`%|py|5`
"""
import re
import os
import sys
import json
import textwrap
import warnings
import zipfile
from urllib.parse import urlparse
import numpy
from pyquickhelper.loghelper import noLOG
from pyquickhelper.texthelper import remove_diacritics
from pyquickhelper.filehelper import remove_folder, explore_folder_iterfile
from pyquickhelper.filehelper import (
unzip_files, zip_files, ungzip_files, un7zip_files, unrar_files,
untar_files
)
from pyquickhelper.helpgen import nb2html
from pyquickhelper.ipythonhelper import upgrade_notebook
from pymmails import EmailMessageRenderer, EmailMessage
from .repository_exception import RegexRepositoryException, TooManyProjectsException
from ..td_1a import edit_distance
from ..homeblog.python_exemple_py_to_html import py_to_html_file
[docs]class ProjectsRepository:
"""
Handle a repository of students projects.
See example :ref:`sphx_glr_automation_fetch_student_projects_from_gmail.py`.
:githublink:`%|py|33`
"""
class MailNotFound(Exception):
    """
    Exception raised when no mail can be associated with a name.
    """
# Each pattern matches one bullet line of the follow-up file and captures
# the text found after the colon.
# ``* mail: ...``, ``* mails: ...``, ``* email: ...`` or ``* emails: ...``
_email_regex = re.compile("[*] *e?mails? *: *([^*+\\n]+)")
# ``* gitlab: ...``, the captured value must end with ``.git``
_gitlab_regex = re.compile("[*] *gitlab *: *([^*+\\n]+[.]git)")
# ``* video: ...`` or ``* videos: ...``
_video_regex = re.compile("[*] *videos? *: *([^*\\n]+)")
def __init__(self, location, suivi="suivi.rst", fLOG=noLOG):
    """
    Stores where the repository is and how to log.

    :param location: location of the repository
    :param suivi: name of the file gathering information about each project
    :param fLOG: logging function
    """
    self._location, self._suivi, self.fLOG = location, suivi, fLOG
@property
def Location(self):
    """
    Location of the repository given to the constructor.

    :return: location of the repository
    """
    return self._location
@property
def Groups(self):
    """
    Lists the groups of the repository: one group per sub folder
    directly below the repository location.

    :return: list of folder names
    """
    root = self._location
    found = []
    for entry in os.listdir(root):
        if os.path.isdir(os.path.join(root, entry)):
            found.append(entry)
    return found
def get_group_location(self, group):
    """
    Builds the path of the folder which holds the files of a group.

    :param group: group name
    :return: local folder (repository location joined with the group name)
    """
    folder = os.path.join(self._location, group)
    return folder
@staticmethod
def get_regex(path, regex, suivi="suivi.rst", skip_if_empty=False):
    """
    Retrieves data from file ``suivi.rst`` using a regular expression.

    :param path: sub folder to look into
    :param regex: compiled regular expression, the values are what
        ``regex.findall`` returns on the file content
    :param suivi: name of the file ``suivi.rst``
    :param skip_if_empty: returns an empty list instead of raising
        an exception when the expression is not found
    :return: list of values, every match is split on ``;``
        and every piece is stripped
    :raises FileNotFoundError: if the folder or the file does not exist
    :raises ValueError: if the file is not encoded in utf-8
    :raises RuntimeError: if nothing matches and *skip_if_empty* is False
    """
    if not os.path.exists(path):
        raise FileNotFoundError(path)  # pragma: no cover
    filename = os.path.join(path, suivi)
    if not os.path.exists(filename):
        raise FileNotFoundError(filename)  # pragma: no cover

    try:
        with open(filename, "r", encoding="utf8") as f:
            content = f.read()
    except UnicodeDecodeError as e:
        raise ValueError(  # pragma: no cover
            'unable to parse file:\n File "{0}", line 1'.format(filename)) from e

    mails = regex.findall(content)
    if not mails:
        if skip_if_empty:
            return []
        raise RuntimeError(  # pragma: no cover
            "Unable to find the regular expression '{0}' in '{1}'".format(
                regex.pattern, filename))

    allmails = []
    for m in mails:
        # a single bullet may hold several values separated by ';'
        allmails.extend(m.strip("\n\r\t ").split(";"))
    # one pass only: a nested loop here would repeat every value
    return [_.strip() for _ in allmails]
def get_emails(self, group, skip_if_empty=False):
    """
    Reads the student mails of a group from its follow-up file
    and checks every one of them looks like a single address.

    :param group: group
    :param skip_if_empty: skip if no mail?
    :return: list of mails
    :raises ValueError: if a mail spreads over several lines
    :raises RegexRepositoryException: if a mail does not contain exactly one ``@``
    """
    folder = os.path.join(self._location, group)
    addresses = ProjectsRepository.get_regex(
        folder, ProjectsRepository._email_regex, self._suivi,
        skip_if_empty=skip_if_empty)
    for address in addresses:
        if "\n" in address:
            raise ValueError(  # pragma: no cover
                "unable to interpret " + str([address]) + " from path " + folder)
        if address.count("@") != 1:
            message = "unable to understand mail {0} in {1} (suivi={2} (mail separator is ;)"
            raise RegexRepositoryException(  # pragma: no cover
                message.format(address, folder, self._suivi))
    return addresses
def get_videos(self, group):
    """
    Retrieves the videos of a group from file ``suivi.rst``.

    :param group: group
    :return: list of videos
    """
    # the follow-up file lives in the folder of the group, below the
    # repository location (same convention as get_emails and get_sections)
    path = os.path.join(self._location, group)
    return self.get_regex(path, self._video_regex, self._suivi)
def get_sections(self, group):
    """
    Splits the follow-up file of a group into sections.

    A section starts with a line followed by an underline made of a
    single repeated character among ``=``, ``+``, ``-``. The section
    underlined with ``=`` is stored under key ``"title"`` and its first
    element is the title itself.

    :param group: group
    :return: dictionary { section : list of lines }

    Example of a file::

        rapport
        +++++++

        * bla 1

        extrait
        +++++++

        ::

            paragraphe 1

            paragraphe 2
    """
    folder = os.path.join(self._location, group)
    if not os.path.exists(folder):
        raise FileNotFoundError(folder)  # pragma: no cover
    report = os.path.join(folder, self._suivi)
    if not os.path.exists(report):
        raise FileNotFoundError(report)  # pragma: no cover

    try:
        with open(report, "r", encoding="utf8") as stream:
            text = stream.read()
    except UnicodeDecodeError as e:
        raise ValueError(  # pragma: no cover
            'unable to parse file:\n File "{0}", line 1'.format(report)) from e

    rows = [_.strip("\r").rstrip() for _ in text.split("\n")]
    history = []  # section every processed line was stored in
    sections = {"": []}
    current = ""

    for pos, row in enumerate(rows):
        underline = row[:1] in ("=", "+", "-") and row == row[0] * len(row)
        if not underline:
            sections[current].append(row)
            history.append(current)
            continue

        # the previous line is the heading: it is removed from the
        # section it was stored in a moment ago
        heading = rows[pos - 1]
        if history:
            previous = history[-1]
            sections[previous] = sections[previous][:-1]
            history[-1] = heading
        if row[0] == "=":
            sections["title"] = [heading]
            current = "title"
        else:
            sections[heading] = []
            current = heading
        history.append(current)
    return sections
# separators used by match_mail to cut a name or a mail into tokens
_regex_split = re.compile("[-;,. @]")
@staticmethod
def match_mail(name, emails, threshold=3, exc=True):
    """
    Looks for the mails which are close to a name.

    The name and the local part of every mail are lowered, stripped from
    their diacritics, split into tokens which are sorted and joined again;
    both strings are then compared with an edit distance.

    :param name: a name (first name last name separated by a space)
    :param emails: list of emails
    :param threshold: above this threshold, mails and names don't match
    :param exc: raise an Exception if not found
    :return: sorted list of tuples *(distance, mail)*,
        ``[(0, name)]`` if *name* already belongs to *emails*
    :raises ProjectsRepository.MailNotFound: if *exc* is True and no mail is close enough
    """
    # a missing value read from a dataframe is a float (nan)
    if isinstance(name, float):
        name = "" if numpy.isnan(name) else str(name)
    # easy case: the name is one of the mails
    if name in emails:
        return [(0, name)]

    def normalize(text):
        tokens = [tok.strip() for tok in ProjectsRepository._regex_split.split(
            remove_diacritics(text.lower()))]
        return " ".join(sorted(tokens))

    target = normalize(name)
    scored = [(edit_distance(normalize(email.split("@")[0]), target)[0], email)
              for email in emails]
    close = sorted(pair for pair in scored if pair[0] <= threshold)
    if exc and len(close) == 0:
        raise ProjectsRepository.MailNotFound(  # pragma: no cover
            "unable to find a mail for {0} among\n{1}".format(name, "\n".join(emails)))
    return close
@staticmethod
def match_mails(names, emails, threshold=3, exc=True, skip_names=None):
    """
    Applies :meth:`match_mail` to a series of names and gathers the mails.

    :param names: list of names (first name last name separated by a space)
    :param emails: list of emails
    :param threshold: above this threshold, mails and names don't match
    :param exc: raise an Exception if not found
    :param skip_names: the returned boolean is True if one of the names
        belongs to this list
    :return: list of matched mails, boolean
    """
    matched = []
    skip = False
    for name in names:
        skip = skip or (skip_names is not None and name in skip_names)
        candidates = ProjectsRepository.match_mail(name, emails, threshold, exc)
        matched += [mail for _, mail in candidates]
    return matched, skip
@staticmethod
def create_folders_from_dataframe(df, root, report="suivi.rst", col_student=None, col_group="Groupe",
                                  col_subject="Sujet", col_mail="mail", overwrite=False, email_function=None,
                                  must_have_email=True, skip_if_nomail=False, skip_names=None,
                                  fLOG=noLOG):
    """
    Creates one folder per group of students and writes a report file
    (title, members, subject, group id, mails) in each of them.

    :param df: DataFrame
    :param root: where to create the folders
    :param report: report file
    :param col_student: column which contains the student name (first name + last name),
        equal to *col_mail* if *None*
    :param col_group: index of the group (it can be *None* if each student is a group)
    :param col_subject: column which contains the subject
    :param col_mail: if there is a column which contains the mail in the input dataframe
    :param overwrite: if False, skip if the report already exists
    :param email_function: function which infers email from first and last names, see below
    :param must_have_email: if True, raises an exception if no mail is found
    :param skip_if_nomail: skip a name if no mail is found
    :param skip_names: less checking for a given set of names
    :param fLOG: logging function
    :return: a :class:`ProjectsRepository` built on *root*
        (the list of created folders is collected but not returned)

    The function *email_function* has the following signature::

        def email_function(names, skip_names):
            # names is a list of names
            # ...
            return list of mails, skip=boolean

    The boolean tells the function to skip this group.
    *email_function* can be a list of mails. In that case,
    this function is replaced by :meth:`match_mails <ensae_teaching_cs.automation_students.projects_repository.ProjectsRepository.match_mails>`
    when *col_mail* is None, or by a lookup name -> mail built from
    columns *col_student* and *col_mail* otherwise.
    """
    if col_mail is None and email_function is None:
        raise ValueError(  # pragma: no cover
            "col_mail cannot be None if email_function is None")
    if col_student is None:
        col_student = col_mail

    # fuzzy matching of the names against a list of mails
    def local_email_function(names, skip_names):
        return ProjectsRepository.match_mails(names, email_function,
                                              exc=False, skip_names=skip_names)

    # exact lookup of the names in a dictionary name -> mail
    def local_email_function_column(names, skip_names, mapping):
        res = []
        skip = False
        for name in names:
            if skip_names is not None and name in skip_names:
                skip = True
            r = mapping.get(name, None)
            if r:
                res.append(r)
        return res, skip

    if isinstance(email_function, (list, set)):
        if col_mail is None:
            local_function = local_email_function
        else:
            try:
                # +1 because itertuples puts the index first
                ind_student = list(df.columns).index(col_student) + 1
                ind_mail = list(df.columns).index(col_mail) + 1
            except ValueError as e:
                raise ValueError(  # pragma: no cover
                    "Unable to find '{0}' or '{1}' in {2}".format(
                        col_student, col_mail, df.columns)) from e
            mapping = {}
            for row in df.itertuples():
                mapping[row[ind_student]] = row[ind_mail]
            # NOTE(review): parameter 'skip' is ignored, 'skip_names' comes
            # from the enclosing function; both hold the same value for the
            # only call below
            local_function = \
                lambda names, skip, mp=mapping: \
                local_email_function_column(names, skip_names, mp)
    else:
        local_function = email_function

    # converts a name into a piece of folder name: spaces and dashes
    # become dots, everything after '@' is dropped
    def ul(last):
        res = ""
        for i, c in enumerate(last):
            if c == " ":
                res += "."
            elif c == "-":
                res += "."
            elif c == '@':
                break
            else:
                res += c
        return res

    folds = []

    if df.shape[1] == 0:
        raise Exception("No column in the dataframe.")  # pragma: no cover

    if col_group:
        gr = df.groupby(col_group)
    else:
        # every row is its own group, named after the index
        df2 = df.copy()
        df2["gid"] = df.index
        df2["gid2"] = df2.gid.apply(lambda x: "G%d" % x)
        gr = df2.groupby("gid2")

    fLOG("[ProjectsRepository.create_folders_from_dataframe] number of groups {0}".format(
        len(gr)))

    for name, group in gr:

        if col_subject:
            s = list(set(group[col_subject].copy()))
            # missing subjects (nan) are ignored
            s = [_ for _ in s if not isinstance(
                _, float) or ~numpy.isnan(_)]
            if len(s) > 1:
                raise TooManyProjectsException(  # pragma: no cover
                    "more than one subject for group: " + str(name) + "\n" + str(s))
            elif len(s) == 0:
                s = ["unknown"]
            subject = s[0]
        else:
            subject = None

        eleves = list(group[col_student])
        eleves.sort()

        if email_function is not None:
            mails, skip = local_function(eleves, skip_names)
            if must_have_email and (not skip and len(mails) == 0):
                # we skip only if a group has no mails at all
                if isinstance(email_function, (list, set)):
                    mes = "unable to find a mail for\n{0}\nname={1}\nskip:{4}\n{5}\namong\n{3}\nGROUP\n{2}\nlocal_function: {6}"
                    raise ProjectsRepository.MailNotFound(  # pragma: no cover
                        mes.format("; ".join("'%s'" % _ for _ in eleves),
                                   name, group, "\n".join(email_function),
                                   skip, skip_names, local_function))
                raise ProjectsRepository.MailNotFound(  # pragma: no cover
                    "unable to find a mail for {0}\nname={1}\n with function\n{3}\nGROUP\n{2}\nTYPE:\n{4}".format(
                        " ;".join(eleves), name, group, email_function, type(email_function)))
            if skip_if_nomail and (not skip and len(mails) == 0):
                fLOG("[ProjectsRepository.create_folders_from_dataframe] skipping {0}".format(
                    "; ".join(eleves)))
                continue

            if mails:
                for m in mails:
                    if "@" not in m:
                        raise ValueError(  # pragma: no cover
                            "mails contains a mail with no @: {0}".format(m))
                    if "<" in m or ">" in m:
                        raise ValueError(  # pragma: no cover
                            "one mail contains weird characters: {0}".format(m))
                jmail = "; ".join(mails)
            else:
                jmail = None
        else:
            jmail = None

        if jmail is not None:
            if "@" not in jmail:
                raise ValueError(  # pragma: no cover
                    "jmail does not contain any @: {0}".format(jmail))

        # content of the report file
        members = ", ".join(map(str, eleves))
        content = [members]
        content.append("=" * len(members))
        content.append("")
        content.append("* members: {0}".format(members))
        if subject:
            content.append("* subject: {0}".format(subject))
        content.append("* G: {0}".format(name))
        if jmail:
            content.append("* mails: " + jmail)
        content.append("")
        content.append("")

        # the folder name is built from the sorted names of the members
        last = "-".join(ul(a) for a in sorted(map(str, eleves)))

        folder = os.path.join(root, last)
        filename = os.path.join(folder, report)

        if not os.path.exists(folder):
            if '@' in folder:
                raise ValueError(  # pragma: no cover
                    "Folder '{0}' must not contain '@'.".format(folder))
            os.mkdir(folder)

        if overwrite or not os.path.exists(filename):
            with open(filename, "w", encoding="utf8") as f:
                f.write("\n".join(content))

        folds.append(folder)

    proj = ProjectsRepository(root, suivi=report, fLOG=fLOG)

    if must_have_email:
        # final check: every folder found in root must declare a mail
        for gr in proj.Groups:
            mails = proj.get_emails(gr)
            if len(mails) == 0:
                raise ValueError(  # pragma: no cover
                    "No mail for group '{0}'.".format(gr))

    return proj
def enumerate_group_mails(self, group, mailbox, subfolder, date=None,
                          skip_function=None, max_dest=5):
    """
    Enumerates all mails sent by or sent to a given group.

    :param group: group (if None, goes through all mails)
    :param mailbox: mailbox (see `pymmails <http://www.xavierdupre.fr/app/pymmails/helpsphinx/>`_)
    :param subfolder: which subfolder of the mailbox to look into
    :param date: date
    :param skip_function: if not None, use this function on the header/body
        to avoid loading the entire message (and skip it)
    :param max_dest: maximum number of receivers
    :return: iterator on mails
    """
    if group is None:
        for group_ in self.Groups:
            self.fLOG(
                "[ProjectsRepository.enumerate_group_mails] group='{0}'".format(group_))
            yield from self.enumerate_group_mails(
                group_, mailbox, subfolder=subfolder, date=date,
                skip_function=skip_function, max_dest=max_dest)
        return

    mails = self.get_emails(group)
    self.fLOG("[ProjectsRepository.enumerate_group_mails] mails='{0}' folder='{1}' date={2}".format(
        str(mails), subfolder, date))
    # max_dest is forwarded: the value chosen by the caller must not be
    # replaced by the default one
    yield from mailbox.enumerate_search_person(
        person=mails,
        folder=subfolder,
        skip_function=skip_function,
        date=date,
        max_dest=max_dest)
def dump_group_mails(self, renderer, group, mailbox, subfolder, date=None,
                     skip_function=None, max_dest=5, filename="index_mails.html",
                     overwrite=False, skip_if_empty=False, convert_files=False):
    """
    Dumps all mails sent by or sent to a given group and summarizes
    the attachments in file ``_summaryattachements.json``.

    :param renderer: instance of class `EmailMessageListRenderer
        <http://www.xavierdupre.fr/app/pymmails/helpsphinx/pymmails/render/
        email_message_list_renderer.html>`_
    :param group: group (if None, all groups are processed)
    :param mailbox: mailbox (see `pymmails <http://www.xavierdupre.fr/app/pymmails/helpsphinx/>`_)
    :param subfolder: which subfolder of the mailbox to look into
    :param date: date
    :param skip_function: if not None, use this function on the header/body to avoid loading
        the entire message (and skip it)
    :param max_dest: maximum number of receivers
    :param filename: filename which gathers a link to every mail
    :param overwrite: overwrite, forced to True if the number of mails
        in the mailbox differs from the number of mails already dumped
    :param skip_if_empty: skip if no mail?
    :param convert_files: unzip and convert
    :return: list of files (see `EmailMessageListRenderer.write
        <http://www.xavierdupre.fr/app/pymmails/helpsphinx/pymmails/render/
        email_message_list_renderer.html>`_)

    zip, gz, rar, 7z can be uncompressed.
    It then converts *.py* and *.ipynb* into html.
    """
    if group is None:
        res = []
        for group_ in self.Groups:
            # every parameter is forwarded, filename included
            r = self.dump_group_mails(renderer, group_, mailbox, subfolder=subfolder,
                                      date=date, skip_function=skip_function, max_dest=max_dest,
                                      filename=filename, overwrite=overwrite,
                                      skip_if_empty=skip_if_empty,
                                      convert_files=convert_files)
            res.extend(r)
        return res

    mails = self.get_emails(group, skip_if_empty=skip_if_empty)
    if skip_if_empty and len(mails) == 0:
        self.fLOG("[ProjectsRepository.dump_group_mails] SKIP group='{0}' folder='{1}' date={2} mails={3}".format(
            group, subfolder, date, str(mails)))
        return []
    self.fLOG("[ProjectsRepository.dump_group_mails] group='{0}' folder='{1}' date={2} mails={3}".format(
        group, subfolder, date, str(mails)))

    def iter_mail(body=True):
        return mailbox.enumerate_search_person(person=mails, folder=subfolder,
                                               skip_function=skip_function, date=date,
                                               max_dest=max_dest, body=body)

    # the dump is refreshed if the mailbox does not hold as many mails
    # as the number of mails already stored on disk
    nbmails = len(self.list_mails(group))
    nbcur = len(list(iter_mail(body=False)))
    if nbmails != nbcur:
        overwrite = True
        self.fLOG("[dump_group_mails] group='{0}' - new mails".format(
            group), nbcur, "<", nbmails)

    mail_iter = iter_mail(body=True)
    location = self.get_group_location(group)
    r = renderer.write(iter=mail_iter, location=location,
                       filename=filename, overwrite=overwrite,
                       file_jsatt="_summaryattachements_raw.json",
                       attach_folder="attachments")
    renderer.flush()

    # attachments in JSON format
    json_att = []

    # first pass: metadata files, indexed by the attachment they describe
    metadata = {}
    for name in self.enumerate_group_files(group):
        if "attachments" not in name or not name.endswith('.metadata'):
            continue
        sname = os.path.relpath(name, location).replace("\\", "/")
        metadata[sname[:-len('.metadata')]] = sname

    # second pass: the attachments themselves
    for name in self.enumerate_group_files(group):
        if "attachments" not in name or name.endswith('.metadata'):
            continue
        sname = os.path.relpath(name, location).replace("\\", "/")
        info = dict(a=sname, name=sname)
        if sname in metadata:
            info['info'] = '<a href="{0}">metadata</a>'.format(
                metadata[sname])
        json_att.append(info)

    if convert_files:
        converted = self.unzip_convert(group)
        for conv in converted:
            sconv = os.path.relpath(conv, location).replace("\\", "/")
            json_att.append(
                dict(a=sconv, name=sconv, unzip_convert='Yes'))

    file_jsatt = os.path.join(location, "_summaryattachements.json")
    if json_att and not renderer.BufferWrite.exists(file_jsatt, local=not overwrite):
        # NOTE(review): the stream is owned by renderer.BufferWrite and is
        # not closed here - confirm the buffer is flushed by the caller
        f = renderer.BufferWrite.open(
            file_jsatt, text=True, encoding='utf-8')
        js = json.dumps(json_att)
        f.write(js)

    return r
def remove_group(self, group):
    """
    Deletes the folder of a group.

    :param group: group
    :return: list of removed files

    See `remove_folder <http://www.xavierdupre.fr/app/pyquickhelper/helpsphinx/
    pyquickhelper/filehelper/synchelper.html#module-pyquickhelper.filehelper.synchelper>`_.
    """
    return remove_folder(self.get_group_location(group))
def enumerate_group_files(self, group):
    """
    Walks through the files of a group, or of every group
    if *group* is None.

    :param group: group
    :return: iterator on files
    """
    if group is not None:
        yield from explore_folder_iterfile(self.get_group_location(group))
        return
    for one_group in self.Groups:
        yield from self.enumerate_group_files(one_group)
def list_mails(self, group):
    """
    Lists the mails already dumped for a group: files named
    ``d_*.html`` which are not stored among the attachments.

    :param group: group name
    :return: list of mails (filenames)
    """
    def is_mail(path):
        if "attachments" in path:
            return False
        base = os.path.split(path)[-1]
        return base.startswith("d_") and base.endswith(".html")

    return [path for path in list(self.enumerate_group_files(group))
            if is_mail(path)]
def zip_group(self, group, outfile, addition=None):
    """
    Stores every file of a group into a zip file.

    :param group: group
    :param outfile: output file
    :param addition: additional files (sequence)
    :return: list of zipped files
    """
    def everything():
        # files of the group first, extra files afterwards
        yield from self.enumerate_group_files(group)
        if addition:
            yield from addition

    return zip_files(outfile, everything(), root=self._location)
# captures http/https urls, a url stops at the first space, quote,
# angle bracket or parenthesis
_link_regex = re.compile("(https?[:][^ \\\"<>)(]+)")
# default value for parameter nolink_if of write_summary: a link
# containing one of these substrings is not listed in the summary
_known_strings = ["xavierdupre.fr", "doodle", "ensaenotebook", "teralab",
                  "outlook.com", "gohlke", "support.google", "help.github",
                  "api.jcdecaux"]
# Default Jinja2 template used by write_summary. The rendering context
# provides: title, css, groups, len, os, format_size.
# The text is stored without indentation and used as is: removing spaces
# from it would glue the keywords of the template tags together
# ({% for ps in groups %} must keep its spaces).
_default_template_summary = """<?xml version="1.0" encoding="utf-8"?>
<head>
<meta http-equiv="content-type" content="text/html; charset=utf-8" />
</head>
<body>
<html>
<head>
<title>{{ title }}</title>
<link rel="stylesheet" type="text/css" href="{{ css }}">
</head>
<body>
<h1>{{ title }}</h1>
<ol type="1">
{% for ps in groups %}
<li><a href="{{ ps["link"] }}">{{ ps["group"] }}</a><small><i>
{{ ps["nb"] }} files - {{ format_size(ps["size"]) }} -
{% if len(ps["emails"]) > 0 %}
last mail {{ ps["emails"][-1]["date"] }} ---{% else %}
No mail found. {% endif %}
{{ len(ps["attachments"]) }} attachments</i></small>
{% if len(ps["attachments"]) + len(ps["links"]) > 0 %}
<ul>
{% for day, att, data in ps["attachments"] %}
<li>att: {{ day }} - <a href="{{ att }}">{{ os.path.split(att)[-1] }}</a></li>
{% endfor %}
{% for date, from_, url, domain, last in ps["links"] %}
<li>link: {{ date }} <a href="{{ url }}">{{ domain }} // {{ last }}</a> from {{ from_ }}</li>
{% endfor %}
</ul>
{% endif %}
{% if len(ps["created_files"]) > 0 %}
<ul>
{% for name, relpath, size in ps["created_files"] %}
<li>added: <a href="{{ relpath }}">{{ name }}</a> {{ size }}</li>
{% endfor %}
</ul>
{% endif %}
</li>
{% endfor %}
</ol>
</body>
</html>
"""
def write_run_command(self, filename=None, renderer=None):
    """
    Writes a small script which starts a local web server
    (``python3 -m http.server 9000``). The script is saved in the
    repository location, the content is then available
    at ``http://localhost:9000/``.

    :param filename: name of the script, ``run_server.bat`` on Windows,
        ``run_server.sh`` otherwise if None
    :param renderer: unused
    """
    if filename is None:
        on_windows = sys.platform.startswith('win')
        filename = "run_server.bat" if on_windows else "run_server.sh"

    url = "http://localhost:9000/"
    script = textwrap.dedent("""
        echo Open a browser with url '{}'
        python3 -m http.server 9000
        """).format(url)

    target = os.path.join(self.Location, filename)
    self.fLOG("[write_run_command] write '{}'.".format(target))
    with open(target, 'w') as stream:
        stream.write(script)
def write_summary(self, renderer=None, link="index_mails.html",
                  outfile="index.html", title="summary",
                  nolink_if=None):
    """
    Produces a summary of all groups and uses a :epkg:`Jinja2` template.

    :param renderer: instance of `EmailMessageRenderer
        <http://www.xavierdupre.fr/app/pymmails/
        helpsphinx//pymmails/render/email_message_renderer.html>`_),
        can be None, a renderer based on the default template
        is then created and flushed
    :param link: look for this file in each folder
    :param outfile: output file
    :param title: title
    :param nolink_if: link containing those strings will be removed
        (if None, a default set will be assigned)
    :return: summary (whatever ``renderer.write`` returns)

    The default template is ``ProjectsRepository._default_template_summary``.
    """
    if nolink_if is None:
        nolink_if = ProjectsRepository._known_strings

    def filter_in(url):
        # tells if a url deserves to be listed
        if "\n" in url or "\r" in url or "\t" in url:
            return False
        # leftover of an escaped quote at the end of an HTML attribute
        if url.endswith("&quot;"):
            return False
        for _ in nolink_if:
            if _ in url:
                return False
        if ".ipynb_checkpoints" in url:
            return False
        return True

    def clean_url(u):
        # urls are read from HTML: escaped characters are restored and
        # trailing punctuation removed
        u = u.replace("&#43;", "+").strip(".#'/ \r\n\t ")
        if u.endswith("&nbsp;"):
            u = u[:-len("&nbsp;")]
        return u

    def url_domain_name(url):
        # returns the domain and the last piece of the path (30 characters at most)
        r = urlparse(url)
        domain = r.netloc
        name = [_ for _ in url.split("/") if _]
        last = name[-1] if len(name) > 0 else domain
        if len(last) > 30:
            last = last[-30:]
        return domain, clean_url(last)

    def format_size(s):
        if s <= 2 ** 11:
            return "{0} bytes".format(s)
        elif s <= 2 ** 21:
            return "{0} Kb".format(s // (2 ** 10))
        elif s <= 2 ** 31:
            return "{0} Mb".format(s // (2 ** 20))
        else:
            return "{0} Gb".format(s // (2 ** 30))

    groups = []
    for group in self.Groups:
        loc = self.get_group_location(group)
        lp = os.path.join(loc, link)
        if os.path.exists(lp):
            c = os.path.relpath(lp, self._location), group
        else:
            c = "file:///{0}".format(group), group
        nb_files = 0
        size = 0
        atts = []
        emails = []
        links = []
        created_files = []
        for name in self.enumerate_group_files(group):
            if name.endswith(".metadata"):
                continue
            nb_files += 1
            size += os.stat(name).st_size
            folder = os.path.split(name)[0]
            splf = folder.replace("\\", "/").split("/")
            if folder.endswith("attachments"):
                # an attachment, the date comes from its metadata if any
                meta = name + ".metadata"
                if os.path.exists(meta):
                    data = EmailMessage.read_metadata(meta)
                    day = data["date"].strftime("%Y-%m-%d")
                else:
                    data = None
                    day = ""
                atts.append((day, os.path.relpath(
                    name, self._location), data))
            elif "attachments" in splf:
                # a file below the attachments: it was created
                # by uncompressing or converting an attachment
                rel = os.path.relpath(name, loc)
                dest = os.path.relpath(name, self._location)
                if rel == dest:
                    raise Exception(  # pragma: no cover
                        "weird\n{0}\n{1}".format(rel, dest))
                ssize = format_size(os.stat(name).st_size)
                if "__MACOSX" not in rel and "__MACOSX" not in dest and \
                        ".ipynb_checkpoints" not in dest and ".ipynb_checkpoints" not in rel:
                    created_files.append((rel, dest, ssize))
            else:
                # a mail, the filename tells the date, the sender, the uid
                mail = os.path.split(name)[-1]
                res = EmailMessage.interpret_default_filename(mail)
                if "date" in res and "uid" in res and "from" in res:
                    emails.append(
                        (res["date"], res["from"], res["uid"], res))

                    with open(os.path.join(loc, mail), "r", encoding="utf8") as f:
                        content = f.read()
                    urls = ProjectsRepository._link_regex.findall(content)
                    for u in set(urls):
                        u = clean_url(u)
                        if not filter_in(u):
                            continue
                        domain, last = url_domain_name(u)
                        links.append(
                            (res["date"], res["from"], u, domain, last))

        # we sort
        atts.sort()
        links.sort()

        # we clean duplicated links
        mlinks = links
        links = []
        done = set()
        for date, from_, url, domain, last in mlinks:
            if url in done:
                continue
            if "__MACOSX" in url or "__MACOSX" in last or \
                    ".ipynb_checkpoints" in last or ".ipynb_checkpoints" in url:
                continue
            links.append((date, from_, url, domain, last))
            done.add(url)

        # we create the variable for the template,
        # the dictionaries are left out of the comparison
        emails = [_[-1] for _ in sorted(emails, key=lambda t: t[:3])]
        c = dict(link=c[0].replace("\\", "/"), group=c[1], nb=nb_files,
                 size=size, attachments=atts, emails=emails, links=links,
                 created_files=created_files)
        groups.append(c)

    # final summary
    if renderer is None:
        tmpl = ProjectsRepository._default_template_summary
        renderer = EmailMessageRenderer(tmpl=tmpl, fLOG=self.fLOG)
        dof = True
    else:
        dof = False

    res = renderer.write(filename=outfile, location=self.Location,
                         mail=None, attachments=None, groups=groups,
                         title=title, len=len, os=os,
                         format_size=format_size)
    if dof:
        # the renderer was created here, nobody else can flush it
        renderer.flush()
    return res
def unzip_convert(self, group):
    """
    Uncompresses the archives of a group, then converts notebooks
    and scripts into :epkg:`HTML`.

    :param group: group name
    :return: list of new files (the converted ones)
    """
    self.unzip_files(group)
    converted = self.convert_files(group)
    return converted
def unzip_files(self, group):
    """
    Uncompresses every archive (*.zip*, *.7z*, *.rar*, *.tar.gz*, *.gz*)
    found among the attachments of a group. An archive is extracted into
    a folder named after it (``<name>_zip``, ``<name>_7z``, ...); it is
    skipped if this folder already exists.

    :param group: group name
    :return: list of new files
    """
    def fvalid(zip_name, local_name):
        # compiled python files are never extracted
        return "__pycache__" not in zip_name and not zip_name.endswith(".pyc")

    def clean_f(folder):
        # removes characters which are cumbersome in a folder name
        for char in " ,&\r\n\t":
            folder = folder.replace(char, "_")
        return folder

    def prepare(name, suffix, label, verb):
        # creates the destination folder of an archive,
        # returns None if the archive was already processed
        folder = clean_f(os.path.splitext(name)[0] + suffix)
        if os.path.exists(folder):
            return None
        self.fLOG(
            "[ProjectsRepository.{0}] {1} '{2}'".format(label, verb, name))
        self.fLOG(
            "[ProjectsRepository.{0}] creating '{1}'".format(label, folder))
        os.makedirs(folder)
        return folder

    files = []
    # the list of files is built first: extracting adds new files
    # to the folders being explored
    for name in list(self.enumerate_group_files(group)):
        if "attachments" not in name:
            continue
        ext = os.path.splitext(name)[-1]
        if ext == ".zip":
            folder = prepare(name, "_zip", "unzip_files", "unzip")
            if folder is None:
                continue
            try:
                # module level function unzip_files, not this method
                lf = unzip_files(
                    name, folder, fLOG=self.fLOG, fvalid=fvalid, fail_if_error=False)
            except (zipfile.BadZipFile, NotImplementedError, OSError) as e:
                self.fLOG(
                    "[ProjectsRepository.unzip_files] ERROR: unable to unzip '{0}' because of '{1}']".format(name, e))
                lf = []
        elif ext == ".7z":
            folder = prepare(name, "_7z", "un7zip_files", "un7zip")
            if folder is None:
                continue
            lf = un7zip_files(
                name, folder, fLOG=self.fLOG, fvalid=fvalid)
        elif ext == ".rar":
            folder = prepare(name, "_rar", "unrar_files", "unrar")
            if folder is None:
                continue
            lf = unrar_files(
                name, folder, fLOG=self.fLOG, fvalid=fvalid)
        elif name.endswith(".tar.gz"):
            # must be tested before '.gz'
            folder = prepare(name, "_targz", "untar_files", "ungzip")
            if folder is None:
                continue
            lf = untar_files(name, folder, fLOG=self.fLOG)
        elif ext == ".gz":
            folder = prepare(name, "_gz", "ungzip_files", "ungzip")
            if folder is None:
                continue
            # a compressed pickle is only uncompressed, not unzipped
            lf = ungzip_files(
                name, folder, fLOG=self.fLOG, fvalid=fvalid,
                unzip="pkl.gz" not in name)
        else:
            continue
        files.extend(lf)
    return files
def convert_files(self, group):
    """
    Converts the notebooks (*.ipynb*) and the python scripts (*.py*)
    found among the attachments of a group into :epkg:`HTML`.
    The output is written next to the source, ``.html`` is appended
    to its name. A failing conversion only raises a warning.

    :param group: group name
    :return: list of new files
    """
    created = []
    for path in list(self.enumerate_group_files(group)):
        if "attachments" not in path:
            continue
        suffix = os.path.splitext(path)[-1]
        if suffix not in (".ipynb", ".py"):
            continue

        self.fLOG(
            "[ProjectsRepository.convert_files] convert '{0}'".format(path))
        html = path + ".html"
        if os.path.exists(html):
            warnings.warn(
                "[convert_files] overwriting '{0}'".format(html))

        if suffix == ".ipynb":
            try:
                upgrade_notebook(path)
                nb2html(path, html, exc=False)
                created.append(html)
            except Exception as e:
                warnings.warn(
                    "unable to convert a notebook '{0}' because of {1}".format(path, e))
        else:
            try:
                py_to_html_file(path, html, False, title=os.path.relpath(
                    path, self.get_group_location(group)))
                created.append(html)
            except Exception:
                # the syntax of the python file might be wrong
                warnings.warn(
                    "unable to convert File \"{0}\"".format(path))
    return created