# pylint: disable=C0302
"""
Implements a class able to compute the predictions
from on an :epkg:`ONNX` model.
:githublink:`%|py|7`
"""
from collections import OrderedDict
from io import BytesIO
from time import perf_counter
import warnings
import textwrap
import numpy
from scipy.sparse import coo_matrix
from onnx import load, load_model, checker, shape_inference
from onnx import onnx_pb as onnx_proto
from onnx.helper import make_model
from ..tools.code_helper import make_callable
from .onnx_inference_node import OnnxInferenceNode
from .onnx_inference_manipulations import (
select_model_inputs_outputs, enumerate_model_node_outputs)
from .onnx_inference_exports import OnnxInferenceExport
from .optim import onnx_remove_node_unused
from .onnx2py_helper import _var_as_dict, numpy_min, numpy_max
from .shape_object import ShapeObject
class OnnxInference:
    """
    Loads an :epkg:`ONNX` file or object or stream.
    Computes the output of the :epkg:`ONNX` graph.
    Several runtimes are available.

    * ``'python'``: the runtime implements every onnx operator
      needed to run a :epkg:`scikit-learn` model by using :epkg:`numpy`
      or C++ code.
    * ``'python_compiled'``: it is the same runtime than the previous
      one except every operator is called from a compiled function
      (:meth:`_build_compile_run
      <mlprodict.onnxrt.onnx_inference.OnnxInference._build_compile_run>`)
      instead for a method going through
      the list of operator
    * ``'onnxruntime1'``: uses :epkg:`onnxruntime`
    * ``'onnxruntime2'``: this mode is mostly used to debug as
      python handles calling every operator but :epkg:`onnxruntime`
      is called for every of them

    :githublink:`%|py|44`
    """
[docs] def __init__(self, onnx_or_bytes_or_stream, runtime=None,
skip_run=False, inplace=True,
input_inplace=False, ir_version=None,
target_opset=None, runtime_options=None):
"""
:param onnx_or_bytes_or_stream: :epkg:`onnx` object,
bytes, or filename or stream
:param runtime: runtime options
:param skip_run: do not build the runtime
:param inplace: use inplace computation
as much as possible
:param input_inplace: the computation is allowed
to overwrite the input,
see :meth:`_guess_inplace
<mlprodict.onnxrt.onnx_inference.OnnxInference._guess_inplace>`
:param ir_version: if not None, overwrite the default version
:param target_opset: used to overwrite *target_opset*
:param runtime_options: specific options for the runtime
:githublink:`%|py|64`
"""
if isinstance(onnx_or_bytes_or_stream, bytes):
self.obj = load_model(BytesIO(onnx_or_bytes_or_stream))
elif isinstance(onnx_or_bytes_or_stream, BytesIO):
self.obj = load_model(onnx_or_bytes_or_stream)
elif isinstance(onnx_or_bytes_or_stream, str):
self.obj = load(onnx_or_bytes_or_stream)
elif hasattr(onnx_or_bytes_or_stream, 'graph'):
self.obj = onnx_or_bytes_or_stream
elif isinstance(onnx_or_bytes_or_stream, onnx_proto.GraphProto):
self.obj = make_model(onnx_or_bytes_or_stream,
producer_name='mlprodict')
else:
raise TypeError("Unable to handle type {}.".format( # pragma: no cover
type(onnx_or_bytes_or_stream)))
if ir_version is not None:
self.obj.ir_version = ir_version
self.runtime = runtime
self.skip_run = skip_run
self.input_inplace = input_inplace
self.inplace = inplace
self.force_target_opset = target_opset
self.runtime_options = runtime_options
self._init()
[docs] def __getstate__(self):
"""
To pickle the object.
:githublink:`%|py|92`
"""
return {'onnx': self.obj.SerializeToString(),
'runtime': self.runtime,
'runtime_options': self.runtime_options,
'skip_run': self.skip_run,
'input_inplace': self.input_inplace,
'inplace': self.inplace,
'force_target_opset': self.force_target_opset}
[docs] def __setstate__(self, state):
"""
To unpickle the object.
:githublink:`%|py|104`
"""
onx = state['onnx']
self.obj = load_model(BytesIO(onx))
self.runtime = state['runtime']
self.runtime_options = state['runtime_options']
self.skip_run = state['skip_run']
self.input_inplace = state['input_inplace']
self.inplace = state['inplace']
self.force_target_opset = state['force_target_opset']
self._init()
    def _init(self):
        """
        Prepares the instance to deliver predictions: converts the
        ONNX graph into an execution sequence, instantiates the
        selected runtime for every node and installs ``self._run``.
        :githublink:`%|py|118`
        """
        self.graph_ = self.to_sequence()
        if len(self.graph_['sequence']) == 0:
            raise RuntimeError(  # pragma: no cover
                "No runnable nodes was found in the ONNX graph.")
        self.outputs_ = self.graph_['outputs']
        self.inputs_ = self.graph_['inputs']
        self.target_opset_ = self.graph_['targets']
        if self.force_target_opset is not None:
            # User-provided opset overrides the one declared by the model.
            if isinstance(self.force_target_opset, dict):
                self.target_opset_ = self.force_target_opset  # pragma: no cover
            else:
                self.target_opset_ = {'': self.force_target_opset}
        self.ir_version_ = self.graph_['ir_version']
        if not self.skip_run:
            if self.runtime == 'onnxruntime1':
                # Loads the onnx with onnxruntime as a single file.
                # The per-node graph is useless in that case.
                del self.graph_
                from .ops_whole.session import OnnxWholeSession
                self._whole = OnnxWholeSession(
                    self.obj, self.runtime, self.runtime_options)
                self._run = self._run_whole_runtime
            else:
                # Node-by-node execution: give every node its operator.
                self.sequence_ = self.graph_['sequence']
                self.inits_ = self.graph_['inits']
                dtype = self._guess_input_dtype()
                variables = self.inits_.copy()
                for node in self.sequence_:
                    domain = node.onnx_node.domain
                    target_opset = self.target_opset_.get(domain, None)
                    if self.runtime == 'onnxruntime2':
                        # onnxruntime2 needs the dtype to build
                        # a single-node onnx model per operator.
                        node.setup_runtime(self.runtime, variables, self.__class__,
                                           target_opset=target_opset, dtype=dtype,
                                           domain=domain, ir_version=self.ir_version_,
                                           runtime_options=self.runtime_options)
                    else:
                        node.setup_runtime(self.runtime, variables, self.__class__,
                                           target_opset=target_opset, domain=domain,
                                           ir_version=self.ir_version_,
                                           runtime_options=self.runtime_options)
                    if hasattr(node, 'ops_') and hasattr(node.ops_, 'typed_outputs_'):
                        # Propagate known output types to downstream nodes.
                        for k, v in node.ops_.typed_outputs_:
                            variables[k] = v
                self._run = self._run_sequence_runtime
        if not self.skip_run and self.runtime in ('python', None):
            # Shape inference and inplace analysis only work
            # with the python runtime.
            self.shapes_ = self._set_shape_inference_runtime()
            if self.inplace:
                self.inplaces_ = self._guess_inplace(self.input_inplace)
        self.exporters_ = OnnxInferenceExport(self)
        self.to_json = self.exporters_.to_json
        self.to_dot = self.exporters_.to_dot
        self.to_python = self.exporters_.to_python
        if self.runtime in ('python_compiled', 'python_compiled_debug'):
            # switch the inference method to the compiled one
            _, fct, code = self._build_compile_run('debug' in self.runtime)
            setattr(self, '_run_compiled', fct)
            setattr(self, '_run_compiled_code', code)
            self._run = self._run_sequence_runtime_compiled
