module remote.azure_connection

Inheritance diagram of pyenbc.remote.azure_connection

Short summary

module pyenbc.remote.azure_connection

A class to help connect with a remote machine and send command line.

source on GitHub

Classes

class

truncated documentation

AzureClient

A simple class to access and communicate with Azure. It requires modules:

Static Methods

staticmethod

truncated documentation

mask_string

return empty string or as many * as the length of the string

Methods

method

truncated documentation

__init__

constructor

__str__

usual

_interpret_path

replace variavble such as $PSEUDO, $USERNAME

copy_blob

copy a blob

delete_blob

delete a blob

delete_folder

delete a folder and its content

df_head

Downloads the beginning of a stream and displays as a DataFrame.

download

Downloads data from a blob storage to a file. No more than 64Mb can be downloaded at the same, it needs to be split …

download_data

Downloads data from a blob storage and return bytes. No more than 64Mb can be downloaded at the same, it needs …

download_merge

Downloads all files from a folder in a blob storage to a single local file. Files will be merged. No more …

exists

test the existence of a path on the blob storage

get_status

return the status of the webHCatUrl server

get_version

return the status of the WebHCat version

hive_submit

Submits a :epkg:`HIVE` job, the function uploads it to the cluster as well as the dependencies. The code …

job_kill

kills a job see Delete Job — DELETE queue/:jobid

job_queue

returns the list of jobs It uses the API Job Information — GET queue/:jobid. …

job_status

return the status of a job see List Versions — GET version

ls

return the content of a blob storage

open_blob_service

open a blob service

pig_submit

Submits a :epkg:`PIG` job, the function uploads it to the cluster as well as the dependencies. The code …

standard_outputs

returns the standard output and error for a specific job id

upload

Uploads data from a file to a blob storage. No more than 64Mb can be uploaded at the same, it needs to be split …

upload_data

Uploads data (bytes) to a blob storage. No more than 64Mb can be uploaded at the same, it needs to be split into …

url_blob

returns an url for a blob file name

url_webHCatUrl

returns an url to the cluster

wait_job

wait until a job has run or failed

wasb_prefix

when using an instruction LOAD in a PIG script, file blob name must be reference using a wasb syntax. …

wasb_to_file

return something like wasb://demo@myblobstorage.blob...

Documentation

A class to help connect with a remote machine and send command line.

source on GitHub

class pyenbc.remote.azure_connection.AzureClient(blob_name, blob_key, hadoop_name=None, hadoop_key=None, hadoop_user_name='admin', pseudo='any', fLOG=None)[source]

Bases: object

A simple class to access and communicate with Azure. It requires modules:

Main functionalities related to blob:

  • list_containers, create_container, list_blobs, put_blob, put_block_blob_from_bytes

  • put_block_blob_from_text, put_page_blob_from_file, get_blob, get_blob

See How to use Blob storage from Python.

Get the list of containers and files from a blob storage?

The functionalities of a BlobService are described in blockblobservice.py.

from pyenbc.remote.azure_connection import AzureClient
cl = AzureClient("<blob_storage_service>",
                "<primary_key>")
bs = cl.open_blob_service()
res = cl.ls(bs)
for r in res:
    print(r["name"])

Upload, download, to a blob storage

The following example uploads and downloads a file on a Blob Storage.

from pyenbc.remote.azure_connection import AzureClient
cl = AzureClient("<blob_storage_service>",
                "<primary_key>")
bs = cl.open_blob_service()
cl.upload(bs, "<container>", "myremotefolder/remotename.txt",
                            "local_filename.txt")

res = cl.ls(bs,"<container>")
for r in res:
    if "local_filename" in r["name"]:
        print(r)

cl.download(bs, "<container>", "myremotefolder/remotename.txt",
                            "another_local_filename.txt")

Many function uses WebHCat API. The error codes can be found here: Error Codes and Responses.

Changed in version PSEUDO,: CONTAINER, SCRIPT will be passed to the script as parameters

source on GitHub

constructor

Parameters
  • blob_name – blob storage name

  • blob_key – account key for the blob storage

  • hadoop_name – hadoop server name (can be None if HDInsight is not used)

  • hadoop_key – hadoop key (can be None if HDInsight is not used)

  • pseudo – sometimes, the same identification is used to connect to HDInsight, the pseudo is meant to avoid collisions

  • fLOG – logging function

source on GitHub

__init__(blob_name, blob_key, hadoop_name=None, hadoop_key=None, hadoop_user_name='admin', pseudo='any', fLOG=None)[source]

