"""
Source code of ``ensae_teaching_cs.automation_students.projects_repository``.

Automation helpers to grab the mails students send about their projects.
"""

import re
import os
import sys
import json
import textwrap
import warnings
import zipfile
from urllib.parse import urlparse

import numpy
from pyquickhelper.loghelper import noLOG
from pyquickhelper.texthelper import remove_diacritics
from pyquickhelper.filehelper import remove_folder, explore_folder_iterfile
from pyquickhelper.filehelper import (
    unzip_files, zip_files, ungzip_files, un7zip_files, unrar_files,
    untar_files)
from pyquickhelper.helpgen import nb2html
from pyquickhelper.ipythonhelper import upgrade_notebook
from pymmails import EmailMessageRenderer, EmailMessage

from .repository_exception import RegexRepositoryException, TooManyProjectsException
from ..td_1a import edit_distance
from ..homeblog.python_exemple_py_to_html import py_to_html_file

class ProjectsRepository:
    """
    Handles a repository of students projects: a root folder
    with one sub folder per group of students.
    """

    class MailNotFound(Exception):
        """
        Raised when no mail can be found.
        """

    # bullet ``* mail(s): a@b.c; d@e.f`` of the follow-up file
    _email_regex = re.compile("[*] *e?mails? *: *([^*+\\n]+)")
    # bullet ``* gitlab: <url ending with .git>`` of the follow-up file
    _gitlab_regex = re.compile("[*] *gitlab *: *([^*+\\n]+[.]git)")
    # bullet ``* video(s): <url>`` of the follow-up file
    _video_regex = re.compile("[*] *videos? *: *([^*\\n]+)")
def __init__(self, location, suivi="suivi.rst", fLOG=noLOG):
    """
    Binds the object to a folder on disk, nothing is read at this stage.

    :param location: location of the repository
    :param suivi: name of the file gathering information about each project
    :param fLOG: logging function
    """
    self.fLOG = fLOG
    self._suivi = suivi
    self._location = location
@property
def Location(self):
    """
    :return: location of the repository
    """
    return self._location

@property
def Groups(self):
    """
    Returns all available groups in the repository,
    i.e. the name of every sub folder of the repository location.
    """
    root = self._location
    groups = []
    for entry in os.listdir(root):
        if os.path.isdir(os.path.join(root, entry)):
            groups.append(entry)
    return groups
def get_group_location(self, group):
    """
    Builds the path of the folder which stores the files of a group.

    :param group: group name
    :return: local folder
    """
    root = self._location
    return os.path.join(root, group)
@staticmethod
def get_regex(path, regex, suivi="suivi.rst", skip_if_empty=False):
    """
    Retrieves data from file ``suivi.rst`` using a regular expression.

    :param path: sub folder to look into
    :param regex: compiled regular expression with one capturing group
    :param suivi: name of the file ``suivi.rst``
    :param skip_if_empty: returns an empty list instead of raising
        an exception when the expression is not found
    :return: list of values, every match is split on ``;``
        and every piece is stripped
    :raises FileNotFoundError: if the folder or the file does not exist
    :raises ValueError: if the file is not encoded in utf-8
    :raises RuntimeError: if nothing matches and *skip_if_empty* is False
    """
    if not os.path.exists(path):
        raise FileNotFoundError(path)  # pragma: no cover
    filename = os.path.join(path, suivi)
    if not os.path.exists(filename):
        raise FileNotFoundError(filename)  # pragma: no cover

    try:
        with open(filename, "r", encoding="utf8") as f:
            content = f.read()
    except UnicodeDecodeError as e:
        raise ValueError(  # pragma: no cover
            'unable to parse file:\n File "{0}", line 1'.format(filename)) from e

    matches = regex.findall(content)
    if not matches:
        if skip_if_empty:
            return []
        raise RuntimeError(  # pragma: no cover
            "Unable to find the regular expression '{0}' in '{1}'".format(
                regex.pattern, filename))

    values = []
    for match in matches:
        values.extend(match.strip("\n\r\t ").split(";"))
    # one single loop: a nested one would repeat every value len(values) times
    return [value.strip() for value in values]
def get_emails(self, group, skip_if_empty=False):
    """
    Reads the mails of a group from its follow-up file
    and checks every of them contains exactly one ``@``.

    :param group: group
    :param skip_if_empty: skip if no mail?
    :return: list of mails
    """
    folder = os.path.join(self._location, group)
    found = ProjectsRepository.get_regex(
        folder, ProjectsRepository._email_regex, self._suivi,
        skip_if_empty=skip_if_empty)
    for address in found:
        if "\n" in address:
            raise ValueError(  # pragma: no cover
                "unable to interpret " + str([address]) + " from path " + folder)
        # exactly one @ is the same as two pieces once split on @
        if address.count("@") != 1:
            raise RegexRepositoryException(  # pragma: no cover
                "unable to understand mail {0} in {1} (suivi={2} (mail separator is ;)".format(
                    address, folder, self._suivi))
    return found
