Matrices en 3 colonnes

Links: notebook, html, PDF, python, slides, GitHub

Représentation d’une matrice avec Spark / Map / Reduce.

from jyquickhelper import add_notebook_menu
add_notebook_menu()

Ce notebook propose d’implémenter un produit matriciel sous Spark. Spark comme SQL n’aime pas trop avoir un nombre de colonnes variables. La première étape consiste à transformer les matrices I\times J en tableau de trois colonnes (i,j,coefficient).

Création d’une matrice aléatoire

from numpy.random import rand
rnd1 = rand(10,10)
rnd2 = rand(10, 2)
rnd1 @ rnd2
array([[ 3.16485005,  4.55944572],
       [ 2.5267903 ,  2.87023564],
       [ 2.54568848,  3.04477188],
       [ 1.97955889,  2.62345798],
       [ 2.62883641,  4.11144949],
       [ 2.78826895,  3.49846327],
       [ 2.21395203,  2.90355441],
       [ 2.23250159,  3.21322361],
       [ 2.42321287,  3.4138286 ],
       [ 2.13862033,  3.38881814]])
import pandas
df1 = pandas.DataFrame(rnd1)
df2 = pandas.DataFrame(rnd2)
df2
0 1
0 0.797985 0.429407
1 0.621650 0.495996
2 0.388457 0.513059
3 0.481193 0.758338
4 0.704995 0.846644
5 0.338331 0.795261
6 0.168499 0.982211
7 0.149587 0.408392
8 0.964368 0.393823
9 0.106360 0.981409
df1.to_csv("rnd1.txt", sep="\t", header=None, index=False)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=False)

Conversion d’une matrice au format Spark

Lorsqu’un traitement est distribué en Map/Reduce, il n’est pas possible de s’appuyer sur l’ordre dans lequel sont traitées les lignes. Le plus est d’ajouter cette information sur chaque ligne plutôt que de chercher à la récupérer.

df1.to_csv("rnd1.txt", sep="\t", header=None, index=True)
df2.to_csv("rnd2.txt", sep="\t", header=None, index=True)
def process_mat_row(row):
    values = row.split("\t")
    index = int(values[0])
    values = [float(_) for _ in values[1:]]
    return [[index, j, v] for j, v in enumerate(values)]
mat1 = sc.textFile("rnd1.txt")
new_mat1 = mat1.flatMap(process_mat_row)
new_mat1.take(12)
[[0, 0, 0.9644291538662529],
 [0, 1, 0.4373852433376806],
 [0, 2, 0.9381996928112303],
 [0, 3, 0.9667835285319625],
 [0, 4, 0.4641212389055033],
 [0, 5, 0.9623402255972683],
 [0, 6, 0.5503835704242525],
 [0, 7, 0.44419152080661695],
 [0, 8, 0.4246234335886486],
 [0, 9, 0.6790142143195625],
 [1, 0, 0.8978778490345252],
 [1, 1, 0.4449340737705302]]
mat2 = sc.textFile("rnd2.txt")
new_mat2 = mat2.flatMap(process_mat_row)
new_mat2.take(12)
[[0, 0, 0.7979848790126467],
 [0, 1, 0.42940652289456605],
 [1, 0, 0.6216501174339032],
 [1, 1, 0.49599627482280284],
 [2, 0, 0.3884569290726675],
 [2, 1, 0.5130585892599168],
 [3, 0, 0.4811927042491243],
 [3, 1, 0.7583376223390912],
 [4, 0, 0.7049954049585642],
 [4, 1, 0.8466443457915623],
 [5, 0, 0.3383307412367995],
 [5, 1, 0.7952613070751512]]

Produit matriciel

Il faut d’abord faire la jointure avec la méthode join. Il faut que la clé soit sur la première colonne.

def key_ij(row):
    return row[0], (row[1], row[2])
def key_ji(row):
    return row[1], (row[0], row[2])
