Reservoir Sampling distribué - énoncé - correction

Links: notebook, html ., PDF, python, slides ., GitHub

from jyquickhelper import add_notebook_menu
add_notebook_menu()
Plan
run previous cell, wait for 2 seconds

création d’un fichier à sampler

with open("sample4.txt", "w", encoding="utf8") as f:
    for i in range(0,100000):
        f.write("{0}\t{1}{0}\n".format(i, chr(i%26 + 65)))
    f.write("100001\tAAAAAA")
%load_ext pyensae
%head sample4.txt
0   A0
1   B1
2   C2
3   D3
4   E4
5   F5
6   G6
7   H7
8   I8
9   J9

connexion

import os

blobhp = {}
if "HDCREDENTIALS" in os.environ:
    # Non-interactive path: the five credentials are packed into a single
    # environment variable, separated by "**".
    fields = os.environ["HDCREDENTIALS"].split("**")
    (blobhp["blob_storage"], blobhp["password1"],
     blobhp["hadoop_server"], blobhp["password2"],
     blobhp["username"]) = fields
    r = type(blobhp)
else:
    # Interactive path: show an HTML form inside the notebook and save the
    # answers under the key "blobhp".
    from pyquickhelper.ipythonhelper import open_html_form
    params = {"blob_storage": "", "password1": "", "hadoop_server": "",
              "password2": "", "username": "axavier"}
    r = open_html_form(params=params, title="server + hadoop + credentials",
                       key_save="blobhp")
r
dict
import pyensae
%load_ext pyensae
blobstorage = blobhp["blob_storage"]
blobpassword = blobhp["password1"]
hadoop_server = blobhp["hadoop_server"]
hadoop_password = blobhp["password2"]
username = blobhp["username"]
client, bs = %hd_open
client, bs
(<pyensae.remote.azure_connection.AzureClient at 0x942e860>,
 <azure.storage.blob.blobservice.BlobService at 0x942e898>)

upload du fichier

%blob_up sample3.txt /$PSEUDO/sampling/sample4.txt
'$PSEUDO/sampling/sample4.txt'
%blob_ls /$PSEUDO/sampling
name last_modified content_type content_length blob_type
0 axavier/sampling/datafu-1.2.0.jar Fri, 13 Nov 2015 00:03:49 GMT application/octet-stream 1600826 BlockBlob
1 axavier/sampling/out_sampled_rs4_2015.txt Fri, 13 Nov 2015 01:08:22 GMT 0 BlockBlob
2 axavier/sampling/out_sampled_rs4_2015.txt/_SUC... Fri, 13 Nov 2015 01:08:22 GMT application/octet-stream 0 BlockBlob
3 axavier/sampling/out_sampled_rs4_2015.txt/part... Fri, 13 Nov 2015 01:08:21 GMT application/octet-stream 12785 BlockBlob
4 axavier/sampling/sample.txt Fri, 13 Nov 2015 00:02:50 GMT application/octet-stream 1377780 BlockBlob
5 axavier/sampling/sample2.txt Fri, 13 Nov 2015 00:35:55 GMT application/octet-stream 1377793 BlockBlob
6 axavier/sampling/sample3.txt Fri, 13 Nov 2015 00:39:40 GMT application/octet-stream 1377793 BlockBlob
7 axavier/sampling/sample4.txt Sun, 15 Nov 2015 12:24:22 GMT application/octet-stream 1377793 BlockBlob
8 axavier/sampling/sample4_hash.txt Fri, 13 Nov 2015 14:50:39 GMT 0 BlockBlob
9 axavier/sampling/sample4_hash.txt/_SUCCESS Fri, 13 Nov 2015 14:50:39 GMT application/octet-stream 0 BlockBlob
10 axavier/sampling/sample4_hash.txt/part-r-00000 Fri, 13 Nov 2015 14:50:38 GMT application/octet-stream 4771358 BlockBlob
11 axavier/sampling/sampled4_2015.txt Fri, 13 Nov 2015 00:50:20 GMT 0 BlockBlob
12 axavier/sampling/sampled4_2015.txt/_SUCCESS Fri, 13 Nov 2015 00:50:20 GMT application/octet-stream 0 BlockBlob
13 axavier/sampling/sampled4_2015.txt/part-m-00000 Fri, 13 Nov 2015 00:50:19 GMT application/octet-stream 1277794 BlockBlob
14 axavier/sampling/sampled_rs4_2015.txt Fri, 13 Nov 2015 01:04:51 GMT 0 BlockBlob
15 axavier/sampling/sampled_rs4_2015.txt/_SUCCESS Fri, 13 Nov 2015 01:04:51 GMT application/octet-stream 0 BlockBlob
16 axavier/sampling/sampled_rs4_2015.txt/part-m-0... Fri, 13 Nov 2015 01:04:50 GMT application/octet-stream 1277794 BlockBlob
17 axavier/sampling/sampled_srs4_2015.txt Fri, 13 Nov 2015 00:56:09 GMT 0 BlockBlob
18 axavier/sampling/sampled_srs4_2015.txt/_SUCCESS Fri, 13 Nov 2015 00:56:09 GMT application/octet-stream 0 BlockBlob
19 axavier/sampling/sampled_srs4_2015.txt/part-m-... Fri, 13 Nov 2015 00:56:09 GMT application/octet-stream 1277794 BlockBlob
20 axavier/sampling/sampled_srs_2015.txt Fri, 13 Nov 2015 00:52:34 GMT 0 BlockBlob
21 axavier/sampling/sampled_srs_2015.txt/_SUCCESS Fri, 13 Nov 2015 00:52:34 GMT application/octet-stream 0 BlockBlob
22 axavier/sampling/sampled_srs_2015.txt/part-m-0... Fri, 13 Nov 2015 00:52:34 GMT application/octet-stream 1277794 BlockBlob