constructor

Parameters
  • blob_name – blob storage name

  • blob_key – account key for the blob storage

  • hadoop_name – hadoop server name (can be None if HDInsight is not used)

  • hadoop_key – hadoop key (can be None if HDInsight is not used)

  • pseudo – sometimes, the same identification is used to connect to HDInsight, the pseudo is meant to avoid collisions

  • fLOG – logging function

source on GitHub

__str__()[source]

usual

source on GitHub

_blob_properties = ['copy_completion_time', 'content_encoding', 'content_language', 'blob_type', 'copy_status_description', 'copy_id', 'content_md5', 'lease_duration', 'copy_source', 'content_type', 'content_length', 'lease_state', 'copy_progress', 'copy_status', 'xms_blob_sequence_number', 'lease_status', 'etag', 'last_modified']
_chunk_size = 4194304
_interpret_path(blob_path)[source]

replace variavble such as $PSEUDO, $USERNAME

Parameters

blob_path – path

Returns

modified path

New in version 1.1.

source on GitHub

copy_blob(blob_service, container, blob_name, source)[source]

copy a blob

Parameters
  • blob_service – returns by open_blob_service

  • container_name – container name

  • blob_name – destination

  • source – source

source on GitHub

delete_blob(blob_service, container_name, blob_name)[source]

delete a blob

Parameters
  • blob_service – returns by open_blob_service

  • container_name – container name

  • blob_name – blob name (remote file name)

source on GitHub

delete_folder(blob_service, container_name, blob_folder)[source]

delete a folder and its content

Parameters
  • blob_service – returns by open_blob_service

  • container_name – container name

  • blob_folder – blob folder (remote folder name)

New in version 1.1.

source on GitHub

df_head(blob_service, container_name, blob_name, stop_at=1048576, encoding='utf-8', as_df=True, merge=False, **options)[source]

Downloads the beginning of a stream and displays as a DataFrame.

Parameters
  • blob_service – returns by open_blob_service

  • container_name – container name

  • blob_name – blob name (or list of blob names) (remote file name)

  • stop_at – stop at a given size (None to avoid stopping)

  • encoding – encoding

  • as_df – result as a dataframe or a string

  • merge – if True, blob_name is a folder, method download_merge is called

  • options – see read_csv

Returns

local file or bytes if file_path is None

New in version 1.1.

source on GitHub

download(blob_service, container_name, blob_name, file_path=None, append=False, chunk_size=None, stop_at=None)[source]

Downloads data from a blob storage to a file. No more than 64Mb can be downloaded at the same, it needs to be split into pieces.

Parameters
  • blob_service – returns by open_blob_service

  • container_name – container name

  • blob_name – blob name (or list of blob names) (remote file name)

  • file_path – local file path

  • append – if True, append the content to an existing file

  • chunk_size – download by chunk

  • stop_at – stop at a given size (None to avoid stopping)

Returns

local file or bytes if file_path is None

The code comes from Utilisation du service de stockage d’objets blob à partir de Python.

Changed in version 1.1: Parameters append, chunk_size were added. If file_path is None (default value now), the function returns bytes.

source on GitHub

download_data(blob_service, container_name, blob_name, chunk_size=None, stop_at=None)[source]

Downloads data from a blob storage and return bytes. No more than 64Mb can be downloaded at the same, it needs to be split into pieces.

Parameters
  • blob_service – returns by open_blob_service

  • container_name – container name

  • blob_name – blob name (or list of blob names) (remote file name)

  • chunk_size – download by chunk

  • stop_at – stop at a given size (None to avoid stopping)

Returns

local file or bytes if file_path is None

New in version 1.1.

source on GitHub

download_merge(blob_service, container_name, blob_folder, file_path=None, chunk_size=None, stop_at=None)[source]

Downloads all files from a folder in a blob storage to a single local file. Files will be merged. No more than 64Mb can be downloaded at the same, it needs to be split into pieces.

Parameters
  • blob_service – returns by open_blob_service

  • container_name – container name

  • blob_folder – blob folder(remote file name)

  • file_path – local file path

  • chunk_size – download by chunk

  • stop_at – stop at a given size (None to avoid stopping)

Returns

local file

Changed in version 1.1: Parameters append, chunk_size were added. If file_path is None (default value now), the function returns bytes.

source on GitHub

exists(blob_service, container_name, path)[source]

test the existence of a path on the blob storage

Parameters
  • blob_service – blob service, returned by open_blob_service

  • container_name – None for all, its name otherwise

  • path – path in the container

