Code source de actuariat_python.data.elections

# -*- coding: utf-8 -*-
"""
Various function to download data about **French** elections.


:githublink:`%|py|6`
"""
import re
import os
import warnings
from html.parser import HTMLParser
from html.entities import name2codepoint
from http.client import RemoteDisconnected
import urllib.error
import urllib.request
from urllib.error import HTTPError, URLError
import pandas
from pyquickhelper.loghelper import noLOG
from pyquickhelper.filehelper import unzip_files
from pyensae.datasource import download_data
from pyensae.datasource.http_retrieve import DownloadDataException
from .data_exceptions import DataNotAvailableError, DataFormatException


[docs]def elections_presidentielles_local_files(load=False): """ Returns the list of files included in this module about French elections. :param load: True: load the data :return: list of local files If the data is loaded, the function returns a dictionary of dataframe, one per round. :githublink:`%|py|32` """ this = os.path.dirname(__file__) data = os.path.abspath(os.path.join(this, "data_elections")) res = [os.path.join(data, "cdsp_presi2012t1_circ.xls"), os.path.join(data, "cdsp_presi2012t2_circ.xls")] for r in res: if not os.path.exists(r): raise FileNotFoundError(r) # pragma: no cover if not load: return res df1 = pandas.read_excel(res[0], sheet_name=1) df2 = pandas.read_excel(res[1], sheet_name=1) return dict(circ1=df1, circ2=df2)
[docs]def elections_presidentielles(url=None, local=False, agg=None): """ Downloads the data for the French elections from *data.gouv.fr*. :param url: url (None for default value) :param local: prefer local data instead of remote :param agg: kind of aggregation desired (see below) :return: dictionaries of DataFrame (one entry for each round) The default url comes from `Elections présidentielle 2012 - Résultats <https://www.data.gouv.fr/fr/datasets/election-presidentielle-2012-resultats-572124/>`_. You can get more at `Elections présidentielles 1965-2012 <https://www.data.gouv.fr/fr/datasets/elections-presidentielles-1965-2012-1/>`_. If url is None, we pull some data from folder :ref:`data/election <l-data-elections>`. Parameter *agg*: * *circ* or *None* for no aggregation * *dep* to aggregate per department :githublink:`%|py|69` """ if agg is None: if local: return elections_presidentielles_local_files(load=True) else: if url is None: url = "http://static.data.gouv.fr/ff/e9c9483d39e00030815089aca1e2939f9cb99a84b0136e43056790e47bb4f0.xls" url0 = None else: url0 = url try: df = pandas.read_excel(url, sheet_name=None) return df except (HTTPError, URLError, TimeoutError) as e: # pragma: no cover if url0 is None: return elections_presidentielles_local_files(load=True) raise DataNotAvailableError( "unable to get data from " + url) from e else: res = elections_presidentielles(url=url, local=local, agg=None) if agg == "circ": return res if agg == "dep": keys = list(res.keys()) for k in keys: col = res[k].columns key = col[:2] df = res[k].groupby(list(key)) df = df.sum() df = df.reset_index(drop=False) res["dep" + k[-1:]] = df return res raise ValueError( # pragma: no cover "unkown value for agg: '{0}'".format(agg))
[docs]def elections_legislatives_bureau_vote(source=None, folder=".", fLOG=noLOG): """ Retrieves data from `Résultat des élections législatives françaises de 2012 au niveau bureau de vote <https://www.data.gouv.fr/fr/datasets/resultat-des-elections-legislatives-francaises-de-2012-au-niveau-bureau-de-vote-nd/>`_. :param source: should be None unless you want to use the backup plan ("xd") :param folder: where to download :return: list of dataframe Others sources: * `Résultats élections municipales 2014 par bureau de vote <http://www.nosdonnees.fr/dataset/resultats-elections-municipales-2014-par-bureau-de-vote>`_ * `Elections 2015 - Découpage des bureaux de Vote <https://www.data.gouv.fr/fr/datasets/elections-2015-decoupage-des-bureaux-de-vote/>`_ * `Contours des cantons électoraux départementaux 2015 <https://www.data.gouv.fr/fr/datasets/contours-des-cantons-electoraux-departementaux-2015/>`_ * `Découpage électoral de la commune, pour les élections législatives <https://www.data.gouv.fr/fr/datasets/circonscriptions/>`_ (weird bizarre) * `Statistiques démographiques INSEE sur les nouvelles circonscriptions législatives de 2012 <https://www.data.gouv.fr/fr/datasets/statistiques-demographiques-insee -sur-les-nouvelles-circonscriptions-legislatives-de-2012-nd/>`_ :githublink:`%|py|128` """ if source is None: try: # pragma: no cover with urllib.request.urlopen("http://www.nosdonnees.fr/") as f: url = "http://www.nosdonnees.fr/storage/f/2013-03-05T184148/" if f is None: raise Exception( "Not sure we can continue. Pretty sure we should stop.") except (urllib.error.HTTPError, RemoteDisconnected): # pragma: no cover url = "xd" file = "LG12_BV_T1T2.zip" else: url = source file = "LG12_BV_T1T2.zip" data = download_data(file, website=url, whereTo=folder, fLOG=fLOG) res = {} for d in data: df = pandas.read_csv(d, encoding="latin-1", sep=";", low_memory=False) if d.endswith("_T2.txt"): key = "T2" elif d.endswith("_T1.txt"): key = "T1" else: raise ValueError( # pragma: no cover "Unable to guess key for filename: '{0}'".format(d)) res[key] = df return res
[docs]def elections_legislatives_circonscription_geo(source="xd", folder=".", fLOG=noLOG): """ Retrieves data from `Countours des circonscriptions des législatives <https://www.data.gouv.fr/fr/ datasets/countours-des-circonscriptions-des-legislatives-nd/>`_. :param source: should be None unless you want to use the backup plan ("xd") :param folder: where to download :return: list of dataframe :githublink:`%|py|166` """ if source is None: raise NotImplementedError( # pragma: no cover "use source='xd'") url = source file = "toxicode_circonscriptions_legislatives.zip" data = download_data(file, website=url, whereTo=folder, fLOG=fLOG) for d in data: if d.endswith(".csv"): df = pandas.read_csv(d, sep=",", encoding="utf-8") return df raise DataNotAvailableError( "unable to find any csv file in '{0}'".format(file))
[docs]def elections_vote_places_geo(source="xd", folder=".", fLOG=noLOG): """ Retrieves data vote places (bureaux de vote in French) with geocodes. :param source: should be None unless you want to use the backup plan ("xd") :param folder: where to download :param fLOG: logging function :return: list of dataframe :githublink:`%|py|190` """ if source is None: raise NotImplementedError("use source='xd'") url = source file = "bureauxvotegeo.zip" data = download_data(file, website=url, whereTo=folder, fLOG=fLOG) for d in data: if d.endswith(".txt"): df = pandas.read_csv(d, sep="\t", encoding="utf-8") return df raise DataNotAvailableError( "Unable to find any csv file in '{0}'".format(file))
[docs]def villes_geo(folder=".", as_df=False, fLOG=noLOG): """ Retrieves data vote places (bureaux de vote in French) with geocodes. :param folder: where to download :param as_df: return as a dataframe :param fLOG: logging function :return: list of dataframe :githublink:`%|py|213` """ this = os.path.abspath(os.path.dirname(__file__)) data = os.path.join(this, "data_elections", "villesgeo.zip") geo = unzip_files(data, where_to=folder) if isinstance(geo, list): res = geo[0] else: res = geo if as_df: return pandas.read_csv(res, encoding="utf-8", sep="\t") return res
[docs]class _HTMLToText(HTMLParser):
[docs] def __init__(self): HTMLParser.__init__(self) self._buf = [] self.hide_output = False
def handle_starttag(self, tag, attrs): if tag in ('p', 'br') and not self.hide_output: self._buf.append('\n') elif tag in ('script', 'style'): self.hide_output = True def handle_startendtag(self, tag, attrs): if tag == 'br': self._buf.append('\n') def handle_endtag(self, tag): if tag == 'p': self._buf.append('\n') elif tag in ('script', 'style'): self.hide_output = False def handle_data(self, data): if data and not self.hide_output: self._buf.append(re.sub(r'\s+', ' ', data)) def handle_entityref(self, name): if name in name2codepoint and not self.hide_output: c = name2codepoint[name] self._buf.append(c) def handle_charref(self, name): if not self.hide_output: n = int(name[1:], 16) if name.startswith('x') else int(name) self._buf.append(n) def get_text(self): return re.sub(r' +', ' ', ''.join(self._buf))
[docs]def html_to_text(html): """ Given a piece of HTML, return the plain text it contains. This handles entities and char refs, but not javascript and stylesheets. :githublink:`%|py|271` """ parser = _HTMLToText() parser.feed(html) parser.close() return parser.get_text()
[docs]def _elections_vote_place_address_patterns_(): return [ "bureau( de vote)?[- ]*n[^0-9]([0-9]{1,3})[- ]+(.*?)[- ]+([0-9]{5})[- ]+([-a-zéèàùâêîôûïöäëü']{2,40})[. ]"]
[docs]def elections_vote_place_address(folder=".", hide_warning=False, fLOG=noLOG): """ Scrapes and extracts addresses for every vote place (bureau de vote in French). :param folder: where to download the scraped pages :param hide_warnings: hide warnings :param fLOG: logging function :return: dictionary The function does not retrieve everything due to the irregular format. Sometimes, the city is missing or written above. :githublink:`%|py|294` """ _elections_vote_place_address_patterns = _elections_vote_place_address_patterns_() files = [] for deps in range(1, 96): last = "bureaudevote%02d.htm" % deps url = "http://bureaudevote.fr/" try: f = download_data(last, website=url, whereTo=folder, fLOG=fLOG) except (urllib.error.HTTPError, DownloadDataException): # pragma: no cover # backup plan files = download_data("bureauxdevote.zip", website="xd", whereTo=folder, fLOG=fLOG) break if isinstance(f, list): f = f[0] files.append(f) # extract data regex = [re.compile(_) for _ in _elections_vote_place_address_patterns] rows = [] exc = [] for data in files: lrows = [] with open(data, "r", encoding="iso-8859-1") as f: content = f.read().lower() content = html_to_text(content) content0 = content content = content.replace("\n", " ").replace("\t", " ") atous = [] for reg in regex: atous.extend(reg.findall(content)) if len(atous) < 4 and len(atous) > 0: mes = "Not enough vote places ({2}) in\n{0}\nFOUND\n{3}\nCONTENT\n{1}".format( data, content0, len(atous), "\n".join(str(_) for _ in atous)) exc.append(Exception(mes)) if len(atous) > 1: for t in atous: ad = t[-3].split("-") address = ad[-1].strip(" ./<>-") place = "-".join(ad[:-1]).strip(" ./<>-") if "bureau de vote" in place: if not hide_warning: warnings.warn("Too long address {0}".format(t)) else: try: lrows.append(dict(n=int(t[1]), city=t[-1].strip(" .<>/"), zip=t[-2], address=address, place=place)) except ValueError as e: # pragma: no cover raise DataFormatException( "issue with {0}".format(t)) from e if len(lrows[-1]["city"]) <= 1: mes = "No City in {0}\nROWS\n{2}\nCONTENT\n{1}".format( t, content0, "\n".join(str(_) for _ in lrows)) # pragma: no cover raise DataFormatException(mes) # pragma: no cover if lrows: rows.extend(lrows) elif "06.htm" in data: mes = "Not enough vote places ({2}) in\n{0}\nFOUND\n{3}\nCONTENT\n{1}".format( data, content0, len(lrows), "\n".join(str(_) for _ in lrows)) # pragma: no cover raise DataFormatException(mes) # pragma: no cover if len(exc) > 2: mes = "Exception raised: {0}\n---------\n{1}".format( # pragma: no cover len(exc), "\n########################\n".join(str(_) for _ in exc)) raise DataFormatException(mes) # pragma: no cover return pandas.DataFrame(rows)