# -*- coding: utf-8 -*-
"""
Helpers to execute a :epkg:`python` script (in the current process, in a
:class:`multiprocessing.Pool` or with a new interpreter) and to retrieve
its local variables.
:githublink:`%|py|6`
"""
import sys
import os
import pprint
import subprocess
import textwrap
import traceback
import pickle
from multiprocessing import Pool, freeze_support
def execute_script(code, folder=None, filename="_temp_custom_run_script_.py", check=True):
    """
    Executes a :epkg:`python` script and returns its local variables.

    A small epilogue is appended to the script. It collects every local
    variable which is None or an instance of a basic type (str, int, float,
    tuple, list, dict, set) and which can be pickled, then serializes the
    result with :mod:`pickle`.

    :param code: python script
    :param folder: if None, the script is compiled and executed in the
        current process with :func:`exec`, otherwise the script is written
        in this folder and run with a new interpreter
    :param filename: name of the script to write (only used when *folder*
        is specified), the pickled variables are stored in ``filename + '.pkl'``
    :param check: checks that the output is not empty
    :return: dictionary with local variables, or a dictionary with key
        ``'ERROR'`` if the script failed (key ``'code'`` is added when the
        script was run in the current process)
    """
    addition = textwrap.dedent("""
        loc = locals().copy()
        try:
            data = {'__file__': __file__}
        except NameError:
            data = {}
        import pickle
        for k, v in loc.items():
            if v is None or isinstance(v, (str, int, float, tuple, list, dict, set)):
                try:
                    pickle.dumps(v)
                except Exception:
                    # not pickable
                    continue
                data[k] = v
        __CHECK__
        pkl = pickle.dumps(data)
        """)
    if check:
        checkc = textwrap.dedent("""
            if len(data) == 0:
                import pprint
                raise RuntimeError("data cannot be empty.\\n{}".format(pprint.pformat(loc)))
            """)
    else:
        checkc = ""
    addition = addition.replace("__CHECK__", checkc)
    new_code = "\n".join([code, "", addition])

    if folder is None:
        # The script runs in the current process, any failure is returned
        # to the caller instead of being raised.
        try:
            obj = compile(new_code, '', 'exec')
        except Exception:
            excs = traceback.format_exc()
            return {'ERROR': excs, 'code': new_code}

        lo = {}
        gl = {}
        try:
            exec(obj, gl, lo)
        except Exception:
            excs = traceback.format_exc()
            return {'ERROR': excs, 'code': new_code}

        # The epilogue stored the pickled variables in local variable 'pkl'.
        return pickle.loads(lo['pkl'])

    name = os.path.join(folder, filename)
    data = name + ".pkl"
    # repr() produces a valid string literal whatever the path contains
    # (quotes, backslashes), so the path is neither broken nor altered.
    new_code += "\nwith open({!r}, 'wb') as f: f.write(pkl)".format(data)
    with open(name, "w", encoding="utf-8") as f:
        f.write(new_code)

    # A pickle left by a previous run must not be mistaken for the
    # output of this one.
    if os.path.exists(data):
        os.remove(data)

    # A list of arguments lets subprocess do the quoting on every platform,
    # including when the interpreter path contains spaces.
    cmdl = [sys.executable, '-u', name]
    proc = subprocess.Popen(cmdl, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    _, errs = proc.communicate()
    errs = errs.decode('utf-8', errors="ignore")
    if errs:
        # Warnings and indented lines (the code snippets printed with them)
        # are not considered as errors.
        lines = [line for line in errs.split('\n')
                 if "Warning" not in line and line and line[0] != ' ']
        errs2 = "\n".join(lines).strip("\r\n ")
        if errs2:
            return {'ERROR': errs}

    with open(data, "rb") as f:
        return pickle.load(f)
def execute_script_get_local_variables(script, folder=None,
                                       filename="_temp_custom_run_script_.py",
                                       check=True):
    """
    Executes a script and returns the local variables.

    :param script: filename or code, the value is read as a file if it
        contains no end of line and points to an existing path
    :param folder: write the script in a folder then runs it,
        if None, the function uses a Pool to execute
        the script
    :param filename: name of the script to write
    :param check: checks that the output is not empty
    :return: dictionary
    :raises RuntimeError: if the pool does not return exactly one result
    """
    if "\n" not in script and os.path.exists(script):
        with open(script, "r", encoding="utf-8") as f:
            content = f.read()
    else:
        content = script

    if folder is not None:
        return execute_script(content, folder, filename, check=check)

    # starmap forwards every argument to the worker: parameter *check*
    # must be honoured in the pool as well.
    with Pool(1) as p:
        res = p.starmap(execute_script, [(content, None, filename, check)])
    if len(res) != 1:
        raise RuntimeError(
            "Something went wrong with content\n{}".format(content))
    return res[0]
def dictionary_as_class(dico):
    """
    Wraps a dictionary into an object: every key of ``dico``
    becomes an attribute of an instance of a dummy class.

    :param dico: dictionary, every key must be a string
    :return: instance of the dummy class, ``str`` displays its public
        attributes and method ``drop`` returns a copy without
        the given attributes
    """
    def public_attributes(obj, excluded=frozenset()):
        # Attributes starting with an underscore are considered private.
        kept = {}
        for name, value in obj.__dict__.items():
            if name.startswith("_") or name in excluded:
                continue
            kept[name] = value
        return kept

    class dummy_class:
        def __init__(self, dico):
            for key, value in dico.items():
                if isinstance(key, str):
                    setattr(self, key, value)
                else:
                    raise TypeError("Key '{}' must be a string.".format(key))

        def __str__(self):
            return pprint.pformat(public_attributes(self))

        def drop(self, *dr):
            return dummy_class(public_attributes(self, set(dr)))

    return dummy_class(dico)
if __name__ == '__main__': # pragma: no cover
    # This module starts worker processes through multiprocessing.Pool.
    # freeze_support() is called when the module is run as a script, which
    # multiprocessing requires for programs frozen into a Windows executable.
    freeze_support()