Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2@file
4@brief @see cl Database
5"""
7import re
8import copy
9import random
10import sqlite3 as SQLite
11import datetime
12import decimal
13import numpy
16from .file_text_binary import TextFile
17from .database_exception import ExceptionSQL
class NoHeaderException(Exception):
    """
    Signals that a header line was required but the caller declared
    that the file has none. It is a dedicated type so that a unit test
    can catch this situation specifically.
    """
28class DatabaseCore2:
30 """
31 Complementary methods for class @see cl Database.
32 """
34 _split_expr = "\\r?\\t"
def _check_connection(self):
    """
    Verifies that the instance owns a ``_connection`` attribute.

    The lookup is done in the instance ``__dict__`` only, so an attribute
    defined at class level does not count.

    @raise Exception when ``_connection`` is not set on the instance
    """
    if "_connection" in self.__dict__:
        return
    raise Exception(
        "use connect method before doing operation on this database")
def _check_values(self, values):
    """
    Doubles every single quote (``'`` becomes ``''``) found in the
    str values of a dictionary about to be inserted or updated.

    Only values of type str are touched. When nothing needs escaping the
    very same dictionary object is returned, otherwise a shallow copy is
    modified and returned, the input is left untouched.

    @param      values      dictionary
    @return                 dictionary
    """
    to_escape = [key for key, val in values.items()
                 if isinstance(val, str) and "'" in val]
    if not to_escape:
        return values

    escaped = copy.copy(values)
    for key in to_escape:
        escaped[key] = escaped[key].replace("'", "''")
    return escaped
def summary(self, light=False):
    """
    Describes the database: tables with their columns, size and first
    lines, then the indexes, then the attached databases.

    @param      light   light version: the size is reported as -1 and no first line is fetched
    @return             a 2-uple: a dictionary whose keys are ``(t, i)`` where *t* is a table name
                        and *i* is in ``["columns", "size", "first_lines"]``,
                        and a str message holding the same information as text
    """
    tables = self.get_table_list()
    indexes = self.get_index_list()
    res = {}
    lines = []

    for table in tables:
        cols = self.get_table_columns_list(table)
        if light:
            size, first = -1, []
        else:
            size = self.get_table_nb_lines(table)
            first = self.get_table_nfirst_lines(table)

        res[table, "columns"] = cols
        res[table, "size"] = size
        res[table, "first_lines"] = first

        lines.append(table + "\t" + str(size) + " records")
        lines.append(" columns")
        lines.extend(" " + str(col) for col in cols)

        if len(first) > 0:
            lines.append(" first_lines")
            for row in first:
                if row is None:
                    lines.append(" None")
                    continue
                cells = [cell if isinstance(cell, str) else str(cell)
                         for cell in row]
                lines.append(" " + "\t".join(cells))

    if len(indexes) > 0:
        lines.append("\n")
        lines.append("indexes")
        for entry in indexes:
            if isinstance(entry, (tuple, list)):
                lines.append(" " + "\t".join(map(str, entry)))
            else:
                lines.append(" " + entry)

    attached = self.get_attached_database_list()
    if len(attached) > 0:
        lines.append("\n")
        lines.append("attached databases")
        # the alias "main" is skipped, only the other aliases are listed
        lines.extend(" " + "\t" + alias
                     for alias in attached if alias != "main")

    return res, "\n".join(lines)
def _guess_columns(
        self, file, format, columns_name=None, filter_case=None,
        header=True, encoding="utf-8"):
    """
    Guesses the columns types from a sample of a text file
    (the method requires a header).
    The types are tried in that order: int, float, str.
    A type is retained for a column when the share of values which do not
    fit stays below a small threshold (0 below 50 typed values,
    1% below 100, 2% otherwise).

    The separator is the one returned by ``TextFile.guess_columns``,
    it replaces ``\\t`` in ``DatabaseCore2._split_expr``.
    At most 1001 lines are read: once 901 lines are collected, a further
    line is kept only when ``random.randint(0, 10)`` returns 0.

    @param      file            file name
    @param      format          format (only tsv), not read by this method
    @param      columns_name    if None, the first line contains the columns, otherwise it is the columns name
    @param      filter_case     process every case information (used to replace space for example)
    @param      header          must be True, the method raises @see cl NoHeaderException otherwise
    @param      encoding        encoding
    @return                     columns, changes: *columns* maps a column index to ``(name, type)``
                                where type is ``int``, ``float`` or ``(str, length)``,
                                *changes* maps an original column name to its sanitized name

    @raise NoHeaderException    if *header* is False
    @raise Exception            if the sample holds no more than one line
    """
    f = TextFile(file, fLOG=self.LOG, encoding=encoding)
    f.open()
    try:
        if not header:
            raise NoHeaderException("a header is expected for that function")

        _aa, _bb, _cc, _dd = f.guess_columns(fields=columns_name)
        # a pipe is a regex metacharacter, it is wrapped into a class
        reg_exp = re.compile(DatabaseCore2._split_expr.replace(
            "\\t", _cc.replace("|", "[|]")))
        self.LOG(" [_guess_columns] sep={0}".format([_cc]))

        lines = []
        for line in f:
            if len(lines) > 1000:
                break
            if len(lines) > 900 and random.randint(0, 10) > 0:
                continue
            lines.append(reg_exp.split(
                line.strip(" \r\n").strip('\ufeff')))
    finally:
        # the file is released whatever happens while sampling it
        f.close()

    if len(lines) <= 1:
        raise Exception("file %s is empty" % file)

    exp = re.compile("\\W+")
    columns = {}
    count = {}
    changes = {}

    for i in range(len(lines[0])):
        if lines[0][i] in [
                '\ufeffID', '\ufeffid', '\ufeffqid', '\ufeffQID']:
            lines[0][i] = "qid"

        origin = lines[0][i] if columns_name is None else columns_name[i]
        name = origin.replace(":", "_").replace("-", "_").replace(" ", "_")

        # every remaining non-word character is dropped
        spl = exp.split(name)
        if len(spl) > 1:
            name = "".join(spl)
        # a name cannot be empty nor start with a digit
        if not name or name[0] in "0123456789":
            name = "_" + name

        # a duplicated name receives its occurrence number as a suffix
        if name in count:
            count[name] += 1
            name += str(count[name])
        else:
            count[name] = 1

        columns[i] = (name, int)
        if origin != name:
            changes[origin] = name

    self.LOG(" [_guess_columns] columns_name={0}".format(columns_name))

    length = {}
    count_types = {}
    nb_columns = len(columns)

    for line_ in lines[1:]:
        if filter_case is None:
            line = line_
        else:
            line = [filter_case(s) for s in line_]
        if line == [] or line == ['']:
            continue

        for i, cell in enumerate(line):
            if i >= nb_columns:
                # it is probably a wrong line
                continue
            if cell is None or len(cell) == 0:
                continue
            if len(cell) > length.get(i, 0):
                length[i] = len(cell)

            try:
                x = int(cell)
                if abs(x) >= 2147483647:
                    # does not fit a 32 bits integer, tries float instead
                    raise ValueError("too big int")
                kind = int
            except ValueError:
                try:
                    float(cell)
                    kind = float
                except ValueError:
                    kind = str

            seen = count_types.setdefault(i, {})
            seen[kind] = seen.get(kind, 0) + 1

    self.LOG(" guess with ", len(lines), "lines")
    self.LOG(" count_types ", count_types)
    for i in range(nb_columns):
        # if i is not in count_types, it means the first rows do not
        # contain values for these columns (only null values)
        t = count_types.get(i, {str: 1})
        nb = sum(t.values())

        # we authorize 2% of wrong types
        th = 0.0 if nb < 50 else (0.01 if nb < 100 else 0.02)

        n = t.get(int, 0)
        if n * 1.0 / nb >= 1 - th:
            ty = int
        else:
            n += t.get(float, 0)
            ty = float if n * 1.0 / nb >= 1 - th else str

        columns[i] = (columns[i][0], ty)

    self.LOG(" columns ", columns)

    # a str column gets twice the longest observed length
    # (8 when no value was observed)
    for c in columns:
        v = columns[c]
        if v[1] == str:
            columns[c] = (v[0], (str, max(1, length.get(c, 4)) * 2))

    self.LOG(" guess", columns)
    return columns, changes
def _process_text_line(self, line, columns, format, lower_case, num_line,
                       fill_missing=0, filter_case=None,
                       strict_separator=False):
    """
    Processes a text line: splits it if needed, converts every value
    into the type of its column and doubles the single quotes of the
    str values.

    Every failure met while converting is logged with ``self.LOG`` and
    makes the method return None instead of raising.

    @param      line                text line to process (or a list, a tuple, a numpy array if it is already split)
    @param      columns             columns definition ``{ index: (name, type[, option]) }`` where *option*
                                    is a flag such as ``PRIMARYKEY``, ``AUTOFILL``, ``AUTOINCREMENT``
                                    or a function applied to the raw value before the conversion,
                                    @see me _append_table
    @param      format              only tsv for the moment
    @param      lower_case          put every str object in lower_case
    @param      num_line            line number (only used in the logs)
    @param      fill_missing        fill the missing values by a default value, at least not more than fill_missing values
    @param      filter_case         process every case information (used to replace space for example)
    @param      strict_separator    strict number of columns, it assumes there is no separator
                                    in the content of every column (not read by this method)
    @return                         a dictionary ``{ column name: value }`` or None if the line cannot be processed

    @raise Exception                if *line* is a text line and *format* is not ``"tsv"``
    """
    if not isinstance(line, (list, tuple, numpy.ndarray)):
        if format != "tsv":
            raise Exception("unable to process format " + format)
        line = line.strip("\r\n ").replace("\n", " ")
        # _split_expr is a pattern, not a compiled expression
        line = re.split(self._split_expr, line)

    if filter_case is not None:
        line = [filter_case(s) for s in line]

    try:
        if fill_missing > 0:
            m = max(columns.keys())
            if m >= len(line):
                # a list is needed to append, whatever the input was
                line = list(line)
                add = 0
                while m >= len(line) and add < fill_missing:
                    declared = columns[len(line)][1]
                    if declared is int or declared is decimal.Decimal:
                        line.append("0")
                    elif declared is float:
                        line.append("0.0")
                    else:
                        line.append("")
                    add += 1

        res = {}
        for c, v in columns.items():
            if "AUTOFILL" in v:
                res[v[0]] = "NULL"
                continue
            if "AUTOINCREMENT" in v:
                continue

            if c >= len(line):
                self.LOG(
                    "(a)line number ",
                    num_line,
                    "*unable to process a line columns ",
                    c,
                    "#",
                    line,
                    " columns ",
                    columns)
                return None

            val = line[c]
            # the third item is either a flag or a preprocessing function
            if len(v) > 2 and (
                    callable(v[2]) or
                    v[2].lower() not in ["primarykey", "autofill"]):
                val = v[2](val)

            try:
                if isinstance(v[1], tuple):
                    # (type, length): only the type converts the value
                    val = v[1][0](val)
                elif v[1] is datetime.datetime:
                    if isinstance(val, datetime.datetime):
                        pass
                    elif isinstance(val, str):
                        # ISO format, which is also what str(datetime) produces
                        val = datetime.datetime.fromisoformat(val)
                    else:
                        raise TypeError(
                            "unable to convert %s into datetime" % str(
                                type(val)))
                else:
                    val = v[1](val)
            except ValueError:
                self.LOG(
                    "(b)line number ",
                    num_line,
                    "**unable to process a line columns ",
                    c,
                    "#",
                    v[0],
                    " type ",
                    v[1],
                    " value ",
                    repr(line[c]))
                return None

            if isinstance(val, str):
                val = val.replace("'", "''")
                if lower_case:
                    val = val.lower()
            res[v[0]] = val

        return res
    except Exception:
        # best effort: a line which cannot be processed is logged and skipped
        self.LOG("(c)line number", num_line,
                 "***unable to process a line columns:", line)
        return None
def _get_insert_request(self, dico,
                        table,
                        exe=False,
                        primarykey=None,
                        cursor=None,
                        skip_exception=False):
    """
    Builds an ``INSERT SQL`` request from a dictionary.

    Values are rendered as follows: ``None`` becomes ``NULL``,
    a str is enclosed in single quotes with its own quotes doubled
    (except for the primary key column whose str value is written as is,
    which lets a value such as ``NULL`` go through unquoted),
    a datetime is enclosed in single quotes, anything else goes
    through ``str``.

    @param      dico            dictionary ``{ column name: value }``
    @param      table           table name
    @param      exe             if True, execute the request, if False, do nothing except returning the request
    @param      primarykey      primary key column, if it exists
    @param      cursor          if None, the request is executed on ``self._connection``, otherwise on this object
    @param      skip_exception  if True, log exception instead of raising one
    @return                     str, the SQL request

    @raise ExceptionSQL         if the execution fails with an ``OperationalError``
                                or an ``IntegrityError`` and *skip_exception* is False
    """
    keys = []
    values = []
    for k, v in dico.items():
        keys.append(k)
        if v is None:
            # str(None) would produce the token None which is not valid SQL
            values.append("NULL")
        elif k != primarykey and isinstance(v, str):
            values.append("'" + v.replace("'", "''") + "'")
        elif isinstance(v, datetime.datetime):
            values.append("'" + str(v) + "'")
        else:
            values.append(str(v))

    sql = "INSERT INTO %s (%s) VALUES (%s);" % (
        table, ",".join(keys), ",".join(values))

    if exe:
        executor = self._connection if cursor is None else cursor
        try:
            executor.execute(sql)
        except SQLite.OperationalError as e:
            if not skip_exception:
                raise ExceptionSQL(
                    "OperationalError: unable to execute a query", e, sql) from e
            self.LOG(
                "OperationalError: unable to execute a query", e, sql)
        except SQLite.IntegrityError as e:
            if not skip_exception:
                raise ExceptionSQL(
                    "IntegrityError: unable to execute a query", e, sql) from e
            self.LOG("IntegrityError: unable to execute a query", e, sql)
    return sql
def get_python_code(self, varname="db"):
    """
    Returns the python code which opens this database again:
    it declares the file name, the connection settings which differ from
    the default ones, creates the ``Database`` object, connects it and
    attaches the attached databases.

    @param      varname     name of the variable receiving the database in the generated code
    @return                 2-uple: simp, scode (import part, code part)

    @warning The password, when there is one, is written in clear in the generated code.
    """
    simp = ["from pyensae import Database"]
    code = ["tblname = r'%s'" % self.get_file()]

    settings = []
    if self._engine != "SQLite":
        settings.append(("engine", self._engine))
    for name, value in (("user", self._user),
                        ("password", self._password),
                        ("host", self._host)):
        if value is not None:
            settings.append((name, value))

    # every declared variable is also given to the constructor
    # NOTE(review): assumes Database accepts engine, user, password, host
    # as keyword arguments - confirm against its constructor
    more = ""
    for name, value in settings:
        code.append("%s = '%s'" % (name, value))
        more += ", %s = %s" % (name, name)

    code.append(
        "%s = Database (dbfile = tblname%s)" %
        (varname, more))
    code.append("%s.connect ()" % varname)

    for alias, file in self.get_attached_database_list(file=True):
        # an attached database without any file cannot be attached again
        if len(file) == 0:
            continue
        code.append(
            "%s.attach_database ('%s','%s')" %
            (varname, file, alias))

    return "\n".join(simp) + "\n", "\n".join(code) + "\n"