# Python Hadoop Pig

This notebook shows how to submit a PIG job to a remote Hadoop cluster (tested with [Cloudera](http://www.cloudera.com/)). It is easier to follow if you already know Hadoop; otherwise I recommend reading [Map/Reduce avec PIG](http://www.xavierdupre.fr/app/ensae_teaching_cs/helpsphinx2/notebooks/td3a_cenonce_session6.html#td3acenoncesession6rst) (in French) first. We start by downloading the data, which we will then upload to the remote cluster.

In [1]:
import pyensae
%load_ext pyensae
%load_ext pyenbc
pyensae.download_data("ConfLongDemo_JSI.txt", website="https://archive.ics.uci.edu/ml/machine-learning-databases/00196/")

'ConfLongDemo_JSI.txt'

We enter the credentials for the SSH connection to the bridge, the machine that can communicate with the cluster.

In [2]:
import pyquickhelper.ipythonhelper as ipy
params={"server":"", "username":"", "password":""}
ipy.open_html_form(params=params,title="credentials",key_save="ssh_remote_hadoop")

In [3]:
password = ssh_remote_hadoop["password"]
server = ssh_remote_hadoop["server"]
username = ssh_remote_hadoop["username"]

We open the SSH connection:

In [4]:
%remote_open

<pyensae.remote.ssh_remote_connection.ASSHClient at 0xa2422e8>

We check the content of the remote machine:

In [5]:
%remote_cmd ls -l

In [6]:
%remote_ls .

Unnamed: 0,Unnamed: 1,attributes,code,alias,folder,size,unit,name,isdir
-rw-rw-r--,1,xavierdupre,xavierdupre,1043,Jul,14,23:40,centrer_reduire.pig,False
-rw-r--r--,1,xavierdupre,xavierdupre,2,Jul,15,00:22,diff_cluster,False
-rw-rw-r--,1,xavierdupre,xavierdupre,0,Sep,27,00:21,dummy,False
-rw-rw-r--,1,xavierdupre,xavierdupre,290,Jul,14,23:48,init_random.pig,False
-rw-rw-r--,1,xavierdupre,xavierdupre,1654,Jul,15,00:20,iteration_complete.pig,False
-rw-rw-r--,1,xavierdupre,xavierdupre,235,Jul,14,23:37,nb_obervations.pig,False
-rw-rw-r--,1,xavierdupre,xavierdupre,1778,Jul,14,23:57,pig_1436911046432.log,False
-rw-rw-r--,1,xavierdupre,xavierdupre,4570,Jul,15,00:45,pig_1436913856496.log,False
-rw-rw-r--,1,xavierdupre,xavierdupre,4570,Jul,15,23:52,pig_1436997076356.log,False
-rw-rw-r--,1,xavierdupre,xavierdupre,574,Jul,15,23:51,post_traitement.pig,False


We check the content on the cluster:

In [7]:
%remote_cmd hdfs dfs -ls

In [8]:
%dfs_ls .

Unnamed: 0,attributes,code,alias,folder,size,date,time,name,isdir
0,drwx------,-,xavierdupre,xavierdupre,0,2015-09-27,02:00,.Trash,True
1,drwx------,-,xavierdupre,xavierdupre,0,2015-09-27,00:22,.staging,True
2,-rw-r--r--,3,xavierdupre,xavierdupre,132727,2014-11-16,02:37,ConfLongDemo_JSI.small.example.txt,False
3,drwxr-xr-x,-,xavierdupre,xavierdupre,0,2014-11-16,02:38,ConfLongDemo_JSI.small.example2.walking.txt,True
4,-rw-r--r--,3,xavierdupre,xavierdupre,3400818,2015-07-14,23:35,Skin_NonSkin.txt,False
5,drwxr-xr-x,-,xavierdupre,xavierdupre,0,2015-07-15,00:22,diff_cluster,True
6,drwxr-xr-x,-,xavierdupre,xavierdupre,0,2015-07-14,23:44,donnees_normalisees,True
7,drwxr-xr-x,-,xavierdupre,xavierdupre,0,2015-07-14,23:43,ecartstypes,True
8,drwxr-xr-x,-,xavierdupre,xavierdupre,0,2015-07-14,23:49,init_random,True
9,drwxr-xr-x,-,xavierdupre,xavierdupre,0,2015-07-14,23:41,moyennes,True
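
The table above is a structured view of the text printed by `hdfs dfs -ls`. As a minimal sketch of how such a listing can be turned into records, the function below splits each line into eight whitespace-separated fields. It is not the implementation used by `%dfs_ls`; the function name `parse_hdfs_ls` and the key names are chosen here for illustration, and the raw listing is rebuilt by hand from two rows of the table.

```python
def parse_hdfs_ls(text):
    """Turn the raw text printed by `hdfs dfs -ls` into a list of dicts."""
    rows = []
    for line in text.splitlines():
        # Eight whitespace-separated fields; the name comes last and may contain spaces.
        parts = line.split(None, 7)
        if len(parts) != 8:
            continue  # header such as "Found 2 items", or a blank line
        permissions, replication, owner, group, size, date, time, name = parts
        rows.append({
            "permissions": permissions,
            "replication": replication,  # "-" for directories
            "owner": owner,
            "group": group,
            "size": int(size),
            "date": date,
            "time": time,
            "name": name,
            "isdir": permissions.startswith("d"),
        })
    return rows


# Raw listing rebuilt by hand from two rows of the table above (illustration only).
listing = """Found 2 items
drwx------   - xavierdupre xavierdupre          0 2015-09-27 02:00 .Trash
-rw-r--r--   3 xavierdupre xavierdupre    3400818 2015-07-14 23:35 Skin_NonSkin.txt
"""
for row in parse_hdfs_ls(listing):
    print(row["name"], row["size"], row["isdir"])
```

Read this way, the column labelled `code` in the table holds the replication factor, and `alias` and `folder` hold the owner and the group.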


We upload the file to the bridge (compressing it first would reduce the upload time).
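
As a sketch of the compression step mentioned above, the standard `gzip` module can produce a compressed copy of the file before the upload. The helper name `gzip_file` is an assumption made for this example and is not part of pyensae.

```python
import gzip
import shutil
from pathlib import Path


def gzip_file(source, target=None):
    """Compress `source` with gzip and return the path of the archive."""
    source = Path(source)
    # Default target: same folder, same name with ".gz" appended.
    target = Path(target) if target else source.with_name(source.name + ".gz")
    # Stream the content so that a large file never has to fit in memory.
    with open(source, "rb") as fin, gzip.open(target, "wb") as fout:
        shutil.copyfileobj(fin, fout)
    return target
```

Calling `gzip_file("ConfLongDemo_JSI.txt")` would create `ConfLongDemo_JSI.txt.gz`, which can be uploaded in place of the original and decompressed on the bridge, for instance with `%remote_cmd gunzip ConfLongDemo_JSI.txt.gz`, assuming `gunzip` is available there.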

In [9]:
%remote_up ConfLongDemo_JSI.txt ConfLongDemo_JSI.txt

'ConfLongDemo_JSI.txt'

We check it got there:

In [10]:
%remote_cmd ls Conf*JSI.txt

We put it on the cluster:

In [11]:
%remote_cmd hdfs dfs -put ConfLongDemo_JSI.txt ConfLongDemo_JSI.txt

We check it was put on the cluster:

In [12]:
%remote_cmd hdfs dfs -ls Conf*JSI.txt

In [13]:
%dfs_ls Conf*JSI.txt

Unnamed: 0,attributes,code,alias,folder,size,date,time,name,isdir
0,-rw-r--r--,3,xavierdupre,xavierdupre,21546346,2015-09-27,11:33,ConfLongDemo_JSI.txt,False


We create a simple PIG program:

In [14]:
%%PIG filter_example.pig

myinput = LOAD 'ConfLongDemo_JSI.txt' USING PigStorage(',') AS
    (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;
filt = FILTER myinput BY activity == 'walking' ;
STORE filt INTO 'ConfLongDemo_JSI.walking.txt' USING PigStorage() ;
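
To make the effect of the script concrete, here is a rough pure-Python equivalent of the `FILTER` step, run locally on a few rows. The column names are those declared in the script; the three sample rows are made up for the example and are not taken from the dataset. The function name `filter_walking` is also an assumption.

```python
import csv
import io

# Column names as declared in the AS clause of the PIG script.
COLUMNS = ["index", "sequence", "tag", "timestamp", "dateformat",
           "x", "y", "z", "activity"]


def filter_walking(lines):
    """Keep the comma-separated rows whose `activity` field is 'walking'."""
    reader = csv.DictReader(lines, fieldnames=COLUMNS)
    return [row for row in reader if row["activity"] == "walking"]


# Three made-up rows in the nine-column layout declared above.
sample = io.StringIO(
    "1,A01,tag1,100,27.05.2009 14:03:25:127,4.06,1.89,0.50,walking\n"
    "2,A01,tag2,101,27.05.2009 14:03:25:183,4.29,1.78,1.34,sitting\n"
    "3,A01,tag3,102,27.05.2009 14:03:25:210,4.35,1.70,0.92,walking\n"
)
kept = filter_walking(sample)
print([row["index"] for row in kept])  # prints ['1', '3']
```

The PIG version expresses the same row-by-row test, but Hadoop distributes it over the blocks of the file stored on the cluster.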

In [15]:
%pig_submit filter_example.pig -r=filter_example.redirect

We check the redirected files were created:

In [16]:
%remote_cmd ls f*redirect*

We check the tail of the error stream on a regular basis to follow the job while it runs (other commands can be used to monitor jobs, see ``%remote_cmd mapred --help``).

In [17]:
%remote_cmd tail filter_example.redirect.err
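
Checking "on a regular basis" can be automated with a small polling loop. The sketch below is generic: `check` is a hypothetical callable supplied by the user (for instance one that lists the output directory and looks for the `_SUCCESS` file), and `wait_until` is a name chosen for this example, not a pyensae function.

```python
import time


def wait_until(check, interval=30.0, timeout=3600.0, sleep=time.sleep):
    """Call `check()` every `interval` seconds until it returns a true value.

    Returns True if the condition was met, False if `timeout` seconds
    of waiting elapsed first.
    """
    waited = 0.0
    while True:
        if check():
            return True
        if waited >= timeout:
            return False
        sleep(interval)
        waited += interval
```

The `sleep` parameter exists only so that the waiting function can be replaced, which makes the loop easy to try out without actually pausing.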

In [18]:
%remote_cmd hdfs dfs -ls Conf*JSI.walking.txt

In [19]:
%dfs_ls Conf*JSI.walking.txt

Unnamed: 0,attributes,code,alias,folder,size,date,time,name,isdir
0,-rw-r--r--,3,xavierdupre,xavierdupre,0,2015-09-27,11:38,ConfLongDemo_JSI.walking.txt/_SUCCESS,False
1,-rw-r--r--,3,xavierdupre,xavierdupre,0,2015-09-27,11:38,ConfLongDemo_JSI.walking.txt/part-m-00000,False


After that, the stream has to be downloaded to the bridge and then to the local machine with ``%remote_down``. We finally close the connection.

In [20]:
%remote_close

True
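
A Hadoop job stores its result as a directory holding one or more `part-*` files and a `_SUCCESS` marker, as the last listing shows. Once that directory has been copied to the local machine, the pieces can be concatenated into a single file. The sketch below assumes a local copy of the directory; `merge_parts` and its arguments are names chosen for this example.

```python
from pathlib import Path


def merge_parts(output_dir, merged_file):
    """Concatenate the part-* files of a downloaded job output directory.

    Returns the number of part files that were merged.
    """
    output_dir = Path(output_dir)
    # The _SUCCESS marker is written when the job completes.
    if not (output_dir / "_SUCCESS").exists():
        raise FileNotFoundError("no _SUCCESS marker: the job may not have finished")
    parts = sorted(output_dir.glob("part-*"))
    with open(merged_file, "wb") as fout:
        for part in parts:
            fout.write(part.read_bytes())
    return len(parts)
```

On the cluster side, `hdfs dfs -getmerge` performs a similar concatenation while downloading.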

**END**