"""
Validates runtime for many :epkg:`scikit-learn` operators.
The submodule relies on :epkg:`onnxconverter_common`,
:epkg:`sklearn-onnx`.
:githublink:`%|py|7`
"""
import pprint
from inspect import signature
import numpy
from numpy.linalg import LinAlgError
import sklearn
from sklearn import __all__ as sklearn__all__, __version__ as sklearn_version
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils._testing import ignore_warnings
from ... import __version__ as ort_version
from ...onnx_conv import to_onnx, register_converters, register_rewritten_operators
from ...tools.model_info import analyze_model, set_random_state
from ...tools.asv_options_helper import (
get_opset_number_from_onnx, get_ir_version_from_onnx)
from ..onnx_inference import OnnxInference
from ..optim.sklearn_helper import inspect_sklearn_model, set_n_jobs
from ..optim.onnx_helper import onnx_statistics
from ..optim import onnx_optimisations
from .validate_problems import find_suitable_problem
from .validate_scenarios import _extra_parameters
from .validate_difference import measure_relative_difference
from .validate_helper import (
_dispsimple, sklearn_operators,
_measure_time, _shape_exc, dump_into_folder,
default_time_kwargs, RuntimeBadResultsError,
_dictionary2str, _merge_options, _multiply_time_kwargs,
_get_problem_data)
from .validate_benchmark import benchmark_fct
@ignore_warnings(category=(UserWarning, ConvergenceWarning))
def _dofit_model(dofit, obs, inst, X_train, y_train, X_test, y_test,
Xort_test, init_types, store_models,
debug, verbose, fLOG):
if dofit:
if verbose >= 2 and fLOG is not None:
fLOG("[enumerate_compatible_opset] fit, type: '{}' dtype: {}".format(
type(X_train), getattr(X_train, 'dtype', '-')))
try:
set_random_state(inst)
if y_train is None:
t4 = _measure_time(lambda: inst.fit(X_train))[1]
else:
t4 = _measure_time(
lambda: inst.fit(X_train, y_train))[1]
except (AttributeError, TypeError, ValueError,
IndexError, NotImplementedError, MemoryError,
LinAlgError, StopIteration) as e:
if debug:
raise # pragma: no cover
obs["_1training_time_exc"] = str(e)
return False
obs["training_time"] = t4
try:
skl_st = inspect_sklearn_model(inst)
except NotImplementedError:
skl_st = {}
obs.update({'skl_' + k: v for k, v in skl_st.items()})
if store_models:
obs['MODEL'] = inst
obs['X_test'] = X_test
obs['Xort_test'] = Xort_test
obs['init_types'] = init_types
else:
obs["training_time"] = 0.
if store_models:
obs['MODEL'] = inst
obs['init_types'] = init_types
return True
def _run_skl_prediction(obs, check_runtime, assume_finite, inst,
method_name, predict_kwargs, X_test,
benchmark, debug, verbose, time_kwargs,
skip_long_test, time_kwargs_fact, fLOG):
if not check_runtime:
return None # pragma: no cover
if verbose >= 2 and fLOG is not None:
fLOG("[enumerate_compatible_opset] check_runtime SKL {}-{}-{}-{}-{}".format(
id(inst), method_name, predict_kwargs, time_kwargs,
time_kwargs_fact))
with sklearn.config_context(assume_finite=assume_finite):
# compute sklearn prediction
obs['ort_version'] = ort_version
try:
meth = getattr(inst, method_name)
except AttributeError as e:
if debug:
raise # pragma: no cover
obs['_2skl_meth_exc'] = str(e)
return e
try:
ypred, t4, ___ = _measure_time(
lambda: meth(X_test, **predict_kwargs))
obs['lambda-skl'] = (lambda xo: meth(xo, **predict_kwargs), X_test)
except (ValueError, AttributeError, TypeError, MemoryError, IndexError) as e:
if debug:
raise # pragma: no cover
obs['_3prediction_exc'] = str(e)
return e
obs['prediction_time'] = t4
obs['assume_finite'] = assume_finite
if benchmark and 'lambda-skl' in obs:
obs['bench-skl'] = benchmark_fct(
*obs['lambda-skl'], obs=obs,
time_kwargs=_multiply_time_kwargs(
time_kwargs, time_kwargs_fact, inst),
skip_long_test=skip_long_test)
if verbose >= 3 and fLOG is not None:
fLOG("[enumerate_compatible_opset] scikit-learn prediction")
_dispsimple(ypred, fLOG)
if verbose >= 2 and fLOG is not None:
fLOG("[enumerate_compatible_opset] predictions stored")
return ypred
def enumerate_compatible_opset(model, opset_min=-1, opset_max=-1,  # pylint: disable=R0914
check_runtime=True, debug=False,
runtime='python', dump_folder=None,
store_models=False, benchmark=False,
assume_finite=True, node_time=False,
fLOG=print, filter_exp=None,
verbose=0, time_kwargs=None,
extended_list=False, dump_all=False,
n_features=None, skip_long_test=True,
filter_scenario=None, time_kwargs_fact=None,
time_limit=4, n_jobs=None):
"""
Lists all compatible opsets for a specific model.

:param model: operator class
:param opset_min: starts with this opset
:param opset_max: ends with this opset (None to use
    the current onnx opset)
:param check_runtime: checks that the runtime can consume the
    model and compute predictions
:param debug: catch exception (True) or not (False)
:param runtime: test a specific runtime, by default ``'python'``
:param dump_folder: dump information to replicate in case of mismatch
:param dump_all: dump all models, not only the ones which fail
:param store_models: if True, the function
    also stores the fitted model and its conversion
    into :epkg:`ONNX`
:param benchmark: if True, measures the time taken by each function
    to predict for different numbers of rows
:param fLOG: logging function
:param filter_exp: function which tells if the experiment must be run,
    None to run all, takes *model, problem* as an input
:param filter_scenario: second function which tells if the experiment must be run,
    None to run all, takes *model, problem, scenario, extra, options*
    as an input
:param node_time: collect time for each node in the :epkg:`ONNX` graph
:param assume_finite: see `config_context
    <https://scikit-learn.org/stable/modules/generated/
    sklearn.config_context.html>`_, if True, validation for finiteness
    is skipped, saving time but possibly leading to crashes;
    if False, validation for finiteness is performed, avoiding errors
:param verbose: verbosity
:param extended_list: extends the list to custom converters
    and problems
:param time_kwargs: to define a more precise way to measure a model
:param n_features: modifies the short datasets used to train the models
    to use exactly this number of features, it can also
    be a list to test multiple datasets
:param skip_long_test: skips tests for high values of N if they seem too long
:param time_kwargs_fact: see :func:`_multiply_time_kwargs <mlprodict.onnxrt.validate.validate_helper._multiply_time_kwargs>`
:param time_limit: stops benchmarking after this amount of time (in seconds)
:param n_jobs: *n_jobs* is set to the number of CPUs by default unless this
    value is changed
:return: dictionaries, each row has the following
    keys: opset, exception if any, conversion time,
    problem chosen to test the conversion...

The function requires :epkg:`sklearn-onnx`.
The outcome can be seen at the pages referenced
by :ref:`l-onnx-availability`.

The parameter *time_kwargs* is a dictionary which defines the
number of times to repeat the same predictions in order
to give more precise figures. The default value (if None) is returned
by the following code:

.. runpython::
    :showcode:

    from mlprodict.onnxrt.validate.validate_helper import default_time_kwargs
    import pprint
    pprint.pprint(default_time_kwargs())

Parameter *time_kwargs_fact* multiplies these values for some
specific models. ``'lin'`` multiplies by 10 when the model
is linear.
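
A minimal usage sketch (not executed here), assuming the function is
imported from ``mlprodict.onnxrt.validate.validate``, opset 13 is
supported by the installed :epkg:`onnx` package and a converter exists
for the chosen model::

    from sklearn.linear_model import LinearRegression
    from mlprodict.onnxrt.validate.validate import enumerate_compatible_opset

    rows = list(enumerate_compatible_opset(
        LinearRegression, opset_min=13, opset_max=13,
        runtime='python', verbose=0))
    for row in rows:
        print(row.get('problem'), row.get('opset'),
              row.get('max_rel_diff_batch'))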
:githublink:`%|py|250`
"""
if opset_min == -1:
opset_min = get_opset_number_from_onnx() # pragma: no cover
if opset_max == -1:
opset_max = get_opset_number_from_onnx() # pragma: no cover
if verbose > 0 and fLOG is not None:
fLOG("[enumerate_compatible_opset] opset in [{}, {}].".format(
opset_min, opset_max))
if verbose > 1 and fLOG:
fLOG("[enumerate_compatible_opset] validate class '{}'.".format(
model.__name__))
if verbose > 2:
fLOG(model)
if time_kwargs is None:
time_kwargs = default_time_kwargs()
problems, extras = _retrieve_problems_extra(
model, verbose, fLOG, extended_list)
if isinstance(problems, dict):
yield problems # pragma: no cover
problems = [] # pragma: no cover
if opset_max is None:
opset_max = get_opset_number_from_onnx() # pragma: no cover
opsets = list(range(opset_min, opset_max + 1)) # pragma: no cover
opsets.append(None) # pragma: no cover
else:
opsets = list(range(opset_min, opset_max + 1))
if extras is None:
problems = []
yield {'name': model.__name__, 'skl_version': sklearn_version,
'_0problem_exc': 'SKIPPED'}
if not isinstance(n_features, list):
n_features = [n_features]
for prob in problems:
if filter_exp is not None and not filter_exp(model, prob):
continue
for n_feature in n_features:
if verbose >= 2 and fLOG is not None:
fLOG("[enumerate_compatible_opset] problem={} n_feature={}".format(
prob, n_feature))
(X_train, X_test, y_train,
y_test, Xort_test,
init_types, conv_options, method_name,
output_index, dofit, predict_kwargs) = _get_problem_data(prob, n_feature)
for scenario_extra in extras:
subset_problems = None
optimisations = None
new_conv_options = None
if len(scenario_extra) > 2:
options = scenario_extra[2]
if isinstance(options, dict):
subset_problems = options.get('subset_problems', None)
optimisations = options.get('optim', None)
new_conv_options = options.get('conv_options', None)
else:
subset_problems = options
if subset_problems and isinstance(subset_problems, (list, set)):
if prob not in subset_problems:
# Skips unrelated problem for a specific configuration.
continue
elif subset_problems is not None:
raise RuntimeError( # pragma: no cover
"subset_problems must be a set or a list not {}.".format(
subset_problems))
try:
scenario, extra = scenario_extra[:2]
except TypeError as e: # pragma: no cover
raise TypeError(
"Unable to interpret 'scenario_extra'\n{}".format(
scenario_extra)) from e
if optimisations is None:
optimisations = [None]
if new_conv_options is None:
new_conv_options = [{}]
if (filter_scenario is not None and
not filter_scenario(model, prob, scenario,
extra, new_conv_options)):
continue
if verbose >= 2 and fLOG is not None:
fLOG("[enumerate_compatible_opset] ##############################")
fLOG("[enumerate_compatible_opset] scenario={} optim={} extra={} dofit={} (problem={})".format(
scenario, optimisations, extra, dofit, prob))
# training
obs = {'scenario': scenario, 'name': model.__name__,
'skl_version': sklearn_version, 'problem': prob,
'method_name': method_name, 'output_index': output_index,
'fit': dofit, 'conv_options': conv_options,
'idtype': Xort_test.dtype, 'predict_kwargs': predict_kwargs,
'init_types': init_types, 'inst': extra if extra else None,
'n_features': X_train.shape[1] if len(X_train.shape) == 2 else 1}
inst = None
extra = set_n_jobs(model, extra, n_jobs=n_jobs)
try:
inst = model(**extra)
except TypeError as e: # pragma: no cover
if debug: # pragma: no cover
raise
if "__init__() missing" not in str(e):
raise RuntimeError(
"Unable to instantiate model '{}'.\nextra=\n{}".format(
model.__name__, pprint.pformat(extra))) from e
yield obs.copy()
continue
if not _dofit_model(dofit, obs, inst, X_train, y_train, X_test, y_test,
Xort_test, init_types, store_models,
debug, verbose, fLOG):
yield obs.copy()
continue
# statistics about the trained model
skl_infos = analyze_model(inst)
for k, v in skl_infos.items():
obs['fit_' + k] = v
# runtime
ypred = _run_skl_prediction(
obs, check_runtime, assume_finite, inst,
method_name, predict_kwargs, X_test,
benchmark, debug, verbose, time_kwargs,
skip_long_test, time_kwargs_fact, fLOG)
if isinstance(ypred, Exception):
yield obs.copy()
continue
for run_obs in _call_conv_runtime_opset(
obs=obs.copy(), opsets=opsets, debug=debug,
new_conv_options=new_conv_options,
model=model, prob=prob, scenario=scenario,
extra=extra, extras=extras, conv_options=conv_options,
init_types=init_types, inst=inst,
optimisations=optimisations, verbose=verbose,
benchmark=benchmark, runtime=runtime,
filter_scenario=filter_scenario,
X_test=X_test, y_test=y_test, ypred=ypred,
Xort_test=Xort_test, method_name=method_name,
check_runtime=check_runtime,
output_index=output_index,
kwargs=dict(
dump_all=dump_all,
dump_folder=dump_folder,
node_time=node_time,
skip_long_test=skip_long_test,
store_models=store_models,
time_kwargs=_multiply_time_kwargs(
time_kwargs, time_kwargs_fact, inst)
),
time_limit=time_limit,
fLOG=fLOG):
yield run_obs
def _check_run_benchmark(benchmark, stat_onnx, bench_memo, runtime):
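# Runs the benchmark only once per unique combination of ONNX statistics
# and runtime; combinations already benchmarked are memoized in bench_memo.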
unique = set(stat_onnx.items())
unique.add(runtime)
run_benchmark = benchmark and all(
map(lambda u: unique != u, bench_memo))
if run_benchmark:
bench_memo.append(unique)
return run_benchmark
def _call_conv_runtime_opset(
obs, opsets, debug, new_conv_options,
model, prob, scenario, extra, extras, conv_options,
init_types, inst, optimisations, verbose,
benchmark, runtime, filter_scenario,
check_runtime, X_test, y_test, ypred, Xort_test,
method_name, output_index,
kwargs, time_limit, fLOG):
# Calls the conversion and runtime for different opsets.
if None in opsets:
set_opsets = [None] + list(sorted((_ for _ in opsets if _ is not None),
reverse=True))
else:
set_opsets = list(sorted(opsets, reverse=True))
bench_memo = []
for opset in set_opsets:
if verbose >= 2 and fLOG is not None:
fLOG("[enumerate_compatible_opset] opset={} init_types={}".format(
opset, init_types))
obs_op = obs.copy()
if opset is not None:
obs_op['opset'] = opset
if len(init_types) != 1:
raise NotImplementedError( # pragma: no cover
"Multiple types are is not implemented: "
"{}.".format(init_types))
if not isinstance(runtime, list):
runtime = [runtime]
obs_op_0c = obs_op.copy()
for aoptions in new_conv_options:
obs_op = obs_op_0c.copy()
all_conv_options = {} if conv_options is None else conv_options.copy()
all_conv_options = _merge_options(
all_conv_options, aoptions)
obs_op['conv_options'] = all_conv_options
if (filter_scenario is not None and
not filter_scenario(model, prob, scenario,
extra, all_conv_options)):
continue
for rt in runtime:
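# Conversion closure: converter rewrites (rewrite_ops) are only enabled
# for the python-based runtimes.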
def fct_conv(itt=inst, it=init_types[0][1], ops=opset,
options=all_conv_options):
return to_onnx(itt, it, target_opset=ops, options=options,
rewrite_ops=rt in ('', None, 'python',
'python_compiled'))
if verbose >= 2 and fLOG is not None:
fLOG(
"[enumerate_compatible_opset] conversion to onnx: {}".format(all_conv_options))
try:
conv, t4 = _measure_time(fct_conv)[:2]
obs_op["convert_time"] = t4
except (RuntimeError, IndexError, AttributeError, TypeError,
ValueError, NameError, NotImplementedError) as e:
if debug:
fLOG(pprint.pformat(obs_op)) # pragma: no cover
raise # pragma: no cover
obs_op["_4convert_exc"] = e
yield obs_op.copy()
continue
if verbose >= 6 and fLOG is not None:
fLOG( # pragma: no cover
"[enumerate_compatible_opset] ONNX:\n{}".format(conv))
if all_conv_options.get('optim', '') == 'cdist': # pragma: no cover
check_cdist = [_ for _ in str(conv).split('\n')
if 'CDist' in _]
check_scan = [_ for _ in str(conv).split('\n')
if 'Scan' in _]
if len(check_cdist) == 0 and len(check_scan) > 0:
raise RuntimeError(
"Operator CDist was not used in\n{}"
"".format(conv))
obs_op0 = obs_op.copy()
for optimisation in optimisations:
obs_op = obs_op0.copy()
if optimisation is not None:
if optimisation == 'onnx':
obs_op['optim'] = optimisation
if len(aoptions) != 0:
obs_op['optim'] += '/' + \
_dictionary2str(aoptions)
conv = onnx_optimisations(conv)
else:
raise ValueError( # pragma: no cover
"Unknown optimisation option '{}' (extra={})"
"".format(optimisation, extras))
else:
obs_op['optim'] = _dictionary2str(aoptions)
if verbose >= 3 and fLOG is not None:
fLOG("[enumerate_compatible_opset] optim='{}' optimisation={} all_conv_options={}".format(
obs_op['optim'], optimisation, all_conv_options))
if kwargs['store_models']:
obs_op['ONNX'] = conv
if verbose >= 2 and fLOG is not None:
fLOG( # pragma: no cover
"[enumerate_compatible_opset] onnx nodes: {}".format(
len(conv.graph.node)))
stat_onnx = onnx_statistics(conv)
obs_op.update(
{'onx_' + k: v for k, v in stat_onnx.items()})
# opset_domain
for op_imp in list(conv.opset_import):
obs_op['domain_opset_%s' %
op_imp.domain] = op_imp.version
run_benchmark = _check_run_benchmark(
benchmark, stat_onnx, bench_memo, rt)
# prediction
if check_runtime:
yield _call_runtime(obs_op=obs_op.copy(), conv=conv,
opset=opset, debug=debug,
runtime=rt, inst=inst,
X_test=X_test, y_test=y_test,
init_types=init_types,
method_name=method_name,
output_index=output_index,
ypred=ypred, Xort_test=Xort_test,
model=model,
dump_folder=kwargs['dump_folder'],
benchmark=run_benchmark,
node_time=kwargs['node_time'],
time_kwargs=kwargs['time_kwargs'],
fLOG=fLOG, verbose=verbose,
store_models=kwargs['store_models'],
dump_all=kwargs['dump_all'],
skip_long_test=kwargs['skip_long_test'],
time_limit=time_limit)
else:
yield obs_op.copy() # pragma: no cover
def _call_runtime(obs_op, conv, opset, debug, inst, runtime,
X_test, y_test, init_types, method_name, output_index,
ypred, Xort_test, model, dump_folder,
benchmark, node_time, fLOG,
verbose, store_models, time_kwargs,
dump_all, skip_long_test, time_limit):
"""
Private.
:githublink:`%|py|574`
"""
if 'onnxruntime' in runtime:
old = conv.ir_version
conv.ir_version = get_ir_version_from_onnx()
else:
old = None
ser, t5, ___ = _measure_time(lambda: conv.SerializeToString())
obs_op['tostring_time'] = t5
obs_op['runtime'] = runtime
if old is not None:
conv.ir_version = old
# load
if verbose >= 2 and fLOG is not None:
fLOG("[enumerate_compatible_opset-R] load onnx")
try:
sess, t5, ___ = _measure_time(
lambda: OnnxInference(ser, runtime=runtime))
obs_op['tostring_time'] = t5
except (RuntimeError, ValueError, KeyError, IndexError, TypeError) as e:
if debug:
raise # pragma: no cover
obs_op['_5ort_load_exc'] = e
return obs_op
# compute batch
if store_models:
obs_op['OINF'] = sess
if verbose >= 2 and fLOG is not None:
fLOG("[enumerate_compatible_opset-R] compute batch with runtime "
"'{}'".format(runtime))
def fct_batch(se=sess, xo=Xort_test, it=init_types): # pylint: disable=W0102
return se.run({it[0][0]: xo},
verbose=max(verbose - 1, 1) if debug else 0, fLOG=fLOG)
try:
opred, t5, ___ = _measure_time(fct_batch)
obs_op['ort_run_time_batch'] = t5
obs_op['lambda-batch'] = (lambda xo: sess.run(
{init_types[0][0]: xo}, node_time=node_time), Xort_test)
except (RuntimeError, TypeError, ValueError, KeyError, IndexError) as e:
if debug:
raise RuntimeError("Issue with {}.".format(
obs_op)) from e # pragma: no cover
obs_op['_6ort_run_batch_exc'] = e
if (benchmark or node_time) and 'lambda-batch' in obs_op:
try:
benres = benchmark_fct(*obs_op['lambda-batch'], obs=obs_op,
node_time=node_time, time_kwargs=time_kwargs,
skip_long_test=skip_long_test,
time_limit=time_limit)
obs_op['bench-batch'] = benres
except (RuntimeError, TypeError, ValueError) as e: # pragma: no cover
if debug:
raise e # pragma: no cover
obs_op['_6ort_run_batch_exc'] = e
obs_op['_6ort_run_batch_bench_exc'] = e
# difference
debug_exc = []
if verbose >= 2 and fLOG is not None:
fLOG("[enumerate_compatible_opset-R] differences")
if '_6ort_run_batch_exc' not in obs_op:
if isinstance(opred, dict):
ch = [(k, v) for k, v in opred.items()]
opred = [_[1] for _ in ch]
if output_index != 'all':
try:
opred = opred[output_index]
except IndexError as e: # pragma: no cover
if debug:
raise IndexError(
"Issue with output_index={}/{}".format(
output_index, len(opred))) from e
obs_op['_8max_rel_diff_batch_exc'] = (
"Unable to fetch output {}/{} for model '{}'"
"".format(output_index, len(opred),
model.__name__))
opred = None
if opred is not None:
if store_models:
obs_op['skl_outputs'] = ypred
obs_op['ort_outputs'] = opred
if verbose >= 3 and fLOG is not None:
fLOG("[_call_runtime] runtime prediction")
_dispsimple(opred, fLOG)
if (method_name == "decision_function" and hasattr(opred, 'shape') and
hasattr(ypred, 'shape') and len(opred.shape) == 2 and
opred.shape[1] == 2 and len(ypred.shape) == 1):
# decision_function, for binary classification,
# raw score is a distance
max_rel_diff = measure_relative_difference(
ypred, opred[:, 1])
else:
max_rel_diff = measure_relative_difference(
ypred, opred)
if max_rel_diff >= 1e9 and debug: # pragma: no cover
_shape = lambda o: o.shape if hasattr(
o, 'shape') else 'no shape'
raise RuntimeError(
"Big difference (opset={}, runtime='{}' p='{}' s='{}')"
":\n-------\n{}-{}\n{}\n--------\n{}-{}\n{}".format(
opset, runtime, obs_op['problem'], obs_op['scenario'],
type(ypred), _shape(ypred), ypred,
type(opred), _shape(opred), opred))
if numpy.isnan(max_rel_diff):
obs_op['_8max_rel_diff_batch_exc'] = ( # pragma: no cover
"Unable to compute differences between"
" {}-{}\n{}\n--------\n{}".format(
_shape_exc(
ypred), _shape_exc(opred),
ypred, opred))
if debug: # pragma: no cover
debug_exc.append(RuntimeError(
obs_op['_8max_rel_diff_batch_exc']))
else:
obs_op['max_rel_diff_batch'] = max_rel_diff
if dump_folder and max_rel_diff > 1e-5:
dump_into_folder(dump_folder, kind='batch', obs_op=obs_op,
X_test=X_test, y_test=y_test, Xort_test=Xort_test)
if debug and max_rel_diff >= 0.1: # pragma: no cover
raise RuntimeError("Two big differences {}\n{}\n{}\n{}".format(
max_rel_diff, inst, conv, pprint.pformat(obs_op)))
if debug and len(debug_exc) == 2:
raise debug_exc[0] # pragma: no cover
if debug and verbose >= 2: # pragma: no cover
if verbose >= 3:
fLOG(pprint.pformat(obs_op))
else:
obs_op_log = {k: v for k,
v in obs_op.items() if 'lambda-' not in k}
fLOG(pprint.pformat(obs_op_log))
if verbose >= 2 and fLOG is not None:
fLOG("[enumerate_compatible_opset-R] next...")
if dump_all:
dump = dump_into_folder(dump_folder, kind='batch', obs_op=obs_op,
X_test=X_test, y_test=y_test, Xort_test=Xort_test,
is_error=len(debug_exc) > 1,
onnx_bytes=conv.SerializeToString(),
skl_model=inst, ypred=ypred)
obs_op['dumped'] = dump
return obs_op
def _enumerate_validated_operator_opsets_ops(extended_list, models, skip_models):
ops = [_ for _ in sklearn_operators(extended=extended_list)]
if models is not None:
if not all(map(lambda m: isinstance(m, str), models)):
raise ValueError( # pragma: no cover
"models must be a set of strings.")
ops_ = [_ for _ in ops if _['name'] in models]
if len(ops_) == 0:
raise ValueError( # pragma: no cover
"Parameter models is wrong: {}\n{}".format(
models, ops[0]))
ops = ops_
if skip_models is not None:
ops = [m for m in ops if m['name'] not in skip_models]
return ops
def _enumerate_validated_operator_opsets_version(runtime):
from numpy import __version__ as numpy_version
from onnx import __version__ as onnx_version
from scipy import __version__ as scipy_version
from skl2onnx import __version__ as skl2onnx_version
add_versions = {'v_numpy': numpy_version, 'v_onnx': onnx_version,
'v_scipy': scipy_version, 'v_skl2onnx': skl2onnx_version,
'v_sklearn': sklearn_version, 'v_onnxruntime': ort_version}
if "onnxruntime" in runtime:
from onnxruntime import __version__ as onnxrt_version
add_versions['v_onnxruntime'] = onnxrt_version
return add_versions
def enumerate_validated_operator_opsets(verbose=0, opset_min=-1, opset_max=-1,
check_runtime=True, debug=False, runtime='python',
models=None, dump_folder=None, store_models=False,
benchmark=False, skip_models=None,
assume_finite=True, node_time=False,
fLOG=print, filter_exp=None,
versions=False, extended_list=False,
time_kwargs=None, dump_all=False,
n_features=None, skip_long_test=True,
fail_bad_results=False,
filter_scenario=None,
time_kwargs_fact=None,
time_limit=4, n_jobs=None):
"""
Tests all possible configurations for all possible
operators and returns the results.

:param verbose: integer 0, 1, 2
:param opset_min: checks conversion starting from this opset,
    -1 to use the latest one
:param opset_max: checks conversion up to this opset,
    None means :func:`get_opset_number_from_onnx`
:param check_runtime: checks the python runtime
:param models: only process a small list of operators,
    set of model names
:param debug: stops whenever an exception
    is raised
:param runtime: test a specific runtime, by default ``'python'``
:param dump_folder: dump information to replicate in case of mismatch
:param dump_all: dump all models, not only the ones which fail
:param store_models: if True, the function
    also stores the fitted model and its conversion
    into :epkg:`ONNX`
:param benchmark: if True, measures the time taken by each function
    to predict for different numbers of rows
:param filter_exp: function which tells if the experiment must be run,
    None to run all, takes *model, problem* as an input
:param filter_scenario: second function which tells if the experiment must be run,
    None to run all, takes *model, problem, scenario, extra, options*
    as an input
:param skip_models: models to skip
:param assume_finite: see `config_context
    <https://scikit-learn.org/stable/modules/generated/
    sklearn.config_context.html>`_, if True, validation for finiteness
    is skipped, saving time but possibly leading to crashes;
    if False, validation for finiteness is performed, avoiding errors
:param node_time: measure execution time for every node in the graph
:param versions: add columns with the versions of the used packages,
    :epkg:`numpy`, :epkg:`scikit-learn`, :epkg:`onnx`,
    :epkg:`onnxruntime`, :epkg:`sklearn-onnx`
:param extended_list: also checks models for which this module
    implements a converter
:param time_kwargs: to define a more precise way to measure a model
:param n_features: modifies the short datasets used to train the models
    to use exactly this number of features, it can also
    be a list to test multiple datasets
:param skip_long_test: skips tests for high values of N if they seem too long
:param fail_bad_results: fails if the results are not aligned with
    :epkg:`scikit-learn`
:param time_kwargs_fact: see :func:`_multiply_time_kwargs <mlprodict.onnxrt.validate.validate_helper._multiply_time_kwargs>`
:param time_limit: skips the rest of the test after this limit (in seconds)
:param n_jobs: *n_jobs* is set to the number of CPUs by default unless this
    value is changed
:param fLOG: logging function
:return: list of dictionaries

The function is available through command line
:ref:`validate_runtime <l-cmd-validate_runtime>`.
The default for *time_kwargs* is the following:

.. runpython::
    :showcode:

    from mlprodict.onnxrt.validate.validate_helper import default_time_kwargs
    import pprint
    pprint.pprint(default_time_kwargs())
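
A minimal usage sketch (not executed here), assuming :epkg:`pandas` is
installed, opset 13 is supported and ``LinearRegression`` is among the
validated operators; the column names come from the dictionaries
yielded by this function::

    import pandas
    rows = list(enumerate_validated_operator_opsets(
        verbose=0, models={"LinearRegression"},
        opset_min=13, opset_max=13, benchmark=False))
    df = pandas.DataFrame(rows)
    print(df[["name", "problem", "scenario", "available"]])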
:githublink:`%|py|833`
"""
register_converters()
register_rewritten_operators()
ops = _enumerate_validated_operator_opsets_ops(
extended_list, models, skip_models)
if verbose > 0:
def iterate():
for i, row in enumerate(ops): # pragma: no cover
fLOG("{}/{} - {}".format(i + 1, len(ops), row))
yield row
if verbose >= 11:
verbose -= 10 # pragma: no cover
loop = iterate() # pragma: no cover
else:
try:
from tqdm import trange
def iterate_tqdm():
with trange(len(ops)) as t:
for i in t:
row = ops[i]
disp = row['name'] + " " * (28 - len(row['name']))
t.set_description("%s" % disp)
yield row
loop = iterate_tqdm()
except ImportError: # pragma: no cover
loop = iterate()
else:
loop = ops
if versions:
add_versions = _enumerate_validated_operator_opsets_version(runtime)
else:
add_versions = {}
current_opset = get_opset_number_from_onnx()
if opset_min == -1:
opset_min = get_opset_number_from_onnx()
if opset_max == -1:
opset_max = get_opset_number_from_onnx()
if verbose > 0 and fLOG is not None:
fLOG("[enumerate_validated_operator_opsets] opset in [{}, {}].".format(
opset_min, opset_max))
for row in loop:
model = row['cl']
if verbose > 1:
fLOG("[enumerate_validated_operator_opsets] - model='{}'".format(model))
for obs in enumerate_compatible_opset(
model, opset_min=opset_min, opset_max=opset_max,
check_runtime=check_runtime, runtime=runtime,
debug=debug, dump_folder=dump_folder,
store_models=store_models, benchmark=benchmark,
fLOG=fLOG, filter_exp=filter_exp,
assume_finite=assume_finite, node_time=node_time,
verbose=verbose, extended_list=extended_list,
time_kwargs=time_kwargs, dump_all=dump_all,
n_features=n_features, skip_long_test=skip_long_test,
filter_scenario=filter_scenario,
time_kwargs_fact=time_kwargs_fact,
time_limit=time_limit, n_jobs=n_jobs):
for mandkey in ('inst', 'method_name', 'problem',
'scenario'):
if '_0problem_exc' in obs:
continue
if mandkey not in obs:
raise ValueError("Missing key '{}' in\n{}".format(
mandkey, pprint.pformat(obs))) # pragma: no cover
if verbose > 1:
fLOG('[enumerate_validated_operator_opsets] - OBS')
if verbose > 2:
fLOG(" ", obs)
else:
obs_log = {k: v for k,
v in obs.items() if 'lambda-' not in k}
fLOG(" ", obs_log)
elif verbose > 0 and "_0problem_exc" in obs:
fLOG(" ???", obs) # pragma: no cover
diff = obs.get('max_rel_diff_batch', None)
batch = 'max_rel_diff_batch' in obs and diff is not None
op1 = obs.get('domain_opset_', '')
op2 = obs.get('domain_opset_ai.onnx.ml', '')
op = '{}/{}'.format(op1, op2)
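# Summarizes the batch relative difference into an 'available' label,
# from 'OK' (diff < 1e-5) to 'ERROR->=...' for large discrepancies.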
obs['available'] = "?"
if diff is not None:
if diff < 1e-5:
obs['available'] = 'OK'
elif diff < 0.0001:
obs['available'] = 'e<0.0001'
elif diff < 0.001:
obs['available'] = 'e<0.001'
elif diff < 0.01:
obs['available'] = 'e<0.01' # pragma: no cover
elif diff < 0.1:
obs['available'] = 'e<0.1'
else:
obs['available'] = "ERROR->=%1.1f" % diff
obs['available'] += '-' + op
if not batch:
obs['available'] += "-NOBATCH" # pragma: no cover
if fail_bad_results and 'e<' in obs['available']:
raise RuntimeBadResultsError(
"Wrong results '{}'.".format(obs['available']), obs) # pragma: no cover
excs = []
for k, v in sorted(obs.items()):
if k.endswith('_exc'):
excs.append((k, v))
break
if 'opset' not in obs:
# It fails before the conversion happens.
obs['opset'] = current_opset
if obs['opset'] == current_opset and len(excs) > 0:
k, v = excs[0]
obs['available'] = 'ERROR-%s' % k
obs['available-ERROR'] = v
if 'bench-skl' in obs:
b1 = obs['bench-skl']
if 'bench-batch' in obs:
b2 = obs['bench-batch']
else:
b2 = None
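# The time-ratio-N=* columns compare the ONNX runtime ('bench-batch') to
# scikit-learn ('bench-skl'): values below 1 mean the ONNX runtime is faster.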
if b1 is not None and b2 is not None:
for k in b1:
if k in b2 and b2[k] is not None and b1[k] is not None:
key = 'time-ratio-N=%d' % k
obs[key] = b2[k]['average'] / b1[k]['average']
key = 'time-ratio-N=%d-min' % k
obs[key] = b2[k]['min_exec'] / b1[k]['max_exec']
key = 'time-ratio-N=%d-max' % k
obs[key] = b2[k]['max_exec'] / b1[k]['min_exec']
obs.update(row)
obs.update(add_versions)
yield obs.copy()