# SPDX-License-Identifier: Apache-2.0
import re
import warnings
import pprint
from logging import getLogger
from collections import OrderedDict
import numpy as np
from onnx import onnx_pb as onnx_proto
from onnxconverter_common.data_types import ( # noqa
DataType, TensorType,
FloatType, Int64Type, StringType,
DictionaryType, FloatTensorType, # noqa
Int64TensorType, SequenceType, # noqa
StringTensorType, DoubleTensorType,
Int32TensorType, BooleanTensorType,
DoubleTensorType)
try:
from onnxconverter_common.data_types import (
Int8TensorType, UInt8TensorType)
except ImportError:
Int8TensorType = None
UInt8TensorType = None
from ..proto import (
get_opset_number_from_onnx,
get_latest_tested_opset_version
)
from ..proto.onnx_helper_modified import (
make_graph, make_model, make_tensor_value_info
)
from . import _registration
from . import utils
from .exceptions import MissingShapeCalculator, MissingConverter
from ._container import ModelComponentContainer, _build_options
from .onnx_optimisation_identity import onnx_remove_node_identity
# Keep a reference to the builtin ``type``: the name ``type`` is shadowed
# by parameters of several signatures below (Variable.__init__, ...).
type_fct = type
def _default_OPSET_TO_IR_VERSION():
return {
1: 3, 2: 3, 3: 3, 4: 3, 5: 3, 6: 3,
7: 3, 8: 4, 9: 4, 10: 5, 11: 6, 12: 7,
13: 7, 14: 7, 15: 8, 16: 8, 17: 8, 18: 8
}
# Prefer the mapping shipped with onnxconverter_common when it is recent
# enough (it must know about opset 18), otherwise fall back on the local
# copy defined above.
try:
    from onnxconverter_common.topology import OPSET_TO_IR_VERSION
    assert OPSET_TO_IR_VERSION[18] is not None
except (ImportError, KeyError):
    OPSET_TO_IR_VERSION = _default_OPSET_TO_IR_VERSION()

# Maps an 'ai.onnx.ml' opset version to a main ONNX opset version
# (presumably the main opset it was introduced/tested with — see
# get_default_opset_for_domain below; confirm against release notes).
OPSET_ML_TO_OPSET = {1: 11, 2: 15, 3: 18}

logger = getLogger('skl2onnx')
def get_default_opset_for_domain(domain):
    """
    Returns the default opset version associated with *domain*,
    given the main opset, or None when the domain is unknown.
    """
    from .. import __max_supported_opset__ as main_opset
    if domain == '':
        # main ONNX domain
        return main_opset
    if domain == 'ai.onnx.ml':
        if main_opset >= 16:
            return 3
        return 1 if main_opset < 6 else 2
    if domain == 'ai.onnx.training':
        return 1
    return None
