Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2""" 

3@file 

4@brief Script to process the date from Cresus for the hackathon 2016 

5""" 

6import os 

7import sqlite3 

8import pandas 

9from pyquickhelper.loghelper import fLOG 

10from pyensae.sql import Database 

11 

12 

def process_cresus_whole_process(infile, outfold, ratio=0.20, fLOG=fLOG):  # pylint: disable=W0621
    """
    Runs the full :epkg:`Cresus` pipeline: cleans the SQL dump, loads it
    into a sqlite3 database, exports the tables as flat files and splits
    them into a train and a test set of files.

    @param      infile      SQL dump of the Cresus database
    @param      outfold     output folder, created if it does not exist
    @param      ratio       proportion of the data kept for the test set
    @param      fLOG        logging function
    @return                 couple of dictionaries of table files
                            *(train, test)*
    """
    if not os.path.exists(outfold):
        os.mkdir(outfold)
    basename = os.path.split(infile)[-1]
    cleaned_sql = os.path.join(outfold, basename + ".clean.sql")
    dbname = os.path.join(outfold, basename + ".db3")
    process_cresus_sql(infile, out_clean_sql=cleaned_sql,
                       outdb=dbname, fLOG=fLOG)
    exported = prepare_cresus_data(dbname, outfold=outfold, fLOG=fLOG)
    train, test = split_train_test_cresus_data(
        exported, outfold, ratio=ratio, fLOG=fLOG)
    # Add the X/Y split of table 'dossier' to each set of files.
    for part in (test, train):
        part.update(split_XY_bind_dataset_cresus_data(part["dossier"], fLOG))
    return train, test

33 

34 

def cresus_dummy_file():
    """
    @return     local filename of the encrypted Cresus database
                shipped next to this module
    """
    folder = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(folder, "hackathon_2016_cresus",
                        "bdd_anonyme_cresus.enc")

43 

44 

def process_cresus_sql(infile, out_clean_sql=None, outdb=None, fLOG=fLOG):  # pylint: disable=W0621
    """
    Processes the database sent by cresus and produces
    a list of flat files.

    @param      infile          dump of a sql database (MySQL dialect)
    @param      out_clean_sql   filename which contains the cleaned sql,
                                defaults to ``<infile>.cleaned.sql``
    @param      outdb           sqlite3 file (removed if it exists),
                                defaults to ``<infile>.db3``
    @param      fLOG            logging function
    @return                     dataframe with one row per table
                                (columns *name*, *nb* = row count)
    """
    if out_clean_sql is None:
        out_clean_sql = os.path.splitext(infile)[0] + ".cleaned.sql"
    if outdb is None:
        outdb = os.path.splitext(infile)[0] + ".db3"

    dbfile = outdb
    # Always start from a fresh database file so the CREATE TABLE
    # statements of the dump do not fail.
    if os.path.exists(dbfile):
        os.remove(dbfile)

    # READING
    fLOG("[process_cresus_sql] reading", infile)
    with open(infile, "r", encoding="utf-8") as f:
        content = f.read()
    fLOG("done")

    # CLEANING: rewrite MySQL-specific syntax into something sqlite3 accepts.
    fLOG("[process_cresus_sql] cleaning")
    lines = content.split("\n")
    repl = [("ENGINE=InnoDB DEFAULT CHARSET=latin1", ""),
            ("CHARACTER SET latin1", ""),
            ("CHARACTER SET utf8", ""),
            # MySQL escapes quotes with a backslash, sqlite doubles them.
            ("\\'", "''"),
            ("ENGINE=InnoDB AUTO_INCREMENT=100 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT", ""),
            ("ENGINE=MyISAM AUTO_INCREMENT=5 DEFAULT CHARSET=utf8", ""),
            ("ENGINE=MyISAM AUTO_INCREMENT=4 DEFAULT CHARSET=utf8", ""),
            ("ENGINE=MyISAM AUTO_INCREMENT=6 DEFAULT CHARSET=utf8", ""),
            ("int(11) NOT NULL AUTO_INCREMENT", "INTEGER"),
            ]
    new_lines = []
    modified = 0
    for line in lines:
        # Comment out the statements sqlite3 does not understand.
        if line.startswith("CREATE DATABASE "):
            line = "-- " + line
            modified += 1
        if line.startswith("USE `cresus_anonyme`"):
            line = "-- " + line
            modified += 1
        for rep, to in repl:
            if rep in line:
                line = line.replace(rep, to)
                modified += 1
        new_lines.append(line)
    content = "\n".join(new_lines)
    fLOG("done l=", len(content), " modified", modified)

    # DATABASE
    fLOG("[process_cresus_sql] execute", dbfile)
    con = sqlite3.connect(dbfile)
    try:
        try:
            con.executescript(content)
        except Exception as e:
            # Quote every line of the script containing the token the
            # sqlite3 error message complains about, to ease debugging.
            try:
                exp = str(e).split('"')[1]
            except Exception:
                raise e  # pylint: disable=W0707
            lines = content.split("\n")
            lines = ["{0}/{2}:{1}".format(i, _, len(lines))
                     for i, _ in enumerate(lines) if exp in _]
            raise Exception("\n".join(lines)) from e
        con.commit()

        # CHECK: list the created tables.
        fLOG("[process_cresus_sql] CHECK")
        sql = """SELECT name FROM (SELECT * FROM sqlite_master UNION ALL SELECT * FROM sqlite_temp_master) AS temptbl
                 WHERE type in('table','temp') AND name != 'sqlite_sequence' ORDER BY name;"""
        df = pandas.read_sql(sql, con)
        fLOG("[process_cresus_sql]", df)
        fLOG("[process_cresus_sql] done")

        # COUNT: number of rows per table.
        fLOG("[process_cresus_sql] COUNT")
        sql = " UNION ALL ".join("SELECT \"{0}\" AS name, COUNT(*) AS nb FROM {0}".format(_)
                                 for _ in sorted(df.name))
        df = pandas.read_sql(sql, con)
        fLOG("[process_cresus_sql]", df)
        fLOG("[process_cresus_sql] done")
    finally:
        # Previously the connection was leaked when executescript raised;
        # the finally clause guarantees it is closed on every path.
        con.close()

    with open(out_clean_sql, "w", encoding="utf-8") as f:
        f.write(content)

    fLOG("[process_cresus_sql] done")
    return df

