Code source de ensae_teaching_cs.automation_students.projects_repository

"""
Some automation helpers to grab mails from students about their projects.


:githublink:`%|py|5`
"""
import re
import os
import sys
import json
import textwrap
import warnings
import zipfile
from urllib.parse import urlparse
import numpy
from pyquickhelper.loghelper import noLOG
from pyquickhelper.texthelper import remove_diacritics
from pyquickhelper.filehelper import remove_folder, explore_folder_iterfile
from pyquickhelper.filehelper import (
    unzip_files, zip_files, ungzip_files, un7zip_files, unrar_files,
    untar_files
)
from pyquickhelper.helpgen import nb2html
from pyquickhelper.ipythonhelper import upgrade_notebook
from pymmails import EmailMessageRenderer, EmailMessage
from .repository_exception import RegexRepositoryException, TooManyProjectsException
from ..td_1a import edit_distance
from ..homeblog.python_exemple_py_to_html import py_to_html_file


[docs]class ProjectsRepository: """ Handle a repository of students projects. See example :ref:`sphx_glr_automation_fetch_student_projects_from_gmail.py`. :githublink:`%|py|33` """
class MailNotFound(Exception):
    """
    Exception raised when no mail can be found for a student.
    """
# Bullet ``* mail:``, ``* mails:``, ``* email:`` or ``* emails:``; the group
# captures everything after the colon up to the next ``*``, ``+`` or newline.
_email_regex = re.compile("[*] *e?mails? *: *([^*+\\n]+)")
# Bullet ``* gitlab:``; the captured value must end with ``.git``.
_gitlab_regex = re.compile("[*] *gitlab *: *([^*+\\n]+[.]git)")
# Bullet ``* video:`` or ``* videos:``; the group captures everything after
# the colon up to the next ``*`` or newline (``+`` is allowed here).
_video_regex = re.compile("[*] *videos? *: *([^*\\n]+)")
def __init__(self, location, suivi="suivi.rst", fLOG=noLOG):
    """
    Stores where the repository lives and how to read it.

    :param location: location of the repository
    :param suivi: name of the file gathering information about each project
    :param fLOG: logging function
    """
    self.fLOG = fLOG
    self._suivi = suivi
    self._location = location
@property
def Location(self):
    """
    :return: location of the repository
    """
    return self._location

@property
def Groups(self):
    """
    Returns all available groups in the repository,
    i.e. the names of the sub folders of the repository location.
    """
    root = self._location
    folders = []
    for entry in os.listdir(root):
        if os.path.isdir(os.path.join(root, entry)):
            folders.append(entry)
    return folders
def get_group_location(self, group):
    """
    Builds the local folder of a group by joining the repository
    location and the group name.

    :param group: group name
    :return: local folder
    """
    root = self._location
    return os.path.join(root, group)
@staticmethod
def get_regex(path, regex, suivi="suivi.rst", skip_if_empty=False):
    """
    Retrieves data from file ``suivi.rst`` using a regular expression.

    :param path: sub folder to look into
    :param regex: compiled regular expression, every match is split
        on ``;`` and each piece is stripped
    :param suivi: name of the file ``suivi.rst``
    :param skip_if_empty: return an empty list instead of raising
        when the expression does not match
    :return: list of extracted strings, in the order they appear in the file
    :raises FileNotFoundError: if *path* or the file *suivi* does not exist
    :raises ValueError: if the file is not encoded in utf-8
    :raises RuntimeError: if nothing matches and *skip_if_empty* is False
    """
    if not os.path.exists(path):
        raise FileNotFoundError(path)  # pragma: no cover
    filename = os.path.join(path, suivi)
    if not os.path.exists(filename):
        raise FileNotFoundError(filename)  # pragma: no cover

    try:
        with open(filename, "r", encoding="utf8") as f:
            content = f.read()
    except UnicodeDecodeError as e:
        raise ValueError(  # pragma: no cover
            'unable to parse file:\n File "{0}", line 1'.format(filename)) from e

    matches = regex.findall(content)
    if not matches:
        if skip_if_empty:
            return []
        raise RuntimeError(  # pragma: no cover
            "Unable to find the regular expression '{0}' in '{1}'".format(
                regex.pattern, filename))

    # Every value is returned exactly once: a previous version iterated
    # twice over the list and returned n * n entries for n values.
    values = []
    for match in matches:
        values.extend(piece.strip()
                      for piece in match.strip("\n\r\t ").split(";"))
    return values