class Variable:
    """
    Defines a variable which holds any data defined
    from *ONNX* types.
    """

    # Counter used to build a unique replacement name when a variable
    # is created with a temporary name 'u(...)' and no scope is given.
    _UNIQUE_NUMBER_ = 0

    def __init__(self, raw_name, onnx_name, scope, type=None):
        """
        :param raw_name: A string indicating the variable's name in the
                         original model. Usually, it's the seed string
                         used to created its ONNX name (i.e., the
                         field *onnx_name* below).
        :param onnx_name: A string indicating the variable's name in
                          the converted model
        :param scope: A string. It's the name of the scope where this
                      variable is declared
        :param type: A type object defined in .common.data_types.py;
                     e.g., FloatTensorType
        """
        if not isinstance(raw_name, str):
            raise TypeError(
                "raw_name must be a string not '%s'." % raw_name.__class__)
        if type is not None and not hasattr(type, 'shape'):
            raise TypeError(
                "Unexpected type for variable raw_name=%r, type=%r." % (
                    raw_name, type))
        if not isinstance(onnx_name, str) or '(' in onnx_name:
            # Temporary names follow the pattern 'u(...)'; they are
            # replaced here by a unique name. The isinstance check comes
            # first so that a non-string onnx_name raises the TypeError
            # below instead of an AttributeError on `.startswith`.
            if (isinstance(onnx_name, str) and
                    onnx_name.startswith('u(') and onnx_name[-1] == ')'):
                onnx_name0 = onnx_name
                if scope is None:
                    onnx_name = "UU%03dUU" % Variable._UNIQUE_NUMBER_
                    Variable._UNIQUE_NUMBER_ += 1
                else:
                    onnx_name = scope.get_unique_variable_name("U")
                logger.debug(
                    '[Var] rename raw_name=%r, onnx_name=%r into %r',
                    raw_name, onnx_name0, onnx_name)
            else:
                raise TypeError(
                    "onnx_name must be a string not %r." % onnx_name)

        if type is not None:
            shape = type.shape
            if shape is not None:
                # A zero dimension would mean an empty tensor.
                not_none = [v for v in shape if v is not None]
                if len(not_none) and min(not_none) == 0:
                    raise RuntimeError(
                        "A variable cannot be empty, raw_name=%r, "
                        "onnx_name=%r, shape=%r, type=%r." % (
                            raw_name, onnx_name, shape, type))

        self._raw_name = raw_name
        self._onnx_name = onnx_name
        self._scope = scope
        self._type = type
        self._parent = None

        # The following fields are bool variables used in parsing and
        # compiling stages
        self._is_fed = None
        self._is_root = None
        self._is_leaf = None
        if self.type is not None and not isinstance(self.type, DataType):
            raise TypeError(
                "shape must be a DataType not {}.".format(self.type))
        if isinstance(self.type, TensorType):
            shape = self.type.shape
            if not isinstance(shape, (list, tuple)):
                try:
                    shape = list(shape)
                except TypeError:
                    raise TypeError("shape must be a tuple or a list not "
                                    "{}.".format(type_fct(shape)))
            for dim in shape:
                if dim is None:
                    # None stands for an unknown dimension.
                    continue
                if not isinstance(dim, (int, np.int32, np.int64, np.intc)):
                    raise TypeError(
                        "shape must contains integers not %r (type=%r)."
                        "" % (dim, dim.__class__))
        logger.debug('[Var] +%s', self)

        # links to operators using those variables
        self.operators_outputs_ = []
        self.operators_inputs_ = []
        self._check()

    def _check(self):
        "Verifies the shape, when defined, only contains integers or None."
        if self.type is not None and self.type.shape is not None:
            for k in self.type.shape:
                if k is None:
                    continue
                if not isinstance(k, (int, np.integer)):
                    raise ValueError(
                        "Unexpected type %r for shape %r."
                        "" % (type(k), self))

    @property
    def raw_name(self):
        return self._raw_name

    @property
    def onnx_name(self):
        return self._onnx_name

    @property
    def scope(self):
        return self._scope

    @property
    def type(self):
        return self._type

    @property
    def is_fed(self):
        # True when the variable is known to be computed.
        return self._is_fed

    @property
    def is_root(self):
        return self._is_root

    @property
    def is_leaf(self):
        return self._is_leaf

    def init_status(self, is_fed=None, is_root=None, is_leaf=None):
        """
        Updates the parsing/compiling status flags.
        A parameter left to None keeps the current value.
        """
        if is_fed is not None and is_fed != self.is_fed:
            logger.debug(
                '[Var] update is_fed=%r for %r, parent=%r',
                is_fed, self, self._parent)
            self._is_fed = is_fed
        if is_root is not None and is_root != self.is_root:
            logger.debug('[Var] update is_root=%r for %r', is_root, self)
            self._is_root = is_root
        if is_leaf is not None and is_leaf != self.is_leaf:
            logger.debug('[Var] update is_leaf=%r for %r', is_leaf, self)
            self._is_leaf = is_leaf

    def __setattr__(self, name, value):
        # Protects attributes which must be modified through dedicated
        # methods so that renaming and status updates stay consistent.
        if name == "type":
            self.set_type(value)
        elif name == "onnx_name":
            raise AttributeError("You must use method set_onnx_name.")
        elif name in {"is_fed", "is_root", "is_leaf"}:
            raise AttributeError("You must use method init_status.")
        elif name in {'scope', 'raw_name'}:
            raise AttributeError("scope or raw_name cannot be changed.")
        self.__dict__[name] = value

    def set_type(self, new_type):
        "Replaces the type, it cannot be None, a string or a Variable."
        if (new_type is None or isinstance(new_type, (str, Variable)) or
                not hasattr(new_type, 'shape')):
            raise TypeError(
                "Unexpected new type for variable %r, new_type=%r." % (
                    self, new_type))
        logger.debug('[Var] update type for %r', self)
        self._type = new_type
        self._check()

    def set_onnx_name(self, onnx_name):
        "Renames the variable and keeps the scope mapping up to date."
        if onnx_name != self._onnx_name:
            logger.debug(
                '[Var] update onnx_name, from %r to %r in %r',
                self.onnx_name, onnx_name, self)
            if self.scope is not None and not isinstance(self.scope, str):
                self.scope.rename_onnx_name(self._onnx_name, onnx_name)
            self._onnx_name = onnx_name

    def set_parent(self, operator):
        "Sets the operator producing this variable; it can only be set once."
        if self._parent is not None:
            raise RuntimeError(
                "This variable is already the output of operator %r. "
                "It cannot be the output of %r." % (self._parent, operator))
        logger.debug(
            '[Var] set parent for %r, parent=%r', self, operator)
        self._parent = operator

    def get_first_dimension(self):
        """
        Returns the first dimension (batch dimension) or
        None if not specified (shape is empty).
        """
        if (self.type is None or self.type.shape is None or
                len(self.type.shape) == 0):
            return None
        return self.type.shape[0]

    def get_second_dimension(self):
        "Returns the second dimension or None if the shape has fewer than 2."
        if (self.type is None or self.type.shape is None or
                len(self.type.shape) < 2):
            return None
        return self.type.shape[1]

    @property
    def full_name(self):
        """
        Return a globally unique variable ID
        """
        return self.onnx_name

    def __repr__(self):
        return ("Variable('{0}', '{1}', type={2})".format(
            self.raw_name, self.onnx_name, self.type))

    @staticmethod
    def from_pb(obj):
        """
        Creates a data type from a protobuf object.
        """
        def get_dim(d):
            # Extracts one dimension, None when unknown (dim_param).
            r = d.dim_value
            if "dim_param" in str(d):
                return None
            if r == 0:
                # dim_value is 0 when it is 0 or undefined
                return 0 if "0" in str(d) else None
            return r

        def get_shape(tt):
            return [get_dim(tt.shape.dim[i])
                    for i in range(len(tt.shape.dim))]

        if hasattr(obj, 'extend'):
            # a repeated protobuf field, converts every element
            return [Variable.from_pb(o) for o in obj]

        name = obj.name
        if obj.type.tensor_type:
            tt = obj.type.tensor_type
            elem = tt.elem_type
            shape = get_shape(tt)
            if elem == onnx_proto.TensorProto.FLOAT:
                ty = FloatTensorType(shape)
            elif elem == onnx_proto.TensorProto.BOOL:
                ty = BooleanTensorType(shape)
            elif elem == onnx_proto.TensorProto.DOUBLE:
                ty = DoubleTensorType(shape)
            elif elem == onnx_proto.TensorProto.STRING:
                ty = StringTensorType(shape)
            elif elem == onnx_proto.TensorProto.INT64:
                ty = Int64TensorType(shape)
            elif elem == onnx_proto.TensorProto.INT32:
                ty = Int32TensorType(shape)
            elif (UInt8TensorType is not None and
                    elem == onnx_proto.TensorProto.UINT8):
                ty = UInt8TensorType(shape)
            elif (Int8TensorType is not None and
                    elem == onnx_proto.TensorProto.INT8):
                ty = Int8TensorType(shape)
            elif elem == 0:
                # elem_type is 0 when undefined, assumes float
                ty = FloatTensorType(shape)
            else:
                raise NotImplementedError(
                    "Unsupported type '{}' (elem_type={}).".format(
                        type(obj.type.tensor_type), elem))
        else:
            raise NotImplementedError("Unsupported type '{}' as "
                                      "a string ({}).".format(
                                          type(obj), obj))
        return Variable(name, name, None, ty)

    def __iter__(self):
        "Enables expression such as `a,b = self`."
        yield self.onnx_name
        yield self.type

    def __getitem__(self, index):
        if index == 0:
            return self.onnx_name
        if index == 1:
            return self.type
        raise IndexError("Unreachable element at index %d." % index)

    def add_operator(self, op, in_or_out):
        "Add a link to an operator, True for output, False for input."
        if in_or_out:
            self.operators_outputs_.append(op)
        else:
            self.operators_inputs_.append(op)

    def check_compatible_type(self, other_type):
        """
        Fails if *other_type* is incompatible with the variable type:
        both None, same class with same shape, or same class with an
        empty other shape are accepted.
        """
        def empty_shape(shape):
            return shape is None or len(shape) == 0

        if self.type is None:
            if other_type is None:
                return
        elif other_type is not None:
            if isinstance(self.type, type(other_type)):
                if self.type.shape == other_type.shape:
                    return
                if empty_shape(other_type.shape):
                    return
        raise TypeError(
            "Incompatible type for variable %r and type %r." % (
                self, other_type))
class VariableStr(Variable):
    """
    Defines a variable from a string. This should be avoided.
    """

    def __init__(self, name, scope=None, type=None):
        # The raw name and the ONNX name are the same.
        Variable.__init__(self, name, name, scope=scope, type=type)

    @property
    def raw_name(self):
        return self._raw_name

    @property
    def onnx_name(self):
        # A name still starting with 'u(' means the variable kept its
        # temporary name and was never properly renamed.
        if self._onnx_name.startswith("u("):
            raise RuntimeError(
                "Variable should be renamed as onnx_name=%r."
                "" % self._onnx_name)
        return self._onnx_name