Returns

boolean

New in version 1.1.

source on GitHub

get_status()[source]

return the status of the webHCatUrl server

Returns

json

source on GitHub

get_version()[source]

return the status of the WebHCat version

Returns

json

source on GitHub

hive_submit(blob_service, container_name, hive_file, dependencies=None, status_dir=None, stop_on_failure=True, params=None)[source]

Submits a :epkg:`HIVE` job, the function uploads it to the cluster as well as the dependencies.

The code comes from How to use HDInsight from Linux and start a Pig + Jython job in HDInsight thru WebHCat. The API is described at Pig Job — POST pig.

Parameters
  • blob_service – returns by open_blob_service

  • container_name – name of a container

  • hive_file – path to the job in the blob storage

  • dependencies – dependencies

  • status_dir – folder used by Hadoop to store job’s progress, it should contain your alias if you want to avoid collision with others’ jobs

  • stop_on_failure – stop on failure, do not wait as long as possible

  • params – to

Returns

json

New in version 1.1.

source on GitHub

job_kill(jobid)[source]

kills a job

see Delete Job — DELETE queue/:jobid for the outcome

Parameters

jobid – jobid

Returns

json

source on GitHub

job_queue(showall=False, fields=None)[source]

returns the list of jobs

It uses the API Job Information — GET queue/:jobid.

Parameters
  • showall – if True, show all your jobs (not only yours)

  • fields – to add fields in the requests

Returns

list of jobs

List job queue

Most of the time, a job remains stuck in the job queue because it is full. Here is a code to check that is the case on a Azure cluster. It should be executed from a notebook.

Connection

blobstorage = "..."
blobpassword = "..."
hadoop_server = "..."
hadoop_password = "..."
username = "..."

%load_ext pyenbc
client, bs = %hd_open

Job queue

res = client.job_queue()
res.reverse()   # last submitted jobs first

Displays the first 20 jobs:

for i, r in enumerate(res[:20]):
    st = client.job_status(r["id"])
    print(i, r, st["status"]["state"],datetime.fromtimestamp(float(st["status"]["startTime"])/1000), st["status"]["jobName"])
    print(st["userargs"].get("file", None), st["profile"].get("jobName", None))

It gives:

0 {'detail': None, 'id': 'job_1451961118663_3126'} PREP 2016-01-26 21:57:28.756000 TempletonControllerJob
wasb://..../scripts/pig/titi.pig TempletonControllerJob
1 {'detail': None, 'id': 'job_1451961118663_3125'} PREP 2016-01-26 21:57:28.517999 TempletonControllerJob
wasb://..../scripts/pig/pre_processing.pig TempletonControllerJob
2 {'detail': None, 'id': 'job_1451961118663_3124'} PREP 2016-01-26 21:50:32.742000 TempletonControllerJob
wasb://..../scripts/pig/titi.pig TempletonControllerJob
3 {'detail': None, 'id': 'job_1451961118663_3123'} RUNNING 2016-01-26 21:46:57.219000 TempletonControllerJob
wasb://..../scripts/pig/alg1.pig TempletonControllerJob
4 {'detail': None, 'id': 'job_1451961118663_3122'} SUCCEEDED 2016-01-26 21:40:34.687999 PigLatin:pre_processing.pig
None PigLatin:pre_processing.pig
5 {'detail': None, 'id': 'job_1451961118663_3121'} RUNNING 2016-01-26 21:41:29.657000 TempletonControllerJob
wasb://..../scripts/pig/Algo_LDA2.pig TempletonControllerJob
6 {'detail': None, 'id': 'job_1451961118663_3120'} SUCCEEDED 2016-01-26 21:40:06.859999 TempletonControllerJob
wasb://..../scripts/pig/alg1.pig TempletonControllerJob

To kill a job:

client.job_kill("id")

source on GitHub

job_status(jobid)[source]

return the status of a job

see List Versions — GET version for the outcome

Parameters

jobid – jobid

Returns

json

You can extract the startTime by doing:

from datetime import datetime
st = client.job_status(<job_id>)
datetime.fromtimestamp(float(st["status"]["startTime"])/1000)

source on GitHub

ls(blob_service, container_name=None, path=None, add_metadata=False, as_df=True)[source]

return the content of a blob storage

Parameters
  • blob_service – blob service, returned by open_blob_service

  • container_name – None for all, its name otherwise

  • path – path in the container

  • add_metadata – add the metadata to the blob

  • as_df – if True, returns a DataFrame

Returns

list of dictionaries

