# Source code of papierstat.datasets.carreau

# -*- coding: utf-8 -*-
"""
Jeux de données reliés aux données carroyées.


:githublink:`%|py|6`
"""
import os
import zipfile
import collections
import datetime
import tempfile
from io import BytesIO
from dbfread.field_parser import FieldParser
from dbfread import DBF
from dbfread.dbf import expand_year
import pandas
from .data_helper import get_data_folder


class DBFInMemory(DBF):
    """
    Overwrites `DBF <https://github.com/olemb/dbfread/blob/master/dbfread/dbf.py#L77>`_
    to read data from memory and not from a file. The object
    `DBF <http://dbfread.readthedocs.io/en/latest/dbf_objects.html>`_
    needs a file by default. This class avoids creating an intermediate
    file when the data is compressed in a :epkg:`zip` file.

    :githublink:`%|py|26`
    """

    def __init__(self, filename, encoding=None, ignorecase=True,
                 lowernames=False, parserclass=FieldParser,
                 recfactory=collections.OrderedDict, load=False,
                 raw=False, ignore_missing_memofile=False,
                 char_decode_errors='strict'):
        """
        Builds the reader either from a file name or from raw bytes.

        :param filename: a path (*str*, delegated to the parent class) or
            the raw content of a *.dbf* file (*bytes*, parsed in memory)
        :param encoding: character encoding, see :epkg:`dbfread`
        :param ignorecase: case-insensitive filename handling
        :param lowernames: lowercase the field names
        :param parserclass: class used to parse field values
        :param recfactory: factory building one record from a list of
            ``(name, value)`` pairs, ``None`` keeps the raw pairs
        :param load: load all records immediately
        :param raw: keep field values as raw bytes
        :param ignore_missing_memofile: do not fail if the memo file is absent
        :param char_decode_errors: error policy forwarded to ``bytes.decode``
        """
        if isinstance(filename, str):
            # Regular case: a file name on disk, the parent class handles it.
            DBF.__init__(self, filename, encoding=encoding, ignorecase=ignorecase,
                         lowernames=lowernames, parserclass=parserclass,
                         recfactory=recfactory, load=load, raw=raw,
                         ignore_missing_memofile=ignore_missing_memofile,
                         char_decode_errors=char_decode_errors)
        else:
            # In-memory case: mirrors DBF.__init__ but reads the headers
            # from a BytesIO wrapped around the given bytes instead of a file.
            self.encoding = encoding
            self.ignorecase = ignorecase
            self.lowernames = lowernames
            self.parserclass = parserclass
            self.raw = raw
            self.ignore_missing_memofile = ignore_missing_memofile
            self.char_decode_errors = char_decode_errors

            if recfactory is None:
                self.recfactory = lambda items: items
            else:
                self.recfactory = recfactory

            # No file on disk: keep the raw bytes in ``self.content``.
            self.name = None
            self.filename = None
            self.content = filename
            self._records = None
            self._deleted = None

            # Filled in by self._read_headers()
            self.memofilename = None
            self.header = None
            self.fields = []       # namedtuples
            self.field_names = []  # strings

            obj = BytesIO(filename)
            self._read_header(obj)
            self._read_field_headers(obj)
            self._check_headers()

            try:
                self.date = datetime.date(
                    expand_year(self.header.year),
                    self.header.month,
                    self.header.day)
            except ValueError:  # pragma: no cover
                # Invalid date or '\x00\x00\x00'.
                self.date = None

            self.memofilename = self._get_memofilename()

            if load:
                self.load()

    def _iter_records(self, record_type=b' '):
        """
        Yields the records of the in-memory *.dbf* content, one at a time.

        Copied from ``DBF._iter_records`` except that the data is read from
        ``BytesIO(self.content)`` instead of an open file.

        :param record_type: one-byte marker of a live record
            (``b' '`` by default; deleted records use another marker)
        """
        infile = BytesIO(self.content)
        with self._open_memofile() as memofile:

            # Skip to first record.
            infile.seek(self.header.headerlen, 0)

            if not self.raw:
                field_parser = self.parserclass(self, memofile)
                parse = field_parser.parse

            # Shortcuts for speed.
            skip_record = self._skip_record
            read = infile.read

            while True:
                sep = read(1)

                if sep == record_type:
                    if self.raw:
                        items = [(field.name, read(field.length))
                                 for field in self.fields]
                    else:
                        items = [(field.name,
                                  parse(field, read(field.length)))
                                 for field in self.fields]

                    yield self.recfactory(items)

                elif sep in (b'\x1a', b''):
                    # End of records.
                    break
                else:
                    skip_record(infile)