142 

143 

def prepare_cresus_data(dbfile, outfold=None, fLOG=fLOG):  # pylint: disable=W0621
    """
    Prepares the data for the challenge.

    Exports every table of the database as a tab-separated file,
    dropping personal-information columns, and normalises column
    *nature* for the tables which also contain *orientation*.

    @param      dbfile      database file
    @param      outfold     output folder (current folder if None)
    @param      fLOG        logging function
    @return                 dictionary of table files
    """
    db = Database(dbfile)
    db.connect()

    if outfold is None:
        outfold = "."

    # Columns dropped from every exported table (anonymisation and
    # bookkeeping columns).  A frozenset gives O(1) membership tests
    # and removes the 'tel_fixe' entry the original list duplicated.
    remove_column = frozenset([
        'nom', 'prenom',
        'tel_fixe', 'tel_mobile', 'email',
        'adresse', 'rdv1', 'rdv2', 'rdv3',
        'fichier_suivi', 'fichier_suivi2', 'media',
        'indicateur_suivi', 'memo', 'num_dossier',
        'etat_old', 'orientation_old', 'indicateur_suivi_old',
        'transfert', 'plan_bdf', 'effacement_dett', "etat",
        'tel_port',
    ])

    # Maps inconsistent spellings of column 'nature' to a canonical value.
    new_mapping = {'': 'nul1', None: 'nul2',
                   'Sur-endettement': 'Surendettement', '0': 'nul'}

    res = {}
    tables = db.get_table_list()
    for table in tables:
        fLOG("[prepare_cresus_data] exporting", table)
        df = pandas.read_sql("select * from " + table, db._connection)
        cols = [_ for _ in df.columns if _ not in remove_column]
        cols.sort()
        if "orientation" in cols:
            # Move the two target columns to the end of the file.
            cols = [_ for _ in cols if _ not in ("orientation", "nature")]
            cols += ["orientation", "nature"]
            df["nature"] = df.nature.apply(  # pylint: disable=E1101
                lambda x: new_mapping.get(x, x).replace("é", "e").lower())
            fLOG(set(df["nature"]))
        df = df[cols]
        name = os.path.join(outfold, "tbl_" + table + ".txt")
        df.to_csv(name, sep="\t", encoding="utf-8", index=False)
        res[table] = name
    db.close()
    return res

