2A.i - programmation fonctionnelle

Links: notebook, html, PDF, python, slides, GitHub

Itérateur, générateur, programmation fonctionnelle, tout pour éviter de charger l’intégralité des données en mémoire et commencer les calculs le plus vite possible.

from jyquickhelper import add_notebook_menu
add_notebook_menu()

Données : twitter_for_network_100000.db.zip

import pyensae
pyensae.download_data("twitter_for_network_100000.db.zip")
['twitter_for_network_100000.db']

Programmation fonctionnelle

Fonction pure, tests et modularité

La programmation fonctionnelle se concentre sur la notion de fonction, comme son nom l’indique, et plus précisément de fonction pure.
Une fonction pure est une fonction:
  • dont le résultat dépend uniquement des entrées

  • qui n’a pas d’effet de bord

def sorted_1(l):
    l.sort()
    return l

a = [4,3,2,1]
print(sorted_1(a))
print(a)

a = [4,3,2,1]
print(sorted(a))
print(a)

import random
l = list(range(100000))
random.shuffle( l )

%timeit l.sort()
%timeit sorted(l)
[1, 2, 3, 4]
[1, 2, 3, 4]
[1, 2, 3, 4]
[4, 3, 2, 1]
The slowest run took 23.73 times longer than the fastest. This could mean that an intermediate result is being cached
100 loops, best of 3: 3.96 ms per loop
100 loops, best of 3: 4.81 ms per loop

La programmation fonctionnelle est à mettre en contraste par rapport à la programmation orientée objet. L’objet est plus centré sur la représentation, la fonction sur l’action l’entrée et le résultat. Il existe des langages orientés fonctionnel, comme lisp. Elle présente en effet des avantages considérables sur au moins deux points essentiels en informatique:

  • tests

  • modularité

Un exemple concret, les webservices en python. Ceux-ci sont définies comme des fonctions, ce qui permet notamment de facilement les rendre compatibles avec différents serveurs web, en donnant à ceux-ci non pas le webservice directement, mais une composition de celui-ci.

La composition est une façon très puissante de modifier la comportement d’un objet, car elle n’impacte pas l’objet lui-même.

import os, psutil, gc, sys
if not sys.platform.startswith("win"):
    import resource