def get_videos(self, group):
    """
    Retrieves the videos of a group from file ``suivi.rst``.

    :param group: group
    :return: list of videos
    """
    # get_regex expects the folder of the group, not its name
    # (same convention as get_emails)
    path = os.path.join(self._location, group)
    return ProjectsRepository.get_regex(
        path, ProjectsRepository._video_regex, self._suivi)
def get_sections(self, group):
    """
    Extracts sections from a filename used to follow a group of students.

    :param group: group
    :return: dictionary { section : content }, the content is a list of lines
    :raises FileNotFoundError: if the folder or the file does not exist
    :raises ValueError: if the file is not encoded in utf-8

    Example of a file::

        rapport
        +++++++

        * bla 1

        extrait
        +++++++

        ::

            paragraphe 1
            paragraphe 2

    A line only made of ``=``, ``+`` or ``-`` underlines a title which is
    the previous line. The title underlined with ``=`` is stored under key
    ``'title'``, every line before the first title is stored under key ``''``.
    """
    path = os.path.join(self._location, group)
    if not os.path.exists(path):
        raise FileNotFoundError(path)  # pragma: no cover
    filename = os.path.join(path, self._suivi)
    if not os.path.exists(filename):
        raise FileNotFoundError(filename)  # pragma: no cover

    try:
        with open(filename, "r", encoding="utf8") as f:
            content = f.read()
    except UnicodeDecodeError as e:
        raise ValueError(  # pragma: no cover
            'unable to parse file:\n File "{0}", line 1'.format(filename)) from e

    lines = [_.strip("\r").rstrip() for _ in content.split("\n")]
    added_in = []  # section every processed line was stored in
    sections = {"": []}
    title = ""
    for i, line in enumerate(lines):
        first = line[:1]
        underline = bool(first) and first in "=+-" and line == first * len(line)
        if not underline:
            sections[title].append(line)
            added_in.append(title)
            continue

        # the previous line is a title, not content:
        # it is removed from the section it was added to
        title = lines[i - 1]
        if added_in:
            previous = added_in[-1]
            sections[previous] = sections[previous][:-1]
            added_in[-1] = title
        if first == "=":
            sections["title"] = [title]
            added_in.append("title")
            title = "title"
        else:
            sections[title] = []
            added_in.append(title)
    return sections
# Separators (dash, semicolon, comma, dot, space, @) used by match_mail
# to split a name or a mail into tokens before comparing them.
_regex_split = re.compile("[-;,. @]")
@staticmethod
def match_mail(name, emails, threshold=3, exc=True):
    """
    Tries to match a name among a list of mails.

    :param name: a name (first name last name separated by a space)
    :param emails: list of emails
    :param threshold: above this threshold, mails and names don't match
    :param exc: raise an Exception if not found
    :return: sorted list of tuples *(distance, mail)*, ``[(0, name)]``
        if *name* is already one of the mails

    Names and mails (what comes before ``@``) are lowered, stripped from
    their accents, split into tokens which are sorted, then compared
    with an edit distance.
    """
    def tokens(text):
        parts = [part.strip() for part in ProjectsRepository._regex_split.split(
            remove_diacritics(text.lower()))]
        return " ".join(sorted(parts))

    # a missing value in a dataframe is a float nan
    if isinstance(name, float):
        name = "" if numpy.isnan(name) else str(name)
    # the easy case
    if name in emails:
        return [(0, name)]

    reference = tokens(name)
    candidates = []
    for email in emails:
        distance = edit_distance(tokens(email.split("@")[0]), reference)[0]
        if distance <= threshold:
            candidates.append((distance, email))
    candidates.sort()

    if exc and not candidates:
        raise ProjectsRepository.MailNotFound(  # pragma: no cover
            "unable to find a mail for {0} among\n{1}".format(name, "\n".join(emails)))
    return candidates
@staticmethod
def match_mails(names, emails, threshold=3, exc=True, skip_names=None):
    """
    Tries to match a series of names among a list of mails.

    :param names: list of names (first name last name separated by a space)
    :param emails: list of emails
    :param threshold: above this threshold, mails and names don't match
    :param exc: raise an Exception if not found
    :param skip_names: the second result is True if one of the names
        belongs to this list
    :return: list of matched mails, boolean
    """
    found = []
    skipped = False
    for student in names:
        skipped = skipped or (skip_names is not None and student in skip_names)
        matches = ProjectsRepository.match_mail(student, emails, threshold, exc)
        found += [mail for _, mail in matches]
    return found, skipped