[docs] def _run_sequence_runtime_compiled(
self, inputs, clean_right_away=False, intermediate=False,
verbose=0, node_time=False, fLOG=None):
"""
Executes a compiled version of :meth:`_run_sequence_runtime <mlprodict.onnxrt.onnx_inference.OnnxInference._run_sequence_runtime>`,
compiled with method :meth:`_build_compile_run <mlprodict.onnxrt.onnx_inference.OnnxInference._build_compile_run>`.
Every parameter with a default value is ignored.
Switch to ``runtime='python'`` to enable those.
:githublink:`%|py|186`
"""
return self._run_compiled(inputs) # pylint: disable=E1101
[docs] def __str__(self):
"""
usual
:githublink:`%|py|203`
"""
rows = ['OnnxInference(...)']
if hasattr(self, '_run_compiled_code'):
rows.append(
textwrap.indent(
self._run_compiled_code, ' ')) # pylint: disable=E1101
else:
rows.append(textwrap.indent(str(self.obj), ' '))
return "\n".join(rows)
[docs] def __repr__(self):
"""
usual
:githublink:`%|py|216`
"""
return "OnnxInference(...)" # pragma: no cover
[docs] def check_model(self):
"""
Checks the model follow :epkg:`ONNX` conventions.
:githublink:`%|py|222`
"""
checker.check_model(self.obj)
[docs] def shape_inference(self):
"""
Infers the shape of the outputs
with :epkg:`onnx` package.
:return: A new :epkg:`ONNX` graph which defined outputs.
:githublink:`%|py|231`
"""
return shape_inference.infer_shapes(self.obj)
@property
def input_names(self):
"""
Returns the names of all inputs.
It does not include the optional inputs.
.. versionchanged:: 0.6
The list does not include optional inputs anymore.
:githublink:`%|py|242`
"""
inits = set(_.name for _ in self.obj.graph.initializer)
return [_.name for _ in self.obj.graph.input if _.name not in inits]
@property
def input_names_shapes(self):
"""
Returns the names and shapes of all inputs.
This method assumes all inputs are tensors.
It does not include the optional inputs.
.. versionchanged:: 0.6
The list does not include optional inputs anymore.
:githublink:`%|py|255`
"""
names = set(self.input_names)
return [(_.name, _var_as_dict(_)['type']['shape'])
for _ in self.obj.graph.input if _.name in names]
@property
def input_names_shapes_types(self):
"""
Returns the names, shapes, types of all inputs.
This method assumes all inputs are tensors.
It does not include the optional inputs.
.. versionchanged:: 0.6
The list does not include optional inputs anymore.
:githublink:`%|py|269`
"""
names = set(self.input_names)
return [(_.name, _var_as_dict(_)['type']['shape'],
'tensor(%s)' % _var_as_dict(_)['type']['elem'])
for _ in self.obj.graph.input if _.name in names]
@property
def output_names(self):
"""
Returns the names of all outputs.
:githublink:`%|py|279`
"""
return [_.name for _ in self.obj.graph.output]
@property
def output_names_shapes(self):
"""
Returns the names and shapes of all outputs.
This method assumes all inputs are tensors.
:githublink:`%|py|287`
"""
return [(_.name, _var_as_dict(_)['type'].get('shape', None))
for _ in self.obj.graph.output]
[docs] def global_index(self, name):
"""
Maps every name to one integer to avoid using dictionaries
when running the predictions.
:param name: outputs name
:return: integer
:githublink:`%|py|298`
"""
if not hasattr(self, '_global_index'):
self._global_index = {}
if name in self._global_index:
return self._global_index[name]
self._global_index[name] = len(self._global_index)
return self._global_index[name]
[docs] def to_sequence(self):
"""
Produces a graph to facilitate the execution.
One example:
.. exref::
:title: Convert ONNX into graph
An example on how to convert an :epkg:`ONNX`
graph into a graph.
.. runpython::
:showcode:
import pprint
import numpy
from skl2onnx.algebra.onnx_ops import OnnxLinearRegressor
from skl2onnx.common.data_types import FloatTensorType
from mlprodict.onnxrt import OnnxInference
pars = dict(coefficients=numpy.array([1., 2.]),
intercepts=numpy.array([1.]),
post_transform='NONE')
onx = OnnxLinearRegressor('X', output_names=['Y'], **pars)
model_def = onx.to_onnx({'X': pars['coefficients'].astype(numpy.float32)},
outputs=[('Y', FloatTensorType([1]))],
target_opset=12)
oinf = OnnxInference(model_def)
pprint.pprint(oinf.to_sequence())
See an example of representation in notebook
:ref:`onnxvisualizationrst`.
:githublink:`%|py|339`
"""
inits = {}
variables = {}
outputs = {}
nodes = {}
targets = {}
for o in self.obj.opset_import:
targets[o.domain] = o.version
# inputs
for obj in self.obj.graph.input:
variables[obj.name] = _var_as_dict(obj)
self.global_index(obj.name)
# outputs
for obj in self.obj.graph.output:
if hasattr(obj, 'type') and str(obj.type) != '':
outputs[obj.name] = _var_as_dict(obj)
else:
outputs[obj.name] = {'name': obj.name}
self.global_index(obj.name)
# initializer
for obj in self.obj.graph.initializer:
init_obj = _var_as_dict(obj)
if init_obj is None:
raise RuntimeError( # pragma: no cover
"Unable to convert an initializer\n{}".format(obj))
inits[obj.name] = init_obj
self.global_index(obj.name)
if 'value' not in inits[obj.name]:
raise RuntimeError( # pragma: no cover
"One initializer has no value: '{}'\n{}\n{}".format(
obj.name, inits[obj.name], obj))
# nodes
for node in self.obj.graph.node:
dobj = _var_as_dict(node)
if dobj is None:
raise RuntimeError( # pragma: no cover
"Unable to convert a node\n{}".format(node))
if 'atts' in dobj:
atts = dobj['atts']
for k, v in atts.items():
if not isinstance(v, dict) or 'value' not in v:
raise RuntimeError( # pragma: no cover
"A parameter has no (sparse) value '{}' for node '{}'\nv={}\ndobj=[{}]".format(
k, node.name, v, node))
if node.name in nodes: # pragma: no cover
i = 2
while True:
new_name = "%s_n%i" % (node.name, i)
if new_name not in nodes:
break
i += 1
else:
new_name = node.name
nodes[new_name] = OnnxInferenceNode(node, dobj, self.global_index)
# names
names = {}
for k, v in inits.items():
if (k, 0) in names:
raise RuntimeError( # pragma: no cover
"Initializer '{}' already exists (tag='{}').".format(
k, names[k, 0][0]))
names[k, 0] = ('C', v)
for k, v in variables.items():
if (k, 0) in names:
if k in inits:
# Kind of default value for an input
continue
raise RuntimeError( # pragma: no cover
"Variable '{}' already exists (tag='{}').".format(
k, names[k, 0][0]))
names[k, 0] = ('I', v)
for k, v in outputs.items():
if (k, 0) in names:
raise RuntimeError( # pragma: no cover
"Output '{}' already exists (tag='{}').".format(
k, names[k, 0][0]))
names[k, 0] = ('O', v)
for k, v in nodes.items():
if (k, 1) in names:
raise RuntimeError( # pragma: no cover
"Node '{}' already exists (tag='{}').".format(
k, names[k, 0][0]))
names[k, 1] = ('N', v)
# ordering
order = {}
modif = 1
intermediate = {}
while modif > 0:
modif = 0
for (k, _), v in names.items():
if (k, 1) in order:
# The operator node is already processed.
continue
if v[0] in {'I', 'C'}:
if (k, 0) not in order:
order[k, 0] = len(order) # A data node.
modif += 1
continue
if v[0] == 'O':
continue
if all((inp, 0) in order for inp in v[1].inputs):
# If all inputs are available,
# We tell the operator node is processed.
order[k, 1] = len(order)
modif += 1
for o in v[1].outputs:
if (o, 0) in order:
raise RuntimeError( # pragma: no cover
"Two nodes share the same output '{}' or an operator and an output "
"share the same name. "
"(node: {}).".format(o, v[1]))
# We add a data node.
order[o, 0] = len(order)
intermediate[o] = None
modif += 1
# compute
rev = [(v, k[0], k[1]) for k, v in order.items()]
rev.sort()
sequence = []
for _, name, node_kind in rev:
if name not in nodes:
continue
if node_kind == 0:
# It is an output which shares the same name
# as a node.
continue
node = nodes[name]
node.set_order(len(sequence))
sequence.append(node)
if len(sequence) == 0:
raise RuntimeError( # pragma: no cover
"No runnable nodes was found in the ONNX graph"
"\n--rev--\n{}"
"\n--order--\n{}"
"\n--nodes--\n{}"
"\n---".format(
"\n".join([str(_) for _ in names.items()]),
"\n".join([str(_) for _ in order.items()]),
"\n".join([str(_) for _ in nodes.items()])))
# defines where an intermediare output is not needed
last_used = {}
for node in sequence:
for inp in node.inputs:
last_used[inp] = node.order
for k, ord in last_used.items():
sequence[ord].add_variable_to_clean(k)
return dict(inits=inits, inputs=variables, outputs=outputs,
nodes=nodes, sequence=sequence, intermediate=intermediate,
targets=targets, ir_version=self.obj.ir_version)
[docs] def run(self, inputs, clean_right_away=False,
intermediate=False, verbose=0, node_time=False,
fLOG=None):
"""
Computes the predictions for this :epkg:`onnx` graph.
:param inputs: inputs as dictionary or a dataframe
:param clean_right_away: clean the intermediate outputs
as soon as they are not needed
:param intermediate: returns a dictionary of intermediate
variables instead of the results only
:param verbose: display information while predicting
:param node_time: measure time of each node
:param fLOG: logging function if *verbose > 0*
:return: outputs as dictionary
and a second dictionary of the time spent
in each node if *node_time* is True
.. exref::
:title: Computes predictions with any runtime
The following example compares predictions
between :epkg:`scikit-learn` and this runtime
for the python runtime.
.. runpython::
:showcode:
import numpy
from sklearn.linear_model import LinearRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from mlprodict.onnxrt import OnnxInference
from mlprodict.onnx_conv import to_onnx
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, _ = train_test_split(X, y)
clr = LinearRegression()
clr.fit(X_train, y_train)
exp = clr.predict(X_test[:5])
print(exp)
model_def = to_onnx(clr, X_train.astype(numpy.float32),
target_opset=12)
oinf = OnnxInference(model_def)
y = oinf.run({'X': X_test[:5]})
print(y)
The function returns all intermediate outputs
if *intermediate* is True. In case of runtime
*onnxruntime1*, if intermediate is True,
the first class builds all :epkg:`ONNX` cut out
to keep the one output and converted into
*OnnxInference*.
:githublink:`%|py|555`
"""
def retype(col_array):
if (hasattr(col_array, 'categories') and
hasattr(col_array, 'from_codes')):
# isinstance(col_array, pandas.Categorical):
return col_array.astype(numpy.int64)
return col_array
if hasattr(inputs, 'columns') and hasattr(inputs, 'iloc'):
# == isinstance(inputs, pandas.DataFrame)
inputs = OrderedDict((
name, retype(numpy.expand_dims(inputs[name].values, axis=1)))
for name in inputs.columns)
return self._run(inputs, clean_right_away=False, intermediate=intermediate,
verbose=verbose, node_time=node_time, fLOG=fLOG)
[docs] def display_sequence(self, verbose=1):
"""
Shows the sequence of nodes to run if ``runtime=='python'``.
:githublink:`%|py|574`
"""
rows = []
rows.append("#node: {}".format(len(self.sequence_)))
for i, node in enumerate(self.sequence_):
if verbose >= 1:
rows.append("{}: {}".format(i, str(node)))
return "\n".join(rows)
    def _run_sequence_runtime(self, inputs, clean_right_away=False,
                              intermediate=False, verbose=0, node_time=False,
                              fLOG=None):
        """
        Executes the graph node by node with the python runtime.

        :param inputs: dictionary name -> value
        :param clean_right_away: not implemented for this runtime
        :param intermediate: if True, returns every intermediate result
            instead of the outputs only
        :param verbose: verbosity level, requires *fLOG*
        :param node_time: measures the execution time of every node
        :param fLOG: logging function
        :return: outputs (or all values if *intermediate*), plus the
            list of per-node timings if *node_time* is True
        """
        if clean_right_away:
            raise NotImplementedError(  # pragma: no cover
                "clean_right_away=true not implemented.")
        if node_time:
            mtime = []
        if verbose >= 1 and fLOG is not None:
            printed = set()
        # values is a flat list indexed by global_index, cached in
        # _values_init once the initializers have been copied in.
        if hasattr(self, "_values_init"):
            values = self._values_init.copy()  # pylint: disable=E0203
        else:
            values = [None] * len(self._global_index)
            if verbose >= 1 and fLOG is not None:
                # verbose path: log every initializer while copying it
                for k, v in self.inits_.items():
                    values[self._global_index[k]] = v['value']
                    fLOG("+ki='{}': {} (dtype={} min={} max={})".format(
                        k, v['value'].shape, v['value'].dtype,
                        numpy_min(v['value']), numpy_max(v['value'])))
                    printed.add(k)
            else:
                for k, v in self.inits_.items():
                    values[self._global_index[k]] = v['value']
                # stores the array to skip initialing a second time
                if verbose == 0 or fLOG is None:
                    self._values_init = values.copy()
        for name, value in inputs.items():
            values[self._global_index[name]] = value

        if verbose == 0 or fLOG is None:
            # fast path: no logging
            if node_time:
                for i, node in enumerate(self.sequence_):
                    t = perf_counter()
                    node.run(values)
                    t2 = perf_counter()
                    mtime.append(dict(i=i, name=node.onnx_node.name,
                                      op_type=node.onnx_node.op_type,
                                      time=t2 - t))
            else:
                for node in self.sequence_:
                    node.run(values)
        else:
            def dispsimple(arr):
                # compact display of an array (dense or sparse)
                if hasattr(arr, 'shape'):
                    if len(arr.shape) <= 1:
                        threshold = 8
                    else:
                        threshold = min(
                            50, min(50 // arr.shape[1], 8) * arr.shape[1])
                    if hasattr(arr, 'todense'):
                        fLOG(  # pragma: no cover
                            numpy.array2string(arr.todense(), max_line_width=120,
                                               suppress_small=True, threshold=threshold))
                    else:
                        fLOG(numpy.array2string(arr, max_line_width=120,
                                                suppress_small=True,
                                                threshold=threshold))
                else:  # pragma: no cover
                    s = str(arr)
                    if len(s) > 50:
                        s = s[:50] + "..."
                    fLOG(s)

            if verbose >= 2:
                # dump every known value before running
                for k in sorted(self._global_index):
                    if values[self._global_index[k]] is None:
                        continue
                    obj = values[self._global_index[k]]
                    if k not in printed:
                        printed.add(k)
                        if hasattr(obj, 'shape'):
                            fLOG("-kv='{}' shape={} dtype={} min={} max={}{}".format(
                                k, obj.shape, obj.dtype, numpy_min(obj),
                                numpy_max(obj),
                                ' (sparse)' if isinstance(obj, coo_matrix) else ''))
                        elif (isinstance(obj, list) and len(obj) > 0 and
                                not isinstance(obj[0], dict)):  # pragma: no cover
                            fLOG("-kv='{}' list len={} min={} max={}".format(
                                k, len(obj), min(obj), max(obj)))
                        else:  # pragma: no cover
                            fLOG("-kv='{}' type={}".format(k, type(obj)))
            # keys filled before execution, used to detect new results
            keys = set(k for k in range(len(values)) if values[k] is not None)
            if verbose >= 1:
                fLOG("-- OnnxInference: run {} nodes".format(len(self.sequence_)))
            for i, node in enumerate(self.sequence_):
                if verbose >= 1:
                    fLOG(node)
                if node_time:
                    t = perf_counter()
                    node.run(values)
                    t2 = perf_counter()
                    mtime.append(dict(i=i, name=node.onnx_node.name,
                                      op_type=node.onnx_node.op_type,
                                      time=t2 - t))
                else:
                    node.run(values)
                # log every result produced by this node
                for k in range(len(values)):  # pylint: disable=C0200
                    if values[k] is None:
                        continue
                    if k not in keys and k not in printed:
                        printed.add(k)
                        # reverse lookup: global index -> name
                        name = list(
                            name for name in self._global_index if self._global_index[name] == k)
                        if isinstance(values[k], (numpy.ndarray, coo_matrix)):
                            name = name[0]
                            mini = numpy_min(values[k])
                            maxi = numpy_max(values[k])
                            fLOG("+kr='{}': {} (dtype={} min={} max={}{})".format(
                                name, values[k].shape, values[k].dtype,
                                mini, maxi,
                                ' sparse' if isinstance(values[k], coo_matrix) else ''))
                            if verbose >= 3:
                                dispsimple(values[k])
                        else:
                            fLOG("+kr='{}': {}".format(
                                name, type(values[k])))
                            if verbose >= 3:  # pragma: no cover
                                dispsimple(values[k])

        if intermediate:
            # return every value sorted by global index, keyed by name
            values = [(v, k, values[v]) for k, v in self._global_index.items()]
            values.sort()
            values = OrderedDict((k, v) for _, k, v in values)
            return (values, mtime) if node_time else values

        try:
            res = {k: values[self._global_index[k]] for k in self.outputs_}
        except KeyError as e:  # pragma: no cover
            raise RuntimeError("Unable to find one output [{}]\n in [{}]"
                               ".".format(", ".join(sorted(self.outputs_)),
                                          ", ".join(sorted(values)))) from e
        return (res, mtime) if node_time else res
[docs] def _run_whole_runtime(self, inputs, clean_right_away=False,
intermediate=False, verbose=0, node_time=False,
fLOG=None):
# node_time is unused
if clean_right_away:
raise RuntimeError( # pragma: no cover
"clean_right_away=true does not work with this runtime.")
if intermediate:
if hasattr(self, "intermediate_onnx_inference_"):
inter_run = self.intermediate_onnx_inference_ # pylint: disable=E0203
else:
if verbose > 0:
fLOG("-- OnnxInference: build intermediate")
inter_run = self.build_intermediate()
self.intermediate_onnx_inference_ = inter_run
graph = self.to_sequence()
self.inits_ = graph['inits']
if verbose >= 1:
fLOG("-- OnnxInference: run {} nodes".format(
len(self.intermediate_onnx_inference_)))
values = OrderedDict(inputs)
for k, v in self.inits_.items():
values[k] = v['value']
if verbose >= 2: # pragma: no cover
for k in sorted(values):
fLOG("-k='{}' shape={} dtype={}".format(
k, values[k].shape, values[k].dtype))
for node, oinf in self.intermediate_onnx_inference_.items():
if verbose >= 1:
fLOG(node)
if verbose >= 2: # pragma: no cover
fLOG(oinf.obj)
output = oinf.run(inputs)[node]
values[node] = output
if verbose >= 1:
if isinstance(output, numpy.ndarray):
fLOG("+k='{}': {} (dtype={})".format(
k, output.shape, output.dtype))
else:
fLOG("+k='{}': {}".format( # pragma: no cover
k, type(output)))
return values
if verbose != 0:
warnings.warn(
"verbose option not implemented if runtime is 'onnxruntime1'")
res = self._whole.run(inputs)
return {k: v for k, v in zip(self.outputs_, res)}
[docs] def __getitem__(self, item):
"""
Returns the ONNX verions of a node.
:githublink:`%|py|800`
"""
if isinstance(item, tuple):
node_name, att_name = item
else:
node_name = item
att_name = None
node_ = None
for node in self.obj.graph.node:
if node.name == node_name:
node_ = node
break
if node_ is None:
raise IndexError( # pragma: no cover
"Unable to get node name '{}'.\n{}".format(
node_name, "\n".join(node.name for node in self.obj.graph.node)))
if att_name is None:
return node_
for att in node_.attribute:
if att.name == att_name:
return att
raise IndexError("Unable to find attribute '{}' from node '{}'.".format( # pragma: no cover
att_name, node_name))
    def switch_initializers_dtype(self, model=None,
                                  dtype_in=numpy.float32,
                                  dtype_out=numpy.float64):
        """
        Switches all initializers to ``numpy.float64``. If *model*
        is None, a simple cast is done. Otherwise, the function assumes
        the model is a :epkg:`scikit-learn` pipeline.
        This only works if the runtime is ``'python'``.

        :param model: :epkg:`scikit-learn` model or None
        :param dtype_in: previous type
        :param dtype_out: next type
        :return: done operations
        :githublink:`%|py|841`
        """
        from .optim.sklearn_helper import enumerate_fitted_arrays, pairwise_array_distances
        if self.runtime != 'python':  # pragma: no cover
            raise RuntimeError("Initializers can be casted only if the "
                               "runtime is 'python' not '{}'.".format(self.runtime))
        if hasattr(self, '_values_init'):
            # invalidate the cached initializer array
            del self._values_init

        # first pass: simple cast
        done = []
        initializer = self.inits_
        for k, v in initializer.items():
            if isinstance(v['value'], numpy.ndarray):
                if v['value'].dtype == dtype_in:
                    v['value'] = v['value'].astype(dtype_out)
                    done.append(("pass1", "+", "init", k, v['value']))
                else:
                    done.append(("pass1", "-", "init", k,
                                 v['value']))  # pragma: no cover
        # also cast arrays held by the nodes and the sub-graphs
        for k, v in self.graph_['nodes'].items():
            res = v.switch_initializers_dtype(dtype_in=dtype_in,
                                              dtype_out=dtype_out)
            for r in res:
                done.append(("pass1", "node", k) + r)
        for k, v in self.graph_['intermediate'].items():
            if v is None:
                continue
            res = v.switch_initializers_dtype(dtype_in=dtype_in,
                                              dtype_out=dtype_out)
            for r in res:
                done.append(("pass1", "sub", k) + r)

        if model is not None:
            # Second pass, we compare all arrays from the model
            # to the arrays in the converted models.
            def dist(a):
                # distance between an array and its round-trip cast;
                # > 0 means the array lost precision during conversion
                cast = a.astype(dtype_in).astype(dtype_out)
                d = pairwise_array_distances([cast], [a])[0, 0]
                return d
            done_ = [(c, c[-1]) for c in done]
            moda_ = [(a, a[-2][-1]) for a in enumerate_fitted_arrays(model)
                     if dist(a[-2][-1]) > 0]
            aconv = [_[-1] for _ in done_]
            amoda = [_[-1] for _ in moda_]
            distances = pairwise_array_distances(aconv, amoda)
            for i in range(distances.shape[0]):
                j = numpy.argmin(distances[i])
                d = distances[i, j]
                if d < 0.1:
                    # close enough: restore full precision from the
                    # original scikit-learn array
                    numpy.copyto(aconv[i], amoda[j])
                    done.append(("pass2", d) + done_[i][0])
        return done
[docs] def _set_shape_inference_runtime(self):
"""
Set shapes based on shape inference
relying on the runtime.
The values are stored in every node.
:githublink:`%|py|904`
"""
if not hasattr(self, 'sequence_') or not hasattr(self, 'inputs_'):
raise RuntimeError( # pragma: no cover
"This method only works if the runtime is 'python' not "
"'{}'.".format(self.runtime))
values = OrderedDict()
for k, v in self.inputs_.items():
# The function assumes the first dimension is unknown
# and is the batch size.
values[k] = ShapeObject(v, use_n1=True, name=k)
for k, v in self.inits_.items():
values[k] = ShapeObject(v['value'], name=k)
last = None
for i, node in enumerate(self.sequence_):
try:
s = node._set_shape_inference_runtime(values)
last = s
except IndexError as e: # pragma: no cover
rows = []
if last is not None:
for k, v in last.items():
rows.append("{}: {}".format(k, v))
for k in range(i + 1):
rows.append("{} --> {}".format(k, self.sequence_[k]))
raise RuntimeError("Unable to infer shape of node {}\n{}".format(
i, '\n'.join(rows))) from e
return values
    def _guess_inplace(self, input_inplace=False):
        """
        Looks into every node of the graph to see
        if there is a way to do the computation
        inplace. By default (*input_inplace=False*),
        the function assumes inputs cannot be modified
        so the first node cannot do inplace computation.
        This function only works with the python runtime.

        :param input_inplace: the computation is allowed
            to overwrite the input

        This function checks that one node is used only
        once and then can be modified by the next node.
        Nodes `A`, `C` can be overwritten by the computation.
        Node `B` cannot as it is used by two nodes.

        .. blockdiag::

            diagram {
                A -> B -> C -> E;
                B -> D;
            }

        It does not handle specific case such node `B` being
        overwritten by node `C` but without changing its shape
        and node `D` only needs the shape of `B`. Then `B` could
        be overwritten as well.
        :githublink:`%|py|960`
        """
        forbid = {}
        # values[name] = {'inplace': bool or None,
        #                 'to': consumer nodes, 'fr': producer nodes}
        values = OrderedDict()
        for k in self.inputs_:
            values[k] = dict(inplace=input_inplace, to=[], fr=[])
        for k in self.inits_:
            values[k] = dict(inplace=False, to=[], fr=[])
        for node in self.sequence_:
            for n in node.inputs:
                values[n]['to'].append(node)
            for n in node.outputs:
                if node.op_type == 'Constant':
                    # We cannot modify constant.
                    forbid[n] = node
                if n not in values:
                    values[n] = dict(inplace=None, to=[], fr=[])
                values[n]['fr'].append(node)

        # checks the number of outputs
        outputs = set(self.output_names)
        modif = 1
        while modif > 0:
            modif = 0
            for n, v in values.items():
                if v['inplace'] is not None:
                    continue
                if n in forbid:
                    continue
                if len(v['to']) == 1:
                    # single consumer: the buffer may be overwritten
                    v['inplace'] = True
                    modif += 1

        # convey the information to every node
        inplaces = {}
        for n, v in values.items():
            if v['inplace']:
                inplaces[n] = v
                for node in v['to']:
                    if n in outputs:
                        # final outputs must not be overwritten
                        continue
                    node.enable_inplace_compute(n)

        return inplaces
    def _build_compile_run(self, debug=False):
        """
        Rewrite the run function in python,
        compiles it, and adds it as a method.

        :param debug: insert debugging code
        :return: method name, callable object

        .. exref::
            :title: Run a model with runtime 'python_compiled'

            The following code trains a model and compute
            the predictions with runtime ``'python_compiled'``.
            It converts the onnx graph into a python function
            which calls every operator. Its code is printed
            below.

            .. runpython::
                :showcode:

                import numpy
                from sklearn.datasets import load_iris
                from sklearn.model_selection import train_test_split
                from sklearn.ensemble import AdaBoostClassifier
                from sklearn.tree import DecisionTreeClassifier
                from skl2onnx import to_onnx
                from mlprodict.onnxrt import OnnxInference

                iris = load_iris()
                X, y = iris.data, iris.target
                X_train, X_test, y_train, __ = train_test_split(X, y, random_state=11)
                y_train = y_train.astype(numpy.float32)
                clr = AdaBoostClassifier(
                    base_estimator=DecisionTreeClassifier(max_depth=3),
                    n_estimators=3)
                clr.fit(X_train, y_train)

                model_def = to_onnx(clr, X_train.astype(numpy.float32),
                                    target_opset=12)

                oinf2 = OnnxInference(model_def, runtime='python_compiled')
                print(oinf2.run({'X': X_test[:5]}))

                # prints out the python function equivalent
                # to the onnx graph
                print(oinf2)
        :githublink:`%|py|1049`
        """
        def clean_name(name):
            # ONNX names may contain characters forbidden in
            # python identifiers.
            return name.replace(":", "_").replace('.', '_').replace('/', '_')

        # inits
        inputs = self.input_names
        code = ['def compiled_run(dict_inputs):']
        if debug:
            code.append("    printed = {}")
        # context maps the generated function's free names to objects
        context = {}
        for k, v in self.inits_.items():
            if k.startswith("_OPT_"):
                raise RuntimeError(  # pragma: no cover
                    "The runtime cannot handle any constant name "
                    "starting with '_OPT_': '{}'.".format(k))
            if k in inputs:
                # initializer doubling as an input: it becomes
                # a default value, prefixed to avoid collisions
                context["_OPT_" + k] = v['value']
                code.append("    # init: _OPT_{0}".format(k))
                if debug:
                    code.append(
                        "    debug_print('c.[_OPT_{0}]', _OPT_{0}, printed)".format(k))
            else:
                context[k] = v['value']
                code.append("    # init: {0}".format(k))
                if debug:
                    code.append(
                        "    debug_print('c.[{0}]', {0}, printed)".format(k))

        # method signature
        code.append("    # inputs")
        for inp in inputs:
            if '_OPT_' + inp in context:
                # optional inputs
                code.append(
                    "    {0} = dict_inputs.get('{1}', _OPT_{0})".format(
                        clean_name(inp), inp))
            else:
                code.append("    {0} = dict_inputs['{1}']".format(
                    clean_name(inp), inp))
            if debug:
                code.append(
                    "    debug_print('i.{0}', {1}, printed)".format(
                        clean_name(inp), inp))

        # code
        for i, node in enumerate(self.sequence_):
            name = "n{}_{}".format(i, node.ops_.__class__.__name__.lower())
            context[name] = node.ops_._run
            code.append('    ({1}, ) = {2}({0})'.format(
                ', '.join(map(clean_name, node.inputs)),
                ', '.join(map(clean_name, node.outputs)),
                name))
            if debug:
                # [4:] strips the 4-space indentation of the line
                code.append("    print('''# {}''')".format(code[-1][4:]))
                for o in node.outputs:
                    code.append(
                        "    debug_print('o.{0}', {1}, printed)".format(
                            clean_name(o), o))

        # return
        code.append('    return {')
        for out in self.output_names:
            code.append("        '{1}': {0},".format(
                clean_name(out), out))
        code.append('    }')
        final_code = '\n'.join(code)

        # compile the outcome
        context['self'] = self
        try:
            obj = compile(final_code, "<string>", 'exec')
        except SyntaxError as e:  # pragma: no cover
            raise SyntaxError(
                "Unable to compile\n#####\n{}".format(final_code)) from e
        # extract the code object of the inner function from the
        # compiled module object
        fcts_obj = [_ for _ in obj.co_consts
                    if _ is not None and not isinstance(_, (bool, str, int))]
        fct = make_callable(
            "compiled_run", fcts_obj[0], final_code, context, debug)

        # end
        return "compiled_run", fct, final_code
[docs] def reduce_size(self, pickable=False):
"""
Reduces the memory footprint as much as possible.
:param pickable: keeps a pickle object?
:githublink:`%|py|1136`
"""
import gc
del self.graph_
if not pickable:
del self.obj
if self.runtime in ('python_compiled', 'python_compiled_debug'):
del self.sequence_
gc.collect()