Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2@file 

3 

4@brief @see cl Database 

5""" 

6 

7import re 

8import copy 

9import random 

10import sqlite3 as SQLite 

11import datetime 

12import decimal 

13import numpy 

14 

15 

16from .file_text_binary import TextFile 

17from .database_exception import ExceptionSQL 

18 

19 

20class NoHeaderException(Exception): 

21 

22 """ 

23 just to be meant to be caucht later by a unit test 

24 """ 

25 pass 

26 

27 

28class DatabaseCore2: 

29 

30 """ 

31 Complementary methods for class @see cl Database. 

32 """ 

33 

34 _split_expr = "\\r?\\t" 

35 

36 def _check_connection(self): 

37 """ 

38 Checks the SQL connection. 

39 """ 

40 if "_connection" not in self.__dict__: 

41 message = "use connect method before doing operation on this database" 

42 raise Exception(message) 

43 

44 def _check_values(self, values): 

45 """ 

46 When values are inserted or updated, this method doubles ``"'"`` 

47 it does not allow str values, only str. 

48 

49 @param values dictionary 

50 @return dictionary 

51 """ 

52 mod = [] 

53 for k, v in values.items(): 

54 if isinstance(v, str) and "'" in v: 

55 mod.append(k) 

56 if len(mod) == 0: 

57 return values 

58 else: 

59 values = copy.copy(values) 

60 for k in mod: 

61 values[k] = values[k].replace("'", "''") 

62 return values 

63 

64 def summary(self, light=False): 

65 """ 

66 Returns the list of tables, their columns, and their length. 

67 

68 @param light light version, no count, no first lines 

69 @return a dictionary where the keys are (t,i), t is a table name, i is in ["columns", "size", "first_lines"], 

70 a str message 

71 """ 

72 tables = self.get_table_list() 

73 indexes = self.get_index_list() 

74 res = {} 

75 lines = [] 

76 

77 for t in tables: 

78 col = self.get_table_columns_list(t) 

79 if not light: 

80 size = self.get_table_nb_lines(t) 

81 first = self.get_table_nfirst_lines(t) 

82 else: 

83 size = -1 

84 first = [] 

85 

86 res[t, "columns"] = col 

87 res[t, "size"] = size 

88 res[t, "first_lines"] = first 

89 

90 lines.append(t + "\t" + str(size) + " records") 

91 lines.append(" columns") 

92 for c in col: 

93 lines.append(" " + str(c)) 

94 

95 if len(first) > 0: 

96 lines.append(" first_lines") 

97 for lf in first: 

98 fo = [] 

99 if lf is None: 

100 lines.append(" None") 

101 else: 

102 for x in lf: 

103 if not isinstance(x, str): 

104 fo.append(str(x)) 

105 else: 

106 fo.append(x) 

107 lines.append(" " + "\t".join(fo)) 

108 

109 if len(indexes) > 0: 

110 lines.append("\n") 

111 lines.append("indexes") 

112 for tu in indexes: 

113 if isinstance(tu, (tuple, list)): 

114 lines.append(" " + "\t".join([str(x) for x in tu])) 

115 else: 

116 lines.append(" " + tu) 

117 

118 attached = self.get_attached_database_list() 

119 if len(attached) > 0: 

120 lines.append("\n") 

121 lines.append("attached databases") 

122 for a in attached: 

123 if a == "main": 

124 continue 

125 lines.append(" " + "\t" + a) 

126 continue 

127 # ~ rrr = self.execute( 

128 # ~ "SELECT name FROM %s.sqlite_master ORDER BY name;" % 

129 # ~ (a,)) 

130 # ~ for b in rrr: 

131 # ~ lines.append(" " + "\t" + b[0]) 

132 

133 return res, "\n".join(lines) 

134 

135 def _guess_columns( 

136 self, file, format, columns_name=None, filter_case=None, header=True, encoding="utf-8"): 

137 """ 

138 Guesses the columns types from a file (the method assumes there is a header), 

139 The types are chosen in that order: int, float, str. 

140 It keeps the most frequent one with if there is not too many errors. 

141 The separator must be tabs (``\\t``). 

142 

143 @param file file name 

144 @param format format (only tsv) 

145 @param columns_name if None, the first line contains the columns, otherwise it is the columns name 

146 @param filter_case process every case information (used to replace space for example) 