@staticmethod
def create_folders_from_dataframe(df, root, report="suivi.rst", col_student=None,
                                  col_group="Groupe", col_subject="Sujet", col_mail="mail",
                                  overwrite=False, email_function=None, must_have_email=True,
                                  skip_if_nomail=False, skip_names=None, fLOG=noLOG):
    """
    Creates a series of folders for groups of students,
    one folder per group, each of them receives a report file.

    :param root: where to create the folders
    :param col_student: column which contains the student name (first name + last name),
        equal to *col_mail* if *None*
    :param col_group: index of the group (it can be *None* if each student is a group)
    :param col_subject: column which contains the subject
    :param col_mail: if there is a column which contains the mail in the input dataframe
    :param df: DataFrame
    :param email_function: function which infers email from first and last names, see below
    :param report: report file
    :param overwrite: if False, skip if the report already exists
    :param must_have_email: if True, raises an exception if no mail is found
    :param skip_if_nomail: skip a name if no mail is found
    :param skip_names: less checking for a given set of names
    :param fLOG: logging function
    :return: the :class:`ProjectsRepository` built on *root*
        (the list of created folders is collected but not returned)

    The function *email_function* has the following signature::

        def email_function(names, skip_names):
            # names is a list of student names
            # ...
            return list of mails, skip=boolean

    The boolean tells the function to skip this group.
    *email_function* can be a list of mails. In that case and if *col_mail* is None,
    this function is replaced by :meth:`match_mails`, otherwise
    the mails are read from column *col_mail*.
    """
    if col_mail is None and email_function is None:
        raise ValueError(  # pragma: no cover
            "col_mail cannot be None if email_function is None")
    if col_student is None:
        col_student = col_mail

    def local_email_function(names, skip_names):
        # email_function is a list of mails here: names are matched against it
        return ProjectsRepository.match_mails(names, email_function, exc=False, skip_names=skip_names)

    def local_email_function_column(names, skip_names, mapping):
        # mails come from a dictionary { student name: mail }
        res = []
        skip = False
        for name in names:
            if skip_names is not None and name in skip_names:
                skip = True
            r = mapping.get(name, None)
            if r:
                res.append(r)
        return res, skip

    if isinstance(email_function, (list, set)):
        if col_mail is None:
            local_function = local_email_function
        else:
            try:
                # + 1 because itertuples puts the index at position 0
                ind_student = list(df.columns).index(col_student) + 1
                ind_mail = list(df.columns).index(col_mail) + 1
            except ValueError as e:
                raise ValueError(  # pragma: no cover
                    "Unable to find '{0}' or '{1}' in {2}".format(
                        col_student, col_mail, df.columns)) from e
            mapping = {}
            for row in df.itertuples():
                mapping[row[ind_student]] = row[ind_mail]
            # NOTE(review): the lambda ignores its argument *skip* and uses
            # *skip_names* from the enclosing scope, the caller passes the same object
            local_function = \
                lambda names, skip, mp=mapping: \
                local_email_function_column(names, skip_names, mp)
    else:
        local_function = email_function

    def ul(last):
        # builds a folder name from a student name: spaces and dashes
        # become dots, everything from the first @ is dropped
        res = ""
        for i, c in enumerate(last):
            if c == " ":
                res += "."
            elif c == "-":
                res += "."
            elif c == '@':
                break
            else:
                res += c
        return res

    # NOTE(review): filled below but never returned nor read
    folds = []

    if df.shape[1] == 0:
        raise Exception("No column in the dataframe.")  # pragma: no cover

    if col_group:
        gr = df.groupby(col_group)
    else:
        # every row is its own group, named after its index
        df2 = df.copy()
        df2["gid"] = df.index
        df2["gid2"] = df2.gid.apply(lambda x: "G%d" % x)
        gr = df2.groupby("gid2")

    fLOG("[ProjectsRepository.create_folders_from_dataframe] number of groups {0}".format(
        len(gr)))

    for name, group in gr:
        if col_subject:
            s = list(set(group[col_subject].copy()))
            # missing subjects (float nan) are ignored
            s = [_ for _ in s if not isinstance(
                _, float) or ~numpy.isnan(_)]
            if len(s) > 1:
                raise TooManyProjectsException(  # pragma: no cover
                    "more than one subject for group: " + str(name) + "\n" + str(s))
            elif len(s) == 0:
                s = ["unknown"]
            subject = s[0]
        else:
            subject = None

        eleves = list(group[col_student])
        eleves.sort()

        if email_function is not None:
            mails, skip = local_function(eleves, skip_names)
            if must_have_email and (not skip and len(mails) == 0):
                # we skip only if a group has no mails at all
                if isinstance(email_function, (list, set)):
                    mes = "unable to find a mail for\n{0}\nname={1}\nskip:{4}\n{5}\namong\n{3}\nGROUP\n{2}\nlocal_function: {6}"
                    raise ProjectsRepository.MailNotFound(  # pragma: no cover
                        mes.format("; ".join("'%s'" % _ for _ in eleves), name, group,
                                   "\n".join(email_function), skip, skip_names, local_function))
                raise ProjectsRepository.MailNotFound(  # pragma: no cover
                    "unable to find a mail for {0}\nname={1}\n with function\n{3}\nGROUP\n{2}\nTYPE:\n{4}".format(
                        " ;".join(eleves), name, group, email_function, type(email_function)))
            if skip_if_nomail and (not skip and len(mails) == 0):
                fLOG("[ProjectsRepository.create_folders_from_dataframe] skipping {0}".format(
                    "; ".join(eleves)))
                continue
            if mails:
                for m in mails:
                    if "@" not in m:
                        raise ValueError(  # pragma: no cover
                            "mails contains a mail with no @: {0}".format(m))
                    if "<" in m or ">" in m:
                        raise ValueError(  # pragma: no cover
                            "one mail contains weird characters: {0}".format(m))
                jmail = "; ".join(mails)
            else:
                jmail = None
        else:
            jmail = None

        if jmail is not None:
            if "@" not in jmail:
                raise ValueError(  # pragma: no cover
                    "jmail does not contain any @: {0}".format(jmail))

        # content of the report file: a title and a list of bullets
        members = ", ".join(map(str, eleves))
        content = [members]
        content.append("=" * len(members))
        content.append("")
        content.append("* members: {0}".format(members))
        if subject:
            content.append("* subject: {0}".format(subject))
        content.append("* G: {0}".format(name))
        if jmail:
            content.append("* mails: " + jmail)
        content.append("")
        content.append("")

        # the folder name concatenates the names of the students
        last = "-".join(ul(a) for a in sorted(map(str, eleves)))

        folder = os.path.join(root, last)
        filename = os.path.join(folder, report)

        if not os.path.exists(folder):
            if '@' in folder:
                raise ValueError(  # pragma: no cover
                    "Folder '{0}' must not contain '@'.".format(folder))
            os.mkdir(folder)

        if overwrite or not os.path.exists(filename):
            with open(filename, "w", encoding="utf8") as f:
                f.write("\n".join(content))

        folds.append(folder)

    proj = ProjectsRepository(root, suivi=report, fLOG=fLOG)

    if must_have_email:
        # every folder in root is checked, not only the ones created above
        for gr in proj.Groups:
            mails = proj.get_emails(gr)
            if len(mails) == 0:
                raise ValueError(  # pragma: no cover
                    "No mail for group '{0}'.".format(gr))

    return proj
