Mapper, Reducers customisés avec SQL

Links: notebook, html, PDF, python, slides, GitHub

Ce notebook propose l’utilisation de SQL avec SQLite pour manipuler les données depuis un notebook (avec le module sqlite3).

from jyquickhelper import add_notebook_menu
add_notebook_menu()
%matplotlib inline
import matplotlib.pyplot as plt
plt.style.use('ggplot')
import pyensae
from pyquickhelper.helpgen import NbImage

Représentation

Le module pandas manipule des tables et c’est la façon la plus commune de représenter les données. Lorsque les données sont multidimensionnelles, on distingue les coordonnées des valeurs :

NbImage("cube1.png")
../_images/sql_map_reduce_5_0.png

Dans cet exemple, il y a :

  • 3 coordonnées : Age, Profession, Annéee

  • 2 valeurs : Espérance de vie, Population

On peut représenter les donnés également comme ceci :

NbImage("cube2.png")
../_images/sql_map_reduce_7_0.png

C’est assez simple. Prenons un exemple : table de mortalité de 1960 à 2010 qu’on récupère à l’aide de la fonction table_mortalite_euro_stat. C’est assez long (4-5 minutes) sur l’ensemble des données car elles doivent être prétraitées (voir la documentation de la fonction). Pour écouter, il faut utiliser le paramètre stop_at.

from sparkouille.datasets import table_mortalite_euro_stat
table_mortalite_euro_stat()
'mortalite.txt'
import os
os.stat("mortalite.txt")
os.stat_result(st_mode=33206, st_ino=33776997205344496, st_dev=1182297439, st_nlink=1, st_uid=0, st_gid=0, st_size=112417907, st_atime=1523378271, st_mtime=1523378288, st_ctime=1523378271)
import pandas
df = pandas.read_csv("mortalite.txt", sep="\t", encoding="utf8", low_memory=False)
df.head()
annee valeur age age_num indicateur genre pays
0 2016 0.00044 Y01 1.0 DEATHRATE F AL
1 2015 0.00043 Y01 1.0 DEATHRATE F AL
2 2014 0.00043 Y01 1.0 DEATHRATE F AL
3 2016 0.00035 Y01 1.0 DEATHRATE F AM
4 2015 0.00035 Y01 1.0 DEATHRATE F AM

Les indicateurs pour deux âges différents :

df[ ((df.age=="Y60") | (df.age=="Y61")) & (df.annee == 2000) & (df.pays=="FR") & (df.genre=="F")]
annee valeur age age_num indicateur genre pays
92886 2000 5.020000e-03 Y60 60.0 DEATHRATE F FR
94532 2000 4.860000e-03 Y61 61.0 DEATHRATE F FR
516236 2000 2.580000e+01 Y60 60.0 LIFEXP F FR
517871 2000 2.490000e+01 Y61 61.0 LIFEXP F FR
938100 2000 5.010000e-03 Y60 60.0 PROBDEATH F FR
939746 2000 4.850000e-03 Y61 61.0 PROBDEATH F FR
1361922 2000 9.949900e-01 Y60 60.0 PROBSURV F FR
1363568 2000 9.951500e-01 Y61 61.0 PROBSURV F FR
1785194 2000 9.307600e+04 Y60 60.0 PYLIVED F FR
1786829 2000 9.261800e+04 Y61 61.0 PYLIVED F FR
2206508 2000 9.331000e+04 Y60 60.0 SURVIVORS F FR
2208143 2000 9.284300e+04 Y61 61.0 SURVIVORS F FR
2627855 2000 2.405594e+06 Y60 60.0 TOTPYLIVED F FR
2629490 2000 2.312517e+06 Y61 61.0 TOTPYLIVED F FR

Données trop grosses pour tenir en mémoire : SQLite

On charge une grosse base de données (assez petite pour que la séance ne soit pas trop longue).

df.shape
(2956833, 7)

Les données sont trop grosses pour tenir dans une feuille Excel et les consulter il n’y a pas d’autres moyens que d’en regarder des extraits. Que passe-t-il quand les données sont encore plus grosses et qu’elles ne tiennent pas en mémoire ? Quelques solutions :

  • augmenter la mémoire de l’ordinateur, avec 20 Go, on peut faire beaucoup de choses,

  • stocker les données dans un serveur SQL,

  • stocker les données sur un système distribué (cloud, Hadoop, …)

La seconde option n’est pas toujours simple, il faut installer un serveur SQL. Pour aller plus vite, on peut simplement utiliser SQLite qui est une façon de faire du SQL sans serveur (cela prend quelques minutes). On utilise la méthode to_sql.

import sqlite3
from pandas.io import sql
cnx = sqlite3.connect('mortalite.db3')
try:
    df.to_sql(name='mortalite', con=cnx)
except ValueError as e:
    if "Table 'mortalite' already exists" not in str(e):
        # seulement si l'erreur ne vient pas du fait que cela
        # a déjà été fait
        raise e
