"""
Validates runtime for many :epkg:`scikit-learn` operators.
The submodule relies on :epkg:`onnxconverter_common`,
:epkg:`sklearn-onnx`.
:githublink:`%|py|7`
"""
import math
import copy
from timeit import Timer
import os
import warnings
from importlib import import_module
import pickle
from time import perf_counter
import numpy
from sklearn.base import BaseEstimator
from sklearn.linear_model._base import LinearModel
from sklearn.model_selection import train_test_split
from sklearn import __all__ as sklearn__all__, __version__ as sklearn_version
from .validate_problems import _problems
class RuntimeBadResultsError(RuntimeError):
    """
    Raised when the results are too different from
    :epkg:`scikit-learn`.
    """

    def __init__(self, msg, obs):
        """
        :param msg: message to display
        :param obs: observations, stored as attribute ``obs``
        """
        super().__init__(msg)
        self.obs = obs
[docs]def _dictionary2str(di):
el = []
for k in sorted(di):
el.append('{}={}'.format(k, di[k]))
return '/'.join(el)
def modules_list():
    """
    Returns modules and versions currently used.

    .. runpython::
        :showcode:
        :rst:

        from mlprodict.onnxrt.validate.validate_helper import modules_list
        from pyquickhelper.pandashelper import df2rst
        from pandas import DataFrame
        print(df2rst(DataFrame(modules_list())))
    """
    def try_import(name):
        # Returns None when the module is missing so it can be filtered out.
        try:
            mod = import_module(name)
        except ImportError:  # pragma: no cover
            return None
        if hasattr(mod, '__version__'):
            return dict(name=name, version=mod.__version__)
        return dict(name=name)

    names = sorted(['pandas', 'numpy', 'sklearn', 'mlprodict',
                    'skl2onnx', 'onnxmltools', 'onnx', 'onnxruntime',
                    'scipy'])
    return [row for row in map(try_import, names) if row is not None]