def enumerate_group_mails(self, group, mailbox, subfolder, date=None,
                          skip_function=None, max_dest=5):
    """
    Enumerates all mails sent by or sent to a given group.

    :param group: group (if None, goes through all groups)
    :param mailbox: mailbox, it must implement method *enumerate_search_person*
    :param subfolder: which subfolder of the mailbox to look into
    :param date: date
    :param skip_function: if not None, use this function on the header/body
        to avoid loading the entire message (and skip it)
    :param max_dest: maximum number of receivers
    :return: iterator on mails
    """
    if group is None:
        for group_ in self.Groups:
            self.fLOG(
                "[ProjectsRepository.enumerate_group_mails] group='{0}'".format(group_))
            yield from self.enumerate_group_mails(
                group_, mailbox, subfolder=subfolder, date=date,
                skip_function=skip_function, max_dest=max_dest)
        return

    mails = self.get_emails(group)
    self.fLOG("[ProjectsRepository.enumerate_group_mails] mails='{0}' folder='{1}' date={2}".format(
        str(mails), subfolder, date))
    # max_dest is forwarded, it used to be hard-coded to 5
    yield from mailbox.enumerate_search_person(
        person=mails, folder=subfolder, skip_function=skip_function,
        date=date, max_dest=max_dest)
def dump_group_mails(self, renderer, group, mailbox, subfolder, date=None,
                     skip_function=None, max_dest=5, filename="index_mails.html",
                     overwrite=False, skip_if_empty=False, convert_files=False):
    """
    Dumps all mails sent by or sent to a given group into the folder
    of the group and writes a JSON summary of the attachments.

    :param renderer: instance of class *EmailMessageListRenderer*
        (see *pymmails*), it must implement *write*, *flush* and
        expose *BufferWrite*
    :param group: group (if None, goes through all groups)
    :param mailbox: mailbox, it must implement method *enumerate_search_person*
    :param subfolder: which subfolder of the mailbox to look into
    :param date: date
    :param skip_function: if not None, use this function on the header/body
        to avoid loading the entire message (and skip it)
    :param max_dest: maximum number of receivers
    :param filename: filename which gathers a link to every mail
    :param overwrite: overwrite
    :param skip_if_empty: skip if no mail?
    :param convert_files: unzip and convert
    :return: list of files, what *renderer.write* returns

    zip, gz, rar, 7z can be uncompressed.
    It then converts *.py* and *.ipynb* into html.
    """
    if group is None:
        res = []
        for group_ in self.Groups:
            # filename is forwarded as well, it used to be silently
            # replaced by the default value for every group
            r = self.dump_group_mails(renderer, group_, mailbox, subfolder=subfolder,
                                      date=date, skip_function=skip_function,
                                      max_dest=max_dest, filename=filename,
                                      overwrite=overwrite, skip_if_empty=skip_if_empty,
                                      convert_files=convert_files)
            res.extend(r)
        return res

    mails = self.get_emails(group, skip_if_empty=skip_if_empty)
    if skip_if_empty and len(mails) == 0:
        self.fLOG("[ProjectsRepository.dump_group_mails] SKIP group='{0}' folder='{1}' date={2} mails={3}".format(
            group, subfolder, date, str(mails)))
        return []
    self.fLOG("[ProjectsRepository.dump_group_mails] group='{0}' folder='{1}' date={2} mails={3}".format(
        group, subfolder, date, str(mails)))

    def iter_mail(body=True):
        return mailbox.enumerate_search_person(person=mails, folder=subfolder,
                                               skip_function=skip_function,
                                               date=date, max_dest=max_dest, body=body)

    # the dump is refreshed if the number of mails on disk
    # differs from the number of mails in the mailbox
    nbmails = len(self.list_mails(group))
    nbcur = len(list(iter_mail(body=False)))
    if nbmails != nbcur:
        overwrite = True
        self.fLOG("[dump_group_mails] group='{0}' - new mails".format(
            group), nbcur, "!=", nbmails)

    messages = iter_mail(body=True)
    location = self.get_group_location(group)
    r = renderer.write(iter=messages, location=location, filename=filename,
                       overwrite=overwrite, file_jsatt="_summaryattachements_raw.json",
                       attach_folder="attachments")
    renderer.flush()

    # attachments in JSON format
    json_att = []
    metadata = {}
    for name in self.enumerate_group_files(group):
        if "attachments" not in name or not name.endswith('.metadata'):
            continue
        sname = os.path.relpath(name, location).replace("\\", "/")
        # 9 == len('.metadata'), the key is the attachment itself
        metadata[sname[:-9]] = sname
    for name in self.enumerate_group_files(group):
        if "attachments" not in name or name.endswith('.metadata'):
            continue
        sname = os.path.relpath(name, location).replace("\\", "/")
        info = dict(a=sname, name=sname)
        if sname in metadata:
            info['info'] = '<a href="{0}">metadata</a>'.format(
                metadata[sname])
        json_att.append(info)

    if convert_files:
        converted = self.unzip_convert(group)
        for conv in converted:
            sconv = os.path.relpath(conv, location).replace("\\", "/")
            json_att.append(
                dict(a=sconv, name=sconv, unzip_convert='Yes'))

    file_jsatt = os.path.join(location, "_summaryattachements.json")
    if json_att and not renderer.BufferWrite.exists(file_jsatt, local=not overwrite):
        # NOTE(review): the stream belongs to the buffer of the renderer,
        # it is assumed to be written to disk by the renderer - confirm
        f = renderer.BufferWrite.open(file_jsatt, text=True, encoding='utf-8')
        js = json.dumps(json_att)
        f.write(js)

    return r