# on peut ajouter d'autres dataframe à la table comme si elle était créée par morceau
# voir le paramètre if_exists de la fonction to_sql

On peut maintenant récupérer un morceau avec la fonction read_sql.

import pandas
example = pandas.read_sql('SELECT * FROM mortalite WHERE age_num==50 LIMIT 5', cnx)
example
index annee valeur age age_num indicateur genre pays
0 74015 2016 0.00148 Y50 50.0 DEATHRATE F AL
1 74016 2015 0.00174 Y50 50.0 DEATHRATE F AL
2 74017 2014 0.00196 Y50 50.0 DEATHRATE F AL
3 74018 2016 0.00283 Y50 50.0 DEATHRATE F AM
4 74019 2015 0.00296 Y50 50.0 DEATHRATE F AM

L’ensemble des données restent sur le disque, seul le résultat de la requête est chargé en mémoire. Si on ne peut pas faire tenir les données en mémoire, il faut soit en obtenir une vue partielle (un échantillon aléatoire, un vue filtrée), soit une vue agrégrée. Pour finir, il faut fermer la connexion pour laisser d’autres applications ou notebook modifier la base ou tout simplement supprimer le fichier.

cnx.close()

Sous Windows, on peut consulter la base avec le logiciel SQLiteSpy.

NbImage("sqlite.png")
../_images/sql_map_reduce_23_0.png

Sous Linux ou Max, on peut utiliser une extension Firefox SQLite Manager. Dans ce notebook, on utilisera la commande magique %%SQL du module pyensae :

%load_ext pyensae
%SQL_connect mortalite.db3
<pyensae.sql.sql_interface_database.InterfaceSQLDatabase at 0x257a407d400>
%SQL_tables
['mortalite']
%SQL_schema mortalite
{0: ('index', int),
 1: ('annee', int),
 2: ('valeur', float),
 3: ('age', str),
 4: ('age_num', float),
 5: ('indicateur', str),
 6: ('genre', str),
 7: ('pays', str)}
%%SQL
SELECT COUNT(*) FROM mortalite
COUNT(*)
0 2956833
%SQL_close

Cas 1 : filtrer pour créer un échantillon aléatoire

Si on ne peut pas faire tenir les données en mémoire, on peut soit regarder les premières lignes soit prendre un échantillon aléatoire. Deux options :

La première fonction est simple :

sample = df.sample(frac=0.1)
sample.shape, df.shape
((295683, 7), (2956833, 7))

Je ne sais pas si cela peut être réalisé sans charger les données en mémoire. Si les données pèsent 20 Go, cette méthode n’aboutira pas. Pourtant, on veut juste un échantillon pour commencer à regarder les données. On utilise la seconde option avec create_function et la fonction suivante :

import random  #loi uniforme
def echantillon(proportion):
    return 1 if random.random() < proportion else 0
import sqlite3
from pandas.io import sql
cnx = sqlite3.connect('mortalite.db3')

On déclare la fonction à la base de données.

cnx.create_function('echantillon', 1, echantillon)

On veut récupérer environ 1% de la table ? On écrit d’abord le filtre.

sample = pandas.read_sql('SELECT * FROM mortalite WHERE echantillon(0.01)', cnx)
sample.shape
(29515, 8)
sample.head()
index annee valeur age age_num indicateur genre pays
0 50 1975 0.00134 Y01 1.0 DEATHRATE F AT
1 129 2009 0.00089 Y01 1.0 DEATHRATE F BG
2 137 2001 0.00103 Y01 1.0 DEATHRATE F BG
3 281 2001 0.00030 Y01 1.0 DEATHRATE F CZ
4 289 1993 0.00051 Y01 1.0 DEATHRATE F CZ

On ferme la connexion.

cnx.close()

Pseudo Map/Reduce avec SQLite

La liste des mots-clés du langage SQL utilisés par SQLite n’est pas aussi riche que d’autres solutions de serveurs SQL. La médiane ne semble pas en faire partie. Cependant, pour une année, un genre, un âge donné, on voudrait calculer la médiane de l’espérance de vie sur l’ensembles des pays.

import sqlite3, pandas
from pandas.io import sql
cnx = sqlite3.connect('mortalite.db3')
pays = pandas.read_sql('SELECT pays, COUNT(*) FROM mortalite GROUP BY pays', cnx)
pays.head()
pays count(*)
0 AL 5418
1 AM 10836
2 AT 84882
3 AZ 16254
4 BE 102942

Il n’y a pas le même nombre de données selon les pays, il est probable que le nombre de pays pour lesquels il existe des données varie selon les âges et les années.

query = """SELECT nb_country, COUNT(*) AS nb_rows FROM (
                SELECT annee,age,age_num, count(*) AS nb_country FROM mortalite
                WHERE indicateur=="LIFEXP" AND genre=="F"
                GROUP BY annee,age,age_num
            ) GROUP BY nb_country"""