Code python pour le reservoir sampling

ensemble = [ "%d%s" % (i, chr(i%26 + 97)) for i in range(0,10000)]
ensemble[:5]
['0a', '1b', '2c', '3d', '4e']
import random

def reservoir_sampling(ensemble, k):
    """Return a uniform random sample of ``k`` items from ``ensemble``.

    Single-pass reservoir sampling (Algorithm R): the first ``k`` items
    fill the reservoir, then item ``i`` (0-based) replaces a uniformly
    chosen reservoir slot with probability ``k / (i + 1)``.

    :param ensemble: sequence to sample from (one pass, so any iterable
        with a stable order works)
    :param k: reservoir size; if ``ensemble`` has fewer than ``k`` items,
        all of them are returned
    :return: list of at most ``k`` items, order not meaningful
    """
    echantillon = []
    for i, e in enumerate(ensemble):
        if len(echantillon) < k:
            # Fill phase: the first k items enter the reservoir as-is.
            echantillon.append(e)
        else:
            # randint is inclusive, so j is uniform over [0, i] and
            # P(j < k) = k / (i + 1), the required acceptance probability.
            j = random.randint(0, i)
            if j < k:
                echantillon[j] = e
    return echantillon

reservoir_sampling(ensemble, 10)
['8681x',
 '8356k',
 '5490e',
 '4405l',
 '5890o',
 '2689l',
 '8672o',
 '3603p',
 '8599t',
 '6086c']

python à jython

On s’assure que le code précédent fonctionne en jython (python compilé en java). On s’inspire pour cela de la documentation jython-udfs.

On crée d’abord un script PIG pour récupérer le schéma et les premières lignes

%%PIG sample_explore.pig

-- Exploration script: load the sample file and print the schema after
-- each step (LOAD -> GROUP ALL -> FLATTEN).  No sampling yet.
ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
        USING PigStorage('\t') AS (x:int, v:chararray) ;
DESCRIBE ensemble;
-- Gather the whole relation into one bag (a single group named "all").
ens_group = GROUP ensemble ALL;
DESCRIBE ens_group;
-- Flatten the bag back to rows to check the schema round-trip.
sampled = FOREACH ens_group GENERATE FLATTEN(ensemble);
DESCRIBE sampled;

--ens10 = LIMIT ensemble 10;
--ens_group10 = LIMIT en_group10 ;
--DUMP ens10;
--DUMP ens_group10;

Si la fonction suivante provoque une erreur

::
AzureException: STATUS: 403, JSON: Expecting value: line 1 column 1 (char 0) <Response [403]> unable to submit job: sample_explore.pig

Vérifier les identifiants utilisés pour se connecter.

jid = %hd_pig_submit sample_explore.pig
jid
{'id': 'job_1446540516812_0185'}
st = %hd_job_status jid["id"]
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])
('job_1446540516812_0185', None, None, False, 'RUNNING')

La sortie standard contient les informations souhaitées :

%hd_tail_stderr jid["id"] -n 5
2015-11-15 12:33:06,608 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2015-11-15 12:33:06,608 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: wasb://clusterensaeazure1-1@hdblobstorage.blob.core.windows.net
2015-11-15 12:33:08,233 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2015-11-15 12:33:09,374 [main] INFO  org.apache.pig.Main - Pig script completed in 4 seconds and 578 milliseconds (4578 ms)


OUT:
ensemble: {x: int,v: chararray}
ens_group: {group: chararray,ensemble: {(x: int,v: chararray)}}
sampled: {ensemble::x: int,ensemble::v: chararray}

Et la sortie du second dump

