PIG et Paramètres sur Azure - énoncé#
Links: notebook
, html, PDF
, python
, slides, GitHub
Manipulation de données JSON en Map/Reduce avec PIG sur HDInsight.
from jyquickhelper import add_notebook_menu
add_notebook_menu()
Paramètres#
Les sites web produisent des données en continu. On utilise fréquemment le même script pour traiter les donnéesd’un jour, du lendemain, de jour d’après… Tous les jours, on veut récupérer la fréquentation de la veille. La seule chose qui change est la date des données qu’on veut traiter. Plutôt que de recopier un script en entier pour changer une date qui apparaît parfois à plusieurs endroits, il est préférable d’écrire un script ou la date apparaît comme une variable.
Ce notebook va illustrer ce procédé sur la construction d’un histogramme. Le paramètre du script sera la largeur des barres de l’histogramme (ou bin en anglais).
Connexion au cluster#
On prend le cluster
Cloudera.
Il faut exécuter ce script pour pouvoir notifier au notebook que la
variable params
existe.
import pyensae
from pyquickhelper.ipythonhelper import open_html_form
params={"blob_storage":"", "password1":"", "hadoop_server":"", "password2":"", "username":"alias"}
open_html_form(params=params,title="server + hadoop + credentials", key_save="blobhp")
blob_storage
hadoop_server
password1
password2
username
import pyensae
%load_ext pyensae
%load_ext pyenbc
blobstorage = blobhp["blob_storage"]
blobpassword = blobhp["password1"]
hadoop_server = blobhp["hadoop_server"]
hadoop_password = blobhp["password2"]
username = blobhp["username"]
client, bs = %hd_open
client, bs
(<pyensae.remote.azure_connection.AzureClient at 0xabaea50>,
<azure.storage.blobservice.BlobService at 0xabaea90>)
Upload version#
On commence par simuler des données.
import random
with open("random.sample.txt", "w") as f :
for i in range(0,10000) :
x = random.random()
f.write(str(x)+"\n")
On uploade le fichier sur le cluster (il n’est pas besoin de créer le répertoire au préalable).
%blob_up random.sample.txt /$PSEUDO/random/random.sample.txt
'$PSEUDO/random/random.sample.txt'
%blob_ls /$PSEUDO/random
name | last_modified | content_type | content_length | blob_type | |
---|---|---|---|---|---|
0 | xavierdupre/random/random.sample.txt | Thu, 27 Nov 2014 23:21:26 GMT | application/octet-stream | 202619 | BlockBlob |
PIG et paramètres#
On indique un paramètre par le symbole : $bins
. La valeur du
paramètre est passé sous forme de chaîne de caractères au script et
remplacée telle quelle dans le script. Il en va de même des constantes
déclarées grâce au mot-clé
%declare.
La sortie du script inclut le paramètre : cela permet de retrouver comment ces données ont été générées.
%%PIG histogram.pig
values = LOAD '$CONTAINER/$PSEUDO/random/random.sample.txt' USING PigStorage('\t') AS (x:double);
values_h = FOREACH values GENERATE x, ((int)(x / $bins)) * $bins AS h ;
hist_group = GROUP values_h BY h ;
hist = FOREACH hist_group GENERATE group, COUNT(values_h) AS nb ;
STORE hist INTO '$CONTAINER/$PSEUDO/random/histo_$bins.txt' USING PigStorage('\t') ;
Pour supprimer les précédents résultats :
if client.exists(bs, client.account_name, "$PSEUDO/random/histo_0.1.txt"):
r = client.delete_folder (bs, client.account_name, "$PSEUDO/random/histo_0.1.txt")
print(r)
On exécute le job. Comme la commande magique supportant les paramètres
n’existe pas encore, il faut utiliser la variable client
et sa
méthode
pig_submit
qui fait la même chose. Elle upload le script puis le soumet.
jid = client.pig_submit(bs, client.account_name, "histogram.pig", params = dict(bins="0.1"), stop_on_failure=True )
{'id': 'job_1416874839254_0101'}
st = %hd_job_status jid["id"]
st["id"],st["percentComplete"],st["status"]["jobComplete"]
('job_1416874839254_0101', '100% complete', True)
%hd_tail_stderr jid["id"]
Job DAG: job_1416874839254_0102 2014-11-27 23:29:00,446 [main] WARN org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - No FileSystem for scheme: wasb. Not creating success file 2014-11-27 23:29:00,446 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.74.20.101:9010 2014-11-27 23:29:00,525 [main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server 2014-11-27 23:29:03,900 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
On vérifie que tout s’est bien passé. La taille devrait être équivalent à l’entrée.
%blob_ls /$PSEUDO/random
name | last_modified | content_type | content_length | blob_type | |
---|---|---|---|---|---|
0 | xavierdupre/random/histo_0.1.txt | Thu, 27 Nov 2014 23:28:55 GMT | 0 | BlockBlob | |
1 | xavierdupre/random/histo_0.1.txt/_SUCCESS | Thu, 27 Nov 2014 23:28:55 GMT | application/octet-stream | 0 | BlockBlob |
2 | xavierdupre/random/histo_0.1.txt/part-r-00000 | Thu, 27 Nov 2014 23:28:54 GMT | application/octet-stream | 131 | BlockBlob |
3 | xavierdupre/random/random.sample.txt | Thu, 27 Nov 2014 23:21:26 GMT | application/octet-stream | 202619 | BlockBlob |
import os
if os.path.exists("histo.txt") : os.remove("histo.txt")
%blob_downmerge /$PSEUDO/random/histo_0.1.txt histo.txt
'histo.txt'
import matplotlib.pyplot as plt
plt.style.use('ggplot')
import pandas
df = pandas.read_csv("histo.txt", sep="\t",names=["bin","nb"])
df.plot(x="bin",y="nb",kind="bar")
<matplotlib.axes._subplots.AxesSubplot at 0x9b21d30>
Exercice 1 : min, max#
Ajouter deux paramètres pour construire l’histogramme entre deux valeurs
a
,b
.