module remote.azure_connection
Short summary
module pyenbc.remote.azure_connection
A class to help connect to a remote machine and send command lines.
Classes
class | truncated documentation
---|---
AzureClient | A simple class to access and communicate with Azure. It requires modules: …
Static Methods
staticmethod | truncated documentation
---|---
mask_string | return an empty string or as many * as the length of the string
Methods
method | truncated documentation
---|---
__init__ | constructor
__str__ | usual string representation
_interpret_path | replace variables such as $PSEUDO, $USERNAME
copy_blob | copy a blob
delete_blob | delete a blob
delete_folder | delete a folder and its content
df_head | Downloads the beginning of a stream and displays it as a DataFrame.
download | Downloads data from a blob storage to a file. No more than 64 MB can be downloaded at the same time, it needs to be split …
download_data | Downloads data from a blob storage and returns bytes. No more than 64 MB can be downloaded at the same time, it needs …
download_merge | Downloads all files from a folder in a blob storage to a single local file. Files will be merged. No more …
exists | test the existence of a path on the blob storage
get_status | return the status of the WebHCat server
get_version | return the WebHCat version
hive_submit | Submits a HIVE job; the function uploads it to the cluster as well as the dependencies. The code …
job_kill | kills a job, see Delete Job — DELETE queue/:jobid …
job_queue | returns the list of jobs. It uses the API Job Information — GET queue/:jobid. …
job_status | return the status of a job, see List Versions — GET version …
ls | return the content of a blob storage
open_blob_service | open a blob service
pig_submit | Submits a PIG job; the function uploads it to the cluster as well as the dependencies. The code …
standard_outputs | returns the standard output and error for a specific job id
upload | Uploads data from a file to a blob storage. No more than 64 MB can be uploaded at the same time, it needs to be split …
upload_data | Uploads data (bytes) to a blob storage. No more than 64 MB can be uploaded at the same time, it needs to be split into …
url_blob | returns a URL for a blob file name
url_webHCatUrl | returns a URL to the cluster
wait_job | wait until a job has run or failed
wasb_prefix | when using a LOAD instruction in a PIG script, …
wasb_to_file | return something like wasb://demo@myblobstorage.blob...
Documentation
A class to help connect to a remote machine and send command lines.
- class pyenbc.remote.azure_connection.AzureClient(blob_name, blob_key, hadoop_name=None, hadoop_key=None, hadoop_user_name='admin', pseudo='any', fLOG=None)
Bases: object
A simple class to access and communicate with Azure. It requires modules:
Main functionalities related to blob:
list_containers, create_container, list_blobs, put_blob, put_block_blob_from_bytes
put_block_blob_from_text, put_page_blob_from_file, get_blob
See How to use Blob storage from Python.
Get the list of containers and files from a blob storage
The functionalities of a BlobService are described in blockblobservice.py.
```python
from pyenbc.remote.azure_connection import AzureClient

cl = AzureClient("<blob_storage_service>", "<primary_key>")
bs = cl.open_blob_service()
# as_df=False returns a list of dictionaries instead of a DataFrame
res = cl.ls(bs, as_df=False)
for r in res:
    print(r["name"])
```
Upload to and download from a blob storage
The following example uploads a file to a blob storage and downloads it back.
```python
from pyenbc.remote.azure_connection import AzureClient

cl = AzureClient("<blob_storage_service>", "<primary_key>")
bs = cl.open_blob_service()
cl.upload(bs, "<container>", "myremotefolder/remotename.txt", "local_filename.txt")

# as_df=False returns a list of dictionaries instead of a DataFrame
res = cl.ls(bs, "<container>", as_df=False)
for r in res:
    # the blob is stored under its remote name, not the local one
    if "remotename" in r["name"]:
        print(r)

cl.download(bs, "<container>", "myremotefolder/remotename.txt", "another_local_filename.txt")
```
Many functions use the WebHCat API. The error codes are listed in Error Codes and Responses.
Changed: PSEUDO, CONTAINER and SCRIPT are passed to the script as parameters.
- __init__(blob_name, blob_key, hadoop_name=None, hadoop_key=None, hadoop_user_name='admin', pseudo='any', fLOG=None)
constructor
- Parameters:
blob_name – blob storage name
blob_key – account key for the blob storage
hadoop_name – hadoop server name (can be None if HDInsight is not used)
hadoop_key – hadoop key (can be None if HDInsight is not used)
hadoop_user_name – user name used to connect to the Hadoop cluster
pseudo – the same credentials are sometimes shared to connect to HDInsight; the pseudo is meant to avoid collisions
fLOG – logging function
- __str__()
usual string representation
- _blob_properties = ['copy_completion_time', 'content_encoding', 'content_language', 'blob_type', 'copy_status_description', 'copy_id', 'content_md5', 'lease_duration', 'copy_source', 'content_type', 'content_length', 'lease_state', 'copy_progress', 'copy_status', 'xms_blob_sequence_number', 'lease_status', 'etag', 'last_modified']
- _chunk_size = 4194304
- _interpret_path(blob_path)
replace variables such as $PSEUDO, $USERNAME
- Parameters:
blob_path – path
- Returns:
modified path
New in version 1.1.
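A minimal sketch of this kind of substitution, assuming the variables are plain `$NAME` placeholders. The helper `interpret_path` and its `pseudo` and `username` arguments are hypothetical and do not reproduce the library's code; the sketch only relies on the standard `string.Template` class.

```python
from string import Template


def interpret_path(blob_path: str, pseudo: str, username: str) -> str:
    """Hypothetical helper: replace $PSEUDO and $USERNAME in a blob path.

    safe_substitute leaves unknown placeholders (such as $CONTAINER)
    untouched instead of raising KeyError.
    """
    return Template(blob_path).safe_substitute(PSEUDO=pseudo, USERNAME=username)


print(interpret_path("$PSEUDO/data/$USERNAME/output.csv", "alice", "admin"))
# alice/data/admin/output.csv
```

`safe_substitute` was chosen over `substitute` so that a path containing other variables is returned partially substituted rather than raising an error.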
- copy_blob(blob_service, container, blob_name, source)
copy a blob
- Parameters:
blob_service – returned by open_blob_service
container – container name
blob_name – destination
source – source
- delete_blob(blob_service, container_name, blob_name)
delete a blob
- Parameters:
blob_service – returned by open_blob_service
container_name – container name
blob_name – blob name (remote file name)
- delete_folder(blob_service, container_name, blob_folder)
delete a folder and its content
- Parameters:
blob_service – returned by open_blob_service
container_name – container name
blob_folder – blob folder (remote folder name)
New in version 1.1.
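Azure blob storage has a flat namespace: a "folder" is only a common prefix in blob names, so deleting a folder means deleting every blob whose name starts with that prefix. The sketch below illustrates the idea with a plain dictionary standing in for a container; `delete_folder` here is a hypothetical helper, not the library's method, which talks to a real blob service.

```python
def delete_folder(container: dict, blob_folder: str) -> list:
    """Hypothetical sketch: delete every blob stored under blob_folder.

    `container` maps blob names to contents and stands in for a real
    container. A trailing "/" is enforced so that "data" does not also
    match "database.csv".
    """
    prefix = blob_folder.rstrip("/") + "/"
    # Collect the names first: a dict must not change size while iterating.
    to_delete = [name for name in container if name.startswith(prefix)]
    for name in to_delete:
        del container[name]
    return to_delete


store = {"data/a.csv": b"1", "data/sub/b.csv": b"2", "database.csv": b"3"}
print(delete_folder(store, "data"))  # ['data/a.csv', 'data/sub/b.csv']
print(store)  # {'database.csv': b'3'}
```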
- df_head(blob_service, container_name, blob_name, stop_at=1048576, encoding='utf-8', as_df=True, merge=False, **options)
Downloads the beginning of a stream and displays it as a DataFrame.
- Parameters:
blob_service – returned by open_blob_service
container_name – container name
blob_name – blob name (or list of blob names) (remote file name)
stop_at – stop at a given size (None to avoid stopping)
encoding – encoding
as_df – result as a dataframe or a string
merge – if True, blob_name is a folder and method download_merge is called
options – see read_csv
- Returns:
a DataFrame if as_df is True, a string otherwise
New in version 1.1.
- download(blob_service, container_name, blob_name, file_path=None, append=False, chunk_size=None, stop_at=None)
Downloads data from a blob storage to a file. No more than 64 MB can be downloaded at the same time; larger downloads need to be split into pieces.
- Parameters:
blob_service – returned by open_blob_service
container_name – container name
blob_name – blob name (or list of blob names) (remote file name)
file_path – local file path
append – if True, append the content to an existing file
chunk_size – download by chunks of this size
stop_at – stop at a given size (None to avoid stopping)
- Returns:
local file, or bytes if file_path is None
The code comes from Utilisation du service de stockage d'objets blob à partir de Python (How to use Blob storage from Python).
Changed in version 1.1: Parameters append and chunk_size were added. If file_path is None (now the default value), the function returns bytes.
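Downloading by chunks amounts to requesting successive byte ranges of the blob (the Get Blob operation accepts a `Range: bytes=start-end` header with an inclusive end). The hypothetical helper below only computes those ranges; it assumes the default chunk size equals `_chunk_size` (4194304 bytes) and that `stop_at` caps the total number of bytes, and it does not claim to be how `download` is implemented.

```python
def chunk_ranges(blob_size: int, chunk_size: int = 4194304, stop_at=None):
    """Hypothetical helper: inclusive (start, end) byte ranges covering a blob.

    stop_at, when given, limits the number of bytes requested.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    total = blob_size if stop_at is None else min(blob_size, stop_at)
    ranges = []
    start = 0
    while start < total:
        end = min(start + chunk_size, total) - 1  # Range ends are inclusive
        ranges.append((start, end))
        start = end + 1
    return ranges


print(chunk_ranges(10, chunk_size=4))             # [(0, 3), (4, 7), (8, 9)]
print(chunk_ranges(10, chunk_size=4, stop_at=6))  # [(0, 3), (4, 5)]
```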
- download_data(blob_service, container_name, blob_name, chunk_size=None, stop_at=None)
Downloads data from a blob storage and returns bytes. No more than 64 MB can be downloaded at the same time; larger downloads need to be split into pieces.
- Parameters:
blob_service – returned by open_blob_service
container_name – container name
blob_name – blob name (or list of blob names) (remote file name)
chunk_size – download by chunks of this size
stop_at – stop at a given size (None to avoid stopping)
- Returns:
bytes
New in version 1.1.
- download_merge(blob_service, container_name, blob_folder, file_path=None, chunk_size=None, stop_at=None)
Downloads all files from a folder in a blob storage to a single local file. Files will be merged. No more than 64 MB can be downloaded at the same time; larger downloads need to be split into pieces.
- Parameters:
blob_service – returned by open_blob_service
container_name – container name
blob_folder – blob folder (remote folder name)
file_path – local file path
chunk_size – download by chunks of this size
stop_at – stop at a given size (None to avoid stopping)
- Returns:
local file, or bytes if file_path is None
Changed in version 1.1: Parameter chunk_size was added. If file_path is None (now the default value), the function returns bytes.
- exists(blob_service, container_name, path)
test the existence of a path on the blob storage
- Parameters:
blob_service – blob service, returned by open_blob_service
container_name – None for all, its name otherwise
path – path in the container
- Returns:
boolean
New in version 1.1.
- get_status()
return the status of the WebHCat server
- Returns:
json
- get_version()
return the WebHCat version
- Returns:
json
- hive_submit(blob_service, container_name, hive_file, dependencies=None, status_dir=None, stop_on_failure=True, params=None)
Submits a HIVE job; the function uploads it to the cluster as well as the dependencies.
The code comes from How to use HDInsight from Linux and start a Pig + Jython job in HDInsight thru WebHCat. The API is described at Pig Job — POST pig.
- Parameters:
blob_service – returned by open_blob_service
container_name – name of a container
hive_file – path to the job in the blob storage
dependencies – dependencies
status_dir – folder used by Hadoop to store the job's progress; it should contain your alias if you want to avoid collisions with other users' jobs
stop_on_failure – stop on failure, do not wait as long as possible
params – to
- Returns:
json
New in version 1.1.
- job_kill(jobid)
kills a job, see Delete Job — DELETE queue/:jobid for the outcome
- Parameters:
jobid – jobid
- Returns:
json
- job_queue(showall=False, fields=None)
returns the list of jobs
It uses the API Job Information — GET queue/:jobid.
- Parameters:
showall – if True, show all the jobs you are allowed to see (not only yours)
fields – fields to add to the request
- Returns:
list of jobs
List job queue
Most of the time, a job remains stuck in the job queue because the queue is full. The following code checks whether this is the case on an Azure cluster. It should be executed from a notebook.
Connection
```
blobstorage = "..."
blobpassword = "..."
hadoop_server = "..."
hadoop_password = "..."
username = "..."

%load_ext pyenbc
client, bs = %hd_open
```
Job queue
```python
res = client.job_queue()
res.reverse()  # last submitted jobs first
```
Displays the first 20 jobs:
```python
from datetime import datetime

for i, r in enumerate(res[:20]):
    st = client.job_status(r["id"])
    print(i, r, st["status"]["state"],
          datetime.fromtimestamp(float(st["status"]["startTime"]) / 1000),
          st["status"]["jobName"])
    print(st["userargs"].get("file", None), st["profile"].get("jobName", None))
```
It gives:
```
0 {'detail': None, 'id': 'job_1451961118663_3126'} PREP 2016-01-26 21:57:28.756000 TempletonControllerJob
wasb://..../scripts/pig/titi.pig TempletonControllerJob
1 {'detail': None, 'id': 'job_1451961118663_3125'} PREP 2016-01-26 21:57:28.517999 TempletonControllerJob
wasb://..../scripts/pig/pre_processing.pig TempletonControllerJob
2 {'detail': None, 'id': 'job_1451961118663_3124'} PREP 2016-01-26 21:50:32.742000 TempletonControllerJob
wasb://..../scripts/pig/titi.pig TempletonControllerJob
3 {'detail': None, 'id': 'job_1451961118663_3123'} RUNNING 2016-01-26 21:46:57.219000 TempletonControllerJob
wasb://..../scripts/pig/alg1.pig TempletonControllerJob
4 {'detail': None, 'id': 'job_1451961118663_3122'} SUCCEEDED 2016-01-26 21:40:34.687999 PigLatin:pre_processing.pig
None PigLatin:pre_processing.pig
5 {'detail': None, 'id': 'job_1451961118663_3121'} RUNNING 2016-01-26 21:41:29.657000 TempletonControllerJob
wasb://..../scripts/pig/Algo_LDA2.pig TempletonControllerJob
6 {'detail': None, 'id': 'job_1451961118663_3120'} SUCCEEDED 2016-01-26 21:40:06.859999 TempletonControllerJob
wasb://..../scripts/pig/alg1.pig TempletonControllerJob
```
To kill a job:
```python
client.job_kill("<job_id>")
```
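To spot a full queue at a glance, the states printed by the loop above can be tallied: many jobs in PREP and few in RUNNING suggests that jobs are waiting for resources. This is only a heuristic sketch; `count_states` is a hypothetical helper, and it assumes each status dictionary has the `{"status": {"state": ...}}` shape used in the loop above.

```python
from collections import Counter


def count_states(statuses):
    """Hypothetical helper: number of jobs per state.

    Each item is assumed to look like {"status": {"state": "RUNNING", ...}},
    as returned by client.job_status in the loop above.
    """
    return Counter(st["status"]["state"] for st in statuses)


# States copied from the sample output above (jobs 0 to 6).
states = ["PREP", "PREP", "PREP", "RUNNING", "SUCCEEDED", "RUNNING", "SUCCEEDED"]
counts = count_states([{"status": {"state": s}} for s in states])
print(counts["PREP"], counts["RUNNING"], counts["SUCCEEDED"])  # 3 2 2
```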
- job_status(jobid)
return the status of a job, see List Versions — GET version for the outcome
- Parameters:
jobid – jobid
- Returns:
json
You can extract the startTime by doing:
```python
from datetime import datetime

st = client.job_status("<job_id>")
datetime.fromtimestamp(float(st["status"]["startTime"]) / 1000)
```
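startTime is expressed in milliseconds since the Unix epoch, hence the division by 1000. The snippet above converts it to local time; the variant below passes `tz=timezone.utc` so the result does not depend on the machine's time zone. `start_time` is a hypothetical helper and the timestamp is a made-up value chosen to match the first job of the sample output.

```python
from datetime import datetime, timezone


def start_time(status: dict) -> datetime:
    """Hypothetical helper: convert startTime (milliseconds) to a UTC datetime."""
    return datetime.fromtimestamp(float(status["status"]["startTime"]) / 1000,
                                  tz=timezone.utc)


fake_status = {"status": {"startTime": 1453845448756}}  # hypothetical value
print(start_time(fake_status))
```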
- ls(blob_service, container_name=None, path=None, add_metadata=False, as_df=True)
return the content of a blob storage
- Parameters:
blob_service – blob service, returned by open_blob_service
container_name – None for all, its name otherwise
path – path in the container
add_metadata – add the metadata to the blob
as_df – if True, returns a DataFrame
- Returns:
list of dictionaries, or a DataFrame if as_df is True
Changed in version 1.1: Parameters add_metadata and as_df were added; the function now returns the property last_modified.
- static mask_string(s)
return an empty string or as many * as the length of the string
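A minimal re-implementation of the documented behavior, useful to see what ends up in logs or in the string representation instead of a key. It is hypothetical: treating `None` like an empty string is an assumption, since the documentation only mentions the empty-string case.

```python
def mask_string(s):
    """Hypothetical sketch: hide a secret behind asterisks.

    Returns "" when s is None or empty (assumption), otherwise one "*"
    per character of s.
    """
    if not s:
        return ""
    return "*" * len(s)


print(mask_string("secret"))    # ******
print(repr(mask_string(None)))  # ''
```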
- open_blob_service()
open a blob service
- pig_submit(blob_service, container_name, pig_file, dependencies=None, status_dir=None, stop_on_failure=True, params=None)
Submits a PIG job; the function uploads it to the cluster as well as the dependencies.
The code comes from How to use HDInsight from Linux and start a Pig + Jython job in HDInsight thru WebHCat. The API is described at Pig Job — POST pig.
- Parameters:
blob_service – returned by open_blob_service
container_name – name of a container
pig_file – path to the job in the blob storage
dependencies – dependencies
status_dir – folder used by Hadoop to store the job's progress; it should contain your alias if you want to avoid collisions with other users' jobs
stop_on_failure – stop on failure, do not wait as long as possible
params – to
- Returns:
json
Submit a PIG job
The PIG script must include a LOAD instruction. This instruction uses file names defined with the wasb syntax. If you place the string $CONTAINER before a stream name, it should be replaced by the wasb syntax associated with the container defined by container_name. The function then loads your script, modifies it and saves another one, adding the suffix .wasb.pig. Other constants you could use: $PSEUDO, $CONTAINER, $SCRIPTSPIG.
However, this replacement is not done by this class; your code could look like this:
```python
from pyenbc.remote.azure_connection import AzureClient

blobstorage = "****"
blobpassword = "*********************"
hadoop_name = "*********"
hadoop_password = "********"
username = "********"
cl = AzureClient(blobstorage, blobpassword, hadoop_name, hadoop_password, username)

script = '''
myinput = LOAD '$CONTAINER/input.csv'
        using PigStorage(',')
        AS (index:long, sequence, tag, timestamp:long, dateformat, x:double, y:double, z:double, activity) ;

filt = FILTER myinput BY activity == 'walking' ;

STORE filt INTO '$PSEUDO/output.csv' USING PigStorage() ;
'''

with open("script_walking.pig", "w") as f:
    f.write(script)

bs = cl.open_blob_service()
js = cl.pig_submit(bs, blobstorage, "testensae/script_walking.pig")
print(js)

js = cl.job_status('job_1414863995725_0013')
```
New in version 1.1: parameter stop_on_failure
- standard_outputs(job_id, blob_service, container, folder)
returns the standard output and error for a specific job id
- Parameters:
job_id – job_id or status
blob_service – returned by open_blob_service
container – name of a container
folder – folder where to download them
- Returns:
out, err
- upload(blob_service, container_name, blob_name, file_path)
Uploads data from a file to a blob storage. No more than 64 MB can be uploaded at the same time; larger uploads need to be split into pieces.
- Parameters:
blob_service – returned by open_blob_service
container_name – container name
blob_name – blob name (remote file name)
file_path – local file path
- Returns:
list of uploaded blob names
The code comes from Utilisation du service de stockage d'objets blob à partir de Python (How to use Blob storage from Python).
- upload_data(blob_service, container_name, blob_name, data)
Uploads data (bytes) to a blob storage. No more than 64 MB can be uploaded at the same time; larger uploads need to be split into pieces.
- Parameters:
blob_service – returned by open_blob_service
container_name – container name
blob_name – blob name (remote file name)
data – bytes
- Returns:
list of uploaded blob names
The code comes from Utilisation du service de stockage d'objets blob à partir de Python (How to use Blob storage from Python).
New in version 1.1.
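With block blobs, splitting into pieces usually means sending each piece with a Put Block request and then committing the ordered list of block IDs with Put Block List. Within one blob, the block IDs must be Base64-encoded strings of identical length. The hypothetical helper below only prepares the (block ID, chunk) pairs; it assumes a default chunk size of `_chunk_size` (4194304 bytes) and is not the library's implementation.

```python
import base64


def split_into_blocks(data: bytes, chunk_size: int = 4194304):
    """Hypothetical helper: cut data into blocks and give each one an ID.

    IDs are built from a zero-padded index so that they all have the same
    length before and after Base64 encoding.
    """
    blocks = []
    for index, start in enumerate(range(0, len(data), chunk_size)):
        block_id = base64.b64encode(f"{index:08d}".encode("ascii")).decode("ascii")
        blocks.append((block_id, data[start:start + chunk_size]))
    return blocks


blocks = split_into_blocks(b"abcdefghij", chunk_size=4)
print([chunk for _, chunk in blocks])  # [b'abcd', b'efgh', b'ij']
```

The zero padding matters: IDs built from "1" and "10" would have different lengths and would be rejected for the same blob.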
- url_blob(blob_service, container, blob_name)
returns a URL for a blob file name
- Parameters:
blob_service – returned by open_blob_service
container – container
blob_name – blob_name
- Returns:
url
- url_webHCatUrl(cmd)
returns a URL to the cluster
- Parameters:
cmd – something like pig, status
- Returns:
url
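On HDInsight, WebHCat (also called Templeton) is usually exposed under `https://<cluster>.azurehdinsight.net/templeton/v1/`, followed by the command (`status`, `version`, `pig`, `hive`, `jobs`, ...). The hypothetical helper below builds such a URL under that assumption; the library's method may build it differently.

```python
def webhcat_url(hadoop_name: str, cmd: str) -> str:
    """Hypothetical helper: WebHCat URL for a command on an HDInsight cluster.

    Assumes the usual HDInsight host name <cluster>.azurehdinsight.net.
    """
    return f"https://{hadoop_name}.azurehdinsight.net/templeton/v1/{cmd.lstrip('/')}"


print(webhcat_url("mycluster", "status"))
# https://mycluster.azurehdinsight.net/templeton/v1/status
```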
- wait_job(job_id, delay=5, fLOG=<function noLOG>)
wait until a job has run or failed
- Parameters:
job_id – job_id
delay – check every N seconds
fLOG – logging function
- Returns:
status
New in version 1.1.
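A minimal polling sketch of what waiting for a job involves: ask for the status, stop on a final Hadoop job state (SUCCEEDED, FAILED or KILLED), otherwise sleep `delay` seconds. This version is hypothetical: it takes the status function as an argument (standing in for `AzureClient.job_status`) so it can run without a cluster, and it assumes the `{"status": {"state": ...}}` shape used elsewhere on this page.

```python
import time


def wait_job(get_status, job_id, delay=5, fLOG=print):
    """Hypothetical sketch: poll a job until it reaches a final state."""
    final_states = {"SUCCEEDED", "FAILED", "KILLED"}
    while True:
        status = get_status(job_id)
        state = status["status"]["state"]
        fLOG("job", job_id, "state", state)
        if state in final_states:
            return status
        time.sleep(delay)


states = iter(["PREP", "RUNNING", "SUCCEEDED"])


def fake_job_status(job_id):
    # Test double: returns the next state of a scripted sequence.
    return {"status": {"state": next(states)}}


final = wait_job(fake_job_status, "job_1", delay=0, fLOG=lambda *args: None)
print(final["status"]["state"])  # SUCCEEDED
```

A real caller would probably also want a timeout, since a job stuck in PREP (see the job queue example) would keep this loop running.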
- wasb_prefix(container_name)
when using a LOAD instruction in a PIG script, blob file names must be referenced using the wasb syntax. This method returns the prefix to add.
- Returns:
wasb prefix
- wasb_to_file(container_name, blob_file)
return something like wasb://demo@myblobstorage.blob...
- Parameters:
container_name – name of a container
blob_file – path to a file
- Returns:
a URL to the blob file (a PIG script for example)
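The wasb syntax has the form `wasb://<container>@<account>.blob.core.windows.net/<path>`. The two hypothetical helpers below build a prefix and a full file URL under that assumption, with the storage account passed explicitly (in the class it would come from blob_name). They also show how `$CONTAINER` in a LOAD instruction could be replaced by the prefix. Whether the library's prefix ends with a slash is not stated, so that detail is an assumption.

```python
def wasb_prefix(container_name: str, account_name: str) -> str:
    """Hypothetical helper: wasb prefix for the blobs of one container."""
    return f"wasb://{container_name}@{account_name}.blob.core.windows.net/"


def wasb_to_file(container_name: str, account_name: str, blob_file: str) -> str:
    """Hypothetical helper: full wasb URL of a blob (a PIG script for example)."""
    return wasb_prefix(container_name, account_name) + blob_file.lstrip("/")


print(wasb_to_file("demo", "myblobstorage", "scripts/pig/titi.pig"))
# wasb://demo@myblobstorage.blob.core.windows.net/scripts/pig/titi.pig

line = "myinput = LOAD '$CONTAINER/input.csv' USING PigStorage(',');"
print(line.replace("$CONTAINER/", wasb_prefix("demo", "myblobstorage")))
# myinput = LOAD 'wasb://demo@myblobstorage.blob.core.windows.net/input.csv' USING PigStorage(',');
```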