::
(all,{(100001,AAAAAA),(99999,D99999),(99998,C99998)…

Le code Jython

import pyensae
%%PYTHON reservoir_sampling.py

import random

# Jython UDF for Pig (decorators outputSchemaFunction/schemaFunction are
# injected by Pig's Jython script engine, not defined here).  The input
# bag arrives as its string representation (see the %%jython test cell
# below), hence the eval().  NOTE(review): eval on job data is unsafe on
# untrusted input -- tolerable here only because the file is our own.
@outputSchemaFunction("rsSchema")
def reservoir_sampling(ensemble):
    # Parse the textual representation of the input bag.
    ensemble = eval(ensemble)
    # Reservoir size is hard-coded for this exercise.
    k = 10
    N = len(ensemble)
    echantillon = []
    for i, e in enumerate(ensemble):
        if len(echantillon) < k:
            # Fill phase: the first k elements enter the reservoir.
            echantillon.append(e)
        else:
            # Keep element i with probability k/(i+1); randint is inclusive.
            j = random.randint(0, i)
            if j < k:
                echantillon[j] = e
    return echantillon

# Identity schema function: the output schema mirrors the input schema.
@schemaFunction("rsSchema")
def rsSchema(input):
    return input
%%jython reservoir_sampling.py reservoir_sampling
{(100001,"AAAAAA"),(99999,"D99999"),(99998,"C99998")}
[(99998, 'C99998'), (99999, 'D99999'), (100001, 'AAAAAA')]

On ajoute le code jython au script précédent :

%%PIG sample_explore_complete.pig

-- Run the Jython reservoir-sampling UDF over the whole file.
REGISTER '$CONTAINER/$SCRIPTPIG/reservoir_sampling.py' using jython as myrs;

ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
        USING PigStorage('\t') AS (x:int, v:chararray) ;
DESCRIBE ensemble;
ens_group = GROUP ensemble ALL;
DESCRIBE ens_group;
-- The UDF registered in reservoir_sampling.py is named reservoir_sampling;
-- the original call to myrs.reservoir_sample raised ERROR 1070
-- ("Could not resolve myrs.reservoir_sample"), as shown in the job log.
sampled = FOREACH ens_group GENERATE FLATTEN(myrs.reservoir_sampling(ensemble));
DESCRIBE sampled;

-- Single INTO keyword (the original "STORE sampled INTO INTO" is a parse error).
STORE sampled
INTO '$CONTAINER/$PSEUDO/sampling/sample_rs.txt' USING PigStorage();
jid = %hd_pig_submit sample_explore_complete.pig -d reservoir_sampling.py
jid
{'id': 'job_1446540516812_0229'}
st = %hd_job_status jid["id"]
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])
('job_1446540516812_0229', None, 'done', False, 'RUNNING')
%hd_tail_stderr jid["id"] -n 100
15/11/15 18:43:49 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
15/11/15 18:43:49 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
15/11/15 18:43:49 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType
2015-11-15 18:43:49,598 [main] INFO  org.apache.pig.Main - Apache Pig version 0.14.0.2.2.7.1-33 (r: unknown) compiled Oct 13 2015, 04:18:06
2015-11-15 18:43:49,598 [main] INFO  org.apache.pig.Main - Logging error messages to: C:\apps\dist\hadoop-2.6.0.2.2.7.1-33\logs\pig_1447613029598.log
2015-11-15 18:43:50,848 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file D:\Users\hdp/.pigbootup not found
2015-11-15 18:43:51,145 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2015-11-15 18:43:51,145 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2015-11-15 18:43:51,145 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: wasb://clusterensaeazure1-1@hdblobstorage.blob.core.windows.net
2015-11-15 18:43:51,879 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2015-11-15 18:43:52,192 [main] INFO  org.apache.pig.scripting.jython.JythonScriptEngine - created tmp python.cachedir=D:\Users\hdp\AppData\Local\Temp\pig_jython_3357684506669481882
2015-11-15 18:43:54,817 [main] WARN  org.apache.pig.scripting.jython.JythonScriptEngine - pig.cmd.args.remainders is empty. This is not expected unless on testing.
2015-11-15 18:43:57,645 [main] INFO  org.apache.pig.scripting.jython.JythonScriptEngine - Register scripting UDF: myrs.reservoir_sampling
2015-11-15 18:43:58,535 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2015-11-15 18:43:59,660 [main] ERROR org.apache.pig.PigServer - exception during parsing: Error during parsing. Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
Failed to parse: Pig script failed to parse:
 Failed to generate logical plan. Nested exception: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:199)
    at org.apache.pig.PigServer$Graph.parseQuery(PigServer.java:1735)
    at org.apache.pig.PigServer$Graph.access$000(PigServer.java:1443)
    at org.apache.pig.PigServer.parseAndBuild(PigServer.java:387)
    at org.apache.pig.tools.grunt.GruntParser.processDescribe(GruntParser.java:300)
    at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:412)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
    at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
    at org.apache.pig.Main.run(Main.java:495)
    at org.apache.pig.Main.main(Main.java:170)
