import mermaid from 'https://cdnjs.cloudflare.com/ajax/libs/mermaid/10.2.3/mermaid.esm.min.mjs'; mermaid.initialize({ startOnLoad: true });
Représentation d'une matrice avec Spark / Map / Reduce.
from jyquickhelper import add_notebook_menu
add_notebook_menu()
Ce notebook propose d'implémenter un produit matriciel sous Spark. Spark comme SQL n'aime pas trop avoir un nombre de colonnes variables. La première étape consiste à transformer les matrices $I\times J$ en tableau de trois colonnes $(i,j,coefficient)$.
from numpy.random import rand
rnd1 = rand(10,10)
rnd2 = rand(10, 2)
rnd1 @ rnd2
array([[ 3.16485005, 4.55944572], [ 2.5267903 , 2.87023564], [ 2.54568848, 3.04477188], [ 1.97955889, 2.62345798], [ 2.62883641, 4.11144949], [ 2.78826895, 3.49846327], [ 2.21395203, 2.90355441], [ 2.23250159, 3.21322361], [ 2.42321287, 3.4138286 ], [ 2.13862033, 3.38881814]])
import pandas
df1 = pandas.DataFrame(rnd1)
df2 = pandas.DataFrame(rnd2)
df2
0 | 1 | |
---|---|---|
0 | 0.797985 | 0.429407 |
1 | 0.621650 | 0.495996 |
2 | 0.388457 | 0.513059 |
3 | 0.481193 | 0.758338 |
4 | 0.704995 | 0.846644 |
5 | 0.338331 | 0.795261 |
6 | 0.168499 | 0.982211 |
7 | 0.149587 | 0.408392 |
8 | 0.964368 | 0.393823 |
9 | 0.106360 | 0.981409 |
df1.to_csv("rnd1.txt", sep="\t", header=None, index=False)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=False)
Lorsqu'un traitement est distribué en Map/Reduce, il n'est pas possible de s'appuyer sur l'ordre dans lequel sont traitées les lignes. Le plus est d'ajouter cette information sur chaque ligne plutôt que de chercher à la récupérer.
df1.to_csv("rnd1.txt", sep="\t", header=None, index=True)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=True)
def process_mat_row(row):
values = row.split("\t")
index = int(values[0])
values = [float(_) for _ in values[1:]]
return [[index, j, v] for j, v in enumerate(values)]
mat1 = sc.textFile("rnd1.txt")
new_mat1 = mat1.flatMap(process_mat_row)
new_mat1.take(12)
[[0, 0, 0.9644291538662529], [0, 1, 0.4373852433376806], [0, 2, 0.9381996928112303], [0, 3, 0.9667835285319625], [0, 4, 0.4641212389055033], [0, 5, 0.9623402255972683], [0, 6, 0.5503835704242525], [0, 7, 0.44419152080661695], [0, 8, 0.4246234335886486], [0, 9, 0.6790142143195625], [1, 0, 0.8978778490345252], [1, 1, 0.4449340737705302]]
mat2 = sc.textFile("rnd2.txt")
new_mat2 = mat2.flatMap(process_mat_row)
new_mat2.take(12)
[[0, 0, 0.7979848790126467], [0, 1, 0.42940652289456605], [1, 0, 0.6216501174339032], [1, 1, 0.49599627482280284], [2, 0, 0.3884569290726675], [2, 1, 0.5130585892599168], [3, 0, 0.4811927042491243], [3, 1, 0.7583376223390912], [4, 0, 0.7049954049585642], [4, 1, 0.8466443457915623], [5, 0, 0.3383307412367995], [5, 1, 0.7952613070751512]]
def key_ij(row):
return row[0], (row[1], row[2])
def key_ji(row):
return row[1], (row[0], row[2])
mat_join = new_mat1.map(key_ji).join(new_mat2.map(key_ij))
mat_join.take(12)
[(0, ((0, 0.9644291538662529), (0, 0.7979848790126467))), (0, ((0, 0.9644291538662529), (1, 0.42940652289456605))), (0, ((1, 0.8978778490345252), (0, 0.7979848790126467))), (0, ((1, 0.8978778490345252), (1, 0.42940652289456605))), (0, ((2, 0.9974373360415959), (0, 0.7979848790126467))), (0, ((2, 0.9974373360415959), (1, 0.42940652289456605))), (0, ((3, 0.6998511854857753), (0, 0.7979848790126467))), (0, ((3, 0.6998511854857753), (1, 0.42940652289456605))), (0, ((4, 0.7225040569303974), (0, 0.7979848790126467))), (0, ((4, 0.7225040569303974), (1, 0.42940652289456605))), (0, ((5, 0.6833261146005197), (0, 0.7979848790126467))), (0, ((5, 0.6833261146005197), (1, 0.42940652289456605)))]
On effectue le produit matriciel.
def produit_matriciel(row):
index, ((i, v1), (j, v2)) = row
return i, j, v1 * v2
produit = mat_join.map(produit_matriciel)
produit.take(12)
[(0, 0, 0.7695998816642311), (0, 1, 0.4141321695398561), (1, 0, 0.716492946729951), (1, 1, 0.3855546051379675), (2, 0, 0.7959399119238495), (2, 1, 0.4283060982748405), (3, 0, 0.5584706635767238), (3, 1, 0.30052066410308675), (4, 0, 0.5765473124557496), (4, 1, 0.31024795486369955), (5, 0, 0.5452839068856777), (5, 1, 0.29342469087366296)]
Il ne reste plus qu'à agréger reduceByKey. La documentation fournit un exemple facilement transposable. Elle indique aussi : Merge the values for each key using an associative and commutative reduce function. Pourquoi précise-t-elle associative et commutative ? Cela signifie que le résultat ne dépend pas de l'ordre dans lequel l'agrégation est réalisée et qu'on peut commencer à agréger sans attendre d'avoir regroupé toutes les valeurs associées à une clé.
Le cas 2 est moins consommateur en terme de données. Le cas 1 n'est possible que si les valeurs agrégées ne sont pas trop nombreuses. Ca tombe bien, dans notre cas, le cas 2 convient.
from operator import add
final = produit.map(lambda row: ((row[0], row[1]), row[2])).reduceByKey(add)
aslist = final.collect()
aslist.sort()
aslist
[((0, 0), 3.164850046348241), ((0, 1), 4.559445715024405), ((1, 0), 2.526790300879841), ((1, 1), 2.8702356426731646), ((2, 0), 2.5456884797140247), ((2, 1), 3.04477187797909), ((3, 0), 1.9795588879982224), ((3, 1), 2.623457980006711), ((4, 0), 2.6288364080082656), ((4, 1), 4.111449492587058), ((5, 0), 2.788268947579333), ((5, 1), 3.498463270496026), ((6, 0), 2.2139520348118236), ((6, 1), 2.903554407097735), ((7, 0), 2.232501586646612), ((7, 1), 3.213223607913268), ((8, 0), 2.42321286851472), ((8, 1), 3.4138285975924623), ((9, 0), 2.1386203274215574), ((9, 1), 3.3888181357124005)]
Résultat initial :
rnd1 @ rnd2
array([[ 3.16485005, 4.55944572], [ 2.5267903 , 2.87023564], [ 2.54568848, 3.04477188], [ 1.97955889, 2.62345798], [ 2.62883641, 4.11144949], [ 2.78826895, 3.49846327], [ 2.21395203, 2.90355441], [ 2.23250159, 3.21322361], [ 2.42321287, 3.4138286 ], [ 2.13862033, 3.38881814]])
schema = ["index"] + ["c%d" % i for i in range(1, 11)]
mat1 = spark.createDataFrame(pandas.read_csv("rnd1.txt", header=None, sep="\t"), schema=schema)
mat1.printSchema()
root |-- index: long (nullable = true) |-- c1: double (nullable = true) |-- c2: double (nullable = true) |-- c3: double (nullable = true) |-- c4: double (nullable = true) |-- c5: double (nullable = true) |-- c6: double (nullable = true) |-- c7: double (nullable = true) |-- c8: double (nullable = true) |-- c9: double (nullable = true) |-- c10: double (nullable = true)
schema = ["index"] + ["c%d" % i for i in range(1, 3)]
mat2 = spark.createDataFrame(pandas.read_csv("rnd2.txt", header=None, sep="\t"), schema=schema)
mat2.printSchema()
root |-- index: long (nullable = true) |-- c1: double (nullable = true) |-- c2: double (nullable = true)
Nous allons avoir besoin de quelques-uns des fonctions et types suivant :
Je recommande le type FloatType qui prend deux fois moins de place pour une précision moindre mais suffisante dans la plupart des cas.
from pyspark.sql.types import ArrayType, StructField, StructType, DoubleType, IntegerType
from pyspark.sql.functions import explode, posexplode, array
from pyspark.sql import Row
cols = ["c%d" % i for i in range(1, 11)]
mat1_array = mat1.select(mat1.index, array(*cols).alias("x"))
mat1_array.printSchema()
root |-- index: long (nullable = true) |-- x: array (nullable = false) | |-- element: double (containsNull = true)
mat1_exploded = mat1_array.select("index", posexplode("x"))
mat1_exploded.printSchema()
root |-- index: long (nullable = true) |-- pos: integer (nullable = false) |-- col: double (nullable = true)
mat1.toPandas().shape, mat1_exploded.toPandas().shape
((10, 11), (100, 3))
On recommence le même procédé pour l'autre matrice.
cols = ["c%d" % i for i in range(1, 3)]
mat2_array = mat2.select(mat2.index, array(*cols).alias("x"))
mat2_exploded = mat2_array.select("index", posexplode("x"))
Il ne reste plus qu'à faire le produit avec la méthode join après avoir renommé les colonnes avant la jointure pour éviter les ambiguïtés.
mat2_exp2 = mat2_exploded.withColumnRenamed("index", "index2") \
.withColumnRenamed("pos", "pos2") \
.withColumnRenamed("col", "col2")
produit = mat1_exploded.join(mat2_exp2, mat1_exploded.pos == mat2_exp2.index2)
produit.printSchema()
root |-- index: long (nullable = true) |-- pos: integer (nullable = false) |-- col: double (nullable = true) |-- index2: long (nullable = true) |-- pos2: integer (nullable = false) |-- col2: double (nullable = true)
produit.toPandas().head()
index | pos | col | index2 | pos2 | col2 | |
---|---|---|---|---|---|---|
0 | 0 | 0 | 0.964429 | 0 | 0 | 0.797985 |
1 | 0 | 0 | 0.964429 | 0 | 1 | 0.429407 |
2 | 1 | 0 | 0.897878 | 0 | 0 | 0.797985 |
3 | 1 | 0 | 0.897878 | 0 | 1 | 0.429407 |
4 | 2 | 0 | 0.997437 | 0 | 0 | 0.797985 |
prod = produit.select(produit.index.alias("i"), produit.pos2.alias("j"),
(produit.col * produit.col2).alias("val"))
final = prod.groupby("i", "j").sum("val")
final.printSchema()
root |-- i: long (nullable = true) |-- j: integer (nullable = false) |-- sum(val): double (nullable = true)
df = final.toPandas()
df.sort_values(["i", "j"]).head()
i | j | sum(val) | |
---|---|---|---|
7 | 0 | 0 | 3.164850 |
10 | 0 | 1 | 4.559446 |
18 | 1 | 0 | 2.526790 |
3 | 1 | 1 | 2.870236 |
6 | 2 | 0 | 2.545688 |
df.shape
(20, 3)