Coverage for pyquickhelper/loghelper/process_script.py: 96%
76 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Implements functions which execute a python script and return its local variables, @see fn execute_script.
5"""
6import sys
7import os
8import pprint
9import subprocess
10import textwrap
11import traceback
12import pickle
13from multiprocessing import Pool, freeze_support
def execute_script(code, folder=None, filename="_temp_custom_run_script_.py", check=True):
    """
    Executes a :epkg:`python` script and returns the variables it defines.

    The script is run as is, without any sandbox: only trusted code
    should be given to this function.

    @param      code        python script
    @param      folder      if not None, the script is written in this folder
                            and run in a separate process with the current
                            interpreter; if None, the script is compiled and
                            executed in the current process
    @param      filename    name of the script to write
                            (only used if *folder* is not None)
    @param      check       checks that the output is not empty
    @return                 dictionary with the variables of the script
                            (only ``None`` and picklable instances of
                            *str, int, float, tuple, list, dict, set*),
                            or a dictionary with a key ``'ERROR'``
                            if the script could not be compiled or run
    """
    # Code appended to the script: it collects the simple, picklable
    # variables the script defined and serializes them in variable *pkl*.
    addition = textwrap.dedent("""
        loc = locals().copy()
        try:
            data = {'__file__': __file__}
        except NameError:
            data = {}
        import pickle
        for k, v in loc.items():
            if v is None or isinstance(v, (str, int, float, tuple, list, dict, set)):
                try:
                    pickle.dumps(v)
                except Exception:
                    # not pickable
                    continue
                data[k] = v
        __CHECK__
        pkl = pickle.dumps(data)
        """)
    if check:
        checkc = textwrap.dedent("""
            if len(data) == 0:
                import pprint
                raise RuntimeError("data cannot be empty.\\n{}".format(pprint.pformat(loc)))
            """)
    else:
        checkc = ""  # pragma: no cover
    addition = addition.replace("__CHECK__", checkc)
    new_code = "\n".join([code, "", addition])

    if folder is None:
        # In-process execution: compile first to report syntax errors
        # separately from runtime errors.
        try:
            obj = compile(new_code, '', 'exec')
        except Exception:
            excs = traceback.format_exc()
            return {'ERROR': excs, 'code': new_code}
        lo = {}
        gl = {}
        try:
            exec(obj, gl, lo)
        except Exception:
            excs = traceback.format_exc()
            return {'ERROR': excs, 'code': new_code}

        pkl = lo['pkl']
        loc = pickle.loads(pkl)
        return loc

    name = os.path.join(folder, filename)
    data = name + ".pkl"
    # repr() produces a valid string literal whatever the path contains
    # (quotes, backslashes), which manual quoting does not guarantee.
    new_code = new_code + \
        "\nwith open({!r}, 'wb') as f: f.write(pkl)".format(data)
    with open(name, "w", encoding="utf-8") as f:
        f.write(new_code)

    # A pickle left by a previous run must not be mistaken
    # for the output of this one.
    if os.path.exists(data):
        os.remove(data)

    # A list of arguments is quoted by subprocess on every platform,
    # it works even if the interpreter path contains spaces.
    cmdl = [sys.executable, '-u', name]
    proc = subprocess.Popen(cmdl, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    _, errs = proc.communicate()
    errs = errs.decode('utf-8', errors="ignore")
    if errs:
        # Warnings and indented lines (code excerpts, traceback frames)
        # are ignored, anything else is considered as an error.
        lines = errs.split('\n')
        lines = [
            line for line in lines if "Warning" not in line and line and line[0] != ' ']
        errs2 = "\n".join(lines).strip("\r\n ")
        if errs2:
            return {'ERROR': errs}  # pragma: no cover
    if not os.path.exists(data):
        # The script stopped before writing its variables.
        msg = errs or "Script '{}' did not produce '{}' (exit code {}).".format(
            name, data, proc.returncode)
        return {'ERROR': msg}
    with open(data, "rb") as f:
        loc = pickle.load(f)
    return loc
def execute_script_get_local_variables(script, folder=None,
                                       filename="_temp_custom_run_script_.py",
                                       check=True):
    """
    Executes a script and returns the local variables.

    @param      script      filename or code, a string without any end of
                            line which is an existing path is read as a file
    @param      folder      write the script in a folder then runs it,
                            if None, the function uses a Pool to execute
                            the script
    @param      filename    name of the script to write
    @param      check       checks that the output is not empty
    @return                 dictionary
    """
    if "\n" not in script and os.path.exists(script):
        with open(script, "r", encoding="utf-8") as f:
            content = f.read()
    else:
        content = script

    if folder is None:
        with Pool(1, None, None, None) as p:
            # starmap forwards every argument, *check* included,
            # to the worker (map would only pass the script).
            res = p.starmap(execute_script,
                            [(content, None, filename, check)])
        if len(res) != 1:
            raise RuntimeError(  # pragma: no cover
                f"Something went wrong with content\n{content}")
        return res[0]
    return execute_script(content, folder, filename, check=check)
def dictionary_as_class(dico):
    """
    Wraps dictionary ``dico`` into an instance of a dummy class:
    every key becomes an attribute holding the associated value.

    @param      dico        dictionary, keys must be strings
    @return                 instance of a dummy class
    """
    class dummy_class:
        def __init__(self, dico):
            # Attributes are set one by one, the first key which
            # is not a string stops the construction.
            for key, value in dico.items():
                if isinstance(key, str):
                    setattr(self, key, value)
                    continue
                raise TypeError(  # pragma: no cover
                    f"Key '{key}' must be a string.")

        def __str__(self):
            # Attributes starting with an underscore are not displayed.
            public = dict(
                item for item in vars(self).items()
                if not item[0].startswith("_"))
            return pprint.pformat(public)

        def drop(self, *dr):
            # Returns a new instance without the given attributes
            # and without the ones starting with an underscore.
            excluded = set(dr)
            kept = {}
            for key, value in vars(self).items():
                if key.startswith("_") or key in excluded:
                    continue
                kept[key] = value
            return dummy_class(kept)

    return dummy_class(dico)
# Entry-point guard: multiprocessing.freeze_support() is only called when the
# module is run as a script; the standard library documents it as needed for
# programs using multiprocessing that are frozen into a Windows executable.
if __name__ == '__main__':  # pragma: no cover
    freeze_support()