def remove_group(self, group):
    """
    Deletes the folder of a group, everything is delegated
    to function *remove_folder* from *pyquickhelper*.

    :param group: group
    :return: list of removed files
    """
    return remove_folder(self.get_group_location(group))
def enumerate_group_files(self, group):
    """
    Walks through the files of a group.

    :param group: group (if None, walks through every group)
    :return: iterator on files
    """
    if group is not None:
        yield from explore_folder_iterfile(self.get_group_location(group))
        return
    for name in self.Groups:
        yield from self.enumerate_group_files(name)
def list_mails(self, group):
    """
    Lists the mails already dumped for a group: files ``d_*.html``
    which are not stored below a path containing ``attachments``.

    :param group: group name
    :return: list of mails (filenames)
    """
    def is_mail(path):
        if "attachments" in path:
            return False
        last = os.path.split(path)[-1]
        return last.startswith("d_") and last.endswith(".html")

    return [path for path in self.enumerate_group_files(group) if is_mail(path)]
def zip_group(self, group, outfile, addition=None):
    """
    Compresses every file of a group into a single zip file,
    paths are relative to the repository location.

    :param group: group
    :param outfile: output file
    :param addition: additional files (sequence)
    :return: list of zipped files
    """
    def every_file():
        yield from self.enumerate_group_files(group)
        if addition:
            yield from addition

    return zip_files(outfile, every_file(), root=self._location)
