Source code for pyquickhelper.ipythonhelper.notebook_helper

"""
Some automation helpers about notebooks


:githublink:`%|py|5`
"""
import os
import sys
import json
import warnings
from io import StringIO
from nbformat import versions
from nbformat.reader import reads, NotJSONError
from nbformat.v4 import upgrade
from ..filehelper import read_content_ufs
from ..loghelper import noLOG
from ..filehelper import explore_folder_iterfile, remove_folder
from .notebook_runner import NotebookRunner
from .notebook_exception import NotebookException


with warnings.catch_warnings():
    warnings.simplefilter("ignore", category=ImportWarning)
    try:
        from ipykernel.kernelspec import install as install_k
        raisewarn = False
    except ImportError:  # pragma: no cover
        raisewarn = True
if raisewarn:  # pragma: no cover
    warnings.warn("ipykernel is not installed. pyquickhelper cannot execute a notebook.",
                  category=ImportWarning)


[docs]def writes(nb, **kwargs): """ Write a notebook to a string in a given format in the current nbformat version. This function always writes the notebook in the current nbformat version. Parameters ++++++++++ nb : NotebookNode The notebook to write. kwargs : Among these parameters, *version* (int) which is The nbformat version to write. Used for downgrading notebooks. Returns +++++++ s : unicode The notebook string. :githublink:`%|py|53` """ try: return versions[nb.nbformat].writes_json(nb, **kwargs) except AttributeError as e: # pragma: no cover raise NotebookException( "probably wrong error: {0}".format(nb.nbformat)) from e
[docs]def upgrade_notebook(filename, encoding="utf-8"): """ Converts a notebook from version 2 to latest. :param filename: filename :param encoding: encoding :return: modification? :githublink:`%|py|68` """ with open(filename, "r", encoding=encoding) as payload: content = payload.read() try: nb = reads(content) except NotJSONError as e: # pragma: no cover if len(content) > 10: lc = list(content[:10]) else: lc = list(content) raise ValueError( "Unable to read content type '{0}' in '{2}' ---- {1}".format(type(content), lc, filename)) from e if not hasattr(nb, "nbformat") or nb.nbformat >= 4: return False try: upgrade(nb, from_version=nb.nbformat) except ValueError as e: # pragma: no cover raise ValueError("Unable to convert '{0}'.".format(filename)) from e s = writes(nb) if isinstance(s, bytes): s = s.decode('utf8') if s == content: return False with open(filename, "w", encoding=encoding) as f: f.write(s) return True
[docs]def read_nb(filename, profile_dir=None, encoding="utf8", working_dir=None, comment="", fLOG=noLOG, code_init=None, kernel_name="python", log_level="30", extended_args=None, kernel=False, replacements=None): """ Reads a notebook and return a :class:`NotebookRunner <pyquickhelper.ipythonhelper.notebook_runner.NotebookRunner>` object. :param filename: notebook filename (or stream) :param profile_dir: profile directory :param encoding: encoding for the notebooks :param working_dir: working directory :param comment: additional information added to error message :param code_init: to initialize the notebook with a python code as if it was a cell :param fLOG: logging function :param log_level: Choices: (0, 10, 20, 30=default, 40, 50, 'DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL') :param kernel_name: kernel name, it can be None :param extended_args: others arguments to pass to the command line (`--KernelManager.autorestar=True` for example), see :ref:`l-ipython_notebook_args` for a full list :param kernel: *kernel* is True by default, the notebook can be run, if False, the notebook can be read but not run :param replacements: replacements to make in every cell before running it, dictionary ``{ string: string }`` :return: :class:`NotebookRunner <pyquickhelper.ipythonhelper.notebook_runner.NotebookRunner>` :githublink:`%|py|125` """ if isinstance(filename, str): with open(filename, "r", encoding=encoding) as payload: nb = reads(payload.read()) nb_runner = NotebookRunner( nb, profile_dir=profile_dir, theNotebook=os.path.abspath(filename), kernel=kernel, working_dir=working_dir, comment=comment, fLOG=fLOG, code_init=code_init, kernel_name="python", log_level="30", extended_args=None, filename=filename, replacements=replacements) return nb_runner else: nb = reads(filename.read()) nb_runner = NotebookRunner(nb, kernel=kernel, profile_dir=profile_dir, working_dir=working_dir, comment=comment, fLOG=fLOG, code_init=code_init, kernel_name="python", log_level="30", extended_args=None, filename=filename, replacements=replacements) return nb_runner
[docs]def read_nb_json(js, profile_dir=None, encoding="utf8", working_dir=None, comment="", fLOG=noLOG, code_init=None, kernel_name="python", log_level="30", extended_args=None, kernel=False, replacements=None): """ Reads a notebook from a :epkg:`JSON` stream or string. :param js: string or stream :param profile_dir: profile directory :param encoding: encoding for the notebooks :param working_dir: working directory :param comment: additional information added to error message :param code_init: to initialize the notebook with a python code as if it was a cell :param fLOG: logging function :param log_level: Choices: (0, 10, 20, 30=default, 40, 50, 'DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL') :param kernel_name: kernel name, it can be None :param extended_args: others arguments to pass to the command line ('--KernelManager.autorestar=True' for example), see :ref:`l-ipython_notebook_args` for a full list :param kernel: *kernel* is True by default, the notebook can be run, if False, the notebook can be read but not run :param replacements: replacements to make in every cell before running it, dictionary ``{ string: string }`` :return: instance of :class:`NotebookRunner <pyquickhelper.ipythonhelper.notebook_runner.NotebookRunner>` :githublink:`%|py|170` """ if isinstance(js, str): st = StringIO(js) else: st = js return read_nb(st, encoding=encoding, kernel=kernel, profile_dir=profile_dir, working_dir=working_dir, comment=comment, fLOG=fLOG, code_init=code_init, kernel_name="python", log_level="30", extended_args=None, replacements=replacements)
[docs]def find_notebook_kernel(kernel_spec_manager=None): """ Returns a dict mapping kernel names to resource directories. :param kernel_spec_manager: see `KernelSpecManager <http://jupyter-client.readthedocs.org/en/ latest/api/kernelspec.html#jupyter_client.kernelspec.KernelSpecManager>`_ A KernelSpecManager to use for installation. If none provided, a default instance will be created. :return: dict The list of installed kernels is described at `Making kernel for Jupyter <http://jupyter-client.readthedocs.org/en/latest/kernels.html#kernelspecs>`_. The function only works with *Jupyter>=4.0*. :githublink:`%|py|195` """ if kernel_spec_manager is None: from jupyter_client.kernelspec import KernelSpecManager kernel_spec_manager = KernelSpecManager() return kernel_spec_manager.find_kernel_specs()
[docs]def get_notebook_kernel(kernel_name, kernel_spec_manager=None): """ Returns a `KernelSpec <https://ipython.org/ipython-doc/dev/api/ generated/IPython.kernel.kernelspec.html>`_. :param kernel_spec_manager: see `KernelSpecManager <http://jupyter-client.readthedocs.org/en/ latest/api/kernelspec.html#jupyter_client.kernelspec.KernelSpecManager>`_ A KernelSpecManager to use for installation. If none provided, a default instance will be created. :param kernel_name: kernel name :return: KernelSpec The function only works with *Jupyter>=4.0*. :githublink:`%|py|215` """ if kernel_spec_manager is None: from jupyter_client.kernelspec import KernelSpecManager kernel_spec_manager = KernelSpecManager() return kernel_spec_manager.get_kernel_spec(kernel_name)
[docs]def install_notebook_extension(path=None, overwrite=False, symlink=False, user=False, prefix=None, nbextensions_dir=None, destination=None): """ Installs notebook extensions, see `install_nbextension <https://ipython.org/ipython-doc/ dev/api/generated/IPython.html.nbextensions.html #IPython.html.nbextensions.install_nbextension>`_ for documentation. :param path: if None, use default value :param overwrite: overwrite the extension :param symlink: see the original function :param user: user :param prefix: see the original function :param nbextensions_dir: see the original function :param destination: see the original function :return: standard output Default value is `https://github.com/ipython-contrib/IPython-notebook-extensions/archive/master.zip <https://github.com/ipython-contrib/IPython-notebook-extensions/archive/master.zip>`_. :githublink:`%|py|244` """ if path is None: path = "https://github.com/ipython-contrib/IPython-notebook-extensions/archive/master.zip" cout = sys.stdout cerr = sys.stderr sys.stdout = StringIO() sys.stderr = StringIO() from notebook.nbextensions import install_nbextension install_nbextension(path=path, overwrite=overwrite, symlink=symlink, user=user, prefix=prefix, nbextensions_dir=nbextensions_dir, destination=destination) out = sys.stdout.getvalue() err = sys.stderr.getvalue() sys.stdout = cout sys.stderr = cerr if len(err) != 0: raise NotebookException( "unable to install exception from: {0}\nOUT:\n{1}\n[nberror]\n{2}".format(path, out, err)) return out
[docs]def get_jupyter_datadir(): """ Returns the data directory for the notebook. :return: path :githublink:`%|py|272` """ from jupyter_client.kernelspec import KernelSpecManager return KernelSpecManager().data_dir
[docs]def get_jupyter_extension_dir(user=False, prefix=None, nbextensions_dir=None): """ Parameters ++++++++++ user : bool [default: False] Whether to check the user's .ipython/nbextensions directory. Otherwise check a system-wide install (e.g. /usr/local/share/jupyter/nbextensions). prefix : str [optional] Specify install prefix, if it should differ from default (e.g. /usr/local). Will check prefix/share/jupyter/nbextensions nbextensions_dir : str [optional] Specify absolute path of nbextensions directory explicitly. Return ++++++ path: path to installed extensions (by the user) :githublink:`%|py|296` """ from notebook.nbextensions import _get_nbextension_dir return _get_nbextension_dir(nbextensions_dir=nbextensions_dir, user=user, prefix=prefix)
[docs]def get_installed_notebook_extension(user=False, prefix=None, nbextensions_dir=None): """ Retuns installed extensions. :param user: bool [default: False] Whether to check the user's .ipython/nbextensions directory. Otherwise check a system-wide install (e.g. /usr/local/share/jupyter/nbextensions). :param prefix: str [optional] Specify install prefix, if it should differ from default (e.g. /usr/local). Will check prefix/share/jupyter/nbextensions :param nbextensions_dir: str [optional] Specify absolute path of nbextensions directory explicitly. :return: list: list of installed notebook extension (by the user) You can install extensions with function :func:`install_notebook_extension <pyquickhelper.ipythonhelper.notebook_helper.install_notebook_extension>`. :githublink:`%|py|317` """ path = get_jupyter_extension_dir( user=user, prefix=prefix, nbextensions_dir=nbextensions_dir) if not os.path.exists(path): raise FileNotFoundError(path) res = [] for file in explore_folder_iterfile(path): rel = os.path.relpath(file, path) spl = os.path.split(rel) name = spl[-1] if name == "main.js": fold = "/".join(spl[:-1]).replace("\\", "/") + "/main" res.append(fold) return res
[docs]def install_jupyter_kernel(exe=sys.executable, kernel_spec_manager=None, user=False, kernel_name=None, prefix=None): """ Installs a kernel based on executable (this python by default). :param exe: Python executable current one by default :param kernel_spec_manager: (KernelSpecManager [optional]). A KernelSpecManager to use for installation. If none provided, a default instance will be created. :param user: (bool). Whether to do a user-only install, or system-wide. :param kernel_name: (str), optional. Specify a name for the kernelspec. This is needed for having multiple IPython kernels for different environments. :param prefix: (str), optional. Specify an install prefix for the kernelspec. This is needed to install into a non-default location, such as a conda/virtual-env. :return: The path where the kernelspec was installed. A kernel is defined by the following fields: :: { "display_name": "Python 3 (ENSAE)", "language": "python", "argv": [ "c:\\\\PythonENSAE\\\\python\\\\python.exe", "-m", "ipykernel", "-f", "{connection_file}" ] } For R, it looks like: :: { "display_name": "R (ENSAE)", "language": "R", "argv": [ "c:\\\\PythonENSAE\\\\tools\\\\R\\\\bin\\\\x64\\\\R.exe", "--quiet", "-e", "IRkernel::main()", "--args", "{connection_file}" ] } :githublink:`%|py|386` """ exe = exe.replace("pythonw.exe", "python.exe") dest = install_k(kernel_spec_manager=kernel_spec_manager, user=user, kernel_name=kernel_name, prefix=prefix) kernel_file = os.path.join(dest, "kernel.json") kernel = dict(display_name=kernel_name, language="python", argv=[exe, "-m", "ipykernel", "-f", "{connection_file}"]) s = json.dumps(kernel) with open(kernel_file, "w") as f: f.write(s) return dest
[docs]def install_python_kernel_for_unittest(suffix=None): """ Installs a kernel based on this python (sys.executable) for unit test purposes. :param suffix: suffix to add to the kernel name :return: kernel name :githublink:`%|py|408` """ exe = os.path.split(sys.executable)[0].replace("pythonw", "python") exe = exe.replace("\\", "/").replace("/", "_").replace(".", "_").replace(":", "") kern = "ut_" + exe + "_" + str(sys.version_info[0]) if suffix is not None: kern += "_" + suffix kern = kern.lower() install_jupyter_kernel(kernel_name=kern) return kern
[docs]def remove_kernel(kernel_name, kernel_spec_manager=None): """ Removes a kernel. :param kernel_spec_manager: see `KernelSpecManager <http://jupyter-client.readthedocs.org/ en/latest/api/kernelspec.html#jupyter_client.kernelspec.KernelSpecManager>`_ A KernelSpecManager to use for installation. If none provided, a default instance will be created. :param kernel_name: kernel name The function only works with *Jupyter>=4.0*. :githublink:`%|py|431` """ kernels = find_notebook_kernel(kernel_spec_manager=kernel_spec_manager) if kernel_name in kernels: fold = kernels[kernel_name] if not os.path.exists(fold): raise FileNotFoundError("unable to remove folder " + fold) remove_folder(fold) else: raise NotebookException( # pragma: no cover "Unable to find kernel '{0}' in {1}".format( kernel_name, ", ".join(kernels.keys())))
[docs]def remove_execution_number(infile, outfile=None, encoding="utf-8", indent=2, rule=int): """ Removes execution number from a notebook. :param infile: filename of the notebook :param outfile: None ot save the file :param encoding: encoding :param indent: indentation :param rule: determines the rule which specifies execution numbers, 'None' for None, 'int' for consectuive integers numbers. :return: modified string or None if outfile is not None and the file was not modified .. todoext:: :title: remove execution number from notebook facilitate git versionning :tag: enhancement :issue: 18 :cost: 1 :hidden: :date: 2016-08-23 :release: 1.4 Remove execution number from the notebook to avoid commiting changes only about those numbers `notebook 5.1.0 <http://jupyter-notebook.readthedocs.io/en/stable/changelog.html#release-5-1-0>`_ introduced changes which are incompatible with leaving the cell executing number empty. :githublink:`%|py|471` """ def fixup(adict, k, v, cellno=0, outputs="outputs"): for key in adict.keys(): if key == k: if rule is None: adict[key] = v elif rule is int: cellno += 1 adict[key] = cellno else: raise ValueError( # pragma: no cover "Rule '{0}' does not apply on {1}={2}".format(rule, key, adict[key])) elif key == "outputs": if isinstance(adict[key], dict): fixup(adict[key], k, v, cellno=cellno, outputs=outputs) elif isinstance(adict[key], list): for el in adict[key]: if isinstance(el, dict): fixup(el, k, v, cellno=cellno, outputs=outputs) elif isinstance(adict[key], dict): cellno = fixup(adict[key], k, v, cellno=cellno, outputs=outputs) elif isinstance(adict[key], list): for el in adict[key]: if isinstance(el, dict): cellno = fixup(el, k, v, cellno=cellno, outputs=outputs) return cellno content = read_content_ufs(infile) js = json.loads(content) fixup(js, "execution_count", None) st = StringIO() json.dump(js, st, indent=indent, sort_keys=True) res = st.getvalue() if outfile is not None: if content != res: with open(outfile, "w", encoding=encoding) as f: f.write(res) return content return None return res