"""
:class:`Database <pyensae.sql.database_main.Database>`
:githublink:`%|py|6`
"""
import os
import sys
import math
import re
import time
import decimal
import sqlite3 as SQLite
import datetime
import numpy
from .database_exception import ExceptionSQL, DBException
from .database_core2 import DatabaseCore2
module_odbc = None
[docs]class DatabaseCore(DatabaseCore2):
"""
Core methods for class :class:`Database <pyensae.sql.database_main.Database>`.
.. list-table::
:widths: auto
:header-rows: 1
* - attribute
- meaning
* - _engine
- engine type (SQLite is the only available)
* - _sql_file
- database file, if it does not exist, it will be created.
:githublink:`%|py|27`
"""
_sql_keywords = ["order", "by", "select", "from", "group", "where", "as", "like", "upper", "collapse", "join", "union",
"inner", "default", "id", "double", "text", "varchar", "float", "long", "Decimal"]
_SQL_conversion_types = {"": float,
"TEXT": str,
"text": str,
"INTEGER": int,
"FLOAT": float,
"REAL": float,
"float": float,
"numeric": float,
"LONG": int,
"int": int,
"varchar": str,
"VARCHAR": str,
"Decimal": decimal.Decimal,
"DATETIME": datetime.datetime,
"smallint": int,
"bigint": float, }
_engines = ["SQLite", "MySQL", "ODBCMSSQL"]
_field_option = ["PRIMARYKEY", "AUTOINCREMENT", "AUTOFILL"]
[docs] def __init__(self, sql_file, engine="SQLite", user=None, password=None,
host="localhost", LOG=None, attach=None):
"""
Creates a database object.
:param sql_file: database file database file (use ``:memory:`` to avoid creating a file and using only memory)
it can also contain several files separated by ;
``name_file ; nickname,second_file ; ...``
:param engine: SQLite or MySQL (if it is installed), ODBCMSSQL
:param user: user if needed
:param host: to connect to a MSSQL database
:param password: password if needed
:param LOG: LOG function, if None, choose ``print``
:param attach: dictionary ``{nickname: filename}``,
list of databases to attach
.. warning:: If the folder does not exist, it will be created
Parameter *dbfile* can be of type
`sqlite3.Connection <https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection>`_.
:githublink:`%|py|72`
"""
# attach cases
if attach is None:
attach = {}
else:
attach = attach.copy()
if isinstance(sql_file, str):
for e in DatabaseCore._engines:
if sql_file.startswith(e + ":::"):
engine = e
sql_file = sql_file[len(e) + 3:]
if "###" in sql_file:
host, sql_file = sql_file.split("###")
break
if ";" in sql_file:
li = [s.strip() for s in sql_file.split(";")]
sql_file = li[0]
rest = li[1:]
for s in rest:
ok = s.split(",")
if len(ok) != 2:
raise DBException( # pragma: no cover
"unable to find an alias in %r" % s)
nick = ok[0].strip()
file = ",".join(ok[1:])
attach[nick] = file.strip()
elif sql_file.startswith(":"):
if sql_file != ":memory:":
raise FileNotFoundError( # pragma: no cover
"unable to interpret file: %r" % sql_file)
# some initialization
self._password = password
self._user = user
self._host = host
# the rest
if LOG is None:
def blind(*li, **p): # pragma: no cover
pass
LOG = blind # pragma: no cover
self.LOG = LOG
if isinstance(LOG, dict):
raise TypeError( # pragma: no cover
"fLOG should be a function, not a dictionary")
if isinstance(self.LOG, dict):
raise TypeError( # pragma: no cover
"LOG should be a function, not a dictionary")
if engine == "SQLite":
self._sql_file = sql_file
self._engine = engine
elif engine == "ODBCMSSQL":
raise DBException( # pragma: no cover
"Unable to connect to a SQL server.")
else:
raise DBException( # pragma: no cover
"unfounded engine %s in %s" %
(engine, ", ".join(
DatabaseCore._engines)))
# write a file able to build a database summary
if isinstance(sql_file, str) and not self.isMemory():
folder = os.path.split(sql_file)[0]
if len(folder) > 0 and not os.path.exists(folder):
os.makedirs(folder)
summary = os.path.join(folder, "temp_quick_look_up.py")
if not os.path.exists(summary):
#cwd = os.path.join (os.path.abspath (os.path.split (__file__) [0]), "..", "..")
#fi = os.path.split (sql_file) [1]
if hasattr(DatabaseCore, "SCRIPT_LOOKUP"):
script = DatabaseCore.SCRIPT_LOOKUP
lines = script.split("\n")
lines = [li if "__CWD__ =" not in li else
li.replace(
"(__file__)",
"(r'%s')" %
os.path.realpath(__file__))
for li in lines]
script = "\n".join(lines)
script = script.replace(
"python quick_look_up.py",
"%s quick_look_up.py" %
sys.executable)
self.LOG("creating script ", summary)
try:
f = open(summary, "w")
f.write(script)
f.close()
except IOError:
self.LOG("unable to write ", summary)
self._attach = attach
self._buffer_insert = []
self._buffer_insert_s = 0
if isinstance(sql_file, str) and self.isMemory():
self._connection = SQLite.connect(self._sql_file)
elif isinstance(sql_file, SQLite.Connection):
self._connection = sql_file
self._sql_file = ":memory:"
[docs] def isMSSQL(self):
"""
Says if the syntax is MS SQL Server.
:githublink:`%|py|185`
"""
if self._engine == "ODBCMSSQL":
return True
return False
[docs] def isMemory(self):
"""
Tells if the Database takes place in memory (``:memory:``).
:githublink:`%|py|193`
"""
return self._sql_file == ":memory:"
##########################################################################
# connection
##########################################################################
[docs] def SetBufferInsert(self, n):
"""
This function offers the possibility to postpone the insertion,
they will be processed all at the time during when method commit is called.
:param n: number of insertion to postpone
:githublink:`%|py|206`
"""
self._buffer_insert_s = n
[docs] def is_connected(self):
"""
Says if the database is connected.
:return: "_connection" in self.__dict__
:githublink:`%|py|214`
"""
return "_connection" in self.__dict__
[docs] @staticmethod
def regex_match(exp, st):
"Applies a regular expression. Static method to insert in a SQL query."
return 0 if re.compile(exp).search(st) is None else 1
[docs] @staticmethod
def idaytodate(dayint, year, month, day):
"Date conversion. Static method to insert in a SQL query."
try:
d = datetime.datetime(year, month, day)
day = datetime.datetime(year, month, day + 1) - d
cur = d + day * dayint
return str(cur).split()[0]
except Exception as e:
return str(e)
[docs] @staticmethod
def isectoday(sec):
"Date conversion. Static method to insert in a SQL query."
if sec < 0:
return "negative time"
elif sec >= 86400:
return "out of day"
else:
s = int(sec)
h = s / 3600
m = (s % 3600) / 60
s %= 60
return "%02d:%02d:%02d" % (h, m, s)
[docs] @staticmethod
def itimestamp(t, year, month, day):
"Date conversion. Static method to insert in a SQL query."
d = DatabaseCore.idaytodate(int(t / 86400), year, month, day)
s = DatabaseCore.isectoday(int(t - 86400. * int(t / 86400)))
return d + " " + s
[docs] @staticmethod
def string_to_date(s):
"Date conversion. Static method to insert in a SQL query."
d = int(s[:2])
m = int(s[3:5])
y = int(s[6:])
return datetime.datetime(y, m, d)
[docs] @staticmethod
def _special_function_init_():
_list_special_function = [
("log", math.log, 1, "log(s) --> float", "log"),
("exp", math.exp, 1, "exp(s) --> float", "exp"),
("len", len, 1, "len(s) --> int", "string length"),
("lower",
lambda s:s.lower(),
1,
"lower(s) --> string",
"lower case"),
("upper",
lambda s:s.upper(),
1,
"upper(s) --> string",
"upper case"),
("isubstring", lambda sub, s: 1 if sub in s else 0,
2, "isubstring(sub,str) --> {0,1}", "return 1 if str includes sub, 0 otherwise"),
("match", DatabaseCore.regex_match,
2, "match(regex,str) --> {0,1}", "return 1 if str matches the regular expression exp, 0 otherwise"),
("idaytodate", DatabaseCore.idaytodate,
4, "idaytodate (day, 1970, 1, 1) --> str", "date if day is the number of days since 01/01/1970"),
("itimestamp", DatabaseCore.itimestamp,
4, "itimestamp (t, 1970, 1, 1) --> str", "date,time if t is the number of seconds since 01/01/1970"),
("isectoday", DatabaseCore.isectoday,
1, "isectoday (isec) --> str", "time if isec is the number of seconds since midnight"),
]
return _list_special_function
[docs] def connect(self, odbc_string=None):
"""
Opens a connection to the database.
:param odbc_string: use a different odbc string
:githublink:`%|py|296`
"""
if self.isMemory():
if "_connection" not in self.__dict__:
raise DBException( # pragma: no cover
"It is a database in memory, the database should already be connected.")
else:
if "_connection" in self.__dict__:
raise RuntimeError("A previous connection was not closed.")
if self._engine == "SQLite":
self._connection = SQLite.connect(self._sql_file)
# elif self._engine == "MySQL" : self._connection =
# MySQLdb.connect (self._host, self._user, self._password,
# self._sql_file)
elif self._engine == "ODBCMSSQL": # pragma: no cover
if odbc_string is None:
temp = ["DRIVER={SQL Server Native Client 10.0}", # {SQL Server}",
"SERVER=%s" % self._host,
"DATABASE=%s" % self._sql_file,
"Trusted_Connection=yes",
"MARS_Connection=yes",
# "MultipleActiveResultSets=True",
#"Integrated Security=SSPI",
]
#temp = ["DSN=%s" % self._sql_file ]
if self._user is not None:
temp.append("UID=%s" % self._user)
if self._password is not None:
temp.append("PASSWORD=%s" % self._password)
st = ";".join(temp)
self.LOG("connection string ", st)
self._connection = module_odbc.connect(st)
else:
st = odbc_string
self.LOG("connection string ", st)
self._connection = module_odbc.connect(st)
else:
raise DBException( # pragma: no cover
"This engine does not exists (%r)" % self._engine)
for func in DatabaseCore._special_function_init_():
self.add_function(func[0], func[2], func[1])
for k, v in self._attach.items():
self.attach_database(v, k)
[docs] def close(self):
"""
Closes the database.
:githublink:`%|py|347`
"""
if self.isMemory():
# we should not close, otherwise, we lose the data
pass
else:
self._check_connection()
self._connection.close()
del self._connection
[docs] def commit(self):
"""
Calls this function after any insert request.
:githublink:`%|py|359`
"""
self._check_connection()
for s in self._buffer_insert:
self._connection.execute(s)
del self._buffer_insert[:]
self._connection.commit()
##########################################################################
# access part
##########################################################################
[docs] def get_file(self, attached_db=False):
"""
Gets database file.
:param attached_db: if True, add the list of attached databases
:return: the database file
:githublink:`%|py|378`
"""
if attached_db:
files = [self._sql_file]
att = self.get_attached_database_list(True)
for alias, file in att:
files.append("%s,%s" % (alias, file))
temp = ";".join(files)
if self._engine != "SQLite":
if self._host is None:
temp = "%s:::%s" % (self._engine, temp)
else:
temp = "%s:::%s###%s" % (self._engine, self._host, temp)
return temp
else:
return self._sql_file
[docs] def has_table(self, table):
"""
Says if the table belongs to the database.
:param table: table name
:return: boolean
:githublink:`%|py|400`
"""
return table in self.get_table_list("." in table)
[docs] def has_index(self, index):
"""
Says if the index belongs to the database.
:param index: index name
:return: boolean
:githublink:`%|py|408`
"""
return index in [s[0] for s in self.get_index_list()]
[docs] def get_index_on_table(self, table, full=False):
"""
Returns the list of indexes on a specific table.
:param table: table
:param full: if True returns all fields, otherwise, returns only the index names
:return: list of the index on this table
:githublink:`%|py|418`
"""
indexes = self.get_index_list()
if full:
return [la for la in indexes if la[1] == table]
return [la[0] for la in indexes if la[1] == table]
[docs] def get_column_type(self, table, column):
"""
Returns the column type of a table.
:param table: table name
:param column: column name
:return: type (python class)
:githublink:`%|py|431`
"""
self._check_connection()
cols = self.get_table_columns_list(table)
for c in cols:
if c[0] == column:
return c[1]
raise DBException(
"column %s were not found in table %s" %
(column, table))
[docs] def get_index_list(self, attached="main"):
"""
Returns the list of indexes.
:param attached: if main, returns the index for the main database, otherwise, for an attached database
:return: list of tuple (index_name, table, sql_request, fields)
:githublink:`%|py|447`
"""
self._check_connection()
if attached == "main":
request = """ SELECT name,tbl_name,sql
FROM (SELECT * FROM sqlite_master UNION ALL SELECT * FROM sqlite_temp_master) AS temptbl
WHERE type='index' ORDER BY name;"""
else:
request = """ SELECT name,tbl_name,sql
FROM (SELECT * FROM %s.sqlite_master) AS temptbl
WHERE type='index' ORDER BY name;""" % attached
select = self._connection.execute(request)
exp = re.compile("[(]([a-zA-Z0-9_,]+)[)]")
res = []
for a, b, c in select:
fi = exp.findall(c)
if len(fi) != 1:
raise DBException( # pragma: no cover
"Unable to extract index fields from %r" % c)
fi = tuple([s.strip() for s in fi[0].split(",")])
res.append((a, b, c, fi))
select.close()
#self.LOG ("number of indices ", len (res))
select = res
res = []
if attached == "main":
res = select
else:
for el in select:
res.append((el[0], attached + "." + el[1], el[2], el[3]))
#self.LOG ("number of indices ", len (res))
if attached == "main":
attach = self.get_attached_database_list()
for a in attach:
if a in ("main", "temp"):
continue
r = self.get_index_list(a)
res.extend(r)
return res
[docs] def get_attached_database_list(self, file=False):
"""
Returns all the attached database (avoid the temporary ones and the main one).
:param file: ask for file also
:return: a list of tuple (alias, file)
:githublink:`%|py|496`
"""
if self.isMSSQL():
return [] # pragma: no cover
else:
cur = self._connection.cursor()
cur.execute("PRAGMA database_list;")
res = cur.fetchall()
cur.close()
res = [r for r in res if r[1] != "temp" and r[1] != "main"]
if file:
return [(r[1], r[2]) for r in res]
else:
return [r[1] for r in res]
[docs] def get_table_list(self, add_attached=False):
"""
Returns the list of tables.
:param add_attached: if True, add the list of tables included in the attached databases
:return: the table list
:githublink:`%|py|516`
"""
self._check_connection()
if self.isMSSQL(): # pragma: no cover
request = """ SELECT TABLE_NAME FROM (
SELECT TABLE_NAME, OBJECTPROPERTY(object_id(TABLE_NAME), N'IsUserTable') AS type
FROM INFORMATION_SCHEMA.TABLES) AS temp_tbl
WHERE type = 1 ORDER BY TABLE_NAME"""
else:
request = """ SELECT name
FROM (SELECT * FROM sqlite_master UNION ALL SELECT * FROM sqlite_temp_master) AS temptbl
WHERE type in('table','temp') AND name != 'sqlite_sequence' ORDER BY name;"""
select = self._connection.execute(request)
res = []
for el in select:
res.append(el[0])
if add_attached:
att = self.get_attached_database_list()
for at in att:
if at == "temp":
continue
sql = "SELECT name FROM %s.sqlite_master" % at
vie = self._connection.execute(sql)
vie = ["%s.%s" % (at, v[0]) for v in vie]
res.extend(vie)
return res
[docs] def get_table_columns(self, table, dictionary=False):
"""
See :meth:`get_table_columns_list <pyensae.sql.database_core.DatabaseCore.get_table_columns_list>`.
Example (`dictionary == False`):
::
[('fid', <type 'int'>), ('fidp', <type 'int'>), ('field', <type 'str'>)]
Or (`dictionary = True`):
::
{0: ('fid', <type 'int'>), 1: ('fidp', <type 'int'>), 2: ('field', <type 'str'>)}
:githublink:`%|py|555`
"""
return self.get_table_columns_list(table, dictionary)
[docs] def get_table_columns_list(self, table, dictionary=False):
"""
Returns all the columns for a table.
:param table: table name
:param dictionary: returns the list as a dictionary
:return: a list of tuple (column name, Python type)
Example (`dictionary == False`):
::
[('fid', <type 'int'>), ('fidp', <type 'int'>), ('field', <type 'str'>)]
Or (`dictionary = True`):
::
{0: ('fid', <type 'int'>), 1: ('fidp', <type 'int'>), 2: ('field', <type 'str'>)}
:githublink:`%|py|576`
"""
if "." in table:
prefix = table.split(".")[0] + "."
table = table.split(".")[1]
else:
# table = table
prefix = ""
cur = self._connection.cursor()
if self.isMSSQL(): # pragma: no cover
prf = "" if len(prefix) == 0 else prefix + "."
sql = """SELECT * FROM (SELECT OBJECT_NAME(c.OBJECT_ID) TableName,c.name AS ColumnName,t.name AS TypeName
FROM sys.columns AS c
JOIN sys.types AS t ON c.user_type_id=t.user_type_id
) AS ttt
WHERE ttt.TableName = '%s%s'""" % (prf, table)
cur.execute(sql)
else:
cur.execute("PRAGMA %stable_info(%s)" % (prefix, table) + ";")
res = cur.fetchall()
cur.close()
res = [(r[1], DatabaseCore._SQL_conversion_types[r[2]]) for r in res]
if dictionary:
dic = {}
for i in range(0, len(res)):
dic[i] = res[i]
return dic
else:
return res
[docs] def get_table_nb_lines(self, table):
"""
Returns the number of lines in a table (or number of observations).
:param table: table name
:return: integer
:githublink:`%|py|613`
"""
sql = "SELECT COUNT(*) FROM " + table + ";"
cur = self._connection.cursor()
cur.execute(sql)
res = cur.fetchall()
cur.close()
return res[0][0]
[docs] def len(self, table):
"""
Returns the number of lines of table ``table``.
:param table: table
:return: int
:githublink:`%|py|627`
"""
return self.get_table_nb_lines(table)
[docs] def get_table_nfirst_lines(self, table, n=1):
"""
Returns the *n* first lines.
:param table: table name
:param n: number of asked lines
:return: integer
:githublink:`%|py|637`
"""
sql = "SELECT * FROM %s ;" % table
cur = self._connection.cursor()
cur.execute(sql)
if n <= 1:
res = [cur.fetchone()]
else:
res = []
for line in cur:
n -= 1
if n <= -1:
break
res.append(line)
cur.close()
return res
[docs] def get_sql_columns(self, request):
"""
Returns the columns name for a SQL request.
:param request: SQL request
:return: list of columns name
:githublink:`%|py|659`
"""
cur = self.execute(request)
col_name_list = [tuple[0] for tuple in cur.description]
cur.close()
return col_name_list
##########################################################################
# execution
##########################################################################
[docs] class _cross_product_iter:
"""
Iterator for CROSS.
:githublink:`%|py|673`
"""
[docs] def __init__(self, db, request):
com = re.compile("^(.*)(--.*)$")
lines = request.split("\n")
clean = []
for li in lines:
r = com.match(li)
if r is not None:
li = li[:r.span(1)[1]]
clean.append(li.strip())
req = " ".join(clean)
cross = re.compile(" *CROSS +([ a-zA-Z_0-9,]+?) +"
"PLACE +([,a-zA-Z_0-9()]+?) +"
"FROM +([a-zA-Z_0-9]+?)( +AS +[_a-z0-9])? +ORDER +BY "
"+([ a-zA-Z_0-9,]+?)( +WHERE(.*?))?( +LIMIT +([0-9]+)?)?$", re.IGNORECASE)
db.LOG("cross product", req)
find = cross.search(req)
self.request = request
self.find = find
self.db = db
if self.find is None:
return
gr = self.find.groups()
key, value, table, count_as, order, where, _, __, limit = gr
if limit is not None:
limit = int(limit)
db.LOG("parameters ", [key, value, table, order, where, limit])
fkey = key.split(",")
fval = value.split(",")
if where is None:
where = ""
nkey = len(fkey)
#nval = len (fval)
sql = "SELECT %s,%s FROM %s %s ORDER BY %s" % (
key, value, table, where, order)
cur = self.db.execute(sql)
data = {}
#tot = nkey + nval
for sample in cur:
key = sample[:nkey]
val = sample[nkey:]
if key not in data:
data[key] = []
data[key].append(val)
cur.close()
keys = sorted(data.keys())
if nkey == 1:
temp = [[str(k[0]) + ';' + s for s in fval] for k in keys]
else:
temp = [
[",".join([str(_s) for _s in k]) + ';' + s for s in fval] for k in keys]
self.description = []
for t in temp:
self.description.extend(t)
self.description = [(k, None) for k in self.description]
matrix = []
pos = 0
while True:
stil = 0
line = []
for k in keys:
v = data[k]
if pos < len(v):
stil += 1
line.extend(list(v[pos]))
else:
line.extend([None for f in fval])
if stil > 0:
matrix.append(line)
pos += 1
if limit is not None and pos >= limit:
break
else:
break
self.matrix = matrix
self.pos = 0
[docs] def is_working(self):
return self.find is not None
[docs] def __iter__(self):
"""
iterator
:githublink:`%|py|763`
"""
return self
[docs] def __next__(self):
"""
iterator
:githublink:`%|py|769`
"""
if "matrix" not in self.__dict__:
raise StopIteration # pragma: no cover
if self.pos < len(self.matrix):
n = self.pos
self.pos += 1
return self.matrix[n]
else:
raise StopIteration
[docs] def _analyse(self, request, header=False):
"""
Analyses the request does it contains cross product.
:param request: request
:param header: add a header in the first line
:return: None or an iterator
Example:
::
CROSS f1,f2,f3
PLACE a,b,c
FROM table
ORDER BY f8
WHERE f9 == ' ' -- optional
:githublink:`%|py|799`
"""
if "CROSS" not in request.upper():
return None
iter = DatabaseCore._cross_product_iter(self, request)
if not iter.is_working():
return None
else:
return iter
[docs] def execute(self, request, nolog=False):
"""
Opens a cursor with a query and return it to the user.
:param request: SQL request
:param nolog: if True, do not log anything
:return: cursor
.. exref::
:title: run a select command on a table
:tag: SQL
::
t = Database (file)
cur = t.execute ("SELECT * FROM table1 ;")
for f in cur :
print(f)
cur.close ()
There is another case outside SQL syntax to build cross product. Syntax:
::
CROSS f1,f2,f3
FROM table
PLACE a,b,c
ORDER BY f8
WHERE f9 == ' ' -- optional
The request must begin by CROSS
:githublink:`%|py|839`
"""
res = self._analyse(request)
if res is not None:
return res
else:
# classic ways
self._check_connection()
cur = self._connection.cursor()
dat = time.perf_counter()
try:
if not nolog:
lines = request.split("\n")
if len(lines) > 20:
self.LOG("SQL ", "\n".join(
[repr(x) for x in lines[:20]]))
else:
self.LOG("SQL ", "\n".join([repr(x) for x in lines]))
cur.execute(request)
dat2 = time.perf_counter()
if dat2 - dat > 10:
self.LOG("SQL end") # pragma: no cover
except Exception as e:
raise ExceptionSQL(
"unable to execute a SQL request (1)(file %s)" %
self.get_file(),
e,
request) from e
return cur
[docs] def execute_view(self, request, add_column_name=False, nolog=True):
"""
Opens a cursor with a query and returns the result into a list.
:param request: SQL request
:param add_column_name: add the column name before the first line
:param nolog: if True, do not log anything
:return: cursor
Example:
::
t = Database (file)
view = t.execute_view ("SELECT * FROM table1 ;")
:githublink:`%|py|883`
"""
cur = self.execute(request, nolog=nolog)
if add_column_name:
col_name_list = [tuple[0] for tuple in cur.description]
res = [col_name_list] + list(cur)
else:
res = list(cur)
cur.close()
if not nolog and (len(res) == 0 or len(res) > 1e4):
self.LOG("execute_view ", len(res), "results") # pragma: no cover
return res
[docs] def execute_script(self, script, nolog=True, close=True):
"""
Opens a cursor and run a script.
:param script: SQL script
:param nolog: if True, do not log anything
:param close: close the cursor
:return: cursor
:githublink:`%|py|903`
"""
self._check_connection()
if not nolog: # pragma: no cover
lines = script.split("\n")
if len(lines) > 20:
self.LOG("SQL start + ",
"\n".join([repr(x) for x in lines[:20]]))
else:
self.LOG("SQL start + ",
"\n".join([repr(x) for x in lines]))
cur = self._connection.cursor()
res = cur.executescript(script)
if close:
cur.close()
if not nolog:
self.LOG("SQL end") # pragma: no cover
else:
return res
##########################################################################
# extra functions
##########################################################################
[docs] def attach_database(self, db, alias):
"""
Attaches another database.
:param db: database to attach
:param alias: database alias
:githublink:`%|py|932`
"""
if isinstance(db, str):
self.LOG("ATTACH DATABASE '%s' TO '%s' ALIAS %s" % (db, db, alias))
self.execute("ATTACH DATABASE '%s' AS %s;" % (db, alias))
else: # pragma: no cover
self.LOG(
"ATTACH DATABASE '%s' TO '%s' ALIAS %s" %
(db._sql_file, self._sql_file, alias))
self.execute(
"ATTACH DATABASE '%s' AS %s;" %
(db.get_file(), alias))
[docs] def add_function(self, name, nbparam, function):
"""
Adds a function which can be used as any other SQL function (strim, ...).
:param name: function name (it does not allow _)
:param nbparam: number of parameters
:param function: function to add
:githublink:`%|py|951`
"""
if "_" in name:
raise RuntimeError( # pragma: no cover
"SQLite does not allow function name with _")
self._check_connection()
if self._engine == "SQLite":
self._connection.create_function(name, nbparam, function)
##########################################################################
# creation function
##########################################################################
[docs] def create_index(self, indexname, table, columns, unique=False):
"""
Creates an index on a table using some columns.
:param indexname: index name
:param table: table name
:param columns: list of columns
:param unique: any value in the columns is unique?
:githublink:`%|py|971`
"""
if not isinstance(columns, list) and not isinstance(columns, tuple):
columns = [columns]
if "." in table:
prefix = table.split(".")[0] + "."
table = table.split(".")[1]
else:
prefix = ""
# table = table
self.LOG("index create ", indexname, table, columns, unique)
if unique:
sql = "CREATE UNIQUE INDEX %s%s ON %s (%s);" % (
prefix, indexname, table, ",".join(columns))
else:
sql = "CREATE INDEX %s%s ON %s (%s);" % (
prefix, indexname, table, ",".join(columns))
self.execute(sql)
[docs] def create_table(self, table, columns, temporary=False, nolog=False):
"""
Creates a table.
:param table: table name
:param columns: columns definition, dictionary { key:(column_name,python_type) }
if PRIMARYKEY is added, the key is considered as the primary key.
:param temporary: if True the table is temporary
:param nolog: :meth:`execute`
:return: cursor
Example for *columns*:
::
columns = { -1:("key", int, "PRIMARYKEY", "AUTOINCREMENT"),
0:("name",str), 1:("number", float) }
:githublink:`%|py|1009`
"""
if self._engine == "SQLite" and table == "sqlite_sequence":
raise DBException( # pragma: no cover
"unable to create a table named sql_sequence")
tables = self.get_table_list()
if table in tables:
raise DBException( # pragma: no cover
"table %r is already present, it cannot be added" % table)
if temporary:
sql = "CREATE TEMPORARY TABLE " + table + "("
else:
sql = "CREATE TABLE " + table + "("
col = []
for c, val in columns.items():
if self.isMSSQL(): # pragma: no cover
if isinstance(val[1], tuple):
v, li = val[1]
else:
v, li = val[1], 2048
if li > 8000:
col.append(val[0] + " TEXT")
elif v is str:
col.append(val[0] + " VARCHAR(%d)" % li)
elif v is int:
col.append(val[0] + " INTEGER")
elif v is float:
col.append(val[0] + " FLOAT")
elif v is numpy.int64:
col.append(val[0] + " INTEGER")
elif v is numpy.float64:
col.append(val[0] + " FLOAT")
elif v is decimal.Decimal:
col.append(val[0] + " Decimal")
elif v is datetime.datetime:
col.append(val[0] + " DATETIME")
else:
raise DBException( # pragma: no cover
"unable to add column " +
str(c) +
" ... " +
str(val) +
" v= " +
str(v))
else:
if isinstance(val[1], tuple):
v, li = val[1]
else:
v, li = val[1], 2048
if v is str:
col.append(val[0] + " TEXT")
elif v is int:
col.append(val[0] + " INTEGER")
elif v is float:
col.append(val[0] + " FLOAT")
elif v is numpy.int64:
col.append(val[0] + " INTEGER")
elif v is numpy.float64:
col.append(val[0] + " FLOAT")
elif v is decimal.Decimal:
col.append(val[0] + " Decimal")
elif v is datetime.datetime:
col.append(val[0] + " DATETIME")
else:
raise DBException( # pragma: no cover
"unable to add column " +
str(c) +
" ... " +
str(val) +
" v= " +
str(v))
fval = val[2:]
for v in fval:
if v not in DatabaseCore._field_option:
raise DBException( # pragma: no cover
"an option is unexpected %s should be in %s" %
(v, str(
DatabaseCore._field_option)))
if "PRIMARYKEY" in val:
if val[1] != int:
raise DBException(
"unable to create a primary key on something differont from an integer (%s)" %
str(val))
col[-1] += " PRIMARY KEY"
if "AUTOINCREMENT" in val:
if self.isMSSQL():
col[-1] += " IDENTITY(0,1)"
else:
col[-1] += " AUTOINCREMENT"
sql += ",\n ".join(col)
sql += ");"
return self.execute(sql, nolog=nolog)
##########################################################################
# deletion
##########################################################################
[docs] def remove_table(self, table):
"""
Removes a table.
:param table: table name
:return: return a cursor
:githublink:`%|py|1118`
"""
self.execute("DROP TABLE %s" % table)
##########################################################################
# modification
##########################################################################
[docs] def _insert_sql(self, table, insert_values):
"""
Builds the sql for an insert request.
:param table: table name
:param insert_values: dictionary or a list
:return: string
:githublink:`%|py|1132`
"""
if isinstance(insert_values, dict):
keys = []
values = []
for k, v in insert_values.items():
keys.append(k)
if v is None:
values.append('')
elif isinstance(v, str):
v = "'" + str(v).replace("'", "''") + "'"
values.append(v)
elif isinstance(v, datetime.datetime):
v = "'" + str(v) + "'"
values.append(v)
else:
values.append(str(v))
keys = ",".join(keys)
values = ",".join(values)
sql = "INSERT INTO %s (%s) VALUES (%s)" % (table, keys, values)
return sql
elif isinstance(insert_values, (tuple, list)):
values = []
for v in insert_values:
if v is None:
values.append('')
elif isinstance(v, str):
v = "'" + str(v).replace("'", "''") + "'"
values.append(v)
elif isinstance(v, datetime.datetime):
v = "'" + str(v) + "'"
values.append(v)
else:
values.append(str(v))
values = ",".join(values)
sql = "INSERT INTO %s VALUES (%s)" % (table, values)
return sql
else:
raise TypeError( # pragma: no cover
"unexpected type: " + str(type(insert_values)))
[docs] def insert(self, table, insert_values, cursor=None, nolog=True):
"""
Inserts into a table.
:param table: table name
:param insert_values: values to insert (a list of dictionary or a single dictionary)
:param cursor: if *cursor is not None*, use it, otherwise creates a new one
:param nolog: if True, do not log anything
:return: sql request or None if several insertion were sent (result is too long)
.. warning:: The commit is not done and must be done to stored these modifications.
:githublink:`%|py|1183`
"""
if isinstance(insert_values, list):
# we expect several insertion
if self._engine != "SQLite":
for d in insert_values:
self.insert(table, d, cursor)
else:
if isinstance(insert_values[0], dict):
ins = {}
for k in insert_values[0]:
ins[k] = ":" + k
sql = self._insert_sql(table, ins)
else:
q = tuple('?' for _ in insert_values[0])
sql = self._insert_sql(table, q).replace("'", "")
sql = sql.replace("'", "")
try:
if not nolog: # pragma: no cover
if len(sql) > 1000:
self.LOG("SQLs", sql[:1000])
else:
self.LOG("SQLs", sql)
self._connection.executemany(sql, insert_values)
return ""
except Exception as e:
raise ExceptionSQL( # pylint: disable=W0707
"Unable to execute a SQL request (3) (cursor %r) (file %r)" %
(str(cursor), self.get_file()), e, sql)
elif isinstance(insert_values, dict):
sql = self._insert_sql(table, insert_values)
try:
if not nolog: # pragma: no cover
if len(sql) > 1000:
self.LOG("SQLs", sql[:1000])
else:
self.LOG("SQLs", sql)
if cursor is not None:
cursor.execute(sql)
else:
if self._buffer_insert_s > 0:
self._buffer_insert.append(sql)
if len(self._buffer_insert) >= self._buffer_insert_s:
for s in self._buffer_insert:
self._connection.execute(s)
del self._buffer_insert[:]
else:
self._connection.execute(sql)
return sql
except Exception as e:
raise ExceptionSQL( # pylint: disable=W0707
"unable to execute a SQL request (2) (cursor %r) (file %r)" %
(str(cursor), self.get_file()), e, sql)
else:
raise DBException( # pragma: no cover
"insert: expected type (list of dict or dict) instead of %s" %
(str(
type(insert_values))))
[docs] def update(self, table, key, value, values):
"""
Updates some values ``WHERE key=value``.
:param table: table to update
:param key: key
:param value: WHERE key = value
:param values: values to be updated
.. warning:: The commit is not done and must be done
to stored these modifications.
:githublink:`%|py|1258`
"""
self._check_values(values)
self._check_connection()
alls = []
for k, v in values.items():
if k != key:
if isinstance(v, (str, datetime.datetime)):
alls += ["%s='%s'" % (k, str(v))]
else:
alls += ["%s=%s" % (k, str(v))]
if isinstance(value, str):
sql = "UPDATE %s SET %s WHERE %s='%s'" % (
table, ",".join(alls), key, value.replace("'", "''"))
elif isinstance(value, datetime.datetime):
sql = "UPDATE %s SET %s WHERE %s='%s'" % (
table, ",".join(alls), key, str(value))
else:
sql = "UPDATE %s SET %s WHERE %s=%s" % (
table, ",".join(alls), key, value)
try:
self._connection.execute(sql)
return sql
except Exception as e: # pragma: no cover
raise ExceptionSQL( # pylint: disable=W0707
"Unable to execute a SQL request (4) (file %r)" %
self.get_file(), e, sql)