class Operator:
    """
    Defines an operator available in *ONNX*.
    """

    class OperatorList(list):
        """
        List of :class:`Variable` used as inputs (kind 'In') or
        outputs (kind 'Out') of an operator. It keeps the cross links
        between variables and operators up to date.
        """

        def __init__(self, parent, kind):
            super(Operator.OperatorList, self).__init__()
            self.parent = parent
            self.kind = kind

        def __eq__(self, second):
            raise NotImplementedError(
                "Operator equal not implemented and not needed.")

        def append(self, v):
            if not isinstance(v, Variable):
                raise TypeError(
                    "Input and output must be of type Variable not %r."
                    "" % type(v))
            if self.kind == 'Out':
                # an output variable has exactly one producer
                v.set_parent(self.parent)
            super(Operator.OperatorList, self).append(v)
            logger.debug("[Op] add %s %r to %r", self.kind, v, self.parent)
            if self.kind == 'In':
                v.add_operator(self.parent, False)
            elif self.kind == "Out":
                v.add_operator(self.parent, True)
            else:
                raise RuntimeError(
                    "Unexpected value for kind=%r." % self.kind)

        def extend(self, vs):
            for v in vs:
                self.append(v)

        def __getitem__(self, i):
            v = list.__getitem__(self, i)
            if isinstance(i, int) and not isinstance(v, Variable):
                raise TypeError("Element %d must be a Variable not %r." % (
                    i, type(v)))
            return v

        def __setitem__(self, i, v):
            raise LookupError(
                "Setter should not be used to modify an element.")

        def set_element(self, i, v):
            "Updates element i."
            if not isinstance(v, Variable):
                raise TypeError(
                    "Value v must be a Variable not %r." % type(v))
            logger.debug(
                "[Op] %s-change element %d from %r to %r in %r",
                self.kind, i, self[i], v, self.parent)
            list.__setitem__(self, i, v)

        def to_string(self):
            "Returns a comma separated list of the variable names."
            names = []
            for o in self:
                if hasattr(o, 'onnx_name'):
                    names.append(o.onnx_name)
                else:
                    names.append('"%s"' % str(o))
            return ",".join(names)

    def __init__(self, onnx_name, scope, type, raw_operator,
                 target_opset, scope_inst):
        """
        :param onnx_name: A unique ID, which is a string
        :param scope: The name of the scope where this operator is
                      declared. It's a string.
        :param type: A object which uniquely characterizes the type of
                     this operator. For example, it can be a string,
                     pooling, if this operator is associated with a
                     CoreML pooling layer.
        :param raw_operator: The original operator which defines this operator;
                             for example, a scikit-learn Imputer and
                             a CoreML Normalizer.
        :param target_opset: The target opset number for the converted model.
        :param scope_inst: :class:`Scope` instance the operator belongs to
        """
        if isinstance(raw_operator, str):
            raise RuntimeError("Parameter raw_operator must be an object not "
                               "a string '{0}'.".format(raw_operator))
        # operator name in the converted model, if raw_operator
        # is not None, output_shapes can be guessed
        # from the raw model. Otherwise, it can be guessed
        # from the input shapes.
        self.onnx_name = onnx_name
        self.scope = scope
        self.type = type
        self.raw_operator = raw_operator
        self.inputs = Operator.OperatorList(self, 'In')
        self.outputs = Operator.OperatorList(self, 'Out')
        self._is_evaluated = None
        self.target_opset = target_opset
        self.scope_inst = scope_inst
        logger.debug('[Op] +%r', self)

    def new_raw_operator(self, raw_operator, alias):
        """
        Returns a shallow copy of this operator,
        changes the raw_operator but keeps the same inputs
        and outputs.
        """
        op = Operator(self.onnx_name, self.scope, alias, raw_operator,
                      self.target_opset, self.scope_inst)
        op.inputs = self.inputs
        op.outputs = self.outputs
        return op

    def __repr__(self):
        try:
            textop = repr(self.raw_operator)
        except AttributeError:
            textop = "MISSING OP"
        except KeyError:
            # The line above fails for python 3.7
            textop = type(self.raw_operator)
        if isinstance(textop, str) and "\n" in textop:
            textop = textop.replace('\n', '').replace(' ', '')
        return ("Operator(type='{0}', onnx_name='{1}', inputs='{2}', "
                "outputs='{3}', raw_operator={4})".format(
                    self.type, self.onnx_name,
                    self.inputs.to_string(),
                    self.outputs.to_string(),
                    textop))

    def __setattr__(self, name, value):
        # inputs and outputs are always stored as OperatorList so that
        # variable/operator cross links stay consistent.
        if name in ('inputs', 'outputs'):
            if (isinstance(value, list) and
                    not isinstance(value, Operator.OperatorList)):
                if name == 'inputs':
                    self.inputs = Operator.OperatorList(self, 'In')
                    self.inputs.extend(value)
                    return
                if name == 'outputs':
                    self.outputs = Operator.OperatorList(self, 'Out')
                    self.outputs.extend(value)
                    return
            if not isinstance(value, Operator.OperatorList):
                raise TypeError(
                    "inputs or outputs must be of type Operator.OperatorList.")
            ioo = name == 'outputs'
            for v in value:
                v.add_operator(self, ioo)
        self.__dict__[name] = value

    @property
    def is_evaluated(self):
        return self._is_evaluated

    def init_status(self, is_evaluated=None):
        "Updates the evaluation status, None keeps the current value."
        if is_evaluated is not None and is_evaluated != self.is_evaluated:
            logger.debug(
                '[Op] update is_evaluated=%r for %r',
                is_evaluated, self)
            self._is_evaluated = is_evaluated

    @property
    def full_name(self):
        """
        Return a globally unique operator ID
        """
        return self.onnx_name

    @property
    def input_full_names(self):
        """
        Return all input variables' names
        """
        return [variable.full_name for variable in self.inputs]

    @property
    def output_full_names(self):
        """
        Return all output variables' names
        """
        return [variable.full_name for variable in self.outputs]

    @property
    def original_operator(self):
        """
        Return the original operator/layer
        """
        return self.raw_operator

    def infer_types(self):
        "Invokes the registered shape calculator for this operator type."
        # Invoke a core inference function
        if self.type is None:
            raise MissingShapeCalculator(
                "Unable to find a shape calculator for type '{}'.".format(
                    type(self.raw_operator)))
        try:
            shape_calc = _registration.get_shape_calculator(self.type)
        except ValueError:
            raise MissingShapeCalculator(
                "Unable to find a shape calculator for alias '{}' "
                "and type '{}'.".format(self.type, type(self.raw_operator)))
        if shape_calc is None:
            raise MissingShapeCalculator(
                "Unexpected shape calculator for alias '{}' "
                "and type '{}'.".format(self.type, type(self.raw_operator)))
        logger.debug(
            "[Shape-a] %r fed %r - %r", self,
            "".join(str(i.is_fed) for i in self.inputs),
            "".join(str(i.is_fed) for i in self.outputs))
        shape_calc(self)
        logger.debug(
            "[Shape-b] %r inputs=%r - outputs=%r",
            self, self.inputs, self.outputs)