def memory_usage_psutil():
    gc.collect()
    process = psutil.Process(os.getpid())
    mem = process.memory_info()[0] / float(2 ** 20)

    print( "Memory used : %i MB" % mem )
    if not sys.platform.startswith("win"):
        print( "Max memory usage : %i MB" % (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss//1024) )
memory_usage_psutil()
Memory used : 109 MB

Fonctions pour la gestion de grosses données : laziness

Lors de la gestion de grosses données, le point crucial est que l’on ne veut pas stocker de valeurs intermédiaires, parce que celle-ci pourraient prendre trop de place en mémoire. Par exemple pour calculer la moyenne du nombre de followers dans la base de donnée, il n’est pas indispensable de stocker tous les users en mémoire.

Les fonctions dans cytoolz sont dites “lazy”, ce qui signifie qu’elles ne s’exécutent effectivement que quand nécessaire. Cela évite d’utiliser de la mémoire pour stocker un résultat intermédiaire.

Par exemple la cellule ci-dessous s’exécute très rapidement, et ne consomme pas de mémoire. En effet a sert à représenter l’ensemble des nombres de 0 à 1000000 au carré, mais ils ne sont pas caculés immédiatement.

a = (it**2 for it in range(1000001))
%timeit a = (it**2 for it in range(1000001))
print( type(a) )
The slowest run took 8.45 times longer than the fastest. This could mean that an intermediate result is being cached
100000 loops, best of 3: 1.77 µs per loop
<class 'generator'>

Ici on calcule la somme de ces nombres, et c’est au moment où on appelle la fonction sum que l’on calcule effectivement les carrés. Mais du coup cette opération est beaucoup plus lente que si l’on avait déjà calculé ces nombres.

%timeit sum( (it**2 for it in range(1000001)) )
sum( a )
1 loops, best of 3: 888 ms per loop
333333833333500000

Ma consommation mémoire n’a quasiment pas bougé.

memory_usage_psutil()
Memory used : 109 MB

Ci-dessous, on n’a simplement remplacé les parenthèses () par des crochets [], mais cela suffit pour dire que l’on veut effectivement calculer ces valeurs et en stocker la liste. Cela est plus lng, consomme de la mémoire, mais en calculer la somme sera beaucoup plus rapide.

b = [it**2 for it in range(1000001)]
%timeit b = [it**2 for it in range(1000001)]
print(type(b))
1 loops, best of 3: 973 ms per loop
<class 'list'>
print( sum(b) )
%timeit sum(b)
333333833333500000
10 loops, best of 3: 72.4 ms per loop
memory_usage_psutil()
Memory used : 149 MB

Attention à ce que a est objet de type iterateur, qui retient sa position. Autrement dit, on ne peut l’utiliser qu’une seule fois.

sum(a)
0

Si on a besoin de le réutiliser, on peut soit stocker les valeurs, soit le mettre dans une fonction

def f():
    return (it**2 for it in range(1000001))
print( sum(f()) )
%timeit sum(f())
333333833333500000
1 loops, best of 3: 1.01 s per loop

Exemple cytoolz / twitters data

Liens vers les données :

import pyensae
pyensae.download_data("twitter_for_network_100000.db.zip")
['twitter_for_network_100000.db']
memory_usage_psutil()
Memory used : 149 MB
import cytoolz as ct # import groupby, valmap, compose
import cytoolz.curried as ctc ## pipe, map, filter, get
import sqlite3
import pprint
try:
    import ujson as json
except:
    print("ujson not available")
    import json

tw_users_limit = 1000000
conn_sqlite = sqlite3.connect("twitter_for_network_100000.db")
cursor_sqlite = conn_sqlite.cursor()

Note : sur internet vous verez plus souvent l’exemple json.loads. ujson est simplement une version plus rapide. Elle n’est pas indispensable

import ujson as ujson_test
import json as json_test

cursor_sqlite.execute("SELECT content FROM tw_users LIMIT 1")
tw_user_json = cursor_sqlite.fetchone()[0]

%timeit ujson_test.loads( tw_user_json )
%timeit json_test.loads( tw_user_json )
The slowest run took 4.26 times longer than the fastest. This could mean that an intermediate result is being cached
10000 loops, best of 3: 16.9 µs per loop
10000 loops, best of 3: 28.6 µs per loop
tw_users_limit = 1000000

Ci-dessous on charge en mémoire la liste des profils utilisateurs de la table tw_users. Il est conseillé de tester vos fonctions sur des extraits de vos données qui tiennent en mémoire. Par contre ensuite il faudra éviter de les charger en mémoire.

## With storing in memory
cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
tw_users_as_json = list( ctc.map( json.loads, ctc.pluck( 1, cursor_sqlite ) ) )
len(tw_users_as_json)
100071

On a dans ces deux exemples deux fonctions des plus classiques :

  • ctc.pluck => prend une séquence en entrée et renvoit une séquence de de l’item sélectionnée

  • ctc.map => applique une fonction à chaque élément de la séquence

## Without storing in memory
cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
tw_users_as_json = ctc.pluck("followers_count", # Même chose qu'avec le 1, mais on utilise une clé
                             ctc.map(json.loads, # Map applique la fonction json.loads à tous les objets
                                     ctc.pluck(1, # Le curseur renvoit les objets sous forme de tuple des colonnes
                                                  # pluck(1, _) est l'équivalent de (it[1] for it in _)
                                               cursor_sqlite) ) )
sum(tw_users_as_json)
108086205
cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
%timeit -n 1 tw_users_as_json = ctc.pluck("followers_count", ctc.map(json.loads, ctc.pluck(1, cursor_sqlite) ) )
1 loops, best of 3: 5.99 µs per loop
## Without storing in memory
def get_tw_users_as_json():
    cursor_sqlite.execute("SELECT content FROM tw_users LIMIT %s" % tw_users_limit)
    return ctc.pluck("followers_count", ctc.map(json.loads, ctc.pluck(0, cursor_sqlite) ) )
sum(get_tw_users_as_json())
108086205
sum(get_tw_users_as_json())
108086205

Quelques exemples :

  • count_all_followers_cyt() fait la somme des followers

  • count_all_followers_cyt_by_location() fait la somme par location différente (nous verrons ensuite que cette donnée, du texte brute, mériterait des traitements particuliers)

tw_users_limit = 1000000
import ujson

def get_users_cyt():
    cursor_sqlite.execute("SELECT content FROM tw_users LIMIT %s" % tw_users_limit)
    return ct.map(ujson.loads, ct.pluck( 0, cursor_sqlite ) )

def count_all_followers_cyt():
    return sum( ct.pluck("followers_count", get_users_cyt() ) )

def count_all_followers_cyt_by_location():
    return ct.reduceby( "location", lambda x, item: x + item["followers_count"], get_users_cyt(), 0 )

%timeit count_all_followers_cyt()
%timeit count_all_followers_cyt_by_location()
1 loops, best of 3: 2.52 s per loop
1 loops, best of 3: 2.75 s per loop
memory_usage_psutil()
Memory used : 156 MB

Leur équivalent en code standard. A noter que la version fonctionnelle n’est pas significativement plus rapide.

from collections import defaultdict

def count_all_followers():
    cursor_sqlite.execute("SELECT content FROM tw_users LIMIT %s" % tw_users_limit)
    nb_totals_followers_id = 0
    for it_json in cursor_sqlite:
        nb_totals_followers_id += json.loads(it_json[0])[ "followers_count" ]
    return nb_totals_followers_id

def count_all_followers_by_location():
    cursor_sqlite.execute("SELECT content FROM tw_users LIMIT %s" % tw_users_limit)
    res = defaultdict(int)
    for it_json in cursor_sqlite:
        it_json = json.loads(it_json[0])
        res[it_json["location"]] += it_json[ "followers_count" ]
    return res

%timeit count_all_followers()
%timeit count_all_followers_by_location()
1 loops, best of 3: 2.66 s per loop
1 loops, best of 3: 2.72 s per loop
cursor_sqlite.execute("SELECT content FROM tw_users LIMIT 10000")
%timeit -n1000 first_content = cursor_sqlite.fetchone()[0]
cursor_sqlite.execute("SELECT content FROM tw_users LIMIT 10000")
first_content = cursor_sqlite.fetchone()[0]
%timeit json.loads( first_content )
1000 loops, best of 3: 11.3 µs per loop
100000 loops, best of 3: 16.6 µs per loop

Cytoolz functions

cytoolz est une implémentation plus performante de la librairie toolz, il faut donc vous référer à la documentation de celle-ci.

http://toolz.readthedocs.org/en/latest/api.html

A noter qu’il y a deux packages, cytoolz et cytoolz.curried, ils contiennent les mêmes fonctions, seulement celles du second supporte le “curry”, l’évaluation partielle (voir plus bas). Cela peut représenter un petit overhead.

les basiques

cytoolz.curried.pluck => sélectionne un item dans chaque élément d’une séquence, à partir d’une clé ou d’un index
cytoolz.curried.map => applique une fonction à tous les éléments d’une séquence
import cytoolz as ct
import cytoolz.curried as ctc

cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
a = ctc.pluck( 1, cursor_sqlite )
b = ctc.map( json.loads, a )
c = ctc.pluck("followers_count", b)
print( sum(c) )
108086205

A noter que toutes les fonctions cytoolz du package cytoolz.curry supportent les évaluations partielles, i.e. construire une fonction d’un argument à partir d’une fonction de deux arguments (ou plus généralement n-1 arguments à partir de n)

cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)

pl_1 = ctc.pluck(1) ## ctc.pluck prend 2 arguments, cette fonction est donc une fonction d'un argument
m_loads = ctc.map(json.loads)
pl_fc = ctc.pluck("followers_count")

a = pl_1( cursor_sqlite )
b = m_loads(a)
c = pl_fc(b)
print( sum(c) )
108086205
tw_users_limit = 10000
cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
sum( pl_fc( m_loads( pl_1 ( cursor_sqlite ) ) ) )
4284281
cytoolz.compose permet de créer une fonction par un chaînage de fonction.
Le résultat de chaque fonction est donné en argument à la fonction suivante, chaque fonction doit donc ne prendre qu’un seul argument, d’où l’intérêt de l’évaluation partielle. Comme en mathématique, les fonctions sont évaluées de droite à gauche

count_nb_followers( cursor_sqlite ) est donc équivalent à sum( pl_fc( get_json_seq( cursor_sqlite ) ) )

get_json_seq = ct.compose( m_loads, pl_1 )
count_nb_followers = ct.compose( sum, pl_fc, get_json_seq )
cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
count_nb_followers( cursor_sqlite )
4284281

cytoolz.pipe a un comportement similaire, avec une différence importante, l’ordre des fonctions est inversé (ce qui le rend plus lisible, à mon humble avis)

ct.pipe(
    cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit),
    pl_1,
    m_loads,
    pl_fc,
    sum )
