Coverage for pyquickhelper/loghelper/run_cmd.py: 95%

193 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 02:21 +0200

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Implements function @see fn run_cmd. 

5""" 

6import sys 

7import os 

8import time 

9import subprocess 

10import threading 

11import warnings 

12import re 

13import queue 

14from .flog_fake_classes import PQHException 

15 

16 

17class RunCmdException(Exception): 

18 """ 

19 Raised by function @see fn run_cmd. 

20 """ 

21 pass 

22 

23 

24def get_interpreter_path(): 

25 """ 

26 Returns the interpreter path. 

27 """ 

28 if sys.platform.startswith("win"): 

29 return sys.executable.replace( # pragma: no cover 

30 "pythonw.exe", "python.exe") 

31 else: 

32 return sys.executable 

33 

34 

35def split_cmp_command(cmd, remove_quotes=True): 

36 """ 

37 Splits a command line. 

38 

39 @param cmd command line 

40 @param remove_quotes True by default 

41 @return list 

42 """ 

43 if isinstance(cmd, str): 

44 spl = cmd.split() 

45 res = [] 

46 for s in spl: 

47 if len(res) == 0: 

48 res.append(s) 

49 elif res[-1].startswith('"') and not res[-1].endswith('"'): 

50 res[-1] += " " + s 

51 else: 

52 res.append(s) 

53 if remove_quotes: 

54 nres = [] 

55 for _ in res: 

56 if _.startswith('"') and _.endswith('"'): 

57 nres.append(_.strip('"')) 

58 else: 

59 nres.append(_) 

60 return nres 

61 else: 

62 return res 

63 else: 

64 return cmd 

65 

66 

67def decode_outerr(outerr, encoding, encerror, msg): 

68 """ 

69 Decodes the output or the error after running a command line instructions. 

70 

71 @param outerr output or error 

72 @param encoding encoding (if None, it is replaced by ascii) 

73 @param encerror how to handle errors 

74 @param msg to add to the exception message 

75 @return converted string 

76 """ 

77 if encoding is None: 

78 encoding = "ascii" 

79 typstr = str 

80 if not isinstance(outerr, bytes): 

81 raise TypeError( # pragma: no cover 

82 "only able to decode bytes, not " + typstr(type(outerr))) 

83 try: 

84 out = outerr.decode(encoding, errors=encerror) 

85 return out 

86 except UnicodeDecodeError as exu: 

87 try: 

88 out = outerr.decode( 

89 "utf8" if encoding != "utf8" else "latin-1", errors=encerror) 

90 return out 

91 except Exception as e: # pragma: no cover 

92 out = outerr.decode(encoding, errors='ignore') 

93 raise RuntimeError( 

94 f"issue with cmd ({encoding}):{typstr(msg)}\n{typstr(exu)}" 

95 "\n-----\n{out}") from e 

96 raise RuntimeError( # pragma: no cover 

97 "complete issue with cmd:" + typstr(msg)) 

98 

99 

100def skip_run_cmd(cmd, sin="", shell=True, wait=False, log_error=True, 

101 stop_running_if=None, encerror="ignore", 

102 encoding="utf8", change_path=None, communicate=True, 

103 preprocess=True, timeout=None, catch_exit=False, fLOG=None, 

104 timeout_listen=None, tell_if_no_output=None, prefix_log=None): 

105 """ 

106 Has the same signature as @see fn run_cmd but does nothing. 

107 """ 

108 return "", "" 

109 

110 

111def run_cmd(cmd, sin="", shell=sys.platform.startswith("win"), wait=False, log_error=True, 

112 stop_running_if=None, encerror="ignore", encoding="utf8", 

113 change_path=None, communicate=True, preprocess=True, timeout=None, 

114 catch_exit=False, fLOG=None, tell_if_no_output=None, prefix_log=None): 

115 """ 

116 Runs a command line and wait for the result. 

117 

118 @param cmd command line 

119 @param sin sin: what must be written on the standard input 

120 @param shell if True, cmd is a shell command (and no command window is opened) 

121 @param wait call ``proc.wait`` 

122 @param log_error if log_error, call fLOG (error) 

123 @param stop_running_if the function stops waiting if some condition is fulfilled. 

124 The function received the last line from the logs. 

125 Signature: ``stop_waiting_if(last_out, last_err) -> bool``. 

126 The function must return True to stop waiting. 

