Source code for skl2onnx.common._topology

# SPDX-License-Identifier: Apache-2.0


import re
import warnings
import pprint
from logging import getLogger
from collections import OrderedDict
import numpy as np
from onnx import onnx_pb as onnx_proto
from onnxconverter_common.data_types import (  # noqa
    DataType, TensorType,
    FloatType, Int64Type, StringType,
    DictionaryType, FloatTensorType,  # noqa
    Int64TensorType, SequenceType,  # noqa
    StringTensorType, DoubleTensorType,
    Int32TensorType, BooleanTensorType)
try:
    from onnxconverter_common.data_types import (
        Int8TensorType, UInt8TensorType)
except ImportError:
    Int8TensorType = None
    UInt8TensorType = None
from ..proto import (
    get_opset_number_from_onnx,
    get_latest_tested_opset_version
)
from ..proto.onnx_helper_modified import (
    make_graph, make_model, make_tensor_value_info
)
from . import _registration
from . import utils
from .exceptions import MissingShapeCalculator, MissingConverter
from ._container import ModelComponentContainer, _build_options
from .onnx_optimisation_identity import onnx_remove_node_identity

type_fct = type


def _default_OPSET_TO_IR_VERSION():
    return {
        1: 3, 2: 3, 3: 3, 4: 3, 5: 3, 6: 3,
        7: 3, 8: 4, 9: 4, 10: 5, 11: 6, 12: 7,
        13: 7, 14: 7, 15: 8, 16: 8, 17: 8, 18: 8
    }


try:
    from onnxconverter_common.topology import OPSET_TO_IR_VERSION
    assert OPSET_TO_IR_VERSION[18] is not None
except (ImportError, KeyError):
    OPSET_TO_IR_VERSION = _default_OPSET_TO_IR_VERSION()

OPSET_ML_TO_OPSET = {1: 11, 2: 15, 3: 18}
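# Illustrative reading of the two tables above (values taken from the
# dictionaries defined in this module, not from the installed packages):
# an ai.onnx.ml model using opset 2 corresponds to main opset 15, which in
# turn maps to IR version 8.
#
#     OPSET_ML_TO_OPSET[2]               # 15
#     _default_OPSET_TO_IR_VERSION()[15]  # 8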

logger = getLogger('skl2onnx')


def get_default_opset_for_domain(domain):
    """
    Returns the default opset version associated with a domain,
    given the main opset.
    """
    from .. import __max_supported_opset__ as main_opset
    if domain == '':
        return main_opset
    if domain == 'ai.onnx.ml':
        if main_opset >= 16:
            return 3
        if main_opset < 6:
            return 1
        return 2
    if domain == 'ai.onnx.training':
        return 1
    return None
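# A minimal usage sketch (illustrative; the value returned for '' depends on
# the installed skl2onnx, assumed here to have __max_supported_opset__ >= 16):
#
#     get_default_opset_for_domain('')                  # main opset, e.g. 18
#     get_default_opset_for_domain('ai.onnx.ml')        # 3
#     get_default_opset_for_domain('ai.onnx.training')  # 1
#     get_default_opset_for_domain('unknown.domain')    # None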


[docs]class Variable: """ Defines a variable which holds any data defined from *ONNX* types. """ _UNIQUE_NUMBER_ = 0 def __init__(self, raw_name, onnx_name, scope, type=None): """ :param raw_name: A string indicating the variable's name in the original model. Usually, it's the seed string used to created its ONNX name (i.e., the field *onnx_name* below). :param onnx_name: A string indicating the variable's name in the converted model :param scope: A string. It's the name of the scope where this variable is declared :param type: A type object defined in .common.data_types.py; e.g., FloatTensorType """ if not isinstance(raw_name, str): raise TypeError( "raw_name must be a string not '%s'." % raw_name.__class__) if type is not None and not hasattr(type, 'shape'): raise TypeError( "Unexpected type for variable raw_name=%r, type=%r." % ( raw_name, type)) if not isinstance(onnx_name, str) or '(' in onnx_name: if onnx_name.startswith('u(') and onnx_name[-1] == ')': onnx_name0 = onnx_name if scope is None: onnx_name = "UU%03dUU" % Variable._UNIQUE_NUMBER_ Variable._UNIQUE_NUMBER_ += 1 else: onnx_name = scope.get_unique_variable_name("U") logger.debug( '[Var] rename raw_name=%r, onnx_name=%r into %r', raw_name, onnx_name0, onnx_name) else: raise TypeError( "onnx_name must be a string not %r." % onnx_name) if type is not None: shape = type.shape if shape is not None: not_none = [v for v in shape if v is not None] if len(not_none) and min(not_none) == 0: raise RuntimeError( "A variable cannot be empty, raw_name=%r, " "onnx_name=%r, shape=%r, type=%r." % ( raw_name, onnx_name, shape, type)) self._raw_name = raw_name self._onnx_name = onnx_name self._scope = scope self._type = type self._parent = None # The following fields are bool variables used in parsing and # compiling stages self._is_fed = None self._is_root = None self._is_leaf = None if self.type is not None and not isinstance(self.type, DataType): raise TypeError( "shape must be a DataType not {}.".format(self.type)) if isinstance(self.type, TensorType): shape = self.type.shape if not isinstance(shape, (list, tuple)): try: shape = list(shape) except TypeError: raise TypeError("shape must be a tuple or a list not " "{}.".format(type_fct(shape))) for dim in shape: if dim is None: continue if not isinstance(dim, (int, np.int32, np.int64, np.intc)): raise TypeError( "shape must contains integers not %r (type=%r)." "" % (dim, dim.__class__)) logger.debug('[Var] +%s', self) # links to operators using those variables self.operators_outputs_ = [] self.operators_inputs_ = [] self._check() def _check(self): if self.type is not None and self.type.shape is not None: for k in self.type.shape: if k is None: continue if not isinstance(k, (int, np.integer)): raise ValueError( "Unexpected type %r for shape %r." 
"" % (type(k), self)) @property def raw_name(self): return self._raw_name @property def onnx_name(self): return self._onnx_name @property def scope(self): return self._scope @property def type(self): return self._type @property def is_fed(self): return self._is_fed @property def is_root(self): return self._is_root @property def is_leaf(self): return self._is_leaf def init_status(self, is_fed=None, is_root=None, is_leaf=None): if is_fed is not None and is_fed != self.is_fed: logger.debug( '[Var] update is_fed=%r for %r, parent=%r', is_fed, self, self._parent) self._is_fed = is_fed if is_root is not None and is_root != self.is_root: logger.debug('[Var] update is_root=%r for %r', is_root, self) self._is_root = is_root if is_leaf is not None and is_leaf != self.is_leaf: logger.debug('[Var] update is_leaf=%r for %r', is_leaf, self) self._is_leaf = is_leaf def __setattr__(self, name, value): if name == "type": self.set_type(value) elif name == "onnx_name": raise AttributeError("You must use method set_onnx_name.") elif name in {"is_fed", "is_root", "is_leaf"}: raise AttributeError("You must use method init_status.") elif name in {'scope', 'raw_name'}: raise AttributeError("scope or raw_name cannot be changed.") self.__dict__[name] = value def set_type(self, new_type): if (new_type is None or isinstance(new_type, (str, Variable)) or not hasattr(new_type, 'shape')): raise TypeError( "Unexpected new type for variable %r, new_type=%r." % ( self, new_type)) logger.debug('[Var] update type for %r', self) self._type = new_type self._check() def set_onnx_name(self, onnx_name): if onnx_name != self._onnx_name: logger.debug( '[Var] update onnx_name, from %r to %r in %r', self.onnx_name, onnx_name, self) if self.scope is not None and not isinstance(self.scope, str): self.scope.rename_onnx_name(self._onnx_name, onnx_name) self._onnx_name = onnx_name def set_parent(self, operator): if self._parent is not None: raise RuntimeError( "This variable is already the output of operator %r. " "It cannot be the output of %r." % (self._parent, operator)) logger.debug( '[Var] set parent for %r, parent=%r', self, operator) self._parent = operator def get_first_dimension(self): """ Returns the first dimension (batch dimension) or None if not specified (shape is empty). """ if (self.type is None or self.type.shape is None or len(self.type.shape) == 0): return None return self.type.shape[0] def get_second_dimension(self): if (self.type is None or self.type.shape is None or len(self.type.shape) < 2): return None return self.type.shape[1] @property def full_name(self): """ Return a globally unique variable ID """ return self.onnx_name def __repr__(self): return ("Variable('{0}', '{1}', type={2})".format( self.raw_name, self.onnx_name, self.type)) @staticmethod def from_pb(obj): """ Creates a data type from a protobuf object. 
""" def get_dim(d): r = d.dim_value if "dim_param" in str(d): return None if r == 0: # dim_value is 0 when it is 0 or undefined return 0 if "0" in str(d) else None return r def get_shape(tt): return [get_dim(tt.shape.dim[i]) for i in range(len(tt.shape.dim))] if hasattr(obj, 'extend'): return [Variable.from_pb(o) for o in obj] name = obj.name if obj.type.tensor_type: tt = obj.type.tensor_type elem = tt.elem_type shape = get_shape(tt) if elem == onnx_proto.TensorProto.FLOAT: ty = FloatTensorType(shape) elif elem == onnx_proto.TensorProto.BOOL: ty = BooleanTensorType(shape) elif elem == onnx_proto.TensorProto.DOUBLE: ty = DoubleTensorType(shape) elif elem == onnx_proto.TensorProto.STRING: ty = StringTensorType(shape) elif elem == onnx_proto.TensorProto.INT64: ty = Int64TensorType(shape) elif elem == onnx_proto.TensorProto.INT32: ty = Int32TensorType(shape) elif (UInt8TensorType is not None and elem == onnx_proto.TensorProto.UINT8): ty = UInt8TensorType(shape) elif (Int8TensorType is not None and elem == onnx_proto.TensorProto.INT8): ty = Int8TensorType(shape) elif elem == 0: ty = FloatTensorType(shape) else: raise NotImplementedError( "Unsupported type '{}' (elem_type={}).".format( type(obj.type.tensor_type), elem)) else: raise NotImplementedError("Unsupported type '{}' as " "a string ({}).".format( type(obj), obj)) return Variable(name, name, None, ty) def __iter__(self): "Enables expression such as `a,b = self`." yield self.onnx_name yield self.type def __getitem__(self, index): if index == 0: return self.onnx_name if index == 1: return self.type raise IndexError("Unreachable element at index %d." % index) def add_operator(self, op, in_or_out): "Add a link to an operator, True for output, False for input." if in_or_out: self.operators_outputs_.append(op) else: self.operators_inputs_.append(op) def check_compatible_type(self, other_type): def empty_shape(shape): return shape is None or len(shape) == 0 if self.type is None: if other_type is None: return elif other_type is not None: if isinstance(self.type, type(other_type)): if self.type.shape == other_type.shape: return if empty_shape(other_type.shape): return raise TypeError( "Incompatible type for variable %r and type %r." % ( self, other_type))
class VariableStr(Variable):
    """
    Defines a variable from a string. This should be avoided.
    """

    def __init__(self, name, scope=None, type=None):
        Variable.__init__(self, name, name, scope=scope, type=type)

    @property
    def raw_name(self):
        return self._raw_name

    @property
    def onnx_name(self):
        if self._onnx_name.startswith("u("):
            raise RuntimeError(
                "Variable should be renamed as onnx_name=%r."
                "" % self._onnx_name)
        return self._onnx_name
[docs]class Operator: """ Defines an operator available in *ONNX*. """ class OperatorList(list): def __init__(self, parent, kind): super(Operator.OperatorList, self).__init__() self.parent = parent self.kind = kind def __eq__(self, second): raise NotImplementedError( "Operator equal not implemented and not needed.") def append(self, v): if not isinstance(v, Variable): raise TypeError( "Input and output must be of type Variable not %r." "" % type(v)) if self.kind == 'Out': v.set_parent(self.parent) super(Operator.OperatorList, self).append(v) logger.debug("[Op] add %s %r to %r", self.kind, v, self.parent) if self.kind == 'In': v.add_operator(self.parent, False) elif self.kind == "Out": v.add_operator(self.parent, True) else: raise RuntimeError( "Unexpected value for kind=%r." % self.kind) def extend(self, vs): for v in vs: self.append(v) def __getitem__(self, i): v = list.__getitem__(self, i) if isinstance(i, int) and not isinstance(v, Variable): raise TypeError("Element %d must be a Variable not %r." % ( i, type(v))) return v def __setitem__(self, i, v): raise LookupError( "Setter should not be used to modify an element.") def set_element(self, i, v): "Updates element i." if not isinstance(v, Variable): raise TypeError( "Value v must be a Variable not %r." % type(v)) logger.debug( "[Op] %s-change element %d from %r to %r in %r", self.kind, i, self[i], v, self.parent) list.__setitem__(self, i, v) def to_string(self): names = [] for o in self: if hasattr(o, 'onnx_name'): names.append(o.onnx_name) else: names.append('"%s"' % str(o)) return ",".join(names) def __init__(self, onnx_name, scope, type, raw_operator, target_opset, scope_inst): """ :param onnx_name: A unique ID, which is a string :param scope: The name of the scope where this operator is declared. It's a string. :param type: A object which uniquely characterizes the type of this operator. For example, it can be a string, pooling, if this operator is associated with a CoreML pooling layer. :param raw_operator: The original operator which defines this operator; for example, a scikit-learn Imputer and a CoreML Normalizer. :param target_opset: The target opset number for the converted model. :param scope_inst: :class:`Scope` instance the operator belongs to """ if isinstance(raw_operator, str): raise RuntimeError("Parameter raw_operator must be an object not " "a string '{0}'.".format(raw_operator)) # operator name in the converted model, if raw_operator # is not None, output_shapes can be guessed # from the raw model. Otherwise, it can be guessed # from the input shapes. self.onnx_name = onnx_name self.scope = scope self.type = type self.raw_operator = raw_operator self.inputs = Operator.OperatorList(self, 'In') self.outputs = Operator.OperatorList(self, 'Out') self._is_evaluated = None self.target_opset = target_opset self.scope_inst = scope_inst logger.debug('[Op] +%r', self) def new_raw_operator(self, raw_operator, alias): """ Returns a shallow copy of this operator, changes the raw_operator but keeps the same inputs and outputs. 
""" op = Operator(self.onnx_name, self.scope, alias, raw_operator, self.target_opset, self.scope_inst) op.inputs = self.inputs op.outputs = self.outputs return op def __repr__(self): try: textop = repr(self.raw_operator) except AttributeError: textop = "MISSING OP" except KeyError: # The line above fails for python 3.7 textop = type(self.raw_operator) if isinstance(textop, str) and "\n" in textop: textop = textop.replace('\n', '').replace(' ', '') return ("Operator(type='{0}', onnx_name='{1}', inputs='{2}', " "outputs='{3}', raw_operator={4})".format( self.type, self.onnx_name, self.inputs.to_string(), self.outputs.to_string(), textop)) def __setattr__(self, name, value): if name in ('inputs', 'outputs'): if (isinstance(value, list) and not isinstance(value, Operator.OperatorList)): if name == 'inputs': self.inputs = Operator.OperatorList(self, 'In') self.inputs.extend(value) return if name == 'outputs': self.outputs = Operator.OperatorList(self, 'Out') self.outputs.extend(value) return if not isinstance(value, Operator.OperatorList): raise TypeError( "inputs or outputs must be of type Operator.OperatorList.") ioo = name == 'outputs' for v in value: v.add_operator(self, ioo) self.__dict__[name] = value @property def is_evaluated(self): return self._is_evaluated def init_status(self, is_evaluated=None): if is_evaluated is not None and is_evaluated != self.is_evaluated: logger.debug( '[Op] update is_evaluated=%r for %r', is_evaluated, self) self._is_evaluated = is_evaluated @property def full_name(self): """ Return a globally unique operator ID """ return self.onnx_name @property def input_full_names(self): """ Return all input variables' names """ return [variable.full_name for variable in self.inputs] @property def output_full_names(self): """ Return all output variables' names """ return [variable.full_name for variable in self.outputs] @property def original_operator(self): """ Return the original operator/layer """ return self.raw_operator def infer_types(self): # Invoke a core inference function if self.type is None: raise MissingShapeCalculator( "Unable to find a shape calculator for type '{}'.".format( type(self.raw_operator))) try: shape_calc = _registration.get_shape_calculator(self.type) except ValueError: raise MissingShapeCalculator( "Unable to find a shape calculator for alias '{}' " "and type '{}'.".format(self.type, type(self.raw_operator))) if shape_calc is None: raise MissingShapeCalculator( "Unexpected shape calculator for alias '{}' " "and type '{}'.".format(self.type, type(self.raw_operator))) logger.debug( "[Shape-a] %r fed %r - %r", self, "".join(str(i.is_fed) for i in self.inputs), "".join(str(i.is_fed) for i in self.outputs)) shape_calc(self) logger.debug( "[Shape-b] %r inputs=%r - outputs=%r", self, self.inputs, self.outputs)
[docs]class Scope: """ Every node of an *ONNX* graph must be unique. This class holds the list of existing name for every node already defined in graph. It also provides functions to create a unique unused name. """ def __init__(self, name, target_opset=None, custom_shape_calculators=None, options=None, registered_models=None, naming=None): """ :param name: A string, the unique ID of this scope in a Topology object :param target_opset: The target opset number for the converted model. :param custom_conversion_functions: a dictionary for specifying the user customized conversion function :param custom_shape_calculators: a dictionary for specifying the user customized shape calculator :param options: see :ref:`l-conv-options` :param naming: the user may want to change the way intermediate are named, this parameter can be a string (a prefix) or a function, which signature is the following: `get_name(name, existing_names)`, the library will then check this name is unique and modify it if not :param registered_models: registered models .. versionchanged:: 1.10.0 Parameter *naming* was added. """ self.name = name self.onnx_variable_names = set() self.onnx_operator_names = set() self.target_opset = target_opset self.custom_shape_calculators = custom_shape_calculators # An one-to-many map from raw variable name to ONNX variable # names. It looks like # (key, value) = (raw_name, [onnx_name, onnx_name1, onnx_name2, ..., onnx_nameN]) # noqa # The last name may hide all other names in this scope. self.variable_name_mapping = {} # A map of local variables defined in this scope. # (key, value) = (onnx_name, variable) self.variables = OrderedDict() self.input_variables = [] self.output_variables = [] # A map of local operators defined in this scope. # (key, value) = (onnx_name, operator) self.operators = {} # Additional options given to converters. self.options = options # Registered models self.registered_models = registered_models self.naming = naming if naming is None: self._naming = Topology._generate_unique_name elif isinstance(naming, str): self._naming = ( lambda seed, names: Topology._generate_unique_name( self.naming + seed, names)) elif callable(self.naming): self._naming = ( lambda seed, names: Topology._generate_unique_name( self.naming(seed, names), names)) else: raise TypeError( "Unexpected type for parameter naming: %r." % type(naming)) def get(self, var_name, default_value): "Returns variable with 'name' or default value is not found." return self.variables.get(var_name, default_value) def has_variable_name(self, name): """ Tells if a variable is already registered. """ return name in self.onnx_variable_names def get_shape_calculator(self, model_type): """ Returns the shape calculator for the given model type. :param model_type: model type such as *LogisticRegression* :return: alias or None if not found """ return self.custom_shape_calculators.get(model_type, None)
    def get_unique_variable_name(self, seed, rename=True):
        """
        Creates a unique variable ID based on the given seed.
        """
        if not isinstance(seed, str):
            raise TypeError("Parameter seed must be a string not {}."
                            "".format(type(seed)))
        if rename:
            name = self._naming(seed, self.onnx_variable_names)
        else:
            name = Topology._generate_unique_name(
                seed, self.onnx_variable_names)
        return name
    def get_unique_operator_name(self, seed):
        """
        Creates a unique operator ID based on the given seed.
        """
        return self._naming(seed, self.onnx_operator_names)
def declare_local_variable(self, raw_name, type=None, prepend=False, missing_type=False, rename=True): """ This function may create a new variable in this scope. If *raw_name* has been used to create other variables, the new variable will hide all other variables created using *raw_name*. """ if type is None and not missing_type: raise RuntimeError( "Unknown type for %r (type=%r)." % (raw_name, type)) # Get unique ID for the new variable onnx_name = self.get_unique_variable_name(raw_name, rename=rename) # Create the variable variable = Variable(raw_name, onnx_name, self.name, type) self.register_variable(variable, prepend=prepend) return variable def register_variable(self, var, prepend=False): "Adds a variable to the scope." if var.onnx_name in self.variables: raise RuntimeError( "Variable %r already registered (other=%r)." % ( var, self.variables[var.onnx_name])) if var.raw_name in self.variable_name_mapping: # Hide existing variables with the same raw_name if not prepend: self.variable_name_mapping[var.raw_name].append(var.onnx_name) else: self.variable_name_mapping[var.raw_name].insert( 0, var.onnx_name) else: self.variable_name_mapping[var.raw_name] = [var.onnx_name] self.variables[var.onnx_name] = var def declare_existing_subgraph_name(self, graph_proto): """ Declare all name from a subgraph in order to avoid being picked twice. """ output_name = {o.name for o in graph_proto.output} for node in graph_proto.node: for name in node.output: if name in output_name: continue if self.has_variable_name(name): raise NameError( "Result name %r is already taken (outputs=%r) " "(node=%r)." % ( name, output_name, node)) self.onnx_variable_names.add(name) if node.name in self.onnx_operator_names: raise NameError( "Operator name %r is already taken " "(node=%r)." % ( node.name, node)) self.onnx_operator_names.add(node.name) def rename_onnx_name(self, old_name, new_name): if new_name in self.variables: raise RuntimeError( "Name %r already in variables (%r)." % ( new_name, self.variables[new_name])) if old_name not in self.variables: raise RuntimeError( "Unable to find name %r in variables." % old_name) logger.debug( '[Scope] update onnx_name, from %r to %r', old_name, new_name) self.variables[new_name] = self.variables[old_name] del self.variables[old_name] def declare_local_input(self, raw_name, type=None, prepend=False, rename=True): """ Calls `declare_local_variable`. Registers this variable as an input. """ var = self.declare_local_variable( raw_name, type=type, prepend=prepend, rename=rename) self.input_variables.append(var) return var def declare_local_output(self, raw_name, type=None, prepend=False, missing_type=False): """ Calls `declare_local_variable`. Registers this variable as an output. """ var = self.declare_local_variable( raw_name, type=type, prepend=prepend, missing_type=missing_type) self.output_variables.append(var) return var def declare_local_operator(self, type, raw_model=None): """ This function is used to declare new local operator. 
""" onnx_name = self.get_unique_operator_name(str(type)) operator = Operator(onnx_name, self.name, type, raw_model, self.target_opset, scope_inst=self) self.operators[onnx_name] = operator return operator def _get_allowed_options(self, model, fail=True): if self.registered_models is not None: if type(model) not in self.registered_models['aliases']: if fail: raise NotImplementedError( "No registered models, no known allowed options " "for model '{}'.".format(model.__class__.__name__)) return {} alias = self.registered_models['aliases'][type(model)] conv = self.registered_models['conv'][alias] allowed = conv.get_allowed_options() return allowed raise NotImplementedError( "No registered models, no known allowed options " "for model '{}'.".format(model.__class__.__name__)) def add_options(self, model_id, options): """ Adds an option, for example, ``add_options(id(clr), {'raw_scores': True})`` tells the converter associated to ``clr`` to use raw score instead of probabilities. :param model_id: class or ``id(instance)`` :param options: dictionary with the new values """ if options is None: return if self.options is None: self.options = {} if model_id not in self.options: self.options[model_id] = None if self.options[model_id] is None: self.options[model_id] = {} self.options[model_id].update(options) def get_options(self, model, default_values=None, fail=True): """ Returns additional options for a model. It first looks by class then by id (``id(model)``). :param model: model being converted :param default_values: default options (it is modified by the function) :param fail: fails if option it not found :return: dictionary """ return _build_options( model, self.options, default_values, self._get_allowed_options(model, fail=fail), fail=fail) def replace_raw_operator(self, op1, op2, alias): """ Replaces every raw operator op1 by op2. The function uses `id()` to detect op1. """ for v in self.operators.values(): if id(v.raw_operator) == id(op1): logger.debug( '[Scope] replace %d by %d in %r.', id(v.raw_operator), id(op1), v) v.raw_operator = op2 v.type = alias
[docs]class Topology: """ Holds instances on :class:`Scope <skl2onnx.common._topology.Scope>` and :class:`SklearnModelContainer <skl2onnx.common._container.SklearnModelContainer>`. These are filled by the converters while a pipeline is being converted. """ def __init__(self, model, default_batch_size=1, initial_types=None, target_opset=None, custom_conversion_functions=None, custom_shape_calculators=None, registered_models=None): """ Initializes a *Topology* object, which is an intermediate representation of a computational graph. :param model: RawModelContainer object or one of its derived classes. It contains the original model. :param default_batch_size: batch_size prepend to scalar and array types from CoreML. It's usually 1 or None. :param initial_types: A list providing some types for some root variables. Each element is a tuple of a variable name and a type defined in *data_types.py*. :param custom_conversion_functions: a dictionary for specifying the user customized conversion function :param custom_shape_calculators: a dictionary for specifying the user customized shape calculator :param registered_models: registered models """ self.scopes = [] self.raw_model = model self.scope_names = set() self.initial_types = initial_types if initial_types else list() self.default_batch_size = default_batch_size self.target_opset = target_opset self.custom_conversion_functions = ( custom_conversion_functions if custom_conversion_functions else {}) self.custom_shape_calculators = ( custom_shape_calculators if custom_shape_calculators else {}) for k in self.custom_conversion_functions: if not callable(k): raise TypeError("Keys in custom_conversion_functions must be " "types not strings.") for k in self.custom_shape_calculators: if not callable(k): raise TypeError("Keys in custom_shape_calculators must be " "types not strings.") # A map of local overwritten model aliases. self.model_aliases = {} all_model_types = (set(self.custom_conversion_functions) | set(self.custom_shape_calculators)) for mtype in all_model_types: alias = "{}_{}".format(mtype.__name__, id(self)) self.model_aliases[mtype] = alias # Registered models if registered_models is None: raise AssertionError() self.registered_models = registered_models @property def scope(self): if len(self.scopes) != 1: raise RuntimeError( "Only one scope is allowed not %d." % len(self.scopes)) return self.scopes[0] @staticmethod def _generate_unique_name(seed, existing_names): """ Produce an unique string based on the seed :param seed: a string :param existing_names: a set containing strings which cannot be produced :return: a string similar to the seed """ if seed == '': raise ValueError('Name seed must be a non-empty string.') # Make the seed meet C-style naming convention # Only alphabets and numbers are allowed seed = re.sub('[^\\w+]', '_', seed) # The first symbol cannot be a number if re.match('^[0-9]', seed): seed = '_' + seed # If seed has never been seen, we return it as it is. Otherwise, # we will append an number to make it unique. if seed not in existing_names: existing_names.add(seed) return seed else: i = 1 while seed + str(i) in existing_names: i += 1 new_name = seed + str(i) existing_names.add(new_name) return new_name def get_unique_scope_name(self, seed): return Topology._generate_unique_name(seed, self.scope_names) def declare_scope(self, seed, parent_scopes=None, options=None, naming=None): """ Creates a new :class:`Scope <skl2onnx.common._topology.Scope>` and appends it to the list of existing scopes. 
""" if len(self.scopes) != 0: raise RuntimeError( "Only one scope can be created.") scope = Scope( self.get_unique_scope_name(seed), target_opset=self.target_opset, custom_shape_calculators=self.custom_shape_calculators, options=options, registered_models=self.registered_models, naming=naming) # Declare input variables. # They should be the inputs of the scikit-learn # model you want to convert into ONNX. for var_name, initial_type in self.initial_types: scope.declare_local_input(var_name, initial_type, rename=False) self.scopes.append(scope) return scope def unordered_operator_iterator(self): for scope in self.scopes: for operator in scope.operators.values(): yield operator def unordered_variable_iterator(self): for scope in self.scopes: for variable in scope.variables.values(): yield variable def call_converter(self, operator, container, verbose=0): "Calls converter for operator *operator*." mtype = type(operator.raw_operator) if mtype in self.custom_conversion_functions: conv = self.custom_conversion_functions[mtype] elif operator.type in self.custom_conversion_functions: conv = self.custom_conversion_functions[operator.type] elif hasattr(operator.raw_operator, "onnx_converter"): conv = operator.raw_operator.onnx_converter() else: # Convert the selected operator into some ONNX objects and # save them into the container try: conv = _registration.get_converter(operator.type) except ValueError: raise MissingConverter( "Unable to find converter for alias '{}' type " "'{}'. You may raise an issue at " "https://github.com/onnx/sklearn-onnx/issues." "".format(operator.type, type(getattr(operator, 'raw_model', None)))) container.validate_options(operator) if verbose > 0: print("[call_converter] call converter for %r." % operator.type) logger.debug( "[Conv] call %r fed %r - %r", operator, "".join(str(i.is_fed) for i in operator.inputs), "".join(str(i.is_fed) for i in operator.outputs)) conv(self.scopes[0], operator, container) logger.debug("[Conv] end - %r", operator) def call_shape_calculator(self, operator): "Calls shape_calculator for operator *operator*." mtype = type(operator.raw_operator) if mtype in self.custom_shape_calculators: # overwritten operator. source = 'custom' shape_calc = self.custom_shape_calculators[mtype] elif operator.type in self.custom_shape_calculators: source = 'custom' shape_calc = self.custom_shape_calculators[operator.type] elif hasattr(operator.raw_operator, "onnx_shape_calculator"): source = 'onnx_shape_calculator' shape_calc = operator.raw_operator.onnx_shape_calculator() else: source = "" shape_calc = None if shape_calc is not None: logger.debug( "[Shape1] %r fed %r - %r (source=%r)", operator, ",".join(str(i.is_fed) for i in operator.inputs), ",".join(str(i.is_fed) for i in operator.outputs), source) shape_calc(operator) else: logger.debug('[Shape2] call infer_types for %r', operator) operator.infer_types() def _initialize_graph_status_for_traversing(self): """ Initialize the status of all variables and operators before traversing the graph. Only used by convert_operators. """ if len(self.scopes) != 1: raise RuntimeError( "Only one scope is allowed not %d." 
% len(self.scopes)) input_names = set(v.onnx_name for v in self.scopes[0].input_variables) if len(input_names) == 0: raise RuntimeError("No detected inputs.") for variable in self.unordered_variable_iterator(): is_input = variable.onnx_name in input_names variable.init_status(is_fed=is_input) for operator in self.unordered_operator_iterator(): operator.init_status(is_evaluated=False) def _propagate_status(self, operator, container, fed_variables, verbose=0): """ Propagates status *is_fed* based on output variable and node added in the container. """ if verbose > 1: print("[_propagate_status] after op=%r" % operator) vars = {} for node in container.nodes: for i in node.input: if i not in vars: vars[i] = [] vars[i].append(node) if verbose > 1: print("[_propagate_status] newly fed=%r" % list( v.onnx_name for v in operator.outputs if v.is_fed)) stack = list(fed_variables) scope = self.scopes[0] while len(stack) > 0: nodes = {} for name in stack: if name not in vars: continue for n in vars[name]: nodes[id(n)] = n stack = [] for node in nodes.values(): if all(fed_variables.get(n, False) for n in node.input): for o in node.output: if o not in fed_variables: if verbose > 1: print("[_propagate_status] add=%r" % o) fed_variables[o] = o stack.append(o) if o in scope.variables: var = scope.variables[o] var.init_status(is_fed=True) if verbose > 1: print("[_propagate_status] fed=%r" % var) def convert_operators(self, container=None, verbose=0): """ Calls all converters and shape_calculator for existing operators. It also processes new operators created by converters. """ def _check_operator_(operator): if not isinstance(operator.inputs, Operator.OperatorList): raise TypeError( "operator.inputs must be a Operator.OperatorList " "not %r." % type(operator.inputs)) if not isinstance(operator.outputs, Operator.OperatorList): raise TypeError( "operator.outputs must be a Operator.OperatorList " "not %r." % type(operator.outputs)) if any(not isinstance(i, Variable) for i in operator.inputs): raise TypeError( "One input is not a Variable for operator %r - %r." "" % (type(operator.raw_operator), operator)) if any(not isinstance(i, Variable) for i in operator.outputs): raise TypeError( "One output is not a Variable for operator %r - %r." "" % (type(operator.raw_operator), operator)) def _check_variable_in_(variable, operator): idop = id(operator) ids = set(id(op) for op in variable.operators_inputs_) if idop not in ids: raise RuntimeError( "Operator %r not registered in the list of operators " "of %r taking it as an input [\n%s]." % ( operator, variable, "\n".join(map(str, variable.operators_inputs_)))) def _check_variable_out_(variable, operator): if variable.is_fed: add = ["", "--DEBUG-INFO--"] for scope in self.scopes: add.append('---') add.append(pprint.pformat( scope.variable_name_mapping)) add.append('---') for var in scope.variables.values(): add.append(" is_fed=%s %s - n_in=%d n_out=%d" % ( getattr(var, 'is_fed', '?'), var, len(var.operators_inputs_), len(var.operators_outputs_))) add.append('---') for op in scope.operators.values(): add.append(" is_evaluated=%s %s" % ( getattr(op, 'is_evaluated', '?'), op)) add.append('---') for v in operator.inputs: add.append(" inputs={}".format(v)) for v in operator.outputs: add.append(" outputs={}".format(v)) add.append('--- operator producing this variable--') for op in variable.operators_outputs_: add.append(str(op)) raise RuntimeError( "A variable is already assigned ({}) " "for operator '{}' (name='{}'). 
" "operator.is_evaluated={}, inputs.is_fed={}, " "outputs.is_fed={}. " "This may still happen if a converter is a " "combination of sub-estimators and one " "of them is producing this output. " "In that case, an identity node must be " "added.{}".format( variable, operator.type, operator.onnx_name, operator.is_evaluated, [v.is_fed for v in operator.inputs], [v.is_fed for v in operator.outputs], "\n".join(add))) if verbose > 0: print("[convert_operators] begin") self._initialize_graph_status_for_traversing() fed_variables = {i.name: i for i in container.initializers} changes = 1 n_iter = 0 while changes > 0: n_iter += 1 changes = 0 ops = list(self.unordered_operator_iterator()) if verbose > 0: print("[convert_operators] iteration %d - n_vars=%d " "n_ops=%d" % ( n_iter, len(fed_variables), len(ops))) for operator in ops: _check_operator_(operator) for var in operator.inputs: if var.is_fed: fed_variables[var.onnx_name] = var if (all(variable.is_fed for variable in operator.inputs) and not operator.is_evaluated): for variable in operator.inputs: _check_variable_in_(variable, operator) for variable in operator.outputs: _check_variable_out_(variable, operator) self.call_shape_calculator(operator) self.call_converter(operator, container, verbose=verbose) # If an operator contains a sequence of operators, # output variables are not necessarily known at this stage. operator.init_status(is_evaluated=True) for variable in operator.outputs: if all(op.is_evaluated for op in variable.operators_outputs_): variable.init_status(is_fed=True) fed_variables[variable.onnx_name] = variable fed_variables.update( {i.name: i for i in container.initializers if i.name not in fed_variables}) self._propagate_status(operator, container, fed_variables, verbose=verbose) # unfed some variables (it happens when a node # shares an output with another node) rem = [] for n, var in fed_variables.items(): if not hasattr(var, 'operators_outputs_'): # initializer continue if any(not o.is_evaluated for o in var.operators_outputs_): rem.append(n) for r in rem: v = fed_variables[r] v.init_status(is_fed=False) del fed_variables[v.onnx_name] changes += 1 if verbose > 0: print("[convert_operators] end iter: %d - n_vars=%d" % ( n_iter, len(fed_variables))) if verbose > 0: print("[convert_operators] end.") # Last verification. not_evaluated = [] for op in self.unordered_operator_iterator(): if not op.is_evaluated: not_evaluated.append(op) if len(not_evaluated) > 0: rows = ["---VARS---"] for var in self.unordered_variable_iterator(): rows.append( "is_fed=%r is_leaf=%r is_root=%r - %r - n_in=%d n_out=%d" "" % (var.is_fed, var.is_leaf, var.is_root, var, len(var.operators_inputs_), len(var.operators_outputs_))) rows.append("---OPERATORS---") for op in self.unordered_operator_iterator(): rows.append("is_eval=%r - %r" % (op.is_evaluated, op)) rows.append("---NODES---") for node in container.nodes: rows.append("%s: %r -> %r" % ( node.op_type, node.input, node.output)) raise RuntimeError( "Not all operators have been evaluated. A variable name " "is probably misspelled.\n%s" "" % "\n".join(rows)) # Input and output if len(self.scopes[0].input_variables) > 0: inputs = self.scopes[0].input_variables else: inputs = [v for v in self.unordered_variable_iterator() if v.is_root] for i in inputs: container.add_input(i) outputs = [v for v in self.unordered_variable_iterator() if v.is_leaf] # The function checks that for output variable, # raw_name equal onnx_name. It swaps names if it is not the case. 
        to_swap = []
        for out in outputs:
            if out.raw_name != out.onnx_name:
                to_swap.append(out)

        if len(to_swap) != 0:
            swaped = set()
            for var in to_swap:
                if var.raw_name in swaped:
                    continue
                swaped.add(var.raw_name)
                if verbose > 1:
                    print("[convert_operators] %r <-> %r." % (
                        var.raw_name, var.onnx_name))
                old_name = var.onnx_name
                new_name = var.raw_name
                try:
                    container.swap_names(old_name, new_name)
                except NotImplementedError as e:
                    logger.debug(
                        '[Topo] unable to swap %r and %r (%r).',
                        old_name, new_name, e)
                    continue
                for v in self.unordered_variable_iterator():
                    if v.onnx_name == old_name:
                        v.set_onnx_name(new_name)
                    elif v.onnx_name == new_name:
                        v.set_onnx_name(old_name)

        for o in outputs:
            container.add_output(o)
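# Name-generation sketch (illustrative): _generate_unique_name sanitizes the
# seed to a C-style identifier and appends a counter when the seed is taken.
#
#     names = set()
#     Topology._generate_unique_name('X', names)       # 'X'
#     Topology._generate_unique_name('X', names)       # 'X1'
#     Topology._generate_unique_name('1x y', names)    # '_1x_y'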
[docs]def convert_topology(topology, model_name, doc_string, target_opset, channel_first_inputs=None, options=None, remove_identity=True, verbose=0): """ This function is used to convert our Topology object defined in _parser.py into a ONNX model (type: ModelProto). :param topology: The Topology object we are going to convert :param model_name: GraphProto's name. Let "model" denote the returned model. The string "model_name" would be assigned to "model.graph.name." :param doc_string: A string attached to the produced model :param target_opset: number or dictionary, for example, 7 for ONNX 1.2, and 8 for ONNX 1.3, a dictionary is used to indicate different opset for different domains :param options: see :ref:`l-conv-options` :param remove_identity: removes identity nodes include '1.1.2', '1.2', and so on. :param verbose: displays information while converting :return: a ONNX ModelProto """ if target_opset is None: target_opset = get_latest_tested_opset_version() if isinstance(target_opset, dict): onnx_target_opset = target_opset.get( '', get_latest_tested_opset_version()) else: onnx_target_opset = target_opset if onnx_target_opset > get_opset_number_from_onnx(): found = get_opset_number_from_onnx() raise RuntimeError( "Parameter target_opset {} > {} is higher than the " "version of the installed onnx package. See " "https://github.com/onnx/onnx/blob/master/docs/" "Versioning.md#released-versions" ".".format(onnx_target_opset, found)) if onnx_target_opset > get_latest_tested_opset_version(): warnings.warn( "Parameter target_opset {} > {} is higher than the " "the latest tested version" ".".format( onnx_target_opset, get_latest_tested_opset_version())) container = ModelComponentContainer( target_opset, options=options, registered_models=topology.registered_models, white_op=topology.raw_model._white_op, black_op=topology.raw_model._black_op, verbose=verbose) # Traverse the graph from roots to leaves # This loop could eventually be parallelized. topology.convert_operators(container=container, verbose=verbose) container.ensure_topological_order() if len(container.inputs) == 0: raise RuntimeError("No detected inputs after conversion.") if len(container.outputs) == 0: raise RuntimeError("No detected outputs after conversion.") if verbose >= 2: print("---NODES---") for node in container.nodes: print(" %s - %s: %r -> %r" % ( node.op_type, node.name, node.input, node.output)) # Create a graph from its main components if container.target_opset_onnx < 9: # When calling ModelComponentContainer's add_initializer(...), # nothing is added into the input list. However, for ONNX target # opset < 9, initializers should also be a part of model's # (GraphProto) inputs. Thus, we create ValueInfoProto objects # from initializers (type: TensorProto) directly and then add # them into model's input list. extra_inputs = [] # ValueInfoProto list of the initializers for tensor in container.initializers: # Sometimes (especially when creating optional input values # such as RNN's initial hidden state), an initializer is also # one of the original model's input, so it has been added into # the container's input list. If this is the case, we need to # skip one iteration to avoid duplicated inputs. if tensor.name in [value_info.name for value_info in container.inputs]: continue # Initializers are always tensors so we can just call # make_tensor_value_info(...). 
value_info = make_tensor_value_info( tensor.name, tensor.data_type, tensor.dims) extra_inputs.append(value_info) # Before ONNX opset 9, initializers were needed to be passed in # with inputs. graph = make_graph(container.nodes, model_name, container.inputs + extra_inputs, container.outputs, container.initializers) else: # In ONNX opset 9 and above, initializers are included as # operator inputs and therefore do not need to be passed as # extra_inputs. graph = make_graph( container.nodes, model_name, container.inputs, container.outputs, container.initializers) # Add extra information related to the graph graph.value_info.extend(container.value_info) # Create model onnx_model = make_model(graph) # Update domain version opv = min(onnx_target_opset, _get_main_opset_version(onnx_model) or onnx_target_opset) if not _update_domain_version(container, onnx_model, verbose=verbose): # Main opset was not added. Doing it here. op_set = onnx_model.opset_import.add() op_set.domain = '' op_set.version = opv if verbose > 0: print('[convert_topology] +opset: name=%r, version=%s' % ( '', opv)) # Add extra information irv = OPSET_TO_IR_VERSION.get(opv, onnx_proto.IR_VERSION) onnx_model.ir_version = irv onnx_model.producer_name = utils.get_producer() onnx_model.producer_version = utils.get_producer_version() onnx_model.domain = utils.get_domain() onnx_model.model_version = utils.get_model_version() onnx_model.doc_string = doc_string # Removes many identity nodes, # the converter may introduct identity nodes # after a zipmap operator and onnx <= 1.7 does not # support that. It does not use onnxconverter-common # as the optimizer only support opset >= 9. if remove_identity: onnx_model = onnx_remove_node_identity(onnx_model) return onnx_model
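# End-to-end sketch: convert_topology is normally reached through the public
# skl2onnx.convert_sklearn entry point (illustrative; X and y stand for some
# training data):
#
#     from sklearn.linear_model import LogisticRegression
#     from skl2onnx import convert_sklearn
#     from skl2onnx.common.data_types import FloatTensorType
#
#     clr = LogisticRegression().fit(X, y)
#     onx = convert_sklearn(
#         clr,
#         initial_types=[('input', FloatTensorType([None, X.shape[1]]))],
#         target_opset=15)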
def _update_domain_version(container, onnx_model, verbose=0): # Merge operator sets for the same domain, the largest version # number would be kept purified_operator_set = dict() for op_domain, op_version in container.node_domain_version_pair_sets: if op_domain not in purified_operator_set: purified_operator_set[op_domain] = op_version else: purified_operator_set[op_domain] = max( purified_operator_set[op_domain], op_version) # Fill operator sets i = 0 for op_domain, op_version in purified_operator_set.items(): if op_version is None: continue if i == 0 and len(onnx_model.opset_import) == 1: # Overwrite the default operator set created by # make_model(...) op_set = onnx_model.opset_import[0] else: # Just create one ONNX element in opset_import op_set = onnx_model.opset_import.add() if verbose > 0: print('[_update_domain_version] +opset %d: name=%r, version=%s' % ( i, op_domain, op_version)) op_set.domain = op_domain if op_set != '': max_supported = get_default_opset_for_domain(op_domain) if max_supported is not None and max_supported < op_version: raise RuntimeError( "The model is using version %d of domain %r not supported " "yet by this library. You need to specify " "target_opset={%r: %r}." % ( op_version, op_domain, op_domain, max_supported)) op_set.version = op_version i += 1 if container.target_opset_any_domain(op_domain) < op_version: raise RuntimeError( 'The specified opset %d is too low to convert ' 'this model, which requires at least opset ' '%d.' % ( container.target_opset_any_domain(op_domain), op_version)) return '' in purified_operator_set def _get_main_opset_version(model): """ Returns the main opset version. """ mld = None for op in model.opset_import: if op.domain == '': return op.version if op.domain == "ai.onnx.ml": mld = op.version if mld is not None: return OPSET_ML_TO_OPSET.get(mld, None) return None
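# Sketch of what _get_main_opset_version inspects (illustrative; onx is any
# ModelProto):
#
#     for op in onx.opset_import:
#         print(op.domain or '<main>', op.version)
#
# The version of the '' (main) domain wins; when only ai.onnx.ml is present,
# its version is mapped back to a main opset through OPSET_ML_TO_OPSET.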