Source code for mlprodict.grammar_sklearn.grammar.gactions

"""
@file
@brief Action definition.
"""
import numpy
from .api_extension import AutoAction
from .gtypes import (
    MLType, MLNumTypeFloat32, MLNumTypeFloat64,
    MLTensor, MLNumTypeInt32, MLNumTypeInt64, MLNumTypeBool)


[docs]class MLAction(AutoAction): """ Base class for every action. """
[docs] def __init__(self, inputs, output, name, children=None): """ @param inputs type of inputs @param output output type @param name a name which identifies the action @param children actions used to compute this one """ if not isinstance(inputs, list): raise TypeError( 'inputs must be a list of MLType.') # pragma: no cover for t in inputs: if not isinstance(t, MLType): raise TypeError( # pragma: no cover "Every input must be a MLType not '{0}'.".format(type(t))) if not isinstance(output, MLType): raise TypeError('output must be of MLType.') # pragma: no cover self.inputs = inputs self.output = output self.name = name self.children = children if children is not None else [] for child in self.children: if not isinstance(child, MLAction): # pragma: no cover raise TypeError("All children must be of type MLAction")
[docs] def execute(self, **kwargs): """ Computes the action. Returns the output. """ # It must be overwritten. self.children_results_ = [child.execute( **kwargs) for child in self.children] for v, tv in zip(self.children_results_, self.inputs): tv.validate(v)
@property def ChildrenResults(self): """ Return the last execution results. """ return self.children_results_
[docs] def enumerate_variables(self): """ Enumerates all variables. """ for child in self.children: for var in child.enumerate_variables(): yield var
[docs] def graph_execution(self): """ Returns a formated string which retruns the outputs. """ rows = [] rows.append("-- BEGIN {0} {3} id={1} output={2}".format( self.name, id(self), self.output._cache, getattr(self, "comment", ""))) for i, ch in enumerate(self.children): gr = ch.graph_execution() temp = [" " + li for li in gr.split("\n")] temp[0] = " {0}-".format(i) + temp[0][4:] rows.extend(temp) rows.append( "-- END {0} -- output={1}".format(self.name, self.output._cache)) return "\n".join(rows)
@AutoAction.cache def _export_json(self, hook=None, result_name=None): val = {"output": self.output._export_json()} if self.children: val["action"] = dict(name=self.name, variants=[c._export_json(hook=hook) for c in self.children]) else: val["action"] = dict(name=self.name) if self.inputs: val["input"] = [i._export_json(hook=hook) for i in self.inputs] return val @AutoAction.cache def _export_c(self, hook=None, result_name=None): if result_name is None: raise ValueError( "result_name must not be None") # pragma: no cover rows = [] rows.append("// {0}-{1} - children".format(id(self), self.name)) names = [] if self.children: for i, c in enumerate(self.children): rname = "{0}{1}{2}".format( result_name, getattr(self, "cname", ""), i) dc = c._export_c(hook=hook, result_name=rname) if not dc['cache']: rows.append(dc['code']) names.append(dc['result_name']) rows.append("// {0}-{1} - itself".format(id(self), self.name)) res = "\n".join(rows) return {'code': res, 'result_name': result_name, 'child_names': names}
[docs]class MLActionCst(MLAction): """ Constant """
[docs] def __init__(self, cst, inout_type=None, comment=None): """ @param cst constant @param inout_type type @param comment comment """ if inout_type is None: inout_type = MLActionCst.guess_type(cst) MLAction.__init__(self, [], inout_type, "cst") inout_type.validate(cst) self.cst = cst self.comment = comment
[docs] @staticmethod def guess_type(value): """ Guesses a type given a value. """ if isinstance(value, numpy.float32): return MLNumTypeFloat32() if isinstance(value, numpy.float64): return MLNumTypeFloat64() if isinstance(value, (int, numpy.int32)): return MLNumTypeInt32() if isinstance(value, (int, numpy.int64)): return MLNumTypeInt64() if isinstance(value, numpy.ndarray): a = numpy.zeros(1, value.dtype) t = MLActionCst.guess_type(a[0]) return MLTensor(t, value.shape) raise NotImplementedError( # pragma: no cover "Not implemented for type '{0}'".format(type(value)))
[docs] def execute(self, **kwargs): MLAction.execute(self, **kwargs) return self.output.validate(self.cst)
[docs] def graph_execution(self): if self.comment: return "cst: {0} = {1}".format(self.comment, self.cst) return "cst: {0}".format(self.cst)
@AutoAction.cache def _export_json(self, hook=None, result_name=None): res = {"name": "cst", "value": self.output._format_value_json(self.cst, hook=hook)} if hasattr(self, "comment"): res["comment"] = self.comment return res @AutoAction.cache def _export_c(self, hook=None, result_name=None): if result_name is None: raise ValueError("result_name cannot be None.") # pragma: no cover dc = self.output._export_c(hook='declare', result_name=result_name) res = "{0} = {1};".format( dc['code'], self.output._format_value_c(self.cst)) if self.comment: res += " // {0}".format(self.comment) return {'code': res, 'result_name': result_name}
[docs]class MLActionVar(MLActionCst): """ Variable. The constant is only needed to guess the variable type. """
[docs] def __init__(self, value, name, inout_type=None): """ @param value value @param name variable name @param inout_type type """ MLActionCst.__init__(self, value, inout_type) self.name = "var" self.name_var = name
[docs] def execute(self, **kwargs): MLAction.execute(self, **kwargs) if self.name_var not in kwargs: raise KeyError( # pragma: no cover "Unable to find variable name '{0}'".format(self.name_var)) return self.output.validate(kwargs[self.name_var])
[docs] def enumerate_variables(self): """ Enumerates itself. """ yield self
[docs] def graph_execution(self): return "var: {0} = {1} ({2})".format(self.name_var, self.name, self.output._cache)
@AutoAction.cache def _export_json(self, hook=None, result_name=None): return {"name": "var", "value": self.name_var} @AutoAction.cache def _export_c(self, hook=None, result_name=None): if result_name is None: raise ValueError( # pragma: no cover "result_name must not be None") dc = self.output._export_c(hook='typeref', result_name=result_name) res = "{0} = {1};".format(dc['code'], self.name_var) return {'code': res, 'result_name': result_name}
[docs]class MLActionFunctionCall(MLAction): """ Any function call. """
[docs] def __init__(self, name, output, *acts): """ @param name function name @param output type @param *acts list of arguments """ for act in acts: if not isinstance(act, MLAction): raise TypeError( # pragma: no cover "All element of acts must be MLAction not '{0}'.".format(type(act))) MLAction.__init__(self, [act.output for act in acts], output, name, children=acts) self.cname = 'c'
[docs] def _optional_parameters(self): """ Returns additional parameters to add the function call. """ return None
@AutoAction.cache def _export_c(self, hook=None, result_name=None): if result_name is None: raise ValueError( "result_name must not be None") # pragma: no cover dcf = MLAction._export_c(self, hook=hook, result_name=result_name) rows = [dcf['code']] fcall = ", ".join(dcf['child_names']) add = self._optional_parameters() # pylint: disable=E1128 if add is not None: fcall = ", ".join([fcall, add]) dc = self.output._export_c(hook='declare', result_name=result_name) rows.append(dc['code'] + ";") ep = self.output._byref_c() type_list = "_".join(c.output.CTypeSingle for c in self.children) rows.append("{0}_{4}({3}{1}, {2});".format( self.name, result_name, fcall, ep, type_list)) rows.append("// {0}-{1} - done".format(id(self), self.name)) # Addition printf to debug the C++ code. # rows.append('printf("C++ {1} %f\\n", {0});'.format(result_name, self.name)) res = {'code': "\n".join(rows), 'result_name': dcf['result_name']} return res
[docs]class MLActionBinary(MLAction): """ Any binary operation. """
[docs] def __init__(self, act1, act2, name): """ @param act1 first element @param act2 second element @param name operator name """ if not isinstance(act1, MLAction): raise TypeError("act1 must be MLAction.") # pragma: no cover if not isinstance(act2, MLAction): raise TypeError("act2 must be MLAction.") # pragma: no cover MLAction.__init__(self, [act1.output, act2.output], act2.output, name, children=[act1, act2])
@AutoAction.cache def _export_c(self, hook=None, result_name=None): if result_name is None: raise ValueError( "result_name must not be None") # pragma: no cover dc = MLAction._export_c(self, hook=hook, result_name=result_name) rows = [dc['code']] dc2 = self.output._export_c(hook='type') op = "{2} {0} = {0}0 {1} {0}1;".format( result_name, self.name, dc2['code']) rows.append(op) rows.append("// {0}-{1} - done".format(id(self), self.name)) return {'code': "\n".join(rows), 'result_name': result_name}
[docs]class MLActionUnary(MLAction): """ Any binary operation. """
[docs] def __init__(self, act1, name): """ @param act1 element @param name operator name """ if not isinstance(act1, MLAction): raise TypeError("act1 must be MLAction.") # pragma: no cover MLAction.__init__(self, [act1.output], act1.output, name, children=[act1])
@AutoAction.cache def _export_c(self, hook=None, result_name=None): if result_name is None: raise ValueError( # pragma: no cover "result_name must not be None") dc = MLAction._export_c(self, hook=hook, result_name=result_name) rows = [dc['code']] op = "auto {0} = {1} {0}0;".format(result_name, self.name) rows.append(op) rows.append("// {0}-{1} - done".format(id(self), self.name)) return {'code': "\n".join(rows), 'result_name': result_name}
[docs]class MLActionConcat(MLActionFunctionCall): """ Concatenate number of arrays into an array. """
[docs] def __init__(self, act1, act2): """ @param act1 first element @param act2 second element """ if not isinstance(act1, MLAction): raise TypeError("act1 must be MLAction.") # pragma: no cover if not isinstance(act2, MLAction): raise TypeError("act2 must be MLAction.") # pragma: no cover n1 = (1 if isinstance(act1.output, (MLNumTypeFloat32, MLNumTypeFloat64)) else act1.output.dim[0]) n2 = (1 if isinstance(act2.output, (MLNumTypeFloat32, MLNumTypeFloat64)) else act2.output.dim[0]) MLActionFunctionCall.__init__(self, "concat", MLTensor( act1.output.__class__(), (n1 + n2,)), act1, act2)
[docs] def execute(self, **kwargs): """ Concatenation """ MLActionFunctionCall.execute(self, **kwargs) res = self.ChildrenResults return self.output.validate(numpy.array(res))
[docs]class MLActionCast(MLActionUnary): """ Cast into another type. """
[docs] def __init__(self, act1, new_type): """ @param act1 element @param new_type new type """ MLActionUnary.__init__(self, act1, "cast") self.output = new_type
[docs] def execute(self, **kwargs): MLActionUnary.execute(self, **kwargs) res = self.ChildrenResults return self.output.validate(self.output.cast(res[0]))
@AutoAction.cache def _export_c(self, hook=None, result_name=None): raise NotImplementedError( # pragma: no cover "Not enough information to do it here.")
[docs]class MLActionIfElse(MLAction): """ Addition """
[docs] def __init__(self, cond, act1, act2, check_type=True, comment=None): """ @param cond condition @param act1 first action @param ect2 second action @param check_type check ype @param comment comment """ if not isinstance(act1, MLAction): raise TypeError("act1 must be MLAction.") # pragma: no cover if not isinstance(act2, MLAction): raise TypeError("act2 must be MLAction.") # pragma: no cover if not isinstance(cond, MLAction): raise TypeError("cond must be MLAction.") # pragma: no cover if not isinstance(cond.output, MLNumTypeBool): raise TypeError( # pragma: no cover "No boolean condition {0}".format(type(cond.output))) if check_type and type(act1.output) != type(act2.output): raise TypeError("Not the same input type {0} != {1}".format( # pragma: no cover type(act1.output), type(act2.output))) MLAction.__init__(self, [cond.output, act1.output, act2.output], act2.output, "if", children=[cond, act1, act2]) self.comment = comment
[docs] def execute(self, **kwargs): self.children_results_ = [ self.children[0].execute(**kwargs), None, None] self.inputs[0].validate(self.children_results_[0]) if self.children_results_[0]: self.children_results_[1] = self.children[1].execute(**kwargs) self.inputs[1].validate(self.children_results_[1]) res = self.children_results_[1] else: self.children_results_[2] = self.children[2].execute(**kwargs) self.inputs[2].validate(self.children_results_[2]) res = self.children_results_[2] return self.output.validate(res)
@AutoAction.cache def _export_c(self, hook=None, result_name=None): if result_name is None: raise ValueError( "result_name must not be None") # pragma: no cover dc = MLAction._export_c(self, hook=hook, result_name=result_name) rows = [dc['code']] dc2 = self.output._export_c(hook='type') op = "{1} {0} = {0}0 ? {0}1 : {0}2;".format(result_name, dc2['code']) rows.append(op) rows.append("// {0}-{1} - done".format(id(self), self.name)) return {'code': "\n".join(rows), 'result_name': result_name}
[docs]class MLActionReturn(MLAction): """ Returns a results. """
[docs] def __init__(self, act): """ @param act action to return """ MLAction.__init__(self, [act.output], act.output, "return", children=[act])
[docs] def execute(self, **kwargs): MLAction.execute(self, **kwargs) res = self.ChildrenResults return self.output.validate(res[0])
@AutoAction.cache def _export_c(self, hook=None, result_name=None): if len(self.children) != 1: raise ValueError( "Only one result can be returned.") # pragma: no cover if result_name is None: raise ValueError( "result_name must not be None") # pragma: no cover dc = self.children[0]._export_c(hook=hook, result_name=result_name) if not dc['cache']: code = dc['code'] else: code = '' add = self.output._copy_c( result_name, result_name[:-1], hook="typeref") code += "\n" + add return {'code': code, 'result_name': result_name}
[docs]class MLActionFunction(MLActionUnary): """ A function. """
[docs] def __init__(self, act, name): """ @param act action @param name name """ if not isinstance(act, MLActionReturn): raise NotImplementedError( # pragma: no cover "Last result must be MLActionReturn.") MLActionUnary.__init__(self, act, name)
[docs] def execute(self, **kwargs): MLActionUnary.execute(self, **kwargs) res = self.ChildrenResults return self.output.validate(res[0])
@AutoAction.cache def _export_c(self, hook=None, result_name=None): if result_name is None: raise ValueError( "result_name must not be None") # pragma: no cover if len(self.children) != 1: raise ValueError( "The function must return one result.") # pragma: no cover if result_name[-1] == '0': raise ValueError( # pragma: no cover "result_name '{0}' cannot end with 0.".format(result_name)) vars = {v.name: v for v in self.enumerate_variables()} vars = [_[1] for _ in list(sorted(vars.items()))] parameters = ", ".join("{0} {1}".format( v.output._export_c(hook='type')['code'], v.name_var) for v in vars) typename = self.children[0].output._export_c( hook='typeref', result_name=result_name)['code'] signature = "int {1} ({0}, {2})".format( typename, self.name, parameters) dc = MLAction._export_c(self, hook=hook, result_name=result_name) code = dc['code'] rows = [signature, "{"] rows.extend(" " + line for line in code.split("\n")) rows.extend( [' return 0;', " // {0}-{1} - done".format(id(self), self.name), '}']) return {'code': "\n".join(rows), 'result_name': result_name}