Caused by:
 Failed to generate logical plan. Nested exception: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    at org.apache.pig.parser.LogicalPlanBuilder.buildUDF(LogicalPlanBuilder.java:1572)
    at org.apache.pig.parser.LogicalPlanGenerator.func_eval(LogicalPlanGenerator.java:9372)
    at org.apache.pig.parser.LogicalPlanGenerator.projectable_expr(LogicalPlanGenerator.java:11051)
    at org.apache.pig.parser.LogicalPlanGenerator.var_expr(LogicalPlanGenerator.java:10810)
    at org.apache.pig.parser.LogicalPlanGenerator.expr(LogicalPlanGenerator.java:10159)
    at org.apache.pig.parser.LogicalPlanGenerator.flatten_clause(LogicalPlanGenerator.java:7629)
    at org.apache.pig.parser.LogicalPlanGenerator.flatten_generated_item(LogicalPlanGenerator.java:7452)
    at org.apache.pig.parser.LogicalPlanGenerator.generate_clause(LogicalPlanGenerator.java:17590)
    at org.apache.pig.parser.LogicalPlanGenerator.foreach_plan(LogicalPlanGenerator.java:15982)
    at org.apache.pig.parser.LogicalPlanGenerator.foreach_clause(LogicalPlanGenerator.java:15849)
    at org.apache.pig.parser.LogicalPlanGenerator.op_clause(LogicalPlanGenerator.java:1933)
    at org.apache.pig.parser.LogicalPlanGenerator.general_statement(LogicalPlanGenerator.java:1102)
    at org.apache.pig.parser.LogicalPlanGenerator.statement(LogicalPlanGenerator.java:560)
    at org.apache.pig.parser.LogicalPlanGenerator.query(LogicalPlanGenerator.java:421)
    at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:191)
    ... 10 more
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    at org.apache.pig.impl.PigContext.resolveClassName(PigContext.java:677)
    at org.apache.pig.impl.PigContext.getClassForAlias(PigContext.java:793)
    at org.apache.pig.parser.LogicalPlanBuilder.buildUDF(LogicalPlanBuilder.java:1569)
    ... 24 more
2015-11-15 18:43:59,707 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
2015-11-15 18:43:59,707 [main] ERROR org.apache.pig.tools.grunt.Grunt - org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1000: Error during parsing. Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    at org.apache.pig.PigServer$Graph.parseQuery(PigServer.java:1748)
    at org.apache.pig.PigServer$Graph.access$000(PigServer.java:1443)
    at org.apache.pig.PigServer.parseAndBuild(PigServer.java:387)
    at org.apache.pig.tools.grunt.GruntParser.processDescribe(GruntParser.java:300)
    at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:412)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
    at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
    at org.apache.pig.Main.run(Main.java:495)
    at org.apache.pig.Main.main(Main.java:170)
Caused by: Failed to parse: Pig script failed to parse:
 Failed to generate logical plan. Nested exception: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:199)
    at org.apache.pig.PigServer$Graph.parseQuery(PigServer.java:1735)
    ... 9 more
Caused by:
 Failed to generate logical plan. Nested exception: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    at org.apache.pig.parser.LogicalPlanBuilder.buildUDF(LogicalPlanBuilder.java:1572)
    at org.apache.pig.parser.LogicalPlanGenerator.func_eval(LogicalPlanGenerator.java:9372)
    at org.apache.pig.parser.LogicalPlanGenerator.projectable_expr(LogicalPlanGenerator.java:11051)
    at org.apache.pig.parser.LogicalPlanGenerator.var_expr(LogicalPlanGenerator.java:10810)
    at org.apache.pig.parser.LogicalPlanGenerator.expr(LogicalPlanGenerator.java:10159)
    at org.apache.pig.parser.LogicalPlanGenerator.flatten_clause(LogicalPlanGenerator.java:7629)
    at org.apache.pig.parser.LogicalPlanGenerator.flatten_generated_item(LogicalPlanGenerator.java:7452)
    at org.apache.pig.parser.LogicalPlanGenerator.generate_clause(LogicalPlanGenerator.java:17590)
    at org.apache.pig.parser.LogicalPlanGenerator.foreach_plan(LogicalPlanGenerator.java:15982)
    at org.apache.pig.parser.LogicalPlanGenerator.foreach_clause(LogicalPlanGenerator.java:15849)
    at org.apache.pig.parser.LogicalPlanGenerator.op_clause(LogicalPlanGenerator.java:1933)
    at org.apache.pig.parser.LogicalPlanGenerator.general_statement(LogicalPlanGenerator.java:1102)
    at org.apache.pig.parser.LogicalPlanGenerator.statement(LogicalPlanGenerator.java:560)
    at org.apache.pig.parser.LogicalPlanGenerator.query(LogicalPlanGenerator.java:421)
    at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:191)
    ... 10 more
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve myrs.reservoir_sample using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    at org.apache.pig.impl.PigContext.resolveClassName(PigContext.java:677)
    at org.apache.pig.impl.PigContext.getClassForAlias(PigContext.java:793)
    at org.apache.pig.parser.LogicalPlanBuilder.buildUDF(LogicalPlanBuilder.java:1569)
    ... 24 more