class Scope:
    """
    Every node of an *ONNX* graph must be unique. This class holds the list
    of existing name for every node already defined in graph. It also
    provides functions to create a unique unused name.
    """

    def __init__(self, name, target_opset=None,
                 custom_shape_calculators=None, options=None,
                 registered_models=None, naming=None):
        """
        :param name: A string, the unique ID of this scope in a
                     Topology object
        :param target_opset: The target opset number for the converted
                             model.
        :param custom_shape_calculators: a dictionary for specifying
                                         the user customized shape calculator
        :param options: see :ref:`l-conv-options`
        :param naming: the user may want to change the way intermediate
            are named, this parameter can be a string (a prefix) or a
            function, which signature is the following:
            `get_name(name, existing_names)`, the library will then
            check this name is unique and modify it if not
        :param registered_models: registered models

        .. versionchanged:: 1.10.0
            Parameter *naming* was added.
        """
        self.name = name
        self.onnx_variable_names = set()
        self.onnx_operator_names = set()
        self.target_opset = target_opset
        self.custom_shape_calculators = custom_shape_calculators

        # An one-to-many map from raw variable name to ONNX variable
        # names. It looks like
        # (key, value) = (raw_name, [onnx_name, onnx_name1, onnx_name2, ..., onnx_nameN]) # noqa
        # The last name may hide all other names in this scope.
        self.variable_name_mapping = {}

        # A map of local variables defined in this scope.
        # (key, value) = (onnx_name, variable)
        self.variables = OrderedDict()
        self.input_variables = []
        self.output_variables = []

        # A map of local operators defined in this scope.
        # (key, value) = (onnx_name, operator)
        self.operators = {}

        # Additional options given to converters.
        self.options = options

        # Registered models
        self.registered_models = registered_models

        # Naming policy: None uses the default unique name generator,
        # a string is used as a prefix, a callable produces the seed.
        self.naming = naming
        if naming is None:
            self._naming = Topology._generate_unique_name
        elif isinstance(naming, str):
            self._naming = (
                lambda seed, names: Topology._generate_unique_name(
                    self.naming + seed, names))
        elif callable(self.naming):
            self._naming = (
                lambda seed, names: Topology._generate_unique_name(
                    self.naming(seed, names), names))
        else:
            raise TypeError(
                "Unexpected type for parameter naming: %r." % type(naming))

    def get(self, var_name, default_value):
        "Returns variable with 'name' or default value is not found."
        return self.variables.get(var_name, default_value)

    def has_variable_name(self, name):
        """
        Tells if a variable is already registered.
        """
        return name in self.onnx_variable_names

    def get_shape_calculator(self, model_type):
        """
        Returns the shape calculator for the given model type.

        :param model_type: model type such as *LogisticRegression*
        :return: alias or None if not found
        """
        return self.custom_shape_calculators.get(model_type, None)

    def get_unique_variable_name(self, seed, rename=True):
        """
        Creates a unique variable ID based on the given seed.
        """
        if not isinstance(seed, str):
            raise TypeError("Parameter seed must be a string not {}."
                            "".format(type(seed)))
        if rename:
            name = self._naming(seed, self.onnx_variable_names)
        else:
            # bypasses the naming policy, only guarantees unicity
            name = Topology._generate_unique_name(
                seed, self.onnx_variable_names)
        return name

    def get_unique_operator_name(self, seed):
        """
        Creates a unique operator ID based on the given seed.
        """
        return self._naming(seed, self.onnx_operator_names)

    def declare_local_variable(self, raw_name, type=None, prepend=False,
                               missing_type=False, rename=True):
        """
        This function may create a new variable in this scope. If
        *raw_name* has been used to create other variables, the new
        variable will hide all other variables created using *raw_name*.
        """
        if type is None and not missing_type:
            raise RuntimeError(
                "Unknown type for %r (type=%r)." % (raw_name, type))
        # Get unique ID for the new variable
        onnx_name = self.get_unique_variable_name(raw_name, rename=rename)

        # Create the variable
        variable = Variable(raw_name, onnx_name, self.name, type)
        self.register_variable(variable, prepend=prepend)
        return variable

    def register_variable(self, var, prepend=False):
        "Adds a variable to the scope."
        if var.onnx_name in self.variables:
            raise RuntimeError(
                "Variable %r already registered (other=%r)." % (
                    var, self.variables[var.onnx_name]))

        if var.raw_name in self.variable_name_mapping:
            # Hide existing variables with the same raw_name
            if not prepend:
                self.variable_name_mapping[var.raw_name].append(var.onnx_name)
            else:
                self.variable_name_mapping[var.raw_name].insert(
                    0, var.onnx_name)
        else:
            self.variable_name_mapping[var.raw_name] = [var.onnx_name]

        self.variables[var.onnx_name] = var

    def declare_existing_subgraph_name(self, graph_proto):
        """
        Declare all name from a subgraph in order to avoid being picked twice.
        """
        output_name = {o.name for o in graph_proto.output}
        for node in graph_proto.node:
            for name in node.output:
                if name in output_name:
                    # outputs of the subgraph do not collide with results
                    continue
                if self.has_variable_name(name):
                    raise NameError(
                        "Result name %r is already taken (outputs=%r) "
                        "(node=%r)." % (
                            name, output_name, node))
                self.onnx_variable_names.add(name)
            if node.name in self.onnx_operator_names:
                raise NameError(
                    "Operator name %r is already taken "
                    "(node=%r)." % (
                        node.name, node))
            self.onnx_operator_names.add(node.name)

    def rename_onnx_name(self, old_name, new_name):
        "Renames a registered variable from *old_name* to *new_name*."
        if new_name in self.variables:
            raise RuntimeError(
                "Name %r already in variables (%r)." % (
                    new_name, self.variables[new_name]))
        if old_name not in self.variables:
            raise RuntimeError(
                "Unable to find name %r in variables." % old_name)
        logger.debug(
            '[Scope] update onnx_name, from %r to %r',
            old_name, new_name)
        self.variables[new_name] = self.variables[old_name]
        del self.variables[old_name]

    def declare_local_input(self, raw_name, type=None, prepend=False,
                            rename=True):
        """
        Calls `declare_local_variable`. Registers this variable
        as an input.
        """
        var = self.declare_local_variable(
            raw_name, type=type, prepend=prepend, rename=rename)
        self.input_variables.append(var)
        return var

    def declare_local_output(self, raw_name, type=None, prepend=False,
                             missing_type=False):
        """
        Calls `declare_local_variable`. Registers this variable
        as an output.
        """
        var = self.declare_local_variable(
            raw_name, type=type, prepend=prepend,
            missing_type=missing_type)
        self.output_variables.append(var)
        return var

    def declare_local_operator(self, type, raw_model=None):
        """
        This function is used to declare new local operator.
        """
        onnx_name = self.get_unique_operator_name(str(type))
        operator = Operator(onnx_name, self.name, type, raw_model,
                            self.target_opset, scope_inst=self)
        self.operators[onnx_name] = operator
        return operator

    def _get_allowed_options(self, model, fail=True):
        # Returns the options the converter associated to *model*
        # declares as allowed, {} (or an exception) otherwise.
        if self.registered_models is not None:
            if type(model) not in self.registered_models['aliases']:
                if fail:
                    raise NotImplementedError(
                        "No registered models, no known allowed options "
                        "for model '{}'.".format(model.__class__.__name__))
                return {}
            alias = self.registered_models['aliases'][type(model)]
            conv = self.registered_models['conv'][alias]
            allowed = conv.get_allowed_options()
            return allowed
        raise NotImplementedError(
            "No registered models, no known allowed options "
            "for model '{}'.".format(model.__class__.__name__))

    def add_options(self, model_id, options):
        """
        Adds an option, for example,
        ``add_options(id(clr), {'raw_scores': True})``
        tells the converter associated to ``clr`` to
        use raw score instead of probabilities.

        :param model_id: class or ``id(instance)``
        :param options: dictionary with the new values
        """
        if options is None:
            return
        if self.options is None:
            self.options = {}
        if model_id not in self.options:
            self.options[model_id] = None
        if self.options[model_id] is None:
            self.options[model_id] = {}
        self.options[model_id].update(options)

    def get_options(self, model, default_values=None, fail=True):
        """
        Returns additional options for a model.
        It first looks by class then by id (``id(model)``).

        :param model: model being converted
        :param default_values: default options (it is modified by
                               the function)
        :param fail: fails if option it not found
        :return: dictionary
        """
        return _build_options(
            model, self.options, default_values,
            self._get_allowed_options(model, fail=fail),
            fail=fail)

    def replace_raw_operator(self, op1, op2, alias):
        """
        Replaces every raw operator op1 by op2.
        The function uses `id()` to detect op1.
        """
        for v in self.operators.values():
            if id(v.raw_operator) == id(op1):
                logger.debug(
                    '[Scope] replace %d by %d in %r.',
                    id(v.raw_operator), id(op1), v)
                v.raw_operator = op2
                v.type = alias
