Source code for mlprodict.onnxrt.validate.side_by_side

"""
Helpers to compare executions.


:githublink:`%|py|5`
"""
import copy
from .validate_difference import measure_relative_difference


def side_by_side_by_values(sessions, *args, inputs=None, **kwargs):
    """
    Compares the execution of several sessions.

    It calls method :meth:`OnnxInference.run
    <mlprodict.onnxrt.onnx_inference.OnnxInference.run>`
    with value ``intermediate=True`` and compares the results.
    The first session is considered as the baseline.
    See notebook :ref:`onnxsbsrst` for an example.

    :param sessions: list of class :class:`OnnxInference
        <mlprodict.onnxrt.onnx_inference.OnnxInference>`, or, if *inputs*
        is None, list of *tuple(session, inputs)* because sometimes
        inputs must be different
    :param inputs: inputs, either a list holding one entry per session
        or a single object which is deep-copied for every session
    :param args: additional parameters for :meth:`OnnxInference.run
        <mlprodict.onnxrt.onnx_inference.OnnxInference.run>`
    :param kwargs: additional parameters for :meth:`OnnxInference.run
        <mlprodict.onnxrt.onnx_inference.OnnxInference.run>`,
        *verbose* and *fLOG* are also read here to log the progress
    :return: list of dictionaries, the first one (``step == -1``) holds
        the number of results of every session, the following ones hold
        one merged result each with keys ``step``, ``name``, ``metric``,
        ``value[j]``, ``shape[j]``, ``v[j]`` and ``cmp``
    :raises ValueError: if *intermediate* is set to a false value
        or if *sessions* is empty
    """
    if not kwargs.get('intermediate', True):
        raise ValueError(  # pragma: no cover
            "kwargs must not set intermediate to False")
    kwargs['intermediate'] = True
    # verbose and fLOG stay in kwargs: they are forwarded to run as well.
    verbose = kwargs.get('verbose', 0)
    fLOG = kwargs.get('fLOG', None)

    # run
    results = []
    for i, sess in enumerate(sessions):
        if isinstance(sess, tuple) and inputs is None:
            new_sess, new_inputs = sess
        elif isinstance(inputs, list):
            new_sess, new_inputs = sess, inputs[i]
        else:
            # Every session gets its own copy of the shared inputs.
            new_sess, new_inputs = sess, copy.deepcopy(inputs)
        if verbose > 0 and fLOG:
            fLOG(  # pragma: no cover
                '[side_by_side_by_values] run session {}/{}'.format(
                    i + 1, len(sessions)))
        res = new_sess.run(new_inputs, *args, **kwargs)
        results.append(list(res.items()))

    if not results:
        # Without this check the failure would come from min() below
        # with a message unrelated to the real cause.
        raise ValueError(
            "sessions must contain at least one session to compare")

    # same number of results?
    sizes = [len(res) for res in results]
    row = {"metric": "nb_results", 'step': -1}
    for i, size in enumerate(sizes):
        row["v[%d]" % i] = size
    row['cmp'] = 'OK' if min(sizes) == max(sizes) else '!='
    rows = [row]

    merged = merge_results(results)

    # Upper bounds (exclusive) and the label reported below each of them,
    # sorted by increasing tolerance.
    thresholds = (
        (1e-5, 'OK'),
        (0.0001, 'e<0.0001'),
        (0.001, 'e<0.001'),
        (0.01, 'e<0.01'),
        (0.1, 'e<0.1'),
    )

    # analysis
    for step, (name, res_row) in enumerate(merged):
        # The metric label is kept as 'abs-diff' for compatibility even
        # though the values come from measure_relative_difference.
        row = {'step': step, 'name': name, 'metric': 'abs-diff'}
        baseline = res_row[0]

        vals = []
        for j, r in enumerate(res_row):
            row['value[%d]' % j] = r
            if hasattr(r, 'shape'):
                row['shape[%d]' % j] = r.shape

            if j == 0:
                # The baseline is never compared to itself.
                row['v[%d]' % j] = 0
            elif baseline is not None and r is not None:
                v = measure_relative_difference(baseline, r)
                row['v[%d]' % j] = v
                vals.append(v)

        if vals:
            diff = max(vals)
            for bound, label in thresholds:
                if diff < bound:
                    row['cmp'] = label
                    break
            else:
                row['cmp'] = "ERROR->=%1.1f" % diff

        rows.append(row)
    return rows
def merge_results(results):
    """
    Merges results by name. The first ones are used to keep the order.

    :param results: list of results, one per session, every result
        is a list of pairs *(name, value)*
    :return: list of pairs *(name, values)*, *values* holds one entry
        per session and is None wherever the session did not produce
        a result with that name; an empty list if *results* is empty

    Names produced by the first session define the initial rows.
    A result from another session whose name is unknown to the first
    session gets its own row, inserted right before the row holding
    the next result of the same session when that row already exists,
    and appended at the end otherwise.
    """
    if not results:
        return []

    n_results = len(results)

    # matrix of names: every row is (name, cells), a cell is either None
    # or (value, position of the result in its own session).
    rows = [(name, []) for name, _ in results[0]]
    positions = {row[0]: index for index, row in enumerate(rows)}

    leftovers = []
    for result in results:
        # New column for this session, empty until a name matches.
        for _, cells in rows:
            cells.append(None)
        unmatched = []
        for rank, (name, value) in enumerate(result):
            index = positions.get(name)
            if index is None:
                unmatched.append((rank, name, value))
            else:
                rows[index][1][-1] = (value, rank)
        leftovers.append(unmatched)

    # left over: results whose name is unknown to the first session
    for col, unmatched in enumerate(leftovers):
        for rank, name, value in unmatched:
            next_rank = rank + 1
            # Row already holding the next result of the same session.
            anchor = next(
                (index for index, (_, cells) in enumerate(rows)
                 if cells[col] is not None and cells[col][1] == next_rank),
                None)
            cells = [None] * n_results
            if anchor is None:
                cells[col] = (value, len(rows))
                rows.append((name, cells))
            else:
                cells[col] = (value, rank)
                rows.insert(anchor, (name, cells))

    # final: drop the positions, only keep the values
    return [(name, [cell if cell is None else cell[0] for cell in cells])
            for name, cells in rows]