HDInsight, PIG¶

Short examples showing how to connect to an Azure HDInsight cluster from a notebook and submit a PIG job.

In [1]:
from jyquickhelper import add_notebook_menu
add_notebook_menu()

Download the data¶

In [2]:
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00222/"
file = "bank.zip"
import pyensae
data = pyensae.download_data(file, website=url)
download_data retrieves bank.zip and unzips it next to the notebook; the file bank-full.csv used below comes out of that archive.
In [3]:
import pandas
df = pandas.read_csv("bank-full.csv", sep=";")
In [4]:
df.head()
Out[4]:
age job marital education default balance housing loan contact day month duration campaign pdays previous poutcome y
0 58 management married tertiary no 2143 yes no unknown 5 may 261 1 -1 0 unknown no
1 44 technician single secondary no 29 yes no unknown 5 may 151 1 -1 0 unknown no
2 33 entrepreneur married secondary no 2 yes yes unknown 5 may 76 1 -1 0 unknown no
3 47 blue-collar married unknown no 1506 yes no unknown 5 may 92 1 -1 0 unknown no
4 33 unknown single unknown no 1 no no unknown 5 may 198 1 -1 0 unknown no
In [5]:
df.to_csv("bank_full_tab_no.txt", sep="\t", index=False, header=False)
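A quick sanity check (not in the original run) that the header really was dropped; a stray header line will otherwise reappear in the PIG output later:

# the first line should contain data values, not column names
with open("bank_full_tab_no.txt", "r") as f:
    print(f.readline().strip().split("\t")[:3])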

Connect to the cluster¶

In [6]:
import pyensae
blobstorage = "****"      # blob storage account name (redacted)
blobpassword = "****"     # blob storage access key (redacted)
hadoop_server = "****"    # HDInsight cluster name (redacted)
hadoop_password = "****"  # cluster password (redacted)
username = "centrale"
client, bs = %hd_open
client, bs
Out[6]:
(<pyensae.remote.azure_connection.AzureClient at 0x1a349a00550>,
 <azure.storage.blob.blockblobservice.BlockBlobService at 0x1a349a314a8>)
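A sketch, not part of the original notebook: one way to keep the credentials out of the notebook entirely is to store them once with keyring.set_password and read them back at run time. The service and key names below are made up.

import keyring
# values stored beforehand with keyring.set_password("azure-blob", ...), etc.
blobstorage = keyring.get_password("azure-blob", "account")
blobpassword = keyring.get_password("azure-blob", "key")
hadoop_server = keyring.get_password("hdinsight", "server")
hadoop_password = keyring.get_password("hdinsight", "password")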

Upload the data¶

In [7]:
%blob_up bank_full_tab_no.txt hdblobstorage/centrale2/bank_full_tab_no.txt
Out[7]:
'centrale2/bank_full_tab_no.txt'
In [8]:
%blob_ls hdblobstorage/centrale2
Out[8]:
name last_modified content_type content_length blob_type
0 centrale2/bank_full_tab_no.txt 2016-06-16 10:18:58+00:00 None 3751188 BlockBlob
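As a check, the local file size should match the content_length reported above:

import os
os.path.getsize("bank_full_tab_no.txt")  # expected: 3751188, as in the blob listing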

Submit a PIG query¶

In [9]:
mapping = {'int64': 'double', 'float64': 'double', 'object': 'chararray'}
schema = ["%s:%s" % (_[0], mapping.get(str(_[1]), _[1])) for _ in zip(df.columns, df.dtypes)]
schema = ", ".join(schema)
schema
Out[9]:
'age:double, job:chararray, marital:chararray, education:chararray, default:chararray, balance:double, housing:chararray, loan:chararray, contact:chararray, day:double, month:chararray, duration:double, campaign:double, pdays:double, previous:double, poutcome:chararray, y:chararray'
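The schema string is meant to be pasted into the LOAD statement of the PIG script below; a small illustrative alternative is to build that line in Python instead of copying it by hand (load_stmt is a name made up here):

load_stmt = ("values = LOAD '$CONTAINER/centrale/bank_full_tab_no.txt' "
             "USING PigStorage('\\t') AS (%s);" % schema)
print(load_stmt)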

We add the DESCRIBE instruction.

In [10]:
%%PIG_azure aggage3.pig
values = LOAD '$CONTAINER/centrale/bank_full_tab_no.txt' USING PigStorage('\t') AS (age:double, 
                    job:chararray, marital:chararray, education:chararray, 
                   default:chararray, balance:double, housing:chararray, loan:chararray, 
                   contact:chararray, day:double, month:chararray, duration:double, 
                   campaign:double, pdays:double, previous:double, poutcome:chararray, y:chararray);
DESCRIBE values;
gr = GROUP values BY loan ;
DESCRIBE gr;
agg = FOREACH gr GENERATE group, AVG(age) AS avg_age ;
DESCRIBE agg;
STORE agg INTO '$CONTAINER/centrale/bank_full_tab_no_agg.txt' USING PigStorage('\t') ;
In [11]:
jid = %hd_pig_submit aggage3.pig
In [12]:
jid
Out[12]:
{'id': 'job_1466069083851_0005'}
In [13]:
%hd_queue
Out[13]:
[{'detail': None, 'id': 'job_1466069083851_0005'},
 {'detail': None, 'id': 'job_1466069083851_0004'},
 {'detail': None, 'id': 'job_1466069083851_0003'},
 {'detail': None, 'id': 'job_1466069083851_0002'},
 {'detail': None, 'id': 'job_1466069083851_0001'}]
In [14]:
df = %hd_job_status jid['id']
df["status"]["state"]
Out[14]:
'RUNNING'
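The job runs asynchronously; a rough polling sketch (assuming the %hd_job_status magic can be used inside a loop in this IPython setup, and that the terminal state shows up in status.state):

import time
# poll every 10 seconds until the job leaves the RUNNING state
st = %hd_job_status jid['id']
while st["status"]["state"] == "RUNNING":
    time.sleep(10)
    st = %hd_job_status jid['id']
st["status"]["state"]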
In [15]:
%hd_tail_stderr -n 100 jid['id']
Out[15]:
16/06/16 21:05:43 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
16/06/16 21:05:43 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
16/06/16 21:05:43 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType
2016-06-16 21:05:43,576 [main] INFO  org.apache.pig.Main - Apache Pig version 0.15.0.2.3.3.1-21 (r: unknown) compiled May 04 2016, 20:06:44
2016-06-16 21:05:43,576 [main] INFO  org.apache.pig.Main - Logging error messages to: C:\apps\dist\hadoop-2.7.1.2.3.3.1-21\logs\pig_1466111143557.log
2016-06-16 21:05:45,088 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file D:\Users\hdp/.pigbootup not found
2016-06-16 21:05:45,498 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2016-06-16 21:05:45,498 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2016-06-16 21:05:45,498 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: wasb://clusterensaeazure1-3@hdblobstorage.blob.core.windows.net
2016-06-16 21:05:47,452 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2016-06-16 21:05:49,057 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1025: 
 Invalid field projection. Projected field [age] does not exist in schema: group:chararray,values:bag{:tuple(age:double,job:chararray,marital:chararray,education:chararray,default:chararray,balance:double,housing:chararray,loan:chararray,contact:chararray,day:double,month:chararray,duration:double,campaign:double,pdays:double,previous:double,poutcome:chararray,y:chararray)}.
2016-06-16 21:05:49,057 [main] ERROR org.apache.pig.tools.grunt.Grunt - org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1001: Unable to describe schema for alias agg
	at org.apache.pig.PigServer.dumpSchema(PigServer.java:823)
	at org.apache.pig.tools.grunt.GruntParser.processDescribe(GruntParser.java:321)
	at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:416)
	at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230)
	at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
	at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
	at org.apache.pig.Main.run(Main.java:502)
	at org.apache.pig.Main.main(Main.java:177)
Caused by: org.apache.pig.impl.plan.PlanValidationException: ERROR 1025: 
 Invalid field projection. Projected field [age] does not exist in schema: group:chararray,values:bag{:tuple(age:double,job:chararray,marital:chararray,education:chararray,default:chararray,balance:double,housing:chararray,loan:chararray,contact:chararray,day:double,month:chararray,duration:double,campaign:double,pdays:double,previous:double,poutcome:chararray,y:chararray)}.
	at org.apache.pig.newplan.logical.expression.ProjectExpression.findColNum(ProjectExpression.java:191)
	at org.apache.pig.newplan.logical.expression.ProjectExpression.setColumnNumberFromAlias(ProjectExpression.java:174)
	at org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor$1.visit(ColumnAliasConversionVisitor.java:53)
	at org.apache.pig.newplan.logical.expression.ProjectExpression.accept(ProjectExpression.java:215)
	at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
	at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:52)
	at org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor.visit(AllExpressionVisitor.java:142)
	at org.apache.pig.newplan.logical.relational.LOInnerLoad.accept(LOInnerLoad.java:128)
	at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
	at org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor.visit(AllExpressionVisitor.java:124)
	at org.apache.pig.newplan.logical.relational.LOForEach.accept(LOForEach.java:87)
	at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
	at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:52)
	at org.apache.pig.newplan.logical.relational.LogicalPlan.validate(LogicalPlan.java:175)
	at org.apache.pig.PigServer$Graph.compile(PigServer.java:1767)
	at org.apache.pig.PigServer$Graph.access$300(PigServer.java:1443)
	at org.apache.pig.PigServer.buildStorePlan(PigServer.java:1339)
	at org.apache.pig.PigServer.getOperatorForAlias(PigServer.java:1418)
	at org.apache.pig.PigServer.dumpSchema(PigServer.java:806)
	... 7 more

Details also at logfile: C:\apps\dist\hadoop-2.7.1.2.3.3.1-21\logs\pig_1466111143557.log
2016-06-16 21:05:49,119 [main] INFO  org.apache.pig.Main - Pig script completed in 5 seconds and 954 milliseconds (5954 ms)


OUT:
values: {age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray}
gr: {group: chararray,values: {(age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray)}}
The DESCRIBE output above explains the failure: after GROUP, the relation gr only contains the fields group and values (a bag holding the grouped tuples), so age is no longer a top-level field. It has to be reached through the bag: AVG(values.age) instead of AVG(age). The corrected script:

In [16]:
%%PIG_azure aggage4.pig
values = LOAD '$CONTAINER/centrale/bank_full_tab_no.txt' USING PigStorage('\t') AS (age:double, 
                                                    job:chararray, marital:chararray, education:chararray, 
                                                   default:chararray, balance:double, housing:chararray, loan:chararray, 
                                                   contact:chararray, day:double, month:chararray, duration:double, 
                                                   campaign:double, 
                                                   pdays:double, previous:double, poutcome:chararray, y:chararray);
DESCRIBE values;
gr = GROUP values BY loan ;
DESCRIBE gr;
agg = FOREACH gr GENERATE group, AVG(values.age) AS avg_age ;
DESCRIBE agg;
STORE agg INTO '$CONTAINER/centrale/bank_full_tab_no_agg2.txt' USING PigStorage('\t') ;
In [17]:
jid = %hd_pig_submit aggage4.pig
In [18]:
jid
Out[18]:
{'id': 'job_1466069083851_0008'}
In [19]:
%hd_queue
Out[19]:
[{'detail': None, 'id': 'job_1466069083851_0009'},
 {'detail': None, 'id': 'job_1466069083851_0008'},
 {'detail': None, 'id': 'job_1466069083851_0007'},
 {'detail': None, 'id': 'job_1466069083851_0006'},
 {'detail': None, 'id': 'job_1466069083851_0005'},
 {'detail': None, 'id': 'job_1466069083851_0004'},
 {'detail': None, 'id': 'job_1466069083851_0003'},
 {'detail': None, 'id': 'job_1466069083851_0002'},
 {'detail': None, 'id': 'job_1466069083851_0001'}]
In [20]:
df = %hd_job_status jid['id']
df["status"]["state"]
Out[20]:
'RUNNING'
In [21]:
%hd_tail_stderr -n 50 jid['id']
Out[21]:
2016-06-16 21:13:19,066 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2016-06-16 21:13:19,410 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2016-06-16 21:13:19,410 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010
2016-06-16 21:13:19,410 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200
2016-06-16 21:13:19,504 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2016-06-16 21:13:19,629 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2016-06-16 21:13:19,629 [main] INFO  org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script Statistics: 

HadoopVersion	PigVersion	UserId	StartedAt	FinishedAt	Features
2.7.1.2.3.3.1-21	0.15.0.2.3.3.1-21	hdp	2016-06-16 21:12:27	2016-06-16 21:13:19	GROUP_BY

Success!

Job Stats (time in seconds):
JobId	Maps	Reduces	MaxMapTime	MinMapTime	AvgMapTime	MedianMapTime	MaxReduceTime	MinReduceTime	AvgReduceTime	MedianReducetime	Alias	Feature	Outputs
job_1466069083851_0009	1	1	12	12	12	12	9	9	9	9	agg,gr,values	GROUP_BY,COMBINER	wasb://hdblobstorage@hdblobstorage.blob.core.windows.net//centrale/bank_full_tab_no_agg2.txt,

Input(s):
Successfully read 45212 records from: "wasb://hdblobstorage@hdblobstorage.blob.core.windows.net//centrale/bank_full_tab_no.txt"

Output(s):
Successfully stored 3 records in: "wasb://hdblobstorage@hdblobstorage.blob.core.windows.net//centrale/bank_full_tab_no_agg2.txt"

Counters:
Total records written : 3
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_1466069083851_0009


2016-06-16 21:13:19,848 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2016-06-16 21:13:19,848 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010
2016-06-16 21:13:19,848 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200
2016-06-16 21:13:19,926 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2016-06-16 21:13:20,160 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2016-06-16 21:13:20,160 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010
2016-06-16 21:13:20,160 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200
2016-06-16 21:13:20,238 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2016-06-16 21:13:20,506 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2016-06-16 21:13:20,506 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.106.128.41:9010
2016-06-16 21:13:20,506 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.106.128.41:10200
2016-06-16 21:13:20,582 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2016-06-16 21:13:20,646 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning FIELD_DISCARDED_TYPE_CONVERSION_FAILED 7 time(s).
2016-06-16 21:13:20,646 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2016-06-16 21:13:20,725 [main] INFO  org.apache.pig.Main - Pig script completed in 1 minute, 2 seconds and 364 milliseconds (62364 ms)


OUT:
values: {age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray}
gr: {group: chararray,values: {(age: double,job: chararray,marital: chararray,education: chararray,default: chararray,balance: double,housing: chararray,loan: chararray,contact: chararray,day: double,month: chararray,duration: double,campaign: double,pdays: double,previous: double,poutcome: chararray,y: chararray)}}
agg: {group: chararray,avg_age: double}

In [22]:
%blob_ls /centrale
Out[22]:
name last_modified content_type content_length blob_type
0 centrale/bank_full.csv 2016-06-15 22:17:59+00:00 None 4610348 BlockBlob
1 centrale/bank_full_tab.txt 2016-06-15 22:19:46+00:00 None 3751306 BlockBlob
2 centrale/bank_full_tab_no.txt 2016-06-15 23:00:52+00:00 None 3751306 BlockBlob
3 centrale/bank_full_tab_no_agg.txt 2016-06-16 10:32:11+00:00 None 0 BlockBlob
4 centrale/bank_full_tab_no_agg.txt/_SUCCESS 2016-06-16 10:32:11+00:00 None 0 BlockBlob
5 centrale/bank_full_tab_no_agg.txt/part-r-00000 2016-06-16 10:32:11+00:00 None 49 BlockBlob
6 centrale/bank_full_tab_no_agg2.txt 2016-06-16 21:13:14+00:00 None 0 BlockBlob
7 centrale/bank_full_tab_no_agg2.txt/_SUCCESS 2016-06-16 21:13:14+00:00 None 0 BlockBlob
8 centrale/bank_full_tab_no_agg2.txt/part-r-00000 2016-06-16 21:13:13+00:00 None 49 BlockBlob
9 centrale/scripts/pig/aggage.pig 2016-06-15 23:15:54+00:00 None 782 BlockBlob
10 centrale/scripts/pig/aggage.pig.log 2016-06-15 23:16:40+00:00 None 0 BlockBlob
11 centrale/scripts/pig/aggage.pig.log/exit 2016-06-15 23:16:40+00:00 None 3 BlockBlob
12 centrale/scripts/pig/aggage.pig.log/stderr 2016-06-15 23:16:30+00:00 None 4060 BlockBlob
13 centrale/scripts/pig/aggage.pig.log/stdout 2016-06-15 23:16:30+00:00 None 0 BlockBlob
14 centrale/scripts/pig/aggage2.pig 2016-06-16 10:28:16+00:00 None 853 BlockBlob
15 centrale/scripts/pig/aggage2.pig.log 2016-06-16 10:29:04+00:00 None 0 BlockBlob
16 centrale/scripts/pig/aggage2.pig.log/exit 2016-06-16 10:29:04+00:00 None 3 BlockBlob
17 centrale/scripts/pig/aggage2.pig.log/stderr 2016-06-16 10:28:54+00:00 None 4883 BlockBlob
18 centrale/scripts/pig/aggage2.pig.log/stdout 2016-06-16 10:28:54+00:00 None 613 BlockBlob
19 centrale/scripts/pig/aggage3.pig 2016-06-16 21:05:11+00:00 None 853 BlockBlob
20 centrale/scripts/pig/aggage3.pig.log 2016-06-16 21:05:59+00:00 None 0 BlockBlob
21 centrale/scripts/pig/aggage3.pig.log/exit 2016-06-16 21:05:59+00:00 None 3 BlockBlob
22 centrale/scripts/pig/aggage3.pig.log/stderr 2016-06-16 21:05:49+00:00 None 4883 BlockBlob
23 centrale/scripts/pig/aggage3.pig.log/stdout 2016-06-16 21:05:49+00:00 None 613 BlockBlob
24 centrale/scripts/pig/aggage4.pig 2016-06-16 21:11:47+00:00 None 861 BlockBlob
25 centrale/scripts/pig/aggage4.pig.log 2016-06-16 21:13:31+00:00 None 0 BlockBlob
26 centrale/scripts/pig/aggage4.pig.log/exit 2016-06-16 21:13:31+00:00 None 3 BlockBlob
27 centrale/scripts/pig/aggage4.pig.log/stderr 2016-06-16 21:13:21+00:00 None 16643 BlockBlob
28 centrale/scripts/pig/aggage4.pig.log/stdout 2016-06-16 21:13:21+00:00 None 654 BlockBlob
29 centrale2/bank_full_tab_no.txt 2016-06-16 10:18:58+00:00 None 3751188 BlockBlob
In [23]:
%blob_downmerge --help
usage: blob_downmerge [-h] [-o] remotepath localfile

download a set of files from a blob storage folder, files will be merged, we
assume the container is the first element to the remote path

positional arguments:
  remotepath       remote path of the folder to download
  localfile        local name for the downloaded merged file

optional arguments:
  -h, --help       show this help message and exit
  -o, --overwrite  overwrite the local file

In [24]:
%blob_down /centrale/bank_full_tab_no_agg2.txt/part-r-00000 agg_hadoop3.txt
Out[24]:
'agg_hadoop3.txt'
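The merged part file is tiny (49 bytes in the listing above), so it can simply be printed:

with open("agg_hadoop3.txt") as f:
    print(f.read())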
In [25]:
import pandas
df = pandas.read_csv("agg_hadoop3.txt", sep="\t", header=-1)
df
Out[25]:
0 1
0 no 41.008823
1 yes 40.555632
2 loan NaN

I forgot to remove the header: the script loads centrale/bank_full_tab_no.txt, an older upload that apparently still contains the header line (note its size, 3751306 bytes in the listing above, versus 3751188 for the header-free copy in centrale2). We check that the computations are correct by redoing them locally.

In [26]:
df = pandas.read_csv("bank-full.csv", sep=";")
df.head()
Out[26]:
age job marital education default balance housing loan contact day month duration campaign pdays previous poutcome y
0 58 management married tertiary no 2143 yes no unknown 5 may 261 1 -1 0 unknown no
1 44 technician single secondary no 29 yes no unknown 5 may 151 1 -1 0 unknown no
2 33 entrepreneur married secondary no 2 yes yes unknown 5 may 76 1 -1 0 unknown no
3 47 blue-collar married unknown no 1506 yes no unknown 5 may 92 1 -1 0 unknown no
4 33 unknown single unknown no 1 no no unknown 5 may 198 1 -1 0 unknown no
In [27]:
df[["loan", "age"]].groupby("loan").mean()
Out[27]:
age
loan
no 41.008823
yes 40.555632
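A programmatic version of the same check (a sketch: it drops the stray header line from the PIG output before comparing):

local = df[["loan", "age"]].groupby("loan").mean()
hadoop = pandas.read_csv("agg_hadoop3.txt", sep="\t", header=None,
                         names=["loan", "avg_age"])
hadoop = hadoop[hadoop["loan"] != "loan"].set_index("loan")  # drop the header line
(local["age"] - hadoop["avg_age"]).abs().max() < 1e-6  # expected: True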
In [28]: