Source code for pyquickhelper.sphinxext.sphinx_runpython_extension

# -*- coding: utf-8 -*-
"""
Defines runpython directives.
See `Tutorial: Writing a simple extension <http://sphinx-doc.org/extdev/tutorial.html>`_


:githublink:`%|py|7`
"""
import sys
import os
from contextlib import redirect_stdout, redirect_stderr
import traceback
import warnings
from io import StringIO
import sphinx
from docutils import nodes, core
from docutils.parsers.rst import Directive, directives
from docutils.statemachine import StringList
from sphinx.util.nodes import nested_parse_with_titles
from ..loghelper.flog import run_cmd
from ..texthelper.texts_language import TITLES
from ..pycode.code_helper import remove_extra_spaces_and_pep8
from .sphinx_collapse_extension import collapse_node


[docs]class RunPythonCompileError(Exception): """ exception raised when a piece of code included in the documentation does not compile :githublink:`%|py|28` """ pass
[docs]class RunPythonExecutionError(Exception): """ Exception raised when a piece of code included in the documentation raises an exception. :githublink:`%|py|36` """ pass
[docs]def run_python_script(script, params=None, comment=None, setsysvar=None, process=False, exception=False, warningout=None, chdir=None, context=None, store_in_file=None): """ Executes a script :epkg:`python` as a string. :param script: python script :param params: params to add before the execution :param comment: message to add in a exception when the script fails :param setsysvar: if not None, add a member to module *sys*, set up this variable to True, if is remove after the execution :param process: run the script in a separate process :param exception: expects an exception to be raised, fails if it is not, the function returns no output and the error message :param warningout: warning to disable (name of warnings) :param chdir: change directory before running this script (if not None) :param context: if not None, added to the local context @parm store_in_file stores the script into this file and calls tells python the source can be found here, that is useful is the script is using module ``inspect`` to retrieve the source which are not stored in memory :return: stdout, stderr, context If the execution throws an exception such as ``NameError: name 'math' is not defined`` after importing the module ``math``. It comes from the fact the domain name used by the function `exec <https://docs.python.org/3/library/functions.html#exec>`_ contains the declared objects. Example: :: import math def coordonnees_polaires(x,y): rho = math.sqrt(x*x+y*y) theta = math.atan2 (y,x) return rho, theta coordonnees_polaires(1, 1) The code can be modified into: :: def fake_function(): import math def coordonnees_polaires(x,y): rho = math.sqrt(x*x+y*y) theta = math.atan2 (y,x) return rho, theta coordonnees_polaires(1, 1) fake_function() Section :ref:`l-image-rst-runpython` explains how to display an image with this directive. .. versionchanged:: 1.9 Parameter *store_in_file* was added. :githublink:`%|py|100` """ def warning_filter(warningout): if warningout in (None, ''): warnings.simplefilter("always") elif isinstance(warningout, str): li = [_.strip() for _ in warningout.split()] warning_filter(li) elif isinstance(warningout, list): def interpret(s): return eval(s) if isinstance(s, str) else s warns = [interpret(w) for w in warningout] for w in warns: warnings.simplefilter("ignore", w) else: raise ValueError( "Unexpected value for warningout: {0}".format(warningout)) if params is None: params = {} if process: if context is not None and len(context) != 0: raise RunPythonExecutionError( "context cannot be used if the script runs in a separate process.") cmd = sys.executable header = ["# coding: utf-8", "import sys"] if setsysvar: header.append("sys.{0} = True".format(setsysvar)) add = 0 for path in sys.path: if path.endswith("source") or path.endswith("source/") or path.endswith("source\\"): header.append("sys.path.append('{0}')".format( path.replace("\\", "\\\\"))) add += 1 if add == 0: for path in sys.path: if path.endswith("src") or path.endswith("src/") or path.endswith("src\\"): header.append("sys.path.append('{0}')".format( path.replace("\\", "\\\\"))) add += 1 if add == 0: # It did not find any path linked to the copy of # the current module in the documentation # it assumes the first path of `sys.path` is part # of the unit test. path = sys.path[0] path = os.path.join(path, "..", "..", "src") if os.path.exists(path): header.append("sys.path.append('{0}')".format( path.replace("\\", "\\\\"))) add += 1 else: path = sys.path[0] path = os.path.join(path, "src") if os.path.exists(path): header.append("sys.path.append('{0}')".format( path.replace("\\", "\\\\"))) add += 1 if add == 0: # We do nothing unless the execution failed. exc_path = RunPythonExecutionError( "Unable to find a path to add:\n{0}".format("\n".join(sys.path))) else: exc_path = None header.append('') script = "\n".join(header) + script if store_in_file is not None: with open(store_in_file, "w", encoding="utf-8") as f: f.write(script) script_arg = None cmd += ' ' + store_in_file else: script_arg = script try: out, err = run_cmd(cmd, script_arg, wait=True, change_path=chdir) return out, err, None except Exception as ee: if not exception: message = ("--SCRIPT--\n{0}\n--PARAMS--\n{1}\n--COMMENT--\n" "{2}\n--ERR--\n{3}\n--OUT--\n{4}\n--EXC--\n{5}" "").format(script, params, comment, "", str(ee), ee) if exc_path: message += "\n---EXC--\n{0}".format(exc_path) raise RunPythonExecutionError(message) from ee return str(ee), str(ee), None else: if store_in_file: raise NotImplementedError( "store_in_file is only implemented if process is True.") try: obj = compile(script, "", "exec") except Exception as ec: # pragma: no cover if comment is None: comment = "" if not exception: message = "SCRIPT:\n{0}\nPARAMS\n{1}\nCOMMENT\n{2}".format( script, params, comment) raise RunPythonCompileError(message) from ec return "", "Cannot compile the do to {0}".format(ec), None globs = globals().copy() loc = locals() for k, v in params.items(): loc[k] = v loc["__dict__"] = params if context is not None: for k, v in context.items(): globs["__runpython__" + k] = v globs['__runpython__script__'] = script if setsysvar is not None: sys.__dict__[setsysvar] = True sout = StringIO() serr = StringIO() with redirect_stdout(sout): with redirect_stderr(sout): with warnings.catch_warnings(): warning_filter(warningout) if chdir is not None: current = os.getcwd() os.chdir(chdir) try: exec(obj, globs, loc) except Exception as ee: if chdir is not None: os.chdir(current) if setsysvar is not None: del sys.__dict__[setsysvar] if comment is None: comment = "" gout = sout.getvalue() gerr = serr.getvalue() excs = traceback.format_exc() lines = excs.split("\n") excs = "\n".join( _ for _ in lines if "sphinx_runpython_extension.py" not in _) if not exception: message = ("--SCRIPT--\n{0}\n--PARAMS--\n{1}\n--COMMENT--" "\n{2}\n--ERR--\n{3}\n--OUT--\n{4}\n--EXC--" "\n{5}\n--TRACEBACK--\n{6}").format( script, params, comment, gout, gerr, ee, excs) raise RunPythonExecutionError(message) from ee return (gout + "\n" + gerr), (gerr + "\n" + excs), None if chdir is not None: os.chdir(current) if setsysvar is not None: del sys.__dict__[setsysvar] gout = sout.getvalue() gerr = serr.getvalue() avoid = {"__runpython____WD__", "__runpython____k__", "__runpython____w__"} context = {k[13:]: v for k, v in globs.items() if k.startswith( "__runpython__") and k not in avoid} return gout, gerr, context
[docs]class runpython_node(nodes.Structural, nodes.Element): """ Defines *runpython* node. :githublink:`%|py|275` """ pass
[docs]class RunPythonDirective(Directive): """ Extracts script to run described by ``.. runpython::`` and modifies the documentation. .. exref:: :title: A python script which generates documentation The following code prints the version of Python on the standard output. It is added to the documentation:: .. runpython:: :showcode: import sys print("sys.version_info=", str(sys.version_info)) If give the following results: .. runpython:: import sys print("sys.version_info=", str(sys.version_info)) Options *showcode* can be used to display the code. The option *rst* will assume the output is in RST format and must be interpreted. *showout* will complement the RST output with the raw format. The directive has a couple of options: * ``:assert:`` condition to validate at the end of the execution to check it went right * ``:current:`` runs the script in the source file directory * ``:exception:`` the code throws an exception but it is expected. The error is displayed. * ``:indent:<int>`` to indent the output * ``:language:``: changes ``::`` into ``.. code-block:: language`` * ``:linenos:`` to show line numbers * ``:nopep8:`` if present, leaves the code as it is and does not apply pep8 by default, see :func:`remove_extra_spaces_and_pep8 <pyquickhelper.pycode.code_helper.remove_extra_spaces_and_pep8>`. * ``:numpy_precision: <precision>``, run ``numpy.set_printoptions(precision=...)``, precision is 3 by default * ``:process:`` run the script in an another process * ``:restore:`` restore the local context stored in :epkg:`sphinx` application by the previous call to *runpython* * ``:rst:`` to interpret the output, otherwise, it is considered as raw text * ``:setsysvar:`` adds a member to *sys* module, the module can act differently based on that information, if the value is left empty, *sys.enable_disabled_documented_pieces_of_code* will be be set up to *True*. * ``:showcode:`` to show the code before its output * ``:showout`` if *:rst:* is set up, this flag adds the raw rst output to check what is happening * ``:sin:<text_for_in>`` which text to display before the code (by default *In*) * ``:sout:<text_for_in>`` which text to display before the output (by default *Out*) * ``:sphinx:`` by default, function `nested_parse_with_titles <http://sphinx-doc.org/extdev/markupapi.html?highlight=nested_parse>`_ is used to parse the output of the script, if this option is set to false, `public_doctree <http://code.nabla.net/doc/docutils/api/docutils/core/docutils.core.publish_doctree.html>`_. * ``:store:`` stores the local context in :epkg:`sphinx` application to restore it later by another call to *runpython* * ``:toggle:`` add a button to hide or show the code, it takes the values ``code`` or ``out`` or ``both``. The direction then hides the given section but adds a button to show it. * ``:warningout:`` name of warnings to disable (ex: ``ImportWarning``), separated by spaces * ``:store_in_file:`` the directive store the script in a file, then executes this file (only if ``:process:`` is enabled), this trick is needed when the script to executes relies on function such :epkg:`*py:inspect:getsource` which requires the script to be stored somewhere in order to retrieve it. Option *rst* can be used the following way:: .. runpython:: :rst: for l in range(0,10): print("**line**", "*" +str(l)+"*") print('') Which displays interpreted :epkg:`RST`: .. runpython:: :rst: for l in range(0,10): print("**line**", "*" +str(l)+"*") print('') If the directive produces RST text to be included later in the documentation, it is able to interpret `docutils directives <http://docutils.sourceforge.net/docs/ref/rst/directives.html>`_ and `Sphinx directives <http://sphinx-doc.org/rest.html>`_ with function `nested_parse_with_titles <http://sphinx-doc.org/extdev/ markupapi.html?highlight=nested_parse>`_. However, if this text contains titles, it is better to use option ``:sphinx: false``. Unless *process* option is enabled, global variables cannot be used. `sphinx-autorun <https://pypi.org/project/sphinx-autorun/>`_ offers a similar service except it cannot produce compile :epkg:`RST` content, hide the source and a couple of other options. Option *toggle* can hide or unhide the piece of code or/and its output. The directive also adds local variables such as ``__WD__`` which contains the path to the documentation which contains the directive. It is useful to load additional files ``os.path.join(__WD__, ...)``. .. runpython:: :toggle: out :showcode: print("Hide or unhide this output.") .. versionchanged:: 1.9 Options *store_in_file* was added. :githublink:`%|py|391` """ required_arguments = 0 optional_arguments = 0 final_argument_whitespace = True option_spec = { 'indent': directives.unchanged, 'showcode': directives.unchanged, 'showout': directives.unchanged, 'rst': directives.unchanged, 'sin': directives.unchanged, 'sout': directives.unchanged, 'sphinx': directives.unchanged, 'sout2': directives.unchanged, 'setsysvar': directives.unchanged, 'process': directives.unchanged, 'exception': directives.unchanged, 'nopep8': directives.unchanged, 'warningout': directives.unchanged, 'toggle': directives.unchanged, 'current': directives.unchanged, 'assert': directives.unchanged, 'language': directives.unchanged, 'store': directives.unchanged, 'restore': directives.unchanged, 'numpy_precision': directives.unchanged, 'store_in_file': directives.unchanged, 'linenos': directives.unchanged, } has_content = True runpython_class = runpython_node
[docs] def run(self): """ Extracts the information in a dictionary, runs the script. :return: a list of nodes :githublink:`%|py|428` """ # settings sett = self.state.document.settings language_code = sett.language_code lineno = self.lineno # add the instance to the global settings if hasattr(sett, "out_runpythonlist"): sett.out_runpythonlist.append(self) # env if hasattr(self.state.document.settings, "env"): env = self.state.document.settings.env else: env = None if env is None: docname = "___unknown_docname___" else: docname = env.docname # post bool_set = (True, 1, "True", "1", "true") bool_set_ = (True, 1, "True", "1", "true", '') p = { 'showcode': 'showcode' in self.options, 'linenos': 'linenos' in self.options, 'showout': 'showout' in self.options, 'rst': 'rst' in self.options, 'sin': self.options.get('sin', TITLES[language_code]["In"]), 'sout': self.options.get('sout', TITLES[language_code]["Out"]), 'sout2': self.options.get('sout2', TITLES[language_code]["Out2"]), 'sphinx': 'sphinx' not in self.options or self.options['sphinx'] in bool_set, 'setsysvar': self.options.get('setsysvar', None), 'process': 'process' in self.options and self.options['process'] in bool_set_, 'exception': 'exception' in self.options and self.options['exception'] in bool_set_, 'nopep8': 'nopep8' in self.options and self.options['nopep8'] in bool_set_, 'warningout': self.options.get('warningout', '').strip(), 'toggle': self.options.get('toggle', '').strip(), 'current': 'current' in self.options and self.options['current'] in bool_set_, 'assert': self.options.get('assert', '').strip(), 'language': self.options.get('language', '').strip(), 'store_in_file': self.options.get('store_in_file', None), 'numpy_precision': self.options.get('numpy_precision', '3').strip(), 'store': 'store' in self.options and self.options['store'] in bool_set_, 'restore': 'restore' in self.options and self.options['restore'] in bool_set_, } if p['setsysvar'] is not None and len(p['setsysvar']) == 0: p['setsysvar'] = 'enable_disabled_documented_pieces_of_code' dind = 0 if p['rst'] else 4 p['indent'] = int(self.options.get("indent", dind)) # run the script name = "run_python_script_{0}".format(id(p)) if p['process']: content = ["if True:"] else: content = ["def {0}():".format(name)] if "numpy" in "\n".join(self.content) and p['numpy_precision'] not in (None, 'None', '-', ''): try: import numpy # pylint: disable=W0611 prec = int(p['numpy_precision']) content.append(" import numpy") content.append(" numpy.set_printoptions(%d)" % prec) except (ImportError, ValueError): pass content.append(' ## __WD__ ##') if p["restore"]: context = getattr(env, "runpython_context", None) for k in sorted(context): content.append( " {0} = globals()['__runpython__{0}']".format(k)) else: context = None modified_content = self.modify_script_before_running( "\n".join(self.content)) if p['assert']: footer = [] assert_condition = p['assert'].split('\n') for cond in assert_condition: footer.append("if not({0}):".format(cond)) footer.append( " raise AssertionError('''Condition '{0}' failed.''')".format(cond)) modified_content += "\n\n" + "\n".join(footer) for line in modified_content.split("\n"): content.append(" " + line) if p["store"]: content.append(' for __k__, __v__ in locals().copy().items():') content.append( " globals()['__runpython__' + __k__] = __v__") if not p['process']: content.append("{0}()".format(name)) script = "\n".join(content) script_disp = "\n".join(self.content) if not p["nopep8"]: try: script_disp = remove_extra_spaces_and_pep8( script_disp, is_string=True) except Exception as e: # pragma: no cover if '.' in docname: comment = ' File "{0}", line {1}'.format(docname, lineno) else: comment = ' File "{0}.rst", line {1}\n File "{0}.py", line {1}\n'.format( docname, lineno) raise ValueError( "Pep8 issue with\n'{0}'\n---SCRIPT---\n{1}".format(docname, script)) from e # if an exception is raised, the documentation should report a warning # return [document.reporter.warning('messagr', line=self.lineno)] current_source = self.state.document.current_source docstring = ":docstring of " in current_source if docstring: current_source = current_source.split(":docstring of ")[0] if os.path.exists(current_source): comment = ' File "{0}", line {1}'.format(current_source, lineno) if docstring: new_name = os.path.split(current_source)[0] + ".py" comment += '\n File "{0}", line {1}'.format(new_name, lineno) cs_source = current_source else: if '.' in docname: comment = ' File "{0}", line {1}'.format(docname, lineno) else: comment = ' File "{0}.rst", line {1}\n File "{0}.py", line {1}\n'.format( docname, lineno) cs_source = docname # Add __WD__. cs_source_dir = os.path.dirname(cs_source).replace("\\", "/") script = script.replace( '## __WD__ ##', "__WD__ = '{0}'".format(cs_source_dir)) out, err, context = run_python_script(script, comment=comment, setsysvar=p['setsysvar'], process=p["process"], exception=p['exception'], warningout=p['warningout'], chdir=cs_source_dir if p['current'] else None, context=context, store_in_file=p['store_in_file']) if p['store']: # Stores modified local context. setattr(env, "runpython_context", context) else: context = {} setattr(env, "runpython_context", context) if out is not None: out = out.rstrip(" \n\r\t") if err is not None: err = err.rstrip(" \n\r\t") content = out if len(err) > 0: content += "\n[runpythonerror]\n" + err # add member self.exe_class = p.copy() self.exe_class.update(dict(out=out, err=err, script=script)) # add indent def add_indent(content, nbind): "local function" lines = content.split("\n") if nbind > 0: lines = [(" " * nbind + _) for _ in lines] content = "\n".join(lines) return content content = add_indent(content, p['indent']) # build node node = self.__class__.runpython_class(rawsource=content, indent=p["indent"], showcode=p["showcode"], rst=p["rst"], sin=p["sin"], sout=p["sout"]) if p["showcode"]: if 'code' in p['toggle'] or 'both' in p['toggle']: hide = TITLES[language_code]['hide'] + \ ' ' + TITLES[language_code]['code'] unhide = TITLES[language_code]['unhide'] + \ ' ' + TITLES[language_code]['code'] secin = collapse_node(hide=hide, unhide=unhide, show=False) node += secin else: secin = node pin = nodes.paragraph(text=p["sin"]) if p['language'] in (None, ''): p['language'] = 'python' if p['language']: pcode = nodes.literal_block( script_disp, script_disp, language=p['language'], linenos=p['linenos']) else: pcode = nodes.literal_block( script_disp, script_disp, linenos=p['linenos']) secin += pin secin += pcode elif len(self.options.get('sout', '')) == 0: p["sout"] = '' p["sout2"] = '' # RST output. if p["rst"]: settings_overrides = {} try: sett.output_encoding except KeyError: settings_overrides["output_encoding"] = "unicode" # try: # sett.doctitle_xform # except KeyError: # settings_overrides["doctitle_xform"] = True try: sett.warning_stream except KeyError: # pragma: no cover settings_overrides["warning_stream"] = StringIO() # 'initial_header_level': 2, secout = node if 'out' in p['toggle'] or 'both' in p['toggle']: hide = TITLES[language_code]['hide'] + \ ' ' + TITLES[language_code]['outl'] unhide = TITLES[language_code]['unhide'] + \ ' ' + TITLES[language_code]['outl'] secout = collapse_node(hide=hide, unhide=unhide, show=False) node += secout elif len(p["sout"]) > 0: secout += nodes.paragraph(text=p["sout"]) try: if p['sphinx']: st = StringList(content.replace("\r", "").split("\n")) nested_parse_with_titles(self.state, st, secout) dt = None else: dt = core.publish_doctree( content, settings=sett, settings_overrides=settings_overrides) except Exception as e: # pragma: no cover tab = content content = ["::"] st = StringIO() traceback.print_exc(file=st) content.append("") trace = st.getvalue() trace += "\n----------------------OPT\n" + str(p) trace += "\n----------------------EXC\n" + str(e) trace += "\n----------------------SETT\n" + str(sett) trace += "\n----------------------ENV\n" + str(env) trace += "\n----------------------DOCNAME\n" + str(docname) trace += "\n----------------------CODE\n" content.extend(" " + _ for _ in trace.split("\n")) content.append("") content.append("") content.extend(" " + _ for _ in tab.split("\n")) content = "\n".join(content) pout = nodes.literal_block(content, content) secout += pout dt = None if dt is not None: for ch in dt.children: node += ch # Regular output. if not p["rst"] or p["showout"]: text = p["sout2"] if p["rst"] else p["sout"] secout = node if 'out' in p['toggle'] or 'both' in p['toggle']: hide = TITLES[language_code]['hide'] + \ ' ' + TITLES[language_code]['outl'] unhide = TITLES[language_code]['unhide'] + \ ' ' + TITLES[language_code]['outl'] secout = collapse_node(hide=hide, unhide=unhide, show=False) node += secout elif len(text) > 0: pout2 = nodes.paragraph(text=text) node += pout2 pout = nodes.literal_block(content, content) secout += pout p['runpython'] = node # classes node['classes'] += ["runpython"] ns = [node] return ns
[docs] def modify_script_before_running(self, script): """ Takes the script as a string and returns another string before it is run. It does not modify what is displayed. The function can be overwritten by any class based on this one. :githublink:`%|py|732` """ return script
[docs]def visit_runpython_node(self, node): """ What to do when visiting a node :class:`runpython_node <pyquickhelper.sphinxext.sphinx_runpython_extension.runpython_node>` the function should have different behaviour, depending on the format, or the setup should specify a different function for each. :githublink:`%|py|742` """ pass
[docs]def depart_runpython_node(self, node): """ What to do when leaving a node :class:`runpython_node <pyquickhelper.sphinxext.sphinx_runpython_extension.runpython_node>` the function should have different behaviour, depending on the format, or the setup should specify a different function for each. :githublink:`%|py|752` """ pass
[docs]def setup(app): """ setup for ``runpython`` (sphinx) :githublink:`%|py|759` """ app.add_config_value('out_runpythonlist', [], 'env') if hasattr(app, "add_mapping"): app.add_mapping('runpython', runpython_node) app.add_node(runpython_node, html=(visit_runpython_node, depart_runpython_node), epub=(visit_runpython_node, depart_runpython_node), elatex=(visit_runpython_node, depart_runpython_node), latex=(visit_runpython_node, depart_runpython_node), rst=(visit_runpython_node, depart_runpython_node), md=(visit_runpython_node, depart_runpython_node), text=(visit_runpython_node, depart_runpython_node)) app.add_directive('runpython', RunPythonDirective) return {'version': sphinx.__display_version__, 'parallel_read_safe': True}