def load_dbf_from_zip(filename):
    """
    Loads a *.dbf* file compressed into a zip file.
    It only takes the first *.dbf* file from the zip.

    :param filename: zip file
    :return: dataframe

    :githublink:`%|py|127`
    """
    with zipfile.ZipFile(filename) as archive:
        # Look for the first entry whose name ends with '.dbf'.
        dbf_entries = [info.filename for info in archive.infolist()
                       if info.filename.endswith(".dbf")]
        if not dbf_entries:
            raise FileNotFoundError(  # pragma: no cover
                "No dbf file in '{0}'".format(filename))
        with archive.open(dbf_entries[0], "r") as handle:
            raw_bytes = handle.read()
    # Parse the bytes without writing an intermediate file.
    records = list(DBFInMemory(raw_bytes))
    return pandas.DataFrame(records)
def _read_geopandas_from_bytes(mif, mid, **kwargs):
    """
    Returns a :epkg:`GeoDataFrame` from two sequences of bytes,
    one for file *.mif*, one from file *.mid*. Unfortunately,
    :epkg:`geopandas` does not read from a buffer, and :epkg:`fiona`
    does it after writing in a virtual file (not clear if it is
    a temporary file or not), so the bytes are written to a pair of
    temporary files which are removed afterwards.

    :param mif: content of the *.mif* file (bytes)
    :param mid: content of the *.mid* file (bytes)
    :param kwargs: forwarded to ``GeoDataFrame.from_file``
    :return: :epkg:`GeoDataFrame`

    :githublink:`%|py|147`
    """
    # Delayed import because the import fails sometimes
    # on Windows.
    from geopandas import GeoDataFrame
    # delete=False: the file must survive the 'with' so fiona can reopen it.
    with tempfile.NamedTemporaryFile(mode='w+b', delete=False,
                                     suffix='.mif') as temp:
        temp.write(mif)
        name_mif = temp.name
    name_mid = name_mif.replace(".mif", ".mid")
    try:
        with open(name_mid, "wb") as f:
            f.write(mid)
        # fiona locates the companion .mif next to the .mid by name.
        gdf = GeoDataFrame.from_file(name_mid, **kwargs)
    finally:
        # Remove both temporary files even if reading fails,
        # otherwise they would leak on every failed call.
        for name in (name_mid, name_mif):
            if os.path.exists(name):
                os.remove(name)
    return gdf
def load_shapes_from_zip(filename):
    """
    Loads a *.mif* and a *.mid* file compressed into a zip file.
    It only takes the first *.mid* and *.mif* files from the zip.

    :param filename: zip file
    :return: dataframe

    :githublink:`%|py|173`
    """
    with zipfile.ZipFile(filename) as archive:
        entries = archive.infolist()

        def _first_bytes(extension, message):
            # Content of the first archive entry ending with *extension*.
            candidates = [entry.filename for entry in entries
                          if entry.filename.endswith(extension)]
            if not candidates:
                raise FileNotFoundError(message)  # pragma: no cover
            with archive.open(candidates[0], "r") as handle:
                return handle.read()

        mif = _first_bytes(".mif", "No mif file in '{0}'".format(filename))
        mid = _first_bytes(".mid", "No mid file in '{0}'".format(filename))

    return _read_geopandas_from_bytes(mif, mid)
def load_carreau_from_zip(file_car=None, file_rect=None):
    """
    Returns an example of gridded data (*données carroyées*).
    The data is available in folder
    `data <https://github.com/sdpython/papierstat/tree/master/
    src/papierstat/datasets/data>`_.
    Notebooks using this dataset:

    .. runpython::
        :rst:

        from papierstat.datasets.documentation import list_notebooks_rst_links
        links = list_notebooks_rst_links('visualisation', 'carte_carreau')
        links = [' * %s' % s for s in links]
        print('\\n'.join(links))

    :param file_car: zip file with the squares (*carreaux*),
        defaults to the packaged ``reunion.zip``
    :param file_rect: zip file with the rectangles,
        defaults to the packaged ``reunion_rect.zip``
    :return: 4 dataframes

    Results:

    * population data per square
    * shapefiles of the squares
    * population data per rectangle
    * shapefiles of the rectangles

    .. note::
        In order to comply with the dissemination rule about households'
        fiscal income, no statistical information (except the total number
        of individuals) is disseminated for squares of fewer than
        11 households. Those low-count squares are therefore grouped into
        larger rectangles satisfying this 11-household minimum.
        `source : INSEE <https://www.insee.fr/fr/statistiques/2520034>`_.

    :githublink:`%|py|229`
    """
    # Fill each default independently: the original code only filled them
    # when BOTH were None, so passing exactly one argument crashed below.
    if file_car is None or file_rect is None:
        data = get_data_folder()
        if file_car is None:
            file_car = os.path.join(data, 'reunion.zip')
        if file_rect is None:
            file_rect = os.path.join(data, 'reunion_rect.zip')
    dfcar = load_dbf_from_zip(file_car)
    shpcar = load_shapes_from_zip(file_car)
    dfrect = load_dbf_from_zip(file_rect)
    shprect = load_shapes_from_zip(file_rect)
    return dfcar, shpcar, dfrect, shprect