mat_join = new_mat1.map(key_ji).join(new_mat2.map(key_ij))
mat_join.take(12)
[(0, ((0, 0.9644291538662529), (0, 0.7979848790126467))),
 (0, ((0, 0.9644291538662529), (1, 0.42940652289456605))),
 (0, ((1, 0.8978778490345252), (0, 0.7979848790126467))),
 (0, ((1, 0.8978778490345252), (1, 0.42940652289456605))),
 (0, ((2, 0.9974373360415959), (0, 0.7979848790126467))),
 (0, ((2, 0.9974373360415959), (1, 0.42940652289456605))),
 (0, ((3, 0.6998511854857753), (0, 0.7979848790126467))),
 (0, ((3, 0.6998511854857753), (1, 0.42940652289456605))),
 (0, ((4, 0.7225040569303974), (0, 0.7979848790126467))),
 (0, ((4, 0.7225040569303974), (1, 0.42940652289456605))),
 (0, ((5, 0.6833261146005197), (0, 0.7979848790126467))),
 (0, ((5, 0.6833261146005197), (1, 0.42940652289456605)))]

On effectue le produit matriciel.

def produit_matriciel(row):
    index, ((i, v1), (j, v2)) = row
    return i, j, v1 * v2
produit = mat_join.map(produit_matriciel)
produit.take(12)
[(0, 0, 0.7695998816642311),
 (0, 1, 0.4141321695398561),
 (1, 0, 0.716492946729951),
 (1, 1, 0.3855546051379675),
 (2, 0, 0.7959399119238495),
 (2, 1, 0.4283060982748405),
 (3, 0, 0.5584706635767238),
 (3, 1, 0.30052066410308675),
 (4, 0, 0.5765473124557496),
 (4, 1, 0.31024795486369955),
 (5, 0, 0.5452839068856777),
 (5, 1, 0.29342469087366296)]

Il ne reste plus qu’à agréger reduceByKey. La documentation fournit un exemple facilement transposable. Elle indique aussi : Merge the values for each key using an associative and commutative reduce function. Pourquoi précise-t-elle associative et commutative ? Cela signifie que le résultat ne dépend pas de l’ordre dans lequel l’agrégation est réalisée et qu’on peut commencer à agréger sans attendre d’avoir regroupé toutes les valeurs associées à une clé.

  • Cas 1 : groupBy + agrégation qui commence une fois les valeurs regroupées

  • Cas 2 : reduceByKey + agrégation qui commence dès les premières valeurs regroupées

Le cas 2 est moins consommateur en terme de données. Le cas 1 n’est possible que si les valeurs agrégées ne sont pas trop nombreuses. Ca tombe bien, dans notre cas, le cas 2 convient.

from operator import add
final = produit.map(lambda row: ((row[0], row[1]), row[2])).reduceByKey(add)
aslist = final.collect()
aslist.sort()
aslist
[((0, 0), 3.164850046348241),
 ((0, 1), 4.559445715024405),
 ((1, 0), 2.526790300879841),
 ((1, 1), 2.8702356426731646),
 ((2, 0), 2.5456884797140247),
 ((2, 1), 3.04477187797909),
 ((3, 0), 1.9795588879982224),
 ((3, 1), 2.623457980006711),
 ((4, 0), 2.6288364080082656),
 ((4, 1), 4.111449492587058),
 ((5, 0), 2.788268947579333),
 ((5, 1), 3.498463270496026),
 ((6, 0), 2.2139520348118236),
 ((6, 1), 2.903554407097735),
 ((7, 0), 2.232501586646612),
 ((7, 1), 3.213223607913268),
 ((8, 0), 2.42321286851472),
 ((8, 1), 3.4138285975924623),
 ((9, 0), 2.1386203274215574),
 ((9, 1), 3.3888181357124005)]

Résultat initial :

rnd1 @ rnd2
array([[ 3.16485005,  4.55944572],
       [ 2.5267903 ,  2.87023564],
       [ 2.54568848,  3.04477188],
       [ 1.97955889,  2.62345798],
       [ 2.62883641,  4.11144949],
       [ 2.78826895,  3.49846327],
       [ 2.21395203,  2.90355441],
       [ 2.23250159,  3.21322361],
       [ 2.42321287,  3.4138286 ],
       [ 2.13862033,  3.38881814]])

