Code source de ensae_teaching_cs.special.elections

# -*- coding: utf-8 -*-
"""
Contains a class to process elections results (France)


:githublink:`%|py|6`
"""
import random
import numpy
import pandas


[docs]class ElectionResults: """ Processes data coming from `data.gouv.fr <http://www.data.gouv.fr/content/search?SortBy=Pertinence&SortOrder=0&SearchText=%C3%A9lections+2012>`_. The class uses `pandas <http://pandas.pydata.org/>`_ to process the data. See `Elections françaises <http://www.xavierdupre.fr/blog/2013-12-06_nojs.html>`_. See `Optimisation sous contraintes appliquée au calcul du report des voix <http://www.xavierdupre.fr/blog/2013-12-07_nojs.html>`_. :githublink:`%|py|19` """
[docs] def __init__(self, file, year=None, level="Départements"): """ Loads the data downloaded from `data.gouv.fr <http://www.data.gouv.fr/content/search?SortBy=Pertinence&SortOrder=0&SearchText=%C3%A9lections+2012>`_. :param file: xls file :param year: year (optional) :param level: ``Départements`` or ``Cantons`` :githublink:`%|py|29` """ self.year = year self.level = level.lower().replace("s", "") if isinstance(file, list): self.tours = file else: self.tours = [pandas.read_excel(file, sheet_name="%s T1" % level, engine='openpyxl'), pandas.read_excel(file, sheet_name="%s T2" % level, engine='openpyxl')] for i, t in enumerate(self.tours): if len(t) == 0: raise Exception("no data for tour %d" % (i + 1)) self.tours = [self.process_tour(_) for _ in self.tours] for i, t in enumerate(self.tours): if len(t) == 0: raise Exception("no data for tour %d" % i) try: self.tours = [ _.sort_values("Libellé du %s" % self.level, inplace=False) for _ in self.tours] except Exception as e: message = "unable to sort, shape={1} columns={0}".format( ",".join(self.tours[0].columns), self.tours[0].shape) raise Exception(message) from e
[docs] def get_candidates_votes(self, round): """ Returns the numbers of voters for each candidate. :param round: 0 or 1 :return: dictionary :githublink:`%|py|58` """ cols0 = [_ for _ in self.tours[ round].columns if _ not in self.LevelCol] sums = [self.tours[round][c].sum() for c in cols0] return {c: s for c, s in zip(cols0, sums)}
[docs] def correct(self, method=None): """ Corrects the second round in a way there is the same number of voters. :param method: some preprocess before going on (see below) About ``method``: - *'N'* --> correct the number of voters for each regions - *'cand'* --> gives the same weights to every candidates :githublink:`%|py|74` """ if method == "N": if len(self.T0) != len(self.T1): raise Exception( "unable to proceed because of different numbers of regions") cols0 = [_ for _ in self.tours[ 0].columns if _ not in self.LevelCol] cols1 = [_ for _ in self.tours[ 1].columns if _ not in self.LevelCol] for i in range(len(self.T0)): s1 = self.T0.loc[i, cols0].sum() s2 = self.T1.loc[i, cols1].sum() coef = 1.0 * s1 / s2 for c in cols1: self.T1.loc[i, c] *= coef elif method == "cand": cols0 = [_ for _ in self.tours[ 0].columns if _ not in self.LevelCol] sums = [self.T0[c].sum() for c in cols0] total = sum(sums) for c, s in zip(cols0, sums): self.T0[c] = self.T0[c] * total / s self.correct("N") else: raise NotImplementedError("unknown method: " + method)
[docs] def __str__(self): """ usual :githublink:`%|py|101` """ message = "Year: {0} T1: {1} T2: {2}".format( self.Year, len(self.tours[0]), len(self.tours[1])) return message
[docs] def GetNbCandidates(self, round): """ Returns the number of candidates. :param round: round (0 or 1) :return: number of candidates :githublink:`%|py|111` """ return len(self.tours[round].columns) - 4
@property def Year(self): """ Returns the year. :githublink:`%|py|118` """ return self.year @property def Level(self): """ Returns the level (``département`` or ``canton``). :githublink:`%|py|125` """ return self.level @property def LevelCol(self): """ Returns the column associated to the level (their name depends on the level). :githublink:`%|py|132` """ return ["Code du %s" % self.level, "Libellé du %s" % self.level] @property def T0(self): """ Returns the dataframe for the first round. :githublink:`%|py|139` """ return self.tours[0] @property def T1(self): """ Returns the dataframe for the second round. :githublink:`%|py|146` """ return self.tours[1]
[docs] def process_tour(self, tour): """ Keeps the interesting columns, move candidates name as column name. :param tour: dataframe :return: dataframe :githublink:`%|py|155` """ keep = [isinstance(_, (float, int, numpy.int64, numpy.float64)) and ~numpy.isnan(_) for _ in tour["Abstentions"]] tour = tour.loc[keep, :] names = [_ for _ in tour.columns if _.startswith("Nom")] res = [] for n in names: c = list(tour[n]) res.extend(c) unique = set(res) unique = list(unique) try: unique.sort() except TypeError as e: raise Exception("unable to sort " + str(unique) + "\ncolumns:{0}".format(",".join(tour.columns))) from e columns0 = ["Code du %s" % self.level, "Libellé du %s" % self.level, ] columns1 = ["Abstentions", "Blancs et nuls", ] def proc(row): res = {} for i, v in enumerate(row): k = tour.columns[i] if k in columns0: res[k] = row[i] elif k in columns1: res[k] = row[i] elif k.startswith("Nom"): res[v] = row[i + 2] badkeys = [_ for _ in res if len(_) == 0] if len(badkeys) > 0: return None return res rows = list(map(lambda r: proc(r), tour.values)) rows = [_ for _ in rows if _ is not None] return pandas.DataFrame(rows)
[docs] def vote_transfer(self): """ Computes the votes between the two rounds using contrainsts optimization, the optimization requires :epkg:`cvxopt`. See `Optimisation sous contraintes appliquée au calcul du report des voix <http://www.xavierdupre.fr/blog/2013-12-07_nojs.html>`_. :return: results (as a DataFrame) :githublink:`%|py|203` """ cols0 = [_ for _ in self.tours[0] if _ not in self.LevelCol] X = self.tours[0][cols0].values X = numpy.matrix(X) cols1 = [_ for _ in self.tours[1] if _ not in self.LevelCol] Y = self.tours[1][cols1].values Y = numpy.matrix(Y) nbC = Y.shape[1] lin, col = X.shape # construction de Q def _zeros(lin, col): return [[0.0 for i in range(0, col)] for j in range(0, lin)] bigX = [numpy.matrix(_zeros(lin, col * nbC)) for i in range(0, nbC)] for i in range(0, nbC): bigX[i][:, col * i:col * (i + 1)] = X[:, :] pX = [] for m in bigX: pX.append(m.transpose() * m) Q = None for m in pX: if Q is None: Q = +m else: Q += m * 2 # construction de p p = None for i in range(0, nbC): tr = bigX[i].transpose() y2 = Y[:, i] * (-2) t = tr * y2 if p is None: p = t else: p += t # construction de G, h def _identite(n): return [[0.0 if i != j else 1.0 for i in range(0, n)] for j in range(0, n)] h = numpy.matrix(_zeros(col * nbC, 1)) G = - numpy.matrix(_identite(col * nbC)) # construction de C,b b = numpy.matrix(_zeros(col, 1)) b[:, :] = 1.0 C = numpy.matrix(_zeros(col, col * nbC)) for i in range(0, col): for ni in range(0, nbC): C[i, i + col * ni] = 1.0 # résolutation from cvxopt import solvers from cvxopt import matrix Q = matrix(Q) p = matrix(p) G = matrix(G) h = matrix(h) C = matrix(C) b = matrix(b) old = solvers.options.get("show_progress", True) solvers.options["show_progress"] = False sol = solvers.qp(Q, p, G, h, C, b) solvers.options["show_progress"] = old coef = sol['x'] res = numpy.matrix(_zeros(col, nbC)) for i in range(0, nbC): res[:, i] = coef[col * i:col * (i + 1)] rown = [_ for _ in self.tours[0].columns if _ not in self.LevelCol] coln = [_ for _ in self.tours[1].columns if _ not in self.LevelCol] return pandas.DataFrame(data=res, index=rown, columns=coln)
[docs] def resample(self, method="uniform"): """ Builds a new sample: it produces a results with the same number of rows, but each rows is randomly drawn from the current data. This is needed for the bootstrap procedures. :param method: ``weight`` or ``uniform`` :return: two matrices :githublink:`%|py|292` """ if len(self.T0) != len(self.T1): raise Exception( "unable to proceeed, we need to draw the same regions, assuming both matrices are sorted in the same order") def resample_matrix(mat, h): return mat.loc[h, :] if method == "uniform": n = len(self.T0) h = [random.randint(0, n - 1) for i in range(0, n)] else: def find_index(x): s = 0 for i, _ in enumerate(self.WeightsNorm): s += _ if x < s: return i return len(self.WeightsNorm) - 1 n = len(self.T0) h = [find_index(random.random()) for i in range(0, n)] return ElectionResults([resample_matrix(self.T0, h), resample_matrix(self.T1, h), ], year=self.year, level=self.level)
[docs] def get_people(self, round=0): """ Returns the number of people per regions. :param round: first (0) or second (1) round :return: series :githublink:`%|py|322` """ return self.tours[round].apply(lambda row: sum([row[_] for _ in self.tours[round].columns if _ not in self.LevelCol]), axis=1)
@property def WeightsNorm(self): """ Returns the proportion of voters for each regions. :githublink:`%|py|329` """ if "weightsnorm" not in self.__dict__: self.weightsnorm = list(self.get_people()) s = sum(self.weightsnorm) self.weightsnorm = [_ * 1.0 / s for _ in self.weightsnorm] return self.weightsnorm
[docs] @staticmethod def min_max_mean_std(series, alpha=0.05): """ returns the mean standard deviation, bounds of the confidence interval :param series: list of numbers :param alpha: confidence level :return: mean, std, low, high :githublink:`%|py|344` """ series = list(sorted(series)) a = int(len(series) * alpha / 2) low, high = series[a], series[-a - 1] mean = sum(series) / len(series) std = sum([(x - mean) ** 2 for x in series]) / len(series) return mean, std ** 0.5, low, high
[docs] def bootstrap(self, iter=1000, method="vote_transfer", alpha=0.05, fLOG=None, **params): """ Uses the bootstrap method to compute confidence intervals see `bootstrap <http://fr.wikipedia.org/wiki/Bootstrap_%28statistiques%29>`_. :param iter: number of iteration :param method: method to bootstrap :param alpha: confidence level :param fLOG: logging function or none :param params: parameters to give to ``method`` :return: four matrices, averaged results, sigma, lower bound, higher bound :githublink:`%|py|363` """ if fLOG is None: fLOG = lambda *x: "" fLOG("sampling", iter) samples = [self.resample() for i in range(iter)] if method == "vote_transfer": matrices = [_.vote_transfer(**params) for _ in samples] else: raise NotImplementedError() mean = matrices[0].copy() std = matrices[0].copy() low = matrices[0].copy() high = matrices[0].copy() shape = mean.shape fLOG("level for each coefficient", shape) for i in range(0, shape[0]): for j in range(0, shape[1]): series = [m.iloc[i, j] for m in matrices] xmean, xstd, xlow, xhigh = ElectionResults.min_max_mean_std( series, alpha=alpha) mean.iloc[i, j] = xmean std.iloc[i, j] = xstd low.iloc[i, j] = xlow high.iloc[i, j] = xhigh return mean, std, low, high
[docs] @staticmethod def combine_into_string(matrices, float_format=str, agg_format=str): """ Combines two matrices into one before displaying it. :param matrices: list of matrices (same dimension) :param float_format: to format each float of all matrices :param agg_format: to build the aggregated string :return: matrixes (dataframe) Example: :: def pour(x) : if x < 0.01 : return "" else : return "%2.0f" % (x*100) + "%" boot = el.bootstrap(iter=10) comb = el.combine_string( [boot[2],boot[3]], pour, lambda v : "%s-%s" % tuple(v)) :githublink:`%|py|411` """ shape = matrices[0].shape res = [["" for i in range(shape[1])] for j in range(shape[0])] for i in range(0, shape[0]): for j in range(0, shape[1]): series = [float_format(m.iloc[i, j]) for m in matrices] res[i][j] = agg_format(series) return pandas.DataFrame(data=res, columns=list(matrices[0].columns), index=list(matrices[0].index))