147 @param header by default, the function is expected a header 

148 @param encoding encoding 

149 @return columns, changes 

150 """ 

151 f = TextFile(file, fLOG=self.LOG, encoding=encoding) 

152 f.open() 

153 

154 if header: 

155 _aa, _bb, _cc, _dd = f.guess_columns(fields=columns_name) 

156 reg_exp = re.compile(DatabaseCore2._split_expr.replace( 

157 "\\t", _cc.replace("|", "[|]"))) 

158 else: 

159 # tabulation by default 

160 reg_exp = re.compile(DatabaseCore2._split_expr) 

161 f.close() 

162 raise NoHeaderException("a header is expected for that function") 

163 

164 self.LOG(" [_guess_columns] sep={0}".format([_cc])) 

165 

166 lines = [] 

167 for line in f: 

168 if len(lines) > 1000: 

169 break 

170 if len(lines) > 900 and random.randint(0, 10) > 0: 

171 continue 

172 lines.append(reg_exp.split( 

173 line.strip(" \r\n").strip('\ufeff'))) 

174 f.close() 

175 

176 if len(lines) <= 1: 

177 raise Exception("file %s is empty" % file) 

178 

179 exp = re.compile("\\W+") 

180 columns = {} 

181 done = {} 

182 count = {} 

183 changes = {} 

184 

185 for i in range(0, len(lines[0])): 

186 if lines[0][i] in [ 

187 '\ufeffID', '\ufeffid', '\ufeffqid', '\ufeffQID']: 

188 lines[0][i] = "qid" 

189 

190 if columns_name is None: 

191 name = lines[0][i].replace(":", "_") 

192 origin = lines[0][i] 

193 else: 

194 name = columns_name[i].replace(":", "_") 

195 origin = columns_name[i] 

196 

197 name = name.replace("-", "_").replace(" ", "_") 

198 

199 spl = exp.split(name) 

200 if len(spl) > 1: 

201 name = "".join(spl) 

202 if name[0] in "0123456789": 

203 name = "_" + name 

204 

205 if name in count: 

206 count[name] += 1 

207 name += str(count[name]) 

208 else: 

209 count[name] = 1 

210 

211 #lines [0][i] = name 

212 columns[i] = (name, int) 

213 done[i] = False 

214 

215 if origin != name: 

216 changes[origin] = name 

217 

218 self.LOG(" [_guess_columns] columns_name={0}".format(columns_name)) 

219 

220 length = {} 

221 nbline = 0 

222 count_types = {} 

223 

224 for line_ in lines[1:]: 

225 if filter_case is None: 

226 line = line_ 

227 else: 

228 line = [filter_case(s) for s in line_] 

229 nbline += 1 

230 if line == [] or line == ['']: 

231 continue 

232 

233 for i in range(0, len(line)): 

234 

235 if i >= len(done): 

236 # it is probably a wrong line 

237 continue 

238 

239 vl = length.get(i, 0) 

240 if len(line[i]) > vl: 

241 length[i] = len(line[i]) 

242 

243 try: 

244 if done[i]: 

245 continue 

246 except KeyError as e: 

247 str_columns = "" 

248 for k, v in columns.items(): 

249 str_columns += " " + \ 

250 str(k) + "\t" + str(v) + "\n" 

251 mes = "KeyError:" + str(e) + "\n" + str(done) + "\n" + str_columns + "\nnb line " + str( 

252 nbline) + " columns: " + str(len(line)) + "\n" + str(line) 

253 raise RuntimeError( # pylint: disable=W0707 

254 "problem\n" + 

255 mes + 

256 "\n\ncount_types:\n " + 

257 "\n ".join( 

258 "{0}:{1}".format( 

259 k, 

260 v) for k, 

261 v in sorted( 

262 count_types.items()))) 

263 

264 if line[i] is None or len(line[i]) == 0: 

265 continue 

266 

267 try: 

268 x = int(line[i]) 

269 if abs(x) >= 2147483647: 

270 raise ValueError("too big int") 

271 

272 if i not in count_types: 

273 count_types[i] = {int: 1} 

274 else: 

275 count_types[i][int] = count_types[i].get(int, 0) + 1 

276 

277 except ValueError: 

278 try: 

279 x = float(line[i]) 

280 

281 if i not in count_types: 

282 count_types[i] = {float: 1} 

283 else: 

284 count_types[i][float] = count_types[ 

285 i].get(float, 0) + 1 

286 

287 if columns[i][1] != float: 

288 columns[i] = (columns[i][0], float) 

289 

290 except ValueError: 

291 columns[i] = ( 

292 columns[i][0], (str, max( 

293 1, len( 

294 line[i])) * 2)) 

295 

296 if i not in count_types: 

297 count_types[i] = {str: 1} 

298 else: 

299 count_types[i][str] = count_types[ 

300 i].get(str, 0) + 1 

301 

302 self.LOG(" guess with ", len(lines), "lines") 

303 self.LOG(" count_types ", count_types) 

304 for i in range(0, len(columns)): 

305 

306 # if i is not in count_types, it means the first rows do now 

307 # contain values for these columns (only null values) 

308 t = count_types.get(i, {str: 1}) 

309 nb = sum(t.values()) 

310 

311 th = 0.0 if nb < 50 else ( 

312 0.01 if nb < 100 else 0.02) # we authorize 2% of wrong types 

313 

314 n = t.get(int, 0) 

315 if n * 1.0 / nb >= 1 - th: 

316 ty = int 

317 else: 

318 n += t.get(float, 0) 

319 if n * 1.0 / nb >= 1 - th: 

320 ty = float 

321 else: 

322 ty = str 

323 

324 columns[i] = (columns[i][0], ty) 

325 

326 self.LOG(" columns ", columns) 

327 

328 # if not done, choose str by default 

329 for c in columns: 

330 v = columns[c] 

331 if v[1] == str: 

332 columns[c] = (v[0], (str, max(1, length.get(c, 4)) * 2)) 

333 

334 for c, v in columns.items(): 

335 t = v[1] 

336 if isinstance(t, tuple) and t[0] == str and t[1] == 0: 

337 raise Exception( 

338 "the length is null for column %s - %s" % 

339 (c, str(v))) 

340 

341 self.LOG(" guess", columns) 

342 return columns, changes 

343 

344 def _process_text_line(self, line, columns, format, lower_case, num_line, 

345 fill_missing=0, filter_case=None, 

346 strict_separator=False): 

347 """ 

