Source code for pyensae.sql.database_join_group

"""
:class:`Database <pyensae.sql.database_main.Database>`


:githublink:`%|py|6`
"""

import re
import copy


[docs]class DatabaseJoinGroup: """ This class is not neant to be working alone. It contains functions for a database able to build SQL requests for frequent needs such as join SQL requests. :class:`Database <pyensae.sql.database_main.Database>` :githublink:`%|py|18` """
[docs] class JoinTreeNode: """ define a node meant to be included in a graph to define a big join :githublink:`%|py|23` """
[docs] def __init__(self, table, parent_key=None, key=None, where=None, prefix=None, avoid_prefix=False): """ constructor this node defines a join on two tables (parent_table, table) on two keys (parent_key, key). The keys can be tuple or string. :param table: table name :param parent_key: None if it is the root :param key: None if it is the root :param where: clause where where is a where clause defined as a dictionary: example:: { "field": ("==", value), ("table","field"): (">=", value) } You may add field not connected to a table, they will not taken into account. :param prefix: add a prefix, avoid different fields collide :param avoid_prefix: avoid using a prefix to build SQL queries, use syntax ``( ... ) AS ...`` :githublink:`%|py|50` """ self.table = table self.parent_key = parent_key self.key = key self.where = {} self.predecessor = None self.successor = [] self.prefix = None self._count_as = 0 self._avoid_prefix = avoid_prefix if self.parent_key is None and self.key is not None: raise KeyError( # pragma: no cover "parent_key is missing") if where is not None: if not isinstance(where, dict): raise TypeError( # pragma: no cover "parameter where: only dict or None expected (not %s)" % str( type(where))) for k, v in where.items(): if not isinstance(k, tuple): k = tuple(k.split(".")) if len(k) == 1: self.where[k[0]] = v elif len(k) == 2: if k[-2] == self.table: self.where[k[-1]] = v else: raise ValueError( # pragma: no cover "not able to deal with than clause %s:%s" % (str(k), str(v)))
[docs] def __str__(self): """ usual :githublink:`%|py|84` """ mes = ["*nb succ: %d" % len(self.successor)] for k, v in sorted(self.__dict__.items()): if k not in ["table", "parent_key", "where", "key"]: continue s = k + " " * (12 - len(k)) s += str(v) mes.append(s) i = 0 for n in self.successor: r = str(n) li = r.split("\n") li = [" " + s2 for s2 in li] r = "\n".join(li) mes.append("node %d" % i) mes.append(r) i += 1 return "\n".join(mes) + "\n"
[docs] def append(self, n): """ add a successor :param n: new successor :githublink:`%|py|106` """ if n.predecessor is not None: raise ValueError( # pragma: no cover "This node was already added in another part of the tree. You must duplicate it.") self.successor.append(n) n.predecessor = self
[docs] def get_nb_successor(self): """ :return: the number of successors :githublink:`%|py|116` """ return len(self.successor)
[docs] def check_prefix(self, nb=-1): """ :param nb: index of this node is the predecessor list of successor check the prefixes, all one if there is none :githublink:`%|py|123` """ if self._avoid_prefix: self.PREFIX = "" return if "PREFIX" in self.__dict__: return if self.prefix is None: if nb == -1: self.prefix = "" else: self.prefix = chr(97 + nb) for i, n in enumerate(self.successor): n.check_prefix(i) self.PREFIX = "" if self.prefix is None else self.prefix self.PREFIX = self._build_predecessor_prefix() + self.PREFIX
[docs] def _build_predecessor_prefix(self): """ private method :githublink:`%|py|142` """ if self._avoid_prefix: return "" r = "" n = self.predecessor while n is not None: r += n.prefix n = n.predecessor return r
[docs] def clean(self): """ remove all sql,fields members :githublink:`%|py|155` """ if "SELECT" in self.__dict__: del self.__dict__["SELECT"] del self.__dict__["FIELDS"] del self.__dict__["PREFIX"] for n in self.successor: n.clean()
[docs] def _in_select(self, db): """ return the SQL select on the table :param db: database :return: list of tuple (fieldas, table, field, which), where :githublink:`%|py|167` """ fields = db.get_table_columns_list(self.table) where = {} for f, t in fields: if f in self.where: where["%s.%s" % (self.table, f)] = self.where[f] elif (self.table, f) in self.where: where["%s.%s" % (self.table, f)] = self.where elif "." in f: table, z = f.split(".")[-2:] if table == self.table and z in self.where: where["%s.%s" % (table, z)] = self.where[z] prefix = self.PREFIX fas = [] for f, t in fields: fas.append((prefix + f, self.table, f, True)) return fas, where
[docs] def _build_select(self, db, fas, where, tfrom=None): """ build a select SQL request :param db: database :param fas: list of tuple table,f,fas :param where: where clause :param tfrom: from clause, if None, --> self.table :return: string :githublink:`%|py|193` """ lines = [] lines.append("SELECT") alkey = True if len(fas) > 0: mx = max([len(s[0]) for s in fas]) + 1 name_changed = 0 for well in fas: fn, t, fo = well[:3] if fn == self.key: if alkey: alkey = False doit = True else: doit = False else: doit = True if doit: s = " " * (mx - len(fn)) if len(well) == 4 and well[3]: s = " %s%s AS %s," % (fo, s, fn) name_changed += 1 else: s = " %s," % fn lines.append(s) lines[-1] = lines[-1][:-1] # kill the last comma else: raise Exception("fas should not be empty") if tfrom is None: tfrom = self.table if "\n" in tfrom: lines.append( "FROM (\n%s) AS temp_tbl%d" % (tfrom, self._count_as)) self._count_as += 1 else: lines.append("FROM %s" % tfrom) if len(where) > 0: wh = db._build_where_condition(where) lines.append(wh) return "\n".join(lines)
[docs] def _find_in_fas(self, fas, a, b): """ find a,b in fas (column 1 and 2) :param fas: list [ (new_name, table, name)] :param a: table name :param b: name :githublink:`%|py|246` """ for name, tbl, field in fas: if a == tbl and b == field: return name raise ValueError( # pragma: no cover "unable to find field %s.%s in (%s)" % (a, b, str(fas)))
[docs] def _build_join(self, db, fas, select, n): """ see :meth:`build_sql <pyensae.sql.database_join_group.build_sql>` :param db: database :param fas: list [(new_name, table, name)] :param select: condition :param n: node :githublink:`%|py|262` """ other_select = n.SELECT parent_key = n.parent_key key = n.key other_select = other_select.split("\n") other_select = [" " + s for s in other_select] other_select = "\n".join(other_select) select = select.split("\n") select = [" " + s for s in select] select = "\n".join(select) res = self._build_select(db, fas, {}, select) res += "\nINNER JOIN (\n" res += other_select + ")" res += "\nON " ppref = n.predecessor.PREFIX pref = n.PREFIX if isinstance(parent_key, str): if parent_key.startswith("<PREFIX>"): a, b = parent_key[8:].split(".") parent_key = (self._find_in_fas(fas, a, b),) else: parent_key = (ppref + parent_key,) if key.startswith("<PREFIX>"): a, b = key[8:].split(".") key = (self._find_in_fas(fas, a, b),) else: key = (pref + key,) else: pk = [] k = [] for m, n in zip(parent_key, key): if m.startswith("<PREFIX>"): a, b = m[8:].split(".") pk.append(self._find_in_fas(fas, a, b)) else: pk.append(ppref + m) if n.startswith("<PREFIX>"): a, b = n.split(".") k.append(self._find_in_fas(fas, a, b)) else: k.append(pref + n) parent_key = tuple(pk) key = tuple(k) oni = [] for k, l in zip(parent_key, key): oni.append("%s == %s" % (k, l)) oni = " AND \n ".join(oni) res += oni return res
[docs] def build_sql(self, db): """ build the sql request :param db: database The function adds two attributes: - SELECT: sql request for a node - FIELDS: list of [ (final_name, table, original_name) :githublink:`%|py|331` """ self.check_prefix() for n in self.successor: n.build_sql(db) fas, where = self._in_select(db) fields = [f[:3] for f in fas] select = self._build_select(db, fas, where) if self.get_nb_successor() == 0: pass else: for n in self.successor: fields.extend(n.FIELDS) select = self._build_join(db, fields, select, n) self.SELECT = select self.FIELDS = fields
[docs] def __init__(self): """ constructor :githublink:`%|py|354` """ self._count_as = 0
################################################## # the class itself: multiple joins using this tree ##################################################
[docs] def inner_joins(self, root, execute=False, create_index=False, created_table=None, duplicate_column=True, order=None, unique=False, distinct=False, fields=None, nolog=True): """ create several SQL inner join requests (included into each others) :param root: JoinTreeNode (the root) :param execute: if True, execute the query :param create_index: if True, creates an index on the second table if it does not exist: it accelerates the inner join :param created_table: if execute is True, you must specify a table name to be created :param duplicate_column: do not include columns from the second table if their name is already in the first one :param order: order clause, list of 2-tuple (column, way) way is None or DESC :param unique: unique or not :param distinct: add the keyword DISTINCT :param fields: restriction to fields given by fields or no restriction if None :param nolog: if True, do not log the query :return: SQL request, list of fields ("source", "new name") .. warning:: Some options are not available yet: - create_index True - duplicate_column False - order != [] - unique True .. todo:: Three tasks (however, this won't probably happen) - Finish The function inner_joins (parameters create_index, duplicate_column, order, unique). - Improve the handling of keyword DISTINCT - Handle keyword fields :githublink:`%|py|387` """ if order is None: order = [] if create_index: raise RuntimeError( # pragma: no cover "create_index = True: this option is not available") if not duplicate_column: raise RuntimeError( # pragma: no cover "duplicate_column = False: this option is not available") if len(order) > 0: raise RuntimeError( # pragma: no cover "order != []: this option is not available") if unique: raise RuntimeError( # pragma: no cover "unique = True: this option is not available") if fields is not None: raise RuntimeError( # pragma: no cover "fields != None: this option is not possible yet %s." % (str(fields))) root.build_sql(self) select = root.SELECT fields = root.FIELDS if distinct: if not select.startswith("SELECT"): raise ValueError("algorithm problem") # pragma: no cover select = "SELECT DISTINCT" + select[len("SELECT"):] if execute: if created_table is None: raise RuntimeError( # pragma: no cover "unable to execute the SQL query: not specified name for the table to create") if created_table in self.get_table_list(): raise ValueError("table %r already exists" % created_table) select = "CREATE TABLE %s AS \n" % created_table + select self.execute(select, nolog=nolog) return select, fields
################################################## # the other methods ##################################################
[docs] def _build_where_condition(self, where, add_keyword_where=True): """ builds a where condition (including the WHERE keyword) :param where: condition where to interpret:: { "field": ("==", value) } :param add_keyword_where: add the keyword where ? :return: sql syntax .. todo:: This function should deal with a tree to express AND and OR logical links. (However, this probably won't happen.) :githublink:`%|py|443` """ sql = "" if where is not None and len(where) > 0: if add_keyword_where: sql += " WHERE " if isinstance(where, str): sql += where elif isinstance(where, dict): a = [] for k, v in where.items(): if v[1] not in ['==', '<=', '>=', '>', '<', '!=']: v = (v[1], v[0]) if v[1] not in ['==', '<=', '>=', '>', '<', '!=']: raise ValueError( # pragma: no cover "unable to understand where %s,%s " % (k, str(v))) if v[1] == '==' and self.isMSSQL(): v = (v[0], '=') v = (v[0], " %s " % v[1]) if isinstance(v[0], str): if "'" in v[0]: s = k + v[1] + "'" + v[0].replace("'", "''") + "'" else: s = k + v[1] + "'" + v[0] + "'" else: s = k + v[1] + str(v[0]) a.append(s) sql += " AND ".join(a) else: raise ValueError( # pragma: no cover "unable to interpret this where condition %s" % (str(where))) return sql
[docs] def histogram(self, table, columns, col_sums=None, values=None, sql_add=None, execute=False, created_table=None, new_column="histogram", nolog=False): """ create a SQL request to compute an histogram :param table: table :param columns: column or columns (in a tuple) to be histogrammized :param col_sums: candidate columns for a sum :param values: specific values, several cases: - if None: does a GROUP BY - if dictionary of tuple: ``{'cat1':('val1', 'val2', ...) }`` then groups together several values into one category - if list of float: does an histogram on a real variable :param new_column: name of the new column :param sql_add: string to be added at the end of the SQL request :param execute: if True, execute the request :param created_table: the histogram can be stored into a table whose name is given by this parameter :param nolog: if True, do not log the query :return: SQL request :githublink:`%|py|498` """ if col_sums is None: col_sums = [] if isinstance(columns, str): columns = (columns,) cols = self.get_table_columns_list(table) for column in columns: if column not in [x[0] for x in cols]: raise ValueError( "%r is not a column of table %r\n- columns:\n%r" % (column, table, "\n".join( [ str(x) for x in cols]))) if sql_add is None or len(sql_add) == 0: sql_add = "" else: sql_add = ",\n " + sql_add sum_column = [] for c in col_sums: s = "SUM(%s) AS sum_%s" % (c, c) sum_column.append(s) str_sum = ", ".join(sum_column) if len(str_sum) > 0: str_sum = ", " + str_sum if values is None: sql = "SELECT %s AS %s, COUNT(%s) AS %s_nb%s%s\nFROM %s\nGROUP BY %s" % \ (", ".join(columns), new_column, "*", new_column, str_sum, sql_add, table, ", ".join(columns)) select = sql elif isinstance(values, dict): values_rev = {} for k, vv in values.items(): for v in vv: if v not in values_rev: values_rev[v] = [] values_rev[v].append(k) for k, v in values_rev.items(): if len(v) > 1: raise ValueError( # pragma: no cover "a category is shared by several values %r and %r" % (k, ", ".join(v))) for k in values_rev: values_rev[k] = values_rev[k][0] def filterfunctionhistogramdict1(v): return values_rev.get(v, "none") def filterfunctionhistogramdict2(a, b): return values_rev.get((a, b), "none") def filterfunctionhistogramdict3(a, b, c): return values_rev.get((a, b, c), "none") def filterfunctionhistogramdict4(a, b, c, d): return values_rev.get((a, b, c, d), "none") def filterfunctionhistogramdict5(a, b, c, d, e): return values_rev.get((a, b, c, d, e), "none") self.add_function( "filterfunctionhistogramdict1", 1, filterfunctionhistogramdict1) self.add_function( "filterfunctionhistogramdict2", 2, filterfunctionhistogramdict2) self.add_function( "filterfunctionhistogramdict3", 3, filterfunctionhistogramdict3) self.add_function( "filterfunctionhistogramdict4", 4, filterfunctionhistogramdict4) self.add_function( "filterfunctionhistogramdict5", 5, filterfunctionhistogramdict5) st = ",".join(["a", "b", "c", "d", "e"][:len(cols)]) sql = "\n -- def filterfunctionhistogramdict%d (%s) : return %s.get (%s, 'none')\n\n" % ( len(columns), st, str(values_rev), st) sql += "\n SELECT " + \ ",\n ".join([x[0] for x in cols]) sql += ",\n filterfunctionhistogramdict%d (%s) AS histo_temp_col" % ( len(columns), ", ".join(columns),) sql += "\n FROM %s" % table sql = "(" + sql + ") AS temp_tbl%d" % self._count_as self._count_as += 1 select = "SELECT histo_temp_col AS %s,COUNT(histo_temp_col) AS %s_nb\n %s\n %s\nFROM %s\nGROUP BY histo_temp_col" % \ (new_column, new_column, str_sum, sql_add, sql) elif isinstance(values, list): values = sorted(copy.copy(values)) values2 = values[1:] + [max(1e10, max(values) + 1), ] names = list(values) couple = list(zip(range(0, len(values)), values, values2, names)) def filterfunctionhistogramlist(v): for i, x, x_, n in couple: if v < x_: return n raise RuntimeError( # pragma: no cover "unable to process, " + str(v) + " is a value higher than 1e10") self.add_function( "filterfunctionhistogramlist", 1, filterfunctionhistogramlist) sql = "" sql += "\n SELECT " + ",\n ".join([x[0] for x in cols]) sql += ",\n filterfunctionhistogramlist (%s) AS histo_temp_col" % ( ", ".join(columns),) sql += "\n FROM %s" % table sql = "(" + sql + ")" select = "SELECT histo_temp_col AS %s,COUNT(histo_temp_col) AS %s_nb\n %s\n %s\nFROM %s\nGROUP BY histo_temp_col" % \ (new_column, new_column, str_sum, sql_add, sql) else: raise TypeError( # pragma: no cover "values has not a type (%s) not in [None, dict, list]" % (str( type(values)))) if execute: if created_table is None: raise RuntimeError( # pragma: no cover "unable to execute the SQL query: not specified name for the table to create") if created_table in self.get_table_list(): raise RuntimeError( # pragma: no cover "table %r already exists" % created_table) select = "CREATE TABLE %s AS \n" % created_table + select self.execute(select, nolog=nolog) return select
[docs] def inner_join(self, table1, table2, field1, field2=None, where=None, execute=False, create_index=True, created_table=None, prefix="", duplicate_column=True, prefix_all="", order=None, unique=True, params=None, nolog=True): """ create a SQL inner join request :param table1: first table :param table2: second table :param field1: inner join on field1 from table1 :param field2: inner join on field2 from table2 (if None --> field2 = field1 :param where: where clause (if None, do not add it), dictionary or string :param execute: if True, execute the query :param create_index: if True, creates an index on the second table if it does not exist: it accelerates the inner join :param created_table: if execute is True, you must specify a table name to be created :param prefix: prefix for fields from the second table :param duplicate_column: do not include columns from the second table if their name is already in the first one :param prefix_all: prefix for all fields :param order: order clause, list of 2-tuple (column, way) way is None or DESC :param unique: unique or not :param params: special parameters for inner_joins method :param nolog: if True, do not log the query, otherwise, skip that part :return: SQL request, list of fields ("source", "new name") :githublink:`%|py|671` """ if order is None: order = [] if params is None: params = {} if field2 is None: field2 = field1 cols1 = self.get_table_columns_list(table1) cols1 = [f[0] for f in cols1] if len(cols1) == 0: raise ValueError( # pragma: no cover "table %r has no field" % table1) joinsm = table2 == '________________' if joinsm: cols2 = [f[1] for f in params["fields"]] if len(cols2) == 0: raise RuntimeError( # pragma: no cover "imported table has no field") table2 = params["sql"].split("\n") table2 = [" " + s for s in table2] table2 = "\n".join(table2) table2 = "(%s)" % (table2.strip("\n\r "),) else: cols2 = self.get_table_columns_list(table2) cols2 = [f[0] for f in cols2] if len(cols2) == 0: raise Exception("table %s has no field" % table2) if isinstance(field1, tuple): for k in field1: if k not in cols1: raise ValueError( # pragma: no cover "unable to find field %r in table %r" % (k, table1)) for k in field2: if k not in cols2: raise ValueError( # pragma: no cover "unable to find field %r in table %r" % (k, table2)) else: if field1 not in cols1: raise ValueError( # pragma: no cover "unable to find field %r in table %r" % (field1, table1)) if field2 not in cols2: raise ValueError( # pragma: no cover "unable to find field %r in table %r" % (field2, table2)) field1 = (field1,) field2 = (field2,) if create_index and joinsm: li = self.get_index_list() ind = False for name, tbl_name, sql in li: if tbl_name != table2: continue fields = re.compile("[(](\\w*)[)]").search(sql).groups() if len(fields) == 0: continue field = fields[0] if field not in field2: continue ind = True if not ind: self.LOG( # pragma: no cover "creating an index on table %r, field %r" % (table2, ", ".join(field2))) self.create_index("index_" + table2.replace(".", "_") + "_" + "_".join(field2), table2, field2, unique=unique) keyfields = {} for k in field1: if k in cols2: if k not in field2: keyfield = ("%s.%s" % (table1, k), table1 + "_" + k) else: keyfield = ("%s.%s" % (table1, k), k) else: keyfield = (k, k) keyfields[k] = keyfield if "." in table1: ptable1 = table1.split(".")[1] else: ptable1 = table1 if "." in table2: ptable2 = table2.split(".")[1] else: ptable2 = table2 fields = [] for c in cols1: if ":" in c: continue if c in keyfields: fields.append(keyfields[c]) elif c in cols2: if duplicate_column: fields.append( ("%s.%s" % (ptable1, c), "%s_%s" % (ptable1, c))) else: fields.append(("%s.%s" % (ptable1, c), c)) else: fields.append((c, c)) for c in cols2: if ":" in c: continue if c in field2: continue if c in cols1: if duplicate_column: fields.append( ("%s.%s" % (ptable2, c), prefix + "%s_%s" % (ptable2, c))) else: fields.append((c, prefix + c)) mx = max([len(f[0]) for f in fields]) + 1 rem = params.get("as_remove", None) if rem is None: fields = [(f[0] + " " * (mx - len(f[0])), prefix_all + f[1]) for f in fields] else: cfields = fields fields = [] for f in cfields: a = f[0] + " " * (mx - len(f[0])) b = (prefix_all + f[1]).replace(rem, "") if b in cols1: b = (prefix_all + f[1]).replace(rem, "_") fields.append((a, b)) all = [" AS ".join(f) for f in fields] select = ",\n ".join(all) select = "SELECT " + select + "\nFROM %s\n" % table1 if unique: select += "INNER JOIN %s\n" % table2 else: select += "JOIN %s\n" % table2 nb = 0 for k1, k2 in zip(field1, field2): if nb > 0: select += " AND " else: select += "ON " if k1 in cols2: select += "%s.%s " % (table1, k1) else: select += "%s " % k1 if k2 in cols1 and not joinsm: select += "== %s.%s\n" % (table2, k2) else: select += "== %s\n" % k2 nb += 1 if where is not None and len(where) > 0: select += "WHERE " + where + "\n" if order is not None and len(order) > 0: te = [] for o in order: if isinstance(o, tuple): if o[0] in cols1 and o[0] in cols2: te.append(ptable1 + "." + o[0] + " " + o[1]) else: te.append(o[0] + " " + o[1]) else: if o in cols1 and o in cols2: te.append(ptable1 + "." + o) else: te.append(o) select += "ORDER BY " + ", ".join(te) + "\n" #select += ";" if execute: if created_table is None: raise RuntimeError( # pragma: no cover "unable to execute the SQL query: not specified name for the table to create") if created_table in self.get_table_list(): raise ValueError( # pragma: no cover "table %r already exists" % created_table) select = "CREATE TABLE %s AS \n" % created_table + select self.execute(select, nolog=nolog) fields = [(a_.strip(), b_) for a_, b_ in fields] return select, fields