[docs]class Topology:
"""
Holds instances on :class:`Scope <skl2onnx.common._topology.Scope>` and
:class:`SklearnModelContainer
<skl2onnx.common._container.SklearnModelContainer>`.
These are filled by the converters while a pipeline is being converted.
"""
    def __init__(self, model, default_batch_size=1, initial_types=None,
                 target_opset=None, custom_conversion_functions=None,
                 custom_shape_calculators=None, registered_models=None):
        """
        Initializes a *Topology* object, which is an intermediate
        representation of a computational graph.

        :param model: RawModelContainer object or one of its derived
                      classes. It contains the original model.
        :param default_batch_size: batch_size prepend to scalar and
                                   array types from CoreML. It's usually
                                   1 or None.
        :param initial_types: A list providing some types for some
                              root variables.
            Each element is a tuple of a variable name and a type defined
            in *data_types.py*.
        :param target_opset: The target opset number for the converted
                             model.
        :param custom_conversion_functions: a dictionary for specifying
                                            the user customized conversion function
        :param custom_shape_calculators: a dictionary for specifying the
                                         user customized shape calculator
        :param registered_models: registered models
        """
        self.scopes = []
        self.raw_model = model
        self.scope_names = set()
        self.initial_types = initial_types if initial_types else list()
        self.default_batch_size = default_batch_size
        self.target_opset = target_opset
        self.custom_conversion_functions = (
            custom_conversion_functions if custom_conversion_functions else {})
        self.custom_shape_calculators = (
            custom_shape_calculators if custom_shape_calculators else {})
        # Keys of both custom dictionaries must be model classes,
        # not class names.
        for k in self.custom_conversion_functions:
            if not callable(k):
                raise TypeError("Keys in custom_conversion_functions must be "
                                "types not strings.")
        for k in self.custom_shape_calculators:
            if not callable(k):
                raise TypeError("Keys in custom_shape_calculators must be "
                                "types not strings.")

        # A map of local overwritten model aliases.
        self.model_aliases = {}
        all_model_types = (set(self.custom_conversion_functions)
                           | set(self.custom_shape_calculators))
        for mtype in all_model_types:
            # id(self) makes the alias unique to this topology.
            alias = "{}_{}".format(mtype.__name__, id(self))
            self.model_aliases[mtype] = alias

        # Registered models
        if registered_models is None:
            raise AssertionError()
        self.registered_models = registered_models
@property
def scope(self):
if len(self.scopes) != 1:
raise RuntimeError(
"Only one scope is allowed not %d." % len(self.scopes))
return self.scopes[0]
@staticmethod
def _generate_unique_name(seed, existing_names):
"""
Produce an unique string based on the seed
:param seed: a string
:param existing_names: a set containing strings which cannot be
produced
:return: a string similar to the seed
"""
if seed == '':
raise ValueError('Name seed must be a non-empty string.')
# Make the seed meet C-style naming convention
# Only alphabets and numbers are allowed
seed = re.sub('[^\\w+]', '_', seed)
# The first symbol cannot be a number
if re.match('^[0-9]', seed):
seed = '_' + seed
# If seed has never been seen, we return it as it is. Otherwise,
# we will append an number to make it unique.
if seed not in existing_names:
existing_names.add(seed)
return seed
else:
i = 1
while seed + str(i) in existing_names:
i += 1
new_name = seed + str(i)
existing_names.add(new_name)
return new_name
    def get_unique_scope_name(self, seed):
        "Returns a scope name based on *seed*, unique within this topology."
        return Topology._generate_unique_name(seed, self.scope_names)
    def declare_scope(self, seed, parent_scopes=None, options=None,
                      naming=None):
        """
        Creates a new :class:`Scope <skl2onnx.common._topology.Scope>`
        and appends it to the list of existing scopes.
        Only one scope can exist per topology.

        :param seed: name seed for the new scope
        :param parent_scopes: unused
        :param options: conversion options, see :ref:`l-conv-options`
        :param naming: naming policy forwarded to :class:`Scope`
        """
        if len(self.scopes) != 0:
            raise RuntimeError(
                "Only one scope can be created.")
        scope = Scope(
            self.get_unique_scope_name(seed), target_opset=self.target_opset,
            custom_shape_calculators=self.custom_shape_calculators,
            options=options, registered_models=self.registered_models,
            naming=naming)

        # Declare input variables.
        # They should be the inputs of the scikit-learn
        # model you want to convert into ONNX.
        for var_name, initial_type in self.initial_types:
            scope.declare_local_input(var_name, initial_type, rename=False)
        self.scopes.append(scope)
        return scope
