"""
Common class for all benchmarks testing
converted models from :epkg:`scikit-learn`
with :epkg:`asv`. The benchmark can be run through
file :epkg:`run_asv.sh` on Linux or :epkg:`run_asv.bat` on
Windows.
.. warning::
On Windows, you should avoid cloning the repository
on a folder with a long full name. Visual Studio tends to
abide by the rule of the maximum path length even though
the system is told otherwise.
:githublink:`%|py|14`
"""
import os
from datetime import datetime
import pickle
from logging import getLogger
import numpy
from sklearn import set_config
from sklearn.datasets import load_iris
from sklearn.metrics import (
accuracy_score, mean_absolute_error,
silhouette_score)
from sklearn.model_selection import train_test_split
from mlprodict.onnxrt import OnnxInference
from mlprodict.onnx_conv import (
to_onnx, register_rewritten_operators, register_converters)
from mlprodict.onnxrt.validate.validate_benchmark import make_n_rows
from mlprodict.onnxrt.validate.validate_problems import _modify_dimension
from mlprodict.onnxrt.optim import onnx_statistics
from mlprodict.tools.asv_options_helper import (
expand_onnx_options, get_opset_number_from_onnx,
get_ir_version_from_onnx, version2number)
from mlprodict.tools.model_info import set_random_state


class _CommonAsvSklBenchmark:
    """
    Common tests to all benchmarks testing converted
    :epkg:`scikit-learn` models. See `benchmark attributes
    <https://asv.readthedocs.io/en/stable/benchmarks.html#general>`_.
    """
    # Part which changes.
    # params and param_names may be changed too.
    params = [
        ['skl', 'pyrtc', 'ort'],  # values for runtime
        [1, 10, 100, 10000],  # values for N
        [4, 20],  # values for nf
        [get_opset_number_from_onnx()],  # values for opset
        ["float", "double"],  # values for dtype
        [None],  # values for optim
    ]
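    # asv runs every benchmark method once per combination drawn from
    # ``params``; the values of each combination are passed as positional
    # arguments in the order given by ``param_names`` below.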
    param_names = ['rt', 'N', 'nf', 'opset', 'dtype', 'optim']
    chk_method_name = None
    version = datetime.now().isoformat()
    pretty_source = "disabled"

    par_ydtype = numpy.int64
    par_dofit = True
    par_convopts = None

    def _create_model(self):  # pragma: no cover
        raise NotImplementedError("This method must be overwritten.")

    def _create_onnx_and_runtime(self, runtime, model, X,
                                 opset, dtype, optim):  # pragma: no cover
        raise NotImplementedError("This method must be overwritten.")

    def _score_metric(self, X, y_exp, y_pred):  # pragma: no cover
        raise NotImplementedError("This method must be overwritten.")

    def _optimize_onnx(self, onx):
        return onx

    def _get_xdtype(self, dtype):
        if dtype in ('float', numpy.float32):
            return numpy.float32
        elif dtype in ('double', '64', 64, numpy.float64):
            return numpy.float64
        raise ValueError(  # pragma: no cover
            "Unknown dtype '{}'.".format(dtype))

    def _get_dataset(self, nf, dtype):
        xdtype = self._get_xdtype(dtype)
        data = load_iris()
        X, y = data.data, data.target
        state = numpy.random.RandomState(seed=34)  # pylint: disable=E1101
        # Adds noise so that the problem is not trivially solved.
        rnd = state.randn(*X.shape) / 3
        X += rnd
        X = _modify_dimension(X, nf)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, random_state=42)
        Xt = X_test.astype(xdtype)
        yt = y_test.astype(self.par_ydtype)
        return (X_train, y_train), (Xt, yt)

    def _to_onnx(self, model, X, opset, dtype, optim):
        if optim is None or len(optim) == 0:
            options = self.par_convopts
        elif self.par_convopts and len(self.par_convopts) > 0:
            raise NotImplementedError(  # pragma: no cover
                "Conflict between par_convopts={} and optim={}".format(
                    self.par_convopts, optim))
        else:
            # Expands common onnx options, see _nick_name_options.
            options = expand_onnx_options(model, optim)
        return to_onnx(model, X, options=options, target_opset=opset)

    def _create_onnx_inference(self, onx, runtime):
        if 'onnxruntime' in runtime:
            # onnxruntime may not support the latest IR version,
            # it is temporarily downgraded.
            old = onx.ir_version
            onx.ir_version = get_ir_version_from_onnx()
        else:
            old = None

        try:
            res = OnnxInference(onx, runtime=runtime)
        except RuntimeError as e:  # pragma: no cover
            if "[ONNXRuntimeError]" in str(e):
                # The error is returned rather than raised so that
                # the benchmark does not stop on a failing runtime.
                return RuntimeError(
                    "onnxruntime fails due to {}".format(str(e)))
            raise e
        if old is not None:
            onx.ir_version = old
        return res

    # Part which does not change.

    def _check_rt(self, rt, meth):
        """
        Checks that the runtime has the appropriate method.
        """
        if rt is None:
            raise ValueError("rt cannot be empty.")
        if not hasattr(rt, meth):
            raise TypeError(
                "rt of type %r has no method %r." % (type(rt), meth))

    def runtime_name(self, runtime):
        """
        Returns the runtime shortname.
        """
        if runtime == 'skl':
            name = runtime
        elif runtime == 'ort':
            name = 'onnxruntime1'
        elif runtime == 'ort2':
            name = 'onnxruntime2'  # pragma: no cover
        elif runtime == 'pyrt':
            name = 'python'
        elif runtime == 'pyrtc':
            name = 'python_compiled'
        else:
            raise ValueError(  # pragma: no cover
                "Unknown runtime '{}'.".format(runtime))
        return name

    def _name(self, nf, opset, dtype):
        "Returns the name of the file caching the trained model."
        last = 'cache-{}-nf{}-op{}-dt{}.pickle'.format(
            self.__class__.__name__, nf, opset, dtype)
        return last

    def setup_cache(self):
        "asv API: trains and caches one model per dataset configuration."
        for dtype in self.params[4]:
            for opv in self.params[3]:
                for nf in self.params[2]:
                    (X_train, y_train), (X, y) = self._get_dataset(nf, dtype)
                    model = self._create_model()
                    if self.par_dofit:
                        set_random_state(model)
                        model.fit(X_train, y_train)
                    stored = {'model': model, 'X': X, 'y': y}
                    filename = self._name(nf, opv, dtype)
                    with open(filename, "wb") as f:
                        pickle.dump(stored, f)
                    if not os.path.exists(filename):
                        raise RuntimeError(  # pragma: no cover
                            "Unable to dump model %r into %r." % (
                                model, filename))

    def setup(self, runtime, N, nf, opset, dtype, optim):
        "asv API: restores the cached model and builds the runtime."
        logger = getLogger('skl2onnx')
        logger.disabled = True
        register_converters()
        register_rewritten_operators()
        with open(self._name(nf, opset, dtype), "rb") as f:
            stored = pickle.load(f)
        self.stored = stored
        self.model = stored['model']
        self.X, self.y = make_n_rows(stored['X'], N, stored['y'])
        onx, rt_, rt_fct_, rt_fct_track_ = self._create_onnx_and_runtime(
            runtime, self.model, self.X, opset, dtype, optim)
        self.onx = onx
        setattr(self, "rt_" + runtime, rt_)
        setattr(self, "rt_fct_" + runtime, rt_fct_)
        setattr(self, "rt_fct_track_" + runtime, rt_fct_track_)
        set_config(assume_finite=True)

    def time_predict(self, runtime, N, nf, opset, dtype, optim):
        "asv API"
        return getattr(self, "rt_fct_" + runtime)(self.X)

    def peakmem_predict(self, runtime, N, nf, opset, dtype, optim):
        "asv API"
        return getattr(self, "rt_fct_" + runtime)(self.X)

    def track_score(self, runtime, N, nf, opset, dtype, optim):
        "asv API"
        yp = getattr(self, "rt_fct_track_" + runtime)(self.X)
        return self._score_metric(self.X, self.y, yp)

    def track_onnxsize(self, runtime, N, nf, opset, dtype, optim):
        "asv API"
        return len(self.onx.SerializeToString())

    def track_nbnodes(self, runtime, N, nf, opset, dtype, optim):
        "asv API"
        stats = onnx_statistics(self.onx)
        return stats.get('nnodes', 0)

    def track_vmlprodict(self, runtime, N, nf, opset, dtype, optim):
        "asv API"
        from mlprodict import __version__
        return version2number(__version__)

    def track_vsklearn(self, runtime, N, nf, opset, dtype, optim):
        "asv API"
        from sklearn import __version__
        return version2number(__version__)

    def track_vort(self, runtime, N, nf, opset, dtype, optim):
        "asv API"
        try:
            from onnxruntime import __version__
            return version2number(__version__)
        except ImportError:  # pragma: no cover
            return 0

    def check_method_name(self, method_name):
        "Checks that the method name is consistent, fails otherwise."
        if getattr(self, 'chk_method_name', None) not in (None, method_name):
            raise RuntimeError(  # pragma: no cover
                "Method name must be '{}'.".format(method_name))
        if getattr(self, 'chk_method_name', None) is None:
            raise RuntimeError(  # pragma: no cover
                "Unable to check that the method name is correct "
                "(expected is '{}').".format(method_name))


class _CommonAsvSklBenchmarkClassifier(_CommonAsvSklBenchmark):
    """
    Common class for a classifier.
    """
    chk_method_name = 'predict_proba'

    def _score_metric(self, X, y_exp, y_pred):
        return accuracy_score(y_exp, y_pred)

    def _create_onnx_and_runtime(self, runtime, model, X, opset, dtype, optim):
        self.check_method_name('predict_proba')
        onx_ = self._to_onnx(model, X, opset, dtype, optim)
        onx = self._optimize_onnx(onx_)
        name = self.runtime_name(runtime)
        if name == 'skl':
            rt_ = None
            rt_fct_ = lambda X: model.predict_proba(X)
            rt_fct_track_ = lambda X: model.predict(X)
        else:
            rt_ = self._create_onnx_inference(onx, name)
            self._check_rt(rt_, 'run')
            rt_fct_ = lambda pX: rt_.run({'X': pX})
            rt_fct_track_ = lambda pX: rt_fct_(pX)['output_label']
        return onx, rt_, rt_fct_, rt_fct_track_


class _CommonAsvSklBenchmarkClassifierRawScore(_CommonAsvSklBenchmark):
    """
    Common class for a classifier benchmarked on raw scores
    (method *decision_function*).
    """
    chk_method_name = 'decision_function'

    def _score_metric(self, X, y_exp, y_pred):
        return accuracy_score(y_exp, y_pred)

    def _create_onnx_and_runtime(self, runtime, model, X, opset, dtype, optim):
        self.check_method_name('decision_function')
        onx_ = self._to_onnx(model, X, opset, dtype, optim)
        onx = self._optimize_onnx(onx_)
        name = self.runtime_name(runtime)
        if name == 'skl':
            rt_ = None
            rt_fct_ = lambda X: model.decision_function(X)
            rt_fct_track_ = lambda X: model.predict(X)
        else:
            rt_ = self._create_onnx_inference(onx, name)
            self._check_rt(rt_, 'run')
            rt_fct_ = lambda X: rt_.run({'X': X})
            rt_fct_track_ = lambda X: rt_fct_(X)['output_label']
        return onx, rt_, rt_fct_, rt_fct_track_


class _CommonAsvSklBenchmarkClustering(_CommonAsvSklBenchmark):
    """
    Common class for a clustering algorithm.
    """
    chk_method_name = 'predict'

    def _score_metric(self, X, y_exp, y_pred):
        if X.shape[0] == 1:
            return 0.  # pragma: no cover
        elif len(set(y_pred)) == 1:
            # silhouette_score is not defined when there is
            # a single cluster.
            return 0.  # pragma: no cover
        return silhouette_score(X, y_pred)

    def _create_onnx_and_runtime(self, runtime, model, X, opset, dtype, optim):
        self.check_method_name('predict')
        onx_ = self._to_onnx(model, X, opset, dtype, optim)
        onx = self._optimize_onnx(onx_)
        name = self.runtime_name(runtime)
        if name == 'skl':
            rt_ = None
            rt_fct_ = lambda X: model.predict(X.astype(numpy.float64))
            rt_fct_track_ = lambda X: model.predict(X.astype(numpy.float64))
        else:
            rt_ = self._create_onnx_inference(onx, name)
            self._check_rt(rt_, 'run')
            rt_fct_ = lambda X: rt_.run({'X': X})
            rt_fct_track_ = lambda X: rt_fct_(X)['label']
        return onx, rt_, rt_fct_, rt_fct_track_


class _CommonAsvSklBenchmarkMultiClassifier(_CommonAsvSklBenchmark):
    """
    Common class for a multi-label classifier.
    """
    chk_method_name = 'predict_proba'

    def _get_dataset(self, nf, dtype):
        xdtype = self._get_xdtype(dtype)
        data = load_iris()
        X, y = data.data, data.target
        state = numpy.random.RandomState(seed=34)  # pylint: disable=E1101
        rnd = state.randn(*X.shape) / 3
        X += rnd
        # Binarizes the labels: one column per class.
        nbclass = len(set(y))
        y_ = numpy.zeros((y.shape[0], nbclass), dtype=y.dtype)
        for i, vy in enumerate(y):
            y_[i, vy] = 1
        y = y_
        X = _modify_dimension(X, nf)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, random_state=42)
        X = X_test.astype(xdtype)
        y = y_test.astype(self.par_ydtype)
        return (X_train, y_train), (X, y)

    def _score_metric(self, X, y_exp, y_pred):
        return accuracy_score(y_exp.ravel(), y_pred.ravel())

    def _create_onnx_and_runtime(self, runtime, model, X, opset, dtype, optim):
        self.check_method_name('predict_proba')
        onx_ = self._to_onnx(model, X, opset, dtype, optim)
        onx = self._optimize_onnx(onx_)
        name = self.runtime_name(runtime)
        if name == 'skl':
            rt_ = None
            rt_fct_ = lambda X: model.predict_proba(X)
            rt_fct_track_ = lambda X: model.predict(X)
        else:
            rt_ = self._create_onnx_inference(onx, name)
            self._check_rt(rt_, 'run')
            rt_fct_ = lambda X: rt_.run({'X': X})
            rt_fct_track_ = lambda X: rt_fct_(X)['output_label']
        return onx, rt_, rt_fct_, rt_fct_track_


class _CommonAsvSklBenchmarkOutlier(_CommonAsvSklBenchmark):
    """
    Common class for outlier detection.
    """
    chk_method_name = 'predict'

    def _score_metric(self, X, y_exp, y_pred):
        # Average prediction: scikit-learn outlier detectors return
        # +1 for inliers and -1 for outliers.
        return numpy.sum(y_pred) / y_pred.shape[0]

    def _create_onnx_and_runtime(self, runtime, model, X, opset, dtype, optim):
        self.check_method_name('predict')
        onx_ = self._to_onnx(model, X, opset, dtype, optim)
        onx = self._optimize_onnx(onx_)
        name = self.runtime_name(runtime)
        if name == 'skl':
            rt_ = None
            rt_fct_ = lambda X: model.predict(X)
            rt_fct_track_ = lambda X: model.predict(X)
        else:
            rt_ = self._create_onnx_inference(onx, name)
            self._check_rt(rt_, 'run')
            rt_fct_ = lambda X: rt_.run({'X': X})
            rt_fct_track_ = lambda X: rt_fct_(X)['scores']
        return onx, rt_, rt_fct_, rt_fct_track_


class _CommonAsvSklBenchmarkRegressor(_CommonAsvSklBenchmark):
    """
    Common class for a regressor.
    """
    chk_method_name = 'predict'

    def _score_metric(self, X, y_exp, y_pred):
        return mean_absolute_error(y_exp, y_pred)

    def _create_onnx_and_runtime(self, runtime, model, X, opset, dtype, optim):
        self.check_method_name('predict')
        onx_ = self._to_onnx(model, X, opset, dtype, optim)
        onx = self._optimize_onnx(onx_)
        name = self.runtime_name(runtime)
        if name == 'skl':
            rt_ = None
            rt_fct_ = lambda X: model.predict(X)
            rt_fct_track_ = lambda X: model.predict(X)
        else:
            rt_ = self._create_onnx_inference(onx, name)
            self._check_rt(rt_, 'run')
            rt_fct_ = lambda X: rt_.run({'X': X})
            rt_fct_track_ = lambda X: rt_fct_(X)['variable']
        return onx, rt_, rt_fct_, rt_fct_track_
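

# A minimal sketch showing how a concrete benchmark derives from the
# classes above; such a class usually lives in a separate benchmark file.
# The class name and the choice of estimator are illustrative assumptions,
# only ``_create_model`` has to be implemented.
class ExampleBenchmarkRegressor(_CommonAsvSklBenchmarkRegressor):
    "Benchmarks a LinearRegression model converted into ONNX."

    def _create_model(self):
        from sklearn.linear_model import LinearRegression
        return LinearRegression()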