[docs] def get_emails(self, group, skip_if_empty=False): """ Retrieves student emails from file ``suivi.rst``. :param group: group :param skip_if_empty: skip if no mail? :return: list of mails :githublink:`%|py|124` """ path = os.path.join(self._location, group) allmails = ProjectsRepository.get_regex(path, ProjectsRepository._email_regex, self._suivi, skip_if_empty=skip_if_empty) for a in allmails: if "\n" in a: raise ValueError( # pragma: no cover "unable to interpret " + str([a]) + " from path " + path) ff = a.split("@") if len(ff) != 2: raise RegexRepositoryException( # pragma: no cover "unable to understand mail {0} in {1} (suivi={2} (mail separator is ;)".format( a, path, self._suivi)) return allmails
[docs] def get_videos(self, group): """ Retrieves student emails from file ``suivi.rst``. :param group: group :return: list of videos :githublink:`%|py|148` """ return ProjectsRepository.get_regex(group, ProjectsRepository._video_regex, self._suivi)
def get_sections(self, group):
    """
    Splits the file used to follow a group of students into sections.
    A section starts with a line underlined by a line only made of
    ``=``, ``+`` or ``-``. The section underlined with ``=`` is stored
    under the key ``'title'`` and its first line is the title itself.
    Lines before any section are stored under the key ``''``.

    :param group: group
    :return: dictionary { section : list of lines }

    Example of a file::

        rapport
        +++++++

        * bla 1

        extrait
        +++++++

        ::

            paragraphe 1

            paragraphe 2
    """
    path = os.path.join(self._location, group)
    if not os.path.exists(path):
        raise FileNotFoundError(path)  # pragma: no cover
    filename = os.path.join(path, self._suivi)
    if not os.path.exists(filename):
        raise FileNotFoundError(filename)  # pragma: no cover

    try:
        with open(filename, "r", encoding="utf8") as f:
            content = f.read()
    except UnicodeDecodeError as e:
        raise ValueError(  # pragma: no cover
            'unable to parse file:\n File "{0}", line 1'.format(filename)) from e

    rows = [raw.strip("\r").rstrip() for raw in content.split("\n")]
    history = []  # section every processed line was stored into
    sections = {"": []}
    current = ""
    for pos, row in enumerate(rows):
        mark = row[:1]
        underline = mark != "" and mark in "=+-" and row == mark * len(row)
        if not underline:
            # empty line, indented line or regular text
            sections[current].append(row)
            history.append(current)
            continue

        # the previous line is the name of a new section,
        # it must be removed from the section it was added to
        current = rows[pos - 1]
        if history:
            previous = history[-1]
            sections[previous] = sections[previous][:-1]
            history[-1] = current
        if mark == "=":
            sections["title"] = [current]
            history.append("title")
            current = "title"
        else:
            sections[current] = []
            history.append(current)
    return sections
# Separators (hyphen, semicolon, comma, dot, space, @) used by match_mail
# to cut a name or the local part of a mail into tokens.
_regex_split = re.compile("[-;,. @]")
@staticmethod
def match_mail(name, emails, threshold=3, exc=True):
    """
    Tries to match a name among a list of mails. Both the name and
    the part of every mail before ``@`` are lowered, stripped from
    diacritics, split into sorted tokens and compared with an
    edit distance.

    :param name: a name (first name last name separated by a space),
        a float is converted into a string (NaN becomes an empty string)
    :param emails: list of emails
    :param threshold: above this threshold, mails and names don't match
    :param exc: raise an Exception if not found
    :return: sorted list of tuples *(distance, mail)*,
        ``[(0, name)]`` if *name* is one of the mails
    """
    if isinstance(name, float):
        name = "" if numpy.isnan(name) else str(name)
    # we check the easy case
    if name in emails:
        return [(0, name)]

    def signature(text):
        tokens = [tok.strip() for tok in ProjectsRepository._regex_split.split(
            remove_diacritics(text.lower()))]
        return " ".join(sorted(tokens))

    target = signature(name)
    scored = []
    for email in emails:
        dist = edit_distance(signature(email.split("@")[0]), target)[0]
        if dist <= threshold:
            scored.append((dist, email))
    scored.sort()

    if exc and not scored:
        raise ProjectsRepository.MailNotFound(  # pragma: no cover
            "unable to find a mail for {0} among\n{1}".format(name, "\n".join(emails)))
    return scored
[docs] @staticmethod def match_mails(names, emails, threshold=3, exc=True, skip_names=None): """ Tries to match a series of names among a list of mails. :param names: list of names (first name last name separated by a space) :param emails: list of emails :param threshold: above this threshold, mails and names don't match :param exc: raise an Exception if not found :param skip_names: the second boolean is True is one of the name belongs to this list :return: list of available mails, boolean The second results is True if no email were found in the list. :githublink:`%|py|283` """ res = [] skip = False for name in names: if skip_names is not None and name in skip_names: skip = True r = ProjectsRepository.match_mail(name, emails, threshold, exc) res.extend([_[1] for _ in r]) return res, skip
@staticmethod
def create_folders_from_dataframe(df, root, report="suivi.rst", col_student=None,
                                  col_group="Groupe", col_subject="Sujet", col_mail="mail",
                                  overwrite=False, email_function=None, must_have_email=True,
                                  skip_if_nomail=False, skip_names=None, fLOG=noLOG):
    """
    Creates a series of folders for groups of students, one folder
    per group, each of them containing a report file which lists
    the members, the subject, the group name and the mails.

    :param df: DataFrame
    :param root: where to create the folders
    :param report: report file
    :param col_student: column which contains the student name
        (first name + last name), equal to *col_mail* if *None*
    :param col_group: index of the group
        (it can be *None* if each student is a group)
    :param col_subject: column which contains the subject
    :param col_mail: if there is a column which contains the mail
        in the input dataframe
    :param overwrite: if False, skip if the report already exists
    :param email_function: function which infers email from first
        and last names, see below
    :param must_have_email: if True, raises an exception if no mail is found
    :param skip_if_nomail: skip a name if no mail is found
    :param skip_names: less checking for a given set of names
    :param fLOG: logging function
    :return: a :class:`ProjectsRepository` built on *root*

    The function *email_function* is called with the sorted names
    of a group and *skip_names*::

        def email_function(names, skip_names):
            # names is a list of names
            # ...
            return list of mails, skip

    The boolean *skip* disables the check on missing mails for this group.
    *email_function* can be a list or a set of mails. In that case,
    if *col_mail* is None, the mails are matched to the names with
    :meth:`match_mails`, otherwise the mails are read from column
    *col_mail* of the dataframe.
    """
    if col_mail is None and email_function is None:
        raise ValueError(  # pragma: no cover
            "col_mail cannot be None if email_function is None")
    if col_student is None:
        col_student = col_mail

    # matches names against the list of mails given as email_function
    def local_email_function(names, skip_names):
        return ProjectsRepository.match_mails(names, email_function, exc=False, skip_names=skip_names)

    # looks up the mail of every name in a dictionary {name: mail}
    def local_email_function_column(names, skip_names, mapping):
        res = []
        skip = False
        for name in names:
            if skip_names is not None and name in skip_names:
                skip = True
            r = mapping.get(name, None)
            if r:
                res.append(r)
        return res, skip

    if isinstance(email_function, (list, set)):
        if col_mail is None:
            local_function = local_email_function
        else:
            try:
                # + 1 because itertuples puts the index first
                ind_student = list(df.columns).index(col_student) + 1
                ind_mail = list(df.columns).index(col_mail) + 1
            except ValueError as e:
                raise ValueError(  # pragma: no cover
                    "Unable to find '{0}' or '{1}' in {2}".format(
                        col_student, col_mail, df.columns)) from e
            mapping = {}
            for row in df.itertuples():
                mapping[row[ind_student]] = row[ind_mail]
            # NOTE(review): parameter *skip* is ignored, the closure
            # uses *skip_names* (same value at the only call site below).
            local_function = \
                lambda names, skip, mp=mapping: \
                local_email_function_column(names, skip_names, mp)
    else:
        local_function = email_function

    # builds a folder name: spaces and hyphens become dots,
    # everything from the first '@' is dropped
    def ul(last):
        res = ""
        for i, c in enumerate(last):
            if c == " ":
                res += "."
            elif c == "-":
                res += "."
            elif c == '@':
                break
            else:
                res += c
        return res

    # NOTE(review): folds is filled but never returned nor read.
    folds = []

    if df.shape[1] == 0:
        raise Exception("No column in the dataframe.")  # pragma: no cover

    if col_group:
        gr = df.groupby(col_group)
    else:
        # every row is its own group named G<index>
        df2 = df.copy()
        df2["gid"] = df.index
        df2["gid2"] = df2.gid.apply(lambda x: "G%d" % x)
        gr = df2.groupby("gid2")

    fLOG("[ProjectsRepository.create_folders_from_dataframe] number of groups {0}".format(
        len(gr)))

    for name, group in gr:
        if col_subject:
            s = list(set(group[col_subject].copy()))
            # removes missing subjects (NaN)
            s = [_ for _ in s if not isinstance(
                _, float) or ~numpy.isnan(_)]
            if len(s) > 1:
                raise TooManyProjectsException(  # pragma: no cover
                    "more than one subject for group: " + str(name) + "\n" + str(s))
            elif len(s) == 0:
                s = ["unknown"]
            subject = s[0]
        else:
            subject = None

        eleves = list(group[col_student])
        eleves.sort()

        if email_function is not None:
            mails, skip = local_function(eleves, skip_names)
            if must_have_email and (not skip and len(mails) == 0):
                # we skip only if a group has no mails at all
                if isinstance(email_function, (list, set)):
                    mes = "unable to find a mail for\n{0}\nname={1}\nskip:{4}\n{5}\namong\n{3}\nGROUP\n{2}\nlocal_function: {6}"
                    raise ProjectsRepository.MailNotFound(  # pragma: no cover
                        mes.format("; ".join("'%s'" % _ for _ in eleves), name, group,
                                   "\n".join(email_function), skip, skip_names, local_function))
                raise ProjectsRepository.MailNotFound(  # pragma: no cover
                    "unable to find a mail for {0}\nname={1}\n with function\n{3}\nGROUP\n{2}\nTYPE:\n{4}".format(
                        " ;".join(eleves), name, group, email_function, type(email_function)))
            if skip_if_nomail and (not skip and len(mails) == 0):
                fLOG("[ProjectsRepository.create_folders_from_dataframe] skipping {0}".format(
                    "; ".join(eleves)))
                continue
            if mails:
                for m in mails:
                    if "@" not in m:
                        raise ValueError(  # pragma: no cover
                            "mails contains a mail with no @: {0}".format(m))
                    if "<" in m or ">" in m:
                        raise ValueError(  # pragma: no cover
                            "one mail contains weird characters: {0}".format(m))
                jmail = "; ".join(mails)
            else:
                jmail = None
        else:
            jmail = None

        if jmail is not None:
            if "@" not in jmail:
                raise ValueError(  # pragma: no cover
                    "jmail does not contain any @: {0}".format(jmail))

        # content of the report file
        members = ", ".join(map(str, eleves))
        content = [members]
        content.append("=" * len(members))
        content.append("")
        content.append("* members: {0}".format(members))
        if subject:
            content.append("* subject: {0}".format(subject))
        content.append("* G: {0}".format(name))
        if jmail:
            content.append("* mails: " + jmail)
        content.append("")
        content.append("")

        # the folder name is built from the sorted names of the members
        last = "-".join(ul(a) for a in sorted(map(str, eleves)))

        folder = os.path.join(root, last)
        filename = os.path.join(folder, report)

        if not os.path.exists(folder):
            if '@' in folder:
                raise ValueError(  # pragma: no cover
                    "Folder '{0}' must not contain '@'.".format(folder))
            os.mkdir(folder)

        if overwrite or not os.path.exists(filename):
            with open(filename, "w", encoding="utf8") as f:
                f.write("\n".join(content))

        folds.append(folder)

    proj = ProjectsRepository(root, suivi=report, fLOG=fLOG)

    if must_have_email:
        # checks every group of the repository (not only the new ones)
        for gr in proj.Groups:
            mails = proj.get_emails(gr)
            if len(mails) == 0:
                raise ValueError(  # pragma: no cover
                    "No mail for group '{0}'.".format(gr))

    return proj
def enumerate_group_mails(self, group, mailbox, subfolder, date=None,
                          skip_function=None, max_dest=5):
    """
    Enumerates all mails sent by or sent to a given group.

    :param group: group (if None, goes through all mails of all groups)
    :param mailbox: mailbox (see `pymmails <http://www.xavierdupre.fr/app/pymmails/helpsphinx/>`_),
        the method calls its method *enumerate_search_person*
    :param subfolder: which subfolder of the mailbox to look into
    :param date: date
    :param skip_function: if not None, use this function on the header/body
        to avoid loading the entire message (and skip it)
    :param max_dest: maximum number of receivers
    :return: iterator on mails
    """
    if group is None:
        for group_ in self.Groups:
            self.fLOG(
                "[ProjectsRepository.enumerate_group_mails] group='{0}'".format(group_))
            yield from self.enumerate_group_mails(
                group_, mailbox, subfolder=subfolder, date=date,
                skip_function=skip_function, max_dest=max_dest)
        return

    mails = self.get_emails(group)
    self.fLOG("[ProjectsRepository.enumerate_group_mails] mails='{0}' folder='{1}' date={2}".format(
        str(mails), subfolder, date))
    # max_dest is forwarded to the mailbox (it used to be forced to 5)
    yield from mailbox.enumerate_search_person(
        person=mails, folder=subfolder, skip_function=skip_function,
        date=date, max_dest=max_dest)
def dump_group_mails(self, renderer, group, mailbox, subfolder, date=None,
                     skip_function=None, max_dest=5, filename="index_mails.html",
                     overwrite=False, skip_if_empty=False, convert_files=False):
    """
    Dumps all mails sent by or sent to a given group into the folder
    of the group and writes a summary of the attachments
    (``_summaryattachements.json``).

    :param renderer: instance of class `EmailMessageListRenderer
        <http://www.xavierdupre.fr/app/pymmails/helpsphinx/pymmails/render/
        email_message_list_renderer.html>`_
    :param group: group, if None, the method processes every group
    :param mailbox: mailbox (see `pymmails <http://www.xavierdupre.fr/app/pymmails/helpsphinx/>`_)
    :param subfolder: which subfolder of the mailbox to look into
    :param date: date
    :param skip_function: if not None, use this function on the header/body
        to avoid loading the entire message (and skip it)
    :param max_dest: maximum number of receivers
    :param filename: filename which gathers a link to every mail
    :param overwrite: overwrite, forced to True if the number of mails
        in the mailbox differs from the number of mails already dumped
    :param skip_if_empty: skip if no mail?
    :param convert_files: unzip and convert
    :return: list of files (see `EmailMessageListRenderer.write
        <http://www.xavierdupre.fr/app/pymmails/helpsphinx/pymmails/render/
        email_message_list_renderer.html>`_)

    zip, gz, rar, 7z can be uncompressed.
    It then convert *.py* and *.ipynb* into html.
    """
    if group is None:
        res = []
        for group_ in self.Groups:
            # filename is forwarded too (it used to be dropped here,
            # every group was then written with the default name)
            r = self.dump_group_mails(
                renderer, group_, mailbox, subfolder=subfolder, date=date,
                skip_function=skip_function, max_dest=max_dest,
                filename=filename, overwrite=overwrite,
                skip_if_empty=skip_if_empty, convert_files=convert_files)
            res.extend(r)
        return res

    mails = self.get_emails(group, skip_if_empty=skip_if_empty)
    if skip_if_empty and len(mails) == 0:
        self.fLOG("[ProjectsRepository.dump_group_mails] SKIP group='{0}' folder='{1}' date={2} mails={3}".format(
            group, subfolder, date, str(mails)))
        return []
    self.fLOG("[ProjectsRepository.dump_group_mails] group='{0}' folder='{1}' date={2} mails={3}".format(
        group, subfolder, date, str(mails)))

    def iter_mail(body=True):
        return mailbox.enumerate_search_person(
            person=mails, folder=subfolder, skip_function=skip_function,
            date=date, max_dest=max_dest, body=body)

    # mails already dumped against mails in the mailbox
    nbmails = len(self.list_mails(group))
    nbcur = len(list(iter_mail(body=False)))
    if nbmails != nbcur:
        overwrite = True
        # logs the value of nbmails (the name of the variable used to be logged)
        self.fLOG("[dump_group_mails] group='{0}' - new mails".format(
            group), nbcur, "!=", nbmails)

    messages = iter_mail(body=True)
    location = self.get_group_location(group)
    r = renderer.write(iter=messages, location=location, filename=filename,
                       overwrite=overwrite, file_jsatt="_summaryattachements_raw.json",
                       attach_folder="attachments")
    renderer.flush()

    # attachments in JSON format
    json_att = []

    # first pass: metadata files, indexed by the attachment they describe
    metadata = {}
    for name in self.enumerate_group_files(group):
        if "attachments" not in name or not name.endswith('.metadata'):
            continue
        sname = os.path.relpath(name, location).replace("\\", "/")
        metadata[sname[:-len('.metadata')]] = sname

    # second pass: the attachments themselves
    for name in self.enumerate_group_files(group):
        if "attachments" not in name or name.endswith('.metadata'):
            continue
        sname = os.path.relpath(name, location).replace("\\", "/")
        info = dict(a=sname, name=sname)
        if sname in metadata:
            info['info'] = '<a href="{0}">metadata</a>'.format(
                metadata[sname])
        json_att.append(info)

    if convert_files:
        for conv in self.unzip_convert(group):
            sconv = os.path.relpath(conv, location).replace("\\", "/")
            json_att.append(
                dict(a=sconv, name=sconv, unzip_convert='Yes'))

    file_jsatt = os.path.join(location, "_summaryattachements.json")
    if json_att and not renderer.BufferWrite.exists(file_jsatt, local=not overwrite):
        # NOTE(review): written after renderer.flush() and never closed
        # here, presumably the renderer buffer handles it - to confirm.
        f = renderer.BufferWrite.open(
            file_jsatt, text=True, encoding='utf-8')
        js = json.dumps(json_att)
        f.write(js)

    return r
def remove_group(self, group):
    """
    Removes the folder of a group.

    :param group: group
    :return: list of removed files

    See `remove_folder <http://www.xavierdupre.fr/app/pyquickhelper/helpsphinx/
    pyquickhelper/filehelper/synchelper.html#module-pyquickhelper.filehelper.synchelper>`_.
    """
    target = self.get_group_location(group)
    removed = remove_folder(target)
    return removed
def enumerate_group_files(self, group):
    """
    Enumerates all files in a group.

    :param group: group, if None, the method goes through every group
    :return: iterator on files
    """
    if group is None:
        for one_group in self.Groups:
            yield from self.enumerate_group_files(one_group)
        return
    yield from explore_folder_iterfile(self.get_group_location(group))
def list_mails(self, group):
    """
    Returns the mails already dumped for a group: files whose name
    starts with ``d_`` and ends with ``.html`` and whose path
    does not contain ``attachments``.

    :param group: group name
    :return: list of mails (filenames)
    """
    candidates = list(self.enumerate_group_files(group))
    return [
        path for path in candidates
        if "attachments" not in path
        and os.path.basename(path).startswith("d_")
        and os.path.basename(path).endswith(".html")
    ]
def zip_group(self, group, outfile, addition=None):
    """
    Zips the files of a group, file names are relative
    to the location of the repository.

    :param group: group
    :param outfile: output file
    :param addition: additional files (sequence)
    :return: list of zipped files
    """
    def everything():
        yield from self.enumerate_group_files(group)
        if addition:
            yield from addition

    return zip_files(outfile, everything(), root=self._location)
# Captures http/https urls, an url stops at the first space,
# double quote, angle bracket or parenthesis.
_link_regex = re.compile("(https?[:][^ \\\"<>)(]+)")

# Links containing one of these strings are ignored by write_summary
# (default value of its parameter nolink_if).
_known_strings = ["xavierdupre.fr", "doodle", "ensaenotebook", "teralab",
                  "outlook.com", "gohlke", "support.google", "help.github",
                  "api.jcdecaux"]

# Default Jinja2 template used by write_summary. Only the indentation of
# the source is removed (textwrap.dedent): removing every space would
# glue the Jinja tags together ({%forpsingroups%}).
# NOTE(review): the document opens <head> and <body> before <html> and
# <body> is opened twice but closed once; left as is, to be confirmed.
_default_template_summary = textwrap.dedent("""\
    <?xml version="1.0" encoding="utf-8"?>
    <head>
    <meta http-equiv="content-type" content="text/html; charset=utf-8" />
    </head>
    <body>
    <html>
    <head>
    <title>{{ title }}</title>
    <link rel="stylesheet" type="text/css" href="{{ css }}">
    </head>
    <body>
    <h1>{{ title }}</h1>
    <ol type="1">
    {% for ps in groups %}
    <li><a href="{{ ps["link"] }}">{{ ps["group"] }}</a><small><i>
    {{ ps["nb"] }} files - {{ format_size(ps["size"]) }} -
    {% if len(ps["emails"]) > 0 %}
    last mail {{ ps["emails"][-1]["date"] }} ---{% else %}
    No mail found.
    {% endif %}
    {{ len(ps["attachments"]) }} attachments</i></small>
    {% if len(ps["attachments"]) + len(ps["links"]) > 0 %}
    <ul>
    {% for day, att, data in ps["attachments"] %}
    <li>att: {{ day }} - <a href="{{ att }}">{{ os.path.split(att)[-1] }}</a></li>
    {% endfor %}
    {% for date, from_, url, domain, last in ps["links"] %}
    <li>link: {{ date }} <a href="{{ url }}">{{ domain }} // {{ last }}</a> from {{ from_ }}</li>
    {% endfor %}
    </ul>
    {% endif %}
    {% if len(ps["created_files"]) > 0 %}
    <ul>
    {% for name, relpath, size in ps["created_files"] %}
    <li>added: <a href="{{ relpath }}">{{ name }}</a> {{ size }}</li>
    {% endfor %}
    </ul>
    {% endif %}
    </li>
    {% endfor %}
    </ol>
    </body>
    </html>
    """)
def write_run_command(self, filename=None, renderer=None):
    """
    Writes a command script to run a server for this local content.
    The server runs the javascripts fetching for local files.
    The content is available at ``http://localhost:9000/``.

    :param filename: name of the script written in the repository
        location, if None, it is ``run_server.bat`` on Windows and
        ``run_server.sh`` otherwise
    :param renderer: unused
    """
    if filename is None:
        is_windows = sys.platform.startswith('win')
        filename = "run_server.bat" if is_windows else "run_server.sh"

    url = "http://localhost:9000/"
    script = textwrap.dedent("""
        echo Open a browser with url '{}'
        python3 -m http.server 9000
        """).format(url)

    target = os.path.join(self.Location, filename)
    self.fLOG("[write_run_command] write '{}'.".format(target))
    with open(target, 'w') as stream:
        stream.write(script)
def write_summary(self, renderer=None, link="index_mails.html",
                  outfile="index.html", title="summary", nolink_if=None):
    """
    Produces a summary of all groups and uses a :epkg:`Jinja2` template.
    For every group, it gathers the number of files, their size,
    the attachments, the dumped mails, the links found in the mails
    and the files created inside the attachments folder.

    :param renderer: instance of `EmailMessageRenderer <http://www.xavierdupre.fr/app/pymmails/
        helpsphinx//pymmails/render/email_message_renderer.html>`_), can be None,
        in that case, a renderer is created with the default template
        ``ProjectsRepository._default_template_summary`` and flushed at the end
    :param link: look for this file in each folder
    :param outfile: output file
    :param title: title
    :param nolink_if: link containing those strings will be removed
        (if None, ``ProjectsRepository._known_strings`` is used)
    :return: result of ``renderer.write``
    """
    if nolink_if is None:
        nolink_if = ProjectsRepository._known_strings

    # tells if a link should be kept
    def filter_in(url):
        if "\n" in url or "\r" in url or "\t" in url:
            return False
        if url.endswith("&quot;"):
            return False
        for _ in nolink_if:
            if _ in url:
                return False
        if ".ipynb_checkpoints" in url:
            return False
        return True

    # removes html entities and trailing punctuation around a link
    def clean_url(u):
        u = u.replace("&#43;", "+").strip(".#'/ \r\n\t ")
        if u.endswith("&nbsp;"):
            # len("&nbsp;") == 6
            u = u[:-6]
        return u

    # returns the domain and the last piece of an url (30 characters at most)
    def url_domain_name(url):
        r = urlparse(url)
        domain = r.netloc
        name = [_ for _ in url.split("/") if _]
        last = name[-1] if len(name) > 0 else domain
        if len(last) > 30:
            last = last[-30:]
        return domain, clean_url(last)

    # human readable size, also given to the template
    def format_size(s):
        if s <= 2 ** 11:
            return "{0} bytes".format(s)
        elif s <= 2 ** 21:
            return "{0} Kb".format(s // (2 ** 10))
        elif s <= 2 ** 31:
            return "{0} Mb".format(s // (2 ** 20))
        else:
            return "{0} Gb".format(s // (2 ** 30))

    groups = []
    for group in self.Groups:
        # c = (link to the mails of the group, group name)
        lp = os.path.join(self.get_group_location(group), link)
        if os.path.exists(lp):
            c = os.path.relpath(lp, self._location), group
        else:
            c = "file:///{0}".format(group), group
        nb_files = 0
        size = 0
        atts = []
        emails = []
        links = []
        created_files = []
        for name in self.enumerate_group_files(group):
            if name.endswith(".metadata"):
                continue
            loc = self.get_group_location(group)
            nb_files += 1
            tn = name
            size += os.stat(tn).st_size
            folder = os.path.split(name)[0]
            splf = folder.replace("\\", "/").split("/")
            if folder.endswith("attachments"):
                # file directly stored in folder attachments: an attachment
                meta = name + ".metadata"
                if os.path.exists(meta):
                    data = EmailMessage.read_metadata(meta)
                    day = data["date"].strftime("%Y-%m-%d")
                else:
                    data = None
                    day = ""
                atts.append((day, os.path.relpath(
                    name, self._location), data))
            elif "attachments" in splf:
                # file in a sub folder of attachments: a created file
                rel = os.path.relpath(name, loc)
                dest = os.path.relpath(name, self._location)
                if rel == dest:
                    raise Exception(  # pragma: no cover
                        "weird\n{0}\n{1}".format(rel, dest))
                ssize = format_size(os.stat(name).st_size)
                if "__MACOSX" not in rel and "__MACOSX" not in dest and \
                        ".ipynb_checkpoints" not in dest and ".ipynb_checkpoints" not in rel:
                    created_files.append((rel, dest, ssize))
            else:
                # any other file may be a dumped mail
                mail = os.path.split(name)[-1]
                res = EmailMessage.interpret_default_filename(mail)
                if "date" in res and "uid" in res and "from" in res:
                    emails.append(
                        (res["date"], res["from"], res["uid"], res))
                    # NOTE(review): the file is opened from the group
                    # folder, not from the folder of *name* - to confirm.
                    with open(os.path.join(loc, mail), "r", encoding="utf8") as f:
                        content = f.read()
                    urls = ProjectsRepository._link_regex.findall(
                        content)
                    if urls:
                        for u in set(urls):
                            u = clean_url(u)
                            if not filter_in(u):
                                continue
                            domain, last = url_domain_name(u)
                            links.append(
                                (res["date"], res["from"], clean_url(u), domain, last))

        # we sort
        atts.sort()
        links.sort()

        # we clean duplicated links
        mlinks = links
        links = []
        done = {}
        for date, from_, url, domain, last in mlinks:
            if url in done:
                continue
            if "__MACOSX" in url or "__MACOSX" in last or \
                    ".ipynb_checkpoints" in last or ".ipynb_checkpoints" in url:
                continue
            links.append((date, from_, url, domain, last))
            done[url] = True

        # we create the variable for the template
        emails = [_[-1] for _ in sorted(emails)]
        c = dict(link=c[0].replace("\\", "/"), group=c[1], nb=nb_files,
                 size=size, attachments=atts, emails=emails, links=links,
                 created_files=created_files)
        groups.append(c)

    # final summary
    if renderer is None:
        tmpl = ProjectsRepository._default_template_summary
        renderer = EmailMessageRenderer(tmpl=tmpl, fLOG=self.fLOG)
        # the renderer was created here, it is flushed here
        dof = True
    else:
        dof = False

    res = renderer.write(filename=outfile, location=self.Location,
                         mail=None, attachments=None, groups=groups,
                         title=title, len=len, os=os, format_size=format_size)
    if dof:
        renderer.flush()
    return res
def unzip_convert(self, group):
    """
    Uncompresses the archives of a group, then converts notebooks
    and scripts into :epkg:`HTML`.

    :param group: group name
    :return: list of new files (the converted ones)
    """
    self.unzip_files(group)
    converted = self.convert_files(group)
    return converted
def unzip_files(self, group):
    """
    Uncompresses the archives (zip, 7z, rar, tar.gz, gz) found in the
    attachments of a group. An archive is uncompressed into a new
    folder next to it, it is skipped if this folder already exists.

    :param group: group name
    :return: list of new files
    """
    def fvalid(zip_name, local_name):
        # compiled python files are not extracted
        return "__pycache__" not in zip_name and not zip_name.endswith(".pyc")

    def clean_f(folder):
        for char in (" ", ",", "&", "\r", "\n", "\t"):
            folder = folder.replace(char, "_")
        return folder

    def run_zip(archive, target):
        try:
            return unzip_files(archive, target, fLOG=self.fLOG,
                               fvalid=fvalid, fail_if_error=False)
        except (zipfile.BadZipFile, NotImplementedError, OSError) as e:
            self.fLOG(
                "[ProjectsRepository.unzip_files] ERROR: unable to unzip '{0}' because of '{1}']".format(archive, e))
            return []

    def run_7z(archive, target):
        return un7zip_files(archive, target, fLOG=self.fLOG, fvalid=fvalid)

    def run_rar(archive, target):
        return unrar_files(archive, target, fLOG=self.fLOG, fvalid=fvalid)

    def run_targz(archive, target):
        return untar_files(archive, target, fLOG=self.fLOG)

    def run_gz(archive, target):
        # pickles are kept compressed
        return ungzip_files(archive, target, fLOG=self.fLOG, fvalid=fvalid,
                            unzip="pkl.gz" not in archive)

    # kind of archive -> (suffix of the new folder, first log message,
    # second log message, function which uncompresses)
    handlers = {
        ".zip": ("_zip",
                 "[ProjectsRepository.unzip_files] unzip '{0}'",
                 "[ProjectsRepository.unzip_files] creating '{0}'",
                 run_zip),
        ".7z": ("_7z",
                "[ProjectsRepository.un7zip_files] un7zip '{0}'",
                "[ProjectsRepository.un7zip_files] creating '{0}'",
                run_7z),
        ".rar": ("_rar",
                 "[ProjectsRepository.unrar_files] unrar '{0}'",
                 "[ProjectsRepository.unrar_files] creating '{0}'",
                 run_rar),
        ".tar.gz": ("_targz",
                    "[ProjectsRepository.untar_files] ungzip '{0}'",
                    "[ProjectsRepository.untar_files] creating '{0}'",
                    run_targz),
        ".gz": ("_gz",
                "[ProjectsRepository.ungzip_files] ungzip '{0}'",
                "[ProjectsRepository.ungzip_files] creating '{0}'",
                run_gz),
    }

    extracted = []
    # the list of files is computed before any archive is uncompressed
    for name in list(self.enumerate_group_files(group)):
        if "attachments" not in name:
            continue
        stem, ext = os.path.splitext(name)
        kind = ".tar.gz" if name.endswith(".tar.gz") else ext
        if kind not in handlers:
            continue
        suffix, msg_start, msg_folder, extract = handlers[kind]
        folder = clean_f(stem + suffix)
        if os.path.exists(folder):
            # already done, we do not do it again
            continue
        self.fLOG(msg_start.format(name))
        self.fLOG(msg_folder.format(folder))
        os.makedirs(folder)
        extracted.extend(extract(name, folder))
    return extracted
def convert_files(self, group):
    """
    Converts all notebooks and python scripts found in the attachments
    of a group into :epkg:`HTML`. The output is the name of the file
    followed by ``.html``. A failing conversion raises a warning.

    :param group: group name
    :return: list of new files
    """
    def notebook_to_html(src, dst):
        upgrade_notebook(src)
        nb2html(src, dst, exc=False)

    def script_to_html(src, dst):
        py_to_html_file(src, dst, False, title=os.path.relpath(
            src, self.get_group_location(group)))

    produced = []
    # the list of files is computed before any file is created
    for name in list(self.enumerate_group_files(group)):
        if "attachments" not in name:
            continue
        ext = os.path.splitext(name)[-1]
        if ext not in (".ipynb", ".py"):
            continue
        is_notebook = ext == ".ipynb"

        self.fLOG(
            "[ProjectsRepository.convert_files] convert '{0}'".format(name))
        out = name + ".html"
        if os.path.exists(out):
            warnings.warn(
                "[convert_files] overwriting '{0}'".format(out))
        try:
            if is_notebook:
                notebook_to_html(name, out)
            else:
                script_to_html(name, out)
            produced.append(out)
        except Exception as e:
            if is_notebook:
                warnings.warn(
                    "unable to convert a notebook '{0}' because of {1}".format(name, e))
            else:
                # the syntax of the python file might be wrong
                warnings.warn(
                    "unable to convert File \"{0}\"".format(name))
    return produced