def unordered_operator_iterator(self):
for scope in self.scopes:
for operator in scope.operators.values():
yield operator
def unordered_variable_iterator(self):
for scope in self.scopes:
for variable in scope.variables.values():
yield variable
    def call_converter(self, operator, container, verbose=0):
        """
        Calls converter for operator *operator*.
        Lookup order: user converter registered for the model class,
        user converter registered for the alias, converter provided by
        the model itself (*onnx_converter*), finally the registered one.

        :param operator: operator to convert
        :param container: container receiving the ONNX nodes
        :param verbose: verbosity level
        """
        mtype = type(operator.raw_operator)
        if mtype in self.custom_conversion_functions:
            # converter overwritten by the user for this model class
            conv = self.custom_conversion_functions[mtype]
        elif operator.type in self.custom_conversion_functions:
            # converter overwritten by the user for this alias
            conv = self.custom_conversion_functions[operator.type]
        elif hasattr(operator.raw_operator, "onnx_converter"):
            # the model brings its own converter
            conv = operator.raw_operator.onnx_converter()
        else:
            # Convert the selected operator into some ONNX objects and
            # save them into the container
            try:
                conv = _registration.get_converter(operator.type)
            except ValueError:
                raise MissingConverter(
                    "Unable to find converter for alias '{}' type "
                    "'{}'. You may raise an issue at "
                    "https://github.com/onnx/sklearn-onnx/issues."
                    "".format(operator.type,
                              type(getattr(operator, 'raw_model', None))))

        container.validate_options(operator)
        if verbose > 0:
            print("[call_converter] call converter for %r." % operator.type)
        logger.debug(
            "[Conv] call %r fed %r - %r", operator,
            "".join(str(i.is_fed) for i in operator.inputs),
            "".join(str(i.is_fed) for i in operator.outputs))
        conv(self.scopes[0], operator, container)
        logger.debug("[Conv] end - %r", operator)
    def call_shape_calculator(self, operator):
        """
        Calls shape_calculator for operator *operator*.
        Lookup order: user calculator registered for the model class,
        user calculator registered for the alias, calculator provided
        by the model itself (*onnx_shape_calculator*), finally the
        registered one through :meth:`Operator.infer_types`.
        """
        mtype = type(operator.raw_operator)
        if mtype in self.custom_shape_calculators:
            # overwritten operator.
            source = 'custom'
            shape_calc = self.custom_shape_calculators[mtype]
        elif operator.type in self.custom_shape_calculators:
            source = 'custom'
            shape_calc = self.custom_shape_calculators[operator.type]
        elif hasattr(operator.raw_operator, "onnx_shape_calculator"):
            source = 'onnx_shape_calculator'
            shape_calc = operator.raw_operator.onnx_shape_calculator()
        else:
            source = ""
            shape_calc = None

        if shape_calc is not None:
            logger.debug(
                "[Shape1] %r fed %r - %r (source=%r)", operator,
                ",".join(str(i.is_fed) for i in operator.inputs),
                ",".join(str(i.is_fed) for i in operator.outputs),
                source)
            shape_calc(operator)
        else:
            # no custom calculator, fall back on the registered one
            logger.debug('[Shape2] call infer_types for %r', operator)
            operator.infer_types()