348 Processes a text line. 

349 

350 @param line text line to process (or a list if it already splitted) 

351 @param columns columns definition @see me _append_table 

352 @param format only tsv for the moment 

353 @param lower_case put every str object in lower_case 

354 @param num_line line number 

355 @param fill_missing fill the missing values by a default value, at least not more than fill_missing values 

356 @param filter_case process every case information (used to replace space for example) 

357 @param strict_separator strict number of columns, it assumes there is no separator in the content of every column 

358 @return a dictionary 

359 """ 

360 if not isinstance(line, list) and not isinstance( 

361 line, tuple) and not isinstance(line, numpy.ndarray): 

362 if format != "tsv": 

363 raise Exception("unable to process format " + format) 

364 line = line.strip("\r\n ").replace("\n", " ") 

365 line = DatabaseCore2._split_expr.split(line) 

366 

367 if filter_case is not None: 

368 line = [filter_case(s) for s in line] 

369 

370 try: 

371 if fill_missing > 0: 

372 m = max(columns.keys()) 

373 if m >= len(line): 

374 line = copy.copy(line) 

375 add = 0 

376 while m >= len(line) and add < fill_missing: 

377 a, b = columns[len(line)] 

378 if b is int: 

379 line.append("0") 

380 elif b is float: 

381 line.append("0.0") 

382 elif b is decimal.Decimal: 

383 line.append("0") 

384 elif b is str: 

385 line.append("") 

386 else: 

387 line.append("") 

388 add += 1 

389 

390 res = {} 

391 for c, v in columns.items(): 

392 if "AUTOFILL" in v: 

393 res[v[0]] = "NULL" 

394 elif "AUTOINCREMENT" in v: 

395 continue 

396 else: 

397 if c >= len(line): 

398 self.LOG( 

399 "(a)line number ", 

400 num_line, 

401 "*unable to process a line columns ", 

402 c, 

403 "#", 

404 line, 

405 " columns ", 

406 columns) 

407 return None 

408 

409 val = line[c] 

410 if len(v) > 2 and v[2].lower() not in [ 

411 "primarykey", "autofill"]: 

412 val = v[2](val) 

413 

414 try: 

415 if isinstance(v[1], tuple): 

416 val = v[1][0](val) 

417 elif v[1] is datetime.datetime: 

418 if isinstance(val, datetime.datetime): 

419 pass 

420 elif isinstance(val, str): 

421 val = datetime.datetime.parse(val) 

422 else: 

423 raise TypeError( 

424 "unable to convert %s into datetime" % str( 

425 type(val))) 

426 else: 

427 val = v[1](val) 

428 except ValueError: # as e : 

429 self.LOG( 

430 "(b)line number ", 

431 num_line, 

432 "**unable to process a line columns ", 

433 c, 

434 "#", 

435 v[0], 

436 " type ", 

437 v[1], 

438 " value ", 

439 repr( 

440 line[c])) 

441 return None 

442 

443 if isinstance(val, str): 

444 val = val.replace("'", "''") 

445 if lower_case: 

446 val = val.lower() 

447 res[v[0]] = val 

448 

449 return res 

450 except Exception: 

451 self.LOG("(c)line number", num_line, 

452 "***unable to process a line columns:", line) 

453 return None 

454 

455 def _get_insert_request(self, dico, 

456 table, 

457 exe=False, 

458 primarykey=None, 

459 cursor=None, 

460 skip_exception=False): 

461 """ 

