Source code for mlinsights.mlmodel.piecewise_tree_regression

# -*- coding: utf-8 -*-
"""
Implements a kind of piecewise linear regression by modifying
the criterion used by the algorithm which builds a decision tree.
"""
import numpy
from sklearn.tree import DecisionTreeRegressor


class PiecewiseTreeRegressor(DecisionTreeRegressor):
    """
    Implements a kind of piecewise linear regression by modifying
    the criterion used by the algorithm which builds a decision tree.
    See :epkg:`sklearn:tree:DecisionTreeRegressor` to get the meaning
    of the parameters except criterion:

    * ``mselin``: optimizes for a piecewise linear regression
    * ``simple``: optimizes for a stepwise regression (equivalent to *mse*)
    """

    def __init__(self, criterion='mselin', splitter='best', max_depth=None,
                 min_samples_split=2, min_samples_leaf=1,
                 min_weight_fraction_leaf=0.0, max_features=None,
                 random_state=None, max_leaf_nodes=None,
                 min_impurity_decrease=0.0, min_impurity_split=None):
        DecisionTreeRegressor.__init__(
            self, criterion=criterion, splitter=splitter, max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            min_weight_fraction_leaf=min_weight_fraction_leaf,
            max_features=max_features, random_state=random_state,
            max_leaf_nodes=max_leaf_nodes,
            min_impurity_decrease=min_impurity_decrease,
            min_impurity_split=min_impurity_split)

    def fit(self, X, y, sample_weight=None, check_input=True,
            X_idx_sorted=None):
        """
        Replaces the string stored in *criterion* by an instance of a
        criterion class before fitting, restores it afterwards, and trains
        the per-leaf linear regressions when the criterion is ``mselin``.
        """
        replace = None
        if isinstance(self.criterion, str):
            if self.criterion == 'mselin':
                from .piecewise_tree_regression_criterion_linear import (  # pylint: disable=E0611,C0415
                    LinearRegressorCriterion)
                replace = self.criterion
                self.criterion = LinearRegressorCriterion(X)
            elif self.criterion == "simple":
                from .piecewise_tree_regression_criterion_fast import (  # pylint: disable=E0611,C0415
                    SimpleRegressorCriterionFast)
                replace = self.criterion
                self.criterion = SimpleRegressorCriterionFast(X)
        else:
            replace = None

        DecisionTreeRegressor.fit(
            self, X, y, sample_weight=sample_weight, check_input=check_input,
            X_idx_sorted=X_idx_sorted)

        if replace:
            # Restores the criterion given as a string.
            self.criterion = replace

        if self.criterion == "mselin":
            # Fits one linear regression per leaf.
            self._fit_reglin(X, y, sample_weight)
        return self

    def _mapping_train(self, X):
        """
        Maps every training observation to a leaf of the tree and returns
        a dictionary ``{leaf_index: bucket_index}``.
        """
        tree = self.tree_
        leaves = [i for i in range(len(tree.children_left))
                  if tree.children_left[i] <= i and tree.children_right[i] <= i]  # pylint: disable=E1136
        dec_path = self.decision_path(X)
        association = numpy.zeros((X.shape[0],))
        association[:] = -1
        mapping = {}
        ntree = 0
        for j in leaves:
            ind = dec_path[:, j] == 1
            ind = numpy.asarray(ind.todense()).flatten()
            if not numpy.any(ind):
                # No training example for this bucket.
                continue
            mapping[j] = ntree
            association[ind] = ntree
            ntree += 1
        return mapping

    def predict_leaves(self, X):
        """
        Returns the leaf index for each observation of *X*.

        :param X: array
        :return: array of leaf indices in ``self.leaves_index_``
        """
        # The creation of the sparse matrix could be avoided.
        leaves = self.decision_path(X)
        leaves = leaves[:, self.leaves_index_]
        mat = numpy.argmax(leaves, 1)
        res = numpy.asarray(mat).ravel()
        return res

    def _fit_reglin(self, X, y, sample_weight):
        """
        Fits linear regressions for all leaves.
        Sets attributes ``leaves_mapping_``, ``betas_``, ``leaves_index_``.
        The first attribute is a dictionary ``{leaf: row}`` which maps a
        leaf of the tree to the coefficients ``betas_[row, :]`` of a
        regression trained on all training points mapped to that leaf.
        ``leaves_index_`` keeps in memory the set of leaf indices.
        """
        from .piecewise_tree_regression_criterion_linear import (  # pylint: disable=E0611,C0415
            LinearRegressorCriterion)

        tree = self.tree_
        self.leaves_index_ = [i for i in range(len(tree.children_left))
                              if tree.children_left[i] <= i and tree.children_right[i] <= i]  # pylint: disable=E1136
        if tree.n_leaves != len(self.leaves_index_):
            raise RuntimeError(  # pragma: no cover
                "Unexpected number of leaves {} != {}".format(
                    tree.n_leaves, len(self.leaves_index_)))
        pred_leaves = self.predict_leaves(X)
        self.leaves_mapping_ = {k: i for i, k in enumerate(pred_leaves)}
        self.betas_ = numpy.empty((len(self.leaves_index_), X.shape[1] + 1))

        for i, _ in enumerate(self.leaves_index_):
            ind = pred_leaves == i
            xs = X[ind, :].copy()
            ys = y[ind].astype(numpy.float64)
            if len(ys.shape) == 1:
                ys = ys[:, numpy.newaxis]
            ys = ys.copy()
            # The truth value of a numpy array is ambiguous,
            # the check must be explicit.
            ws = sample_weight[ind].copy() if sample_weight is not None else None
            dec = LinearRegressorCriterion.create(xs, ys, ws)
            dec.node_beta(self.betas_[i, :])

    def predict(self, X, check_input=True):
        """
        Overloads method *predict*. Falls back on the prediction of a
        standard decision tree if the criterion is *mse*, *mae* or *simple*.
        Computes the predictions from per-leaf linear regressions if the
        criterion is *mselin*.
        """
        if self.criterion == 'mselin':
            return self._predict_reglin(X, check_input=check_input)
        return DecisionTreeRegressor.predict(self, X, check_input=check_input)

    def _predict_reglin(self, X, check_input=True):
        """
        Computes the predictions with a linear regression fitted with the
        observations mapped to each leaf of the tree.

        :param X: array-like or sparse matrix of shape = [n_samples, n_features]
            The input samples. Internally, it will be converted to
            ``dtype=np.float32`` and if a sparse matrix is provided
            to a sparse ``csr_matrix``.
        :param check_input: boolean, (default=True)
            Allow to bypass several input checking.
            Don't use this parameter unless you know what you're doing.
        :return: y, array of shape = [n_samples] or [n_samples, n_outputs]
            The predicted values.
        """
        leaves = self.predict_leaves(X)
        pred = numpy.ones((X.shape[0], 1))
        # Appends a constant column so that the intercept is part of betas_.
        Xone = numpy.hstack([X, pred])
        for i in range(X.shape[0]):
            li = leaves[i]
            pred[i] = numpy.dot(Xone[i, :], self.betas_[li, :])
        return pred.ravel()
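
The snippet below is not part of the module; it is a minimal usage sketch, assuming mlinsights is installed with its compiled criterion extensions and a scikit-learn version compatible with this module. It fits the regressor with the ``mselin`` criterion on synthetic piecewise linear data and inspects the attributes documented above.

# Minimal usage sketch (not part of the module); assumes mlinsights and a
# compatible scikit-learn are installed.
import numpy
from mlinsights.mlmodel.piecewise_tree_regression import PiecewiseTreeRegressor

# Synthetic piecewise linear data: two segments with different slopes.
rng = numpy.random.RandomState(0)
X = rng.uniform(0, 1, (200, 1))
y = numpy.where(X[:, 0] < 0.5, 3. * X[:, 0], 2. - X[:, 0])

model = PiecewiseTreeRegressor(criterion='mselin', max_depth=2)
model.fit(X, y)

# Predictions are linear inside each leaf instead of constant.
print(model.predict(X[:5]))
# Index of the leaf (within leaves_index_) each observation falls into.
print(model.predict_leaves(X[:5]))
# One row per leaf: the feature coefficients followed by the intercept,
# as used by _predict_reglin.
print(model.betas_)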