[docs]def _dispsimple(arr, fLOG):
if isinstance(arr, (tuple, list)):
for i, a in enumerate(arr):
fLOG("output %d" % i)
_dispsimple(a, fLOG)
elif hasattr(arr, 'shape'):
if len(arr.shape) == 1:
threshold = 8
else:
threshold = min(
50, min(50 // arr.shape[1], 8) * arr.shape[1])
fLOG(numpy.array2string(arr, max_line_width=120,
suppress_small=True,
threshold=threshold))
else: # pragma: no cover
s = str(arr)
if len(s) > 50:
s = s[:50] + "..."
fLOG(s)
[docs]def _merge_options(all_conv_options, aoptions):
if aoptions is None:
return copy.deepcopy(all_conv_options)
if not isinstance(aoptions, dict):
return copy.deepcopy(aoptions) # pragma: no cover
merged = {}
for k, v in all_conv_options.items():
if k in aoptions:
merged[k] = _merge_options(v, aoptions[k])
else:
merged[k] = copy.deepcopy(v)
for k, v in aoptions.items():
if k in all_conv_options:
continue
merged[k] = copy.deepcopy(v)
return merged
def sklearn_operators(subfolder=None, extended=False,
                      experimental=True):
    """
    Builds the list of operators from :epkg:`scikit-learn`.
    The function goes through the list of submodule
    and get the list of class which inherit from
    :epkg:`scikit-learn:base:BaseEstimator`.

    :param subfolder: look into only one subfolder
    :param extended: extends the list to the list of operators
        this package implements a converter for
    :param experimental: includes experimental module from
        :epkg:`scikit-learn` (see `sklearn.experimental
        <https://github.com/scikit-learn/scikit-learn/
        tree/master/sklearn/experimental>`_)
    :return: the list of found operators, each one a dictionary with
        at least keys *name*, *cl*, *package*
    """
    if experimental:
        # Side-effect imports: they enable the experimental estimators
        # in their regular scikit-learn submodules.
        from sklearn.experimental import (  # pylint: disable=W0611
            enable_hist_gradient_boosting,
            enable_iterative_imputer)
    subfolders = sklearn__all__ + ['mlprodict.onnx_conv']
    found = []
    for subm in sorted(subfolders):
        if isinstance(subm, list):
            continue  # pragma: no cover
        if subfolder is not None and subm != subfolder:
            continue

        if subm == 'feature_extraction':
            # feature_extraction.text is not listed in sklearn.__all__,
            # it has to be walked explicitly.
            subs = [subm, 'feature_extraction.text']
        else:
            subs = [subm]

        for sub in subs:
            if '.' in sub and sub not in {'feature_extraction.text'}:
                # Dotted names (e.g. 'mlprodict.onnx_conv') are already
                # fully qualified module names.
                name_sub = sub
            else:
                name_sub = "{0}.{1}".format("sklearn", sub)
            try:
                mod = import_module(name_sub)
            except ModuleNotFoundError:
                # Optional submodule not available, skip it.
                continue

            if hasattr(mod, "register_converters"):
                # The module exposes its own list of supported models.
                fct = getattr(mod, "register_converters")
                cls = fct()
            else:
                # Fall back on __all__ (or the module dictionary) and
                # resolve names into the actual objects.
                cls = getattr(mod, "__all__", None)
                if cls is None:
                    cls = list(mod.__dict__)
                cls = [mod.__dict__[cl] for cl in cls]

            for cl in cls:
                try:
                    issub = issubclass(cl, BaseEstimator)
                except TypeError:
                    # Not a class (function, constant, ...).
                    continue
                # Composite estimators and abstract base classes are excluded.
                if cl.__name__ in {'Pipeline', 'ColumnTransformer',
                                   'FeatureUnion', 'BaseEstimator',
                                   'BaseEnsemble', 'BaseDecisionTree'}:
                    continue
                if cl.__name__ in {'CustomScorerTransform'}:
                    continue
                # From these submodules, only classes whose name contains
                # 'Calibrated' are kept.
                if (sub in {'calibration', 'dummy', 'manifold'} and
                        'Calibrated' not in cl.__name__):
                    continue
                if issub:
                    pack = "sklearn" if sub in sklearn__all__ else cl.__module__.split('.')[
                        0]
                    found.append(
                        dict(name=cl.__name__, subfolder=sub, cl=cl, package=pack))

    if extended:
        # Adds models covered by the converters registered by this package.
        from ...onnx_conv import register_converters
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", ResourceWarning)
            models = register_converters(True)

        done = set(_['name'] for _ in found)
        for m in models:
            try:
                name = m.__module__.split('.')
            except AttributeError as e:  # pragma: no cover
                raise AttributeError("Unexpected value, m={}".format(m)) from e
            sub = '.'.join(name[1:])
            pack = name[0]
            if m.__name__ not in done:
                found.append(
                    dict(name=m.__name__, cl=m, package=pack, sub=sub))

    # let's remove models which cannot predict
    all_found = found
    found = []
    for mod in all_found:
        cl = mod['cl']
        if hasattr(cl, 'fit_predict') and not hasattr(cl, 'predict'):
            continue
        if hasattr(cl, 'fit_transform') and not hasattr(cl, 'transform'):
            continue
        if (not hasattr(cl, 'transform') and
                not hasattr(cl, 'predict') and
                not hasattr(cl, 'decision_function')):
            continue
        found.append(mod)
    return found
[docs]def _measure_time(fct, repeat=1, number=1, first_run=True):
"""
Measures the execution time for a function.
:param fct: function to measure
:param repeat: number of times to repeat
:param number: number of times between two measures
:param first_run: if True, runs the function once before measuring
:return: last result, average, values
:githublink:`%|py|233`
"""
res = None
values = []
if first_run:
fct()
for __ in range(repeat):
begin = perf_counter()
for _ in range(number):
res = fct()
end = perf_counter()
values.append(end - begin)
if repeat * number == 1:
return res, values[0], values
return res, sum(values) / (repeat * number), values # pragma: no cover
[docs]def _shape_exc(obj):
if hasattr(obj, 'shape'):
return obj.shape
if isinstance(obj, (list, dict, tuple)):
return "[{%d}]" % len(obj)
return None
[docs]def dump_into_folder(dump_folder, obs_op=None, is_error=True,
**kwargs):
"""
Dumps information when an error was detected
using :epkg:`*py:pickle`.
:param dump_folder: dump_folder
:param obs_op: obs_op (information)
:param is_error: is it an error or not?
:param kwargs: additional parameters
:return: name
:githublink:`%|py|268`
"""
if dump_folder is None:
raise ValueError("dump_folder cannot be None.")
optim = obs_op.get('optim', '')
optim = str(optim)
optim = optim.replace("<class 'sklearn.", "")
optim = optim.replace("<class '", "")
optim = optim.replace(" ", "")
optim = optim.replace(">", "")
optim = optim.replace("=", "")
optim = optim.replace("{", "")
optim = optim.replace("}", "")
optim = optim.replace(":", "")
optim = optim.replace("'", "")
optim = optim.replace("/", "")
optim = optim.replace("\\", "")
parts = (obs_op['runtime'], obs_op['name'], obs_op['scenario'],
obs_op['problem'], optim,
"op" + str(obs_op.get('opset', '-')),
"nf" + str(obs_op.get('n_features', '-')))
name = "dump-{}-{}.pkl".format(
"ERROR" if is_error else "i",
"-".join(map(str, parts)))
name = os.path.join(dump_folder, name)
obs_op = obs_op.copy()
fcts = [k for k in obs_op if k.startswith('lambda')]
for fct in fcts:
del obs_op[fct]
kwargs.update({'obs_op': obs_op})
with open(name, "wb") as f:
pickle.dump(kwargs, f)
return name
def default_time_kwargs():
    """
    Returns default values *number* and *repeat* to measure
    the execution of a function.

    .. runpython::
        :showcode:

        from mlprodict.onnxrt.validate.validate_helper import default_time_kwargs
        import pprint
        pprint.pprint(default_time_kwargs())

    keys define the number of rows,
    values defines *number* and *repeat*.
    """
    # (rows, number, repeat): fewer measures as the batch size grows.
    settings = [(1, 30, 20), (10, 20, 20), (100, 8, 10),
                (1000, 5, 5), (10000, 3, 3)]
    return {rows: dict(number=number, repeat=repeat)
            for rows, number, repeat in settings}
def measure_time(stmt, x, repeat=10, number=50, div_by_number=False, first_run=True):
    """
    Measures a statement and returns the results as a dictionary.

    :param stmt: callable taking *x* as its single argument
    :param x: matrix
    :param repeat: average over *repeat* experiment
    :param number: number of executions in one row
    :param div_by_number: divide by the number of executions
    :param first_run: if True, runs the function once before measuring
    :return: dictionary with keys *average*, *deviation*, *min_exec*,
        *max_exec*, *repeat*, *number*, *total*

    See `Timer.repeat <https://docs.python.org/3/library/timeit.html#timeit.Timer.repeat>`_
    for a better understanding of parameter *repeat* and *number*.
    The function returns a duration corresponding to
    *number* times the execution of the main statement.
    """
    if x is None:
        raise ValueError("x cannot be None")  # pragma: no cover
    # Runs the statement once to surface failures with a clearer message.
    try:
        stmt(x)
    except RuntimeError as e:  # pragma: no cover
        raise RuntimeError("{}-{}".format(type(x), x.dtype)) from e

    def fct():
        stmt(x)

    if first_run:
        fct()  # warm-up, not measured

    tim = Timer(fct)
    res = numpy.array(tim.repeat(repeat=repeat, number=number))
    total = numpy.sum(res)
    if div_by_number:
        res /= number
    mean = numpy.mean(res)
    # Population standard deviation from E[X^2] - E[X]^2, clipped at zero
    # to guard against tiny negative values due to rounding.
    dev = max(0, numpy.mean(res ** 2) - mean ** 2) ** 0.5
    return dict(average=mean, deviation=dev, min_exec=numpy.min(res),
                max_exec=numpy.max(res), repeat=repeat, number=number,
                total=total)
[docs]def _multiply_time_kwargs(time_kwargs, time_kwargs_fact, inst):
"""
Multiplies values in *time_kwargs* following strategy
*time_kwargs_fact* for a given model *inst*.
:param time_kwargs: see below
:param time_kwargs_fact: see below
:param inst: :epkg:`scikit-learn` model
:return : new *time_kwargs*
Possible values for *time_kwargs_fact*:
- a integer: multiplies *number* by this number
- `'lin'`: multiplies value *number* for linear models depending
on the number of rows to process (:math:`\\propto 1/\\log_{10}(n)`)
.. runpython::
:showcode:
from pprint import pprint
from sklearn.linear_model import LinearRegression
from mlprodict.onnxrt.validate.validate_helper import (
default_time_kwargs, _multiply_time_kwargs)
lr = LinearRegression()
kw = default_time_kwargs()
pprint(kw)
kw2 = _multiply_time_kwargs(kw, 'lin', lr)
pprint(kw2)
:githublink:`%|py|400`
"""
if time_kwargs is None:
raise ValueError("time_kwargs cannot be None.") # pragma: no cover
if time_kwargs_fact in ('', None):
return time_kwargs
try:
vi = int(time_kwargs_fact)
time_kwargs_fact = vi
except (TypeError, ValueError):
pass
if isinstance(time_kwargs_fact, int):
time_kwargs_modified = copy.deepcopy(time_kwargs)
for k in time_kwargs_modified:
time_kwargs_modified[k]['number'] *= time_kwargs_fact
return time_kwargs_modified
if time_kwargs_fact == 'lin':
if isinstance(inst, LinearModel):
time_kwargs_modified = copy.deepcopy(time_kwargs)
for k in time_kwargs_modified:
kl = max(int(math.log(k) / math.log(10) + 1e-5), 1)
f = max(int(10 / kl + 0.5), 1)
time_kwargs_modified[k]['number'] *= f
time_kwargs_modified[k]['repeat'] *= 1
return time_kwargs_modified
return time_kwargs
raise ValueError( # pragma: no cover
"Unable to interpret time_kwargs_fact='{}'.".format(
time_kwargs_fact))
def _get_problem_data(prob, n_features):
    """
    Builds and splits the dataset associated with problem *prob*.

    :param prob: problem name, a key of *_problems*
    :param n_features: number of features the problem should produce
        (or None)
    :return: tuple *(X_train, X_test, y_train, y_test, Xort_test,
        init_types, conv_options, method_name, output_index, dofit,
        predict_kwargs)*
    :raises RuntimeError: when the problem builder returns an unexpected
        number of values or the wrong number of features
    """
    data_problem = _problems[prob](n_features=n_features)
    # Problem builders return 6 values (then fitting defaults to True)
    # or 7 values (the last one says whether to fit).
    if len(data_problem) == 6:
        X_, y_, init_types, method, output_index, Xort_ = data_problem
        dofit = True
    elif len(data_problem) == 7:
        X_, y_, init_types, method, output_index, Xort_, dofit = data_problem
    else:
        raise RuntimeError(  # pragma: no cover
            "Unable to interpret problem '{}'.".format(prob))
    # Consistency check between requested and produced number of features.
    if (len(X_.shape) == 2 and X_.shape[1] != n_features and
            n_features is not None):
        raise RuntimeError(  # pragma: no cover
            "Problem '{}' with n_features={} returned {} features"
            "(func={}).".format(prob, n_features, X_.shape[1],
                                _problems[prob]))
    if y_ is None:
        # No target to split (e.g. transform-only problem — verify against
        # the problem builders in validate_problems).
        (X_train, X_test, Xort_train,  # pylint: disable=W0612
            Xort_test) = train_test_split(
                X_, Xort_, random_state=42)
        y_train, y_test = None, None
    else:
        (X_train, X_test, y_train, y_test,  # pylint: disable=W0612
            Xort_train, Xort_test) = train_test_split(
                X_, y_, Xort_, random_state=42)
    # init_types may come paired with converter options.
    if isinstance(init_types, tuple):
        init_types, conv_options = init_types
    else:
        conv_options = None
    # method may be a plain name or a (name, predict kwargs) pair.
    if isinstance(method, tuple):
        method_name, predict_kwargs = method
    else:
        method_name = method
        predict_kwargs = {}
    return (X_train, X_test, y_train,
            y_test, Xort_test,
            init_types, conv_options, method_name,
            output_index, dofit, predict_kwargs)