"""
:class:`Database <pyensae.sql.database_main.Database>`
:githublink:`%|py|6`
"""
import os
import collections
from .file_text_binary import TextFile
from .file_text_binary_columns import TextFileColumns
from .database_exception import DBException
class DatabaseImportExport:
    """
    Import and export helpers for a database, in various flat formats.

    This class is not meant to work alone: its methods call members it
    does not define itself (``LOG``, ``execute``, ``commit``,
    ``_check_connection``, ``get_table_list``, ``get_sql_columns``,
    ``create_table``, ``create_index``, ``has_index``, ``_guess_columns``,
    ``_process_text_line``, ``_get_insert_request``). The class it is
    combined with must provide them.
    """

    # Control characters which would break the one-row-per-line,
    # tab-separated layout, mapped to their visible escaped form.
    _ESCAPES = str.maketrans({"\t": "\\t", "\n": "\\n", "\r": "\\r"})

    ##########################################################################
    # exporting functions
    ##########################################################################

    def export_table_into_flat_file(self, table, filename, header=False, columns=None,
                                    post_process=None, encoding="utf8"):
        """
        Exports a table into a flat file.

        :param table: table name
        :param filename: filename
        :param header: add a header on the first line
        :param columns: export only columns in this list (if None, export all)
        :param post_process: post_process a line:
            - input: list, dictionary (for your own use, same one all the time)
            - output: list
        :param encoding: encoding

        The table and column names are inserted as is into the SQL query,
        they must come from a trusted source.

        .. exref::
            :title: Export the results of a SQL query into a flat file
            :tag: SQL

            ::

                from pyensae.sql.database_main import Database
                dbfile = "filename.db3"
                filetxt = "fileview.txt"
                sql = "..."
                db = Database(dbfile)
                db.connect()
                db.export_view_into_flat_file(sql, filetxt, header=True)
                db.close()
        """
        if columns is None:
            sql = "SELECT * FROM " + table + ";"
        else:
            sql = "SELECT %s FROM %s ;" % (",".join(columns), table)
        self.export_view_into_flat_file(
            sql, filename, header, post_process, encoding=encoding)

    def _clean_string(self, s):
        """
        Escapes the characters which would break a tab separated line.

        :param s: string
        :return: the same string where tabulations, line feeds and carriage
            returns are replaced by the two-character sequences
            ``\\t``, ``\\n``, ``\\r``
        """
        return s.translate(self._ESCAPES)

    def export_view_into_flat_file(self, view_sql, filename, header=False, post_process=None,
                                   encoding="utf8"):
        """
        Exports the results of a SQL query into a flat file
        (one row per line, values separated by tabulations).

        :param view_sql: SQL request
        :param filename: filename
        :param header: if False (or empty), no header is written,
            if True, the column names of the query are used,
            if a list or a tuple, its items are joined with tabulations,
            otherwise it is a string written as is on the first line
        :param post_process: if not None, function called on every row
            with two arguments (the row, a dictionary shared by all calls),
            it returns the values to write
        :param encoding: encoding of the output file
        """
        sepline = "\n"
        self._check_connection()

        if not header:
            header_line = ""
        elif isinstance(header, (list, tuple)):
            header_line = "\t".join(header) + sepline
        elif isinstance(header, bool):
            header_line = "\t".join(self.get_sql_columns(view_sql)) + sepline
        else:
            header_line = header + sepline

        cur = self.execute(view_sql)
        # The file and the cursor are released even if post_process
        # or a write operation fails.
        try:
            with open(filename, "w", encoding=encoding) as f:
                f.write(header_line)
                memo = {}
                for nbline, row in enumerate(cur, start=1):
                    if post_process is not None:
                        row = post_process(row, memo)
                    f.write("\t".join(self._clean_string(str(x))
                                      for x in row) + sepline)
                    if nbline % 100000 == 0:
                        self.LOG(" exporting from view, line ", nbline)
        finally:
            cur.close()

    ##########################################################################
    # importing functions
    ##########################################################################

    def append_values(self, values, tablename, schema, cursor=None,
                      skip_exception=False, encoding="utf-8"):
        """
        Uses :meth:`_append_table` to fill a table with the
        values contained in *values* (as list).

        :param values: list of list (each cell is a value)
        :param tablename: name of the table to fill
        :param schema: schema of the database, it must be present in case one of the columns
            includes the tag "PRIMARYKEY", in that case, the value for this field
            will be automatically set up.
        :param cursor: if None, create a new one
        :param skip_exception: skip exception while inserting an element
        :param encoding: encoding
        :return: number of inserted elements
        """
        return self._append_table(
            values,
            tablename,
            schema,
            cursor=cursor,
            skip_exception=skip_exception,
            encoding=encoding)

    def _append_table(self, file, table, columns, format="tsv", header=False,
                      stop=-1, lower_case=False, cursor=None, fill_missing=0,
                      unique=None, filter_case=None, strict_separator=False,
                      skip_exception=False, changes=None, encoding="utf-8",
                      **params):
        """
        Appends elements to a table of the database.

        :param file: file name or a matrix (this matrix can be an iterator)
        :param table: table name
        :param columns: columns definition (see below)
        :param format: tsv, the only one accepted for the time being, it can be a function (line, **params)
        :param header: the file has a header or not, if True, skip the first line
        :param stop: if -1, insert every line, otherwise stop when the number of inserted lines is stop
        :param lower_case: put every str string in lower_case before inserting it
        :param cursor: cursor given to the insertion function, it is closed
            at the end if it is not None
        :param fill_missing: fill the missing values by a default value, at least not more than fill_missing values
        :param unique: if unique is a column name,
            a row holding a value already seen for this column is skipped
        :param filter_case: process every case information (used to replace space for example)
        :param strict_separator: strict number of columns, it assumes there is no separator in the content of every column
        :param params: see format
        :param skip_exception: skip exception while inserting an element
        :param changes: to rewrite column names
        :param encoding: encoding
        :return: number of inserted elements
        :raises DBException: if the table does not exist

        The columns definition must follow the schema:

        - dictionary ``{ key:(column_name,python_type) }``
        - or ``{ key:(column_name,python_type,preprocessing_function) }``

        ``preprocessing_function`` is a function whose prototype is for example::

            def preprocessing_score (s) :
                return s.replace (",",".")

        And:

        - if ``PRIMARYKEY`` is added, the key is considered as the primary key
        - if ``AUTOINCREMENT`` is added, the key will automatically filled (like an id)
        """
        # collections.Iterable was removed in Python 3.10,
        # the abstract class only lives in collections.abc.
        from collections.abc import Iterable

        if changes is None:
            changes = {}
        if stop != -1:
            self.LOG("SQL append table stop is ", stop)

        self._check_connection()
        nbinsert = 0
        seen = set()  # values already met in column *unique*

        # If several columns are tagged, the last one wins.
        primarykey = None
        for definition in columns.values():
            if "PRIMARYKEY" in definition:
                primarykey = definition[0]

        # NOTE(review): the indentation of the original loops was lost, the
        # counter compared to *stop* is assumed to count inserted rows, which
        # is what the documentation of *stop* describes.
        if isinstance(file, list) or (
                isinstance(file, Iterable) and not isinstance(file, str)):
            # rows already in memory (list, generator, ...)
            if table not in self.get_table_list():
                raise DBException("unable to find table " + table)

            num_line = 0
            for line in file:
                if stop != -1 and nbinsert >= stop:
                    break
                dic = self._process_text_line(line, columns, format=format, lower_case=lower_case,
                                              num_line=num_line, filter_case=filter_case,
                                              strict_separator=strict_separator)
                if unique is not None and dic is not None:
                    if dic[unique] in seen:
                        continue
                    seen.add(dic[unique])
                num_line += 1
                if dic is not None:
                    self._get_insert_request(dic, table, True, primarykey, cursor=cursor,
                                             skip_exception=skip_exception)
                    nbinsert += 1
                    if nbinsert % 100000 == 0:
                        self.LOG(
                            "adding %d lines into table %s" %
                            (nbinsert, table))
        else:
            # rows read from a text file
            if table not in self.get_table_list():
                table_list = self.get_table_list()
                message = "unable to find table " + table + \
                    " in [" + ",".join(table_list) + "]"
                raise DBException(message)

            names = [definition[0] for definition in columns.values()]
            column_has_space = any(' ' in name for name in names)
            self.LOG(" column_has_space", column_has_space, names)

            if strict_separator or column_has_space:
                # raw lines, split below by _process_text_line or format
                reader = TextFile(file, errors='ignore',
                                  fLOG=self.LOG, encoding=encoding)
                skip = False
            else:
                # the reader yields rows which are inserted as is
                self.LOG(" changes", changes)
                reader = TextFileColumns(file, errors='ignore', fLOG=self.LOG,
                                         regex=columns, changes=changes, encoding=encoding)
                skip = True

            reader.open()
            try:
                every = 100000
                tsv = format == "tsv"
                num_line = 0
                for line in reader:
                    if stop != -1 and nbinsert >= stop:
                        break
                    num_line += 1
                    if skip:
                        dic = line
                    else:
                        if header and num_line == 1:
                            continue
                        if len(line.strip("\r\n")) == 0:
                            continue
                        if tsv:
                            dic = self._process_text_line(line, columns, format, lower_case=lower_case,
                                                          num_line=num_line - 1, fill_missing=fill_missing,
                                                          filter_case=filter_case,
                                                          strict_separator=strict_separator)
                        else:
                            dic = format(line, **params)
                    if dic is None:
                        continue
                    if unique is not None:
                        if dic[unique] in seen:
                            continue
                        seen.add(dic[unique])
                    self._get_insert_request(dic, table, True, primarykey, cursor=cursor,
                                             skip_exception=skip_exception)
                    nbinsert += 1
                    if nbinsert % every == 0:
                        self.LOG(
                            "adding %d lines into table %s" %
                            (nbinsert, table))
            finally:
                # the file is released even if an insertion fails
                reader.close()

        if cursor is not None:
            cursor.close()
        self.commit()
        return nbinsert

    def import_table_from_flat_file(self, file, table, columns, format="tsv", header=False,
                                    display=False, lower_case=False, table_exists=False,
                                    temporary=False, fill_missing=False, indexes=None,
                                    filter_case=None, change_to_text=None, strict_separator=False,
                                    add_key=None, encoding="utf-8", **params):
        """
        Adds a table to database from a file.

        :param file: file name or matrix
        :param table: table name
        :param columns: columns definition (see below),
            if None, columns are guessed,
            if a list, columns are guessed and the number of guessed
            columns must be the length of the list
        :param format: tsv, the only one accepted for the time being,
            it can be a function whose parameter are a line and **params
        :param header: the file has a header or not, if True, skip the first line
        :param display: if True, logs more information
        :param lower_case: put every string in lower case before inserting it
        :param table_exists: if True, do not create the table
        :param temporary: adding a temporary table
        :param fill_missing: fill the missing values
        :param indexes: indexes to create once the rows are inserted,
            each of them is a column name or a sequence of column names
        :param filter_case: process every case information (used to replace space for example)
        :param change_to_text: names of the columns whose type is replaced by a text type
        :param strict_separator: strict number of columns, it assumes there is no separator in the content of every column
        :param add_key: name of a key to add (or None if nothing to add)
        :param encoding: encoding
        :param params: see format
        :return: the number of added rows
        :raises DBException: if the file or the table cannot be found or
            if the number of guessed columns is not the expected one

        The columns definition must follow the schema:

        - dictionary ``{ key: (column_name,python_type) }``
        - or ``{ key: (column_name,python_type,preprocessing_function) }``

        ``preprocessing_function`` is a function whose prototype is for example:

        ::

            def preprocessing_score (s) :
                return s.replace (",",".")

        And:

        - if ``PRIMARYKEY`` is added, the key is considered as the primary key
        - if ``AUTOINCREMENT`` is added, the key will automatically filled (like an id)

        .. warning:: The function does not react well when a column name
            includes a space.
        """
        if indexes is None:
            indexes = []
        if change_to_text is None:
            change_to_text = []

        if display:
            if isinstance(file, list):
                self.LOG("processing file ", file[:min(len(file), 10)])
            else:
                self.LOG("processing file ", file)

        self._check_connection()

        # No renaming is known when the caller gives the full definition
        # as a dictionary: the variable must still be bound.
        changes = None
        if columns is None:
            # here, some spaces might have been replaced by "_", we need to get
            # them back
            columns, changes = self._guess_columns(
                file, format, columns, filter_case=filter_case, header=header, encoding=encoding)
        elif isinstance(columns, list):
            columns_, changes = self._guess_columns(
                file, format, columns, filter_case=filter_case, header=header, encoding=encoding)
            if len(columns_) != len(columns):
                raise DBException(
                    "different number of columns:\ncolumns={0}\nguessed={1}".format(
                        str(columns), str(columns_)))
            columns = columns_

        if add_key is not None:
            columns[len(columns)] = (
                add_key, int, "PRIMARYKEY", "AUTOINCREMENT")

        for key in columns:
            definition = columns[key]
            if definition[0] in change_to_text:
                # the type becomes a long text, name and extra tags are kept
                columns[key] = (definition[0], (str, 1000000)) + \
                    tuple(definition[2:])

        if display:
            self.LOG(" columns ", columns)

        # Only a path can be looked for on disk, rows given as a list
        # or any other iterable are passed through.
        if isinstance(file, (str, os.PathLike)) and not os.path.exists(file):
            raise DBException("unable to find file " + str(file))

        if not table_exists:
            cursor = self.create_table(table, columns, temporary=temporary)
        elif table not in self.get_table_list():
            raise DBException("unable to find table " + table + " (1)")
        else:
            cursor = None

        # checks the creation actually made the table available
        if table not in self.get_table_list():
            raise DBException("unable to find table " + table + " (2)")

        nb = self._append_table(file, table, columns, format=format, header=header,
                                lower_case=lower_case, cursor=cursor, fill_missing=fill_missing,
                                filter_case=filter_case, strict_separator=strict_separator,
                                changes=changes, encoding=encoding, **params)
        self.LOG(nb, " lines imported")

        for ind in indexes:
            if isinstance(ind, str):
                indexname = table + "_" + ind
            else:
                indexname = table + "_" + "_".join(ind)
            if not self.has_index(indexname):
                self.create_index(indexname, table, ind)

        return nb