Source code for mlprodict.onnxrt.optim.sklearn_helper

"""
Helpers to manipulate :epkg:`scikit-learn` models.


:githublink:`%|py|5`
"""
import inspect
import multiprocessing
import numpy
from sklearn.base import (
    TransformerMixin, ClassifierMixin, RegressorMixin, BaseEstimator)
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.compose import ColumnTransformer, TransformedTargetRegressor


def enumerate_pipeline_models(pipe, coor=None, vs=None):
    """
    Enumerates all the models within a pipeline.

    :param pipe: *scikit-learn* pipeline
    :param coor: current coordinate
    :param vs: subset of variables for the model, None for all
    :return: iterator on models ``tuple(coordinate, model, subset of variables)``

    Example:

    .. runpython::
        :showcode:

        from sklearn.datasets import load_iris
        from sklearn.decomposition import PCA
        from sklearn.linear_model import LogisticRegression
        from sklearn.pipeline import make_pipeline
        from sklearn.model_selection import train_test_split
        from mlprodict.onnxrt.optim.sklearn_helper import enumerate_pipeline_models

        iris = load_iris()
        X, y = iris.data, iris.target
        X_train, __, y_train, _ = train_test_split(X, y, random_state=11)
        clr = make_pipeline(PCA(n_components=2),
                            LogisticRegression(solver="liblinear"))
        clr.fit(X_train, y_train)

        for a in enumerate_pipeline_models(clr):
            print(a)

    :githublink:`%|py|44`
    """
    if coor is None:
        coor = (0,)
    yield coor, pipe, vs
    if hasattr(pipe, 'transformer_and_mapper_list') and len(
            pipe.transformer_and_mapper_list):
        # azureml DataTransformer
        raise NotImplementedError(  # pragma: no cover
            "Unable to handle this specific case.")
    elif hasattr(pipe, 'mapper') and pipe.mapper:
        # azureml DataTransformer
        for couple in enumerate_pipeline_models(  # pragma: no cover
                pipe.mapper, coor + (0,)):
            yield couple
    elif hasattr(pipe, 'built_features'):  # pragma: no cover
        # sklearn_pandas.dataframe_mapper.DataFrameMapper
        for i, (columns, transformers, _) in enumerate(
                pipe.built_features):
            if isinstance(columns, str):
                columns = (columns,)
            if transformers is None:
                yield (coor + (i,)), None, columns
            else:
                for couple in enumerate_pipeline_models(
                        transformers, coor + (i,), columns):
                    yield couple
    elif isinstance(pipe, Pipeline):
        for i, (_, model) in enumerate(pipe.steps):
            for couple in enumerate_pipeline_models(model, coor + (i,)):
                yield couple
    elif isinstance(pipe, ColumnTransformer):
        for i, (_, fitted_transformer, column) in enumerate(
                pipe.transformers):
            for couple in enumerate_pipeline_models(
                    fitted_transformer, coor + (i,), column):
                yield couple
    elif isinstance(pipe, FeatureUnion):
        for i, (_, model) in enumerate(pipe.transformer_list):
            for couple in enumerate_pipeline_models(model, coor + (i,)):
                yield couple
    elif isinstance(pipe, TransformedTargetRegressor):
        raise NotImplementedError(
            "Not yet implemented for TransformedTargetRegressor.")
    elif isinstance(pipe, (TransformerMixin, ClassifierMixin,
                           RegressorMixin)):
        pass
    elif isinstance(pipe, BaseEstimator):
        pass
    elif isinstance(pipe, (list, numpy.ndarray)):
        for i, m in enumerate(pipe):
            for couple in enumerate_pipeline_models(m, coor + (i,)):
                yield couple
    else:
        raise TypeError(  # pragma: no cover
            "pipe is not a scikit-learn object: {}\n{}".format(
                type(pipe), pipe))

def enumerate_fitted_arrays(model):
    """
    Enumerates all fitted arrays included in a :epkg:`scikit-learn` object.

    :param model: :epkg:`scikit-learn` object
    :return: enumerator

    One example:

    .. runpython::
        :showcode:

        from sklearn.datasets import load_iris
        from sklearn.decomposition import PCA
        from sklearn.linear_model import LogisticRegression
        from sklearn.pipeline import make_pipeline
        from sklearn.model_selection import train_test_split
        from mlprodict.onnxrt.optim.sklearn_helper import enumerate_fitted_arrays

        iris = load_iris()
        X, y = iris.data, iris.target
        X_train, __, y_train, _ = train_test_split(X, y, random_state=11)
        clr = make_pipeline(PCA(n_components=2),
                            LogisticRegression(solver="liblinear"))
        clr.fit(X_train, y_train)

        for a in enumerate_fitted_arrays(clr):
            print(a)

    :githublink:`%|py|126`
    """
    def enumerate__(obj):
        if isinstance(obj, (tuple, list)):
            for el in obj:
                for o in enumerate__(el):
                    yield (obj, el, o)
        elif isinstance(obj, dict):
            for k, v in obj.items():
                for o in enumerate__(v):
                    yield (obj, k, v, o)
        elif hasattr(obj, '__dict__'):
            for k, v in obj.__dict__.items():
                if k[-1] != '_' and k[0] != '_':
                    continue
                if isinstance(v, numpy.ndarray):
                    yield (obj, k, v)
                else:
                    for row in enumerate__(v):
                        yield row

    for row in enumerate_pipeline_models(model):
        coord = row[:-1]
        sub = row[1]
        last = row[2:]
        for sub_row in enumerate__(sub):
            yield coord + (sub, sub_row) + last

def pairwise_array_distances(l1, l2, metric='l1med'):
    """
    Computes pairwise distances between two lists of arrays
    *l1* and *l2*. The distance is 1e9 if shapes are not equal.

    :param l1: first list of arrays
    :param l2: second list of arrays
    :param metric: metric to use, `'l1med'` computes the average
        absolute error divided by the absolute median
    :return: matrix

    :githublink:`%|py|165`
    """
    dist = numpy.full((len(l1), len(l2)), 1e9)
    for i, a1 in enumerate(l1):
        if not isinstance(a1, numpy.ndarray):
            continue  # pragma: no cover
        for j, a2 in enumerate(l2):
            if not isinstance(a2, numpy.ndarray):
                continue  # pragma: no cover
            if a1.shape != a2.shape:
                continue
            a = numpy.median(numpy.abs(a1))
            if a == 0:
                a = 1
            diff = numpy.sum(numpy.abs(a1 - a2)) / a
            # averages over the number of elements of a1
            # (diff is a scalar here, so diff.size would always be 1)
            dist[i, j] = diff / a1.size
    return dist

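# A minimal usage sketch for ``pairwise_array_distances`` (illustrative only,
# not part of the original module); the input arrays below are made up:
#
#     import numpy
#     from mlprodict.onnxrt.optim.sklearn_helper import pairwise_array_distances
#
#     l1 = [numpy.array([1.0, 2.0, 3.0]), numpy.array([[1.0], [2.0]])]
#     l2 = [numpy.array([1.0, 2.1, 3.0])]
#     # the second array of l1 has a different shape, so that cell stays at 1e9
#     print(pairwise_array_distances(l1, l2))
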
def max_depth(estimator):
    """
    Retrieves the max depth assuming the estimator is a decision tree.

    :githublink:`%|py|187`
    """
    n_nodes = estimator.tree_.node_count
    children_left = estimator.tree_.children_left
    children_right = estimator.tree_.children_right

    node_depth = numpy.zeros(shape=n_nodes, dtype=numpy.int64)
    is_leaves = numpy.zeros(shape=n_nodes, dtype=bool)
    stack = [(0, -1)]  # seed is the root node id and its parent depth
    while len(stack) > 0:
        node_id, parent_depth = stack.pop()
        node_depth[node_id] = parent_depth + 1

        # If we have a test node
        if children_left[node_id] != children_right[node_id]:
            stack.append((children_left[node_id], parent_depth + 1))
            stack.append((children_right[node_id], parent_depth + 1))
        else:
            is_leaves[node_id] = True
    return max(node_depth)

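# A small illustrative check for ``max_depth`` (not part of the original
# module): the helper expects a fitted tree exposing the ``tree_`` attribute.
#
#     from sklearn.datasets import load_iris
#     from sklearn.tree import DecisionTreeClassifier
#     from mlprodict.onnxrt.optim.sklearn_helper import max_depth
#
#     X, y = load_iris(return_X_y=True)
#     dt = DecisionTreeClassifier(max_depth=3).fit(X, y)
#     print(max_depth(dt))  # cannot exceed 3
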
def inspect_sklearn_model(model, recursive=True):
    """
    Inspects a :epkg:`scikit-learn` model and produces some figures
    which try to represent its complexity.

    :param model: model
    :param recursive: recursive look
    :return: dictionary

    .. runpython::
        :showcode:

        import pprint
        from sklearn.ensemble import RandomForestClassifier
        from sklearn.linear_model import LogisticRegression
        from sklearn.datasets import load_iris
        from mlprodict.onnxrt.optim.sklearn_helper import inspect_sklearn_model

        iris = load_iris()
        X = iris.data
        y = iris.target
        lr = LogisticRegression()
        lr.fit(X, y)
        pprint.pprint((lr, inspect_sklearn_model(lr)))

        iris = load_iris()
        X = iris.data
        y = iris.target
        rf = RandomForestClassifier()
        rf.fit(X, y)
        pprint.pprint((rf, inspect_sklearn_model(rf)))

    :githublink:`%|py|240`
    """
    def update(sts, st):
        for k, v in st.items():
            if k in sts:
                if 'max_' in k:
                    sts[k] = max(v, sts[k])
                else:
                    sts[k] += v
            else:
                sts[k] = v

    def insmodel(m):
        st = {'nop': 1}
        if hasattr(m, 'tree_') and hasattr(m.tree_, 'node_count'):
            st['nnodes'] = m.tree_.node_count
            st['ntrees'] = 1
            st['max_depth'] = max_depth(m)
        try:
            if hasattr(m, 'coef_'):
                st['ncoef'] = len(m.coef_)
                st['nlin'] = 1
        except KeyError:  # pragma: no cover
            # added to deal with xgboost 1.0 (KeyError: 'weight')
            pass
        if hasattr(m, 'estimators_'):
            for est in m.estimators_:
                st_ = inspect_sklearn_model(est, recursive=recursive)
                update(st, st_)
        return st

    if recursive:
        sts = {}
        for __, m, _ in enumerate_pipeline_models(model):
            st = insmodel(m)
            update(sts, st)
        # returns the statistics accumulated over all submodels,
        # not only the last one
        return sts
    return insmodel(model)

def set_n_jobs(model, params, n_jobs=None):
    """
    Looks into the model signature and adds parameter *n_jobs*
    if available. The function does not overwrite the parameter.

    :param model: model class
    :param params: current set of parameters
    :param n_jobs: number of jobs to set, replaced by
        ``multiprocessing.cpu_count()`` if None or 0
    :return: new set of parameters

    On this machine, the default value is the following.

    .. runpython::
        :showcode:

        import multiprocessing
        print(multiprocessing.cpu_count())

    :githublink:`%|py|298`
    """
    if params is not None and 'n_jobs' in params:
        return params
    sig = inspect.signature(model.__init__)
    if 'n_jobs' not in sig.parameters:
        return params
    if n_jobs == 0:
        n_jobs = None
    params = params.copy() if params else {}
    params['n_jobs'] = n_jobs or multiprocessing.cpu_count()
    return params

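# An illustrative sketch of how ``set_n_jobs`` completes a parameter set
# (not part of the original module):
#
#     from sklearn.ensemble import RandomForestClassifier
#     from mlprodict.onnxrt.optim.sklearn_helper import set_n_jobs
#
#     # 'n_jobs' is filled with multiprocessing.cpu_count() because
#     # RandomForestClassifier accepts it and it was not set already
#     print(set_n_jobs(RandomForestClassifier, {'n_estimators': 10}))
#
#     # an existing value is never overwritten
#     print(set_n_jobs(RandomForestClassifier, {'n_jobs': 2}))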