"""
A pipeline which serializes into ONNX step by step.
:githublink:`%|py|5`
"""
import numpy
from sklearn.base import clone
from sklearn.pipeline import Pipeline, _fit_transform_one
from sklearn.utils.validation import check_memory
from sklearn.utils import _print_elapsed_time
from ..onnx_conv import to_onnx
from .onnx_transformer import OnnxTransformer
class OnnxPipeline(Pipeline):
    """
    The pipeline overwrites method *fit*, it trains and converts
    every step into ONNX before training the next step
    in order to minimize discrepancies. By default,
    ONNX is using float and not double which is the default
    for :epkg:`scikit-learn`. It may introduce discrepancies
    when a non-continuous model (mathematical definition) such
    as a tree ensemble is part of the pipeline.

    :param steps:
        List of (name, transform) tuples (implementing fit/transform) that are
        chained, in the order in which they are chained, with the last object
        an estimator.
    :param memory: str or object with the joblib.Memory interface, default=None
        Used to cache the fitted transformers of the pipeline. By default,
        no caching is performed. If a string is given, it is the path to
        the caching directory. Enabling caching triggers a clone of
        the transformers before fitting. Therefore, the transformer
        instance given to the pipeline cannot be inspected
        directly. Use the attribute ``named_steps`` or ``steps`` to
        inspect estimators within the pipeline. Caching the
        transformers is advantageous when fitting is time consuming.
    :param verbose: bool, default=False
        If True, the time elapsed while fitting each step will be printed as it
        is completed.
    :param output_name: string
        requested output name or None to request all and
        have method *transform* to store all of them in a dataframe
    :param enforce_float32: boolean
        :epkg:`onnxruntime` only supports *float32*,
        :epkg:`scikit-learn` usually uses double floats, this parameter
        ensures that every array of double floats is converted into
        single floats
    :param runtime: string, defines the runtime to use
        as described in :class:`OnnxInference <mlprodict.onnxrt.onnx_inference.OnnxInference>`.
    :param options: see :func:`to_onnx <mlprodict.onnx_conv.convert.to_onnx>`
    :param white_op: see :func:`to_onnx <mlprodict.onnx_conv.convert.to_onnx>`
    :param black_op: see :func:`to_onnx <mlprodict.onnx_conv.convert.to_onnx>`
    :param final_types: see :func:`to_onnx <mlprodict.onnx_conv.convert.to_onnx>`
    :param op_version: ONNX targeted opset

    The class stores transformers before converting them into ONNX
    in attributes ``raw_steps_``.
    See notebook :ref:`onnxdiscrepenciesrst` to see how it can
    be used to reduce discrepancies after it was converted into
    *ONNX*.
    :githublink:`%|py|62`
    """

    def __init__(self, steps, *, memory=None, verbose=False,
                 output_name=None, enforce_float32=True,
                 runtime='python', options=None,
                 white_op=None, black_op=None, final_types=None,
                 op_version=None):
        self.output_name = output_name
        self.enforce_float32 = enforce_float32
        self.runtime = runtime
        self.options = options
        self.white_op = white_op
        self.black_op = black_op
        self.final_types = final_types
        self.op_version = op_version
        # These attributes are set before calling the parent constructor:
        # per the original author's note, the constructor calls
        # _validate_step and it checks the value of black_op.
        super().__init__(steps, memory=memory, verbose=verbose)

    def fit(self, X, y=None, **fit_params):
        """
        Fits the model, fits all the transforms one after the
        other and transform the data, then fit the transformed
        data using the final estimator.

        :param X: iterable
            Training data. Must fulfill input requirements of first step of the
            pipeline.
        :param y: iterable, default=None
            Training targets. Must fulfill label requirements for all steps of
            the pipeline.
        :param fit_params: dict of string -> object
            Parameters passed to the ``fit`` method of each step, where
            each parameter name is prefixed such that parameter ``p`` for step
            ``s`` has key ``s__p``.
        :return: self, Pipeline, this estimator
        :githublink:`%|py|100`
        """
        fit_params_steps = self._check_fit_params(**fit_params)
        # Fits every intermediate step and replaces it by its ONNX version.
        Xt = self._fit(X, y, **fit_params_steps)
        with _print_elapsed_time('OnnxPipeline',
                                 self._log_message(len(self.steps) - 1)):
            if self._final_estimator != 'passthrough':
                fit_params_last_step = fit_params_steps[self.steps[-1][0]]
                self._final_estimator.fit(Xt, y, **fit_params_last_step)
        return self

    def _fit(self, X, y=None, **fit_params_steps):
        """
        Fits every step but the last one. Each fitted transformer is
        stored in ``raw_steps_`` and its ONNX conversion replaces it
        in ``steps``.

        :param X: training data given to the first step
        :param y: training targets, default=None
        :param fit_params_steps: dict step name -> dict of parameters
            passed to the ``fit`` method of that step
        :return: the data transformed by all intermediate steps, the
            transformed data comes from the :epkg:`scikit-learn`
            transformers and not from their ONNX conversion
        """
        # shallow copy of steps - this should really be steps_
        if getattr(self, 'raw_steps_', None) is not None:  # pylint: disable=E0203
            # Let's reuse the previous training.
            self.steps = list(self.raw_steps_)  # pylint: disable=E0203
            self.raw_steps_ = list(self.raw_steps_)
        else:
            self.steps = list(self.steps)
            self.raw_steps_ = list(self.steps)
        self._validate_steps()

        # Setup the memory
        memory = check_memory(self.memory)
        fit_transform_one_cached = memory.cache(_fit_transform_one)

        for (step_idx,
             name,
             transformer) in self._iter(with_final=False,
                                        filter_passthrough=False):
            if transformer is None or transformer == 'passthrough':
                # Nothing to fit, the context manager only logs the step.
                with _print_elapsed_time('Pipeline',
                                         self._log_message(step_idx)):
                    continue

            if hasattr(memory, 'location'):
                # joblib >= 0.12
                if memory.location is None:
                    # we do not clone when caching is disabled to
                    # preserve backward compatibility
                    cloned_transformer = transformer
                else:
                    cloned_transformer = clone(transformer)
            else:
                cloned_transformer = clone(transformer)

            # The input of this step is kept to convert the fitted
            # transformer into ONNX once it is trained.
            x_train = X
            # Fit or load from cache the current transformer
            X, fitted_transformer = fit_transform_one_cached(
                cloned_transformer, X, y, None,
                message_clsname='Pipeline',
                message=self._log_message(step_idx),
                **fit_params_steps[name])
            # Replace the transformer of the step with the fitted
            # transformer. This is necessary when loading the transformer
            # from the cache.
            self.raw_steps_[step_idx] = (name, fitted_transformer)
            self.steps[step_idx] = (
                name, self._to_onnx(name, fitted_transformer, x_train))
        return X

    def _to_onnx(self, name, fitted_transformer, x_train):
        """
        Converts a transformer into ONNX.

        :param name: model name
        :param fitted_transformer: fitted transformer
        :param x_train: training dataset, it must be a numpy array
        :return: corresponding :class:`OnnxTransformer <mlprodict.sklapi.onnx_transformer.OnnxTransformer>`
        :raises RuntimeError: if *x_train* is not a numpy array
        :githublink:`%|py|170`
        """
        if not isinstance(x_train, numpy.ndarray):
            raise RuntimeError(  # pragma: no cover
                "The pipeline only handle numpy arrays not {}.".format(
                    type(x_train)))
        atts = ('options', 'white_op', 'black_op', 'final_types')
        kwargs = {k: getattr(self, k) for k in atts}
        # The data is cast into float32 if enforce_float32 is True or
        # if it is not already float64 (integers for example), float64
        # is only kept when enforce_float32 is False.
        if self.enforce_float32 or x_train.dtype != numpy.float64:
            x_train = x_train.astype(numpy.float32)
        # Only the options meant for this step are given to the converter.
        kwargs['options'] = self._preprocess_options(name, kwargs['options'])
        kwargs['target_opset'] = self.op_version
        onx = to_onnx(fitted_transformer, x_train, **kwargs)
        tr = OnnxTransformer(
            onx.SerializeToString(), output_name=self.output_name,
            enforce_float32=self.enforce_float32, runtime=self.runtime)
        return tr.fit()

    def _preprocess_options(self, name, options):
        """
        Preprocesses the options. A key which is a string is only
        kept if it starts with ``<name>__``, this prefix is then
        removed. A key which is not a string is left unchanged.

        :param name: option name
        :param options: conversion options
        :return: new options, None if *options* is None
        :githublink:`%|py|196`
        """
        if options is None:
            return None
        prefix = name + '__'
        new_options = {}
        for k, v in options.items():
            if isinstance(k, str):
                # String keys without the prefix are dropped.
                if k.startswith(prefix):
                    new_options[k[len(prefix):]] = v
            else:
                new_options[k] = v
        return new_options