Details also at logfile: C:\apps\dist\hadoop-2.6.0.2.2.7.1-33\logs\pig_1447613029598.log
2015-11-15 18:43:59,754 [main] INFO  org.apache.pig.Main - Pig script completed in 10 seconds and 453 milliseconds (10453 ms)


OUT:
ensemble: {x: int,v: chararray}
ens_group: {group: chararray,ensemble: {(x: int,v: chararray)}}

A corriger plus tard. Dans l’immédiat, on utilisera la librairie datafu. Si le cluster ne reconnaît pas la librairie, voir la section java pour comprendre comment l’importer. On la déclare dans le script par l’instruction REGISTER.

%%PIG sample_explore_datafu.pig

-- Reservoir sampling with DataFu's ready-made UDF instead of Jython.
REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';
-- Reservoir of 1000 rows (constructor argument of ReservoirSample).
DEFINE RS datafu.pig.sampling.ReservoirSample('1000');

ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
        USING PigStorage('\t') AS (x:int, v:chararray) ;
DESCRIBE ensemble;
-- Single group: the whole relation goes through one reservoir.
ens_group = GROUP ensemble ALL;
DESCRIBE ens_group;
sampled = FOREACH ens_group GENERATE FLATTEN(RS(ensemble));
DESCRIBE sampled;

STORE sampled
INTO '$CONTAINER/$PSEUDO/sampling/sample_datafu_rs.txt' USING PigStorage();
jid = %hd_pig_submit sample_explore_datafu.pig
jid
{'id': 'job_1446540516812_0193'}
st = %hd_job_status jid["id"]
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])
('job_1446540516812_0193', '50% complete', None, False, 'RUNNING')
%hd_tail_stderr jid["id"] -n 100


%blob_ls /$PSEUDO/sampling/sample_datafu
name last_modified content_type content_length blob_type
0 axavier/sampling/sample_datafu_rs.txt Sun, 15 Nov 2015 13:23:40 GMT 0 BlockBlob
1 axavier/sampling/sample_datafu_rs.txt/_SUCCESS Sun, 15 Nov 2015 13:23:40 GMT application/octet-stream 0 BlockBlob
2 axavier/sampling/sample_datafu_rs.txt/part-r-0... Sun, 15 Nov 2015 13:23:38 GMT application/octet-stream 12780 BlockBlob

version distribuée

Astuce : on distribue puis on recombine les échantillons en faisant un dernier reservoir sampling mais pondéré. Comment distribuer ? Le second sampling est remplacé par une méthode d’échantillonnage classique car le reservoir sampling pondéré n’est pas disponible dans la librairie datafu version 1.2.0.

%%PIG sample_explore_datafu_dist.pig

-- Distributed variant: split the rows into 10 buckets (x % 10), run a
-- 1000-row reservoir sample inside each bucket, then recombine the
-- per-bucket samples with a weighted sample (weight = bucket size).
REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';
DEFINE RS datafu.pig.sampling.ReservoirSample('1000');
DEFINE WeightedSample datafu.pig.sampling.WeightedSample();

ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
        USING PigStorage('\t') AS (x:int, v:chararray) ;
DESCRIBE ensemble;
keys = FOREACH ensemble GENERATE x, v, x%10 AS key;
DESCRIBE keys;
ens_group = GROUP keys BY key ;
DESCRIBE ens_group;
-- "weight" was misspelled "weigth" in the original; the alias is only used
-- positionally (column 0 passed to WeightedSample), so the rename is safe.
sampled = FOREACH ens_group GENERATE COUNT(keys) AS weight, FLATTEN(RS(keys));
DESCRIBE sampled;
-- WeightedSample(bag, index of the weight column, sample size).
wsampled = FOREACH (GROUP sampled ALL) GENERATE FLATTEN(WeightedSample(sampled, 0, 1000));
DESCRIBE wsampled;

STORE wsampled
INTO '$CONTAINER/$PSEUDO/sampling/sample_datafu_rs_dist2.txt' USING PigStorage();
jid = %hd_pig_submit sample_explore_datafu_dist.pig
jid
{'id': 'job_1446540516812_0238'}
st = %hd_job_status jid["id"]
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])
('job_1446540516812_0238', '100% complete', 'done', True, 'SUCCEEDED')
%hd_tail_stderr jid["id"] -n 10
2015-11-15 19:22:17,553 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.128.19:9010
2015-11-15 19:22:17,553 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.128.19:10200
2015-11-15 19:22:17,615 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2015-11-15 19:22:17,803 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2015-11-15 19:22:17,803 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.128.19:9010
2015-11-15 19:22:17,803 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.128.19:10200
2015-11-15 19:22:17,865 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2015-11-15 19:22:17,943 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2015-11-15 19:22:17,975 [main] INFO  org.apache.pig.Main - Pig script completed in 1 minute, 42 seconds and 839 milliseconds (102839 ms)


