"""
Functions which converts :epkg:`ONNX` object into
readable :epkg:`python` objects.
:githublink:`%|py|6`
"""
import pprint
import warnings
import numpy
from scipy.sparse import coo_matrix
from onnx import onnx_pb as onnx_proto, TensorProto
from onnx.numpy_helper import to_array, from_array
[docs]def to_bytes(val):
"""
Converts an array into protobuf and then into bytes.
:param val: array
:return: bytes
.. exref::
:title: Converts an array into bytes (serialization)
Useful to serialize.
.. runpython::
:showcode:
import numpy
from mlprodict.onnxrt.onnx2py_helper import to_bytes
data = numpy.array([[0, 1], [2, 3], [4, 5]], dtype=numpy.float32)
pb = to_bytes(data)
print(len(pb), data.size * data.itemsize, pb[:10])
:githublink:`%|py|35`
"""
if isinstance(val, numpy.ndarray):
pb = from_array(val)
else:
pb = val # pragma: no cover
return pb.SerializeToString()
[docs]def from_bytes(b):
"""
Retrieves an array from bytes then protobuf.
:param b: bytes
:return: array
.. exref::
:title: Converts bytes into an array (serialization)
Useful to deserialize.
.. runpython::
:showcode:
import numpy
from mlprodict.onnxrt.onnx2py_helper import to_bytes, from_bytes
data = numpy.array([[0, 1], [2, 3], [4, 5]], dtype=numpy.float32)
pb = to_bytes(data)
data2 = from_bytes(pb)
print(data2)
:githublink:`%|py|65`
"""
if isinstance(b, bytes):
pb = TensorProto()
pb.ParseFromString(b)
else:
pb = b # pragma: no cover
return to_array(pb)
[docs]def _numpy_array(data, dtype=None, copy=True):
"""
Single function to create an array.
:param data: data
:param dtype: dtype
:param copy: copy
:return: numpy array
:githublink:`%|py|82`
"""
if isinstance(data, numpy.ndarray):
res = data
else:
res = numpy.array(data, dtype=dtype, copy=copy)
return res
[docs]def _sparse_array(shape, data, indices, dtype=None, copy=True):
"""
Single function to create an sparse array
(:epkg:`coo_matrix`).
:param shape: shape
:param data: data
:param indices: indices
:param dtype: dtype
:param copy: copy
:return: :epkg:`coo_matrix`
:githublink:`%|py|101`
"""
if len(shape) != 2:
raise ValueError( # pragma: no cover
"Only matrices are allowed or sparse matrices "
"but shape is {}.".format(shape))
rows = numpy.array([i // shape[1] for i in indices])
cols = numpy.array([i % shape[1] for i in indices])
if isinstance(data, numpy.ndarray):
res = coo_matrix((data, (rows, cols)), dtype=dtype)
else:
res = coo_matrix( # pragma: no cover
(numpy.array(data, dtype=dtype, copy=copy),
(rows, cols)), dtype=dtype)
return res
[docs]def _elem_type_as_str(elem_type):
if elem_type == onnx_proto.TensorProto.FLOAT: # pylint: disable=E1101
return 'float'
if elem_type == onnx_proto.TensorProto.BOOL: # pylint: disable=E1101
return 'bool'
if elem_type == onnx_proto.TensorProto.DOUBLE: # pylint: disable=E1101
return 'double'
if elem_type == onnx_proto.TensorProto.STRING: # pylint: disable=E1101
return 'str'
if elem_type == onnx_proto.TensorProto.INT64: # pylint: disable=E1101
return 'int64'
if elem_type == onnx_proto.TensorProto.INT32: # pylint: disable=E1101
return 'int32'
if elem_type == onnx_proto.TensorProto.UINT8: # pylint: disable=E1101
return 'uint8'
if elem_type == onnx_proto.TensorProto.INT8: # pylint: disable=E1101
return 'int8'
if elem_type == onnx_proto.TensorProto.FLOAT16: # pylint: disable=E1101
return 'float16'
# The following code should be refactored.
selem = str(elem_type)
if selem.startswith("tensor_type"):
this = elem_type.tensor_type
et = _elem_type_as_str(this.elem_type)
shape = this.shape
dim = shape.dim
dims = [d.dim_value for d in dim]
if len(dims) == 0:
dims = '?'
return {'kind': 'tensor', 'elem': et, 'shape': shape}
if selem.startswith("map_type"):
this = elem_type.map_type
kt = _elem_type_as_str(this.key_type)
vt = _elem_type_as_str(this.value_type)
return {'kind': 'map', 'key': kt, 'value': vt}
raise NotImplementedError( # pragma: no cover
"elem_type '{}' is unknown\nfields:\n{}\n-----\n{}.".format(
elem_type, pprint.pformat(dir(elem_type)), type(elem_type)))
[docs]def _to_array(var):
try:
data = to_array(var)
except ValueError as e: # pragma: no cover
dims = [d for d in var.dims]
if var.data_type == 1 and var.float_data is not None:
try:
data = _numpy_array(var.float_data, dtype=numpy.float32,
copy=False).reshape(dims)
except ValueError:
data = _numpy_array(to_array(var))
elif var.data_type == 11 and var.double_data is not None:
try:
data = _numpy_array(var.double_data, dtype=numpy.float64,
copy=False).reshape(dims)
except ValueError:
data = _numpy_array(to_array(var))
elif var.data_type == 6 and var.int32_data is not None:
data = _numpy_array(var.int32_data, dtype=numpy.int32,
copy=False).reshape(dims)
elif var.data_type == 7 and var.int64_data is not None:
data = _numpy_array(var.int64_data, dtype=numpy.int64,
copy=False).reshape(dims)
elif var.data_type == 10 and var.float16_data is not None:
data = _numpy_array(var.float16_data, dtype=numpy.float16,
copy=False).reshape(dims)
else:
raise NotImplementedError(
"Iniatilizer {} cannot be converted into a dictionary.".format(var)) from e
return data
[docs]def _var_as_dict(var):
"""
Converts a protobuf object into something readable.
The current implementation relies on :epkg:`json`.
That's not the most efficient way.
:githublink:`%|py|198`
"""
if hasattr(var, 'type') and str(var.type) != '':
# variable
if var.type is not None:
if hasattr(var, 'sparse_tensor') and var.type == 11:
# sparse tensor
t = var.sparse_tensor
values = _var_as_dict(t.values)
dims = list(t.dims)
dtype = dict(kind='sparse_tensor', shape=tuple(dims), elem=1)
elif hasattr(var.type, 'tensor_type') and var.type.tensor_type.elem_type > 0:
t = var.type.tensor_type
elem_type = _elem_type_as_str(t.elem_type)
shape = t.shape
dim = shape.dim
dims = [d.dim_value for d in dim]
if len(dims) == 0:
dims = '?'
dtype = dict(kind='tensor', elem=elem_type,
shape=tuple(dims))
elif hasattr(var.type, 'real') and var.type.real == 5 and hasattr(var, 'g'):
dtype = dict(kind='graph', elem=var.type.real)
elif hasattr(var.type, 'real') and var.type.real == 4 and hasattr(var, 't'):
dtype = dict(kind='tensor', elem=var.type.real)
elif hasattr(var.type, 'real'):
dtype = dict(kind='real', elem=var.type.real)
elif (hasattr(var.type, "sequence_type") and var.type.sequence_type is not None and
str(var.type.sequence_type.elem_type) != ''):
t = var.type.sequence_type
elem_type = _elem_type_as_str(t.elem_type)
dtype = dict(kind='sequence', elem=elem_type)
elif (hasattr(var.type, "map_type") and var.type.map_type is not None and
str(var.type.map_type.key_type) != '' and
str(var.type.map_type.value_type) != ''):
t = var.type.map_type
key_type = _elem_type_as_str(t.key_type)
value_type = _elem_type_as_str(t.value_type)
dtype = dict(kind='map', key=key_type, value=value_type)
else:
raise NotImplementedError( # pragma: no cover
"Unable to convert a type into a dictionary for '{}'. "
"Available fields: {}.".format(
var.type, pprint.pformat(dir(var.type))))
else:
raise NotImplementedError( # pragma: no cover
"Unable to convert variable into a dictionary for '{}'. "
"Available fields: {}.".format(
var, pprint.pformat(dir(var.type))))
res = dict(name=var.name, type=dtype)
if (hasattr(var, 'sparse_tensor') and dtype.get('elem', None) == 1 and
dtype['kind'] == 'sparse_tensor'):
# sparse matrix
t = var.sparse_tensor
try:
values = _var_as_dict(t.values)
except NotImplementedError as e: # pragma: no cover
raise NotImplementedError(
"Issue with\n{}\n---".format(var)) from e
indices = _var_as_dict(t.indices)
res['value'] = _sparse_array(
dtype['shape'], values['value'], indices['value'], dtype=numpy.float32)
elif hasattr(var, 'floats') and dtype.get('elem', None) == 6:
res['value'] = _numpy_array(var.floats, dtype=numpy.float32)
elif hasattr(var, 'strings') and dtype.get('elem', None) == 8:
res['value'] = _numpy_array(var.strings)
elif hasattr(var, 'ints') and dtype.get('elem', None) == 7:
res['value'] = _numpy_array(var.ints)
elif hasattr(var, 'f') and dtype.get('elem', None) == 1:
res['value'] = var.f
elif hasattr(var, 's') and dtype.get('elem', None) == 3:
res['value'] = var.s
elif hasattr(var, 'i') and dtype.get('elem', None) == 2:
res['value'] = var.i
elif hasattr(var, 'g') and dtype.get('elem', None) == 5:
res['value'] = var.g
elif hasattr(var, 't') and dtype.get('elem', None) == 4:
ts = _var_as_dict(var.t)
res['value'] = ts['value']
elif hasattr(var, 'sparse_tensor') and dtype.get('elem', None) == 11:
ts = _var_as_dict(var.sparse_tensor)
res['value'] = ts['value']
elif "'value'" in str(var):
warnings.warn("No value: {} -- {}".format( # pragma: no cover
dtype, str(var).replace("\n", "").replace(" ", "")))
return res
if hasattr(var, 'op_type'):
if hasattr(var, 'attribute'):
atts = {}
for att in var.attribute:
atts[att.name] = _var_as_dict(att)
return dict(name=var.name, op_type=var.op_type,
domain=var.domain, atts=atts)
if hasattr(var, 'dims') and len(var.dims) > 0:
# initializer
data = _to_array(var)
return dict(name=var.name, value=data)
if hasattr(var, 'data_type') and var.data_type > 0:
data = _to_array(var)
return dict(name=var.name, value=data)
raise NotImplementedError( # pragma: no cover
"Unable to guess which object it is.\n{}\n---".format(var))
[docs]def _type_to_string(dtype):
"""
Converts a type into a readable string.
:githublink:`%|py|307`
"""
if not isinstance(dtype, dict):
dtype_ = _var_as_dict(dtype) # pragma: no cover
else:
dtype_ = dtype
if dtype_["kind"] == 'tensor':
return "{0}({1})".format(dtype_['elem'], dtype_['shape'])
if dtype_['kind'] == 'sequence':
return "[{0}]".format(_type_to_string(dtype_['elem']))
if dtype_["kind"] == 'map':
return "{{{0}, {1}}}".format(dtype_['key'], dtype_['value'])
raise NotImplementedError( # pragma: no cover
"Unable to convert into string {} or {}.".format(dtype, dtype_))
[docs]def numpy_min(x):
"""
Returns the minimum of an array.
Deals with text as well.
:githublink:`%|py|326`
"""
try:
if hasattr(x, 'todense'):
x = x.todense()
if x.dtype.kind not in 'cUC':
return x.min()
try: # pragma: no cover
x = x.ravel()
except AttributeError: # pragma: no cover
pass
keep = list(filter(lambda s: isinstance(s, str), x))
if len(keep) == 0: # pragma: no cover
return numpy.nan
keep.sort()
val = keep[0]
if len(val) > 10: # pragma: no cover
val = val[:10] + '...'
return "%r" % val
except (ValueError, TypeError): # pragma: no cover
return '?'
[docs]def numpy_max(x):
"""
Returns the maximum of an array.
Deals with text as well.
:githublink:`%|py|352`
"""
try:
if hasattr(x, 'todense'):
x = x.todense()
if x.dtype.kind not in 'cUC':
return x.max()
try: # pragma: no cover
x = x.ravel()
except AttributeError: # pragma: no cover
pass
keep = list(filter(lambda s: isinstance(s, str), x))
if len(keep) == 0: # pragma: no cover
return numpy.nan
keep.sort()
val = keep[-1]
if len(val) > 10: # pragma: no cover
val = val[:10] + '...'
return "%r" % val
except (ValueError, TypeError): # pragma: no cover
return '?'