def _initialize_graph_status_for_traversing(self):
"""
Initialize the status of all variables and operators before
traversing the graph. Only used by convert_operators.
"""
if len(self.scopes) != 1:
raise RuntimeError(
"Only one scope is allowed not %d." % len(self.scopes))
input_names = set(v.onnx_name for v in self.scopes[0].input_variables)
if len(input_names) == 0:
raise RuntimeError("No detected inputs.")
for variable in self.unordered_variable_iterator():
is_input = variable.onnx_name in input_names
variable.init_status(is_fed=is_input)
for operator in self.unordered_operator_iterator():
operator.init_status(is_evaluated=False)
def _propagate_status(self, operator, container, fed_variables,
verbose=0):
"""
Propagates status *is_fed* based on output variable
and node added in the container.
"""
if verbose > 1:
print("[_propagate_status] after op=%r" % operator)
vars = {}
for node in container.nodes:
for i in node.input:
if i not in vars:
vars[i] = []
vars[i].append(node)
if verbose > 1:
print("[_propagate_status] newly fed=%r" % list(
v.onnx_name for v in operator.outputs if v.is_fed))
stack = list(fed_variables)
scope = self.scopes[0]
while len(stack) > 0:
nodes = {}
for name in stack:
if name not in vars:
continue
for n in vars[name]:
nodes[id(n)] = n
stack = []
for node in nodes.values():
if all(fed_variables.get(n, False) for n in node.input):
for o in node.output:
if o not in fed_variables:
if verbose > 1:
print("[_propagate_status] add=%r" % o)
fed_variables[o] = o
stack.append(o)
if o in scope.variables:
var = scope.variables[o]
var.init_status(is_fed=True)
if verbose > 1:
print("[_propagate_status] fed=%r" % var)
def convert_operators(self, container=None, verbose=0):
    """
    Calls all converters and shape_calculator for existing
    operators. It also processes new operators created by
    converters.

    The graph is processed as a fixed point: on every pass, any
    operator whose inputs are all fed and which has not been
    evaluated yet is converted; the loop stops when a full pass
    produces no change. ONNX nodes are accumulated in *container*.
    """
    def _check_operator_(operator):
        # Structural sanity check: inputs/outputs must be
        # Operator.OperatorList holding only Variable instances.
        if not isinstance(operator.inputs, Operator.OperatorList):
            raise TypeError(
                "operator.inputs must be a Operator.OperatorList "
                "not %r." % type(operator.inputs))
        if not isinstance(operator.outputs, Operator.OperatorList):
            raise TypeError(
                "operator.outputs must be a Operator.OperatorList "
                "not %r." % type(operator.outputs))
        if any(not isinstance(i, Variable) for i in operator.inputs):
            raise TypeError(
                "One input is not a Variable for operator %r - %r."
                "" % (type(operator.raw_operator), operator))
        if any(not isinstance(i, Variable) for i in operator.outputs):
            raise TypeError(
                "One output is not a Variable for operator %r - %r."
                "" % (type(operator.raw_operator), operator))

    def _check_variable_in_(variable, operator):
        # Bookkeeping check: *operator* must be registered among the
        # operators consuming *variable*.
        idop = id(operator)
        ids = set(id(op) for op in variable.operators_inputs_)
        if idop not in ids:
            raise RuntimeError(
                "Operator %r not registered in the list of operators "
                "of %r taking it as an input [\n%s]." % (
                    operator, variable,
                    "\n".join(map(str, variable.operators_inputs_))))

    def _check_variable_out_(variable, operator):
        # An output variable must not already be fed: a fed output
        # would mean two operators produce the same variable. Builds
        # an extensive debug dump before raising.
        if variable.is_fed:
            add = ["", "--DEBUG-INFO--"]
            for scope in self.scopes:
                add.append('---')
                add.append(pprint.pformat(
                    scope.variable_name_mapping))
                add.append('---')
                for var in scope.variables.values():
                    add.append(" is_fed=%s %s - n_in=%d n_out=%d" % (
                        getattr(var, 'is_fed', '?'), var,
                        len(var.operators_inputs_),
                        len(var.operators_outputs_)))
                add.append('---')
                for op in scope.operators.values():
                    add.append(" is_evaluated=%s %s" % (
                        getattr(op, 'is_evaluated', '?'), op))
            add.append('---')
            for v in operator.inputs:
                add.append(" inputs={}".format(v))
            for v in operator.outputs:
                add.append(" outputs={}".format(v))
            add.append('--- operator producing this variable--')
            for op in variable.operators_outputs_:
                add.append(str(op))
            raise RuntimeError(
                "A variable is already assigned ({}) "
                "for operator '{}' (name='{}'). "
                "operator.is_evaluated={}, inputs.is_fed={}, "
                "outputs.is_fed={}. "
                "This may still happen if a converter is a "
                "combination of sub-estimators and one "
                "of them is producing this output. "
                "In that case, an identity node must be "
                "added.{}".format(
                    variable, operator.type,
                    operator.onnx_name, operator.is_evaluated,
                    [v.is_fed for v in operator.inputs],
                    [v.is_fed for v in operator.outputs],
                    "\n".join(add)))

    if verbose > 0:
        print("[convert_operators] begin")
    self._initialize_graph_status_for_traversing()
    # fed_variables maps an onnx name to the Variable (or, for
    # initializers, the initializer tensor) that is known/produced.
    fed_variables = {i.name: i for i in container.initializers}
    changes = 1
    n_iter = 0
    # Fixed-point loop: stop when a full pass evaluates nothing new.
    while changes > 0:
        n_iter += 1
        changes = 0
        ops = list(self.unordered_operator_iterator())
        if verbose > 0:
            print("[convert_operators] iteration %d - n_vars=%d "
                  "n_ops=%d" % (
                      n_iter, len(fed_variables), len(ops)))
        for operator in ops:
            _check_operator_(operator)
            for var in operator.inputs:
                if var.is_fed:
                    fed_variables[var.onnx_name] = var
            # Convert only operators whose inputs are all available
            # and which were not processed yet.
            if (all(variable.is_fed for variable in operator.inputs) and
                    not operator.is_evaluated):
                for variable in operator.inputs:
                    _check_variable_in_(variable, operator)
                for variable in operator.outputs:
                    _check_variable_out_(variable, operator)
                self.call_shape_calculator(operator)
                self.call_converter(operator, container, verbose=verbose)

                # If an operator contains a sequence of operators,
                # output variables are not necessarily known at this stage.
                operator.init_status(is_evaluated=True)
                for variable in operator.outputs:
                    # An output becomes fed only once every operator
                    # producing it has been evaluated.
                    if all(op.is_evaluated
                           for op in variable.operators_outputs_):
                        variable.init_status(is_fed=True)
                        fed_variables[variable.onnx_name] = variable
                # The converter may have added new initializers.
                fed_variables.update(
                    {i.name: i for i in container.initializers
                     if i.name not in fed_variables})
                self._propagate_status(operator, container, fed_variables,
                                       verbose=verbose)

                # unfed some variables (it happens when a node
                # shares an output with another node)
                rem = []
                for n, var in fed_variables.items():
                    if not hasattr(var, 'operators_outputs_'):
                        # initializer
                        continue
                    if any(not o.is_evaluated
                           for o in var.operators_outputs_):
                        rem.append(n)
                for r in rem:
                    v = fed_variables[r]
                    v.init_status(is_fed=False)
                    del fed_variables[v.onnx_name]
                changes += 1
        if verbose > 0:
            print("[convert_operators] end iter: %d - n_vars=%d" % (
                n_iter, len(fed_variables)))
    if verbose > 0:
        print("[convert_operators] end.")

    # Last verification.
    not_evaluated = []
    for op in self.unordered_operator_iterator():
        if not op.is_evaluated:
            not_evaluated.append(op)
    if len(not_evaluated) > 0:
        # Some operators could never be fed: dump the whole graph
        # state to help diagnose the misspelled / missing variable.
        rows = ["---VARS---"]
        for var in self.unordered_variable_iterator():
            rows.append(
                "is_fed=%r is_leaf=%r is_root=%r - %r - n_in=%d n_out=%d"
                "" % (var.is_fed, var.is_leaf, var.is_root, var,
                      len(var.operators_inputs_),
                      len(var.operators_outputs_)))
        rows.append("---OPERATORS---")
        for op in self.unordered_operator_iterator():
            rows.append("is_eval=%r - %r" % (op.is_evaluated, op))
        rows.append("---NODES---")
        for node in container.nodes:
            rows.append("%s: %r -> %r" % (
                node.op_type, node.input, node.output))
        raise RuntimeError(
            "Not all operators have been evaluated. A variable name "
            "is probably misspelled.\n%s"
            "" % "\n".join(rows))

    # Input and output
    if len(self.scopes[0].input_variables) > 0:
        inputs = self.scopes[0].input_variables
    else:
        # No declared inputs: fall back to root variables.
        inputs = [v for v in self.unordered_variable_iterator()
                  if v.is_root]
    for i in inputs:
        container.add_input(i)
    outputs = [v for v in self.unordered_variable_iterator()
               if v.is_leaf]

    # The function checks that for output variable,
    # raw_name equal onnx_name. It swaps names if it is not the case.
    to_swap = []
    for out in outputs:
        if out.raw_name != out.onnx_name:
            to_swap.append(out)
    if len(to_swap) != 0:
        swaped = set()
        for var in to_swap:
            if var.raw_name in swaped:
                continue
            swaped.add(var.raw_name)
            if verbose > 1:
                print("[convert_operators] %r <-> %r." % (
                    var.raw_name, var.onnx_name))
            old_name = var.onnx_name
            new_name = var.raw_name
            try:
                container.swap_names(old_name, new_name)
            except NotImplementedError as e:
                # Best effort: if the container cannot swap these
                # names, keep the generated name.
                logger.debug(
                    '[Topo] unable to swap %r and %r (%r).',
                    old_name, new_name, e)
                continue
            for v in self.unordered_variable_iterator():
                if v.onnx_name == old_name:
                    v.set_onnx_name(new_name)
                elif v.onnx_name == new_name:
                    v.set_onnx_name(old_name)

    for o in outputs:
        container.add_output(o)