OUT:
ensemble: {x: int,v: chararray}
keys: {x: int,v: chararray,key: int}
ens_group: {group: int,keys: {(x: int,v: chararray,key: int)}}
sampled: {weigth: long,datafu.pig.sampling.reservoirsample_keys_4::x: int,datafu.pig.sampling.reservoirsample_keys_4::v: chararray,datafu.pig.sampling.reservoirsample_keys_4::key: int}
wsampled: {datafu.pig.sampling.weightedsample_sampled_12::weigth: long,datafu.pig.sampling.weightedsample_sampled_12::datafu.pig.sampling.reservoirsample_keys_11::x: int,datafu.pig.sampling.weightedsample_sampled_12::datafu.pig.sampling.reservoirsample_keys_11::v: chararray,datafu.pig.sampling.weightedsample_sampled_12::datafu.pig.sampling.reservoirsample_keys_11::key: int}

%blob_ls /$PSEUDO/sampling/sample_datafu_rs_dist2
name last_modified content_type content_length blob_type
0 axavier/sampling/sample_datafu_rs_dist2.txt Sun, 15 Nov 2015 19:22:05 GMT 0 BlockBlob
1 axavier/sampling/sample_datafu_rs_dist2.txt/_S... Sun, 15 Nov 2015 19:22:06 GMT application/octet-stream 0 BlockBlob
2 axavier/sampling/sample_datafu_rs_dist2.txt/pa... Sun, 15 Nov 2015 19:22:05 GMT application/octet-stream 20770 BlockBlob
df = %blob_head /$PSEUDO/sampling/sample_datafu_rs_dist2.txt -m
df.head()
10001 21260 S21260 0
0 10000 25191 X25191 1
1 10000 73760 Y73760 0
2 10000 90105 P90105 5
3 10000 46070 Y46070 0
4 10001 58590 M58590 0

version distribuée améliorée

Le problème de la version précédente : chaque sous-ensemble traité d’un seul bloc utilise une séquence de nombres aléatoires sur laquelle on ne connaît pas grand chose. Si les mêmes seed sont utilisées, il est possible que les séquences, même si elles simulent le hasard, soient extrêmement corrélées entre chaque bloc. Il faut remédier à cela.

Il faut également s’assurer que chaque bloc n’est pas skewed.

%%PIG_azure script_rs.pig

-- Improved distributed variant: bucket rows by the first character of
-- MD5(v) so buckets are balanced (not skewed) and the per-bucket random
-- sequences are decorrelated from the key x itself.
REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';
DEFINE MD5 datafu.pig.hash.MD5();
DEFINE RS datafu.pig.sampling.ReservoirSample('1000');
DEFINE WeightedSample datafu.pig.sampling.WeightedSample();

ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
        USING PigStorage('\t') AS (x:int, v:chararray) ;
DESCRIBE ensemble;

-- Deduplicate (x, v) pairs and count how many lines each pair covers.
ens_group = GROUP ensemble BY (x,v);
DESCRIBE ens_group;

compte_group = FOREACH ens_group
            GENERATE group.x AS x,
                     group.v AS v,
                     COUNT(ensemble) AS nb_ligne ;
DESCRIBE compte_group;

-- Bucket key: first hex character of MD5(v) -> 16 pseudo-random buckets.
hash_group = FOREACH compte_group
                GENERATE x, v, nb_ligne,
                        SUBSTRING(MD5(v), 0, 1) AS val;
DESCRIBE hash_group;

group_hash = GROUP hash_group BY val ;
DESCRIBE group_hash;

-- Reservoir-sample each bucket; keep the bucket size as the weight.
rs_parall = FOREACH group_hash GENERATE
                    COUNT(hash_group) AS nb_hash,
                    FLATTEN(RS(hash_group)) ;
DESCRIBE rs_parall;

-- Recombine: weighted sample over all buckets (column 0 = weight, 1000 rows).
wsampled = FOREACH (GROUP rs_parall ALL) GENERATE FLATTEN(WeightedSample(rs_parall, 0, 1000));
DESCRIBE wsampled;

STORE wsampled
INTO '$CONTAINER/$PSEUDO/sampling/sample_distributed_hash.txt' USING PigStorage();
jid=%hd_pig_submit script_rs.pig
jid
{'id': 'job_1446540516812_0244'}
st = %hd_job_status jid["id"]
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])
('job_1446540516812_0244', '100% complete', None, False, 'RUNNING')
%hd_tail_stderr jid["id"] -n 10
2015-11-15 19:52:05,138 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.128.19:9010
2015-11-15 19:52:05,138 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.128.19:10200
2015-11-15 19:52:05,200 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2015-11-15 19:52:05,435 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service address: http://headnodehost:8188/ws/v1/timeline/
2015-11-15 19:52:05,435 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnodehost/100.89.128.19:9010
2015-11-15 19:52:05,435 [main] INFO  org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at headnodehost/100.89.128.19:10200
2015-11-15 19:52:05,513 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2015-11-15 19:52:05,560 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2015-11-15 19:52:05,607 [main] INFO  org.apache.pig.Main - Pig script completed in 2 minutes, 29 seconds and 962 milliseconds (149962 ms)