192 

193 

def split_train_test_cresus_data(tables, outfold, ratio=0.20, fLOG=fLOG):  # pylint: disable=W0621
    """
    Splits the tables into two sets for tables (based on users).

    @param      tables      dictionary of tables,
                            @see fn prepare_cresus_data
    @param      outfold     if not None, output all tables in this folder
    @param      ratio       proportion of rows assigned to the test set
    @param      fLOG        logging function
    @return                 couple of dictionaries of table files
    """
    splits = ["user", "agenda", "dossier", "budget"]
    df = pandas.read_csv(tables["dossier"], sep="\t", encoding="utf-8")
    short = df[["id", "id_user", "date_ouverture"]
               ].sort_values("date_ouverture")
    nb = len(short)
    train = int(nb * (1 - ratio))
    # iloc: the first *train* rows in chronological order.
    # The previous code used ``short.loc[:train, ...]``, a label-based
    # slice on an index shuffled by sort_values, which did not select
    # the first *train* rows (and could raise for ratio == 0).
    dossiers = set(short.iloc[:train]["id"])
    users = set(short.iloc[:train]["id_user"])

    train_tables = {}
    test_tables = {}
    for k, v in tables.items():
        if k not in splits:
            # Reference tables are shared by both sets unchanged.
            fLOG("[split_train_test_cresus_data] no split for", k)
            data = pandas.read_csv(v, sep="\t", encoding="utf-8")
            train_tables[k] = data
            test_tables[k] = data
        else:
            if k == "dossier":
                # NOTE(review): this splits by file order while the sets
                # above use chronological order -- confirm this mismatch
                # is intended.
                train_tables[k] = df[:train].copy()
                test_tables[k] = df[train:].copy()
            else:
                data = pandas.read_csv(v, sep="\t", encoding="utf-8")
                if "id_dossier" in data.columns:
                    key = "id_dossier"
                    select = dossiers
                elif k == "user":
                    key = "id"
                    select = users
                else:
                    raise Exception("unexpected: {0}".format(k))
                try:
                    spl = data[key].apply(lambda x, ens=select: x in ens)  # pylint: disable=E1136
                except KeyError as e:
                    raise Exception("issue for table '{0}' and columns={1}".format(
                        k, data.columns)) from e  # pylint: disable=E1101
                train_tables[k] = data[spl].copy()  # pylint: disable=E1136
                test_tables[k] = data[~spl].copy()  # pylint: disable=E1136
            fLOG("[split_train_test_cresus_data] split for", k,
                 train_tables[k].shape, test_tables[k].shape)

    rtrain = {}
    for k, v in train_tables.items():
        name = os.path.join(outfold, "tbl_train_" + k + ".txt")
        v.to_csv(name, index=False, sep="\t", encoding="utf-8")
        rtrain[k] = name
    rtest = {}
    for k, v in test_tables.items():
        name = os.path.join(outfold, "tbl_test_" + k + ".txt")
        v.to_csv(name, index=False, sep="\t", encoding="utf-8")
        rtest[k] = name
    return rtrain, rtest

256 

257 

def split_XY_bind_dataset_cresus_data(filename, fLOG=fLOG):  # pylint: disable=W0621
    """
    Splits XY for the blind set.

    @param      filename        table to split
    @param      fLOG            logging function
    @return                     dictionary of created files

    It assumes the targets are columns *orientation*, *nature*.
    """
    targets = ["orientation", "nature"]
    data = pandas.read_csv(filename, sep="\t", encoding="utf-8")
    # Keep only the rows where both targets are known.
    keep = ~(data[targets[0]].isnull() | data[targets[1]].isnull())
    data = data[keep]
    root = os.path.splitext(filename)[0]
    xname = root + ".X.txt"
    yname = root + ".Y.txt"
    data.drop(targets, axis=1).to_csv(
        xname, sep="\t", index=False, encoding="utf-8")
    data[targets].to_csv(yname, sep="\t", index=False, encoding="utf-8")
    return {"dossier.X": xname, "dossier.Y": yname}