Coverage for pyquickhelper/loghelper/run_cmd.py: 95%
193 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 02:21 +0200
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Implements function @see fn run_cmd.
5"""
6import sys
7import os
8import time
9import subprocess
10import threading
11import warnings
12import re
13import queue
14from .flog_fake_classes import PQHException
class RunCmdException(Exception):
    """
    Exception type raised by function @see fn run_cmd.
    """
def get_interpreter_path():
    """
    Gives the path of the running interpreter.

    @return     ``sys.executable``, on Windows ``pythonw.exe`` is
                replaced by ``python.exe``
    """
    interpreter = sys.executable
    if not sys.platform.startswith("win"):
        return interpreter
    return interpreter.replace(  # pragma: no cover
        "pythonw.exe", "python.exe")
def split_cmp_command(cmd, remove_quotes=True):
    """
    Splits a command line into a list of arguments.
    The string is split on whitespace and the pieces located between
    two double quotes are merged back (joined with a single space).

    @param      cmd             command line, anything which is not a string
                                is returned unchanged
    @param      remove_quotes   True by default, removes the double quotes
                                surrounding a quoted argument
    @return                     list (or *cmd* if it is not a string)
    """
    if not isinstance(cmd, str):
        return cmd

    def _is_open(token):
        # A lone '"' starts *and* ends with a quote but it only opens
        # a quoted argument, it does not close it.
        return token.startswith('"') and (
            len(token) == 1 or not token.endswith('"'))

    res = []
    for piece in cmd.split():
        if res and _is_open(res[-1]):
            res[-1] += " " + piece
        else:
            res.append(piece)

    if not remove_quotes:
        return res
    return [tok.strip('"') if tok.startswith('"') and tok.endswith('"') else tok
            for tok in res]
def decode_outerr(outerr, encoding, encerror, msg):
    """
    Decodes the output or the error after running a command line instructions.
    If decoding with *encoding* fails, a second attempt is made with
    ``utf8`` (or ``latin-1`` if *encoding* is already ``utf8``).

    @param      outerr      output or error (bytes)
    @param      encoding    encoding (if None, it is replaced by ascii)
    @param      encerror    how to handle errors
    @param      msg         to add to the exception message
    @return                 converted string

    Raises *TypeError* if *outerr* is not bytes, *RuntimeError*
    if both attempts fail.
    """
    if encoding is None:
        encoding = "ascii"
    if not isinstance(outerr, bytes):
        raise TypeError(  # pragma: no cover
            "only able to decode bytes, not " + str(type(outerr)))
    try:
        return outerr.decode(encoding, errors=encerror)
    except UnicodeDecodeError as exu:
        fallback = "utf8" if encoding != "utf8" else "latin-1"
        try:
            return outerr.decode(fallback, errors=encerror)
        except Exception as e:  # pragma: no cover
            # Best effort content, only used to build the error message.
            out = outerr.decode(encoding, errors='ignore')
            # Both pieces must be f-strings, otherwise {out} is
            # displayed literally.
            raise RuntimeError(
                f"issue with cmd ({encoding}):{str(msg)}\n{str(exu)}"
                f"\n-----\n{out}") from e
def skip_run_cmd(cmd, sin="", shell=True, wait=False, log_error=True,
                 stop_running_if=None, encerror="ignore",
                 encoding="utf8", change_path=None, communicate=True,
                 preprocess=True, timeout=None, catch_exit=False, fLOG=None,
                 timeout_listen=None, tell_if_no_output=None, prefix_log=None):
    """
    Placeholder for @see fn run_cmd: accepts the parameters above,
    runs nothing and returns two empty strings.
    """
    nothing = ""
    return nothing, nothing
def _run_cmd_listen(pproc, cmd, stop_running_if, encoding, encerror, timeout,
                    catch_exit, fLOG, tell_if_no_output, prefix_log, current):
    """
    Listens to the standard output and error of a running process
    with two reader threads (used by @see fn run_cmd when *communicate* is False).

    @param      pproc               process started with pipes on stdout and stderr
    @param      cmd                 command line as a string (only used in messages)
    @param      stop_running_if     see @see fn run_cmd
    @param      encoding            see @see fn run_cmd
    @param      encerror            see @see fn run_cmd
    @param      timeout             see @see fn run_cmd
    @param      catch_exit          see @see fn run_cmd
    @param      fLOG                logging function or None
    @param      tell_if_no_output   see @see fn run_cmd
    @param      prefix_log          prefix added to every logged line
    @param      current             folder to report as the current one in the
                                    error message (None to use ``os.getcwd()``)
    @return                         out, err as strings
    """
    def log(*args):
        # fLOG is optional, every message goes through this guard.
        if fLOG is not None:
            fLOG(*args)

    out = []
    err = []
    stdout, stderr = pproc.stdout, pproc.stderr

    begin = time.perf_counter()
    last_update = begin
    (stdoutReader, stdoutQueue) = _AsyncLineReader.getForFd(
        stdout, catch_exit=catch_exit)
    (stderrReader, stderrQueue) = _AsyncLineReader.getForFd(
        stderr, catch_exit=catch_exit)
    runloop = True

    while (not stdoutReader.eof() or not stderrReader.eof()) and runloop:
        while not stdoutQueue.empty():
            line = stdoutQueue.get()
            decol = decode_outerr(line, encoding, encerror, cmd)
            sdecol = decol.strip("\n\r")
            log(prefix_log + sdecol)
            out.append(sdecol)
            last_update = time.perf_counter()
            if stop_running_if is not None and stop_running_if(decol, None):
                runloop = False
                break

        while not stderrQueue.empty():
            line = stderrQueue.get()
            decol = decode_outerr(line, encoding, encerror, cmd)
            sdecol = decol.strip("\n\r")
            log(prefix_log + sdecol)
            err.append(sdecol)
            last_update = time.perf_counter()
            if stop_running_if is not None and stop_running_if(None, decol):
                runloop = False
                break
        time.sleep(0.05)

        # Time spent since the last line was received.
        delta = time.perf_counter() - last_update
        if tell_if_no_output is not None and delta >= tell_if_no_output:
            log(  # pragma: no cover
                prefix_log + "[run_cmd] No update in {0} seconds for cmd: {1}".format(
                    "%5.1f" % delta, cmd))
            last_update = time.perf_counter()  # pragma: no cover
        full_delta = time.perf_counter() - begin
        if timeout is not None and full_delta > timeout:
            runloop = False  # pragma: no cover
            log(  # pragma: no cover
                prefix_log + "[run_cmd] Timeout after {0} seconds for cmd: {1}".format(
                    "%5.1f" % full_delta, cmd))
            break  # pragma: no cover

    if not runloop:  # pragma: no cover
        # The loop was interrupted (stop_running_if or timeout).
        out.append("[run_cmd] killing process.")
        log(prefix_log + "[run_cmd] killing process because stop_running_if returned True.")
        pproc.kill()
        log(prefix_log + "[run_cmd] process killed.")
        return "\n".join(out), "Process killed."

    # Waiting for async readers to finish...
    stdoutReader.join()
    stderrReader.join()

    # Waiting for process to exit...
    returnCode = pproc.wait()

    if returnCode != 0:  # pragma: no cover
        try:
            # we try to close the ressources
            stdout.close()
            stderr.close()
        except Exception:
            warnings.warn(
                "Unable to close stdout and sterr.", RuntimeWarning)
        if catch_exit:
            mes = ("SystemExit raised with error code {0}\nCMD:\n{1}\n"
                   "CWD:\n{2}\n#---OUT---#\n{3}\n#---ERR---#\n{4}")
            cwd = current if current is not None else os.getcwd()
            raise RunCmdException(mes.format(
                returnCode, cmd, cwd, "\n".join(out), "\n".join(err)))
        raise subprocess.CalledProcessError(
            returnCode, cmd, output="\n".join(out), stderr="\n".join(err))

    stdout.close()
    stderr.close()
    return "\n".join(out), "\n".join(err)


def run_cmd(cmd, sin="", shell=sys.platform.startswith("win"), wait=False, log_error=True,
            stop_running_if=None, encerror="ignore", encoding="utf8",
            change_path=None, communicate=True, preprocess=True, timeout=None,
            catch_exit=False, fLOG=None, tell_if_no_output=None, prefix_log=None):
    """
    Runs a command line and wait for the result.

    @param      cmd                 command line
    @param      sin                 sin: what must be written on the standard input
    @param      shell               if True, cmd is a shell command (and no command window is opened)
    @param      wait                call ``proc.wait``
    @param      log_error           if log_error, call fLOG (error)
    @param      stop_running_if     the function stops waiting if some condition is fulfilled.
                                    The function received the last line from the logs.
                                    Signature: ``stop_waiting_if(last_out, last_err) -> bool``.
                                    The function must return True to stop waiting.
                                    This function can also be used to intercept the standard output
                                    and the standard error while running.
    @param      encerror            encoding errors (ignore by default) while converting the output into a string
    @param      encoding            encoding of the output
    @param      change_path         change the current path if not None (put it back after the execution)
    @param      communicate         use method `communicate
                                    <https://docs.python.org/3/library/subprocess.html#subprocess.Popen.communicate>`_
                                    which is supposed to be safer, parameter ``wait`` must be True
    @param      preprocess          preprocess the command line if necessary (not available on Windows)
                                    (False to disable that option)
    @param      timeout             when data is sent to stdin (``sin``), a timeout is needed to avoid
                                    waiting for ever (*timeout* is in seconds)
    @param      catch_exit          catch *SystemExit* exception
    @param      fLOG                logging function (if not None, bypass others parameters)
    @param      tell_if_no_output   tells if there is no output every *tell_if_no_output* seconds
    @param      prefix_log          add a prefix to a line before printing it
    @return                         content of stdout, stdres  (only if wait is True)

    .. exref::
        :title: Run a program using the command line

        ::

            from pyquickhelper.loghelper import run_cmd
            out, err = run_cmd("python setup.py install", wait=True)

    If you are using this function to run :epkg:`git` function, parameter ``shell`` must be True.
    The function catches *SystemExit* exception.
    See `Constantly print Subprocess output while process is running
    <http://stackoverflow.com/questions/4417546/constantly-print-subprocess-output-while-process-is-running/4417735>`_.
    If *wait* is False, the function returns the started process.
    ``__exit__`` should be called if wait is False.
    Parameter *prefix_log* was added.

    Unsupported combinations of parameters (*NotImplementedError*,
    *RuntimeError*) are detected before the process is started.
    The current folder is restored on every exit path when *change_path*
    is specified.
    """
    if prefix_log is None:
        prefix_log = ""
    has_input = sin is not None and len(sin) > 0

    # Checks the parameters before starting anything: no process is left
    # running and the current folder is untouched if they are inconsistent.
    if wait:
        if communicate:
            if tell_if_no_output is not None:
                raise NotImplementedError(
                    "tell_if_no_output is not implemented when communicate is True")
            if stop_running_if is not None:
                raise NotImplementedError(
                    "stop_running_if is not implemented when communicate is True")
        elif has_input:
            raise RuntimeError(
                "Argument 'communicate' should be True to send "
                "something on stdin.")

    if fLOG is not None:
        if isinstance(cmd, (list, tuple)):
            fLOG(  # pragma: no cover
                prefix_log + "[run_cmd] execute", " ".join(cmd))
        else:
            fLOG(prefix_log + "[run_cmd] execute", cmd)

    current = None
    if change_path is not None:
        current = os.getcwd()
        os.chdir(change_path)

    try:
        if sys.platform.startswith("win"):
            cmdl = cmd
        else:
            cmdl = split_cmp_command(cmd) if preprocess else cmd

        try:
            pproc = subprocess.Popen(cmdl,
                                     shell=shell,
                                     stdin=subprocess.PIPE if has_input else None,
                                     stdout=subprocess.PIPE if wait else None,
                                     stderr=subprocess.PIPE if wait else None)
        except SystemExit as e:
            if not catch_exit:
                raise
            raise RunCmdException(  # pragma: no cover
                "SystemExit raised (1)") from e

        pproc.__enter__()  # pylint: disable=C2801
        if isinstance(cmd, list):
            cmd = " ".join(cmd)

        if not wait:
            # The caller owns the process and must call ``__exit__``.
            return pproc, None

        if communicate:
            stdin_data = sin if sin is None else sin.encode()
            if stdin_data is not None and len(stdin_data) > 0:
                if fLOG is not None:
                    fLOG(prefix_log + "[run_cmd] input", [stdin_data])

            try:
                stdoutdata, stderrdata = pproc.communicate(
                    input=stdin_data, timeout=timeout)
            except SystemExit as e:  # pragma: no cover
                if not catch_exit:
                    raise
                raise RunCmdException("SystemExit raised (2)") from e

            out = decode_outerr(stdoutdata, encoding, encerror, cmd)
            err = decode_outerr(stderrdata, encoding, encerror, cmd)
        else:
            # communicate is False: use of threads
            out, err = _run_cmd_listen(
                pproc, cmd, stop_running_if=stop_running_if,
                encoding=encoding, encerror=encerror, timeout=timeout,
                catch_exit=catch_exit, fLOG=fLOG,
                tell_if_no_output=tell_if_no_output,
                prefix_log=prefix_log, current=current)

        # same path for whether communicate is False or True
        err = err.replace("\r\n", "\n")
        if fLOG is not None:
            fLOG(prefix_log + "end of execution", cmd)

        if len(err) > 0 and log_error and fLOG is not None:
            if "\n" in err:
                fLOG(prefix_log + "[run_cmd] stderr (log)")
                for eline in err.split("\n"):
                    fLOG(prefix_log + eline)
            else:
                fLOG(  # pragma: no cover
                    prefix_log + f"[run_cmd] stderr (log)\n{err}")

        pproc.__exit__(None, None, None)
        err = err.strip("\n\r\t ")
        if sys.platform.startswith("win"):  # pragma: no cover
            return out.replace("\r\n", "\n"), err.replace("\r\n", "\n")
        return out, err
    finally:
        # Puts the current folder back whatever happened.
        if change_path is not None:
            os.chdir(current)
def parse_exception_message(exc):
    """
    Looks into the message of an exception for the standard output
    and the standard error stored after the markers ``#---OUT---#``
    and ``#---ERR---#``.

    @param      exc     exception coming from @see fn run_cmd
    @return             out, err, (None, None) if the markers are missing
    """
    text = str(exc).replace("\r", "")
    found = re.search(".*#---OUT---#(.*)#---ERR---#(.*)", text, re.DOTALL)
    if found is None:
        return None, None
    out, err = found.group(1, 2)  # pragma: no cover
    return out.strip("\n "), err.strip("\n ")  # pragma: no cover
def run_script(script, *args, **kwargs):
    """
    Executes a script with the current interpreter.

    @param      script      script to execute or command line starting with ``-m``
    @param      args        other parameters, converted into strings and
                            appended to the command line
    @param      kwargs      sent to @see fn run_cmd
    @return                 out,err: content of stdout stream and stderr stream

    Allows command line starting with ``-m``.
    Raises *PQHException* if *script* does not start with ``-m``
    and is not an existing path.
    """
    is_module = script.startswith('-m')
    if not (is_module or os.path.exists(script)):
        raise PQHException(f"file {script} not found")
    pieces = [f"{get_interpreter_path()} {script}"]
    pieces.extend(str(arg) for arg in args)
    out, err = run_cmd(" ".join(pieces), **kwargs)
    return out, err
428class _AsyncLineReader(threading.Thread):
430 def __init__(self, fd, outputQueue, catch_exit):
431 threading.Thread.__init__(self)
433 assert isinstance(outputQueue, queue.Queue)
434 assert callable(fd.readline)
436 self.fd = fd
437 self.catch_exit = catch_exit
438 self.outputQueue = outputQueue
440 def run(self):
441 if self.catch_exit:
442 try:
443 for _ in map(self.outputQueue.put, iter(self.fd.readline, b'')):
444 pass
445 except SystemExit as e: # pragma: no cover
446 self.outputQueue.put(str(e))
447 raise RunCmdException("SystemExit raised (3)") from e
448 else:
449 for _ in map(self.outputQueue.put, iter(self.fd.readline, b'')):
450 pass
452 def eof(self):
453 return not self.is_alive() and self.outputQueue.empty()
455 @classmethod
456 def getForFd(cls, fd, start=True, catch_exit=False):
457 q = queue.Queue()
458 reader = cls(fd, q, catch_exit)
460 if start:
461 reader.start()
463 return reader, q