4284281
cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
print( count_nb_followers( ct.take_nth(2, cursor_sqlite ) ) ) # take_nth, prendre un élément sur n
cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
print( count_nb_followers( ct.take_nth(2, ct.drop(1, cursor_sqlite ) ) ) ) # drop, enlève les n premiers éléments
2951686
1332595
cytoolz.take_nth => prend un élément sur n
cytoolz.drop => enlève n éléments
tw_users_limit
10000
Il existe beaucoup de fonctions, dont un certain nombre peuvent faire double emploi.
Par exemple countby prend une fonction et une séquence et compte le nombre de résultat de la fonction appliquée à chaque élément de la séquence, ce qui équivalent à appliquer une fonction à tous les éléments de la séquence, puis calculer la fréquence des résultats (opération effectuée avec frequencies et pluck)
import collections
from operator import ge, le

cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
%timeit -n1 ct.countby(ctc.get("location"), get_json_seq( cursor_sqlite ) )

cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
%timeit -n1 ct.frequencies(ct.pluck("location", get_json_seq( cursor_sqlite ) ) )

def count_location_frequency(c):
    counter = collections.Counter()
    for it_json in c:
        it_json = json.loads( it_json[1] )
        counter[ it_json["location"] ] += 1
    return counter

cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
%timeit -n1 count_location_frequency(cursor_sqlite)