OUT:
ensemble: {x: int,v: chararray}
ens_group: {group: (x: int,v: chararray),ensemble: {(x: int,v: chararray)}}
compte_group: {x: int,v: chararray,nb_ligne: long}
hash_group: {x: int,v: chararray,nb_ligne: long,val: chararray}
group_hash: {group: chararray,hash_group: {(x: int,v: chararray,nb_ligne: long,val: chararray)}}
rs_parall: {nb_hash: long,datafu.pig.sampling.reservoirsample_hash_group_4::x: int,datafu.pig.sampling.reservoirsample_hash_group_4::v: chararray,datafu.pig.sampling.reservoirsample_hash_group_4::nb_ligne: long,datafu.pig.sampling.reservoirsample_hash_group_4::val: chararray}
wsampled: {datafu.pig.sampling.weightedsample_rs_parall_12::nb_hash: long,datafu.pig.sampling.weightedsample_rs_parall_12::datafu.pig.sampling.reservoirsample_hash_group_11::x: int,datafu.pig.sampling.weightedsample_rs_parall_12::datafu.pig.sampling.reservoirsample_hash_group_11::v: chararray,datafu.pig.sampling.weightedsample_rs_parall_12::datafu.pig.sampling.reservoirsample_hash_group_11::nb_ligne: long,datafu.pig.sampling.weightedsample_rs_parall_12::datafu.pig.sampling.reservoirsample_hash_group_11::val: chararray}

%blob_ls /$PSEUDO/sampling/sample_distributed_hash.txt
name last_modified content_type content_length blob_type
0 axavier/sampling/sample_distributed_hash.txt Sun, 15 Nov 2015 19:51:56 GMT 0 BlockBlob
1 axavier/sampling/sample_distributed_hash.txt/_... Sun, 15 Nov 2015 19:51:56 GMT application/octet-stream 0 BlockBlob
2 axavier/sampling/sample_distributed_hash.txt/p... Sun, 15 Nov 2015 19:51:55 GMT application/octet-stream 21750 BlockBlob
df =%blob_head /$PSEUDO/sampling/sample_distributed_hash.txt -m
df.head()
6693 27244 W27244 1 6
0 6749 51104 O51104 1 1
1 6605 91527 H91527 1 6
2 6630 75027 R75027 1 4
3 6789 58148 M58148 1 1
4 6659 71659 D71659 1 5
%blob_downmerge /$PSEUDO/sampling/sample_distributed_hash.txt sample_distributed_hash.txt
'sample_distributed_hash.txt'
%head sample_distributed_hash.txt
6693        27244   W27244  1       6
6749        51104   O51104  1       1
6605        91527   H91527  1       6
6630        75027   R75027  1       4
6789        58148   M58148  1       1
6659        71659   D71659  1       5
6811        74380   U74380  1       9
6749        20125   B20125  1       2
6587        33466   E33466  1       5
6587        21645   N21645  1       5

version java

On s’inspire de l’exemple suivant Sampling. On télécharge datafu 1.2 depuis Maven. Ce n’est pas la dernière version, mais on peut suivre les instructions pour builder datafu (voir documentation). En particulier, la version pondérée du reservoir sampling n’est pas disponible (voir history, la version 1.2.0 est sortie en décembre 2013).

L’implémentation java n’a pas l’air de résoudre un problème qui peut survenir si la taille de l’échantillon demandée est trop grande. Voir section suivante.

import pyensae
# Download the DataFu 1.2.0 jar from Maven Central into the local folder.
# NOTE(review): the central.maven.org host has since been decommissioned;
# if this fails, try https://repo1.maven.org/maven2/... instead -- confirm.
pyensae.download_data("datafu-1.2.0.jar", url="http://central.maven.org/maven2/com/linkedin/datafu/datafu/1.2.0/")
'datafu-1.2.0.jar'
%blob_up datafu-1.2.0.jar /$PSEUDO/sampling/datafu-1.2.0.jar
'$PSEUDO/sampling/datafu-1.2.0.jar'
%blob_ls /$PSEUDO/sampling
name last_modified content_type content_length blob_type
0 axavier/sampling/datafu-1.2.0.jar Fri, 13 Nov 2015 00:03:49 GMT application/octet-stream 1600826 BlockBlob
1 axavier/sampling/sample.txt Fri, 13 Nov 2015 00:02:50 GMT application/octet-stream 1377780 BlockBlob
2 axavier/sampling/sample2.txt Fri, 13 Nov 2015 00:35:55 GMT application/octet-stream 1377793 BlockBlob
3 axavier/sampling/sample3.txt Fri, 13 Nov 2015 00:39:40 GMT application/octet-stream 1377793 BlockBlob
4 axavier/sampling/sample4.txt Fri, 13 Nov 2015 00:41:49 GMT application/octet-stream 1377793 BlockBlob
%%PIG_azure sample.pig