Même algorithme avec les Spark DataFrame

On a besoin de réaliser un flatMap. Une façon de faire est de créer des colonnes qui sont de type composé : un tableau, une structure. La multiplication des lignes est obtenue avec la fonction explode.

schema = ["index"] + ["c%d" % i for i in range(1, 11)]
mat1 = spark.createDataFrame(pandas.read_csv("rnd1.txt", header=None, sep="\t"), schema=schema)
mat1.printSchema()
root
 |-- index: long (nullable = true)
 |-- c1: double (nullable = true)
 |-- c2: double (nullable = true)
 |-- c3: double (nullable = true)
 |-- c4: double (nullable = true)
 |-- c5: double (nullable = true)
 |-- c6: double (nullable = true)
 |-- c7: double (nullable = true)
 |-- c8: double (nullable = true)
 |-- c9: double (nullable = true)
 |-- c10: double (nullable = true)
schema = ["index"] + ["c%d" % i for i in range(1, 3)]
mat2 = spark.createDataFrame(pandas.read_csv("rnd2.txt", header=None, sep="\t"), schema=schema)
mat2.printSchema()
root
 |-- index: long (nullable = true)
 |-- c1: double (nullable = true)
 |-- c2: double (nullable = true)

Nous allons avoir besoin de quelques-uns des fonctions et types suivant :

Je recommande le type FloatType qui prend deux fois moins de place pour une précision moindre mais suffisante dans la plupart des cas.

from pyspark.sql.types import ArrayType, StructField, StructType, DoubleType, IntegerType
from pyspark.sql.functions import explode, posexplode, array
from pyspark.sql import Row
cols = ["c%d" % i for i in range(1, 11)]
mat1_array = mat1.select(mat1.index, array(*cols).alias("x"))
mat1_array.printSchema()
root
 |-- index: long (nullable = true)
 |-- x: array (nullable = false)
 |    |-- element: double (containsNull = true)
mat1_exploded = mat1_array.select("index", posexplode("x"))
mat1_exploded.printSchema()
root
 |-- index: long (nullable = true)
 |-- pos: integer (nullable = false)
 |-- col: double (nullable = true)
mat1.toPandas().shape, mat1_exploded.toPandas().shape
((10, 11), (100, 3))

On recommence le même procédé pour l’autre matrice.

cols = ["c%d" % i for i in range(1, 3)]
mat2_array = mat2.select(mat2.index, array(*cols).alias("x"))
mat2_exploded = mat2_array.select("index", posexplode("x"))

Il ne reste plus qu’à faire le produit avec la méthode join après avoir renommé les colonnes avant la jointure pour éviter les ambiguïtés.

mat2_exp2 = mat2_exploded.withColumnRenamed("index", "index2") \
                         .withColumnRenamed("pos", "pos2") \
                         .withColumnRenamed("col", "col2")
produit = mat1_exploded.join(mat2_exp2, mat1_exploded.pos == mat2_exp2.index2)
produit.printSchema()
root
 |-- index: long (nullable = true)
 |-- pos: integer (nullable = false)
 |-- col: double (nullable = true)
 |-- index2: long (nullable = true)
 |-- pos2: integer (nullable = false)
 |-- col2: double (nullable = true)
produit.toPandas().head()
index pos col index2 pos2 col2
0 0 0 0.964429 0 0 0.797985
1 0 0 0.964429 0 1 0.429407
2 1 0 0.897878 0 0 0.797985
3 1 0 0.897878 0 1 0.429407
4 2 0 0.997437 0 0 0.797985
prod = produit.select(produit.index.alias("i"), produit.pos2.alias("j"),
                         (produit.col * produit.col2).alias("val"))
final = prod.groupby("i", "j").sum("val")
final.printSchema()
root
 |-- i: long (nullable = true)
 |-- j: integer (nullable = false)
 |-- sum(val): double (nullable = true)
df = final.toPandas()
df.sort_values(["i", "j"]).head()
i j sum(val)
7 0 0 3.164850
10 0 1 4.559446
18 1 0 2.526790
3 1 1 2.870236
6 2 0 2.545688
df.shape
(20, 3)