Source code for pyenbc.remote.magic_azure

# -*- coding: utf-8 -*-
"""
Magic command to run PIG script with Azure.


:githublink:`%|py|6`
"""
import sys
import os
from IPython.core.magic import magics_class, line_magic, cell_magic
from IPython.core.display import HTML
from pyquickhelper.loghelper import run_cmd
from pyquickhelper.ipythonhelper import MagicClassWithHelpers, MagicCommandParser
from .azure_connection import AzureClient, AzureException
from ..filehelper.jython_helper import run_jython, download_java_standalone


[docs]@magics_class class MagicAzure(MagicClassWithHelpers): """ Defines magic commands to access `blob storage <http://azure.microsoft.com/fr-fr/documentation/articles/storage-dotnet-how-to-use-blobs/>`_ and `HDInsight <http://azure.microsoft.com/fr-fr/services/hdinsight/>`_. When the container is not specified, it will take the default one. .. faqref:: :title: Magic command %blob_open does not work Try this:: %load_ext pyenbc The exception tells more about what goes wrong. Usually a module is missing. .. faqref:: :title: Incorrect padding The following crypted message happens sometimes:: Error: Incorrect padding It is usually due to an incorrect password. Some notebooks uses:: import pyquickhelper.ipythonhelper as ipy params={"blob_storage":"hdblobstorage", "password":""} ipy.open_html_form(params=params,title="credentials",key_save="blobservice") blobstorage = blobservice["blob_storage"] blobpassword = blobservice["password"] %load_ext pyenbc %blob_open This code avoids the author letting password in a notebook but you can just replace everything by:: blobstorage = "<username>" blobpassword = "****long*key*******==" %load_ext pyenbc %blob_open :githublink:`%|py|64` """
[docs] def create_client(self, account_name, account_key, hadoop_server=None, hadoop_password=None, username=None): """ Create a :class:`AzureClient <pyenbc.remote.azure_connection.AzureClient>` and stores in the workspace. :param account_name: login :param account_key: password :param hadoop_server: hadoop server :param hadoop_password: hadoop password :param username: username :return: instance of :class:`AzureClient <pyenbc.remote.azure_connection.AzureClient>` :githublink:`%|py|77` """ if username is None: username = "any" cl = AzureClient( account_name, account_key, hadoop_server, hadoop_password, pseudo=username) self.shell.user_ns["remote_azure_client"] = cl return cl
[docs] def _replace_params(self, cell): """ replaces parameter such ``__PASSWORD__`` by variable in the notebook environment :param cell: string :return: modified string :githublink:`%|py|95` """ if "__PASSWORD__" in cell and self.shell is not None and "password" in self.shell.user_ns: cell = cell.replace("__PASSWORD__", self.shell.user_ns["password"]) return cell
[docs] def get_blob_connection(self): """ returns the connection stored in the workspace :githublink:`%|py|103` """ if self.shell is None: raise Exception("No detected workspace.") if "remote_azure_client" not in self.shell.user_ns: raise KeyError("No opened Azure connection.") if "remote_azure_blob" not in self.shell.user_ns: raise KeyError("No opened Blob Storage connection.") cl = self.shell.user_ns["remote_azure_client"] bs = self.shell.user_ns["remote_azure_blob"] return cl, bs
[docs] @line_magic def azureclient(self, line): """ returns the AzureClient object :githublink:`%|py|121` """ cl, _ = self.get_blob_connection() return cl
[docs] @line_magic def blobservice(self, line): """ returns the BlobService object :githublink:`%|py|129` """ _, bs = self.get_blob_connection() return bs
[docs] @line_magic def blobcontainer(self, line): """ returns the Blob Storage container :githublink:`%|py|137` """ cl, _ = self.get_blob_connection() return cl.account_name
[docs] @staticmethod def blob_open_parser(): """ defines the way to parse the magic command ``%blob_open`` :githublink:`%|py|145` """ parser = MagicCommandParser(prog="blob_open", description='open a connection to an Azure blob storage, by default, ' + 'the magic command takes blobstorage and blobpassword local variables as default values') parser.add_argument( '-b', '--blobstorage', type=str, default='blobstorage', help='blob storage name') parser.add_argument( '-p', '--blobpassword', type=str, default='blobpassword', help='blob password') return parser
[docs] @line_magic def blob_open(self, line): """ .. nbref:: :tag: Azure :title: blob_open Opens a connection to blob service. It returns objects :class:`AzureClient <pyenbc.remote.azure_connection.AzureClient>` and `BlobService <http://www.xavierdupre.fr/app/azure-sdk-for-python/helpsphinx/storage/blobservice.html? highlight=blobservice#azure.storage.blobservice.BlobService>`_. The code for magic command ``%blob_open`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() .. versionchanged:: 1.1 :githublink:`%|py|182` """ parser = self.get_parser(MagicAzure.blob_open_parser, "blob_open") args = self.get_args(line, parser) if args is not None: server = args.blobstorage password = args.blobpassword if self.shell is None: raise Exception("No detected workspace.") if "remote_azure_blob" in self.shell.user_ns: raise Exception( "a connection is still open, close it first (stored in remote_azure_blob local variable)") cl = self.create_client(server, password) bs = cl.open_blob_service() self.shell.user_ns["remote_azure_blob"] = bs return cl, bs return None
[docs] @staticmethod def hd_open_parser(): """ defines the way to parse the magic command ``%hd_open`` :githublink:`%|py|206` """ parser = MagicCommandParser(prog="hd_open", description='open a connection to an Azure blob storage and a HD Insight cluster, ' + 'by default, the magic command takes blobstorage, blobpassword, hadoop_server, ' + 'hadoop_password local variables as default values') parser.add_argument( '-b', '--blobstorage', type=str, default='blobstorage', help='blob storage name') parser.add_argument( '-p', '--blobpassword', type=str, default='blobpassword', help='blob password') parser.add_argument( '-s', '--hadoop_server', type=str, default='hadoop_server', help='hadoop server name') parser.add_argument( '-P', '--hadoop_password', type=str, default='hadoop_password', help='hadoop password') parser.add_argument( '-u', '--username', type=str, default='username', help='username (used as a prefix to avoid conflict when multiple users are using the same connection') return parser
[docs] @line_magic def hd_open(self, line): """ Opens a connection to blob service. .. nbref:: :tag: Azure :title: hd_open Opens a connection to blob service. It returns objects :class:`AzureClient <pyenbc.remote.azure_connection.AzureClient>` and `BlobService <http://www.xavierdupre.fr/app/azure-sdk-for-python/helpsphinx/storage/blobservice.html? highlight=blobservice#azure.storage.blobservice.BlobService>`_. The code for magic command ``%hd_open`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() :githublink:`%|py|262` """ parser = self.get_parser(MagicAzure.hd_open_parser, "hd_open") args = self.get_args(line, parser) if args is not None: server = args.blobstorage password = args.blobpassword hadoop_server = args.hadoop_server hadoop_password = args.hadoop_password username = args.username if self.shell is None: raise Exception("No detected workspace.") if "remote_azure_blob" in self.shell.user_ns: raise Exception( "a connection is still open, close it first (stored in remote_azure_blob local variable)") cl = self.create_client( server, password, hadoop_server, hadoop_password, username=username) bs = cl.open_blob_service() self.shell.user_ns["remote_azure_blob"] = bs return cl, bs return None
[docs] @line_magic def blob_close(self, line): """ close the connection and remove the connection from the notebook workspace .. nbref:: :tag: Azure :title: blob_close Does nothing. :githublink:`%|py|302` """ _, bs = self.get_blob_connection() # bs.close() del self.shell.user_ns["remote_azure_blob"] return True
[docs] @line_magic def blob_containers(self, line): """ returns the list of containers :githublink:`%|py|312` """ if "-h" in line or "--help" in line: print("Usage: %blob_containers") return None else: _, bs = self.get_blob_connection() res = bs.list_containers() return [r.name for r in res]
[docs] def _interpret_path(self, line, cl, bs, empty_is_value=False): """ interpret a path :param line: line (see :ref:`l-magic-path-container`) :param cl: :class:`AzureClient <pyenbc.remote.azure_connection.AzureClient>` :param bs: blob service :param empty_is_value: if True, do not raise an exception :return: container, remotepath :githublink:`%|py|330` """ line = line.strip() if line.startswith("/"): container = cl.account_name line = line.lstrip("/") remotepath = line else: spl = line.split("/") container = spl[0] remotepath = None if len(spl) == 1 else "/".join(spl[1:]) if not empty_is_value and (remotepath is None or len(remotepath) == 0): raise FileNotFoundError("path should not be empty: " + line) return container, remotepath
[docs] @staticmethod def blob_ls_parser(): """ defines the way to parse the magic command ``%blob_ls`` :githublink:`%|py|350` """ parser = MagicCommandParser(prog="blob_ls", description='describes the content of folder in a blob storage') parser.add_argument( 'path', type=str, help='path to look into, </path> or <container>/<path>') return parser
[docs] @line_magic def blob_ls(self, line): """ defines command %blob_ls, see :ref:`l-magic-path-container` .. nbref:: :tag: Azure :title: blob_ls The code for magic command ``%blob_ls`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() df = cl.ls(bs, container, remotepath) :githublink:`%|py|374` """ parser = self.get_parser(MagicAzure.blob_ls_parser, "blob_ls") args = self.get_args(line, parser) if args is not None: cl, bs = self.get_blob_connection() container, remotepath = self._interpret_path( args.path, cl, bs, True) df = cl.ls(bs, container, remotepath) if len(df) > 0: return df[["name", "last_modified", "content_type", "content_length", "blob_type"]] else: return df return None
[docs] @staticmethod def blob_lsl_parser(): """ defines the way to parse the magic command ``%blob_lsl`` :githublink:`%|py|394` """ parser = MagicCommandParser(prog="blob_lsl", description='describes the content of folder in a blob storage + metadata') parser.add_argument( 'path', type=str, help='path to look into, </path> or <container>/<path>') return parser
[docs] @line_magic def blob_lsl(self, line): """ defines command %blob_lsl (extended version of blob_lsl), see :ref:`l-magic-path-container` .. nbref:: :tag: Azure :title: blob_lsl The code for magic command ``%blob_lsl`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() df = cl.ls(bs, container, remotepath, add_metadata=True) :githublink:`%|py|419` """ parser = self.get_parser(MagicAzure.blob_lsl_parser, "blob_lsl") args = self.get_args(line, parser) if args is not None: cl, bs = self.get_blob_connection() container, remotepath = self._interpret_path( args.path, cl, bs, True) return cl.ls(bs, container, remotepath, add_metadata=True) return None
[docs] @staticmethod def blob_up_parser(): """ Defines the way to parse the magic command ``%blob_up``. :githublink:`%|py|434` """ parser = MagicCommandParser(prog="blob_up", description='upload a file on a blob storage, ' + 'we assume the container is the first element to the remote path') parser.add_argument( 'localfile', type=str, help='local file to upload') parser.add_argument( 'remotepath', type=str, help='remote path of the uploaded file') return parser
[docs] @line_magic def blob_up(self, line): """ upload a file to the blob storage, we assume the container is the first element of the path, see :ref:`l-magic-path-container` Example:: %blob_up localfile remotepath the command does not allow spaces in files .. nbref:: :tag: Azure :title: blob_up The code for magic command ``%blob_up`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() cl.upload(bs, container, remotepath, localfile) :githublink:`%|py|470` """ parser = self.get_parser(MagicAzure.blob_up_parser, "blob_up") args = self.get_args(line, parser) if args is not None: localfile, remotepath = args.localfile, args.remotepath if not os.path.exists(localfile): raise FileNotFoundError(localfile) cl, bs = self.get_blob_connection() container, remotepath = self._interpret_path(remotepath, cl, bs) cl.upload(bs, container, remotepath, localfile) return remotepath return None
[docs] @staticmethod def blob_down_parser(): """ Defines the way to parse the magic command ``%blob_down``. :githublink:`%|py|489` """ parser = MagicCommandParser(prog="blob_down", description='download a file from a blob storage, we assume the container ' + 'is the first element to the remote path') parser.add_argument( 'remotepath', type=str, help='remote path of the file to download') parser.add_argument( 'localfile', type=str, help='local name for the downloaded file') parser.add_argument( '-o', '--overwrite', action='store_true', default=False, help='overwrite the local file') return parser
[docs] @line_magic def blob_down(self, line): """ download a file from the blob storage, see :ref:`l-magic-path-container` Example:: %blob_down remotepath localfile the command does not allow spaces in file names .. nbref:: :tag: Azure :title: blob_down The code for magic command ``%blob_down`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() cl.download(bs, container, remotepath, localfile) :githublink:`%|py|530` """ parser = self.get_parser(MagicAzure.blob_down_parser, "blob_down") args = self.get_args(line, parser) if args is not None: localfile, remotepath = args.localfile, args.remotepath if os.path.exists(localfile): if args.overwrite: os.remove(localfile) else: raise Exception( "file {0} cannot be overwritten".format(localfile)) cl, bs = self.get_blob_connection() container, remotepath = self._interpret_path(remotepath, cl, bs) cl.download(bs, container, remotepath, localfile) return localfile return None
[docs] @staticmethod def blob_downmerge_parser(): """ defines the way to parse the magic command ``%blob_downmerge`` :githublink:`%|py|552` """ parser = MagicCommandParser(prog="blob_downmerge", description='download a set of files from a blob storage folder, files will ' + 'be merged, we assume the container is the first element to the remote path') parser.add_argument( 'remotepath', type=str, help='remote path of the folder to download') parser.add_argument( 'localfile', type=str, help='local name for the downloaded merged file') parser.add_argument( '-o', '--overwrite', action='store_true', default=False, help='overwrite the local file') return parser
[docs] @line_magic def blob_downmerge(self, line): """ download all files from a folder, see :ref:`l-magic-path-container` Example:: %blob_downmerge remotepath localfile the command does not allow spaces in file names .. nbref:: :tag: Azure :title: blob_downmerge The code for magic command ``%blob_downmerge`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() cl.download_merge(bs, container, remotepath, localfile) .. versionadded:: 1.1 :githublink:`%|py|596` """ parser = self.get_parser( MagicAzure.blob_downmerge_parser, "blob_downmerge") args = self.get_args(line, parser) if args is not None: localfile, remotepath = args.localfile, args.remotepath if os.path.exists(localfile): if args.overwrite: os.remove(localfile) else: raise Exception( "file {0} cannot be overwritten".format(localfile)) cl, bs = self.get_blob_connection() container, remotepath = self._interpret_path(remotepath, cl, bs) cl.download_merge(bs, container, remotepath, localfile) return localfile return None
[docs] @line_magic def blob_rm(self, line): """ calls :meth:`blob_delete <pyenbc.remote.magic_azure.MagicAzure.blob_delete>` .. versionadded:: 1.1 :githublink:`%|py|622` """ return self.blob_delete(line)
[docs] @staticmethod def blob_delete_parser(): """ defines the way to parse the magic command ``%blob_delete`` :githublink:`%|py|629` """ parser = MagicCommandParser(prog="blob_delete", description='remove a remote path') parser.add_argument( 'remotepath', type=str, help='remote path to remove') return parser
[docs] @line_magic def blob_delete(self, line): """ deletes a blob, see :ref:`l-magic-path-container` .. nbref:: :tag: Azure :title: blob_delete The code for magic command ``%blob_delete`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() cl.delete_blob(bs, container, remotepath) :githublink:`%|py|654` """ parser = self.get_parser(MagicAzure.blob_delete_parser, "blob_delete") args = self.get_args(line, parser) if args is not None: cl, bs = self.get_blob_connection() container, remotepath = self._interpret_path( args.remotepath, cl, bs) cl.delete_blob(bs, container, remotepath) return True return None
[docs] @staticmethod def blob_rmr_parser(): """ defines the way to parse the magic command ``%blob_rmr`` :githublink:`%|py|670` """ parser = MagicCommandParser(prog="blob_rmr", description='remove a remote folder') parser.add_argument( 'remotepath', type=str, help='remote path to remove') return parser
[docs] @line_magic def blob_rmr(self, line): """ deletes a folder, see :ref:`l-magic-path-container` .. nbref:: :tag: Azure :title: blob_rmr The code for magic command ``%blob_rmr`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() cl.delete_folder(bs, container, remotepath) :githublink:`%|py|695` """ parser = self.get_parser(MagicAzure.blob_rmr_parser, "blob_rmr") args = self.get_args(line, parser) if args is not None: cl, bs = self.get_blob_connection() container, remotepath = self._interpret_path( args.remotepath, cl, bs) return cl.delete_folder(bs, container, remotepath) return None
[docs] @staticmethod def blob_copy_parser(): """ defines the way to parse the magic command ``%blob_copy`` :githublink:`%|py|710` """ parser = MagicCommandParser(prog="blob_copy", description='copy a blob folder') parser.add_argument( 'remotepath', type=str, help='remote path to remove') parser.add_argument( 'remotedest', type=str, help='remote destination') return parser
[docs] @line_magic def blob_copy(self, line): """ copy a blob storage, see :ref:`l-magic-path-container` .. nbref:: :tag: Azure :title: blob_copy The code for magic command ``%blob_copy`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() cl.copy_blob(bs, container, dest, src) :githublink:`%|py|739` """ parser = self.get_parser(MagicAzure.blob_copy_parser, "blob_copy") args = self.get_args(line, parser) if args is not None: src, dest = args.remotepath, args.remotedest cl, bs = self.get_blob_connection() container, src = self._interpret_path(src, cl, bs) container_, dest = self._interpret_path(dest, cl, bs) if container != container_: raise AzureException( "containers should be the same: {0} != {1}".format( container, container_), None) cl.copy_blob(bs, container, dest, src) return True return None
[docs] @staticmethod def hd_queue_parser(): """ defines the way to parse the magic command ``%hd_queue`` :githublink:`%|py|762` """ parser = MagicCommandParser(prog="hd_queue", description='displays the job queue') parser.add_argument( '-s', '--showall', action="store_true", default=False, help="show all jobs, only users'") return parser
[docs] @line_magic def hd_queue(self, line): """ defines ``%hd_queue`` .. nbref:: :tag: Azure :title: hd_queue The code for magic command ``%hd_queue`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() cl.job_queue(showall=showall) :githublink:`%|py|788` """ parser = self.get_parser(MagicAzure.hd_queue_parser, "hd_queue") args = self.get_args(line, parser) if args is not None: showall = args.showall cl, _ = self.get_blob_connection() return cl.job_queue(showall=showall) return None
[docs] @staticmethod def hd_job_status_parser(): """ defines the way to parse the magic command ``%hd_job_status`` :githublink:`%|py|802` """ parser = MagicCommandParser(prog="hd_job_status", description='get the status of the job') parser.add_argument( 'jobid', type=str, help='job id') return parser
[docs] @line_magic def hd_job_status(self, line): """ defines ``%hd_job_status`` .. nbref:: :tag: Azure :title: hd_job_status The code for magic command ``%hd_job_status`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() cl.job_status(jobid) :githublink:`%|py|826` """ parser = self.get_parser( MagicAzure.hd_job_status_parser, "hd_job_status") args = self.get_args(line, parser) if args is not None: jobid = args.jobid cl, _ = self.get_blob_connection() return cl.job_status(jobid) return None
[docs] @staticmethod def hd_job_kill_parser(): """ defines the way to parse the magic command ``%hd_job_kill`` :githublink:`%|py|841` """ parser = MagicCommandParser(prog="hd_job_kill", description='kill a job') parser.add_argument( 'jobid', type=str, help='job id') return parser
[docs] @line_magic def hd_job_kill(self, line): """ defines ``%hd_job_kill`` .. nbref:: :tag: Azure :title: hd_job_kill The code for magic command ``%hd_job_kill`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() cl.job_kill(jobid) :githublink:`%|py|865` """ parser = self.get_parser(MagicAzure.hd_job_kill_parser, "hd_job_kill") args = self.get_args(line, parser) if args is not None: jobid = args.jobid cl, _ = self.get_blob_connection() return cl.job_kill(jobid) return None
[docs] @line_magic def hd_wasb_prefix(self, line): """ defines ``%hd_wasb_prefix``, returns the prefix used to connect to the blob storage, it includes the *container* name :githublink:`%|py|880` """ cl, _ = self.get_blob_connection() return cl.wasb_to_file(cl.account_name, "")
[docs] @staticmethod def PIG_azure_parser(): """ defines the way to parse the magic command ``%%PIG_azure`` :githublink:`%|py|888` """ parser = MagicCommandParser(prog="PIG_azure", description='The command store the content of the cell as a local file.') parser.add_argument( 'file', type=str, help='file name') return parser
[docs] @cell_magic def PIG_azure(self, line, cell=None): """ defines command ``%%PIG_azure`` .. nbref:: :tag: Azure :title: PIG_azure The code for magic command ``%PIG_azure`` is equivalent to:: with open(filename, "w", encoding="utf8") as f: f.write(script) :githublink:`%|py|912` """ parser = self.get_parser(MagicAzure.PIG_azure_parser, "PIG_azure") args = self.get_args(line, parser) if args is not None: filename = args.file script = cell.replace("\r", "") with open(filename, "w", encoding="utf8") as f: f.write(script)
[docs] @staticmethod def HIVE_azure_parser(): """ defines the way to parse the magic command ``%HIVE_azure`` :githublink:`%|py|926` """ parser = MagicCommandParser(prog="HIVE_azure", description='The command store the content of the cell as a local file.') parser.add_argument( 'file', type=str, help='file name') return parser
[docs] @cell_magic def HIVE_azure(self, line, cell=None): """ defines command ``%%HIVE_azure`` .. nbref:: :tag: Azure :title: HIVE_azure The code for magic command ``%HIVE_azure`` is equivalent to:: with open(filename, "w", encoding="utf8") as f: f.write(script) :githublink:`%|py|950` """ parser = self.get_parser(MagicAzure.HIVE_azure_parser, "HIVE_azure") args = self.get_args(line, parser) if args is not None: filename = args.file script = cell.replace("\r", "") with open(filename, "w", encoding="utf8") as f: f.write(script)
[docs] @staticmethod def HIVE_azure_submit_parser(): """ Defines the way to parse the magic command ``%HIVE_azure_submit``. :githublink:`%|py|964` """ parser = MagicCommandParser(prog="HIVE_azure_submit", description='Submits a job to the cluster, the job is local, the job is ' + 'first uploaded to the cluster. The magic command populates the local ' + 'variable last_job with the submitted job id.') parser.add_argument( 'file', type=str, help='file name') parser.add_argument( '-d', '--dependency', nargs="*", type=str, help='dependency of the job, the python script') parser.add_argument( '-s', '--stop-on-failure', action='store_true', default=False, help='if true, the job stops on failure right away') parser.add_argument( '-o', '--options', nargs='*', type=str, help='list of options for the job') return parser
[docs] @line_magic def HIVE_azure_submit(self, line): """ Defines command ``%HIVE_azure_submit``. .. nbref:: :tag: Azure :title: HIVE_azure_submit The code for magic command ``%HIVE_azure_submit`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() cl.hive_submit(bs, cl.account_name, hive_file_name, dependencies, **options) :githublink:`%|py|1008` """ parser = self.get_parser( MagicAzure.HIVE_azure_submit_parser, "HIVE_azure_submit") args = self.get_args(line, parser) if args is not None: pig = args.file pys = [_ for _ in args.dependency if _.endswith( ".py")] if args.dependency is not None else [] if not os.path.exists(pig): raise FileNotFoundError(pig) options = {"stop_on_failure": False} if args.options is not None: options.update({k: True for k in args.options}) cl, bs = self.get_blob_connection() r = cl.HIVE_submit(bs, cl.account_name, pig, pys, **options) self.shell.user_ns["last_job"] = r return r return None
[docs] @staticmethod def hd_pig_submit_parser(): """ Defines the way to parse the magic command ``%hd_pig_submit``. :githublink:`%|py|1036` """ parser = MagicCommandParser(prog="hd_pig_submit", description='Submits a job to the cluster, the job is local, the job is ' + 'first uploaded to the cluster. The magic command populates the local ' + 'variable last_job with the submitted job id.') parser.add_argument( 'file', type=str, help='file name') parser.add_argument( '-d', '--dependency', nargs="*", type=str, help='dependency of the job, the python script') parser.add_argument( '-s', '--stop-on-failure', action='store_true', default=False, help='if true, the job stops on failure right away') parser.add_argument( '-o', '--options', nargs='*', type=str, help='list of options for the job') return parser
[docs] @line_magic def hd_pig_submit(self, line): """ Defines command ``%hd_pig_submit``. .. nbref:: :tag: Azure :title: hd_pig_submit The code for magic command ``%hd_pig_submit`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() cl.pig_submit(bs, cl.account_name, pig_file_name, dependencies, **options) :githublink:`%|py|1080` """ parser = self.get_parser( MagicAzure.hd_pig_submit_parser, "hd_pig_submit") args = self.get_args(line, parser) if args is not None: pig = args.file pys = [_ for _ in args.dependency if _.endswith( ".py")] if args.dependency is not None else [] if not os.path.exists(pig): raise FileNotFoundError(pig) options = {"stop_on_failure": False} if args.options is not None: options.update({k: True for k in args.options}) cl, bs = self.get_blob_connection() r = cl.pig_submit(bs, cl.account_name, pig, pys, **options) self.shell.user_ns["last_job"] = r return r return None
[docs] @staticmethod def hd_tail_stderr_parser(): """ defines the way to parse the magic command ``%hd_tail_stderr`` :githublink:`%|py|1108` """ parser = MagicCommandParser(prog="hd_tail_stderr", description='Submits a job to the cluster, the job is local, the job is first ' + 'uploaded to the cluster. The magic command populates the local variable ' + 'last_job with the submitted job id.') parser.add_argument( 'jobid', type=str, help='job id') parser.add_argument( '-n', '--nblines', type=int, default=20, help='number of lines to display') parser.add_argument( '--raw-output', default=False, action='store_true', help='display raw text instead of HTML') return parser
[docs] @line_magic def hd_tail_stderr(self, line): """ defines ``%hd_tail_stderr`` .. warning:: This function gets the status of the job to get the script name. But the rediction uses the script name and not the job id. As a consequence, if the same script name is run multiple times, the redirection will contain the output of multiples jobs. .. nbref:: :tag: Azure :title: hd_tail_stderr The code for magic command ``%hd_tail_stderr`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() cl.standard_outputs(job_id, bs, cl.account_name, ".") :githublink:`%|py|1150` """ parser = self.get_parser( MagicAzure.hd_tail_stderr_parser, "hd_tail_stderr") args = self.get_args(line, parser) if args is not None: job = args.jobid nbline = args.nblines if len(job) == 0: if self.shell is None or "last_job" not in self.shell.user_ns: raise Exception("no submitted jobs found in the workspace") else: job = self.shell.user_ns["last_job"]["jid"] cl, bs = self.get_blob_connection() out, err = cl.standard_outputs(job, bs, cl.account_name, ".") lines = err.split("\n") show = "\n".join(_.strip("\n\r") for _ in lines[-nbline:]) show = show.replace( "ERROR", '<b><font color="#DD0000">ERROR</font></b>') if args.raw_output: if len(out) > 0: lineo = out.split("\n") shoo = "\n".join(_.strip("\n\r") for _ in lineo[-nbline:]) return shoo else: return show else: if len(out) > 0: lineo = out.split("\n") shoo = "\n".join(_.strip("\n\r") for _ in lineo[-nbline:]) return HTML( "<pre>\n%s\n</pre><br /><b>OUT:</b><br /><pre>\n%s\n</pre>" % (show, shoo)) else: return HTML("<pre>\n%s\n</pre><br />" % show) return None
[docs] def _run_jython(self, cell, filename, func_name, args, true_jython=None): """ run a jython script :param cell: content of the cell :param filename: filename used to store the content of the cell :param func_name: function name :param args: list of arguments to run :param true_jython: jython (True) or this Python (False) :return: out, err :githublink:`%|py|1200` """ with open(filename, 'r', encoding="utf8") as pyf: content = pyf.read() temp = filename.replace(".py", ".temp.py") with open(temp, "w", encoding="utf8") as pyf: pyf.write(""" # -*- coding: utf8 -*- if __name__ != '__lib__': def outputSchema(dont_care): def wrapper(func): def inner(*args, **kwargs): return func(*args, **kwargs) return inner return wrapper def outputSchemaFunction(schema_def): def decorator(func): func.outputSchemaFunction = schema_def return func return decorator def schemaFunction(schema_def): def decorator(func): func.schemaFunction = schema_def return func return decorator """.replace(" ", "")) pyf.write( content.replace( "except Exception,", "except Exception as ")) s_func_name = func_name if isinstance( func_name, str) else func_name.__name__.split(".")[-1] pyf.write(""" if __name__ != '__lib__': import sys for row in sys.stdin: row = row.strip() res = {0}(row) sys.stdout.write(str(res)) sys.stdout.write("\\n") sys.stdout.flush() """.format(s_func_name).replace(" ", "")) cmd = sys.executable.replace( "pythonw", "python") + " " + temp + " " + " ".join("{}".format(_) for _ in args) tosend = cell if true_jython: download_java_standalone() out, err = run_jython(temp, sin=cell, timeout=10) else: out, err = run_cmd( cmd, wait=True, sin=tosend, communicate=True, timeout=10, shell=False) return out, err
[docs] @staticmethod def runjython_parser(): """ defines the way to parse the magic command ``%%runjython`` :githublink:`%|py|1259` """ parser = MagicCommandParser(prog="runjython", description='run a jython script used for streaming in HDInsight, ' + 'the function appends fake decorator a timeout is set up at 10s') parser.add_argument( 'file', type=str, help='file name') parser.add_argument( 'function_name', type=str, help='function name') parser.add_argument( '--raw-output', default=False, action='store_true', help='display raw text instead of HTML') parser.add_argument( 'args', type=str, nargs="*", help='arguments') return parser
[docs] @cell_magic def runjpython(self, line, cell=None): """ Defines command ``%%runjython``. .. nbref:: :tag: Azure :title: runjpython Run a jython script used for streaming in HDInsight, the function appends fake decorator a timeout is set up at 10s The magic function create another file included the decoration. It runs the script with this version of Python. See `In a python script how can I ignore Apache Pig's Python Decorators for standalone unit testing <http://stackoverflow.com/questions/18223898/in-a-python-script-how-can-i-ignore-apache-pigs-python-decorators-for-standalon>`_ See :meth:`_run_jython <pyenbc.remote.magic_azure.MagicAzure._run_jython>` to see the code. .. versionadded:: 1.1 :githublink:`%|py|1305` """ parser = self.get_parser(MagicAzure.runjython_parser, "runjpython") args = self.get_args(line, parser) if args is not None: filename = args.file func_name = args.function_name args = args.args out, err = self._run_jython(cell, filename, func_name, args, False) if args.raw_output: if len(err) > 0: return err else: return out else: if len(err) > 0: return HTML( '<font color="#DD0000">Error</font><br /><pre>\n%s\n</pre>' % err) else: return HTML('<pre>\n%s\n</pre>' % out) return None
[docs] @staticmethod def jython_parser(): """ defines the way to parse the magic command ``%%jython`` :githublink:`%|py|1331` """ parser = MagicCommandParser(prog="jython", description='run a jython script used for streaming in HDInsight, it does it using Jython') parser.add_argument( 'file', type=str, help='file name') parser.add_argument( 'function_name', type=str, help='function name') parser.add_argument( '--raw-output', default=False, action='store_true', help='display raw text instead of HTML') parser.add_argument( 'args', type=str, nargs="*", help='arguments') return parser
[docs] @cell_magic def jython(self, line, cell=None): """ Defines command ``%%runjython``. run a jython script used for streaming in HDInsight, the function appends fake decorator a timeout is set up at 10s The magic function create another file included the decoration. It runs the script with Jython (see the default version) See `In a python script how can I ignore Apache Pig's Python Decorators for standalone unit testing <http://stackoverflow.com/questions/18223898/in-a-python-script-how-can-i-ignore-apache-pigs-python-decorators-for-standalon>`_. .. versionadded:: 1.1 :githublink:`%|py|1370` """ parser = self.get_parser(MagicAzure.jython_parser, "jpython") args = self.get_args(line, parser) if args is not None: filename = args.file func_name = args.function_name raw_output = args.raw_output args = args.args out, err = self._run_jython(cell, filename, func_name, args, True) if raw_output: if len(err) > 0: return err else: return out else: if len(err) > 0: return HTML( '<font color="#DD0000">Error</font><br /><pre>\n%s\n</pre>' % err) else: return HTML('<pre>\n%s\n</pre>' % out) return None
[docs] @staticmethod def blob_head_parser(): """ defines the way to parse the magic command ``%blob_head`` :githublink:`%|py|1397` """ parser = MagicCommandParser(prog="blob_head", description='get the head of stream in a dataframe') parser.add_argument( 'remotepath', type=str, help='remote path of the file to download') parser.add_argument( '-m', '--merge', action='store_true', default=False, help='merges files in a folder') parser.add_argument( '-d', '--df', action='store_true', default=True, help='results as a dataframe') parser.add_argument( '-s', '--size', type=int, default=2 ** 20, help='size of data to get') parser.add_argument( '-e', '--encoding', type=str, default="utf8", help='encoding') parser.add_argument( '--sep', type=str, default="\t", help='column separator') parser.add_argument( '--header', default='infer', help='results as a dataframe') return parser
[docs] @line_magic def blob_head(self, line): """ download a file from the blob storage and display its head, see :ref:`l-magic-path-container` Example:: %blob_head remotepath the command does not allow spaces in file names .. nbref:: :tag: Azure :title: blob_head The code for magic command ``%blob_head`` is equivalent to:: from pyenbc.remote import AzureClient cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password, pseudo=username) bs = cl.open_blob_service() df = cl.df_head(bs, container, remotepath, localfile) :githublink:`%|py|1461` """ parser = self.get_parser(MagicAzure.blob_head_parser, "blob_head") args = self.get_args(line, parser) if args is not None: remotepath = args.remotepath cl, bs = self.get_blob_connection() container, remotepath = self._interpret_path(remotepath, cl, bs) res = cl.df_head(bs, container, remotepath, stop_at=args.size, encoding=args.encoding, as_df=args.df, merge=args.merge, sep=args.sep, header=args.header) return res return None
[docs] @staticmethod def blob_path_parser(): """ defines the magic command ``%blob_path``, checks the path used in commands ``blob_down``, ``blob_up`` :githublink:`%|py|1481` """ parser = MagicCommandParser(prog="blob_path", description='remove a remote path') parser.add_argument( 'remotepath', type=str, help='remote path to interpret') return parser
[docs] @line_magic def blob_path(self, line): """ checks the path used in commands ``blob_down``, ``blob_up``, see :meth:`_interpret_path`, :ref:`l-magic-path-container` .. nbref:: :tag: Azure :title: blob_path The code for magic command ``%blob_path`` is equivalent to:: if line.startswith("/"): container = account_name remotepath = remotepath.lstrip("/") else: spl = line.split("/") container = spl[0] remotepath = None if len(spl) == 1 else "/".join(spl[1:]) result = container, remotepath :githublink:`%|py|1511` """ parser = self.get_parser(MagicAzure.blob_delete_parser, "blob_delete") args = self.get_args(line, parser) if args is not None: cl, bs = self.get_blob_connection() container, remotepath = self._interpret_path( args.remotepath, cl, bs) return container, remotepath return None
[docs]def register_azure_magics(ip=None): """ register magics function, can be called from a notebook :param ip: from ``get_ipython()`` :githublink:`%|py|1528` """ if ip is None: from IPython import get_ipython ip = get_ipython() ip.register_magics(MagicAzure)