-- Plain (non-distributed) 1000-row reservoir sample with DataFu.
REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';

-- Reservoir size is the constructor argument.
DEFINE RS datafu.pig.sampling.ReservoirSample('1000');

dset = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
        USING PigStorage('\t') AS (x:int, v:chararray) ;
-- GROUP ALL funnels everything through a single reservoir.
sampled = FOREACH (GROUP dset ALL) GENERATE FLATTEN(RS(dset));
STORE sampled INTO '$CONTAINER/$PSEUDO/sampling/out_sampled_rs4_2015.txt' USING PigStorage() ;
jid = %hd_pig_submit sample.pig
st = %hd_job_status jid["id"]
st["id"],st["percentComplete"],st["completed"],st["status"]["jobComplete"],st["status"]["state"]
('job_1446540516812_0136', None, None, False, 'RUNNING')
%hd_tail_stderr jid["id"] -n 10


%blob_ls /$PSEUDO/sampling
name last_modified content_type content_length blob_type
0 axavier/sampling/datafu-1.2.0.jar Fri, 13 Nov 2015 00:03:49 GMT application/octet-stream 1600826 BlockBlob
1 axavier/sampling/out_sampled_rs4_2015.txt Fri, 13 Nov 2015 01:08:22 GMT 0 BlockBlob
2 axavier/sampling/out_sampled_rs4_2015.txt/_SUC... Fri, 13 Nov 2015 01:08:22 GMT application/octet-stream 0 BlockBlob
3 axavier/sampling/out_sampled_rs4_2015.txt/part... Fri, 13 Nov 2015 01:08:21 GMT application/octet-stream 12785 BlockBlob
4 axavier/sampling/sample.txt Fri, 13 Nov 2015 00:02:50 GMT application/octet-stream 1377780 BlockBlob
5 axavier/sampling/sample2.txt Fri, 13 Nov 2015 00:35:55 GMT application/octet-stream 1377793 BlockBlob
6 axavier/sampling/sample3.txt Fri, 13 Nov 2015 00:39:40 GMT application/octet-stream 1377793 BlockBlob
7 axavier/sampling/sample4.txt Fri, 13 Nov 2015 00:41:49 GMT application/octet-stream 1377793 BlockBlob
8 axavier/sampling/sampled4_2015.txt Fri, 13 Nov 2015 00:50:20 GMT 0 BlockBlob
9 axavier/sampling/sampled4_2015.txt/_SUCCESS Fri, 13 Nov 2015 00:50:20 GMT application/octet-stream 0 BlockBlob
10 axavier/sampling/sampled4_2015.txt/part-m-00000 Fri, 13 Nov 2015 00:50:19 GMT application/octet-stream 1277794 BlockBlob
11 axavier/sampling/sampled_rs4_2015.txt Fri, 13 Nov 2015 01:04:51 GMT 0 BlockBlob
12 axavier/sampling/sampled_rs4_2015.txt/_SUCCESS Fri, 13 Nov 2015 01:04:51 GMT application/octet-stream 0 BlockBlob
13 axavier/sampling/sampled_rs4_2015.txt/part-m-0... Fri, 13 Nov 2015 01:04:50 GMT application/octet-stream 1277794 BlockBlob
14 axavier/sampling/sampled_srs4_2015.txt Fri, 13 Nov 2015 00:56:09 GMT 0 BlockBlob
15 axavier/sampling/sampled_srs4_2015.txt/_SUCCESS Fri, 13 Nov 2015 00:56:09 GMT application/octet-stream 0 BlockBlob
16 axavier/sampling/sampled_srs4_2015.txt/part-m-... Fri, 13 Nov 2015 00:56:09 GMT application/octet-stream 1277794 BlockBlob
17 axavier/sampling/sampled_srs_2015.txt Fri, 13 Nov 2015 00:52:34 GMT 0 BlockBlob
18 axavier/sampling/sampled_srs_2015.txt/_SUCCESS Fri, 13 Nov 2015 00:52:34 GMT application/octet-stream 0 BlockBlob
19 axavier/sampling/sampled_srs_2015.txt/part-m-0... Fri, 13 Nov 2015 00:52:34 GMT application/octet-stream 1277794 BlockBlob
%blob_downmerge /$PSEUDO/sampling/out_sampled_rs4_2015.txt out_sampled_rs4_2015.txt -o
'out_sampled_rs4_2015.txt'
%head out_sampled_rs4_2015.txt
90648       M90648
49678       S49678
41434       Q41434
30149       P30149
15836       C15836
61110       K61110
3838        Q3838
81515       F81515
48052       E48052
16332       E16332

fin

%blob_close
True

version avec itérateur