127 This function can also be used to intercept the standard output 

128 and the standard error while running. 

129 @param encerror encoding errors (ignore by default) while converting the output into a string 

130 @param encoding encoding of the output 

131 @param change_path change the current path if not None (put it back after the execution) 

132 @param communicate use method `communicate 

133 <https://docs.python.org/3/library/subprocess.html#subprocess.Popen.communicate>`_ 

134 which is supposed to be safer, parameter ``wait`` must be True 

135 @param preprocess preprocess the command line if necessary (not available on Windows) 

136 (False to disable that option) 

137 @param timeout when data is sent to stdin (``sin``), a timeout is needed to avoid 

138 waiting for ever (*timeout* is in seconds) 

139 @param catch_exit catch *SystemExit* exception 

140 @param fLOG logging function (if not None, bypass others parameters) 

141 @param tell_if_no_output tells if there is no output every *tell_if_no_output* seconds 

142 @param prefix_log add a prefix to a line before printing it 

143 @return content of stdout, stdres (only if wait is True) 

144 

145 .. exref:: 

146 :title: Run a program using the command line 

147 

148 :: 

149 

150 from pyquickhelper.loghelper import run_cmd 

151 out, err = run_cmd("python setup.py install", wait=True) 

152 

153 If you are using this function to run :epkg:`git` function, parameter ``shell`` must be True. 

154 The function catches *SystemExit* exception. 

155 See `Constantly print Subprocess output while process is running 

156 <http://stackoverflow.com/questions/4417546/constantly-print-subprocess-output-while-process-is-running/4417735>`_. 

157 If *wait* is False, the function returns the started process. 

158 ``__exit__`` should be called if wait if False. 

159 Parameter *prefix_log* was added. 