df = pandas.read_sql(query, cnx)
df.sort_values("nb_country", ascending=False).head(n=2)
nb_country nb_rows
40 100 7
39 98 2
ax = df.plot(x="nb_country", y="nb_rows")
ax.set_title("Nombre de données par pays");
../_images/sql_map_reduce_49_0.png

Soit un nombre inconstant de pays. Le fait qu’on est 100 pays suggère qu’on ait une erreur également.

query = """SELECT annee,age,age_num, count(*) AS nb_country FROM mortalite
                WHERE indicateur=="LIFEXP" AND genre=="F"
                GROUP BY annee,age,age_num
                HAVING nb_country >= 100"""
df = pandas.read_sql(query, cnx)
df.head()
annee age age_num nb_country
0 2006 None None 100
1 2007 None None 100
2 2008 None None 100
3 2009 None None 100
4 2010 None None 100

Ce sont des valeurs manquantes. Le problème pour calculer la médiane pour chaque observation est qu’il faut d’abord regrouper les lignes de la table par indicateur puis choisir la médiane dans chaque de ces petits groupes. On s’inspire pour cela de la logique Map/Reduce et de la fonction create_aggregate.

Cas 2 : reducer customisé avec SQL

Le reducer se présente toujours sous la forme suivante :

class ReducerMediane:
    def __init__(self):
        # ???
        pass
    def step(self, value):
        # ???
        #
        pass
    def finalize(self):
        # ???
        # return ... //2 ]
        pass

Qu’on renseigne de la sorte :

class ReducerMediane:
    def __init__(self):
        self.indicateur = []
    def step(self, value):
        if value >= 0:
            self.indicateur.append(value)
    def finalize(self):
        self.indicateur.sort()
        return self.indicateur[len(self.indicateur)//2]

On le déclare ensuite à sqllite3.

cnx.create_aggregate("ReducerMediane", 1, ReducerMediane)
query = """SELECT annee,age,age_num, ReducerMediane(valeur) AS mediane FROM mortalite
                WHERE indicateur=="LIFEXP" AND genre=="F"
                GROUP BY annee,age,age_num"""
df = pandas.read_sql(query, cnx)
df.head()
annee age age_num mediane
0 1960 None NaN 66.7
1 1960 Y01 1.0 73.7
2 1960 Y02 2.0 72.8
3 1960 Y03 3.0 71.9
4 1960 Y04 4.0 71.0

Un reducer à deux entrées même si cela n’a pas beaucoup de sens ici :

class ReducerMediane2:
    def __init__(self):
        self.indicateur = []
    def step(self, value, value2):
        if value >= 0:
            self.indicateur.append(value)
        if value2 >= 0:
            self.indicateur.append(value2)
    def finalize(self):
        self.indicateur.sort()
        return self.indicateur[len(self.indicateur)//2]

cnx.create_aggregate("ReducerMediane2", 2, ReducerMediane2)
query = """SELECT annee,age,age_num, ReducerMediane2(valeur, valeur+1) AS mediane2 FROM mortalite
                WHERE indicateur=="LIFEXP" AND genre=="F"
                GROUP BY annee,age,age_num"""
df = pandas.read_sql(query, cnx)
df.head()
annee age age_num mediane2
0 1960 None NaN 66.7
1 1960 Y01 1.0 74.0
2 1960 Y02 2.0 73.2
3 1960 Y03 3.0 72.3
4 1960 Y04 4.0 71.3

Il n’est apparemment pas possible de retourner deux résultats mais on peut utiliser une ruse qui consise à les concaténer dans une chaîne de caracères.

class ReducerQuantile:
    def __init__(self):
        self.indicateur = []
    def step(self, value):
        if value >= 0:
            self.indicateur.append(value)
    def finalize(self):
        self.indicateur.sort()
        q1 = self.indicateur[len(self.indicateur)//4]
        q2 = self.indicateur[3*len(self.indicateur)//4]
        n = len(self.indicateur)
        return "%f;%f;%s" % (q1,q2,n)

cnx.create_aggregate("ReducerQuantile", 1, ReducerQuantile)
query = """SELECT annee,age,age_num, ReducerQuantile(valeur) AS quantiles FROM mortalite
                WHERE indicateur=="LIFEXP" AND genre=="F"
                GROUP BY annee,age,age_num"""
df = pandas.read_sql(query, cnx)
df.head()
annee age age_num quantiles
0 1960 None NaN 4.400000;72.800000;20
1 1960 Y01 1.0 73.000000;74.000000;10
2 1960 Y02 2.0 72.100000;73.200000;10
3 1960 Y03 3.0 71.200000;72.300000;10
4 1960 Y04 4.0 70.300000;71.300000;10

On ferme la connexion.

cnx.close()

Notion d’index

En SQL et pour de grandes tables, la notion d’index joue un rôle important pour accélérer les opérations de jointures (JOIN) ou de regroupement (GROUP BY). L’article A thorough guide to SQLite database operations in Python montre comment faire les principales opérations.