Source code for mlprodict.onnxrt.validate.validate_benchmark

"""
Measures the processing time of ONNX models.


:githublink:`%|py|5`
"""
import numpy
from sklearn import __all__ as sklearn__all__, __version__ as sklearn_version
from ... import __version__ as ort_version
from .validate_helper import default_time_kwargs, measure_time


def _tile_rows(a, n, period):
    """
    Builds an array of *n* rows by cycling over the first *period* rows of *a*.

    :param a: array to repeat along its first axis
    :param n: number of rows of the result
    :param period: number of rows copied at every cycle
    :return: new array of shape ``(n, ) + a.shape[1:]`` with the dtype of *a*
    """
    # Trailing dimensions are taken from *a* so that 1-D, 2-D and higher
    # rank arrays all go through the same code path.
    out = numpy.empty((n, ) + tuple(a.shape[1:]), dtype=a.dtype)
    for start in range(0, n, period):
        stop = min(start + period, n)
        # The last chunk may be shorter than *period*.
        out[start:stop] = a[:stop - start]
    return out


def make_n_rows(x, n, y=None):
    """
    Multiplies or reduces the rows of x to get exactly *n* rows.

    If *n* is lower than the number of rows of *x*, the first *n* rows
    are copied. Otherwise the rows of *x* are repeated in order, as many
    times as needed, the last repetition being truncated. *y* follows
    the same treatment, using the number of rows of *x* as the period.

    :param x: matrix (any number of dimensions, rows are the first axis)
    :param n: number of rows
    :param y: target (optional)
    :return: new matrix or two new matrices if y is not None
    :raises ValueError: if *x* has no row and rows must be repeated
    """
    if n < x.shape[0]:
        if y is None:
            return x[:n].copy()
        return x[:n].copy(), y[:n].copy()

    rows = x.shape[0]
    if rows == 0:
        # Nothing to repeat: fail with an explicit message instead of
        # the obscure error range() raises for a zero step.
        raise ValueError(
            "Unable to build {} rows from an empty matrix, "
            "x.shape={}.".format(n, x.shape))

    r = _tile_rows(x, n, rows)
    if y is None:
        return r
    return r, _tile_rows(y, n, rows)
def _benchmark_node_time(fct, x, N, repeat, number):
    """
    Aggregates the time spent in every node over *repeat* series of
    *number* calls to *fct*.

    :param fct: function to benchmark, ``fct(x)[1]`` must be a list of
        dictionaries (one per node) with a key ``'time'``
    :param x: input given to *fct*
    :param N: number of rows, stored in every row under key ``'N'``
    :param repeat: number of series
    :param number: number of calls in every series
    :return: list of dictionaries, one per node, with keys ``'time'``
        (average time of one call), ``'min_time'``, ``'max_time'``
        (extremes over the series, per call), ``'N'``, ``'repeat'``,
        ``'number'``

    The rows returned by the first call of every series are updated in place.
    """
    # First call is not measured, its result is discarded.
    fct(x)
    main = None
    for __ in range(repeat):
        # agg: sum of the node times over the *number* calls of this series
        agg = None
        for _ in range(number):
            ms = fct(x)[1]
            if agg is None:
                agg = ms
                for row in agg:
                    row['N'] = N
                continue
            if len(agg) != len(ms):
                raise RuntimeError(  # pragma: no cover
                    "Not the same number of nodes {} != {}.".format(
                        len(agg), len(ms)))
            for a, b in zip(agg, ms):
                a['time'] += b['time']

        if main is None:
            main = agg
            # The first series must take part in the extremes as well,
            # otherwise min_time/max_time silently ignore it.
            for row in main:
                row['max_time'] = row['time']
                row['min_time'] = row['time']
            continue
        if len(agg) != len(main):
            raise RuntimeError(  # pragma: no cover
                "Not the same number of nodes {} != {}.".format(
                    len(agg), len(main)))
        for a, b in zip(main, agg):
            a['time'] += b['time']
            a['max_time'] = max(a['max_time'], b['time'])
            a['min_time'] = min(a['min_time'], b['time'])

    for row in main:
        row['repeat'] = repeat
        row['number'] = number
        # 'time' sums every call, the extremes sum one series of calls.
        row['time'] /= repeat * number
        row['max_time'] /= number
        row['min_time'] /= number
    return main


def benchmark_fct(fct, X, time_limit=4, obs=None, node_time=False,
                  time_kwargs=None, skip_long_test=True):
    """
    Benchmarks a function which takes an array as an input
    and changes the number of rows.

    :param fct: function to benchmark, signature
        is `fct(xo)`
    :param X: array
    :param time_limit: above this time, measurement is stopped
    :param obs: all information available in a dictionary,
        key ``'problem'`` is read if *obs* is not None
    :param node_time: measure time execution for each node in the graph,
        in that case, ``fct(xo)[1]`` must be a list of dictionaries
        with a key ``'time'``
    :param time_kwargs: to define a more precise way to measure a model,
        a dictionary ``{N: {'number': ..., 'repeat': ...}}`` where *N*
        is an integer, the number of rows
    :param skip_long_test: skips tests for high values of N if
        they seem too long
    :return: dictionary ``{N: measure}`` with the results
        if *node_time* is False, a list of rows (one per node and
        per *N*) otherwise
    :raises RuntimeError: if a key of *time_kwargs* is not an integer
        or if the number of nodes changes between two calls

    The function uses *obs* to reduce the number of tries it does.
    :epkg:`sklearn:gaussian_process:GaussianProcessRegressor`
    produces huge *NxN* if predict method is called
    with ``return_cov=True``.
    The default for *time_kwargs* is the following:

    .. runpython::
        :showcode:

        from mlprodict.onnxrt.validate.validate_helper import default_time_kwargs
        import pprint
        pprint.pprint(default_time_kwargs())

    See also notebook :ref:`onnxnodetimerst` to see how this function
    can be used to measure time spent in each node.
    """
    if time_kwargs is None:
        time_kwargs = default_time_kwargs()  # pragma: no cover

    res = {}
    for N in sorted(time_kwargs):
        if not isinstance(N, int):
            raise RuntimeError(  # pragma: no cover
                "time_kwargs ({}) is wrong:\n{}".format(
                    type(time_kwargs), time_kwargs))
        # Problems whose name contains "-cov" are not run above 1000 rows.
        if obs is not None and "-cov" in obs['problem'] and N > 1000:
            continue  # pragma: no cover

        x = make_n_rows(X, N)
        number = time_kwargs[N]['number']
        repeat = time_kwargs[N]['repeat']

        if node_time:
            res[N] = _benchmark_node_time(fct, x, N, repeat, number)
            continue

        res[N] = measure_time(fct, x, repeat=repeat,
                              number=number, div_by_number=True)
        # A result without key 'total' is considered as too long.
        if (skip_long_test and res[N] is not None and
                res[N].get('total', time_limit) >= time_limit):
            # too long
            break  # pragma: no cover

    if node_time:
        rows = []
        for v in res.values():
            rows.extend(v)
        return rows
    return res