.. _sparkmatrix3columnsrst: ====================== Matrices en 3 colonnes ====================== .. only:: html **Links:** :download:`notebook `, :downloadlink:`html `, :download:`PDF `, :download:`python `, :downloadlink:`slides `, :githublink:`GitHub|_doc/notebooks/spark_local/spark_matrix_3_columns.ipynb|*` Représentation d’une matrice avec Spark / Map / Reduce. .. code:: ipython3 from jyquickhelper import add_notebook_menu add_notebook_menu() .. contents:: :local: Ce notebook propose d’implémenter un produit matriciel sous Spark. Spark comme SQL n’aime pas trop avoir un nombre de colonnes variables. La première étape consiste à transformer les matrices :math:`I\times J` en tableau de trois colonnes :math:`(i,j,coefficient)`. Création d’une matrice aléatoire -------------------------------- .. code:: ipython3 from numpy.random import rand rnd1 = rand(10,10) rnd2 = rand(10, 2) rnd1 @ rnd2 .. parsed-literal:: array([[ 3.16485005, 4.55944572], [ 2.5267903 , 2.87023564], [ 2.54568848, 3.04477188], [ 1.97955889, 2.62345798], [ 2.62883641, 4.11144949], [ 2.78826895, 3.49846327], [ 2.21395203, 2.90355441], [ 2.23250159, 3.21322361], [ 2.42321287, 3.4138286 ], [ 2.13862033, 3.38881814]]) .. code:: ipython3 import pandas df1 = pandas.DataFrame(rnd1) df2 = pandas.DataFrame(rnd2) df2 .. raw:: html
0 1
0 0.797985 0.429407
1 0.621650 0.495996
2 0.388457 0.513059
3 0.481193 0.758338
4 0.704995 0.846644
5 0.338331 0.795261
6 0.168499 0.982211
7 0.149587 0.408392
8 0.964368 0.393823
9 0.106360 0.981409
.. code:: ipython3 df1.to_csv("rnd1.txt", sep="\t", header=None, index=False) df2.to_csv("rnd2.txt", sep="\t", header=None, index=False) Conversion d’une matrice au format Spark ---------------------------------------- Lorsqu’un traitement est distribué en Map/Reduce, il n’est pas possible de s’appuyer sur l’ordre dans lequel sont traitées les lignes. Le plus est d’ajouter cette information sur chaque ligne plutôt que de chercher à la récupérer. .. code:: ipython3 df1.to_csv("rnd1.txt", sep="\t", header=None, index=True) df2.to_csv("rnd2.txt", sep="\t", header=None, index=True) .. code:: ipython3 def process_mat_row(row): values = row.split("\t") index = int(values[0]) values = [float(_) for _ in values[1:]] return [[index, j, v] for j, v in enumerate(values)] .. code:: ipython3 mat1 = sc.textFile("rnd1.txt") new_mat1 = mat1.flatMap(process_mat_row) new_mat1.take(12) .. parsed-literal:: [[0, 0, 0.9644291538662529], [0, 1, 0.4373852433376806], [0, 2, 0.9381996928112303], [0, 3, 0.9667835285319625], [0, 4, 0.4641212389055033], [0, 5, 0.9623402255972683], [0, 6, 0.5503835704242525], [0, 7, 0.44419152080661695], [0, 8, 0.4246234335886486], [0, 9, 0.6790142143195625], [1, 0, 0.8978778490345252], [1, 1, 0.4449340737705302]] .. code:: ipython3 mat2 = sc.textFile("rnd2.txt") new_mat2 = mat2.flatMap(process_mat_row) new_mat2.take(12) .. parsed-literal:: [[0, 0, 0.7979848790126467], [0, 1, 0.42940652289456605], [1, 0, 0.6216501174339032], [1, 1, 0.49599627482280284], [2, 0, 0.3884569290726675], [2, 1, 0.5130585892599168], [3, 0, 0.4811927042491243], [3, 1, 0.7583376223390912], [4, 0, 0.7049954049585642], [4, 1, 0.8466443457915623], [5, 0, 0.3383307412367995], [5, 1, 0.7952613070751512]] Produit matriciel ----------------- Il faut d’abord faire la jointure avec la méthode `join `__. Il faut que la clé soit sur la première colonne. .. code:: ipython3 def key_ij(row): return row[0], (row[1], row[2]) def key_ji(row): return row[1], (row[0], row[2]) mat_join = new_mat1.map(key_ji).join(new_mat2.map(key_ij)) mat_join.take(12) .. parsed-literal:: [(0, ((0, 0.9644291538662529), (0, 0.7979848790126467))), (0, ((0, 0.9644291538662529), (1, 0.42940652289456605))), (0, ((1, 0.8978778490345252), (0, 0.7979848790126467))), (0, ((1, 0.8978778490345252), (1, 0.42940652289456605))), (0, ((2, 0.9974373360415959), (0, 0.7979848790126467))), (0, ((2, 0.9974373360415959), (1, 0.42940652289456605))), (0, ((3, 0.6998511854857753), (0, 0.7979848790126467))), (0, ((3, 0.6998511854857753), (1, 0.42940652289456605))), (0, ((4, 0.7225040569303974), (0, 0.7979848790126467))), (0, ((4, 0.7225040569303974), (1, 0.42940652289456605))), (0, ((5, 0.6833261146005197), (0, 0.7979848790126467))), (0, ((5, 0.6833261146005197), (1, 0.42940652289456605)))] On effectue le produit matriciel. .. code:: ipython3 def produit_matriciel(row): index, ((i, v1), (j, v2)) = row return i, j, v1 * v2 produit = mat_join.map(produit_matriciel) produit.take(12) .. parsed-literal:: [(0, 0, 0.7695998816642311), (0, 1, 0.4141321695398561), (1, 0, 0.716492946729951), (1, 1, 0.3855546051379675), (2, 0, 0.7959399119238495), (2, 1, 0.4283060982748405), (3, 0, 0.5584706635767238), (3, 1, 0.30052066410308675), (4, 0, 0.5765473124557496), (4, 1, 0.31024795486369955), (5, 0, 0.5452839068856777), (5, 1, 0.29342469087366296)] Il ne reste plus qu’à agréger `reduceByKey `__. La documentation fournit un exemple facilement transposable. Elle indique aussi : *Merge the values for each key using an associative and commutative reduce function.* Pourquoi précise-t-elle **associative et commutative** ? Cela signifie que le résultat ne dépend pas de l’ordre dans lequel l’agrégation est réalisée et qu’on peut commencer à agréger sans attendre d’avoir regroupé toutes les valeurs associées à une clé. - *Cas 1 :* `groupBy `__ + agrégation qui commence une fois les valeurs regroupées - *Cas 2 :* `reduceByKey `__ + agrégation qui commence dès les premières valeurs regroupées Le cas 2 est moins consommateur en terme de données. Le cas 1 n’est possible que si les valeurs agrégées ne sont pas trop nombreuses. Ca tombe bien, dans notre cas, le cas 2 convient. .. code:: ipython3 from operator import add final = produit.map(lambda row: ((row[0], row[1]), row[2])).reduceByKey(add) aslist = final.collect() aslist.sort() aslist .. parsed-literal:: [((0, 0), 3.164850046348241), ((0, 1), 4.559445715024405), ((1, 0), 2.526790300879841), ((1, 1), 2.8702356426731646), ((2, 0), 2.5456884797140247), ((2, 1), 3.04477187797909), ((3, 0), 1.9795588879982224), ((3, 1), 2.623457980006711), ((4, 0), 2.6288364080082656), ((4, 1), 4.111449492587058), ((5, 0), 2.788268947579333), ((5, 1), 3.498463270496026), ((6, 0), 2.2139520348118236), ((6, 1), 2.903554407097735), ((7, 0), 2.232501586646612), ((7, 1), 3.213223607913268), ((8, 0), 2.42321286851472), ((8, 1), 3.4138285975924623), ((9, 0), 2.1386203274215574), ((9, 1), 3.3888181357124005)] Résultat initial : .. code:: ipython3 rnd1 @ rnd2 .. parsed-literal:: array([[ 3.16485005, 4.55944572], [ 2.5267903 , 2.87023564], [ 2.54568848, 3.04477188], [ 1.97955889, 2.62345798], [ 2.62883641, 4.11144949], [ 2.78826895, 3.49846327], [ 2.21395203, 2.90355441], [ 2.23250159, 3.21322361], [ 2.42321287, 3.4138286 ], [ 2.13862033, 3.38881814]]) Même algorithme avec les Spark DataFrame ---------------------------------------- On a besoin de réaliser un `flatMap `__. Une façon de faire est de créer des colonnes qui sont de type composé : un tableau, une structure. La multiplication des lignes est obtenue avec la fonction `explode `__. .. code:: ipython3 schema = ["index"] + ["c%d" % i for i in range(1, 11)] mat1 = spark.createDataFrame(pandas.read_csv("rnd1.txt", header=None, sep="\t"), schema=schema) .. code:: ipython3 mat1.printSchema() .. parsed-literal:: root |-- index: long (nullable = true) |-- c1: double (nullable = true) |-- c2: double (nullable = true) |-- c3: double (nullable = true) |-- c4: double (nullable = true) |-- c5: double (nullable = true) |-- c6: double (nullable = true) |-- c7: double (nullable = true) |-- c8: double (nullable = true) |-- c9: double (nullable = true) |-- c10: double (nullable = true) .. code:: ipython3 schema = ["index"] + ["c%d" % i for i in range(1, 3)] mat2 = spark.createDataFrame(pandas.read_csv("rnd2.txt", header=None, sep="\t"), schema=schema) .. code:: ipython3 mat2.printSchema() .. parsed-literal:: root |-- index: long (nullable = true) |-- c1: double (nullable = true) |-- c2: double (nullable = true) Nous allons avoir besoin de quelques-uns des fonctions et types suivant : - `explode `__, `posexplode `__, `array `__, `alias `__ - `StructType `__, `StructField `__ - `ArrayType `__ - `DoubleType `__, `IntegerType `__ Je recommande le type `FloatType `__ qui prend deux fois moins de place pour une précision moindre mais suffisante dans la plupart des cas. .. code:: ipython3 from pyspark.sql.types import ArrayType, StructField, StructType, DoubleType, IntegerType from pyspark.sql.functions import explode, posexplode, array from pyspark.sql import Row .. code:: ipython3 cols = ["c%d" % i for i in range(1, 11)] mat1_array = mat1.select(mat1.index, array(*cols).alias("x")) mat1_array.printSchema() .. parsed-literal:: root |-- index: long (nullable = true) |-- x: array (nullable = false) | |-- element: double (containsNull = true) .. code:: ipython3 mat1_exploded = mat1_array.select("index", posexplode("x")) mat1_exploded.printSchema() .. parsed-literal:: root |-- index: long (nullable = true) |-- pos: integer (nullable = false) |-- col: double (nullable = true) .. code:: ipython3 mat1.toPandas().shape, mat1_exploded.toPandas().shape .. parsed-literal:: ((10, 11), (100, 3)) On recommence le même procédé pour l’autre matrice. .. code:: ipython3 cols = ["c%d" % i for i in range(1, 3)] mat2_array = mat2.select(mat2.index, array(*cols).alias("x")) mat2_exploded = mat2_array.select("index", posexplode("x")) Il ne reste plus qu’à faire le produit avec la méthode `join `__ après avoir renommé les colonnes avant la jointure pour éviter les ambiguïtés. .. code:: ipython3 mat2_exp2 = mat2_exploded.withColumnRenamed("index", "index2") \ .withColumnRenamed("pos", "pos2") \ .withColumnRenamed("col", "col2") produit = mat1_exploded.join(mat2_exp2, mat1_exploded.pos == mat2_exp2.index2) .. code:: ipython3 produit.printSchema() .. parsed-literal:: root |-- index: long (nullable = true) |-- pos: integer (nullable = false) |-- col: double (nullable = true) |-- index2: long (nullable = true) |-- pos2: integer (nullable = false) |-- col2: double (nullable = true) .. code:: ipython3 produit.toPandas().head() .. raw:: html
index pos col index2 pos2 col2
0 0 0 0.964429 0 0 0.797985
1 0 0 0.964429 0 1 0.429407
2 1 0 0.897878 0 0 0.797985
3 1 0 0.897878 0 1 0.429407
4 2 0 0.997437 0 0 0.797985
.. code:: ipython3 prod = produit.select(produit.index.alias("i"), produit.pos2.alias("j"), (produit.col * produit.col2).alias("val")) final = prod.groupby("i", "j").sum("val") .. code:: ipython3 final.printSchema() .. parsed-literal:: root |-- i: long (nullable = true) |-- j: integer (nullable = false) |-- sum(val): double (nullable = true) .. code:: ipython3 df = final.toPandas() .. code:: ipython3 df.sort_values(["i", "j"]).head() .. raw:: html
i j sum(val)
7 0 0 3.164850
10 0 1 4.559446
18 1 0 2.526790
3 1 1 2.870236
6 2 0 2.545688
.. code:: ipython3 df.shape .. parsed-literal:: (20, 3)