160 """ 

161 if prefix_log is None: 

162 prefix_log = "" 

163 if fLOG is not None: 

164 if isinstance(cmd, (list, tuple)): 

165 fLOG( # pragma: no cover 

166 prefix_log + "[run_cmd] execute", " ".join(cmd)) 

167 else: 

168 fLOG(prefix_log + "[run_cmd] execute", cmd) 

169 

170 if change_path is not None: 

171 current = os.getcwd() 

172 os.chdir(change_path) 

173 

174 if sys.platform.startswith("win"): 

175 cmdl = cmd 

176 else: 

177 cmdl = split_cmp_command(cmd) if preprocess else cmd 

178 

179 if catch_exit: 

180 try: 

181 pproc = subprocess.Popen(cmdl, 

182 shell=shell, 

183 stdin=subprocess.PIPE if sin is not None and len( 

184 sin) > 0 else None, 

185 stdout=subprocess.PIPE if wait else None, 

186 stderr=subprocess.PIPE if wait else None) 

187 except SystemExit as e: 

188 if change_path is not None: # pragma: no cover 

189 os.chdir(current) 

190 raise RunCmdException( # pragma: no cover 

191 "SystemExit raised (1)") from e 

192 

193 else: 

194 pproc = subprocess.Popen(cmdl, 

195 shell=shell, 

196 stdin=subprocess.PIPE if sin is not None and len( 

197 sin) > 0 else None, 

198 stdout=subprocess.PIPE if wait else None, 

199 stderr=subprocess.PIPE if wait else None) 

200 

201 pproc.__enter__() # pylint: disable=C2801 

202 if isinstance(cmd, list): 

203 cmd = " ".join(cmd) 

204 

205 if wait: 

206 skip_out_err = False 

207 out = [] 

208 err = [] 

209 err_read = False 

210 skip_waiting = False 

211 

212 if communicate: 

213 # communicate is True 

214 if tell_if_no_output is not None: 

215 raise NotImplementedError( 

216 "tell_if_no_output is not implemented when communicate is True") 

217 if stop_running_if is not None: 

218 raise NotImplementedError( 

219 "stop_running_if is not implemented when communicate is True") 

220 input = sin if sin is None else sin.encode() 

221 if input is not None and len(input) > 0: 

222 if fLOG is not None: 

223 fLOG(prefix_log + "[run_cmd] input", [input]) 

224 

225 if catch_exit: 

226 try: 

227 stdoutdata, stderrdata = pproc.communicate( 

228 input=input, timeout=timeout) 

229 except SystemExit as e: # pragma: no cover 

230 if change_path is not None: 

231 os.chdir(current) 

232 raise RunCmdException("SystemExit raised (2)") from e 

233 else: 

234 stdoutdata, stderrdata = pproc.communicate( 

235 input=input, timeout=timeout) 

236 

237 out = decode_outerr(stdoutdata, encoding, encerror, cmd) 

238 err = decode_outerr(stderrdata, encoding, encerror, cmd) 

239 else: 

240 # communicate is False: use of threads 

241 if sin is not None and len(sin) > 0: 

242 if change_path is not None: 

243 os.chdir(current) # pragma: no cover 

244 pproc.__exit__(None, None, None) 

245 raise RuntimeError( 

246 "Argument 'communicate' should be True to send " 

247 "something on stdin.") 

248 stdout, stderr = pproc.stdout, pproc.stderr 

249 

250 begin = time.perf_counter() 

251 last_update = begin 

252 # with threads 

253 (stdoutReader, stdoutQueue) = _AsyncLineReader.getForFd( 

254 stdout, catch_exit=catch_exit) 

255 (stderrReader, stderrQueue) = _AsyncLineReader.getForFd( 

256 stderr, catch_exit=catch_exit) 

257 runloop = True 

258 

259 while (not stdoutReader.eof() or not stderrReader.eof()) and runloop: 

260 while not stdoutQueue.empty(): 

261 line = stdoutQueue.get() 

262 decol = decode_outerr( 

263 line, encoding, encerror, cmd) 

264 sdecol = decol.strip("\n\r") 

265 if fLOG is not None: 

266 fLOG(prefix_log + sdecol) 

267 out.append(sdecol) 

268 last_update = time.perf_counter() 

269 if stop_running_if is not None and stop_running_if(decol, None): 

270 runloop = False 

271 break 

272 

273 while not stderrQueue.empty(): 

274 line = stderrQueue.get() 

275 decol = decode_outerr( 

276 line, encoding, encerror, cmd) 

277 sdecol = decol.strip("\n\r") 

278 if fLOG is not None: 

279 fLOG(prefix_log + sdecol) 

280 err.append(sdecol) 

281 last_update = time.perf_counter() 

282 if stop_running_if is not None and stop_running_if(None, decol): 

283 runloop = False 

284 break 

285 time.sleep(0.05) 

286 

287 delta = time.perf_counter() - last_update 

288 if tell_if_no_output is not None and delta >= tell_if_no_output: 

289 fLOG( # pragma: no cover 

290 prefix_log + "[run_cmd] No update in {0} seconds for cmd: {1}".format( 

291 "%5.1f" % (last_update - begin), cmd)) 

292 last_update = time.perf_counter() # pragma: no cover 

293 full_delta = time.perf_counter() - begin 

294 if timeout is not None and full_delta > timeout: 

295 runloop = False # pragma: no cover 

296 fLOG( # pragma: no cover 

297 prefix_log + "[run_cmd] Timeout after {0} seconds for cmd: {1}".format( 

298 "%5.1f" % full_delta, cmd)) 

299 break # pragma: no cover 

300 

301 if runloop: 

302 # Waiting for async readers to finish... 

303 stdoutReader.join() 

304 stderrReader.join() 

305 

306 # Waiting for process to exit... 

307 returnCode = pproc.wait() 

308 err_read = True 

309 

310 if returnCode != 0: # pragma: no cover 

311 if change_path is not None: 

312 os.chdir(current) 

313 try: 

314 # we try to close the ressources 

315 stdout.close() 

316 stderr.close() 

317 except Exception as e: 

318 warnings.warn( 

319 "Unable to close stdout and sterr.", RuntimeWarning) 

320 if catch_exit: 

321 mes = ("SystemExit raised with error code {0}\nCMD:\n{1}\n" 

322 "CWD:\n{2}\n#---OUT---#\n{3}\n#---ERR---#\n{4}") 

323 raise RunCmdException(mes.format( 

324 returnCode, cmd, os.getcwd(), "\n".join(out), "\n".join(err))) 

325 raise subprocess.CalledProcessError(returnCode, cmd) 

326 

327 if not skip_waiting: 

328 pproc.wait() 

329 else: # pragma: no cover 

330 out.append("[run_cmd] killing process.") 

331 fLOG( 

332 prefix_log + "[run_cmd] killing process because stop_running_if returned True.") 

333 pproc.kill() 

334 err_read = True 

335 fLOG(prefix_log + "[run_cmd] process killed.") 

336 skip_out_err = True 

337 

338 out = "\n".join(out) 

339 if skip_out_err: 

340 err = "Process killed." # pragma: no cover 

341 else: 

342 if err_read: 

343 err = "\n".join(err) 

344 else: # pragma: no cover 

345 temp = err = stderr.read() 

346 try: 

347 err = decode_outerr(temp, encoding, encerror, cmd) 

348 except Exception: 

349 err = decode_outerr(temp, encoding, "ignore", cmd) 

350 stdout.close() 

351 stderr.close() 

352 

353 # same path for whether communicate is False or True 

354 err = err.replace("\r\n", "\n") 

355 if fLOG is not None: 

356 fLOG(prefix_log + "end of execution", cmd) 

357 

358 if len(err) > 0 and log_error and fLOG is not None: 

359 if "\n" in err: 

360 fLOG(prefix_log + "[run_cmd] stderr (log)") 

361 for eline in err.split("\n"): 

362 fLOG(prefix_log + eline) 

363 else: 

364 fLOG( # pragma: no cover 

365 prefix_log + f"[run_cmd] stderr (log)\n{err}") 

366 

367 if change_path is not None: 

368 os.chdir(current) 

369 

370 pproc.__exit__(None, None, None) 

371 if sys.platform.startswith("win"): # pragma: no cover 

372 if err is not None: 

373 err = err.strip("\n\r\t ") 

374 return out.replace("\r\n", "\n"), err.replace("\r\n", "\n") 

375 else: 

376 if err is not None: 

377 err = err.strip("\n\r\t ") 

378 return out, err 

379 else: 

380 

381 if change_path is not None: 

382 os.chdir(current) # pragma: no cover 

383 

384 return pproc, None 

385 

386 

387def parse_exception_message(exc): 

388 """ 