get_freq_by_loc = ct.compose( ct.frequencies, ctc.pluck("location"), get_json_seq )

cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
pprint.pprint( ct.frequencies( get_freq_by_loc(cursor_sqlite).values() ) )

cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
pprint.pprint( ct.valfilter( ct.curry(le,10), get_freq_by_loc(cursor_sqlite) ) )
The slowest run took 7732.75 times longer than the fastest. This could mean that an intermediate result is being cached
1 loops, best of 3: 46.6 µs per loop
The slowest run took 46923.46 times longer than the fastest. This could mean that an intermediate result is being cached
1 loops, best of 3: 5.56 µs per loop
The slowest run took 36110.35 times longer than the fastest. This could mean that an intermediate result is being cached
1 loops, best of 3: 8.55 µs per loop
{1: 1603,
 2: 107,
 3: 32,
 4: 23,
 5: 16,
 6: 7,
 7: 4,
 8: 1,
 10: 4,
 11: 1,
 12: 1,
 14: 2,
 15: 1,
 16: 1,
 19: 1,
 20: 1,
 25: 1,
 26: 1,
 33: 1,
 39: 1,
 43: 1,
 46: 1,
 47: 1,
 170: 1,
 288: 1,
 6959: 1}
{'': 6959,
 'Abidjan': 10,
 'Bordeaux': 14,
 'Bruxelles': 12,
 'FRANCE': 26,
 'France': 170,
 'Lille': 33,
 'London': 10,
 'Lyon': 25,
 'Marseille': 19,
 'Montpellier': 11,
 'Nantes': 14,
 'Nice': 10,
 'PARIS': 16,
 'Paris': 288,
 'Paris ': 15,
 'Paris, France': 39,
 'Paris, Ile-de-France': 46,
 'Toulouse': 20,
 'Tunisie': 10,
 'france': 43,
 'paris': 47}
