Source code for ensae_projects.datainc.data_cresus

# -*- coding: utf-8 -*-
Script to process the data from Cresus for the 2016 hackathon.

import os
import sqlite3
import pandas
from pyquickhelper.loghelper import fLOG
from pyensae.sql import Database

def process_cresus_whole_process(infile, outfold, ratio=0.20, fLOG=fLOG):  # pylint: disable=W0621
    """
    Processes the database from :epkg:`Cresus` until it splits
    the data into two sets of files.

    :param infile: dump of the sql database
    :param outfold: output folder, created (with its parents) if it does not exist
    :param ratio: forwarded to :func:`split_train_test_cresus_data`
    :param fLOG: logging function
    :return: couple of dictionaries ``(train, test)`` mapping a table name
        to a filename, each one extended with the keys
        ``"dossier.X"`` and ``"dossier.Y"``
    """
    # makedirs also creates missing parents and does not fail
    # if the folder appears between a check and the creation.
    os.makedirs(outfold, exist_ok=True)

    basename = os.path.split(infile)[-1]
    out_clean_sql = os.path.join(outfold, basename + ".clean.sql")
    outdb = os.path.join(outfold, basename + ".db3")

    process_cresus_sql(infile, out_clean_sql=out_clean_sql,
                       outdb=outdb, fLOG=fLOG)
    tables = prepare_cresus_data(outdb, outfold=outfold, fLOG=fLOG)
    train, test = split_train_test_cresus_data(
        tables, outfold, ratio=ratio, fLOG=fLOG)

    # Splits features and targets of table *dossier* for both sets.
    test.update(split_XY_bind_dataset_cresus_data(test["dossier"], fLOG))
    train.update(split_XY_bind_dataset_cresus_data(train["dossier"], fLOG))
    return train, test
def cresus_dummy_file():
    """
    Builds the absolute path of file ``bdd_anonyme_cresus.enc``
    in subfolder ``hackathon_2016_cresus`` of the folder
    which contains this module.

    :return: local filename
    """
    module_folder = os.path.dirname(__file__)
    return os.path.join(os.path.abspath(module_folder),
                        "hackathon_2016_cresus",
                        "bdd_anonyme_cresus.enc")
