module remote.azure_connection

Inheritance diagram of pyenbc.remote.azure_connection

Short summary

module pyenbc.remote.azure_connection

A class to help connect to a remote machine and send command lines.

source on GitHub

Classes

class truncated documentation
AzureClient A simple class to access and communicate with Azure. It requires modules:

Static Methods

staticmethod truncated documentation
mask_string return an empty string or as many * characters as the length of the string

Methods

method truncated documentation
__init__ constructor
__str__ usual
_interpret_path replace variables such as $PSEUDO, $USERNAME
copy_blob copy a blob
delete_blob delete a blob
delete_folder delete a folder and its content
df_head Downloads the beginning of a stream and displays as a DataFrame.
download Downloads data from a blob storage to a file. No more than 64Mb can be downloaded at a time, it needs to be split …
download_data Downloads data from a blob storage and return bytes. No more than 64Mb can be downloaded at a time, it needs …
download_merge Downloads all files from a folder in a blob storage to a single local file. Files will be merged. No more …
exists test the existence of a path on the blob storage
get_status return the status of the webHCatUrl server
get_version return the WebHCat version
hive_submit Submits a :epkg:`HIVE` job, the function uploads it to the cluster as well as the dependencies. The code …
job_kill kills a job see Delete Job — DELETE queue/:jobid
job_queue returns the list of jobs It uses the API Job Information — GET queue/:jobid. …
job_status return the status of a job see List Versions — GET version
ls return the content of a blob storage
open_blob_service open a blob service
pig_submit Submits a :epkg:`PIG` job, the function uploads it to the cluster as well as the dependencies. The code …
standard_outputs returns the standard output and error for a specific job id
upload Uploads data from a file to a blob storage. No more than 64Mb can be uploaded at a time, it needs to be split …
upload_data Uploads data (bytes) to a blob storage. No more than 64Mb can be uploaded at a time, it needs to be split into …
url_blob returns a url for a blob file name
url_webHCatUrl returns a url to the cluster
wait_job wait until a job has run or failed
wasb_prefix when using a LOAD instruction in a PIG script, file blob names must be referenced using the wasb syntax. …
wasb_to_file return something like wasb://demo@myblobstorage.blob...

Documentation

A class to help connect to a remote machine and send command lines.

source on GitHub

class pyenbc.remote.azure_connection.AzureClient(blob_name, blob_key, hadoop_name=None, hadoop_key=None, hadoop_user_name='admin', pseudo='any', fLOG=None)[source]

Bases: object

A simple class to access and communicate with Azure. It requires the azure module.

Main functionalities related to blob:

  • list_containers, create_container, list_blobs, put_blob, put_block_blob_from_bytes
  • put_block_blob_from_text, put_page_blob_from_file, get_blob

See How to use Blob storage from Python.

Get the list of containers and files from a blob storage?

The functionalities of a BlobService are described in blockblobservice.py.

from pyenbc.remote.azure_connection import AzureClient
cl = AzureClient("<blob_storage_service>",
                "<primary_key>")
bs = cl.open_blob_service()
res = cl.ls(bs)
for r in res:
    print(r["name"])

Upload and download a file to a blob storage

The following example uploads and downloads a file on a Blob Storage.

from pyenbc.remote.azure_connection import AzureClient
cl = AzureClient("<blob_storage_service>",
                "<primary_key>")
bs = cl.open_blob_service()
cl.upload(bs, "<container>", "myremotefolder/remotename.txt",
          "local_filename.txt")

res = cl.ls(bs,"<container>")
for r in res:
    if "local_filename" in r["name"]:
        print(r)

cl.download(bs, "<container>", "myremotefolder/remotename.txt",
            "another_local_filename.txt")

Many functions use the WebHCat API. The error codes can be found here: Error Codes and Responses.

Changed in version 1.1: PSEUDO, CONTAINER, SCRIPT will be passed to the script as parameters.

source on GitHub

constructor

Parameters:
  • blob_name – blob storage name
  • blob_key – account key for the blob storage
  • hadoop_name – hadoop server name (can be None if HDInsight is not used)
  • hadoop_key – hadoop key (can be None if HDInsight is not used)
  • pseudo – sometimes the same identification is used to connect to HDInsight; the pseudo is meant to avoid collisions
  • fLOG – logging function

source on GitHub

__init__(blob_name, blob_key, hadoop_name=None, hadoop_key=None, hadoop_user_name='admin', pseudo='any', fLOG=None)[source]

constructor

Parameters:
  • blob_name – blob storage name
  • blob_key – account key for the blob storage
  • hadoop_name – hadoop server name (can be None if HDInsight is not used)
  • hadoop_key – hadoop key (can be None if HDInsight is not used)
  • pseudo – sometimes the same identification is used to connect to HDInsight; the pseudo is meant to avoid collisions
  • fLOG – logging function

source on GitHub
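
A minimal sketch of the two ways the client can be built; every value is a placeholder and the hadoop arguments can be omitted if HDInsight is not used.

from pyenbc.remote.azure_connection import AzureClient

# blob storage only
cl = AzureClient("<blob_storage_service>", "<primary_key>")

# blob storage + HDInsight cluster
cl = AzureClient("<blob_storage_service>", "<primary_key>",
                 "<hadoop_server>", "<hadoop_password>",
                 pseudo="<alias>")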

__str__()[source]

usual

source on GitHub

_blob_properties = ['copy_completion_time', 'content_encoding', 'content_language', 'blob_type', 'copy_status_description', 'copy_id', 'content_md5', 'lease_duration', 'copy_source', 'content_type', 'content_length', 'lease_state', 'copy_progress', 'copy_status', 'xms_blob_sequence_number', 'lease_status', 'etag', 'last_modified']
_chunk_size = 4194304
_interpret_path(blob_path)[source]

replace variables such as $PSEUDO, $USERNAME

Parameters:blob_path – path
Returns:modified path

New in version 1.1.

source on GitHub

copy_blob(blob_service, container, blob_name, source)[source]

copy a blob

Parameters:
  • blob_service – returned by open_blob_service
  • container – container name
  • blob_name – destination
  • source – source

source on GitHub
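
A minimal sketch, assuming cl is an AzureClient and the container and blob names are placeholders; the source blob is the last argument, the destination the one before.

bs = cl.open_blob_service()
# copies data/input.csv to backup/input.csv inside the same container
cl.copy_blob(bs, "<container>", "backup/input.csv", "data/input.csv")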

delete_blob(blob_service, container_name, blob_name)[source]

delete a blob

Parameters:
  • blob_service – returned by open_blob_service
  • container_name – container name
  • blob_name – blob name (remote file name)

source on GitHub

delete_folder(blob_service, container_name, blob_folder)[source]

delete a folder and its content

Parameters:
  • blob_service – returned by open_blob_service
  • container_name – container name
  • blob_folder – blob folder (remote folder name)

New in version 1.1.

source on GitHub
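
A short sketch, assuming cl is an AzureClient and the paths are placeholders; the folder is removed only if it exists.

bs = cl.open_blob_service()
if cl.exists(bs, "<container>", "temp/<alias>"):
    cl.delete_folder(bs, "<container>", "temp/<alias>")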

df_head(blob_service, container_name, blob_name, stop_at=1048576, encoding='utf-8', as_df=True, merge=False, **options)[source]

Downloads the beginning of a stream and displays as a DataFrame.

Parameters:
  • blob_service – returned by open_blob_service
  • container_name – container name
  • blob_name – blob name (or list of blob names) (remote file name)
  • stop_at – stop at a given size (None to avoid stopping)
  • encoding – encoding
  • as_df – result as a dataframe or a string
  • merge – if True, blob_name is a folder and method download_merge is called
  • options – see read_csv
Returns:

a DataFrame or a string, depending on as_df

New in version 1.1.

source on GitHub
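
A possible usage, assuming the blob is a CSV file; extra keyword arguments such as sep are forwarded to read_csv.

bs = cl.open_blob_service()
df = cl.df_head(bs, "<container>", "<alias>/results/output.csv", sep=",")
print(df.shape)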

download(blob_service, container_name, blob_name, file_path=None, append=False, chunk_size=None, stop_at=None)[source]

Downloads data from a blob storage to a file. No more than 64Mb can be downloaded at a time, so the data needs to be split into pieces.

Parameters:
  • blob_service – returned by open_blob_service
  • container_name – container name
  • blob_name – blob name (or list of blob names) (remote file name)
  • file_path – local file path
  • append – if True, append the content to an existing file
  • chunk_size – download by chunk
  • stop_at – stop at a given size (None to avoid stopping)
Returns:

local file or bytes if file_path is None

The code comes from Utilisation du service de stockage d’objets blob à partir de Python.

Changed in version 1.1: Parameters append, chunk_size were added. If file_path is None (default value now), the function returns bytes.

source on GitHub
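
A small sketch of the two behaviours described above; container and blob names are placeholders.

bs = cl.open_blob_service()
# download to a local file
cl.download(bs, "<container>", "<alias>/results/output.csv", "output.csv")
# only retrieve the first megabyte as bytes (file_path stays None)
head = cl.download(bs, "<container>", "<alias>/results/output.csv",
                   stop_at=2**20)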

download_data(blob_service, container_name, blob_name, chunk_size=None, stop_at=None)[source]

Downloads data from a blob storage and returns bytes. No more than 64Mb can be downloaded at a time, so the data needs to be split into pieces.

Parameters:
  • blob_service – returned by open_blob_service
  • container_name – container name
  • blob_name – blob name (or list of blob names) (remote file name)
  • chunk_size – download by chunk
  • stop_at – stop at a given size (None to avoid stopping)
Returns:

bytes

New in version 1.1.

source on GitHub

download_merge(blob_service, container_name, blob_folder, file_path=None, chunk_size=None, stop_at=None)[source]

Downloads all files from a folder in a blob storage to a single local file. Files will be merged. No more than 64Mb can be downloaded at a time, so the data needs to be split into pieces.

Parameters:
  • blob_service – returned by open_blob_service
  • container_name – container name
  • blob_folder – blob folder (remote folder name)
  • file_path – local file path
  • chunk_size – download by chunk
  • stop_at – stop at a given size (None to avoid stopping)
Returns:

local file

Changed in version 1.1: Parameters append, chunk_size were added. If file_path is None (default value now), the function returns bytes.

source on GitHub
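
A sketch merging the part files produced by a job into a single local file; names are placeholders.

bs = cl.open_blob_service()
cl.download_merge(bs, "<container>", "<alias>/results/output.csv",
                  "merged_output.csv")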

exists(blob_service, container_name, path)[source]

test the existence of a path on the blob storage

Parameters:
  • blob_service – blob service, returned by open_blob_service
  • container_name – None for all, its name otherwise
  • path – path in the container
Returns:

boolean

New in version 1.1.

source on GitHub

get_status()[source]

return the status of the webHCatUrl server

Returns:json

source on GitHub

get_version()[source]

return the WebHCat version

Returns:json

source on GitHub
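
Both get_status and get_version only rely on the hadoop credentials given to the constructor; a quick check could look like:

print(cl.get_status())   # status of the WebHCat server
print(cl.get_version())  # WebHCat version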

hive_submit(blob_service, container_name, hive_file, dependencies=None, status_dir=None, stop_on_failure=True, params=None)[source]

Submits a :epkg:`HIVE` job, the function uploads it to the cluster as well as the dependencies.

The code comes from How to use HDInsight from Linux and start a Pig + Jython job in HDInsight thru WebHCat. The API is described at Pig Job — POST pig.

Parameters:
  • blob_service – returned by open_blob_service
  • container_name – name of a container
  • hive_file – path to the job in the blob storage
  • dependencies – dependencies
  • status_dir – folder used by Hadoop to store the job’s progress; it should contain your alias if you want to avoid collisions with other users’ jobs
  • stop_on_failure – stop on failure, do not wait as long as possible
  • params – parameters passed to the script
Returns:

json

New in version 1.1.

source on GitHub
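
A hedged sketch mirroring the pig_submit example below; file name, container and status_dir are placeholders, and the returned json is assumed to expose the job id under "id" as WebHCat usually does.

bs = cl.open_blob_service()
js = cl.hive_submit(bs, "<container>", "myquery.hql",
                    status_dir="status/<alias>")
st = cl.wait_job(js["id"])
print(st["status"]["state"])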

job_kill(jobid)[source]

kills a job

see Delete Job — DELETE queue/:jobid for the outcome

Parameters:jobid – jobid
Returns:json

source on GitHub

job_queue(showall=False, fields=None)[source]

returns the list of jobs

It uses the API Job Information — GET queue/:jobid.

Parameters:
  • showall – if True, show all jobs (not only yours)
  • fields – to add fields in the requests
Returns:

list of jobs

List job queue

Most of the time, a job remains stuck in the job queue because it is full. Here is some code to check whether that is the case on an Azure cluster. It should be executed from a notebook.

Connection

blobstorage = "..."
blobpassword = "..."
hadoop_server = "..."
hadoop_password = "..."
username = "..."

%load_ext pyenbc
client, bs = %hd_open

Job queue

res = client.job_queue()
res.reverse()   # last submitted jobs first

Displays the first 20 jobs:

from datetime import datetime

for i, r in enumerate(res[:20]):
    st = client.job_status(r["id"])
    print(i, r, st["status"]["state"],
          datetime.fromtimestamp(float(st["status"]["startTime"]) / 1000),
          st["status"]["jobName"])
    print(st["userargs"].get("file", None), st["profile"].get("jobName", None))

It gives:

0 {'detail': None, 'id': 'job_1451961118663_3126'} PREP 2016-01-26 21:57:28.756000 TempletonControllerJob
wasb://..../scripts/pig/titi.pig TempletonControllerJob
1 {'detail': None, 'id': 'job_1451961118663_3125'} PREP 2016-01-26 21:57:28.517999 TempletonControllerJob
wasb://..../scripts/pig/pre_processing.pig TempletonControllerJob
2 {'detail': None, 'id': 'job_1451961118663_3124'} PREP 2016-01-26 21:50:32.742000 TempletonControllerJob
wasb://..../scripts/pig/titi.pig TempletonControllerJob
3 {'detail': None, 'id': 'job_1451961118663_3123'} RUNNING 2016-01-26 21:46:57.219000 TempletonControllerJob
wasb://..../scripts/pig/alg1.pig TempletonControllerJob
4 {'detail': None, 'id': 'job_1451961118663_3122'} SUCCEEDED 2016-01-26 21:40:34.687999 PigLatin:pre_processing.pig
None PigLatin:pre_processing.pig
5 {'detail': None, 'id': 'job_1451961118663_3121'} RUNNING 2016-01-26 21:41:29.657000 TempletonControllerJob
wasb://..../scripts/pig/Algo_LDA2.pig TempletonControllerJob
6 {'detail': None, 'id': 'job_1451961118663_3120'} SUCCEEDED 2016-01-26 21:40:06.859999 TempletonControllerJob
wasb://..../scripts/pig/alg1.pig TempletonControllerJob

To kill a job:

client.job_kill("id")

source on GitHub

job_status(jobid)[source]

return the status of a job

see List Versions — GET version for the outcome

Parameters:jobid – jobid
Returns:json

You can extract the startTime by doing:

from datetime import datetime
st = client.job_status(<job_id>)
datetime.fromtimestamp(float(st["status"]["startTime"])/1000)

source on GitHub

ls(blob_service, container_name=None, path=None, add_metadata=False, as_df=True)[source]

return the content of a blob storage

Parameters:
  • blob_service – blob service, returned by open_blob_service
  • container_name – None for all, its name otherwise
  • path – path in the container
  • add_metadata – add the metadata to the blob
  • as_df – if True, returns a DataFrame
Returns:

list of dictionaries

Changed in version 1.1: Parameters add_metadata and as_df were added and the function now returns the property last_modified.

source on GitHub
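
A short sketch, assuming the DataFrame columns mirror the dictionary keys (name) and the blob properties (last_modified) mentioned above.

bs = cl.open_blob_service()
df = cl.ls(bs, "<container>", path="scripts/pig", as_df=True)
print(df[["name", "last_modified"]])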

static mask_string(s)[source]

return an empty string or as many * characters as the length of the string

source on GitHub
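
A quick illustration of the rule stated above:

from pyenbc.remote.azure_connection import AzureClient
print(AzureClient.mask_string("secret"))  # '******'
print(AzureClient.mask_string(""))        # ''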

open_blob_service()[source]

open a blob service

source on GitHub

pig_submit(blob_service, container_name, pig_file, dependencies=None, status_dir=None, stop_on_failure=True, params=None)[source]

Submits a :epkg:`PIG` job, the function uploads it to the cluster as well as the dependencies.

The code comes from How to use HDInsight from Linux and start a Pig + Jython job in HDInsight thru WebHCat. The API is described at Pig Job — POST pig.

Parameters:
  • blob_service – returned by open_blob_service
  • container_name – name of a container
  • pig_file – path to the job in the blob storage
  • dependencies – dependencies
  • status_dir – folder used by Hadoop to store the job’s progress; it should contain your alias if you want to avoid collisions with other users’ jobs
  • stop_on_failure – stop on failure, do not wait as long as possible
  • params – parameters passed to the script
Returns:

json

Submit a job PIG

The PIG script must include a LOAD instruction. This instruction uses file names defined with the wasb syntax.

If you place the string $CONTAINER before a stream name, it should be replaced by the corresponding wasb syntax associated to the container name defined by container_name. The function will then load your script, modify it and save another one obtained by adding the extension .wasb.pig. Other constants you can use:

  • $PSEUDO
  • $CONTAINER
  • $SCRIPTSPIG

However, this replacement is not done by this class; your code could look like this:

blobstorage = "****"
blobpassword = "*********************"
hadoop_name = "*********"
hadoop_password = "********"
username = "********"
cl = AzureClient(blobstorage,
                blobpassword,
                hadoop_name,
                hadoop_password,
                username)
script = '''
    myinput = LOAD '$CONTAINER/input.csv'
            using PigStorage(',')
            AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;
    filt = FILTER myinput BY activity == 'walking' ;
    STORE filt INTO '$PSEUDO/output.csv' USING PigStorage() ;
    '''

with open("script_walking.pig", "w") as f:
    f.write(script)

bs = cl.open_blob_service()
js = cl.pig_submit(bs, blobstorage, "testensae/script_walking.pig")
print(js)

js = cl.job_status('job_1414863995725_0013')

New in version 1.1: parameter stop_on_failure

source on GitHub

standard_outputs(job_id, blob_service, container, folder)[source]

returns the standard output and error for a specific job id

Parameters:
  • job_id – job id or status
  • blob_service – returned by open_blob_service
  • container – name of a container
  • folder – folder where to download the outputs

Returns:out, err

source on GitHub
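
A hedged sketch reusing the job id from the pig_submit example above; the outputs are downloaded into the current folder.

bs = cl.open_blob_service()
out, err = cl.standard_outputs("job_1414863995725_0013", bs, "<container>", ".")
print(out[:2000])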

upload(blob_service, container_name, blob_name, file_path)[source]

Uploads data from a file to a blob storage. No more than 64Mb can be uploaded at a time, so the data needs to be split into pieces.

Parameters:
  • blob_service – returned by open_blob_service
  • container_name – container name
  • blob_name – blob name (remote file name)
  • file_path – local file path
Returns:

list of uploaded blob names

The code comes from Utilisation du service de stockage d’objets blob à partir de Python.

source on GitHub

upload_data(blob_service, container_name, blob_name, data)[source]

Uploads data (bytes) to a blob storage. No more than 64Mb can be uploaded at a time, so the data needs to be split into pieces.

Parameters:
  • blob_service – returned by open_blob_service
  • container_name – container name
  • blob_name – blob name (remote file name)
  • data – bytes
Returns:

list of uploaded blob names

The code comes from Utilisation du service de stockage d’objets blob à partir de Python.

New in version 1.1.

source on GitHub
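
A minimal sketch uploading a small in-memory CSV; container and blob names are placeholders.

bs = cl.open_blob_service()
data = b"index,value\n1,3.5\n2,4.0\n"
cl.upload_data(bs, "<container>", "<alias>/data/small.csv", data)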

url_blob(blob_service, container, blob_name)[source]

returns a url for a blob file name

Parameters:
  • container – container
  • blob_name – blob_name
Returns:

url

source on GitHub
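
For instance, the url of a remote file could be obtained with:

bs = cl.open_blob_service()
print(cl.url_blob(bs, "<container>", "<alias>/results/output.csv"))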

url_webHCatUrl(cmd)[source]

returns a url to the cluster

Parameters:cmd – something like pig, status
Returns:url

source on GitHub

wait_job(job_id, delay=5, fLOG=<function noLOG>)[source]

wait until a job has run or failed

Parameters:
  • job_id – job_id
  • delay – check every N seconds
Returns:

status

New in version 1.1.

source on GitHub
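
A hedged sketch, assuming the json returned by pig_submit exposes the job id under "id":

bs = cl.open_blob_service()
js = cl.pig_submit(bs, "<container>", "myscript.pig")
status = cl.wait_job(js["id"], delay=10)
print(status["status"]["state"])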

wasb_prefix(container_name)[source]

when using a LOAD instruction in a PIG script, file blob names must be referenced using the wasb syntax. This method returns the prefix to add.

Returns:wasb prefix

source on GitHub

wasb_to_file(container_name, blob_file)[source]

return something like wasb://demo@myblobstorage.blob...

Parameters:
  • container_name – name of a container
  • blob_file – path to a file
Returns:

return a url to blob file (pig script for example)

source on GitHub
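
For instance, the call below should return a wasb url of the form shown above for the given container and file:

print(cl.wasb_to_file("<container>", "scripts/pig/myscript.pig"))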