memory_usage_psutil()
Memory used : 118 MB

A priori il est préférable de choisir l’ordre de fonctions qui sépare les plus les opérations. Ici countby fait les deux à la fois (appliquer la fonction et calculer le nombre d’occurences).

Les deux derniers que nous allons voir sont reduce, reduceby et groupby. Attention à groupby, celle-ci crée un dictionnaire de liste des éléments donnés en entrées, elle forcera donc le chargement en mémoire de toutes les données.

groupby prend en entrée une clé et une séquence, et groupe les objets pour lesquels cette clé a la même valeur. Son retour sera un dictionnaire dont les clés sont les valeurs prises par la clé (ci-dessous les différentes valeurs de “location” dans les utilisateurs) et les valeurs les listes des objets ayant cette valeur pour la clé.

liste_animaux = [
    { "animal":"chat"  , "age":15,"npm":"Roudy"},
    { "animal":"chien" , "age": 5,"npm":"Medor"},
    { "animal":"chien" , "age": 3,"npm":"Fluffy"},
    { "animal":"chien" , "age": 2,"npm":"Max"},
    { "animal":"chat"  , "age":10,"npm":"Teemo"},
    { "animal":"chat"  , "age":25,"npm":"Garfied"}
]

ct.groupby( "animal", liste_animaux )
{'chat': [{'age': 15, 'animal': 'chat', 'npm': 'Roudy'},
  {'age': 10, 'animal': 'chat', 'npm': 'Teemo'},
  {'age': 25, 'animal': 'chat', 'npm': 'Garfied'}],
 'chien': [{'age': 5, 'animal': 'chien', 'npm': 'Medor'},
  {'age': 3, 'animal': 'chien', 'npm': 'Fluffy'},
  {'age': 2, 'animal': 'chien', 'npm': 'Max'}]}
tw_users_limit = 100000
cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
for i, (k, v) in enumerate( ct.valmap( ct.count, ct.groupby( "location",  get_json_seq( cursor_sqlite ) ) ).items() ):
    print(repr(k) + " : " + repr(v))
    if i == 50:
        break
'' : 69975
'Communauté Valencienne, Espagne' : 1
'Coral Springs, Fl' : 1
'Abbiategrasso' : 1
'Getafe - Bordeaux - Vigo' : 1
'Piscop, France' : 1
'Roma - Oslo' : 1
'tarbes' : 2
'Samoa' : 1
'E♡' : 1
'Epernay sous gevrey' : 1
'Porto-Vecchio, Corse' : 2
'Planète Terre®' : 1
'donzere' : 1
'Laval, take it or leave it' : 1
'Albi - Bordeaux ' : 1
'Vannes ' : 1
'Paris - Strasbourg' : 2
'Itabashi-ku, Tokyo' : 1
'Paris, Bilbao, Dieppe' : 1
'hello' : 1
'Francfort-sur-le-Main, Hesse' : 1
'Issy Les Moulineaux' : 1
'montgenost' : 1
'France/Toulouse' : 1
'UAE DUBAI' : 1
'Paris ~ Somewhere' : 1
'Vezin le Coquet' : 1
'تازة المغرب' : 1
'Paris 16' : 1
'Senegal' : 29
'Paris XVème' : 1
'tunis' : 29
'st cyprien plage' : 1
'Dhaka,Bangladesh' : 1
'Saône-et-Loire (71)' : 1
'panama' : 1
'63720 chappes' : 1
'Poueyferré' : 1
'Yaoundé-Cameroon' : 1
'Maroc,Meknès' : 1
'Bucarest' : 2
'france strasbourg' : 1
'dans un monde loin du vôtre' : 1
'nantes/paris' : 1
'Elassona, Greece' : 1
'San Francisco Bay Area' : 3
'Vannes, Bretagne' : 1
'AIX / MARSEILLE / PACA' : 1
'Castellon,España' : 1
'Roque perez' : 1

