"""
:class:`Database <pyensae.sql.database_main.Database>`
:githublink:`%|py|6`
"""
import re
import copy
import random
import sqlite3 as SQLite
import datetime
import decimal
import numpy
from .file_text_binary import TextFile
from .database_exception import ExceptionSQL
class DatabaseCore2:
    """
    Complementary methods for class :class:`Database <pyensae.sql.database_main.Database>`.
    """

    # Regular-expression *pattern* (a plain str, compiled with :mod:`re` where
    # needed) used to split rows of a tab-separated file: an optional carriage
    # return followed by a tab.
    _split_expr = "\\r?\\t"
[docs] def _check_connection(self):
"""
Checks the SQL connection.
:githublink:`%|py|39`
"""
if "_connection" not in self.__dict__:
message = "use connect method before doing operation on this database"
raise Exception(message)
[docs] def _check_values(self, values):
"""
When values are inserted or updated, this method doubles ``"'"``
it does not allow str values, only str.
:param values: dictionary
:return: dictionary
:githublink:`%|py|51`
"""
mod = []
for k, v in values.items():
if isinstance(v, str) and "'" in v:
mod.append(k)
if len(mod) == 0:
return values
else:
values = copy.copy(values)
for k in mod:
values[k] = values[k].replace("'", "''")
return values
[docs] def summary(self, light=False):
"""
Returns the list of tables, their columns, and their length.
:param light: light version, no count, no first lines
:return: a dictionary where the keys are (t,i), t is a table name, i is in ["columns", "size", "first_lines"],
a str message
:githublink:`%|py|71`
"""
tables = self.get_table_list()
indexes = self.get_index_list()
res = {}
lines = []
for t in tables:
col = self.get_table_columns_list(t)
if not light:
size = self.get_table_nb_lines(t)
first = self.get_table_nfirst_lines(t)
else:
size = -1
first = []
res[t, "columns"] = col
res[t, "size"] = size
res[t, "first_lines"] = first
lines.append(t + "\t" + str(size) + " records")
lines.append(" columns")
for c in col:
lines.append(" " + str(c))
if len(first) > 0:
lines.append(" first_lines")
for lf in first:
fo = []
if lf is None:
lines.append(" None")
else:
for x in lf:
if not isinstance(x, str):
fo.append(str(x))
else:
fo.append(x)
lines.append(" " + "\t".join(fo))
if len(indexes) > 0:
lines.append("\n")
lines.append("indexes")
for tu in indexes:
if isinstance(tu, (tuple, list)):
lines.append(" " + "\t".join([str(x) for x in tu]))
else:
lines.append(" " + tu)
attached = self.get_attached_database_list()
if len(attached) > 0:
lines.append("\n")
lines.append("attached databases")
for a in attached:
if a == "main":
continue
lines.append(" " + "\t" + a)
continue
# ~ rrr = self.execute(
# ~ "SELECT name FROM %s.sqlite_master ORDER BY name;" %
# ~ (a,))
# ~ for b in rrr:
# ~ lines.append(" " + "\t" + b[0])
return res, "\n".join(lines)
    def _guess_columns(self, file, format, columns_name=None, filter_case=None,
                       header=True, encoding="utf-8"):
        """
        Guesses the columns types from a file (the method assumes there is a
        header).  The types are tried in that order: int, float, str; the
        most frequent one wins as long as there are not too many errors.
        The separator must be tabs (``\\t``).

        Only the first ~1000 lines are sampled (with some randomness beyond
        line 900), so the guess may be wrong for columns whose first values
        are not representative.

        :param file: file name
        :param format: format (only tsv)
        :param columns_name: if None, the first line contains the columns,
            otherwise it is the columns name
        :param filter_case: function applied to every cell
            (used to replace spaces for example)
        :param header: by default, the function expects a header
        :param encoding: encoding
        :return: 2-uple ``(columns, changes)``: *columns* maps a column index
            to ``(name, type)`` where *type* is ``int``, ``float`` or
            ``(str, length)``; *changes* maps original column names to the
            sanitized names actually used
        """
        f = TextFile(file, fLOG=self.LOG, encoding=encoding)
        f.open()
        if header:
            # let TextFile guess the separator (_cc); escape '|' so it can be
            # used inside the regular expression built from _split_expr
            _aa, _bb, _cc, _dd = f.guess_columns(fields=columns_name)
            reg_exp = re.compile(DatabaseCore2._split_expr.replace(
                "\\t", _cc.replace("|", "[|]")))
        else:
            # tabulation by default
            reg_exp = re.compile(DatabaseCore2._split_expr)
            f.close()
            # NOTE(review): NoHeaderException is not imported in this module's
            # visible imports — presumably defined elsewhere; verify, otherwise
            # this raises NameError instead.
            raise NoHeaderException("a header is expected for that function")
        self.LOG(" [_guess_columns] sep={0}".format([_cc]))
        # sample at most ~1000 lines; beyond 900, keep a random ~1/11 of them
        lines = []
        for line in f:
            if len(lines) > 1000:
                break
            if len(lines) > 900 and random.randint(0, 10) > 0:
                continue
            lines.append(reg_exp.split(
                line.strip(" \r\n").strip('\ufeff')))
        f.close()
        if len(lines) <= 1:
            raise Exception("file %s is empty" % file)
        # sanitize the column names found on the first line
        exp = re.compile("\\W+")
        columns = {}
        done = {}
        count = {}
        changes = {}
        for i in range(0, len(lines[0])):
            # normalize a BOM-prefixed id column
            if lines[0][i] in [
                    '\ufeffID', '\ufeffid', '\ufeffqid', '\ufeffQID']:
                lines[0][i] = "qid"
            if columns_name is None:
                name = lines[0][i].replace(":", "_")
                origin = lines[0][i]
            else:
                name = columns_name[i].replace(":", "_")
                origin = columns_name[i]
            name = name.replace("-", "_").replace(" ", "_")
            # drop any remaining non-word characters
            spl = exp.split(name)
            if len(spl) > 1:
                name = "".join(spl)
            # a column name cannot start with a digit
            if name[0] in "0123456789":
                name = "_" + name
            # disambiguate duplicated names by appending a counter
            if name in count:
                count[name] += 1
                name += str(count[name])
            else:
                count[name] = 1
            #lines [0][i] = name
            columns[i] = (name, int)
            done[i] = False
            if origin != name:
                changes[origin] = name
        self.LOG(" [_guess_columns] columns_name={0}".format(columns_name))
        # vote on the type of every column using the sampled rows
        length = {}
        nbline = 0
        count_types = {}
        for line_ in lines[1:]:
            if filter_case is None:
                line = line_
            else:
                line = [filter_case(s) for s in line_]
            nbline += 1
            if line == [] or line == ['']:
                continue
            for i in range(0, len(line)):
                if i >= len(done):
                    # it is probably a wrong line
                    continue
                # keep track of the longest observed value per column
                vl = length.get(i, 0)
                if len(line[i]) > vl:
                    length[i] = len(line[i])
                try:
                    if done[i]:
                        continue
                except KeyError as e:
                    # build a detailed message: this should not happen unless
                    # the file has inconsistent rows
                    str_columns = ""
                    for k, v in columns.items():
                        str_columns += " " + \
                            str(k) + "\t" + str(v) + "\n"
                    mes = "KeyError:" + str(e) + "\n" + str(done) + "\n" + str_columns + "\nnb line " + str(
                        nbline) + " columns: " + str(len(line)) + "\n" + str(line)
                    raise RuntimeError(  # pylint: disable=W0707
                        "problem\n" +
                        mes +
                        "\n\ncount_types:\n " +
                        "\n ".join("{0}:{1}".format(k, v)
                                   for k, v in sorted(count_types.items())))
                if line[i] is None or len(line[i]) == 0:
                    # empty cells do not vote
                    continue
                try:
                    x = int(line[i])
                    # values beyond 32-bit range are not stored as int
                    if abs(x) >= 2147483647:
                        raise ValueError("too big int")
                    if i not in count_types:
                        count_types[i] = {int: 1}
                    else:
                        count_types[i][int] = count_types[i].get(int, 0) + 1
                except ValueError:
                    try:
                        x = float(line[i])
                        if i not in count_types:
                            count_types[i] = {float: 1}
                        else:
                            count_types[i][float] = count_types[
                                i].get(float, 0) + 1
                        if columns[i][1] != float:
                            columns[i] = (columns[i][0], float)
                    except ValueError:
                        # neither int nor float: fall back to str with a
                        # length twice the observed one
                        columns[i] = (
                            columns[i][0], (str, max(
                                1, len(
                                    line[i])) * 2))
                        if i not in count_types:
                            count_types[i] = {str: 1}
                        else:
                            count_types[i][str] = count_types[
                                i].get(str, 0) + 1
        self.LOG(" guess with ", len(lines), "lines")
        self.LOG(" count_types ", count_types)
        # pick the final type: int if (almost) all votes are int,
        # then float, otherwise str
        for i in range(0, len(columns)):
            # if i is not in count_types, it means the first rows do not
            # contain values for these columns (only null values)
            t = count_types.get(i, {str: 1})
            nb = sum(t.values())
            th = 0.0 if nb < 50 else (
                0.01 if nb < 100 else 0.02)  # we authorize 2% of wrong types
            n = t.get(int, 0)
            if n * 1.0 / nb >= 1 - th:
                ty = int
            else:
                n += t.get(float, 0)
                if n * 1.0 / nb >= 1 - th:
                    ty = float
                else:
                    ty = str
            columns[i] = (columns[i][0], ty)
        self.LOG(" columns ", columns)
        # if not done, choose str by default, sized from the longest value
        for c in columns:
            v = columns[c]
            if v[1] == str:
                columns[c] = (v[0], (str, max(1, length.get(c, 4)) * 2))
        for c, v in columns.items():
            t = v[1]
            if isinstance(t, tuple) and t[0] == str and t[1] == 0:
                raise Exception(
                    "the length is null for column %s - %s" %
                    (c, str(v)))
        self.LOG(" guess", columns)
        return columns, changes
