Source code for pyenbc.remote.azure_connection

# -*- coding: utf-8 -*-
"""
A class to help connect to a remote machine and send command lines.


:githublink:`%|py|6`
"""
import os
import time
import io
import warnings
import requests
from pyquickhelper.loghelper import noLOG
from .azure_exception import AzureException


class AzureClient:
    """
    A simple class to access and communicate with
    `Azure <http://azure.microsoft.com/>`_.
    It requires modules:

    * `azure <https://github.com/Azure/azure-sdk-for-python>`_
    * `requests <http://docs.python-requests.org/en/latest/>`_

    .. index:: blob, Azure

    Main functionalities related to blob:

    * list_containers, create_container, list_blobs, put_blob, put_block_blob_from_bytes
    * put_block_blob_from_text, put_page_blob_from_file, get_blob

    See `How to use Blob storage from Python
    <https://azure.microsoft.com/en-us/documentation/articles/storage-python-how-to-use-blob-storage/>`_.

    .. exref::
        :title: Get the list of containers and files from a blob storage
        :tag: Azure

        The functionalities of a ``BlobService`` are described in
        `blockblobservice.py
        <https://github.com/Azure/azure-storage-python/blob/master/azure/storage/blob/blockblobservice.py>`_.

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>", "<primary_key>")
            bs = cl.open_blob_service()
            res = cl.ls(bs, as_df=False)
            for r in res:
                print(r["name"])

    .. exref::
        :title: Upload, download, to a blob storage
        :tag: Azure

        The following example uploads and downloads a file
        on a Blob Storage.

        ::

            from pyenbc.remote.azure_connection import AzureClient
            cl = AzureClient("<blob_storage_service>", "<primary_key>")
            bs = cl.open_blob_service()

            cl.upload(bs, "<container>", "myremotefolder/remotename.txt",
                      "local_filename.txt")

            res = cl.ls(bs, "<container>", as_df=False)
            for r in res:
                if "local_filename" in r["name"]:
                    print(r)

            cl.download(bs, "<container>", "myremotefolder/remotename.txt",
                        "another_local_filename.txt")

    Many functions use the
    `WebHCat API <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference>`_.
    The error codes can be found here:
    `Error Codes and Responses
    <https://cwiki.apache.org/confluence/display/Hive/WebHCat+UsingWebHCat#WebHCatUsingWebHCat-ErrorCodesandResponses>`_.

    .. versionchanged::
        PSEUDO, CONTAINER, SCRIPT will be passed to the script as parameters.

    :githublink:`%|py|84`
    """

    _blob_properties = [
        "copy_completion_time",
        "content_encoding",
        "content_language",
        "blob_type",
        "copy_status_description",
        "copy_id",
        "content_md5",
        "lease_duration",
        "copy_source",
        "content_type",
        "content_length",
        "lease_state",
        "copy_progress",
        "copy_status",
        "xms_blob_sequence_number",
        "lease_status",
        "etag",
        "last_modified",
    ]
    def __init__(self, blob_name, blob_key, hadoop_name=None, hadoop_key=None,
                 hadoop_user_name="admin", pseudo="any", fLOG=None):
        """
        constructor

        :param blob_name: blob storage name
        :param blob_key: account key for the blob storage
        :param hadoop_name: hadoop server name (can be None if HDInsight is not used)
        :param hadoop_key: hadoop key (can be None if HDInsight is not used)
        :param hadoop_user_name: user name used to connect to the hadoop server
        :param pseudo: sometimes, the same identification is used to connect to
            HDInsight, the pseudo is meant to avoid collisions
        :param fLOG: logging function

        :githublink:`%|py|125`
        """
        self.account_name = blob_name
        self.account_key = blob_key
        self.hadoop_name = hadoop_name
        self.hadoop_key = hadoop_key
        self.hadoop_user_name = hadoop_user_name
        self.pseudo = pseudo
        if fLOG is None:
            def _log_(*args, **kwargs):
                pass
            self.LOG = _log_
        else:
            self.LOG = fLOG

        if pseudo is None:
            raise ValueError("pseudo cannot be None")

        self.default_parameters = dict(
            SCRIPTPIG=self.pseudo + "/scripts/pig",
            SCRIPTHIVE=self.pseudo + "/scripts/hive",
            PSEUDO=self.pseudo,
            CONTAINER="")
    def _interpret_path(self, blob_path):
        """
        replace variables such as ``$PSEUDO``, ``$USERNAME``

        :param blob_path: path
        :return: modified path

        .. versionadded:: 1.1

        :githublink:`%|py|156`
        """
        if blob_path is None:
            return None
        if "$" in blob_path:
            for k, v in self.default_parameters.items():
                blob_path = blob_path.replace("$" + k, v)
        return blob_path
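    # Usage sketch (not part of the original source): how ``$``-placeholders
    # are expanded by ``_interpret_path``; ``cl`` is assumed to be an
    # AzureClient created with ``pseudo="alias"`` and placeholder credentials.
    #
    #   cl = AzureClient("<blob_storage>", "<primary_key>", pseudo="alias")
    #   cl._interpret_path("$PSEUDO/scripts/pig/job.pig")
    #   # -> 'alias/scripts/pig/job.pig'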
    @staticmethod
    def mask_string(s):
        """
        return an empty string or as many ``*`` as the length of the string

        :githublink:`%|py|168`
        """
        if s is None:
            return ""
        else:
            return "*" * len(s)
    def __str__(self):
        """
        usual

        :githublink:`%|py|177`
        """
        mes = "AzureClient [blob:({0},{1}), hadoop:({2},{3},{4})]".format(
            AzureClient.mask_string(self.account_name),
            AzureClient.mask_string(self.account_key),
            AzureClient.mask_string(self.hadoop_name),
            AzureClient.mask_string(self.hadoop_key),
            AzureClient.mask_string(self.hadoop_user_name))
        return mes
    def open_blob_service(self):
        """
        open a blob service

        :githublink:`%|py|188`
        """
        try:
            from azure.storage.blob import BlobService
        except ImportError:
            from azure.storage.blob import BlockBlobService as BlobService
        return BlobService(self.account_name, self.account_key)
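    # Usage sketch (placeholder credentials and container names): open the
    # service once and reuse it for every blob operation below.
    #
    #   cl = AzureClient("<blob_storage>", "<primary_key>")
    #   bs = cl.open_blob_service()
    #   print(cl.ls(bs, "<container>", as_df=False)[:3])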
    def exists(self, blob_service, container_name, path):
        """
        test the existence of a path on the blob storage

        :param blob_service: blob service, returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container_name: None for all, its name otherwise
        :param path: path in the container
        :return: boolean

        .. versionadded:: 1.1

        :githublink:`%|py|205`
        """
        path = self._interpret_path(path)
        df = self.ls(blob_service, container_name, path, as_df=False)
        return len(df) > 0
    def ls(self, blob_service, container_name=None, path=None,
           add_metadata=False, as_df=True):
        """
        return the content of a blob storage

        :param blob_service: blob service, returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container_name: None for all, its name otherwise
        :param path: path in the container
        :param add_metadata: add the metadata to the blob
        :param as_df: if True, returns a DataFrame
        :return: list of dictionaries (or a DataFrame if *as_df* is True)

        .. versionchanged:: 1.1
            Parameters *add_metadata* and *as_df* were added and the function
            now returns the property *last_modified*.

        :githublink:`%|py|226`
        """
        res = []
        if container_name is None:
            for cn in blob_service.list_containers():
                self.LOG("exploring ", cn.name)
                r = self.ls(
                    blob_service, cn.name, path=path,
                    add_metadata=add_metadata, as_df=False)
                res.extend(r)
            if as_df:
                import pandas  # pylint: disable=C0415
                return pandas.DataFrame(res)
            else:
                return res
        else:
            path = self._interpret_path(path)
            res = []
            for b in blob_service.list_blobs(
                    container_name, prefix=path,
                    include="metadata" if add_metadata else None):
                obs = {}
                obs["name"] = b.name
                if hasattr(b, "url"):
                    obs["url"] = b.url
                else:
                    obs["url"] = blob_service.make_blob_url(
                        container_name, b.name)
                for p in AzureClient._blob_properties:
                    if hasattr(b.properties, p):
                        obs[p] = getattr(b.properties, p)
                    else:
                        obs[p] = None
                if b.metadata is not None:
                    for k, v in b.metadata.items():
                        obs["meta_%s" % k] = v
                res.append(obs)
            if as_df:
                import pandas  # pylint: disable=C0415
                if len(res) > 0:
                    return pandas.DataFrame(res)
                else:
                    return pandas.DataFrame(columns=["name", "url"])
            else:
                return res
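    # Usage sketch (``cl``/``bs`` as in the sketch after open_blob_service,
    # names are placeholders): list blobs under a prefix with metadata and
    # keep only a few columns of the resulting DataFrame.
    #
    #   df = cl.ls(bs, "<container>", "myfolder", add_metadata=True, as_df=True)
    #   print(df[["name", "content_length", "last_modified"]])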
    _chunk_size = 4 * 1024 * 1024
    def upload(self, blob_service, container_name, blob_name, file_path):
        """
        Uploads data from a file to a blob storage.
        No more than 64Mb can be uploaded at the same time,
        it needs to be split into pieces.

        :param blob_service: returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container_name: container name
        :param blob_name: blob name (remote file name)
        :param file_path: local file path (or list of local file paths)
        :return: uploaded blob name (or list of uploaded blob names)

        The code comes from `Utilisation du service de stockage d'objets blob
        à partir de Python
        <http://azure.microsoft.com/fr-fr/documentation/articles/storage-python-how-to-use-blob-storage/>`_.

        :githublink:`%|py|296`
        """
        if isinstance(file_path, list):
            res = []
            for filename in file_path:
                only = os.path.split(filename)[-1]
                bn = blob_name.rstrip("/") + "/" + only
                r = self.upload(blob_service, container_name, bn, filename)
                res.append(r)
            return res
        else:
            blob_name = self._interpret_path(blob_name)
            if hasattr(blob_service, "put_blob"):
                # this code should disappear as it relies on an old version of
                # the module azure
                blob_service.create_container(
                    container_name, None, None, False)
                blob_service.put_blob(
                    container_name, blob_name, None, 'BlockBlob')

                block_ids = []
                index = 0
                with open(file_path, 'rb') as f:
                    while True:
                        data = f.read(AzureClient._chunk_size)
                        if data:
                            block_id = '{0:08d}'.format(index)
                            blob_service.put_block(
                                container_name, blob_name, data, block_id)
                            block_ids.append(block_id)
                            index += 1
                            self.LOG("uploaded", index,
                                     " bytes from ", file_path)
                        else:
                            break

                blob_service.put_block_list(
                    container_name, blob_name, block_ids)
            else:
                blob_service.create_blob_from_path(
                    container_name, blob_name, file_path)

            return blob_name
    def upload_data(self, blob_service, container_name, blob_name, data):
        """
        Uploads data (bytes) to a blob storage.
        No more than 64Mb can be uploaded at the same time,
        it needs to be split into pieces.

        :param blob_service: returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container_name: container name
        :param blob_name: blob name (remote file name)
        :param data: bytes
        :return: uploaded blob name

        The code comes from `Utilisation du service de stockage d'objets blob
        à partir de Python
        <http://azure.microsoft.com/fr-fr/documentation/articles/storage-python-how-to-use-blob-storage/>`_.

        .. versionadded:: 1.1

        :githublink:`%|py|363`
        """
        blob_name = self._interpret_path(blob_name)
        blob_service.create_container(container_name, None, None, False)
        if hasattr(blob_service, "put_blob"):
            # this code should disappear as it relies on an old version of the
            # module azure
            blob_service.put_blob(container_name, blob_name, None, 'BlockBlob')

            block_ids = []
            index = 0
            while True:
                if len(data) > AzureClient._chunk_size:
                    da = data[:AzureClient._chunk_size]
                    data = data[AzureClient._chunk_size:]
                else:
                    da = data
                    data = None
                block_id = '{0:08d}'.format(index)
                blob_service.put_block(
                    container_name, blob_name, da, block_id)
                block_ids.append(block_id)
                index += 1
                if not data:
                    break

            blob_service.put_block_list(container_name, blob_name, block_ids)
        else:
            blob_service.create_blob_from_bytes(
                container_name, blob_name, data)
        return blob_name
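    # Usage sketch (hypothetical names): upload raw bytes, then a local file,
    # into the same remote folder of the same container.
    #
    #   cl.upload_data(bs, "<container>", "myfolder/readme.txt", b"hello")
    #   cl.upload(bs, "<container>", "myfolder/data.csv", "local_data.csv")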
    def download(self, blob_service, container_name, blob_name,
                 file_path=None, append=False, chunk_size=None, stop_at=None):
        """
        Downloads data from a blob storage to a file.
        No more than 64Mb can be downloaded at the same time,
        it needs to be split into pieces.

        :param blob_service: returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container_name: container name
        :param blob_name: blob name (or list of blob names) (remote file name)
        :param file_path: local file path
        :param append: if True, append the content to an existing file
        :param chunk_size: download by chunk
        :param stop_at: stop at a given size (None to avoid stopping)
        :return: local file or bytes if *file_path* is None

        The code comes from `Utilisation du service de stockage d'objets blob
        à partir de Python
        <http://azure.microsoft.com/fr-fr/documentation/articles/storage-python-how-to-use-blob-storage/>`_.

        .. versionchanged:: 1.1
            Parameters *append*, *chunk_size* were added.
            If *file_path* is None (default value now), the function returns bytes.

        :githublink:`%|py|427`
        """
        if not isinstance(blob_name, str):
            res = []
            for blob in blob_name:
                dest = os.path.join(file_path, os.path.split(blob)[-1])
                r = self.download(
                    blob_service, container_name, blob, dest,
                    append=append, chunk_size=chunk_size, stop_at=stop_at)
                res.append(r)
                if stop_at is not None:
                    if file_path is None:
                        stop_at -= len(r)
                    else:
                        stop_at -= os.stat(r).st_size
                    if stop_at <= 0:
                        break
            if file_path is None:
                st = io.BytesIO()
                for r in res:
                    st.write(r)
                return st.getvalue()
            else:
                return res
        else:
            blob_name = self._interpret_path(blob_name)
            if hasattr(blob_service, "get_blob"):
                # this code should disappear as it relies on an old version of
                # the module azure
                props = blob_service.get_blob_properties(
                    container_name, blob_name)
                if hasattr(props, "properties"):
                    blob_size = props.properties.content_length
                else:
                    blob_size = int(props['content-length'])
                if chunk_size is None:
                    chunk_size = AzureClient._chunk_size
                if stop_at is not None and stop_at < chunk_size:
                    chunk_size = max(stop_at, 0)

                def iterations(f, chunk_size, container_name, blob_name,
                               file_path, stop_at):
                    "local function"
                    index = 0
                    while index < blob_size:
                        chunk_range = 'bytes={}-{}'.format(
                            index, index + chunk_size - 1)
                        data = blob_service.get_blob(
                            container_name, blob_name, x_ms_range=chunk_range)
                        length = len(data)
                        index += length
                        self.LOG("downloaded ", index, "bytes from ", file_path)
                        if length > 0:
                            f.write(data)
                            if length < chunk_size:
                                return False
                        else:
                            return False
                        if stop_at is not None and stop_at <= index:
                            return False
                    return True

                if file_path is None:
                    f = io.BytesIO()
                    iterations(f, chunk_size, container_name, blob_name,
                               file_path, stop_at)
                    return f.getvalue()
                else:
                    mode = 'ab' if append else 'wb'
                    with open(file_path, mode) as f:
                        iterations(f, chunk_size, container_name, blob_name,
                                   file_path, stop_at)
                    return file_path
            else:
                bl = blob_service.get_blob_to_bytes(container_name, blob_name)
                if file_path is None:
                    return bl.content
                else:
                    with open(file_path, "wb") as f:
                        f.write(bl.content)
                    return file_path
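    # Usage sketch (placeholder names): fetch only the first megabyte of a
    # large blob into memory with ``stop_at``, or append a blob to an
    # existing local file.
    #
    #   head = cl.download(bs, "<container>", "myfolder/big.csv", stop_at=2 ** 20)
    #   cl.download(bs, "<container>", "myfolder/part-00001",
    #               file_path="merged.csv", append=True)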
    def download_data(self, blob_service, container_name, blob_name,
                      chunk_size=None, stop_at=None):
        """
        Downloads data from a blob storage and returns bytes.
        No more than 64Mb can be downloaded at the same time,
        it needs to be split into pieces.

        :param blob_service: returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container_name: container name
        :param blob_name: blob name (or list of blob names) (remote file name)
        :param chunk_size: download by chunk
        :param stop_at: stop at a given size (None to avoid stopping)
        :return: bytes

        .. versionadded:: 1.1

        :githublink:`%|py|533`
        """
        return self.download(blob_service=blob_service,
                             container_name=container_name,
                             blob_name=blob_name,
                             chunk_size=chunk_size,
                             stop_at=stop_at)
    def df_head(self, blob_service, container_name, blob_name,
                stop_at=2 ** 20, encoding="utf-8", as_df=True,
                merge=False, **options):
        """
        Downloads the beginning of a stream and displays it as a DataFrame.

        :param blob_service: returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container_name: container name
        :param blob_name: blob name (or list of blob names) (remote file name)
        :param stop_at: stop at a given size (None to avoid stopping)
        :param encoding: encoding
        :param as_df: result as a dataframe or a string
        :param merge: if True, *blob_name* is a folder, method
            :meth:`download_merge <pyenbc.remote.azure_connection.AzureClient.download_merge>`
            is called
        :param options: see `read_csv
            <http://pandas.pydata.org/pandas-docs/version/0.17.0/generated/pandas.read_csv.html?highlight=read_csv#pandas.read_csv>`_
        :return: a DataFrame (or a string if *as_df* is False)

        .. versionadded:: 1.1

        :githublink:`%|py|554`
        """
        if merge:
            do = self.download_merge(blob_service=blob_service,
                                     container_name=container_name,
                                     blob_folder=blob_name,
                                     stop_at=stop_at)
        else:
            do = self.download(blob_service=blob_service,
                               container_name=container_name,
                               blob_name=blob_name,
                               stop_at=stop_at)
        text = do.decode(encoding)
        if as_df:
            pos = text.rfind("\n")
            if pos > 0:
                st = io.StringIO(text[:pos])
            else:
                st = io.StringIO(text)
            import pandas  # pylint: disable=C0415
            return pandas.read_csv(st, **options)
        else:
            return text
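    # Usage sketch (placeholder names): preview a CSV blob as a DataFrame
    # without downloading it entirely; extra keyword arguments such as
    # ``sep`` are forwarded to pandas.read_csv.
    #
    #   df = cl.df_head(bs, "<container>", "myfolder/big.csv", sep=",")
    #   print(df.head())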
    def download_merge(self, blob_service, container_name, blob_folder,
                       file_path=None, chunk_size=None, stop_at=None):
        """
        Downloads all files from a folder in a blob storage to a single local
        file. Files will be merged.
        No more than 64Mb can be downloaded at the same time,
        it needs to be split into pieces.

        :param blob_service: returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container_name: container name
        :param blob_folder: blob folder (remote folder name)
        :param file_path: local file path
        :param chunk_size: download by chunk
        :param stop_at: stop at a given size (None to avoid stopping)
        :return: local file or bytes if *file_path* is None

        .. versionchanged:: 1.1
            Parameters *chunk_size*, *stop_at* were added.
            If *file_path* is None (default value now), the function returns bytes.

        :githublink:`%|py|602`
        """
        blob_folder = self._interpret_path(blob_folder)
        content = self.ls(
            blob_service, container_name, blob_folder, as_df=False)
        first = True
        store = io.BytesIO()
        for cont in content:
            if cont["content_length"] > 0:
                by = self.download(
                    blob_service, container_name, cont["name"],
                    file_path=file_path, chunk_size=chunk_size,
                    stop_at=stop_at, append=not first)
                if first:
                    first = False
                if file_path is None:
                    store.write(by)
                    if stop_at is not None:
                        stop_at -= len(by)
                else:
                    if stop_at is not None:
                        stop_at -= os.stat(file_path).st_size
                if stop_at is not None and stop_at <= 0:
                    break
        if file_path is None:
            return store.getvalue()
        else:
            return file_path
    def delete_blob(self, blob_service, container_name, blob_name):
        """
        delete a blob

        :param blob_service: returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container_name: container name
        :param blob_name: blob name (remote file name)

        :githublink:`%|py|645`
        """
        blob_name = self._interpret_path(blob_name)
        blob_service.delete_blob(container_name, blob_name)
        return blob_name
    def delete_folder(self, blob_service, container_name, blob_folder):
        """
        delete a folder and its content

        :param blob_service: returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container_name: container name
        :param blob_folder: blob folder (remote folder name)

        .. versionadded:: 1.1

        :githublink:`%|py|659`
        """
        blob_folder = self._interpret_path(blob_folder)
        df = self.ls(blob_service, container_name, blob_folder)
        rem = []
        for name in df["name"]:
            r = self.delete_blob(blob_service, container_name, name)
            rem.append(r)
        return rem
    def url_blob(self, blob_service, container, blob_name):
        """
        returns an url for a blob file name

        :param blob_service: returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container: container
        :param blob_name: blob_name
        :return: url

        :githublink:`%|py|675`
        """
        blob_name = self._interpret_path(blob_name)
        src = blob_service.make_blob_url(container, blob_name)
        return src
    def copy_blob(self, blob_service, container, blob_name, source):
        """
        copy a blob

        :param blob_service: returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container: container name
        :param blob_name: destination
        :param source: source

        :githublink:`%|py|688`
        """
        blob_name = self._interpret_path(blob_name)
        url = self.url_blob(blob_service, container, source)
        res = blob_service.copy_blob(container, blob_name, url)
        return res
    def url_webHCatUrl(self, cmd):
        """
        returns an url to the cluster

        :param cmd: something like ``pig``, ``status``
        :return: url

        :githublink:`%|py|700`
        """
        if self.hadoop_name is None:
            raise AttributeError(
                "no hadoop server was given to the constructor for cmd: {0}".format(cmd))
        webHCatUrl = ('https://' + self.hadoop_name +
                      '.azurehdinsight.net/templeton/v1/' + cmd)
        return webHCatUrl
    def wasb_to_file(self, container_name, blob_file):
        """
        return something like ``wasb://demo@myblobstorage.blob...``

        :param container_name: name of a container
        :param blob_file: path to a file
        :return: a url to the blob file (pig script for example)

        :githublink:`%|py|715`
        """
        blob_file = self._interpret_path(blob_file)
        # wasb syntax: wasb://<container>@<account>.blob.core.windows.net/<path>
        return 'wasb://{0}@{1}.blob.core.windows.net/{2}'.format(
            container_name, self.account_name, blob_file)
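    # Usage sketch (placeholder container and storage account): the wasb URL
    # built for a blob, as expected by a PIG ``LOAD`` instruction.
    #
    #   cl.wasb_to_file("<container>", "myfolder/input.csv")
    #   # -> 'wasb://<container>@<blob_storage>.blob.core.windows.net/myfolder/input.csv'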
    def wasb_prefix(self, container_name):
        """
        when using an instruction ``LOAD`` in a PIG script,
        a file blob name must be referenced using the wasb syntax.
        This method returns the prefix to add.

        :return: wasb prefix

        :githublink:`%|py|727`
        """
        return self.wasb_to_file(container_name, "")
    def get_status(self):
        """
        return the status of the WebHCat server

        :return: json

        :githublink:`%|py|735`
        """
        if self.hadoop_user_name is None:
            raise AttributeError(
                "no hadoop user name was given to the constructor")
        if self.hadoop_key is None:
            raise AttributeError(
                "no hadoop password was given to the constructor")

        webHCatUrl = self.url_webHCatUrl("status")

        r = requests.get(webHCatUrl,
                         auth=(self.hadoop_user_name, self.hadoop_key))
        if r.status_code != 200:
            raise AzureException(
                "unable to get the status of server: " + webHCatUrl, r)
        return r.json()
    def get_version(self):
        """
        return the Hive version of the WebHCat server

        :return: json

        :githublink:`%|py|759`
        """
        if self.hadoop_user_name is None:
            raise AttributeError(
                "no hadoop user name was given to the constructor")
        if self.hadoop_key is None:
            raise AttributeError(
                "no hadoop password was given to the constructor")

        webHCatUrl = self.url_webHCatUrl("version/hive")

        r = requests.get(webHCatUrl,
                         auth=(self.hadoop_user_name, self.hadoop_key))
        if r.status_code != 200:
            raise AzureException(
                "unable to get the version of server: " + webHCatUrl, r)
        return r.json()
    def pig_submit(self, blob_service, container_name, pig_file,
                   dependencies=None, status_dir=None,
                   stop_on_failure=True, params=None):
        """
        Submits a :epkg:`PIG` job, the function uploads it to the cluster
        as well as the dependencies.

        The code comes from `How to use HDInsight from Linux
        <http://blogs.msdn.com/b/benjguin/archive/2014/02/18/how-to-use-hdinsight-from-linux.aspx>`_
        and `start a Pig + Jython job in HDInsight thru WebHCat
        <http://blogs.msdn.com/b/benjguin/archive/2014/03/21/start-a-pig-jython-job-in-hdinsight-thru-webhcat.aspx>`_.
        The API is described at `Pig Job — POST pig
        <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Pig>`_.

        :param blob_service: returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container_name: name of a container
        :param pig_file: path to the job in the blob storage
        :param dependencies: dependencies
        :param status_dir: folder used by Hadoop to store the job's progress,
            it should contain your alias if you want to avoid collision with
            others' jobs
        :param stop_on_failure: stop on failure, do not wait as long as possible
        :param params: additional parameters passed to the script
        :return: json

        .. exref::
            :title: Submit a job PIG
            :tag: Azure

            The PIG script must include an instruction ``LOAD``.
            This instruction uses file names defined with the `wasb syntax
            <http://azure.microsoft.com/en-us/documentation/articles/hdinsight-use-blob-storage/>`_.

            If you place the string ``$CONTAINER`` before a stream name,
            it should be replaced by the corresponding wasb syntax associated
            to the container name defined by ``container_name``.
            The function would then load your script, modify it and save
            another one by adding the extension ``.wasb.pig``.
            Other constants you can use:

            * ``$PSEUDO``
            * ``$CONTAINER``
            * ``$SCRIPTSPIG``

            However, this replacement is not done by this class;
            your code could look like:

            ::

                blobstorage = "****"
                blobpassword = "*********************"
                hadoop_name = "*********"
                hadoop_password = "********"
                username = "********"
                cl = AzureClient(blobstorage,
                                 blobpassword,
                                 hadoop_name,
                                 hadoop_password,
                                 username)
                script = '''
                    myinput = LOAD '$CONTAINER/input.csv'
                              using PigStorage(',')
                              AS (index:long, sequence, tag, timestamp:long, dateformat,
                                  x:double, y:double, z:double, activity) ;
                    filt = FILTER myinput BY activity == 'walking' ;
                    STORE filt INTO '$PSEUDO/output.csv' USING PigStorage() ;
                    '''

                with open("script_walking.pig", "w") as f:
                    f.write(script)

                bs = cl.open_blob_service()
                js = cl.pig_submit(bs, blobstorage, "testensae/script_walking.pig")
                print(js)

                js = cl.job_status('job_1414863995725_0013')

        .. versionadded:: 1.1
            parameter *stop_on_failure*

        :githublink:`%|py|855`
        """
        if self.hadoop_user_name is None:
            raise AttributeError(
                "no hadoop user name was given to the constructor")
        if self.hadoop_key is None:
            raise AttributeError(
                "no hadoop password was given to the constructor")

        # upload
        scripts = self.default_parameters["SCRIPTPIG"]
        toup = [pig_file]
        if dependencies is not None:
            toup.extend(dependencies)
        res = self.upload(blob_service, container_name, scripts, toup)

        # path modification
        wasb = self.wasb_to_file(container_name, res[0])
        if dependencies is not None:
            wasbdep = ",".join(
                self.wasb_to_file(container_name, _) for _ in res[1:])
        else:
            wasbdep = None

        # parameters
        args = ['-v']
        for k, v in sorted(self.default_parameters.items()):
            if k == "CONTAINER":
                args.extend(
                    ["-param", '%s=%s' % (k, self.wasb_to_file(container_name, v))])
            else:
                args.extend(
                    ["-param", '%s=%s' % (k, v.replace('"', '\\"'))])
        if params is not None:
            for k, v in sorted(params.items()):
                args.extend(
                    ["-param", '%s=%s' % (k, v.replace('"', '\\"'))])
        if stop_on_failure:
            args.append("-stop_on_failure")

        # params
        params = {'user.name': self.hadoop_user_name,
                  'file': wasb,
                  'arg': args}

        if wasbdep is not None:
            params["files"] = wasbdep

        if status_dir is not None:
            status_dir = self._interpret_path(status_dir)
            params['statusdir'] = self.wasb_to_file(
                container_name,
                status_dir + "/" + os.path.split(pig_file)[-1] + ".log")
        else:
            status_dir = self.default_parameters["SCRIPTPIG"]
            params['statusdir'] = self.wasb_to_file(
                container_name,
                self.default_parameters["SCRIPTPIG"] + "/" +
                os.path.split(pig_file)[-1] + ".log")

        webHCatUrl = self.url_webHCatUrl("pig")

        # submit the job
        r = requests.post(webHCatUrl,
                          auth=(self.hadoop_user_name, self.hadoop_key),
                          data=params)

        if r.status_code != 200:
            raise AzureException(
                "unable to submit job: {0}\n---\nWITH PARAMS\n---\n{1}".format(
                    pig_file, params), r)
        return r.json()
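    # Usage sketch (hypothetical script, dependency and parameter names):
    # submit a PIG script with one dependency and a custom parameter,
    # then wait for it (see ``wait_job`` below).
    #
    #   js = cl.pig_submit(bs, "<container>", "script.pig",
    #                      dependencies=["utils.py"],
    #                      params=dict(UTT="utt_file.txt"))
    #   st = cl.wait_job(js["id"])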
    def hive_submit(self, blob_service, container_name, hive_file,
                    dependencies=None, status_dir=None,
                    stop_on_failure=True, params=None):
        """
        Submits a :epkg:`HIVE` job, the function uploads it to the cluster
        as well as the dependencies.

        The code comes from `How to use HDInsight from Linux
        <http://blogs.msdn.com/b/benjguin/archive/2014/02/18/how-to-use-hdinsight-from-linux.aspx>`_
        and `start a Pig + Jython job in HDInsight thru WebHCat
        <http://blogs.msdn.com/b/benjguin/archive/2014/03/21/start-a-pig-jython-job-in-hdinsight-thru-webhcat.aspx>`_.
        The API is described at `Pig Job — POST pig
        <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Pig>`_.

        :param blob_service: returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container_name: name of a container
        :param hive_file: path to the job in the blob storage
        :param dependencies: dependencies
        :param status_dir: folder used by Hadoop to store the job's progress,
            it should contain your alias if you want to avoid collision with
            others' jobs
        :param stop_on_failure: stop on failure, do not wait as long as possible
        :param params: additional parameters passed to the script
        :return: json

        .. versionadded:: 1.1

        :githublink:`%|py|949`
        """
        if self.hadoop_user_name is None:
            raise AttributeError(
                "no hadoop user name was given to the constructor")
        if self.hadoop_key is None:
            raise AttributeError(
                "no hadoop password was given to the constructor")

        # upload
        scripts = self.default_parameters["SCRIPTHIVE"]
        toup = [hive_file]
        if dependencies is not None:
            toup.extend(dependencies)
        res = self.upload(blob_service, container_name, scripts, toup)

        # path modification
        wasb = self.wasb_to_file(container_name, res[0])
        if dependencies is not None:
            wasbdep = ",".join(
                self.wasb_to_file(container_name, _) for _ in res[1:])
        else:
            wasbdep = None

        # parameters
        args = ['-v']
        for k, v in sorted(self.default_parameters.items()):
            if k == "CONTAINER":
                args.extend(
                    ["-param", '%s=%s' % (k, self.wasb_to_file(container_name, v))])
            else:
                args.extend(
                    ["-param", '%s=%s' % (k, v.replace('"', '\\"'))])
        if params is not None:
            for k, v in sorted(params.items()):
                args.extend(
                    ["-param", '%s=%s' % (k, v.replace('"', '\\"'))])
        if stop_on_failure:
            args.append("-stop_on_failure")

        # params
        params = {'user.name': self.hadoop_user_name,
                  'file': wasb,
                  'arg': args}

        if wasbdep is not None:
            params["files"] = wasbdep

        if status_dir is not None:
            status_dir = self._interpret_path(status_dir)
            params['statusdir'] = self.wasb_to_file(
                container_name,
                status_dir + "/" + os.path.split(hive_file)[-1] + ".log")
        else:
            status_dir = self.default_parameters["SCRIPTHIVE"]
            params['statusdir'] = self.wasb_to_file(
                container_name,
                self.default_parameters["SCRIPTHIVE"] + "/" +
                os.path.split(hive_file)[-1] + ".log")

        webHCatUrl = self.url_webHCatUrl("hive")

        warnings.warn("Hive submission is not tested. It will probably fail.")

        # submit the job
        r = requests.post(webHCatUrl,
                          auth=(self.hadoop_user_name, self.hadoop_key),
                          data=params)

        if r.status_code != 200:
            raise AzureException(
                "unable to submit job: {0}\n---\nWITH PARAMS\n---\n{1}".format(
                    hive_file, params), r)
        return r.json()
    def job_queue(self, showall=False, fields=None):
        """
        returns the list of jobs

        It uses the API `Job Information — GET queue/:jobid
        <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Jobs>`_.

        :param showall: if True, show all jobs (not only yours)
        :param fields: extra fields to add to the request (only ``*`` is supported)
        :return: list of jobs

        .. exref::
            :title: List job queue
            :tag: Azure

            Most of the time, a job remains stuck in the job queue because
            it is full. Here is a code to check that is the case on an
            Azure cluster. It should be executed from a notebook.

            Connection::

                blobstorage = "..."
                blobpassword = "..."
                hadoop_server = "..."
                hadoop_password = "..."
                username = "..."

                %load_ext pyenbc
                client, bs = %hd_open

            Job queue::

                res = client.job_queue()
                res.reverse()   # last submitted jobs first

            Displays the first 20 jobs::

                for i, r in enumerate(res[:20]):
                    st = client.job_status(r["id"])
                    print(i, r, st["status"]["state"],
                          datetime.fromtimestamp(float(st["status"]["startTime"])/1000),
                          st["status"]["jobName"])
                    print(st["userargs"].get("file", None),
                          st["profile"].get("jobName", None))

            It gives::

                0 {'detail': None, 'id': 'job_1451961118663_3126'} PREP 2016-01-26 21:57:28.756000 TempletonControllerJob
                wasb://..../scripts/pig/titi.pig TempletonControllerJob
                1 {'detail': None, 'id': 'job_1451961118663_3125'} PREP 2016-01-26 21:57:28.517999 TempletonControllerJob
                wasb://..../scripts/pig/pre_processing.pig TempletonControllerJob
                2 {'detail': None, 'id': 'job_1451961118663_3124'} PREP 2016-01-26 21:50:32.742000 TempletonControllerJob
                wasb://..../scripts/pig/titi.pig TempletonControllerJob
                3 {'detail': None, 'id': 'job_1451961118663_3123'} RUNNING 2016-01-26 21:46:57.219000 TempletonControllerJob
                wasb://..../scripts/pig/alg1.pig TempletonControllerJob
                4 {'detail': None, 'id': 'job_1451961118663_3122'} SUCCEEDED 2016-01-26 21:40:34.687999 PigLatin:pre_processing.pig
                None PigLatin:pre_processing.pig
                5 {'detail': None, 'id': 'job_1451961118663_3121'} RUNNING 2016-01-26 21:41:29.657000 TempletonControllerJob
                wasb://..../scripts/pig/Algo_LDA2.pig TempletonControllerJob
                6 {'detail': None, 'id': 'job_1451961118663_3120'} SUCCEEDED 2016-01-26 21:40:06.859999 TempletonControllerJob
                wasb://..../scripts/pig/alg1.pig TempletonControllerJob

            To kill a job::

                client.job_kill("id")

        :githublink:`%|py|1082`
        """
        if self.hadoop_user_name is None:
            raise AttributeError(
                "no hadoop user name was given to the constructor")
        if self.hadoop_key is None:
            raise AttributeError(
                "no hadoop password was given to the constructor")

        webHCatUrl = self.url_webHCatUrl("jobs")

        params = {"user.name": self.hadoop_user_name}
        if showall:
            params["showall"] = "true"
        if fields:
            if fields != "*":
                raise ValueError("fields can only be *")
            params["fields"] = fields

        r = requests.get(webHCatUrl,
                         auth=(self.hadoop_user_name, self.hadoop_key),
                         params=params)

        if r.status_code != 200:
            raise AzureException("unable to get job queue", r)
        return r.json()
    def job_status(self, jobid):
        """
        return the status of a job

        see the `WebHCat job reference
        <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Job>`_
        for the outcome

        :param jobid: jobid
        :return: json

        You can extract the *startTime* by doing::

            from datetime import datetime
            st = client.job_status(<job_id>)
            datetime.fromtimestamp(float(st["status"]["startTime"])/1000)

        :githublink:`%|py|1123`
        """
        if self.hadoop_user_name is None:
            raise AttributeError(
                "no hadoop user name was given to the constructor")
        if self.hadoop_key is None:
            raise AttributeError(
                "no hadoop password was given to the constructor")

        params = {"user.name": self.hadoop_user_name}
        webHCatUrl = self.url_webHCatUrl("jobs/" + jobid)

        r = requests.get(webHCatUrl,
                         auth=(self.hadoop_user_name, self.hadoop_key),
                         params=params)
        if r.status_code != 200:
            raise AzureException(
                "unable to get the job status: " + webHCatUrl, r)
        return r.json()
    def wait_job(self, job_id, delay=5, fLOG=noLOG):
        """
        wait until a job has run or failed

        :param job_id: job_id
        :param delay: check every N seconds
        :param fLOG: logging function
        :return: status

        .. versionadded:: 1.1

        :githublink:`%|py|1154`
        """
        status = self.job_status(job_id)
        while status["status"]["state"] in ["PREP", "RUNNING"]:
            fLOG("job_id", job_id, ":", status["status"]["state"])
            time.sleep(delay)
            status = self.job_status(job_id)
        return status
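    # Usage sketch (placeholder job id and names): wait for a submitted job
    # and retrieve its logs from the status directory with
    # ``standard_outputs`` (defined below).
    #
    #   st = cl.wait_job("job_1414863995725_0013")
    #   out, err = cl.standard_outputs(st, bs, "<container>", "local_logs")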
    def standard_outputs(self, job_id, blob_service, container, folder):
        """
        returns the standard output and error for a specific job id

        :param job_id: job_id or status
        :param blob_service: returned by
            :meth:`open_blob_service <pyenbc.remote.azure_connection.AzureClient.open_blob_service>`
        :param container: name of a container
        :param folder: local folder where to download them
        :return: out, err

        :githublink:`%|py|1171`
        """
        if isinstance(job_id, str):
            status = self.job_status(job_id)
        else:
            status = job_id

        status_dir = status["userargs"]["statusdir"]
        spl = status_dir.split("core.windows.net/")  # to change
        path = spl[-1]
        self.download(
            blob_service, container,
            [path + "/" + _ for _ in ["stderr", "stdout"]],
            folder)

        with open(os.path.join(folder, "stdout"), "r", encoding="utf8") as f:
            out = f.read()
        with open(os.path.join(folder, "stderr"), "r", encoding="utf8") as f:
            err = f.read()
        return out, err
    def job_kill(self, jobid):
        """
        kills a job

        see `Delete Job — DELETE queue/:jobid
        <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+DeleteJob>`_
        for the outcome

        :param jobid: jobid
        :return: json

        :githublink:`%|py|1198`
        """
        if self.hadoop_user_name is None:
            raise AttributeError(
                "no hadoop user name was given to the constructor")
        if self.hadoop_key is None:
            raise AttributeError(
                "no hadoop password was given to the constructor")

        params = {"user.name": self.hadoop_user_name}
        webHCatUrl = self.url_webHCatUrl("jobs/" + jobid)

        r = requests.delete(webHCatUrl,
                            auth=(self.hadoop_user_name, self.hadoop_key),
                            params=params)
        if r.status_code != 200:
            raise AzureException(
                "unable to kill the job: " + webHCatUrl, r)
        return r.json()