Coverage for src/ensae_teaching_cs/automation_students/interro_motif.py: 77%
111 statements
« prev ^ index » next coverage.py v7.1.0, created at 2023-04-28 06:23 +0200
« prev ^ index » next coverage.py v7.1.0, created at 2023-04-28 06:23 +0200
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Retrieve python files and run them.
5"""
6import os
7import sys
8import hashlib
9import time
10import pandas
11from pyquickhelper.loghelper import noLOG, run_cmd
12from pyquickhelper.filehelper import explore_folder_iterfile
13from pyquickhelper.filehelper.download_helper import get_url_content_timeout
14from ..td_1a.edit_distance import edit_distance
17def _get_code(mail):
18 m = hashlib.md5()
19 m.update(mail)
20 b = m.digest()
21 return int(b[0])
def execute_python_scripts(root, df, col_names=None, url=None, eol="/", fLOG=noLOG, gen_mail=None):
    """
    Retrieves all :epkg:`python` scripts and run them.

    @param      root        main folder, every value of column *folder*
                            is looked for as a subfolder of *root*
    @param      df          dataframe with at least columns *folder* and *mail*
    @param      col_names   dictionary used to rename the columns
                            (None means default names), recognized keys:
                            folder, mail, exists, program, out, err, time,
                            size, key, pattern_id, url, content, cmp,
                            sortie_dans_motif, motif_dans_sortie, dist
    @param      url         None or a string formatted with ``url.format(pattern_id)``,
                            the content found at this location is compared
                            to the standard output of every script
    @param      eol         if not None, replaces end of lines by *eol*
    @param      fLOG        logging function
    @param      gen_mail    function which takes a mail and generates variations
                            of it, by default the mail itself and its lowercase version
    @return                 dataframe, one row per missing folder, or one row
                            per script (and per mail variation if *url* is specified)

    A script which cannot be run gets a missing output (None) and the
    exception message in column *err*; the comparison columns are then
    left unset for that row.
    """
    if col_names is None:
        # The default value must behave like "no renaming".
        col_names = {}

    if gen_mail is None:
        def iter_mail(mail):
            yield mail
            yield mail.lower()
        gen_mail = iter_mail

    def post_process(out, eol):
        # A script which failed to start has no output to clean.
        if out is None:
            return None
        out = out.strip("\r\t\n").rstrip().replace(
            "\r", "").replace("\t", " ")
        if eol:
            out = out.replace("\n", eol)
        return out

    # Column names do not depend on the row: resolve them once.
    col_folder = col_names.get("folder", "folder")
    col_mail = col_names.get("mail", "mail")
    col_find = col_names.get("exists", "exists")
    col_out = col_names.get("out", "out")
    col_err = col_names.get("err", "err")
    col_prog = col_names.get("program", "program")
    col_time = col_names.get("time", "time")
    col_key = col_names.get("key", "key")
    col_size = col_names.get("size", "size")
    col_url = col_names.get("url", "url")
    col_ind = col_names.get("pattern_id", "pattern_id")
    col_cmp = col_names.get("cmp", "cmp")
    col_in = col_names.get("sortie_dans_motif", "sortie_dans_motif")
    col_in2 = col_names.get("motif_dans_sortie", "motif_dans_sortie")
    col_dist = col_names.get("dist", "dist")
    col_content = col_names.get("content", "content")

    downloads = {}  # cache: location -> downloaded content
    res = []
    for name, mail in zip(df[col_folder], df[col_mail]):
        row = {col_folder: name}
        fLOG(f"[execute_python_script], look into '{name}'")
        subf = os.path.join(root, name)
        if not os.path.exists(subf):
            # Second chance: same folder name with dots instead of dashes.
            subf = os.path.join(root, name.replace("-", "."))
        if not os.path.exists(subf):
            row[col_find] = False
            res.append(row)
            continue

        row[col_find] = True
        store = sorted(explore_folder_iterfile(subf, ".*[.]py$"))
        fLOG(" -", len(store), "programs found")

        if not store:
            # Nothing to run: only report the expected pattern for every mail.
            for mm in sorted(gen_mail(mail.strip())):
                mailid = _get_code(mm.encode("utf-8"))
                # Without url, there is no location to report.
                loc = None if url is None else url.format(mailid)
                r = row.copy()
                r.update({col_key: mm, col_ind: mailid, col_url: loc})
                res.append(r)
            continue

        # test all programs
        outs = []
        for py in store:
            cmd = f'"{sys.executable}" "{py}"'
            t1 = time.perf_counter()
            try:
                out, err = run_cmd(cmd, wait=True)
            except Exception as e:  # best effort: a failing script must not stop the loop
                out = None
                err = str(e)
            out = post_process(out, eol)
            t2 = time.perf_counter()
            outs.append({col_out: out, col_err: post_process(err, eol),
                         col_prog: os.path.split(py)[-1], col_time: t2 - t1,
                         col_size: os.stat(py).st_size})

        if url is None:
            # No expected content: only report what the scripts produced.
            for o in outs:
                r = row.copy()
                r.update(o)
                res.append(r)
            continue

        for mm in sorted(gen_mail(mail.strip())):
            mailid = _get_code(mm.encode("utf-8"))
            loc = url.format(mailid)
            ind = {col_key: mm, col_ind: mailid, col_url: loc}

            if loc not in downloads:
                downloads[loc] = get_url_content_timeout(
                    loc).strip("\n\r\t ")
            content = post_process(downloads[loc], eol)
            ind[col_content] = content

            for o in outs:
                r = row.copy()
                r.update(o)
                r.update(ind)
                out = r[col_out]
                if out is not None:
                    r[col_cmp] = out == content or out.strip(
                    ) == content.strip()
                    r[col_in] = out in content
                    r[col_in2] = content in out
                    # The edit distance is only computed when both lengths are
                    # comparable, otherwise the length difference is used.
                    r[col_dist] = (edit_distance(out, content)[0]) if (
                        len(content) > len(out) // 2) else abs(len(content) - len(out))
                res.append(r)
    return pandas.DataFrame(res)