Source code for pysqllike.translation.translation_class

"""
One class which visits a syntax tree.


:githublink:`%|py|5`
"""

import ast
import inspect
from .code_exception import CodeException
from .node_visitor_translator import CodeNodeVisitor


[docs]class TranslateClass: """ Interface for a class which translates a code written in pseudo-SQL syntax into another language. :githublink:`%|py|17` """
[docs] def __init__(self, code_func): """ Constructor. :param code_func: code (str) or function(func) :githublink:`%|py|24` """ if isinstance(code_func, str): code = code_func else: code = inspect.getsource(code_func) self.init(code)
[docs] def init(self, code): """ Parses the function code and add it the class, it complements the constructor. :param code: function code :githublink:`%|py|37` """ node = ast.parse(code) v = CodeNodeVisitor() v.visit(node) self._rows = v.Rows self._code = code
[docs] def __str__(self): """ Returns a string representing a tree. :githublink:`%|py|48` """ return self.to_str()
[docs] def to_str(self, fields=None): """ Returns a string representing a tree. :param fields: additional fields to add at the end of each row :return: string :githublink:`%|py|57` """ if fields is None: fields = [] if len(fields) == 0: rows = ["{0}{1}: {2} - nbch {3}".format(" " * r["indent"], r["type"], r["str"], len(r.get("children", []))) for r in self._rows] else: rows = ["{0}{1}: {2} - nbch {3}".format(" " * r["indent"], r["type"], r["str"], len(r.get("children", []))) + " --- " + ",".join(["%s=%s" % (_, r.get(_, "")) for _ in fields]) for r in self._rows] return "\n".join(rows)
[docs] def Code(self): """ Returns the code of the initial Python function into another language. :return: str :githublink:`%|py|77` """ # we add a field "processed" in each rows to tell it was interpreted for row in self._rows: row["processed"] = row["type"] == "Module" code_rows = [] for row in self._rows: if row["processed"]: continue if row["type"] == "FunctionDef": res = self.interpretFunction(row) if res is not None and len(res) > 0: code_rows.extend(res) for row in self._rows: if not row["processed"]: return self.RaiseCodeException( "the function was unable to interpret all the lines", code_rows=code_rows) return "\n".join(code_rows)
[docs] def RaiseCodeException(self, message, field="processed", code_rows=None): """ Raises an exception when interpreting the code. :param field: field to add to the message exception :param code_rows: list of rows to display :raises: CodeException :githublink:`%|py|109` """ if code_rows is None: code_rows = [] if len(code_rows) > 0: if "_status" in self.__dict__ and len(self._status) > 0: code_rows = code_rows + ["", "-- STATUS --", ""] + self._status raise CodeException(message + "\n---tree:\n" + self.to_str(["processed"]) + "\n\n---so far:\n" + "\n".join(code_rows)) elif "_status" in self.__dict__: raise CodeException(message + "\n---tree:\n" + self.to_str(["processed"]) + "\n\n-- STATUS --\n" + "\n".join(self._status)) else: raise CodeException(message + "\n---tree:\n" + self.to_str(["processed"]))
[docs] def interpretFunction(self, obj): """ Starts the interpretation of node which begins a function. :param obj: obj to begin with (a function) :return: list of strings :githublink:`%|py|132` """ if "children" not in obj: return self.RaiseCodeException("children key is missing") if "name" not in obj: return self.RaiseCodeException("name is missing") obj["processed"] = True chil = obj["children"] code_rows = [] # signature name = obj["name"] argus = [_ for _ in chil if _["type"] == "arguments"] args = [] for a in argus: a["processed"] = True for ch in a["children"]: if ch["type"] == "arg": ch["processed"] = True args.append(ch) names = [_["str"] for _ in args] sign = self.Signature(name, names) if sign is not None and len(sign) > 0: code_rows.extend(sign) # the rest self._status = code_rows # for debugging purpose # assi = [_ for _ in chil if _["type"] == "Assign"] for an in chil: if an["type"] == "Assign": one = self.Intruction(an) if one is not None and len(one) > 0: code_rows.extend(one) elif an["type"] == "Return": one = self.interpretReturn(an) if one is not None and len(one) > 0: code_rows.extend(one) elif an["type"] == "arguments": pass else: return self.RaiseCodeException("unexpected type: " + an["type"]) return code_rows
[docs] def Signature(self, name, rows): """ Build the signature of a function based on its name and its children. :param name: name :param rows: node where type == arguments :return: list of strings (code) :githublink:`%|py|183` """ return self.RaiseCodeException("not implemented")
[docs] def Intruction(self, rows): """ Builds an instruction of a function based on its name and its children. :param rows: node where type == Assign :return: list of strings (code) :githublink:`%|py|192` """ rows["processed"] = True chil = rows["children"] name = [_ for _ in chil if _["type"] == "Name"] if len(name) != 1: return self.RaiseCodeException( "expecting only one row not %d" % len(chil)) call = [_ for _ in chil if _["type"] == "Call"] if len(call) != 1: return self.RaiseCodeException( "expecting only one row not %d" % len(call)) name = name[0] name["processed"] = True call = call[0] call["processed"] = True varn = name["str"] kind = call["str"] # the first attribute gives the name the table method = call["children"][0] method["processed"] = True table, meth = method["str"].split(".") if meth != kind: return self.RaiseCodeException( "cannot go further, expects: {0}=={1}".format( kind, meth)) if kind == "select": top = call["children"][1:] return self.Select(varn, table, top) elif kind == "where": top = call["children"][1:] return self.Where(varn, table, top) elif kind == "groupby": top = call["children"][1:] return self.GroupBy(varn, table, top) else: return self.RaiseCodeException("not implemented for: " + kind)
[docs] def Select(self, name, table, rows): """ Interprets a select statement. :param name: name of the table which receives the results :param table: name of the table it applies to :param rows: rows to consider :return: list of strings (code) :githublink:`%|py|245` """ return self.RaiseCodeException("not implemented")
[docs] def Where(self, name, table, rows): """ Interprets a select statement. :param name: name of the table which receives the results :param table: name of the table it applies to :param rows: rows to consider :return: list of strings (code) :githublink:`%|py|256` """ return self.RaiseCodeException("not implemented")
_symbols = {"Lt": "<", "Gt": ">", "Mult": "*", }
[docs] def ResolveExpression(self, node, prefixAtt): """ Produces an expression based on a a node and its children. :param node: node :param prefixAtt: prefix to add before an attribute (usually _) :return: a string, the used fields, the called functions :githublink:`%|py|268` """ if node["type"] == "keyword": chil = node["children"] if len(chil) == 1: node["processed"] = True return self.ResolveExpression(chil[0], prefixAtt) else: return self.RaiseCodeException( "not implemented for type: " + node["type"]) elif node["type"] == "BinOp" or node["type"] == "Compare": chil = node["children"] if len(chil) == 3: node["processed"] = True ex1, fi1, fu1 = self.ResolveExpression(chil[0], prefixAtt) ex2, fi2, fu2 = self.ResolveExpression(chil[1], prefixAtt) ex3, fi3, fu3 = self.ResolveExpression(chil[2], prefixAtt) fi1.update(fi2) fi1.update(fi3) fu1.update(fu2) fu1.update(fu3) ex = "{0}{1}{2}".format(ex1, ex2, ex3) return ex, fi1, fu1 else: return self.RaiseCodeException( "not implemented for type: " + node["type"]) elif node["type"] in TranslateClass._symbols: node["processed"] = True return TranslateClass._symbols[node["type"]], {}, {} elif node["type"] == "Attribute": node["processed"] = True return prefixAtt + node["str"], {node["str"]: node}, {} elif node["type"] == "Num": node["processed"] = True return node["str"], {}, {} elif node["type"] == "Call": node["processed"] = True expre = [] field = {} funcs = {} if node["str"] in ["Or", "And", "Not"]: for chil in node["children"]: if chil["type"] == "Attribute": if chil["str"] != node["str"]: return self.RaiseCodeException("incoherence") elif len(chil["children"]) != 1: return self.RaiseCodeException("incoherence") else: chil["processed"] = True ex, fi, fu = self.ResolveExpression( chil["children"][0], prefixAtt) expre.append("({0})".format(ex)) field.update(fi) funcs.update(funcs) expre.append(node["str"].lower()) else: ex, fi, fu = self.ResolveExpression(chil, prefixAtt) expre.append("({0})".format(ex)) field.update(fi) funcs.update(funcs) elif node["str"] == "CFT": # we need to look further as CFT is a way to call a function funcName = None subexp = [] for chil in node["children"]: if chil["type"] == "Attribute": chil["processed"] = True ex, fi, fu = self.ResolveExpression(chil, prefixAtt) subexp.append(ex) field.update(fi) funcs.update(funcs) elif chil["type"] == "Name" and chil["str"] == "CFT": pass elif chil["type"] == "Name": # we call function chil["str"] funcName = chil["str"] funcs[chil["str"]] = chil["str"] else: return self.RaiseCodeException( "unexpected configuration: " + node["type"]) chil["processed"] = True expre.append("{0}({1})".format(funcName, ",".join(subexp))) elif node["str"] == "len": # aggregated function funcName = None subexp = [] for chil in node["children"]: if chil["type"] == "Attribute": chil["processed"] = True ex, fi, fu = self.ResolveExpression(chil, prefixAtt) subexp.append(ex) field.update(fi) funcs.update(funcs) elif chil["type"] == "Name" and chil["str"] == "CFT": pass elif chil["type"] == "Name": # we call function chil["str"] funcName = chil["str"] funcs[chil["str"]] = chil["str"] else: return self.RaiseCodeException( "unexpected configuration: " + node["type"]) chil["processed"] = True expre.append("{0}({1})".format(funcName, ",".join(subexp))) else: return self.RaiseCodeException( "not implemented for function: " + node["str"]) return " ".join(expre), field, funcs else: return self.RaiseCodeException( "not implemented for type: " + node["type"])
[docs] def interpretReturn(self, obj): """ Starts the interpretation of a node which sets a return. :param obj: obj to begin with (a function) :return: list of strings :githublink:`%|py|401` """ if "children" not in obj: return self.RaiseCodeException("children key is missing") allret = [] obj["processed"] = True for node in obj["children"]: if node["type"] == "Name": allret.append(node) else: return self.RaiseCodeException("unexpected type: " + node["type"]) return self.setReturn(allret)
[docs] def setReturn(self, nodes): """ Indicates all nodes containing information about returned results. :param node: list of nodes :return: list of string :githublink:`%|py|419` """ return self.RaiseCodeException("not implemented")