"""
@file
@brief Dig into pipelines.
"""
import textwrap
import warnings
from types import MethodType
from sklearn.base import TransformerMixin, ClassifierMixin, RegressorMixin, BaseEstimator
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.compose import ColumnTransformer, TransformedTargetRegressor
def enumerate_pipeline_models(pipe, coor=None, vs=None):
    """
    Enumerates all the models within a pipeline.

    @param      pipe        *scikit-learn* pipeline, one of its steps,
                            or the string ``"passthrough"``
    @param      coor        current coordinate, a tuple of integers locating
                            *pipe* in the nested structure,
                            ``(0,)`` is used if None
    @param      vs          subset of variables for the model, None for all
    @return                 iterator on models
                            ``tuple(coordinate, model, vs)``

    The object itself is yielded first, its children follow
    (depth first), the coordinate of a child is the coordinate of its
    parent extended with the child position.
    ``"passthrough"`` is replaced by an instance of a dummy class.
    The model may be None for a column of a *DataFrameMapper*
    declared without any transformer.

    The function raises *NotImplementedError* for
    *TransformedTargetRegressor* and for objects exposing a non empty
    ``transformer_and_mapper_list``, *TypeError* if the object is not
    recognized as a *scikit-learn* object.

    See notebook :ref:`visualizepipelinerst`.
    """
    if coor is None:
        coor = (0,)

    # The isinstance guard avoids calling an arbitrary __eq__
    # (array-like objects return an array, not a boolean).
    if isinstance(pipe, str) and pipe == "passthrough":
        class PassThrough:
            "dummy class to help display"
            pass

        yield coor, PassThrough(), vs
        return

    # The container comes first, its children next.
    yield coor, pipe, vs

    if hasattr(pipe, 'transformer_and_mapper_list') and len(pipe.transformer_and_mapper_list):
        # azureml DataTransformer
        raise NotImplementedError(  # pragma: no cover
            "Unable to handle this specific case.")
    elif hasattr(pipe, 'mapper') and pipe.mapper:
        # azureml DataTransformer
        yield from enumerate_pipeline_models(pipe.mapper, coor + (0,))
    elif hasattr(pipe, 'built_features'):  # pragma: no cover
        # sklearn_pandas.dataframe_mapper.DataFrameMapper
        for i, (columns, transformers, _) in enumerate(pipe.built_features):
            if isinstance(columns, str):
                # a single column name becomes a tuple of one name
                columns = (columns,)
            if transformers is None:
                # column kept without any transformation, no model to show
                yield (coor + (i,)), None, columns
            else:
                yield from enumerate_pipeline_models(
                    transformers, coor + (i,), columns)
    elif isinstance(pipe, Pipeline):
        for i, (_, model) in enumerate(pipe.steps):
            yield from enumerate_pipeline_models(model, coor + (i,))
    elif isinstance(pipe, ColumnTransformer):
        # the columns a transformer is applied to are forwarded as *vs*
        for i, (_, transformer, column) in enumerate(pipe.transformers):
            yield from enumerate_pipeline_models(
                transformer, coor + (i,), column)
    elif isinstance(pipe, FeatureUnion):
        for i, (_, model) in enumerate(pipe.transformer_list):
            yield from enumerate_pipeline_models(model, coor + (i,))
    elif isinstance(pipe, TransformedTargetRegressor):
        raise NotImplementedError(  # pragma: no cover
            "Not yet implemented for TransformedTargetRegressor.")
    elif isinstance(pipe, (TransformerMixin, ClassifierMixin, RegressorMixin)):
        # leaf: a transformer or a predictor, nothing below
        pass
    elif isinstance(pipe, BaseEstimator):  # pragma: no cover
        # leaf: any other estimator
        pass
    else:
        raise TypeError(  # pragma: no cover
            "pipe is not a scikit-learn object: {}\n{}".format(type(pipe), pipe))
def alter_pipeline_for_debugging(pipe):
    """
    Overwrite methods *transform*, *predict*, *predict_proba*
    or *decision_function* to collect the last inputs and outputs
    seen in these methods.

    @param      pipe        *scikit-learn* pipeline

    The object *pipe* is modified, it should be copied
    before calling this function if you need the object
    untouched after that. The prediction is slower.
    Every model returned by @see fn enumerate_pipeline_models
    receives an attribute ``_debug``
    (@see cl BaseEstimatorDebugInformation) and the methods listed in
    ``_debug.methods`` are replaced by a version which stores
    the input in ``_debug.inputs[name]`` and the output
    in ``_debug.outputs[name]``.

    The function raises *RuntimeError* if *pipe* or one of its models
    already has an attribute ``_debug`` or if the same object appears
    twice in the pipeline. All models are checked before the first one
    is modified.

    See notebook :ref:`visualizepipelinerst`.
    """
    def _recorder(name):
        "builds the replacement for method *name*"
        def method(self, X, *args, **kwargs):
            self._debug.inputs[name] = X
            y = self._debug.methods[name](self, X, *args, **kwargs)
            self._debug.outputs[name] = y
            return y

        method.__name__ = name
        return method

    new_methods = {
        name: _recorder(name)
        for name in ('decision_function', 'transform',
                     'predict', 'predict_proba')
    }

    already_altered = (
        "The same operator cannot be used twice in "
        "the same pipeline or this method was called "
        "a second time.")

    if hasattr(pipe, '_debug'):
        raise RuntimeError(already_altered)  # pragma: no cover

    # Collect and validate every model before touching any of them,
    # so a failure does not leave a half-modified pipeline.
    models = []
    seen = set()
    for item in enumerate_pipeline_models(pipe):
        model = item[1]
        if model is None:
            # DataFrameMapper column without transformer: nothing to alter
            continue
        # Altering the same object twice would overwrite its debug
        # information, which the error message declares unsupported.
        if id(model) in seen or hasattr(model, '_debug'):
            raise RuntimeError(already_altered)  # pragma: no cover
        seen.add(id(model))
        models.append(model)

    for model in models:
        model._debug = BaseEstimatorDebugInformation(model)
        for k in model._debug.methods:
            try:
                setattr(model, k, MethodType(new_methods[k], model))
            except AttributeError:  # pragma: no cover
                warnings.warn("Unable to overwrite method '{}' for class "
                              "{}.".format(k, type(model)))