Examples

Azure

  1. Get the list of containers and files from a blob storage?
  2. List job queue
  3. Submit a job PIG
  4. Upload, download, to a blob storage

Get the list of containers and files from a blob storage?

The functionalities of a BlobService are described in blockblobservice.py.

from pyenbc.remote.azure_connection import AzureClient
cl = AzureClient("<blob_storage_service>",
                "<primary_key>")
bs = cl.open_blob_service()
res = cl.ls(bs)
for r in res:
    print(r["name"])

(original entry : azure_connection.py:docstring of pyenbc.remote.azure_connection.AzureClient, line 17)

List job queue

Most of the time, a job remains stuck in the job queue because the queue is full. The following code checks whether that is the case on an Azure cluster. It should be executed from a notebook.

Connection

blobstorage = "..."
blobpassword = "..."
hadoop_server = "..."
hadoop_password = "..."
username = "..."

%load_ext pyenbc
client, bs = %hd_open

Job queue

res = client.job_queue()
res.reverse()   # last submitted jobs first

Display the 20 most recently submitted jobs:

from datetime import datetime

for i, r in enumerate(res[:20]):
    st = client.job_status(r["id"])
    started = datetime.fromtimestamp(float(st["status"]["startTime"]) / 1000)  # startTime is in milliseconds
    print(i, r, st["status"]["state"], started, st["status"]["jobName"])
    print(st["userargs"].get("file", None), st["profile"].get("jobName", None))

It gives:

0 {'detail': None, 'id': 'job_1451961118663_3126'} PREP 2016-01-26 21:57:28.756000 TempletonControllerJob
wasb://..../scripts/pig/titi.pig TempletonControllerJob
1 {'detail': None, 'id': 'job_1451961118663_3125'} PREP 2016-01-26 21:57:28.517999 TempletonControllerJob
wasb://..../scripts/pig/pre_processing.pig TempletonControllerJob
2 {'detail': None, 'id': 'job_1451961118663_3124'} PREP 2016-01-26 21:50:32.742000 TempletonControllerJob
wasb://..../scripts/pig/titi.pig TempletonControllerJob
3 {'detail': None, 'id': 'job_1451961118663_3123'} RUNNING 2016-01-26 21:46:57.219000 TempletonControllerJob
wasb://..../scripts/pig/alg1.pig TempletonControllerJob
4 {'detail': None, 'id': 'job_1451961118663_3122'} SUCCEEDED 2016-01-26 21:40:34.687999 PigLatin:pre_processing.pig
None PigLatin:pre_processing.pig
5 {'detail': None, 'id': 'job_1451961118663_3121'} RUNNING 2016-01-26 21:41:29.657000 TempletonControllerJob
wasb://..../scripts/pig/Algo_LDA2.pig TempletonControllerJob
6 {'detail': None, 'id': 'job_1451961118663_3120'} SUCCEEDED 2016-01-26 21:40:06.859999 TempletonControllerJob
wasb://..../scripts/pig/alg1.pig TempletonControllerJob
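
To judge whether the queue is full, it can help to count jobs per state rather than read the listing line by line. The following is a minimal sketch, assuming each status is a dictionary shaped like the output of job_status used above (a "status" entry containing a "state"). The helper name count_states is hypothetical and not part of pyenbc, and the sample states are the seven shown in the listing.

```python
from collections import Counter


def count_states(statuses):
    """Count jobs per state; each item must look like {"status": {"state": ...}}."""
    return Counter(st["status"]["state"] for st in statuses)


# States copied from the listing above, in the same order.
states = ["PREP", "PREP", "PREP", "RUNNING", "SUCCEEDED", "RUNNING", "SUCCEEDED"]
sample = [{"status": {"state": s}} for s in states]

counts = count_states(sample)
for state, nb in sorted(counts.items()):
    print(state, nb)
```

With a live connection, the list could be built with [client.job_status(r["id"]) for r in res[:20]]. Many jobs in PREP next to a few in RUNNING suggests the queue is saturated.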

To kill a job:

client.job_kill("<job_id>")

(original entry : azure_connection.py:docstring of pyenbc.remote.azure_connection.AzureClient.job_queue, line 9)

Submit a job PIG

The PIG script must include a LOAD instruction. This instruction uses file names defined with the wasb syntax.

If you place the string $CONTAINER before a stream name, it should be replaced by the wasb syntax corresponding to the container name defined by container_name. The function would then load your script, modify it, and save another one whose name is obtained by adding .wasb.pig. Other constants you could use:

  • $PSEUDO
  • $CONTAINER
  • $SCRIPTSPIG

However, this replacement is not done by this class. Your code could look like this:

from pyenbc.remote.azure_connection import AzureClient

blobstorage = "****"
blobpassword = "*********************"
hadoop_name = "*********"
hadoop_password = "********"
username = "********"
cl = AzureClient(blobstorage,
                blobpassword,
                hadoop_name,
                hadoop_password,
                username)
script = '''
    myinput = LOAD '$CONTAINER/input.csv'
            using PigStorage(',')
            AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;
    filt = FILTER myinput BY activity == 'walking' ;
    STORE filt INTO '$PSEUDO/output.csv' USING PigStorage() ;
    '''

with open("script_walking.pig","w") as f :
    f.write(script)

bs = cl.open_blob_service()
js = cl.pig_submit(bs, blobstorage, "testensae/script_walking.pig")
print(js)

js = cl.job_status('job_1414863995725_0013')
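
Since the class does not replace the constants itself, the replacement could be done before the script is uploaded. The following is a minimal sketch under stated assumptions: the helper names wasb_url, expand_constants and save_expanded are hypothetical, and the meaning given to $PSEUDO (a folder named after the user) and $SCRIPTSPIG (the folder scripts/pig) is a guess that pyenbc does not confirm here. Only the general form wasb://container@account.blob.core.windows.net/path is standard.

```python
def wasb_url(container, account, path=""):
    """Build a wasb:// URL for a container of a storage account."""
    base = "wasb://{0}@{1}.blob.core.windows.net".format(container, account)
    path = path.strip("/")
    return base + "/" + path if path else base


def expand_constants(script, container, account, username):
    """Replace $CONTAINER, $PSEUDO and $SCRIPTSPIG in a PIG script."""
    # Assumed meanings of the constants (not confirmed by the library).
    constants = {
        "$CONTAINER": wasb_url(container, account),
        "$PSEUDO": wasb_url(container, account, username),
        "$SCRIPTSPIG": wasb_url(container, account, "scripts/pig"),
    }
    for name, value in constants.items():
        script = script.replace(name, value)
    return script


def save_expanded(filename, container, account, username):
    """Write the expanded script next to the original, adding .wasb.pig."""
    with open(filename, "r") as f:
        script = f.read()
    new_name = filename + ".wasb.pig"
    with open(new_name, "w") as f:
        f.write(expand_constants(script, container, account, username))
    return new_name


line = "myinput = LOAD '$CONTAINER/input.csv' using PigStorage(',');"
expanded = expand_constants(line, "mycontainer", "myaccount", "me")
print(expanded)
```

The expanded file would then be the one uploaded and passed to pig_submit. Other occurrences of $ in the script, such as PIG positional references, are left untouched because only the three exact names are replaced.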

(original entry : azure_connection.py:docstring of pyenbc.remote.azure_connection.AzureClient.pig_submit, line 22)

Upload, download, to a blob storage

The following example uploads a file to a blob storage, lists the container to check that it is there, and downloads it again under another local name.

from pyenbc.remote.azure_connection import AzureClient
cl = AzureClient("<blob_storage_service>",
                "<primary_key>")
bs = cl.open_blob_service()
cl.upload(bs, "<container>", "myremotefolder/remotename.txt",
                            "local_filename.txt")

res = cl.ls(bs, "<container>")
for r in res:
    # the blob is stored under its remote name, not the local one
    if "remotename" in r["name"]:
        print(r)

cl.download(bs, "<container>", "myremotefolder/remotename.txt",
                            "another_local_filename.txt")

(original entry : azure_connection.py:docstring of pyenbc.remote.azure_connection.AzureClient, line 34)
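
One way to check the upload and download round trip is to compare a checksum of the original file with a checksum of the downloaded copy. The following is a minimal sketch that relies only on the standard library. The helper names file_sha256 and same_content are hypothetical and not part of pyenbc.

```python
import hashlib


def file_sha256(path, chunk_size=1 << 20):
    """Return the SHA-256 hex digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def same_content(path_a, path_b):
    """Tell whether two local files hold exactly the same bytes."""
    return file_sha256(path_a) == file_sha256(path_b)
```

After the example above, same_content("local_filename.txt", "another_local_filename.txt") should return True if the transfer preserved the file.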

Hadoop

  1. How to open a remote shell?
  2. Submit a HIVE query

How to open a remote shell?

from pyenbc.remote.ssh_remote_connection import ASSHClient

ssh = ASSHClient("<server>",
                 "<login>",
                 "<password>")
ssh.connect()
out = ssh.send_recv_session("ls")
print(ssh.send_recv_session("python"))
print(ssh.send_recv_session("print('3')"))
print(ssh.send_recv_session("import sys\nsys.executable"))
print(ssh.send_recv_session("sys.exit()"))
print(ssh.send_recv_session(None))
ssh.close_session()
ssh.close()

The notebook Communication with a remote Linux machine through SSH illustrates the output of these instructions.

(original entry : ssh_remote_connection.py:docstring of pyenbc.remote.ssh_remote_connection.ASSHClient.open_session, line 10)

Submit a HIVE query

client = ASSHClient("<server>",
                    "<login>",
                    "<password>")
client.connect()

hive_sql = '''
    DROP TABLE IF EXISTS bikes20;
    CREATE TABLE bikes20 (sjson STRING);
    LOAD DATA INPATH "/user/__USERNAME__/unittest2/paris*.txt" INTO TABLE bikes20;
    SELECT * FROM bikes20 LIMIT 10;
    '''.replace("__USERNAME__", client.username)

out, err = client.hive_submit(hive_sql, redirection=None)
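
The query above is a template in which the placeholder __USERNAME__ stands for the user's folder. The following is a minimal sketch of that substitution as a standalone function, so it can be checked without a connection. The name render_hive and the user name "alice" are hypothetical.

```python
HIVE_TEMPLATE = '''
    DROP TABLE IF EXISTS bikes20;
    CREATE TABLE bikes20 (sjson STRING);
    LOAD DATA INPATH "/user/__USERNAME__/unittest2/paris*.txt" INTO TABLE bikes20;
    SELECT * FROM bikes20 LIMIT 10;
    '''


def render_hive(template, username):
    """Replace the __USERNAME__ placeholder; refuse an empty user name."""
    if not username:
        raise ValueError("username must not be empty")
    return template.replace("__USERNAME__", username)


hive_sql = render_hive(HIVE_TEMPLATE, "alice")
print(hive_sql)
```

The resulting string is what would be passed to hive_submit.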

(original entry : ssh_remote_connection.py:docstring of pyenbc.remote.ssh_remote_connection.ASSHClient.hive_submit, line 30)