[docs] def _process_text_line(self, line, columns, format, lower_case, num_line,
fill_missing=0, filter_case=None,
strict_separator=False):
"""
Processes a text line.
:param line: text line to process (or a list if it already splitted)
:param columns: columns definition :meth:`_append_table <pyensae.sql.database_import_export.DatabaseImportExport._append_table>`
:param format: only tsv for the moment
:param lower_case: put every str object in lower_case
:param num_line: line number
:param fill_missing: fill the missing values by a default value, at least not more than fill_missing values
:param filter_case: process every case information (used to replace space for example)
:param strict_separator: strict number of columns, it assumes there is no separator in the content of every column
:return: a dictionary
:githublink:`%|py|359`
"""
if not isinstance(line, list) and not isinstance(
line, tuple) and not isinstance(line, numpy.ndarray):
if format != "tsv":
raise Exception("unable to process format " + format)
line = line.strip("\r\n ").replace("\n", " ")
line = DatabaseCore2._split_expr.split(line)
if filter_case is not None:
line = [filter_case(s) for s in line]
try:
if fill_missing > 0:
m = max(columns.keys())
if m >= len(line):
line = copy.copy(line)
add = 0
while m >= len(line) and add < fill_missing:
a, b = columns[len(line)]
if b is int:
line.append("0")
elif b is float:
line.append("0.0")
elif b is decimal.Decimal:
line.append("0")
elif b is str:
line.append("")
else:
line.append("")
add += 1
res = {}
for c, v in columns.items():
if "AUTOFILL" in v:
res[v[0]] = "NULL"
elif "AUTOINCREMENT" in v:
continue
else:
if c >= len(line):
self.LOG(
"(a)line number ",
num_line,
"*unable to process a line columns ",
c,
"#",
line,
" columns ",
columns)
return None
val = line[c]
if len(v) > 2 and v[2].lower() not in [
"primarykey", "autofill"]:
val = v[2](val)
try:
if isinstance(v[1], tuple):
val = v[1][0](val)
elif v[1] is datetime.datetime:
if isinstance(val, datetime.datetime):
pass
elif isinstance(val, str):
val = datetime.datetime.parse(val)
else:
raise TypeError(
"unable to convert %s into datetime" % str(
type(val)))
else:
val = v[1](val)
except ValueError: # as e :
self.LOG(
"(b)line number ",
num_line,
"**unable to process a line columns ",
c,
"#",
v[0],
" type ",
v[1],
" value ",
repr(
line[c]))
return None
if isinstance(val, str):
val = val.replace("'", "''")
if lower_case:
val = val.lower()
res[v[0]] = val
return res
except Exception:
self.LOG("(c)line number", num_line,
"***unable to process a line columns:", line)
return None
[docs] def _get_insert_request(self, dico,
table,
exe=False,
primarykey=None,
cursor=None,
skip_exception=False):
"""
Builds an ``INSERT SQL`` request from a dictionary.
:param dico: dictionary
:param table: table name
:param exe: if True, execute the request, if False, do nothing except returning the request
:param primarykey: primary key column, if it exist
:param cursor: if None, creates a new one, otherwise use it
:param skip_exception: if True, log exception instead of raising one
:return: str
:githublink:`%|py|470`
"""
keys = []
values = []
for k, v in dico.items():
keys.append(k)
if k != primarykey and isinstance(v, str):
v = "'" + str(v).replace("'", "''") + "'"
values.append(v)
elif isinstance(v, datetime.datetime):
values.append("'" + str(v) + "'")
else:
values.append(str(v))
keys = ",".join(keys)
values = ",".join(values)
sql = "INSERT INTO %s (%s) VALUES (%s);" % (table, keys, values)
if exe:
try:
if cursor is not None:
cursor.execute(sql)
else:
self._connection.execute(sql)
except SQLite.OperationalError as e:
if skip_exception:
self.LOG(
"OperationalError: unable to execute a query", e, sql)
else:
raise ExceptionSQL( # pylint: disable=W0707
"OperationalError: unable to execute a query", e, sql)
except SQLite.IntegrityError as e:
if skip_exception:
self.LOG("IntegrityError: unable to execute a query", e, sql)
else:
raise ExceptionSQL( # pylint: disable=W0707
"IntegrityError: unable to execute a query", e, sql)
return sql
[docs] def get_python_code(self, varname="db"):
"""
Returns the python code associated to this database.
:param varname: name of the variable
:return: 2-uple: simp, scode (import part, code part)
:githublink:`%|py|512`
"""
simp = ["from pyensae import Database"]
code = ["tblname = r'%s'" % self.get_file()]
more = []
if self._engine != "SQLite":
code.append("engine = '%s'" % self._engine)
if self._user is not None:
code.append("user = '%s'" % self._user)
if self._password is not None:
code.append("password = '%s'" % self._password)
if self._host is not None:
code.append("host = '%s'" % self._host)
more = ", ".join(more)
code.append(
"%s = Database (dbfile = tblname%s)" %
(varname, more))
code.append("%s.connect ()" % varname)
att = self.get_attached_database_list(file=True)
for alias, file in att:
if len(file) == 0:
continue
code.append(
"%s.attach_database ('%s','%s')" %
(varname, file, alias))
return "\n".join(simp) + "\n", "\n".join(code) + "\n"