.. _recursivereducersrst: ================== Reducers récursifs ================== .. only:: html **Links:** :download:`notebook `, :downloadlink:`html `, :download:`PDF `, :download:`python `, :downloadlink:`slides `, :githublink:`GitHub|_doc/notebooks/progf/recursive_reducers.ipynb|*` J’utilise volontiers une terminologie découverte chez Microsoft pour illustrer une façon d’écrire le même calcul qui a un impact sur la facilité avec laquelle on peut le distribution : utiliser des comptes plutôt que des moyennes. .. code:: ipython3 from jyquickhelper import add_notebook_menu add_notebook_menu() .. contents:: :local: Le notebook utilise des fonctions développées pour illustrer les notions, plus claires qu’efficaces. Stream ------ Le map reduce s’applique à des jeux de données très grands. D’un point de vue mathématique, on écrit des algorithmes qui s’appliquent à des jeux de données infinis ou plutôt dont la taille n’est pas connu. Pour les distinguer des jeux de données, on les appelle des *flux* ou *stream* en anglais. En aparté, écrits pour être parallélisés, ces traitements ont la particuliarité de ne pas conserver l’ordre dans lequel il traite les données. C’est particulièrement vrai lorsque le jeu de données est divisé sur plusieurs disques durs. Il est impossible de choisir un morceau en premier. Mapper ------ Un *mapper* applique le même traitement à chaque observation du *stream* de façon indépendante. .. code:: ipython3 ens = [('a', 1), ('b', 4), ('a', 6), ('a', 3)] .. code:: ipython3 from sparkouille.fctmr import mapper stream1 = mapper(lambda el: (el[0], el[1]+1), ens) stream1 .. parsed-literal:: Le résultat n’existe pas tant qu’on ne demande explicitement que le calcul soit faut. Il faut parcourir le résultat. .. code:: ipython3 list(stream1) .. parsed-literal:: [('a', 2), ('b', 5), ('a', 7), ('a', 4)] Et on ne peut le parcourir qu’une fois : .. code:: ipython3 list(stream1) .. parsed-literal:: [] Coût du premier élément ----------------------- Quand on a une infinité d’éléments à traiter, il est important de pouvoir regarder ce qu’un traitement donne sur les premiers éléments. Avec un mapper, cela correspond au coût d’un seul map. .. code:: ipython3 from sparkouille.fctmr import take first = lambda it: take(it, count=1) big_ens = ens * 100 .. code:: ipython3 %timeit -n 1000 list(mapper(lambda el: (el[0], el[1]+1), big_ens)) .. parsed-literal:: 124 µs ± 15.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each) .. code:: ipython3 %timeit -n 1000 first(mapper(lambda el: (el[0], el[1]+1), big_ens)) .. parsed-literal:: 2.46 µs ± 451 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each) Reducer ------- Un vrai *reducer* réduit les éléments d’un ensemble, il ne répartit pas les données. En pratique, on réduit rarement un ensemble qu’on n’a pas distribué au préalable, comme avec un *groupby*. On ne réduit pas toujours non plus un ensemble à une seule ligne. On empile les opérations de streaming, on repousse également le moment d’évaluer. La distribution s’effectue selon une clé qui est hashée (voir `Hash et distribution `__). La première lambda fonction décrit ce qu’est cette clé, le premier élément du couple dans ce cas. .. code:: ipython3 from sparkouille.fctmr import reducer stream1 = mapper(lambda el: (el[0], el[1]+1), ens) stream2 = reducer(lambda el: el[0], stream1, asiter=False) stream2 .. parsed-literal:: .. code:: ipython3 list(stream2) .. parsed-literal:: [('a', [('a', 2), ('a', 4), ('a', 7)]), ('b', [('b', 5)])] Dans cet exemple, le *reducer* réduit chaque groupe à un seul résultat qui est l’ensemble des éléments. Quel est le coup du premier élément… .. code:: ipython3 def test2(ens, one=False): stream1 = mapper(lambda el: (el[0], el[1]+1), ens) stream2 = reducer(lambda el: el[0], stream1, asiter=False) return list(stream2) if one else first(stream2) %timeit -n 1000 test2(big_ens) .. parsed-literal:: 1.75 µs ± 409 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each) .. code:: ipython3 %timeit -n 1000 test2(big_ens, one=True) .. parsed-literal:: 720 µs ± 31.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each) C’est plus court mais pas significativement plus court. Cela correspond au coût d’un tri de l’ensemble des observations et du coût de la construction du premier groupe. Reducer et tri -------------- Un stream est infini en théorie. En pratique il est fini mais on ne sait pas si un ou plusieurs groupes entiers tiendraient en mémoire. Une façon de faire est de limiter la présence des données en mémoire à un seul groupe et pour cela, il faut d’abord trier les données selon les clés. Ce n’est pas indispensable mais dans le pire des cas, c’est une bonne option. On pourrait avoir un stream comme suit : .. code:: ipython3 pas_cool = [(chr(int(c) + 96), i) for i, c in enumerate(str(11111111 ** 2))] pas_cool .. parsed-literal:: [('a', 0), ('b', 1), ('c', 2), ('d', 3), ('e', 4), ('f', 5), ('g', 6), ('h', 7), ('g', 8), ('f', 9), ('e', 10), ('d', 11), ('c', 12), ('b', 13), ('a', 14)] Le groupe *a* est au début et à la fin, si on regroupe en mémoire, le groupe associé à *a* doit rester en mémoire du début à la fin. On ne sait jamais si un groupe ne va pas réapparaître plus tard. En triant, on est sûr. Un autre map ------------ On ajoute un dernier map qui fait la somme des éléments de chaque groupe. .. code:: ipython3 def sum_gr(key_gr): key, gr = key_gr return key, sum(e[1] for e in gr) stream1 = mapper(lambda el: (el[0], el[1]+1), ens) stream2 = reducer(lambda el: el[0], stream1) stream3 = map(sum_gr, stream2) stream3 .. parsed-literal:: .. code:: ipython3 list(stream3) .. parsed-literal:: [('a', 13), ('b', 5)] Combiner ou join ---------------- Un *combiner* ou *join* permet de fusionner deux bases de données qui ont en commun une clé. .. code:: ipython3 from sparkouille.fctmr import combiner stream1 = mapper(lambda el: (el[0], el[1]+1), ens) stream2 = reducer(lambda el: el[0], stream1) stream3 = map(sum_gr, stream2) stream4 = mapper(lambda el: (el[0], el[1]+10), pas_cool) comb = combiner(lambda el: el[0], stream3, lambda el: el[0], stream4) comb .. parsed-literal:: .. code:: ipython3 list(comb) .. parsed-literal:: [(('a', 13), ('a', 10)), (('a', 13), ('a', 24)), (('b', 5), ('b', 11)), (('b', 5), ('b', 23))] Le coût du premier élément est un peu plus compliqué à inférer, cela dépend beaucoup des données. .. code:: ipython3 def job(ens, ens2, one=False, sens=True): stream1 = mapper(lambda el: (el[0], el[1]+1), ens) stream2 = reducer(lambda el: el[0], stream1) stream3 = map(sum_gr, stream2) stream4 = mapper(lambda el: (el[0], el[1]+10), ens2) if sens: comb = combiner(lambda el: el[0], stream3, lambda el: el[0], stream4) else: comb = combiner(lambda el: el[0], stream4, lambda el: el[0], stream3) return list(comb) if one else first(comb) %timeit -n 1000 job(big_ens, pas_cool) .. parsed-literal:: 2.97 µs ± 793 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each) .. code:: ipython3 %timeit -n 1000 job(big_ens, pas_cool, sens=False) .. parsed-literal:: 3.15 µs ± 1.16 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each) .. code:: ipython3 %timeit -n 1000 job(big_ens, pas_cool, one=True) .. parsed-literal:: 401 µs ± 6.73 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each) .. code:: ipython3 %timeit -n 1000 job(big_ens, pas_cool, one=True, sens=False) .. parsed-literal:: 389 µs ± 10.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each) Il y a différentes façons de coder un *combiner*, l’une d’elle consiste à réduire chacun des deux streams puis à faire le produit croisé de chaque groupe assemblé. Reducers récursifs ------------------ C’est pas loin d’être un abus de langage, disons que cela réduit la dépendance au tri. Un exemple. .. code:: ipython3 def sum_gr(key_gr): key, gr = key_gr return key, sum(e[1] for e in gr) def job_recursif(ens): stream2 = reducer(lambda el: el[0], ens) stream3 = map(sum_gr, stream2) return list(stream3) job_recursif(ens) .. parsed-literal:: [('a', 10), ('b', 4)] Et maintenant, on coupe en deux : .. code:: ipython3 n = len(ens) // 2 job_recursif(ens[:n]) .. parsed-literal:: [('a', 1), ('b', 4)] .. code:: ipython3 job_recursif(ens[n:]) .. parsed-literal:: [('a', 9)] Et maintenant : .. code:: ipython3 job_recursif( job_recursif(ens[:n]) + job_recursif(ens[n:])) .. parsed-literal:: [('a', 10), ('b', 4)] Le job ainsi écrit est associatif en quelque sorte. Cela laisse plus de liberté pour la distribution car on peut maintenant distribuer des clés identiques sur des machines différentes puis réappliquer le *reducer* sur les résultats de la première salve. C’est d’autant plus efficace que le *reducer* réduit beaucoup les données. Il reste à voir le cas d’un *reducer* **non récursif**. .. code:: ipython3 def mean(ens): s = 0. for i, e in enumerate(ens): s += e return s / (i + 1) def mean_gr(key_gr): key, gr = key_gr return key, mean(e[1] for e in gr) def job_non_recursif(ens): stream2 = reducer(lambda el: el[0], ens) stream3 = map(mean_gr, stream2) return list(stream3) job_non_recursif(ens) .. parsed-literal:: [('a', 3.3333333333333335), ('b', 4.0)] .. code:: ipython3 n = len(ens) // 2 job_non_recursif(ens[:n]) .. parsed-literal:: [('a', 1.0), ('b', 4.0)] .. code:: ipython3 job_non_recursif(ens[n:]) .. parsed-literal:: [('a', 4.5)] .. code:: ipython3 job_non_recursif( job_non_recursif(ens[:n]) + job_non_recursif(ens[n:])) .. parsed-literal:: [('a', 2.75), ('b', 4.0)] Ce *job* ne doit pas être distribué n’importe comment.