Python Hadoop Pig
This notebook shows how to submit a PIG job to a remote Hadoop cluster (tested with Cloudera). It helps to know Hadoop already; otherwise I recommend reading Map/Reduce avec PIG (in French) first. We start by downloading the data, which we will then upload to the remote cluster.
import pyensae
%load_ext pyensae
%load_ext pyenbc
pyensae.download_data("ConfLongDemo_JSI.txt", website="https://archive.ics.uci.edu/ml/machine-learning-databases/00196/")
'ConfLongDemo_JSI.txt'
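For readers who do not have pyensae installed, the download step can be approximated with the standard library. This is only a sketch: the helper name `fetch_file` is hypothetical, and it is not a description of what `pyensae.download_data` does internally.

```python
import shutil
import urllib.request
from pathlib import Path


def fetch_file(name, website, dest_dir="."):
    """Download website + name into dest_dir unless the file is already there."""
    target = Path(dest_dir) / name
    if not target.exists():
        # urlopen accepts http(s):// URLs as well as file:// URLs
        with urllib.request.urlopen(website + name) as response, open(target, "wb") as out:
            shutil.copyfileobj(response, out)
    return str(target)
```

It would be called with the same arguments as above: `fetch_file("ConfLongDemo_JSI.txt", "https://archive.ics.uci.edu/ml/machine-learning-databases/00196/")`.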
We open an SSH connection to the bridge, the machine that can communicate with the cluster.
import pyquickhelper.ipythonhelper as ipy
params={"server":"", "username":"", "password":""}
ipy.open_html_form(params=params,title="credentials",key_save="ssh_remote_hadoop")
password = ssh_remote_hadoop["password"]
server = ssh_remote_hadoop["server"]
username = ssh_remote_hadoop["username"]
We open the SSH connection:
%remote_open
<pyensae.remote.ssh_remote_connection.ASSHClient at 0xa2422e8>
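The `%remote_open` and `%remote_cmd` magics hide the SSH plumbing. A rough equivalent is sketched below, assuming the third-party `paramiko` package is installed. The helper names `open_client` and `run_remote` are hypothetical, and this is not necessarily how pyensae implements the magics.

```python
def open_client(server, username, password):
    """Open an SSH connection to the bridge with paramiko (third-party)."""
    import paramiko  # imported lazily so this module loads without it

    client = paramiko.SSHClient()
    # Accept unknown host keys: convenient for a demo, unsafe in production.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(server, username=username, password=password)
    return client


def run_remote(client, command):
    """Run a shell command on the bridge and return (stdout, stderr) as text."""
    _stdin, stdout, stderr = client.exec_command(command)
    return stdout.read().decode("utf-8"), stderr.read().decode("utf-8")
```

With the credentials collected above, `run_remote(open_client(server, username, password), "ls -l")` would play the role of `%remote_cmd ls -l`.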
We check the content of the remote machine:
%remote_cmd ls -l
total 3404
-rw-rw-r-- 1 xavierdupre xavierdupre 1043 Jul 14 23:40 centrer_reduire.pig
-rw-r--r-- 1 xavierdupre xavierdupre 2 Jul 15 00:22 diff_cluster
-rw-rw-r-- 1 xavierdupre xavierdupre 0 Sep 27 00:21 dummy
-rw-rw-r-- 1 xavierdupre xavierdupre 290 Jul 14 23:48 init_random.pig
-rw-rw-r-- 1 xavierdupre xavierdupre 1654 Jul 15 00:20 iteration_complete.pig
-rw-rw-r-- 1 xavierdupre xavierdupre 235 Jul 14 23:37 nb_obervations.pig
-rw-rw-r-- 1 xavierdupre xavierdupre 1778 Jul 14 23:57 pig_1436911046432.log
-rw-rw-r-- 1 xavierdupre xavierdupre 4570 Jul 15 00:45 pig_1436913856496.log
-rw-rw-r-- 1 xavierdupre xavierdupre 4570 Jul 15 23:52 pig_1436997076356.log
-rw-rw-r-- 1 xavierdupre xavierdupre 574 Jul 15 23:51 post_traitement.pig
-rw-rw-r-- 1 xavierdupre xavierdupre 659 Sep 27 00:21 pystream.pig
-rw-rw-r-- 1 xavierdupre xavierdupre 382 Sep 27 00:21 pystream.py
-rw-rw-r-- 1 xavierdupre xavierdupre 26186 Jul 15 23:52 redirection.err
-rw-rw-r-- 1 xavierdupre xavierdupre 0 Jul 15 23:51 redirection.out
-rw-rw-r-- 1 xavierdupre xavierdupre 3400818 Jul 15 23:48 Skin_NonSkin.txt
%remote_ls .
attributes | code | alias | folder | size | month | day | time | name | isdir |
---|---|---|---|---|---|---|---|---|---|
-rw-rw-r-- | 1 | xavierdupre | xavierdupre | 1043 | Jul | 14 | 23:40 | centrer_reduire.pig | False |
-rw-r--r-- | 1 | xavierdupre | xavierdupre | 2 | Jul | 15 | 00:22 | diff_cluster | False |
-rw-rw-r-- | 1 | xavierdupre | xavierdupre | 0 | Sep | 27 | 00:21 | dummy | False |
-rw-rw-r-- | 1 | xavierdupre | xavierdupre | 290 | Jul | 14 | 23:48 | init_random.pig | False |
-rw-rw-r-- | 1 | xavierdupre | xavierdupre | 1654 | Jul | 15 | 00:20 | iteration_complete.pig | False |
-rw-rw-r-- | 1 | xavierdupre | xavierdupre | 235 | Jul | 14 | 23:37 | nb_obervations.pig | False |
-rw-rw-r-- | 1 | xavierdupre | xavierdupre | 1778 | Jul | 14 | 23:57 | pig_1436911046432.log | False |
-rw-rw-r-- | 1 | xavierdupre | xavierdupre | 4570 | Jul | 15 | 00:45 | pig_1436913856496.log | False |
-rw-rw-r-- | 1 | xavierdupre | xavierdupre | 4570 | Jul | 15 | 23:52 | pig_1436997076356.log | False |
-rw-rw-r-- | 1 | xavierdupre | xavierdupre | 574 | Jul | 15 | 23:51 | post_traitement.pig | False |
-rw-rw-r-- | 1 | xavierdupre | xavierdupre | 659 | Sep | 27 | 00:21 | pystream.pig | False |
-rw-rw-r-- | 1 | xavierdupre | xavierdupre | 382 | Sep | 27 | 00:21 | pystream.py | False |
-rw-rw-r-- | 1 | xavierdupre | xavierdupre | 26186 | Jul | 15 | 23:52 | redirection.err | False |
-rw-rw-r-- | 1 | xavierdupre | xavierdupre | 0 | Jul | 15 | 23:51 | redirection.out | False |
-rw-rw-r-- | 1 | xavierdupre | xavierdupre | 3400818 | Jul | 15 | 23:48 | Skin_NonSkin.txt | False |
We check the content on the cluster:
%remote_cmd hdfs dfs -ls
Found 33 items
drwx------ - xavierdupre xavierdupre 0 2015-09-27 02:00 .Trash
drwx------ - xavierdupre xavierdupre 0 2015-09-27 00:22 .staging
-rw-r--r-- 3 xavierdupre xavierdupre 132727 2014-11-16 02:37 ConfLongDemo_JSI.small.example.txt
drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-16 02:38 ConfLongDemo_JSI.small.example2.walking.txt
-rw-r--r-- 3 xavierdupre xavierdupre 3400818 2015-07-14 23:35 Skin_NonSkin.txt
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:22 diff_cluster
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-14 23:44 donnees_normalisees
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-14 23:43 ecartstypes
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-14 23:49 init_random
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-14 23:41 moyennes
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-14 23:38 nb_obervations
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:05 output_iter1
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:22 output_iter10
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:07 output_iter2
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:09 output_iter3
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:11 output_iter4
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:13 output_iter5
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:15 output_iter6
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:17 output_iter7
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:18 output_iter8
drwxr-xr-x - xavierdupre xavierdupre 0 2015-07-15 00:20 output_iter9
-rw-r--r-- 3 xavierdupre xavierdupre 461444 2014-11-20 01:33 paris.2014-11-11_22-00-18.331391.txt
drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-23 22:03 python_info.txt
drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-23 22:07 python_info2.txt
drwxr-xr-x - xavierdupre xavierdupre 0 2014-12-03 22:55 random
drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-20 23:43 unitest2
drwxr-xr-x - xavierdupre xavierdupre 0 2015-09-27 00:23 unittest
drwxr-xr-x - xavierdupre xavierdupre 0 2015-09-27 00:22 unittest2
drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-20 01:53 velib_1hjs
drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-21 01:17 velib_py
drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-23 21:34 velib_py_results
drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-23 21:51 velib_py_results_3days
drwxr-xr-x - xavierdupre xavierdupre 0 2014-11-21 11:08 velib_several_days
%dfs_ls .
index | attributes | code | alias | folder | size | date | time | name | isdir |
---|---|---|---|---|---|---|---|---|---|
0 | drwx------ | - | xavierdupre | xavierdupre | 0 | 2015-09-27 | 02:00 | .Trash | True |
1 | drwx------ | - | xavierdupre | xavierdupre | 0 | 2015-09-27 | 00:22 | .staging | True |
2 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 132727 | 2014-11-16 | 02:37 | ConfLongDemo_JSI.small.example.txt | False |
3 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-16 | 02:38 | ConfLongDemo_JSI.small.example2.walking.txt | True |
4 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 3400818 | 2015-07-14 | 23:35 | Skin_NonSkin.txt | False |
5 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-15 | 00:22 | diff_cluster | True |
6 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-14 | 23:44 | donnees_normalisees | True |
7 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-14 | 23:43 | ecartstypes | True |
8 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-14 | 23:49 | init_random | True |
9 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-14 | 23:41 | moyennes | True |
10 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-14 | 23:38 | nb_obervations | True |
11 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-15 | 00:05 | output_iter1 | True |
12 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-15 | 00:22 | output_iter10 | True |
13 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-15 | 00:07 | output_iter2 | True |
14 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-15 | 00:09 | output_iter3 | True |
15 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-15 | 00:11 | output_iter4 | True |
16 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-15 | 00:13 | output_iter5 | True |
17 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-15 | 00:15 | output_iter6 | True |
18 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-15 | 00:17 | output_iter7 | True |
19 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-15 | 00:18 | output_iter8 | True |
20 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-07-15 | 00:20 | output_iter9 | True |
21 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 461444 | 2014-11-20 | 01:33 | paris.2014-11-11_22-00-18.331391.txt | False |
22 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-23 | 22:03 | python_info.txt | True |
23 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-23 | 22:07 | python_info2.txt | True |
24 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-12-03 | 22:55 | random | True |
25 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-20 | 23:43 | unitest2 | True |
26 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-09-27 | 00:23 | unittest | True |
27 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2015-09-27 | 00:22 | unittest2 | True |
28 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-20 | 01:53 | velib_1hjs | True |
29 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-21 | 01:17 | velib_py | True |
30 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-23 | 21:34 | velib_py_results | True |
31 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-23 | 21:51 | velib_py_results_3days | True |
32 | drwxr-xr-x | - | xavierdupre | xavierdupre | 0 | 2014-11-21 | 11:08 | velib_several_days | True |
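The table above is essentially the text printed by `hdfs dfs -ls` split into columns. A minimal sketch of that parsing follows, assuming the eight-column layout shown in the raw output; the function name `parse_hdfs_ls` is hypothetical and this is not pyensae's actual code.

```python
def parse_hdfs_ls(text):
    """Turn the text printed by `hdfs dfs -ls` into a list of dicts."""
    columns = ["attributes", "code", "alias", "folder", "size", "date", "time", "name"]
    rows = []
    for line in text.splitlines():
        # maxsplit keeps any spaces inside the file name intact
        parts = line.split(None, len(columns) - 1)
        if len(parts) != len(columns) or parts[0][0] not in "-d":
            continue  # skip the "Found N items" header and blank lines
        row = dict(zip(columns, parts))
        row["size"] = int(row["size"])
        row["isdir"] = row["attributes"].startswith("d")
        rows.append(row)
    return rows
```

For example, `[r["name"] for r in parse_hdfs_ls(output) if not r["isdir"]]` would keep only the plain files of a listing stored in `output`.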
We upload the file to the bridge (zipping it first would reduce the upload time).
%remote_up ConfLongDemo_JSI.txt ConfLongDemo_JSI.txt
'ConfLongDemo_JSI.txt'
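The compression suggested above could be done locally before calling `%remote_up`. The sketch below uses gzip from the standard library; the helper name `gzip_file` is hypothetical, and it assumes a tool such as `gunzip` is available on the bridge to restore the file afterwards.

```python
import gzip
import shutil


def gzip_file(path):
    """Write path + '.gz' next to the original and return the new file name."""
    compressed = path + ".gz"
    with open(path, "rb") as src, gzip.open(compressed, "wb") as dst:
        shutil.copyfileobj(src, dst)
    return compressed
```

The archive returned by `gzip_file("ConfLongDemo_JSI.txt")` would then be uploaded in place of the raw text file.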
We check it got there:
%remote_cmd ls Conf*JSI.txt
ConfLongDemo_JSI.txt
We put it on the cluster:
%remote_cmd hdfs dfs -put ConfLongDemo_JSI.txt ConfLongDemo_JSI.txt
We check it was put on the cluster:
%remote_cmd hdfs dfs -ls Conf*JSI.txt
Found 1 items
-rw-r--r-- 3 xavierdupre xavierdupre 21546346 2015-09-27 11:33 ConfLongDemo_JSI.txt
%dfs_ls Conf*JSI.txt
index | attributes | code | alias | folder | size | date | time | name | isdir |
---|---|---|---|---|---|---|---|---|---|
0 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 21546346 | 2015-09-27 | 11:33 | ConfLongDemo_JSI.txt | False |
We create a simple PIG program:
%%PIG filter_example.pig
myinput = LOAD 'ConfLongDemo_JSI.txt' USING PigStorage(',') AS
(index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;
filt = FILTER myinput BY activity == 'walking' ;
STORE filt INTO 'ConfLongDemo_JSI.walking.txt' USING PigStorage() ;
%pig_submit filter_example.pig -r=filter_example.redirect
We check the redirected files were created:
%remote_cmd ls f*redirect*
filter_example.redirect.err
filter_example.redirect.out
We check the tail on a regular basis to see the job running (other commands can be used to monitor jobs, see %remote_cmd mapred --help).
%remote_cmd tail filter_example.redirect.err
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG: job_1435583503337_0055
2015-09-27 11:38:56,436 [main] WARN org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning ACCESSING_NON_EXISTENT_FIELD 164860 time(s).
2015-09-27 11:38:56,436 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
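Reading the tail by eye works, but the same check can be scripted. The sketch below scans the redirected stderr text for the markers visible in the log above; the function name `summarize_pig_log` is hypothetical, and the `- Failed!` marker is an assumption made by analogy with `- Success!`.

```python
import re


def summarize_pig_log(text):
    """Extract a coarse status, job ids and warning counts from Pig's stderr."""
    if "- Success!" in text:
        status = "success"
    elif "- Failed!" in text:  # assumed failure marker, by analogy with Success!
        status = "failed"
    else:
        status = "unknown"  # job still running, or marker outside the tail
    jobs = sorted(set(re.findall(r"job_\d+_\d+", text)))
    warnings = {
        name: int(count)
        for name, count in re.findall(r"Encountered Warning (\w+) (\d+) time", text)
    }
    return {"status": status, "jobs": jobs, "warnings": warnings}
```

A non-empty `warnings` dictionary on a successful job is worth a look before trusting the output.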
%remote_cmd hdfs dfs -ls Conf*JSI.walking.txt
Found 2 items
-rw-r--r-- 3 xavierdupre xavierdupre 0 2015-09-27 11:38 ConfLongDemo_JSI.walking.txt/_SUCCESS
-rw-r--r-- 3 xavierdupre xavierdupre 0 2015-09-27 11:38 ConfLongDemo_JSI.walking.txt/part-m-00000
%dfs_ls Conf*JSI.walking.txt
index | attributes | code | alias | folder | size | date | time | name | isdir |
---|---|---|---|---|---|---|---|---|---|
0 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 0 | 2015-09-27 | 11:38 | ConfLongDemo_JSI.walking.txt/_SUCCESS | False |
1 | -rw-r--r-- | 3 | xavierdupre | xavierdupre | 0 | 2015-09-27 | 11:38 | ConfLongDemo_JSI.walking.txt/part-m-00000 | False |
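The output file part-m-00000 is empty, and the log reports ACCESSING_NON_EXISTENT_FIELD 164860 times. Together these suggest that the schema declared in filter_example.pig lists more fields than the rows contain, in which case `activity` would be null and the filter would reject every row. One way to check this on the local copy of the file is sketched below; the helper name `field_count_histogram` is hypothetical.

```python
from collections import Counter


def field_count_histogram(path, delimiter=",", max_lines=1000):
    """Count how many delimited fields each of the first max_lines rows has."""
    counts = Counter()
    with open(path, "r", encoding="utf-8") as f:
        for i, line in enumerate(f):
            if i >= max_lines:
                break
            counts[len(line.rstrip("\n").split(delimiter))] += 1
    return counts


# Number of field names declared after AS in filter_example.pig
declared = len("index sequence tag timestamp dateformat x y z activity".split())
```

Comparing the keys of `field_count_histogram("ConfLongDemo_JSI.txt")` with `declared` shows whether the schema matches the data. A schema with one name too many would explain both the warning and the empty result.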
After that, the output has to be downloaded to the bridge and then to the local machine with %remote_down. We finally close the connection.
%remote_close
True
END