462 Builds an ``INSERT SQL`` request from a dictionary. 

463 

464 @param dico dictionary 

465 @param table table name 

466 @param exe if True, execute the request, if False, do nothing except returning the request 

467 @param primarykey primary key column, if it exist 

468 @param cursor if None, creates a new one, otherwise use it 

469 @param skip_exception if True, log exception instead of raising one 

470 @return str """ 

471 keys = [] 

472 values = [] 

473 for k, v in dico.items(): 

474 keys.append(k) 

475 if k != primarykey and isinstance(v, str): 

476 v = "'" + str(v).replace("'", "''") + "'" 

477 values.append(v) 

478 elif isinstance(v, datetime.datetime): 

479 values.append("'" + str(v) + "'") 

480 else: 

481 values.append(str(v)) 

482 keys = ",".join(keys) 

483 values = ",".join(values) 

484 sql = "INSERT INTO %s (%s) VALUES (%s);" % (table, keys, values) 

485 if exe: 

486 try: 

487 if cursor is not None: 

488 cursor.execute(sql) 

489 else: 

490 self._connection.execute(sql) 

491 except SQLite.OperationalError as e: 

492 if skip_exception: 

493 self.LOG( 

494 "OperationalError: unable to execute a query", e, sql) 

495 else: 

496 raise ExceptionSQL( # pylint: disable=W0707 

497 "OperationalError: unable to execute a query", e, sql) 

498 except SQLite.IntegrityError as e: 

499 if skip_exception: 

500 self.LOG("IntegrityError: unable to execute a query", e, sql) 

501 else: 

502 raise ExceptionSQL( # pylint: disable=W0707 

503 "IntegrityError: unable to execute a query", e, sql) 

504 return sql 

505 

506 def get_python_code(self, varname="db"): 

507 """ 

508 Returns the python code associated to this database. 

509 

510 @param varname name of the variable 

511 @return 2-uple: simp, scode (import part, code part) 

512 """ 

513 simp = ["from pyensae import Database"] 

514 

515 code = ["tblname = r'%s'" % self.get_file()] 

516 

517 more = [] 

518 if self._engine != "SQLite": 

519 code.append("engine = '%s'" % self._engine) 

520 if self._user is not None: 

521 code.append("user = '%s'" % self._user) 

522 if self._password is not None: 

523 code.append("password = '%s'" % self._password) 

524 if self._host is not None: 

525 code.append("host = '%s'" % self._host) 

526 more = ", ".join(more) 

527 

528 code.append( 

529 "%s = Database (dbfile = tblname%s)" % 

530 (varname, more)) 

531 code.append("%s.connect ()" % varname) 

532 att = self.get_attached_database_list(file=True) 

533 for alias, file in att: 

534 if len(file) == 0: 

535 continue 

536 code.append( 

537 "%s.attach_database ('%s','%s')" % 

538 (varname, file, alias)) 

539 

540 return "\n".join(simp) + "\n", "\n".join(code) + "\n"