Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Implements function @see fn run_cmd. 

5""" 

6import sys 

7import os 

8import pprint 

9import subprocess 

10import textwrap 

11import traceback 

12import pickle 

13from multiprocessing import Pool, freeze_support 

14 

15 

16def execute_script(code, folder=None, filename="_temp_custom_run_script_.py", check=True): 

17 """ 

18 Executes a :epkg:`python` script in a separate process. 

19 

20 @param code python script 

21 @param folder write the script in a folder then runs it, 

22 it None, the function uses a Pool to execute 

23 the script 

24 @param filename name of the scrit to write 

25 @param check checks that the output is not empty 

26 @return dictionary with local variables 

27 """ 

28 addition = textwrap.dedent(""" 

29 loc = locals().copy() 

30 try: 

31 data = {'__file__': __file__} 

32 except NameError: 

33 data = {} 

34 import pickle 

35 for k, v in loc.items(): 

36 if v is None or isinstance(v, (str, int, float, tuple, list, dict, set)): 

37 try: 

38 pickle.dumps(v) 

39 except Exception: 

40 # not pickable 

41 continue 

42 data[k] = v 

43 __CHECK__ 

44 pkl = pickle.dumps(data) 

45 """) 

46 if check: 

47 checkc = textwrap.dedent(""" 

48 if len(data) == 0: 

49 import pprint 

50 raise RuntimeError("data cannot be empty.\\n{}".format(pprint.pformat(loc))) 

51 """) 

52 else: 

53 checkc = "" # pragma: no cover 

54 addition = addition.replace("__CHECK__", checkc) 

55 new_code = "\n".join([code, "", addition]) 

56 if folder is None: 

57 try: 

58 obj = compile(new_code, '', 'exec') 

59 except Exception: 

60 excs = traceback.format_exc() 

61 return {'ERROR': excs, 'code': new_code} 

62 lo = {} 

63 gl = {} 

64 try: 

65 exec(obj, gl, lo) 

66 except Exception: 

67 excs = traceback.format_exc() 

68 return {'ERROR': excs, 'code': new_code} 

69 

70 pkl = lo['pkl'] 

71 loc = pickle.loads(pkl) 

72 return loc 

73 else: 

74 name = os.path.join(folder, filename) 

75 data = name + ".pkl" 

76 new_code = new_code + \ 

77 "\nwith open('{}', 'wb') as f: f.write(pkl)".format( 

78 data.replace("\\", "/")) 

79 with open(name, "w", encoding="utf-8") as f: 

80 f.write(new_code) 

81 if sys.platform.startswith("win"): 

82 cmdl = '{0} -u "{1}"'.format(sys.executable, name) 

83 else: 

84 cmdl = [sys.executable, '-u', name] 

85 proc = subprocess.Popen(cmdl, stdout=subprocess.PIPE, 

86 stderr=subprocess.PIPE) 

87 _, errs = proc.communicate() 

88 errs = errs.decode('utf-8', errors="ignore") 

89 if errs: 

90 lines = errs.split('\n') 

91 lines = [ 

92 line for line in lines if "Warning" not in line and line and line[0] != ' '] 

93 errs2 = "\n".join(lines).strip("\r\n ") 

94 if errs2: 

95 return {'ERROR': errs} # pragma: no cover 

96 with open(data, "rb") as f: 

97 loc = pickle.load(f) 

98 return loc 

99 

100 

101def execute_script_get_local_variables(script, folder=None, 

102 filename="_temp_custom_run_script_.py", 

103 check=True): 

104 """ 

105 Executes a script and returns the local variables. 

106 

107 @param script filename or code 

108 @param folder write the script in a folder then runs it, 

109 it None, the function uses a Pool to execute 

110 the script 

111 @param filename name of the scrit to write 

112 @param check checks that the output is not empty 

113 @return dictionary 

114 """ 

115 if "\n" not in script and os.path.exists(script): 

116 with open(script, "r", encoding="utf-8") as f: 

117 content = f.read() 

118 else: 

119 content = script 

120 

121 if folder is None: 

122 with Pool(1, None, None, None) as p: 

123 res = p.map(execute_script, [content]) 

124 if len(res) != 1: 

125 raise RuntimeError( # pragma: no cover 

126 "Something went wrong with content\n{}".format(content)) 

127 return res[0] 

128 return execute_script(content, folder, filename, check=check) 

129 

130 

131def dictionary_as_class(dico): 

132 """ 

133 Every key of dictionary ``dico`` becomes 

134 a member of a dummy class. 

135 

136 @param dico dictionary 

137 @return class 

138 """ 

139 class dummy_class: 

140 def __init__(self, dico): 

141 for k, v in dico.items(): 

142 if not isinstance(k, str): 

143 raise TypeError( # pragma: no cover 

144 "Key '{}' must be a string.".format(k)) 

145 setattr(self, k, v) 

146 

147 def __str__(self): 

148 data = {k: v for k, v in self.__dict__.items() 

149 if not k.startswith("_")} 

150 return pprint.pformat(data) 

151 

152 def drop(self, *dr): 

153 sdr = set(dr) 

154 data = {k: v for k, v in self.__dict__.items() 

155 if not k.startswith("_") and k not in sdr} 

156 return dummy_class(data) 

157 

158 du = dummy_class(dico) 

159 return du 

160 

161 

162if __name__ == '__main__': # pragma: no cover 

163 freeze_support()