_link_regex = re.compile("(https?[:][^ \\\"<>)(]+)") _known_strings = ["", "doodle", "ensaenotebook", "teralab", "", "gohlke", "", "help.github", "api.jcdecaux"] _default_template_summary = """<?xml version="1.0" encoding="utf-8"?> <head> <meta http-equiv="content-type" content="text/html; charset=utf-8" /> </head> <body> <html> <head> <title>{{ title }}</title> <link rel="stylesheet" type="text/css" href="{{ css }}"> </head> <body> <h1>{{ title }}</h1> <ol type="1"> {% for ps in groups %} <li><a href="{{ ps["link"] }}">{{ ps["group"] }}</a><small><i> {{ ps["nb"] }} files - {{ format_size(ps["size"]) }} - {% if len(ps["emails"]) > 0 %} last mail {{ ps["emails"][-1]["date"] }} ---{% else %} No mail found. {% endif %} {{ len(ps["attachments"]) }} attachments</i></small> {% if len(ps["attachments"]) + len(ps["links"]) > 0 %} <ul> {% for day, att, data in ps["attachments"] %} <li>att: {{ day }} - <a href="{{ att }}">{{ os.path.split(att)[-1] }}</a></li> {% endfor %} {% for date, from_, url, domain, last in ps["links"] %} <li>link: {{ date }} <a href="{{ url }}">{{ domain }} // {{ last }}</a> from {{ from_ }}</li> {% endfor %} </ul> {% endif %} {% if len(ps["created_files"]) > 0 %} <ul> {% for name, relpath, size in ps["created_files"] %} <li>added: <a href="{{ relpath }}">{{ name }}</a> {{ size }}</li> {% endfor %} </ul> {% endif %} </li> {% endfor %} </ol> </body> </html> """.replace(" ", "")
def write_run_command(self, filename=None, renderer=None):
    """
    Writes a command script to run a server for this local content.
    The server runs the javascripts fetching for local files.
    The content is available at ``http://localhost:9000/``.

    :param filename: name of the script, relative to the repository
        location, if None, it is ``run_server.bat`` on Windows,
        ``run_server.sh`` otherwise
    :param renderer: unused
    """
    if filename is None:
        if sys.platform.startswith('win'):
            filename = "run_server.bat"
        else:
            # an empty name would designate the repository folder itself
            filename = "run_server.sh"

    url = "http://localhost:9000/"
    content = textwrap.dedent("""
        echo Open a browser with url '{}'
        python3 -m http.server 9000
        """).format(url)
    dest = os.path.join(self.Location, filename)
    self.fLOG("[write_run_command] write '{}'.".format(dest))
    with open(dest, 'w', encoding="utf-8") as f:
        f.write(content)
