Coverage for src/ensae_teaching_cs/special/elections.py: 85%
227 statements
« prev ^ index » next coverage.py v7.1.0, created at 2023-04-28 06:23 +0200
« prev ^ index » next coverage.py v7.1.0, created at 2023-04-28 06:23 +0200
1# -*- coding: utf-8 -*-
2"""
3@file
4@brief Contains a class to process elections results (France)
5"""
6import random
7import numpy
8import pandas
class ElectionResults:
    """
    Processes data coming from
    `data.gouv.fr <http://www.data.gouv.fr/content/search?SortBy=Pertinence&SortOrder=0&SearchText=%C3%A9lections+2012>`_.

    The class uses `pandas <http://pandas.pydata.org/>`_ to process the data.
    See `Elections françaises <http://www.xavierdupre.fr/blog/2013-12-06_nojs.html>`_.
    See `Optimisation sous contraintes appliquée au calcul du report des voix <http://www.xavierdupre.fr/blog/2013-12-07_nojs.html>`_.

    The two rounds are stored in ``self.tours`` (a list of two dataframes).
    Rows of both dataframes are paired *by position* everywhere in this class.
    """

    def __init__(self, file, year=None, level="Départements"):
        """
        Loads the data downloaded from
        `data.gouv.fr <http://www.data.gouv.fr/content/search?SortBy=Pertinence&SortOrder=0&SearchText=%C3%A9lections+2012>`_.

        @param      file    xls file, or a list of two dataframes (first and
                            second round) which are stored as they are
        @param      year    year (optional)
        @param      level   ``Départements`` or ``Cantons``

        Raises ``RuntimeError`` if one sheet is empty (before or after
        processing) or if the processed data cannot be sorted.
        """
        self.year = year
        # "Départements" -> "département": used to build the column names.
        self.level = level.lower().replace("s", "")
        if isinstance(file, list):
            # Dataframes given as a list (see resample) are already in the
            # processed layout: running process_tour on them again would
            # drop every candidate column, so they are kept untouched.
            self.tours = file
        else:
            self.tours = [
                pandas.read_excel(
                    file, sheet_name=f"{level} T1", engine='openpyxl'),
                pandas.read_excel(
                    file, sheet_name=f"{level} T2", engine='openpyxl')]
            for i, t in enumerate(self.tours):
                if len(t) == 0:
                    raise RuntimeError("no data for tour %d" % (i + 1))
            self.tours = [self.process_tour(t) for t in self.tours]
            for i, t in enumerate(self.tours):
                if len(t) == 0:
                    # same 1-based numbering as the check above
                    raise RuntimeError("no data for tour %d" % (i + 1))
            try:
                # Both rounds are sorted on the same label so that rows
                # at the same position describe the same region.
                self.tours = [
                    t.sort_values(f"Libellé du {self.level}", inplace=False)
                    for t in self.tours]
            except Exception as e:
                message = "unable to sort, shape={1} columns={0}".format(
                    ",".join(self.tours[0].columns), self.tours[0].shape)
                raise RuntimeError(message) from e

    def get_candidates_votes(self, round):
        """
        Returns the numbers of voters for each candidate.

        @param      round   0 or 1
        @return             dictionary ``{column name: sum of the column}``
                            for every column which is not a level column
        """
        frame = self.tours[round]
        level_cols = self.LevelCol
        return {c: frame[c].sum() for c in frame.columns
                if c not in level_cols}

    def correct(self, method=None):
        """
        Corrects the second round in a way there is the same number of voters.

        @param      method      some preprocess before going on (see below)

        About ``method``:

        - *'N'* --> correct the number of voters for each regions
        - *'cand'* --> gives the same weights to every candidates

        Any other value (including the default ``None``) raises
        ``NotImplementedError``. The dataframes are modified in place.
        """
        level_cols = self.LevelCol
        if method == "N":
            if len(self.T0) != len(self.T1):
                raise RuntimeError(
                    "unable to proceed because of different numbers of regions")
            cols0 = [c for c in self.T0.columns if c not in level_cols]
            cols1 = [c for c in self.T1.columns if c not in level_cols]
            if not cols1:
                # nothing to rescale
                return
            # Rows are paired by position (as vote_transfer does), which
            # also works when the index holds duplicated labels.
            total0 = self.T0[cols0].sum(axis=1).to_numpy(dtype=float)
            total1 = self.T1[cols1].sum(axis=1).to_numpy(dtype=float)
            coef = total0 / total1
            # Whole columns are replaced by float columns: no float is
            # written into an integer column element by element.
            self.T1[cols1] = self.T1[cols1].to_numpy(
                dtype=float) * coef[:, None]
        elif method == "cand":
            cols0 = [c for c in self.T0.columns if c not in level_cols]
            sums = [self.T0[c].sum() for c in cols0]
            total = sum(sums)
            for c, s in zip(cols0, sums):
                self.T0[c] = self.T0[c] * total / s
            self.correct("N")
        else:
            # f-string: concatenating a str with None raised TypeError
            raise NotImplementedError(f"unknown method: {method}")

    def __str__(self):
        """usual"""
        return "Year: {0} T1: {1} T2: {2}".format(
            self.Year, len(self.tours[0]), len(self.tours[1]))

    def GetNbCandidates(self, round):
        """
        Returns the number of candidates.

        @param      round   round (0 or 1)
        @return             number of candidates, the number of columns
                            minus 4 (two level columns, ``Abstentions``,
                            ``Blancs et nuls``)
        """
        return len(self.tours[round].columns) - 4

    @property
    def Year(self):
        """
        Returns the year.
        """
        return self.year

    @property
    def Level(self):
        """
        Returns the level (``département`` or ``canton``).
        """
        return self.level

    @property
    def LevelCol(self):
        """
        Returns the column associated to the level (their name depends on the level).
        """
        return [f"Code du {self.level}", f"Libellé du {self.level}"]

    @property
    def T0(self):
        """
        Returns the dataframe for the first round.
        """
        return self.tours[0]

    @property
    def T1(self):
        """
        Returns the dataframe for the second round.
        """
        return self.tours[1]

    def process_tour(self, tour):
        """
        Keeps the interesting columns, move candidates name as column name.

        @param      tour        dataframe
        @return                 dataframe

        Rows whose ``Abstentions`` value is not a number are removed.
        For every column whose name starts with ``Nom``, the cell gives
        the name of the output column and the value is read two columns
        further (presumably the layout is *Nom, Prénom, Voix* -- to be
        confirmed against the input file). A row producing an empty
        column name is dropped.
        """
        numeric = (float, int, numpy.int64, numpy.float64)
        keep = [isinstance(v, numeric) and not numpy.isnan(v)
                for v in tour["Abstentions"]]
        tour = tour.loc[keep, :]

        # Sorting the candidate names checks they are all comparable
        # (a mix of strings and missing values fails here).
        names = [c for c in tour.columns if c.startswith("Nom")]
        unique = list({v for n in names for v in tour[n]})
        try:
            unique.sort()
        except TypeError as e:
            msg = ','.join(tour.columns)
            raise RuntimeError(
                f"Unable to sort {unique!r}\ncolumns:\n{msg}") from e

        kept = {f"Code du {self.level}", f"Libellé du {self.level}",
                "Abstentions", "Blancs et nuls"}
        columns = list(tour.columns)

        def proc(row):
            res = {}
            for i, (k, v) in enumerate(zip(columns, row)):
                if k in kept:
                    res[k] = v
                elif k.startswith("Nom"):
                    res[v] = row[i + 2]
            if any(len(key) == 0 for key in res):
                return None
            return res

        rows = [proc(r) for r in tour.values]
        return pandas.DataFrame([r for r in rows if r is not None])

    @staticmethod
    def _qp_matrices(X, Y):
        """
        Builds the matrices of the quadratic program solved by
        @see me vote_transfer.

        @param      X       first round, one row per region
        @param      Y       second round, one row per region
        @return             Q, p, G, h, C, b (numpy arrays)

        The unknown vector stacks one block of ``X.shape[1]`` coefficients
        per column of *Y*. The program minimises
        ``sum_i ||X b_i - Y_i||^2`` written as ``1/2 x'Qx + p'x`` subject to
        ``Gx <= h`` (coefficients are positive) and ``Cx = b`` (the
        coefficients of one first round column sum to 1).
        """
        X = numpy.asarray(X, dtype=float)
        Y = numpy.asarray(Y, dtype=float)
        col = X.shape[1]
        nbC = Y.shape[1]
        size = col * nbC

        # The objective carries a factor 1/2, so *every* diagonal block is
        # 2 X'X, consistent with p = -2 X'Y (the first block used to be
        # X'X only, which biased the first second-round column).
        Q = 2.0 * numpy.kron(numpy.eye(nbC), X.T @ X)
        p = -2.0 * (X.T @ Y).T.reshape(size, 1)

        # -x <= 0
        G = -numpy.eye(size)
        h = numpy.zeros((size, 1))

        # C[i, i + col * k] = 1 for every block k
        C = numpy.tile(numpy.eye(col), (1, nbC))
        b = numpy.ones((col, 1))
        return Q, p, G, h, C, b

    def vote_transfer(self):
        """
        Computes the votes between the two rounds using
        constraints optimization, the optimization
        requires :epkg:`cvxopt`.

        See `Optimisation sous contraintes appliquée au calcul du report des voix <http://www.xavierdupre.fr/blog/2013-12-07_nojs.html>`_.

        @return     results (as a DataFrame), rows are the columns of the
                    first round, columns are the columns of the second round
        """
        level_cols = self.LevelCol
        rown = [c for c in self.tours[0].columns if c not in level_cols]
        coln = [c for c in self.tours[1].columns if c not in level_cols]
        X = self.tours[0][rown].to_numpy(dtype=float)
        Y = self.tours[1][coln].to_numpy(dtype=float)
        col, nbC = X.shape[1], Y.shape[1]

        Q, p, G, h, C, b = ElectionResults._qp_matrices(X, Y)

        # cvxopt is an optional dependency, only needed here
        from cvxopt import solvers
        from cvxopt import matrix

        old = solvers.options.get("show_progress", True)
        solvers.options["show_progress"] = False
        try:
            sol = solvers.qp(matrix(Q), matrix(p), matrix(G), matrix(h),
                             matrix(C), matrix(b))
        finally:
            # the global option is restored even if the solver fails
            solvers.options["show_progress"] = old

        # block i of the solution becomes column i of the result
        coef = numpy.array(sol['x'], dtype=float).reshape(nbC, col).T
        return pandas.DataFrame(data=coef, index=rown, columns=coln)

    def resample(self, method="uniform"):
        """
        Builds a new sample: it produces a results with the same number of
        rows, but each rows is randomly drawn from the current data.
        This is needed for the bootstrap procedures.

        @param      method  ``weight`` or ``uniform`` (any value different
                            from ``uniform`` means weighted by
                            @see me WeightsNorm)
        @return             a new @see cl ElectionResults, the same
                            positions are drawn in both rounds
        """
        n = len(self.T0)
        if n != len(self.T1):
            raise RuntimeError(
                "unable to proceeed, we need to draw the same regions, assuming both matrices are sorted in the same order")

        if method == "uniform":
            h = [random.randint(0, n - 1) for _ in range(n)]
        else:
            weights = self.WeightsNorm
            cumulated = numpy.cumsum(weights)
            last = len(weights) - 1

            def find_index(x):
                # first position whose cumulated weight exceeds x,
                # the last one if rounding errors keep the total below x
                pos = int(numpy.searchsorted(cumulated, x, side="right"))
                return min(pos, last)

            h = [find_index(random.random()) for _ in range(n)]

        def resample_matrix(mat):
            # h holds positions, not labels: iloc keeps it valid whatever
            # the index is (sorted data, a previous resampling, ...).
            return mat.iloc[h].reset_index(drop=True)

        return ElectionResults([resample_matrix(self.T0),
                                resample_matrix(self.T1)],
                               year=self.year, level=self.level)

    def get_people(self, round=0):
        """
        Returns the number of people per regions.

        @param      round   first (0) or second (1) round
        @return             series, sum of all columns except the level
                            columns for every row
        """
        frame = self.tours[round]
        level_cols = self.LevelCol
        cols = [c for c in frame.columns if c not in level_cols]
        # skipna=False: a missing value makes the total missing, as a
        # plain sum over the row does
        return frame[cols].sum(axis=1, skipna=False)

    @property
    def WeightsNorm(self):
        """
        Returns the proportion of voters for each regions
        (first round, computed once and cached in ``self.weightsnorm``).
        """
        if "weightsnorm" not in self.__dict__:
            people = list(self.get_people())
            total = sum(people)
            self.weightsnorm = [v * 1.0 / total for v in people]
        return self.weightsnorm

    @staticmethod
    def min_max_mean_std(series, alpha=0.05):
        """
        Returns the mean, the standard deviation, the bounds of the
        confidence interval.

        @param      series      list of numbers (not empty)
        @param      alpha       confidence level
        @return                 mean, std, low, high

        *std* is the population standard deviation (division by *n*),
        *low* and *high* are the values left once ``int(n * alpha / 2)``
        values are removed on each side of the sorted series.
        """
        ordered = sorted(series)
        n = len(ordered)
        a = int(n * alpha / 2)
        low, high = ordered[a], ordered[-a - 1]
        mean = sum(ordered) / n
        var = sum([(x - mean) ** 2 for x in ordered]) / n
        return mean, var ** 0.5, low, high

    def bootstrap(self, iter=1000, method="vote_transfer", alpha=0.05, fLOG=None, **params):
        """
        Uses the bootstrap method to compute confidence intervals
        see `bootstrap <http://fr.wikipedia.org/wiki/Bootstrap_%28statistiques%29>`_.

        @param      iter        number of iteration
        @param      method      method to bootstrap (only ``vote_transfer``)
        @param      alpha       confidence level
        @param      fLOG        logging function or none
        @param      params      parameters to give to ``method``
        @return                 four matrices, averaged results, sigma, lower bound, higher bound
        """
        # checked first: no need to sample for an unknown method
        if method != "vote_transfer":
            raise NotImplementedError(f"unknown method: {method}")
        if fLOG is None:
            def fLOG(*args):
                return ""

        fLOG("sampling", iter)
        samples = [self.resample() for _ in range(iter)]
        matrices = [s.vote_transfer(**params) for s in samples]

        template = matrices[0]
        shape = template.shape
        fLOG("level for each coefficient", shape)

        # stack[k, i, j] is coefficient (i, j) of the k-th sample
        stack = numpy.array([m.to_numpy(dtype=float) for m in matrices])
        stats = numpy.empty((4,) + tuple(shape), dtype=float)
        for i in range(shape[0]):
            for j in range(shape[1]):
                stats[:, i, j] = ElectionResults.min_max_mean_std(
                    stack[:, i, j], alpha=alpha)

        mean, std, low, high = [
            pandas.DataFrame(data=s.copy(), index=template.index,
                             columns=template.columns)
            for s in stats]
        return mean, std, low, high

    @staticmethod
    def combine_into_string(matrices, float_format=str, agg_format=str):
        """
        Combines two matrices into one before displaying it.

        @param      matrices        list of matrices (same dimension)
        @param      float_format    to format each float of all matrices
        @param      agg_format      to build the aggregated string
        @return                     matrixes (dataframe)

        Example:

        ::

            def pour(x) :
                if x < 0.01 : return ""
                else : return "%2.0f" % (x*100) + "%"

            boot = el.bootstrap(iter=10)
            comb = el.combine_into_string( [boot[2],boot[3]], pour, lambda v : "%s-%s" % tuple(v))
        """
        first = matrices[0]
        nrows, ncols = first.shape
        res = [[agg_format([float_format(m.iloc[i, j]) for m in matrices])
                for j in range(ncols)]
               for i in range(nrows)]
        return pandas.DataFrame(data=res, columns=list(first.columns), index=list(first.index))