# 2A.i - Données non structurées, programmation fonctionnelle - correction

Calculs de moyennes et autres statistiques sur une base twitter au format JSON avec de la programmation fonctionnelle (module [cytoolz](https://github.com/pytoolz/cytoolz)).

In [1]:
from jyquickhelper import add_notebook_menu
add_notebook_menu()

Commencez par télécharger la base de donnée :

twitter_for_network_100000.db  
https://drive.google.com/file/d/0B6jkqYitZ0uTWjFjd3lpREpFYVE/view?usp=sharing

Vous pourrez éventuellement télécharger la base complète (3,4 millions d'utilisateurs, plutôt que 100000) ultérieurement si vous souhaitez tester vos fonctions. Ne perdez pas de temps avec ceci dans ce tp.
twitter_for_network_full.db   
https://drive.google.com/file/d/0B6jkqYitZ0uTWkR6cDZQUTlVSWM/view?usp=sharing

Vous pouvez consulter l'aide de pytoolz (même interface que cytoolz) ici :  
http://toolz.readthedocs.org/en/latest/  

La section sur l'API est particulièrement utile car elle résume bien les différentes fonctions :
http://toolz.readthedocs.org/en/latest/api.html  

Ensuite exécutez la cellule suivante :

Liens alternatifs :

* [twitter_for_network_100000.db.zip](http://www.xavierdupre.fr/enseignement/complements/twitter_for_network_100000.db.zip)
* [twitter_for_network_full.db.zip](http://www.xavierdupre.fr/enseignement/complements/twitter_for_network_full.db.zip)

In [2]:
import pyensae.datasource
pyensae.datasource.download_data("twitter_for_network_100000.db.zip")

['.\\twitter_for_network_100000.db']

In [3]:
import cytoolz as ct 
import cytoolz.curried as ctc
import sqlite3
import pprint
try:
    import ujson as json
except:
    import json

conn_sqlite = sqlite3.connect("twitter_for_network_100000.db")
cursor_sqlite = conn_sqlite.cursor()

## Description de la base de donnée

Nous nous intéresserons à 3 tables : **tw_users**, **tw_status** et **tw_followers_id**.  

La première (**tw_users**) contient des profils utilisateurs tels que retournés par l'api twitter (à noter que les profils ont été "épurés" d'informations jugées inutiles pour limiter la taille de la base de donnée).

La deuxième (**tw_status**) contient des status twitter (tweet, retweet, ou réponse à un tweet), complets, issus d'une certaine catégorie d'utilisateurs (les tweets sont tous issus d'environ 70 profils).

La troisième (**tw_followers_id**) contient des listes d'id d'users, qui suivent les utilisateurs référencés par la colonne user_id. Là encore ce ne sont les followers que de environ 70 profils. Chaque entrée contient au plus 5000 id de followers (il s'agit d'une limitation de twitter).

Elles ont les structures suivantes :

Les trois possèdent un champ content, de type json, qui sera celui qui nous interessera le plus. Vous pouvez accédez aux données dans les tables avec les syntaxes suivantes (vous pouvez commenter/décommenter les différentes requêtes).

In [4]:
# cursor_sqlite.execute( "SELECT user_id, content FROM tw_followers_id")
cursor_sqlite.execute( "SELECT id, content, screen_name FROM tw_users")
# cursor_sqlite.execute( "SELECT id, content, user_id FROM tw_status")

for it_elt in cursor_sqlite:
    ## do something here
    pass

# ou, pour accéder à un élément :
cursor_sqlite.execute( "SELECT id, content, screen_name FROM tw_users")
cursor_sqlite.fetchone()

(1103159180,
 '{"utc_offset": 7200, "friends_count": 454, "entities": {"description": {"urls": []}, "url": {"urls": [{"expanded_url": "http://www.havas.com", "display_url": "havas.com", "indices": [0, 22], "url": "http://t.co/8GcZtydjWh"}]}}, "description": "Havas Group CEO", "id": 1103159180, "contributors_enabled": false, "geo_enabled": false, "name": "Yannick Bollor\\u00e9", "favourites_count": 873, "verified": true, "protected": false, "created_at": "Sat Jan 19 08:23:33 +0000 2013", "statuses_count": 654, "lang": "en", "time_zone": "Ljubljana", "screen_name": "YannickBollore", "location": "", "id_str": "1103159180", "url": "http://t.co/8GcZtydjWh", "followers_count": 7345, "listed_count": 118, "has_extended_profile": false}',
 'YannickBollore')

Toutefois les curseurs de base de donnée en python se comportent comme des "iterables" (i.e. comme une liste ou une séquence, mais sans nécessairement charger toutes les données en mémoire).  
On peut donc les passer directement en argument aux fonctions de cytoolz.

In [5]:
cursor_sqlite.execute( "SELECT user_id, content FROM tw_followers_id")
print( ct.count( cursor_sqlite ) )
cursor_sqlite.execute( "SELECT id, content, user_id FROM tw_status")
print( ct.count( cursor_sqlite ) )
cursor_sqlite.execute( "SELECT id, content, screen_name FROM tw_users")
print( ct.count( cursor_sqlite ) )

2079
16092
100071


**Attention au fait que le curseur garde un état**.  
Par exemple exécutez le code suivant :  

In [6]:
cursor_sqlite.execute( "SELECT user_id, content FROM tw_followers_id")
print( ct.count( cursor_sqlite ) )
print( ct.count( cursor_sqlite ) )

2079
0


Le deuxième count renvoit 0 car le curseur se rappelle qu'il est déjà arrivé à la fin des données qu'il devait parcourir. Il faut donc réinitialiser le curseur :

In [7]:
cursor_sqlite.execute( "SELECT user_id, content FROM tw_followers_id")
print( ct.count( cursor_sqlite ) )
cursor_sqlite.execute( "SELECT user_id, content FROM tw_followers_id")
print( ct.count( cursor_sqlite ) )

2079
2079


On peut également mettre la commande execute à l'intérieur d'une fonction, que l'on appelle ensuite :

In [8]:
def get_tw_followers_id():
    return cursor_sqlite.execute( "SELECT user_id, content FROM tw_followers_id")
print( ct.count( get_tw_followers_id() ) )
print( ct.count( get_tw_followers_id() ) )

2079
2079


La commande exécute en elle-même ne prend pas du tout de temps, car elle ne fait que préparer la requête, n'hésitez donc pas à en mettre systématiquement dans vos cellules, plutôt que de risquer d'avoir un curseur dont vous ne vous souvenez plus de l'état.

## Partie 1 - description de la base de donnée

#### Question 1 - éléments unique d'une table

Trouvez la liste des user_id différents dans la table tw_followers_id, en utilisant les fonctions cytoolz.  
La fonction qui pourra vous être utiles ici :  

  - `ct.unique(seq)` => à partir d'une séquence, renvoit une séquence où tous les doublons ont été supprimés
  
Vous vous rappelez sans doute que nous utilisions systématiquement pluck et map pour les exemples du cours, ceux-ci ne sont pas nécessaires ici.  
A noter qu'il faudra sans doute utilisez la fonction list( ... ), ou une boucle for pour forcer l'évaluation des fonctions [cytoolz](https://github.com/pytoolz/cytoolz).

In [9]:
import cytoolz as ct 
import cytoolz.curried as ctc

list( ct.unique( cursor_sqlite.execute( "SELECT user_id FROM tw_followers_id") ) )

[(18814998,),
 (26031424,),
 (96093611,),
 (24906326,),
 (17211968,),
 (17172319,),
 (14658057,),
 (427288938,),
 (105116603,),
 (95533081,),
 (24732180,),
 (17508660,),
 (38170599,),
 (83784709,),
 (22428439,),
 (551669623,),
 (14389177,),
 (26073581,),
 (360680603,),
 (104857771,),
 (1055021191,),
 (217749896,),
 (2445174992,),
 (258345629,),
 (20702279,),
 (1134643441,),
 (415498269,),
 (374184532,),
 (1976143068,),
 (520449734,),
 (483087012,),
 (80820758,),
 (69255422,),
 (413839619,),
 (34225599,),
 (38021678,),
 (472412809,),
 (219361536,),
 (49969223,),
 (359979086,),
 (72276456,),
 (69032746,),
 (490126636,),
 (93484066,),
 (308679667,),
 (70385068,),
 (19438626,),
 (2709830497,),
 (120003933,),
 (61903470,),
 (273341346,),
 (146423935,),
 (601970599,),
 (3301529297,),
 (3026614773,),
 (3115155587,),
 (1374567409,),
 (2336714149,),
 (873851912,),
 (17744075,),
 (283979861,),
 (2310376344,),
 (88884666,),
 (417340481,),
 (274924087,),
 (534135763,),
 (16222469,),
 (497110024,),

A noter que si vous voyez apparaître vos résultats sous la forme (79145543,), c'est normal, le curseur sqlite renvoit toujours ces résultats sous forme de tuple : (colonne1, colonne2, colonne3, ...) et ce même si il n'y a qu'une seule colonne dans la requête.  

Nous utiliserons pluck pour extraire le premier élément du tuple.

#### Question 2 - nombre d'élements unique d'une table

Trouvez le nombre de user_id différents dans la table tw_followers_id, en utilisant les fonctions cytoolz.  
Les fonctions qui pourront vous êtres utiles ici :  

  - `ct.count(seq)` => compte le nombre d'éléments d'une séquence
  - `ct.unique(seq)` => à partir d'une séquence, renvoit une séquence où tous les doublons ont été supprimés
  
Vous vous rappelez sans doute que nous utilisions systématiquement pluck et map pour les exemples du cours, ceux-ci ne sont pas nécessaires, ici.

In [10]:
import cytoolz as ct 
import cytoolz.curried as ctc

ct.count( ct.unique( cursor_sqlite.execute( "SELECT user_id FROM tw_followers_id") ) )

73

#### Question 3 : création d'une fonction comptez_unique

A l'aide de `ct.compose`, créez une fonction **comptez_unique** qui effectue directement cette opération. Pour rappel, ``ct.compose( f, g, h, ...)`` renvoit une fonction qui appelée sur `x` exécute (``f(g(h(x))``). `ct.compose` prend un nombre d'arguments quelconque. A noter que les fonctions données en argument doivent ne prendre qu'un seul argument, ce qui est le cas ici. Pensez bien que comme vous manipulez ici les fonctions elle-même, il ne faut pas mettre de parenthèses après 

In [11]:
import cytoolz as ct 
import cytoolz.curried as ctc

comptez_unique = ct.compose( ct.count, ct.unique )

In [12]:
## Pour tester votre code, cette ligne doit renvoyer le même nombre qu'à la question 2
comptez_unique( cursor_sqlite.execute( "SELECT user_id FROM tw_followers_id") )

73

#### Question 4 : compte du nombre de valeurs de "location" différentes dans la table tw_users

Nous allons utiliser la fonction comptez_unique définie précédemment pour comptez le nombre de "location" différentes dans la table **tw_users**.  
Pour cela il faudra faire appel à deux fonctions :  

  - **ct.pluck** pour extraire une valeur de tous les éléments d'une séquence
  - **ct.map** pour appliquer une fonction (ici `json.loads` pour transformer une chaîne de caractère au format json en objet python).
  
Il faudra sans doute appliquer `ct.pluck` deux fois, une fois pour extraire la colonne content du résultat de la requête (même si celle-ci ne comprend qu'une colonne) et une fois pour extraire le champ "location" du json. Les syntaxes de ces fonctions sont les suivantes :

  - **ct.pluck( 0, seq )** (cas d'une séquence de liste ou de tuple) ou **ct.pluck( key, seq )** (cas d'une séquence de dictionnaire).
  - **ct.map( f, seq )** où f est la fonction que l'on souhaite appliquer (ne mettez pas les parenthèses après le `f`, ici vous faites références à la fonction, pas son résultat)

**Astuce :** dans le cas improbable où vous auriez un ordinateur sensiblement plus lent que le rédacteur du tp, rajoutez `LIMIT 10000` à la fin des requêtes

In [13]:
import cytoolz as ct 

cursor_sqlite.execute( "SELECT content FROM tw_users")
comptez_unique( ct.pluck("location", ct.map(json.loads, ct.pluck(0, cursor_sqlite))))
# Le résultat attendu est 13730

13730

#### Question 5 : curly fonctions

Comme on risque de beaucoup utiliser les fonctions `ct.map` et `ct.pluck`, on veut se simplifier la vie en utilisant la notation suivante :  

In [14]:
pluck_loc = ctc.pluck("location")
map_loads = ctc.map(json.loads)
pluck_0   = ctc.pluck(0)

Notez bien que nous utilisons ctc.pluck et non pas ct.pluck, car le package [cytoolz.curry](http://toolz.readthedocs.io/en/latest/curry.html) (ici importé en temps que `ctc`) contient les versions de ces fonctions qui supportent l'évaluation partielle. Les objets **pluck_loc**, **map_loads**, **pluck_0** sont donc des fonctions à un argument, construites à partir de fonctions à deux arguments. Utilisez ces 3 fonctions pour simplifier l'écriture de la question 4

In [15]:
import cytoolz as ct 

cursor_sqlite.execute( "SELECT content FROM tw_users")
comptez_unique( pluck_loc( map_loads( pluck_0(cursor_sqlite))))
# Le résultat attendu est 13730

13730

#### Question 6 : fonction get_json_seq

A partir des fonctions précédentes et de la fonction compose, créez une fonction get_json_seq, qui à partir d'un curseur d'une requête dont la colonne content est en première position, renvoit une séquence des objets json loadés. Vous devez pouvoir l'utiliser pour réécrire le code de la question précédente ainsi :

In [16]:
import cytoolz as ct 

cursor_sqlite.execute( "SELECT content FROM tw_users")
get_json_seq = ct.compose( map_loads, pluck_0 )
comptez_unique( pluck_loc( get_json_seq(cursor_sqlite)))

13730

#### Question 7 : liste des localisations avec Paris

On peut vérifier si une localisation contient le mot "Paris", avec toutes ces variations de casse possible avec la fonction suivante :

In [17]:
def contains_paris(loc):
    return "paris" in loc.lower()

En utilisant cette fonction et la fonction ct.filter, trouvez :  

  - le nombre d'utilisateur dont la location contient Paris sous une forme ou une autre (question 7.1)
  - tous les variantes de location contenant Paris (pour info il y en a 977)
  
`ct.filter` s'utilise avec la syntaxe ``ct.filter( f, seq )`` et renvoit une séquence de tous les éléments de la séquence en entrée pour lesquels f renvoit `true`. Vous aurez besoin des fonctions ct.unique et ct.count. Si vous avez une sortie du type `<cytoolz.itertoolz._unique_identity at 0x7f3e7f3d6d30>`, rajouter la fonction `list( ... )` autour pour forcer l'évaluation.

In [18]:
## Réponse 7.1
import cytoolz as ct 

cursor_sqlite.execute( "SELECT content FROM tw_users")
ct.count( ct.filter( contains_paris, pluck_loc( get_json_seq(cursor_sqlite))))
## le résultat attendu est 5470

5470

In [19]:
## Réponse 7.2
import cytoolz as ct 

cursor_sqlite.execute( "SELECT content FROM tw_users")
list(ct.unique( ct.filter( contains_paris, pluck_loc( get_json_seq(cursor_sqlite)))))
## la liste doit contenir 977 éléments

['Paris, Ile-de-France',
 'Paris',
 'Rouen/Paris, France',
 'paris 75006',
 'paris 75018',
 'Somewhere in Paris area',
 'Paris.',
 'Paris, New York, Indonesia',
 'PARiS',
 'Now in Paris',
 'paris',
 'Paris France',
 'Est parisien',
 'PARIS',
 'Paris ',
 'paris ',
 '- Paris -',
 'France, Paris',
 '10, rue du Colisée 75008 Paris',
 'Paris - Munich',
 "Paris' Beach",
 'Paris - Uppsala',
 'Groupe La Poste -Paris',
 'France (Paris) ',
 'Paris, France',
 'Normandie Deauville et Paris',
 'Paris /France',
 'Perpignan, Paris, Cordoba (SP)',
 'Paris - Arras - Gascogne ',
 'Paris / Toulouse / Krakow',
 '#Paris, Ile-de-France',
 'Montréal - Paris',
 'Limoges/Paris',
 'LyonParisClermontNantesMorzine',
 'paris 13',
 'Grenoble, Paris, Annonay ',
 'Bordeaux/Paris',
 '#Paris5',
 "Paris / Ile d'Yeu",
 '#Paris - #Angers',
 'Paris et Toulouse',
 'Paris 19ème',
 'paris Marais',
 'REIMS. METZ. PARIS',
 'Europe Paris Nantes',
 'Auxerre Paris Toulouse',
 'Paris, 3e',
 'Paris - Montpellier',
 'Paris-Reims',
 '

#### Question 8 : somme des tweets de tous les utilisateurs dont la location contient Paris

Calculez le nombre de tweets total par les utilisateurs dont la "location" contient Paris.  
Dans le json de twitter, la clé pour cela est "statuses_count"

Pour cela plusieurs possibilités :  

  - la plus simple est de redéfinir une fonction contains_paris, qui prenne en entrée un user json 
  - groupby("location", seq) vous renvoit les réponses groupées par location. Cette méthode possède l'inconvénient de charger toutes les données en mémoire
  - reduceby("location", lambda x,y: x + y["statuses_count"], seq, 0) vous renvoit la somme par location, il ne reste plus qu'à filtrer et additionner
  - pluck(["location", "statuses_count"], seq) vous permet de garder les deux informations. Il faudra changer la fonction contains paris pour celle suivante (contains_paris_tuple)
  
Réponse attendue : 9811612

In [20]:
## Réponse 8
import cytoolz as ct 

# solution 1 
def contains_paris_json(loc):
    return "paris" in loc["location"].lower()

cursor_sqlite.execute( "SELECT content FROM tw_users")
# Ici j'utilise pipe, car les fonctions sont appelées dans l'ordre indiqué, cela est plus pratique
# que compose
print( ct.pipe( cursor_sqlite,
                get_json_seq,
                ctc.filter(contains_paris_json),
                ctc.pluck("statuses_count"),
                sum ) )


# Solution 2 - déconseillée
cursor_sqlite.execute( "SELECT content FROM tw_users")
# Ici j'utilise pipe, car les fonctions sont appelées dans l'ordre indiqué, cela est plus pratique
# que compose
print(
  ct.pipe(cursor_sqlite,
          ## on récupère les objets json
          get_json_seq,
          ## on regroupe par localisation
          ctc.groupby("location"), 
          ## on ne garde que les entrées dont la clé contient paris
          ctc.keyfilter(contains_paris), 
          # Pour les valeurs, ont fait la somme des statuses_count
          ctc.valmap( ctc.compose( sum, ctc.pluck("statuses_count") )),     
          lambda x:x.values(),
          sum ))

# Solution 4 - valable également
cursor_sqlite.execute( "SELECT content FROM tw_users")
print(
  ct.pipe(cursor_sqlite,
          ## on récupère les objets json
          get_json_seq,
          ## on garde la location et les nombres de status
          ctc.pluck(["location","statuses_count"]),
          ## on applique le filtre, avec contains_paris sur le premier élément 
          ctc.filter( ctc.compose( contains_paris, ctc.get(0)) ),
          ## une fois le filtre appliqué, on ne garde que les statuses_count
          ctc.pluck(1),
          # on fait la somme
          sum))

9811612
9811612
9811612


#### Question 9 : comparaison des followers d'homme politique

On va maintenant s'intéresser à la proximité / corrélation entre les hommes politiques, que l'on mesurera à partir de la formule :

$\frac{1}{2}*( \frac{nbFollowersCommun}{nbFollowersHommePolitique_1} + \frac{nbFollowersCommun}{nbFollowersHommePolitique_2}$)

On prend donc la moyenne des ratios des followers de chaque homme politique suivant l'autre (cette formule semble s'accommoder assez bien des différences du nombre de followers entre homme politiques)

On s'intéressera notamment aux hommes politiques suivants :

```
benoithamon | 14389177
montebourg  | 69255422
alainjuppe  | 258345629
```

De fait vous pouvez prendre n'importe quel homme ou femme politique, les résultats de cette méthode sont assez probants malgré sa rusticité. 

**Important : pensez à appliquer la cellule ci-dessous**

In [21]:
try:
    cursor_sqlite.execute("CREATE UNIQUE INDEX tw_users_id_index ON tw_users(id)")
    print("Index created")
except sqlite3.OperationalError as e:
    if( "index tw_users_id_index already exists" in str(e)):
        print("Ok, index already exists")
    else:
        raise e

Index created


La façon la plus simple est de charger les listes d'id de followers en mémoire, dans des objets de type set, et de les comparer avec les opérateurs & (intersection) - (différences).  

On peut aussi chercher une méthode approchée, en comparant de façon aléatoire les listes contenues dans tw_follower_id. 

In [22]:
part_taken = 2

get_all_followers_set = ctc.compose(
    ctc.reduce(set.union), 
    ctc.map(set),
    get_json_seq,
    ctc.take_nth(part_taken),
    ct.curry(cursor_sqlite.execute,"SELECT content FROM tw_followers_id WHERE user_id = ?"),
    lambda x:(x,)
)

def proximite(a,b):
    c = a & b
    return min(1, 0.5*len(c)*(1/len(a)+1/len(b))*part_taken)

users_id_list = [ 14389177, 69255422, 258345629 ]

users_id_f_set = list( ct.map( get_all_followers_set, users_id_list ) )

for it_l in users_id_f_set:
    print( ";".join("{0:.2f}".format(proximite(it,it_l)) for it in users_id_f_set) )

1.00;0.46;0.23
0.46;1.00;0.27
0.23;0.27;1.00


## Partie 2 : avec dask

Essayez d'exécuter le code suivant

In [23]:
import dask

On affiche la liste des tables de la base sqlite:

In [24]:
from pyensae.sql import Database
dby = Database("twitter_for_network_100000.db")
dby.connect()

In [25]:
dby.get_table_list()

['tw_followers_id',
 'tw_search_count',
 'tw_status',
 'tw_users',
 'tw_users_history',
 'tw_users_htmldata']

In [26]:
dby.close()

[dask](http://dask.pydata.org/en/latest/) peut vous permettre de paralléliser de façon efficace votre code entre plusieurs processeurs. Utilisez le code suivant pour splitter la base 'twitter_for_network_full.db' en plusieurs fichiers plats (NB: pensez à nettoyer votre disque dur après ce tp).

In [27]:
import cytoolz as ct # import groupby, valmap, compose
import cytoolz.curried as ctc ## pipe, map, filter, get
import sqlite3
import pprint
try:
    import ujson as json
except:
    import json

conn_sqlite_f = sqlite3.connect("twitter_for_network_100000.db")
cursor_sqlite_f = conn_sqlite_f.cursor()

In [28]:
cursor_sqlite_f.execute("SELECT content FROM tw_users")

for it in range(100):
    with open( "tw_users_split_{0:02d}.json".format(it), 'w') as f:
        for it_index, it_json in enumerate( cursor_sqlite_f ):
            f.write( it_json[0] )
            f.write("\n")
            if it_index == 100000:
                break
        else:
            break

Calculez maintenant, en utilisant [dask.bag](http://dask.pydata.org/en/latest/bag.html) :

- le nombre total de status
- le nombre de status moyen par location
- la distribution du nombre de followers par puissance de 10 sur l'ensemble des users

In [29]:
## Code commun nécessaire

import dask.bag as dbag
try:
    import ujson as json
except:
    print("ujson unavailable")
    import json
from operator import add

a = dbag.read_text('tw_users_split*.json')

In [30]:
# Le nombre total de status

a.map(json.loads).pluck("statuses_count").fold(add).compute()

80536977

In [31]:
# Le nombre moyen de tweet par location.
import cytoolz

# Solution
def mean(l):
    # la parallélisation n'est pas effectuée de la même manière
    # sur linux et Windows
    # dans un process différent, les import
    # faits au début du programme ne font pas partie
    # du code parallélisé, c'est pourquoi il faut les ajouter
    # dans le code de la fonction
    import cytoolz.curried as ctc
    return sum( ctc.pluck(1, l) ) / len(l)

def function_mapped(*args):
    # Example of args:
    # (('Lille', [['Lille', 483]]),)
    if len(args) == 2:
        x, y = args
    else:
        x, y = args[0]
    me = mean(y)
    return x, me

result = a.map(json.loads).pluck(["location","statuses_count"]) \
          .groupby(0).map(function_mapped).compute()
result[:10]

[('www.ville-troyes.fr', 292.0),
 ('', 299.8513534747518),
 ('Pau/Paris', 2147.0),
 ('France', 2739.652611705475),
 ('Villeneuve, Paris, monde', 13678.0),
 ('Nice, France', 7690.0),
 ('Trappes - Yvelines', 3602.0),
 ('Paris', 5858.625),
 ('France - Bordeaux', 2134.0),
 ('Crest', 2240.0)]

In [32]:
# La distribution du nombre de followers par puissance de 10
import math
a.map(json.loads) \
 .pluck("followers_count") \
 .map(lambda x,math=math: int(math.log10(x+1)) ) \
 .frequencies() \
 .compute()

[(3, 1922),
 (1, 28931),
 (4, 290),
 (5, 65),
 (2, 13377),
 (0, 55480),
 (6, 5),
 (7, 1)]

Dans ce second cas, on a gardé la parallélisation mais il a fallu ajouter le module [math](https://docs.python.org/3/library/math.html) au contexte de la fonction parallélisée.