def write_summary(self, renderer=None, link="index_mails.html",
                  outfile="index.html", title="summary", nolink_if=None):
    """
    Produces a summary and uses a :epkg:`Jinja2` template.

    :param renderer: instance of *EmailMessageRenderer* (see *pymmails*),
        can be None, in that case, one is created with the default template
        and flushed at the end
    :param link: look for this file in each folder
    :param outfile: output file
    :param nolink_if: link containing those strings will be removed
        (if None, a default set will be assigned), empty strings are ignored
    :param title: title
    :return: summary, what *renderer.write* returns

    The current default template is::

        .. runpython::

            from ensae_teaching_cs.automation_students.projects_repository import ProjectsRepository
            print(ProjectsRepository._default_template_summary)
    """
    if nolink_if is None:
        nolink_if = ProjectsRepository._known_strings

    def filter_in(url):
        if "\n" in url or "\r" in url or "\t" in url:
            return False
        if url.endswith("&quot;"):
            return False
        for _ in nolink_if:
            # an empty string belongs to every url and would discard all links
            if _ and _ in url:
                return False
        if ".ipynb_checkpoints" in url:
            return False
        return True

    def clean_url(u):
        u = u.replace("&#43;", "+").strip(".#'/ \r\n\t ")
        if u.endswith("&nbsp;"):
            u = u[:-6]
        return u

    def url_domain_name(url):
        r = urlparse(url)
        domain = r.netloc
        name = [_ for _ in url.split("/") if _]
        last = name[-1] if len(name) > 0 else domain
        if len(last) > 30:
            last = last[-30:]
        return domain, clean_url(last)

    def format_size(s):
        if s <= 2 ** 11:
            return "{0} bytes".format(s)
        elif s <= 2 ** 21:
            return "{0} Kb".format(s // (2 ** 10))
        elif s <= 2 ** 31:
            return "{0} Mb".format(s // (2 ** 20))
        else:
            return "{0} Gb".format(s // (2 ** 30))

    groups = []
    for group in self.Groups:
        loc = self.get_group_location(group)
        lp = os.path.join(loc, link)
        if os.path.exists(lp):
            c = os.path.relpath(lp, self._location), group
        else:
            c = "file:///{0}".format(group), group

        nb_files = 0
        size = 0
        atts = []
        emails = []
        links = []
        created_files = []

        for name in self.enumerate_group_files(group):
            if name.endswith(".metadata"):
                continue
            nb_files += 1
            size += os.stat(name).st_size
            folder = os.path.split(name)[0]
            splf = folder.replace("\\", "/").split("/")
            if folder.endswith("attachments"):
                # an attachment, its date comes from its metadata if any
                meta = name + ".metadata"
                if os.path.exists(meta):
                    data = EmailMessage.read_metadata(meta)
                    day = data["date"].strftime("%Y-%m-%d")
                else:
                    data = None
                    day = ""
                atts.append((day, os.path.relpath(
                    name, self._location), data))
            elif "attachments" in splf:
                # a file below the attachments: unzipped or converted
                rel = os.path.relpath(name, loc)
                dest = os.path.relpath(name, self._location)
                if rel == dest:
                    raise Exception(  # pragma: no cover
                        "weird\n{0}\n{1}".format(rel, dest))
                ssize = format_size(os.stat(name).st_size)
                if "__MACOSX" not in rel and "__MACOSX" not in dest and \
                        ".ipynb_checkpoints" not in dest and ".ipynb_checkpoints" not in rel:
                    created_files.append((rel, dest, ssize))
            else:
                # possibly a mail
                mail = os.path.split(name)[-1]
                res = EmailMessage.interpret_default_filename(mail)
                if "date" in res and "uid" in res and "from" in res:
                    emails.append(
                        (res["date"], res["from"], res["uid"], res))
                    # the enumerated path is opened, rebuilding it from the
                    # group location fails for a mail stored in a sub folder
                    with open(name, "r", encoding="utf8") as f:
                        content = f.read()
                    urls = ProjectsRepository._link_regex.findall(content)
                    for u in set(urls):
                        u = clean_url(u)
                        if not filter_in(u):
                            continue
                        domain, last = url_domain_name(u)
                        links.append(
                            (res["date"], res["from"], clean_url(u), domain, last))

        # we sort
        atts.sort()
        links.sort()

        # we clean duplicated links
        mlinks = links
        links = []
        done = set()
        for date, from_, url, domain, last in mlinks:
            if url in done:
                continue
            if "__MACOSX" in url or "__MACOSX" in last or \
                    ".ipynb_checkpoints" in last or ".ipynb_checkpoints" in url:
                continue
            links.append((date, from_, url, domain, last))
            done.add(url)

        # we create the variable for the template,
        # the dictionaries are left out of the sorting key as two
        # dictionaries cannot be compared
        emails = [_[-1] for _ in sorted(emails, key=lambda e: e[:3])]
        c = dict(link=c[0].replace("\\", "/"), group=c[1], nb=nb_files,
                 size=size, attachments=atts, emails=emails, links=links,
                 created_files=created_files)
        groups.append(c)

    # final summary
    if renderer is None:
        tmpl = ProjectsRepository._default_template_summary
        renderer = EmailMessageRenderer(tmpl=tmpl, fLOG=self.fLOG)
        dof = True
    else:
        dof = False

    res = renderer.write(filename=outfile, location=self.Location,
                         mail=None, attachments=None, groups=groups,
                         title=title, len=len, os=os, format_size=format_size)
    # only a renderer created here is flushed here
    if dof:
        renderer.flush()
    return res
def unzip_convert(self, group):
    """
    Uncompresses the archives of a group, then converts notebooks
    and scripts into :epkg:`HTML`.

    :param group: group name
    :return: list of new files, only the converted ones
    """
    self.unzip_files(group)
    converted = self.convert_files(group)
    return converted
def unzip_files(self, group):
    """
    Uncompresses the archives (*zip*, *7z*, *rar*, *tar.gz*, *gz*)
    found in the attachments of a group. Every archive is extracted
    into its own folder, next to it. An archive is skipped if this
    folder already exists.

    :param group: group name
    :return: list of new files
    """
    def fvalid(zip_name, local_name):
        # compiled python files are not extracted
        if "__pycache__" in zip_name:
            return False
        if zip_name.endswith(".pyc"):
            return False
        return True

    def clean_f(folder):
        folder = folder.replace(" ", "_").replace(
            ",", "_").replace("&", "_").replace("\r", "_")
        folder = folder.replace("\n", "_").replace("\t", "_")
        return folder

    def prepare(name, suffix, tag, verb):
        # creates the destination folder, returns None if it already
        # exists: the archive was already extracted
        folder = clean_f(os.path.splitext(name)[0] + suffix)
        if os.path.exists(folder):
            return None
        self.fLOG(
            "[ProjectsRepository.{0}] {1} '{2}'".format(tag, verb, name))
        self.fLOG(
            "[ProjectsRepository.{0}] creating '{1}'".format(tag, folder))
        os.makedirs(folder)
        return folder

    # the list is built first because extracting adds files to the folder
    names = list(self.enumerate_group_files(group))
    files = []
    for name in names:
        if "attachments" not in name:
            continue
        ext = os.path.splitext(name)[-1]

        if ext == ".zip":
            folder = prepare(name, "_zip", "unzip_files", "unzip")
            if folder is None:
                continue
            try:
                lf = unzip_files(
                    name, folder, fLOG=self.fLOG, fvalid=fvalid, fail_if_error=False)
            except (zipfile.BadZipFile, NotImplementedError, OSError) as e:
                self.fLOG(
                    "[ProjectsRepository.unzip_files] ERROR: unable to unzip '{0}' because of '{1}']".format(name, e))
                lf = []
        elif ext == ".7z":
            folder = prepare(name, "_7z", "un7zip_files", "un7zip")
            if folder is None:
                continue
            lf = un7zip_files(
                name, folder, fLOG=self.fLOG, fvalid=fvalid)
        elif ext == ".rar":
            folder = prepare(name, "_rar", "unrar_files", "unrar")
            if folder is None:
                continue
            lf = unrar_files(
                name, folder, fLOG=self.fLOG, fvalid=fvalid)
        elif name.endswith(".tar.gz"):
            # must be tested before .gz
            folder = prepare(name, "_targz", "untar_files", "ungzip")
            if folder is None:
                continue
            lf = untar_files(name, folder, fLOG=self.fLOG)
        elif ext == ".gz":
            folder = prepare(name, "_gz", "ungzip_files", "ungzip")
            if folder is None:
                continue
            # a pickle is uncompressed but not unzipped any further
            unzip = "pkl.gz" not in name
            lf = ungzip_files(
                name, folder, fLOG=self.fLOG, fvalid=fvalid, unzip=unzip)
        else:
            continue
        files.extend(lf)
    return files
def convert_files(self, group):
    """
    Converts every notebook and every python script found in the
    attachments of a group into :epkg:`HTML`. A failure produces
    a warning, it does not stop the process.

    :param group: group name
    :return: list of new files
    """
    produced = []
    for name in list(self.enumerate_group_files(group)):
        if "attachments" not in name:
            continue
        suffix = os.path.splitext(name)[-1]
        if suffix not in (".ipynb", ".py"):
            continue

        self.fLOG(
            "[ProjectsRepository.convert_files] convert '{0}'".format(name))
        out = name + ".html"
        if os.path.exists(out):
            warnings.warn(
                "[convert_files] overwriting '{0}'".format(out))

        if suffix == ".ipynb":
            try:
                upgrade_notebook(name)
                nb2html(name, out, exc=False)
                produced.append(out)
            except Exception as e:
                warnings.warn(
                    "unable to convert a notebook '{0}' because of {1}".format(name, e))
        else:
            try:
                py_to_html_file(name, out, False, title=os.path.relpath(
                    name, self.get_group_location(group)))
                produced.append(out)
            except Exception:
                # the syntax of the python file might be wrong
                warnings.warn(
                    "unable to convert File \"{0}\"".format(name))
    return produced