def process_cresus_sql(infile, out_clean_sql=None, outdb=None, fLOG=fLOG):  # pylint: disable=W0621
    """
    Processes the database sent by cresus: cleans the SQL dump,
    executes it into a :epkg:`sqlite3` file and saves the cleaned script.

    :param infile: dump of a sql database
    :param out_clean_sql: filename which contains the cleaned sql,
        defaults to *infile* with extension ``.cleaned.sql``
    :param outdb: sqlite3 file (removed if it exists),
        defaults to *infile* with extension ``.db3``
    :param fLOG: logging function
    :return: dataframe with the number of rows of every table
        (columns *name* and *nb*)
    :raises Exception: if the script fails and the error message quotes
        a token, the new exception lists the lines of the cleaned
        script which contain that token
    """
    root = os.path.splitext(infile)[0]
    if out_clean_sql is None:
        out_clean_sql = root + ".cleaned.sql"
    if outdb is None:
        outdb = root + ".db3"
    dbfile = outdb
    if os.path.exists(dbfile):
        os.remove(dbfile)

    # READING
    fLOG("[process_cresus_sql] reading", infile)
    with open(infile, "r", encoding="utf-8") as f:
        content = f.read()
    fLOG("done")

    # CLEANING
    fLOG("[process_cresus_sql] cleaning")
    # Pieces of syntax found in the dump and replaced before
    # the script is given to sqlite3.
    repl = [("ENGINE=InnoDB DEFAULT CHARSET=latin1", ""),
            ("CHARACTER SET latin1", ""),
            ("CHARACTER SET utf8", ""),
            ("\\'", "''"),
            ("ENGINE=InnoDB AUTO_INCREMENT=100 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT", ""),
            ("ENGINE=MyISAM AUTO_INCREMENT=5 DEFAULT CHARSET=utf8", ""),
            ("ENGINE=MyISAM AUTO_INCREMENT=4 DEFAULT CHARSET=utf8", ""),
            ("ENGINE=MyISAM AUTO_INCREMENT=6 DEFAULT CHARSET=utf8", ""),
            ("int(11) NOT NULL AUTO_INCREMENT", "INTEGER"),
            ]
    commented_prefixes = ("CREATE DATABASE ", "USE `cresus_anonyme`")
    new_lines = []
    modified = 0
    for line in content.split("\n"):
        if line.startswith(commented_prefixes):
            # These statements are turned into SQL comments.
            line = "-- " + line
            modified += 1
        for rep, to in repl:
            if rep in line:
                line = line.replace(rep, to)
                modified += 1
        new_lines.append(line)
    content = "\n".join(new_lines)
    fLOG("done l=", len(content), " modified", modified)

    # DATABASE
    fLOG("[process_cresus_sql] execute", dbfile)
    con = sqlite3.connect(dbfile)
    try:
        try:
            con.executescript(content)
        except Exception as e:
            # The token between the first pair of double quotes of the
            # error message is used to find the offending lines.
            pieces = str(e).split('"')
            if len(pieces) < 2:
                raise
            exp = pieces[1]
            lines = content.split("\n")
            lines = ["{0}/{2}:{1}".format(i, _, len(lines))
                     for i, _ in enumerate(lines) if exp in _]
            raise Exception("\n".join(lines)) from e
        con.commit()

        # CHECK
        fLOG("[process_cresus_sql] CHECK")
        sql = """SELECT name
                 FROM (SELECT * FROM sqlite_master UNION ALL SELECT * FROM sqlite_temp_master) AS temptbl
                 WHERE type in('table','temp') AND name != 'sqlite_sequence' ORDER BY name;"""
        df = pandas.read_sql(sql, con)
        fLOG("[process_cresus_sql]", df)
        fLOG("[process_cresus_sql] done")

        # COUNT
        fLOG("[process_cresus_sql] COUNT")
        # The table name is a string literal (single quotes) in the first
        # column and an identifier (double quotes) after FROM: a
        # double-quoted literal would be read as a column if one had
        # the same name as the table.
        sql = " UNION ALL ".join(
            "SELECT '{0}' AS name, COUNT(*) AS nb FROM \"{0}\"".format(_)
            for _ in sorted(df["name"]))
        df = pandas.read_sql(sql, con)
        fLOG("[process_cresus_sql]", df)
        fLOG("[process_cresus_sql] done")
    finally:
        # The connection is released even if the script or a query fails.
        con.close()

    with open(out_clean_sql, "w", encoding="utf-8") as f:
        f.write(content)
    fLOG("[process_cresus_sql] done")
    return df
def prepare_cresus_data(dbfile, outfold=None, fLOG=fLOG):  # pylint: disable=W0621
    """
    Prepares the data for the challenge: every table of the database
    is exported into a tab-separated file ``tbl_<table>.txt``
    after a fixed list of columns (names, phone numbers, email,
    address, ...) was removed.

    :param dbfile: database file
    :param outfold: output folder, current folder if None
    :param fLOG: logging function
    :return: dictionary of table files ``{table name: filename}``
    """
    if outfold is None:
        outfold = "."

    # Columns which are never exported.
    remove_column = {'nom', 'prenom',
                     'tel_fixe', 'tel_mobile', 'email',
                     'adresse', 'rdv1', 'rdv2', 'rdv3',
                     'fichier_suivi', 'fichier_suivi2', 'media',
                     'indicateur_suivi', 'memo', 'num_dossier',
                     'etat_old', 'orientation_old', 'indicateur_suivi_old',
                     'transfert', 'plan_bdf', 'effacement_dett', "etat"}

    # Normalisation of the values found in column *nature*.
    new_mapping = {'': 'nul1', None: 'nul2',
                   'Sur-endettement': 'Surendettement', '0': 'nul'}

    res = {}
    db = Database(dbfile)
    db.connect()
    try:
        for table in db.get_table_list():
            fLOG("[prepare_cresus_data] exporting", table)
            df = pandas.read_sql("select * from " + table, db._connection)
            cols = sorted(_ for _ in df.columns if _ not in remove_column)
            if "orientation" in cols:
                # Both target columns are moved to the end.
                cols = [_ for _ in cols if _ not in ("orientation", "nature")]
                cols += ["orientation", "nature"]
                df["nature"] = df.nature.apply(
                    lambda x: new_mapping.get(x, x).replace("é", "e").lower())
                fLOG(set(df["nature"]))
            df = df[cols]
            name = os.path.join(outfold, "tbl_" + table + ".txt")
            df.to_csv(name, sep="\t", encoding="utf-8", index=False)
            res[table] = name
    finally:
        # The database is closed even if an export fails.
        db.close()
    return res