def convert_topology(topology, model_name, doc_string, target_opset,
                     channel_first_inputs=None,
                     options=None, remove_identity=True,
                     verbose=0):
    """
    This function is used to convert our Topology object defined in
    _parser.py into a ONNX model (type: ModelProto).

    :param topology: The Topology object we are going to convert
    :param model_name: GraphProto's name. Let "model" denote the
        returned model. The string "model_name" would be
        assigned to "model.graph.name."
    :param doc_string: A string attached to the produced model
    :param target_opset: number or dictionary,
        for example, 7 for ONNX 1.2, and 8 for ONNX 1.3,
        a dictionary is used to indicate different opset for
        different domains
    :param channel_first_inputs: not used by this function body,
        kept for backward compatibility of the signature
    :param options: see :ref:`l-conv-options`
    :param remove_identity: removes identity nodes
    :param verbose: displays information while converting
    :return: a ONNX ModelProto
    """
    if target_opset is None:
        target_opset = get_latest_tested_opset_version()
    if isinstance(target_opset, dict):
        # The main (default-domain) opset is stored under key ''.
        onnx_target_opset = target_opset.get(
            '', get_latest_tested_opset_version())
    else:
        onnx_target_opset = target_opset
    if onnx_target_opset > get_opset_number_from_onnx():
        found = get_opset_number_from_onnx()
        raise RuntimeError(
            "Parameter target_opset {} > {} is higher than the "
            "version of the installed onnx package. See "
            "https://github.com/onnx/onnx/blob/master/docs/"
            "Versioning.md#released-versions"
            ".".format(onnx_target_opset, found))
    if onnx_target_opset > get_latest_tested_opset_version():
        warnings.warn(
            "Parameter target_opset {} > {} is higher than the "
            "the latest tested version"
            ".".format(
                onnx_target_opset,
                get_latest_tested_opset_version()))

    container = ModelComponentContainer(
        target_opset, options=options,
        registered_models=topology.registered_models,
        white_op=topology.raw_model._white_op,
        black_op=topology.raw_model._black_op,
        verbose=verbose)

    # Traverse the graph from roots to leaves
    # This loop could eventually be parallelized.
    topology.convert_operators(container=container, verbose=verbose)
    container.ensure_topological_order()

    if len(container.inputs) == 0:
        raise RuntimeError("No detected inputs after conversion.")
    if len(container.outputs) == 0:
        raise RuntimeError("No detected outputs after conversion.")
    if verbose >= 2:
        print("---NODES---")
        for node in container.nodes:
            print(" %s - %s: %r -> %r" % (
                node.op_type, node.name, node.input, node.output))

    # Create a graph from its main components
    if container.target_opset_onnx < 9:
        # When calling ModelComponentContainer's add_initializer(...),
        # nothing is added into the input list. However, for ONNX target
        # opset < 9, initializers should also be a part of model's
        # (GraphProto) inputs. Thus, we create ValueInfoProto objects
        # from initializers (type: TensorProto) directly and then add
        # them into model's input list.
        extra_inputs = []  # ValueInfoProto list of the initializers
        for tensor in container.initializers:
            # Sometimes (especially when creating optional input values
            # such as RNN's initial hidden state), an initializer is also
            # one of the original model's input, so it has been added into
            # the container's input list. If this is the case, we need to
            # skip one iteration to avoid duplicated inputs.
            if tensor.name in [value_info.name for value_info in
                               container.inputs]:
                continue
            # Initializers are always tensors so we can just call
            # make_tensor_value_info(...).
            value_info = make_tensor_value_info(
                tensor.name, tensor.data_type, tensor.dims)
            extra_inputs.append(value_info)

        # Before ONNX opset 9, initializers were needed to be passed in
        # with inputs.
        graph = make_graph(container.nodes, model_name,
                           container.inputs + extra_inputs,
                           container.outputs, container.initializers)
    else:
        # In ONNX opset 9 and above, initializers are included as
        # operator inputs and therefore do not need to be passed as
        # extra_inputs.
        graph = make_graph(
            container.nodes, model_name, container.inputs,
            container.outputs, container.initializers)

    # Add extra information related to the graph
    graph.value_info.extend(container.value_info)

    # Create model
    onnx_model = make_model(graph)

    # Update domain version: never exceed the opset declared by the
    # produced model itself.
    opv = min(onnx_target_opset,
              _get_main_opset_version(onnx_model) or onnx_target_opset)
    if not _update_domain_version(container, onnx_model, verbose=verbose):
        # Main opset was not added. Doing it here.
        op_set = onnx_model.opset_import.add()
        op_set.domain = ''
        op_set.version = opv
        if verbose > 0:
            print('[convert_topology] +opset: name=%r, version=%s' % (
                '', opv))

    # Add extra information
    irv = OPSET_TO_IR_VERSION.get(opv, onnx_proto.IR_VERSION)
    onnx_model.ir_version = irv
    onnx_model.producer_name = utils.get_producer()
    onnx_model.producer_version = utils.get_producer_version()
    onnx_model.domain = utils.get_domain()
    onnx_model.model_version = utils.get_model_version()
    onnx_model.doc_string = doc_string

    # Removes many identity nodes,
    # the converter may introduce identity nodes
    # after a zipmap operator and onnx <= 1.7 does not
    # support that. It does not use onnxconverter-common
    # as the optimizer only support opset >= 9.
    if remove_identity:
        onnx_model = onnx_remove_node_identity(onnx_model)

    return onnx_model
def _update_domain_version(container, onnx_model, verbose=0):
# Merge operator sets for the same domain, the largest version
# number would be kept
purified_operator_set = dict()
for op_domain, op_version in container.node_domain_version_pair_sets:
if op_domain not in purified_operator_set:
purified_operator_set[op_domain] = op_version
else:
purified_operator_set[op_domain] = max(
purified_operator_set[op_domain], op_version)
# Fill operator sets
i = 0
for op_domain, op_version in purified_operator_set.items():
if op_version is None:
continue
if i == 0 and len(onnx_model.opset_import) == 1:
# Overwrite the default operator set created by
# make_model(...)
op_set = onnx_model.opset_import[0]
else:
# Just create one ONNX element in opset_import
op_set = onnx_model.opset_import.add()
if verbose > 0:
print('[_update_domain_version] +opset %d: name=%r, version=%s' % (
i, op_domain, op_version))
op_set.domain = op_domain
if op_set != '':
max_supported = get_default_opset_for_domain(op_domain)
if max_supported is not None and max_supported < op_version:
raise RuntimeError(
"The model is using version %d of domain %r not supported "
"yet by this library. You need to specify "
"target_opset={%r: %r}." % (
op_version, op_domain, op_domain, max_supported))
op_set.version = op_version
i += 1
if container.target_opset_any_domain(op_domain) < op_version:
raise RuntimeError(
'The specified opset %d is too low to convert '
'this model, which requires at least opset '
'%d.' % (
container.target_opset_any_domain(op_domain),
op_version))
return '' in purified_operator_set
def _get_main_opset_version(model):
"""
Returns the main opset version.
"""
mld = None
for op in model.opset_import:
if op.domain == '':
return op.version
if op.domain == "ai.onnx.ml":
mld = op.version
if mld is not None:
return OPSET_ML_TO_OPSET.get(mld, None)
return None