Changed in version 1.1: Parameter add_metadata was added and the function now returns the property last_modified, parameter as_df

source on GitHub

static mask_string(s)[source]

return empty string or as many * as the length of the string

source on GitHub

open_blob_service()[source]

open a blob service

source on GitHub

pig_submit(blob_service, container_name, pig_file, dependencies=None, status_dir=None, stop_on_failure=True, params=None)[source]

Submits a :epkg:`PIG` job, the function uploads it to the cluster as well as the dependencies.

The code comes from How to use HDInsight from Linux and start a Pig + Jython job in HDInsight thru WebHCat. The API is described at Pig Job — POST pig.

Parameters
  • blob_service – returns by open_blob_service

  • container_name – name of a container

  • pig_file – path to the job in the blob storage

  • dependencies – dependencies

  • status_dir – folder used by Hadoop to store job’s progress, it should contain your alias if you want to avoid collision with others’ jobs

  • stop_on_failure – stop on failure, do not wait as long as possible

  • params – to

Returns

json

Submit a job PIG

The script PIG must include an instruction LOAD. This instruction use file name defined with the wasb syntax.

If you place the string $CONTAINER before a stream name, it should be replaced by the corresponding wasb syntax associated to the container name defined by container_name. The function will then load your script, modify it and save another one with the by adding .wasb.pig. Others constants you could use:

  • $PSEUDO

  • $CONTAINER

  • $SCRIPTSPIG

However, this replacement is not done by this class, but your code could be such as:

blobstorage = "****"
blobpassword = "*********************"
hadoop_name = "*********"
hadoop_password = "********"
username = "********"
cl = AzureClient(blobstorage,
                blobpassword,
                hadoop_name,
                hadoop_password,
                username)
script = '''
    myinput = LOAD '$CONTAINER/input.csv'
            using PigStorage(',')
            AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;
    filt = FILTER myinput BY activity == 'walking' ;
    STORE filt INTO '$PSEUDO/output.csv' USING PigStorage() ;
    '''

with open("script_walking.pig","w") as f :
    f.write(script)

bs = cl.open_blob_service()
js = cl.pig_submit(bs, blobstorage, "testensae/script_walking.pig")
print(js)

js = cl.job_status('job_1414863995725_0013')

New in version 1.1: parameter stop_on_failure

source on GitHub

standard_outputs(job_id, blob_service, container, folder)[source]

returns the standard output and error for a specific job id

Parameters
  • job_id – job_id or status

  • blob_service – returns by open_blob_service

  • container_name – name of a container

@parm folder folder where to download them

Returns

out, err

source on GitHub

upload(blob_service, container_name, blob_name, file_path)[source]

Uploads data from a file to a blob storage. No more than 64Mb can be uploaded at the same, it needs to be split into pieces.

Parameters
  • blob_service – returns by open_blob_service

  • container_name – container name

  • blob_name – blob name (remote file name)

  • file_path – local file path

Returns

list of uploaded blob names

The code comes from Utilisation du service de stockage d’objets blob à partir de Python.

source on GitHub

upload_data(blob_service, container_name, blob_name, data)[source]

Uploads data (bytes) to a blob storage. No more than 64Mb can be uploaded at the same, it needs to be split into pieces.

Parameters
  • blob_service – returns by open_blob_service

  • container_name – container name

  • blob_name – blob name (remote file name)

  • data – bytes

Returns

list of uploaded blob names

The code comes from Utilisation du service de stockage d’objets blob à partir de Python.

New in version 1.1.

source on GitHub

url_blob(blob_service, container, blob_name)[source]

returns an url for a blob file name

Parameters
  • container – container

  • blob_name – blob_name

Returns

url

source on GitHub

url_webHCatUrl(cmd)[source]

returns an url to the cluster

Parameters

cmd – something like pig, status

Returns

url

source on GitHub

wait_job(job_id, delay=5, fLOG=<function noLOG>)[source]

wait until a job has run or failed

Parameters
  • job_id – job_id

  • delay – check every N seconds

Returns

status

New in version 1.1.

source on GitHub

wasb_prefix(container_name)[source]

when using an instruction LOAD in a PIG script, file blob name must be reference using a wasb syntax. This method returns the prefix to add.

Returns

wasb prefix

source on GitHub

wasb_to_file(container_name, blob_file)[source]

return something like wasb://demo@myblobstorage.blob...

Parameters
  • container_name – name of a container

  • blob_file – path to a file

Returns

return a url to blob file (pig script for example)

source on GitHub