def split_train_test_cresus_data(tables, outfold, ratio=0.20, fLOG=fLOG):  # pylint: disable=W0621
    """
    Splits the tables into two sets of tables (train and test).
    The rows of table *dossier* are ordered by *date_ouverture*,
    the first ``int(n * (1 - ratio))`` ones define the train set.
    Tables with a column *id_dossier* follow their dossier,
    table *user* is split according to column *id_user* of the
    train dossiers. Other tables are copied into both sets.

    :param tables: dictionary of tables,
        :func:`prepare_cresus_data <ensae_projects.datainc.data_cresus.prepare_cresus_data>`
    :param outfold: folder which receives the files
        ``tbl_train_<table>.txt`` and ``tbl_test_<table>.txt``
    :param ratio: proportion of rows of table *dossier* left out of the train set
    :param fLOG: logging function
    :return: couple of dictionaries of table files
    :raises Exception: if a table to split has no known key column
    """
    splits = ["user", "agenda", "dossier", "budget"]
    df = pandas.read_csv(tables["dossier"], sep="\t", encoding="utf-8")
    short = df[["id", "id_user", "date_ouverture"]].sort_values("date_ouverture")
    train = int(len(short) * (1 - ratio))

    # Positional selection: after sort_values the index labels are not
    # ordered anymore and a label slice (.loc) includes its upper bound.
    head = short.iloc[:train]
    dossiers = set(head["id"])
    users = set(head["id_user"])

    train_tables = {}
    test_tables = {}
    for k, v in tables.items():
        if k not in splits:
            fLOG("[split_train_test_cresus_data] no split for", k)
            data = pandas.read_csv(v, sep="\t", encoding="utf-8")
            train_tables[k] = data
            test_tables[k] = data
            continue

        if k == "dossier":
            # Same set of ids as the one used for the other tables,
            # the original order of the rows is kept.
            data = df
            spl = df["id"].isin(dossiers)
        else:
            data = pandas.read_csv(v, sep="\t", encoding="utf-8")
            if "id_dossier" in data.columns:
                key = "id_dossier"
                select = dossiers
            elif k == "user":
                key = "id"
                select = users
            else:
                raise Exception("unexpected: {0}".format(k))
            try:
                column = data[key]
            except KeyError as e:
                raise Exception("issue for table '{0}' and columns={1}".format(
                    k, data.columns)) from e
            spl = column.isin(select)

        train_tables[k] = data[spl].copy()
        test_tables[k] = data[~spl].copy()
        fLOG("[split_train_test_cresus_data] split for", k,
             train_tables[k].shape, test_tables[k].shape)

    rtrain = {}
    for k, v in train_tables.items():
        name = os.path.join(outfold, "tbl_train_" + k + ".txt")
        v.to_csv(name, index=False, sep="\t", encoding="utf-8")
        rtrain[k] = name

    rtest = {}
    for k, v in test_tables.items():
        name = os.path.join(outfold, "tbl_test_" + k + ".txt")
        v.to_csv(name, index=False, sep="\t", encoding="utf-8")
        rtest[k] = name
    return rtrain, rtest
def split_XY_bind_dataset_cresus_data(filename, fLOG=fLOG):  # pylint: disable=W0621
    """
    Splits XY for the blind set. Rows with a missing value in one of
    the targets are removed, features are saved in ``<root>.X.txt``,
    targets in ``<root>.Y.txt`` where *root* is *filename*
    without its extension.

    :param filename: table to split
    :param fLOG: logging function (not used by this function)
    :return: dictionary of created files

    It assumes the targets are columns *orientation*, *nature*.
    """
    targets = ["orientation", "nature"]
    data = pandas.read_csv(filename, sep="\t", encoding="utf-8")

    # Keeps the rows for which both targets are known.
    missing = data["orientation"].isnull() | data["nature"].isnull()
    data = data[~missing]

    root = os.path.splitext(filename)[0]
    created = {"dossier.X": root + ".X.txt",
               "dossier.Y": root + ".Y.txt"}

    features = data.drop(targets, axis=1)
    features.to_csv(created["dossier.X"], sep="\t",
                    index=False, encoding="utf-8")
    data[targets].to_csv(created["dossier.Y"], sep="\t",
                         index=False, encoding="utf-8")
    return created