A noter que si vous voulez utiliser les opérateurs usuels (+, *, etc …), vous pouvez les obtenir sous forme de fonctions dans le package operator

reduce applique une fonction aux deux premiers éléments d’une séquence (ou au premier élément et une valeur initiale) et applique ensuite cette fonction au total et à l’élement suivant.

from operator import add, mul
print( ct.reduce( add, [1,2,3,4,5] ) ) ## calcule add(1,2), puis add(_, 3), add(_, 4), etc ...
print( ct.reduce( mul, [1,2,3,4,5] ) )
15
120

Du coup si votre résultat n’est pas de même nature que vos éléments, la syntaxe ci-dessus ne fonctionnera pas. Dans ce cas, il faut rajouter une valeur initiale.

Dans ce cas la fonction de réduction est appliquée à :

  1. f(valeur_initiale, premier_élément)

  2. f(résultat_précédent, deuxième_élément)

  3. f(résultat_précédent, troisième_élément)

tw_users_limit = 10000
cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
ct.reduce((lambda total,elt: total + elt["followers_count"]), # Fonction pour faire la réduction
           get_json_seq( cursor_sqlite ), # séquence à réduire,
           0 # Valeur initiale
         )
4284281

reduceby fait la même chose, avec un groupement selon un critère en plus. Le code ci-dessous calcule la somme du nombre de followers par location, et filtre sur les valeurs supérieures à 10000.

from operator import le

tw_users_limit = 10000
cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
%timeit -n1 ct.reduceby( "location", lambda x,y: x + y["followers_count"], get_json_seq( cursor_sqlite ), 0 )
cursor_sqlite.execute("SELECT id, content FROM tw_users LIMIT %s" % tw_users_limit)
ct.valfilter(ct.curry(le,10000), ## Ne sélectionne que les éléments dont la valeur est supérieure à 10000
             ct.reduceby( "location",
                         lambda x,y: x + y["followers_count"],
                         get_json_seq( cursor_sqlite ),
                         0 ))
The slowest run took 52894.00 times longer than the fastest. This could mean that an intermediate result is being cached
1 loops, best of 3: 5.56 µs per loop
{'': 476234,
 'Barcelona (Spain)': 13773,
 'Beijing China': 19296,
 'Conscience': 34254,
 'France': 243077,
 'Futuroscope 86': 10888,
 'Islamic Republic of Iran': 72745,
 'Lens,LOSC,VA,USBCO,Reims,ESTAC': 16054,
 'Libérateur enraciné': 19987,
 'London, UK': 37646,
 'Longueuil, Québec': 43522,
 'Melun City': 18401,
 'Paris': 1506591,
 'Paris /France': 251205,
 'Paris, France': 45841,
 'Plein Sud': 17565,
 'Poitiers, Vienne (86)': 102472,
 'ROUEN (76)FRANCE': 10757,
 'Rosslyn, Va.': 278088,
 'St-Raymond on the Beach': 44569,
 'Tunisia': 16602,
 'Worldwide': 11040,
 'http://www.13or-du-hiphop.fr': 22143,
 'paris ': 366667}