389 Parses the message embedded in an exception and returns the standard output and error 

390 if it can be found. 

391 

392 @param exc exception coming from @see fn run_cmd 

393 @return out, err 

394 """ 

395 mes = str(exc) 

396 reg = re.compile(".*#---OUT---#(.*)#---ERR---#(.*)", re.DOTALL) 

397 find = reg.search(mes.replace("\r", "")) 

398 if find: # pragma: no cover 

399 gr = find.groups() 

400 out, err = gr[0], gr[1] 

401 return out.strip("\n "), err.strip("\n ") 

402 else: 

403 return None, None 

404 

405 

406def run_script(script, *args, **kwargs): 

407 """ 

408 Runs a script. 

409 

410 @param script script to execute or command line starting with ``-m`` 

411 @param args other parameters 

412 @param kwargs sent to @see fn run_cmd 

413 @return out,err: content of stdout stream and stderr stream 

414 

415 Allows command line starting with ``-m``. 

416 """ 

417 if not script.startswith('-m') and not os.path.exists(script): 

418 raise PQHException(f"file {script} not found") 

419 py = get_interpreter_path() 

420 cmd = f"{py} {script}" 

421 if len(args) > 0: 

422 typstr = str 

423 cmd += " " + " ".join([typstr(x) for x in args]) 

424 out, err = run_cmd(cmd, **kwargs) 

425 return out, err 

426 

427 

428class _AsyncLineReader(threading.Thread): 

429 

430 def __init__(self, fd, outputQueue, catch_exit): 

431 threading.Thread.__init__(self) 

432 

433 assert isinstance(outputQueue, queue.Queue) 

434 assert callable(fd.readline) 

435 

436 self.fd = fd 

437 self.catch_exit = catch_exit 

438 self.outputQueue = outputQueue 

439 

440 def run(self): 

441 if self.catch_exit: 

442 try: 

443 for _ in map(self.outputQueue.put, iter(self.fd.readline, b'')): 

444 pass 

445 except SystemExit as e: # pragma: no cover 

446 self.outputQueue.put(str(e)) 

447 raise RunCmdException("SystemExit raised (3)") from e 

448 else: 

449 for _ in map(self.outputQueue.put, iter(self.fd.readline, b'')): 

450 pass 

451 

452 def eof(self): 

453 return not self.is_alive() and self.outputQueue.empty() 

454 

455 @classmethod 

456 def getForFd(cls, fd, start=True, catch_exit=False): 

457 q = queue.Queue() 

458 reader = cls(fd, q, catch_exit) 

459 

460 if start: 

461 reader.start() 

462 

463 return reader, q