Source code for mlinsights.mlmodel.piecewise_estimator

"""
Implements a piecewise linear regression.


:githublink:`%|py|5`
"""
import numpy
import numpy.random
import pandas
from sklearn.base import BaseEstimator, RegressorMixin, ClassifierMixin, clone
from sklearn.tree import DecisionTreeRegressor, DecisionTreeClassifier
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.preprocessing import KBinsDiscretizer
from sklearn.utils.validation import check_is_fitted
try:  # older scikit-learn versions vendored joblib
    from sklearn.utils._joblib import Parallel, delayed
except ImportError:
    from joblib import Parallel, delayed
try:  # removed in recent scikit-learn versions
    from sklearn.utils.fixes import _joblib_parallel_args
except ImportError:
    def _joblib_parallel_args(**kwargs):
        # recent joblib accepts these arguments directly
        return kwargs
try:
    # tqdm is optional, only needed when ``verbose == 'tqdm'``.
    from tqdm import tqdm
except ImportError:
    pass


def _fit_piecewise_estimator(i, model, X, y, sample_weight, association,
                             nb_classes, random_state):
    ind = association == i
    if not numpy.any(ind):
        # No training example for this bucket.
        return None
    Xi = X[ind, :]
    yi = y[ind]
    sw = sample_weight[ind] if sample_weight is not None else None
    if nb_classes is not None and len(set(yi)) != nb_classes:
        # A classifier requires at least one example of each class
        # in every bucket; missing classes are borrowed from the
        # rest of the dataset.
        if random_state is None:
            random_state = numpy.random.RandomState()  # pylint: disable=E1101
        addition = numpy.arange(len(ind))
        random_state.shuffle(addition)
        found = set(yi)
        allcl = set(y)
        res = []
        while len(found) < len(allcl):
            for ki in addition:
                if y[ki] not in found:
                    res.append(ki)
                    found.add(y[ki])
        ind = ind.copy()
        for ki in res:
            ind[ki] = True
        Xi = X[ind, :]
        yi = y[ind]
        sw = sample_weight[ind] if sample_weight is not None else None
    return model.fit(Xi, yi, sample_weight=sw)

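# A minimal illustration (hypothetical toy data) of the helper above:
# rows are selected with a boolean mask built from *association*, and
# missing classes are borrowed from other buckets before fitting.
#
#   import numpy
#   from sklearn.linear_model import LogisticRegression
#   X = numpy.array([[0.], [1.], [2.], [3.]])
#   y = numpy.array([0, 0, 1, 1])
#   association = numpy.array([0, 0, 1, 1])
#   # bucket 0 only contains class 0, so an example of class 1 is borrowed
#   est = _fit_piecewise_estimator(
#       0, LogisticRegression(), X, y, None, association,
#       nb_classes=2, random_state=None)
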
def _predict_piecewise_estimator(i, est, X, association):
    ind = association == i
    if not numpy.any(ind):
        return None, None
    return ind, est.predict(X[ind, :])


def _predict_proba_piecewise_estimator(i, est, X, association):
    ind = association == i
    if not numpy.any(ind):
        return None, None
    return ind, est.predict_proba(X[ind, :])


def _decision_function_piecewise_estimator(i, est, X, association):
    ind = association == i
    if not numpy.any(ind):
        return None, None
    return ind, est.decision_function(X[ind, :])

class PiecewiseEstimator(BaseEstimator):
    """
    Uses a :epkg:`decision tree` to split the space of features
    into buckets and trains a linear model on each of them.
    The second estimator can be a :epkg:`sklearn:linear_model:LinearRegression`
    for a regression or a :epkg:`sklearn:linear_model:LogisticRegression`
    for a classifier. It can also be :epkg:`sklearn:dummy:DummyRegressor`
    or :epkg:`sklearn:dummy:DummyClassifier` to just get the average
    on each bucket.
    """

    def __init__(self, binner=None, estimator=None, n_jobs=None, verbose=False):
        """
        :param binner: transformer or predictor which creates the buckets
        :param estimator: predictor trained on every bucket
        :param n_jobs: number of parallel jobs (for training and predicting)
        :param verbose: boolean or use ``'tqdm'`` to use :epkg:`tqdm`
            to fit the estimators

        *binner* must be specified and allows the following values:

        - ``'bins'``: the model is :epkg:`sklearn:preprocessing:KBinsDiscretizer`
        - any instantiated model

        *estimator* allows the following values:

        - ``None``: the model is :epkg:`sklearn:linear_model:LinearRegression`
        - any instantiated model
        """
        BaseEstimator.__init__(self)
        if estimator is None:
            raise ValueError("estimator cannot be null.")
        if binner is None:
            raise TypeError(
                "Unsupported options for binner=='tree' and model {}.".format(
                    type(estimator)))
        elif binner == "bins":
            binner = KBinsDiscretizer()
        self.binner = binner
        self.estimator = estimator
        self.n_jobs = n_jobs
        self.verbose = verbose

    @property
    def n_estimators_(self):
        """
        Returns the number of estimators, which is the number of buckets
        the data was split into.
        """
        check_is_fitted(self, 'estimators_')
        return len(self.estimators_)

    def _mapping_train(self, X, binner):
        if hasattr(binner, "tree_"):
            tree = binner.tree_
            # A node is a leaf when both children indices point backwards.
            leaves = [i for i in range(len(tree.children_left))
                      if tree.children_left[i] <= i and tree.children_right[i] <= i]
            dec_path = self.binner_.decision_path(X)
            association = numpy.zeros((X.shape[0],))
            association[:] = -1
            mapping = {}
            ntree = 0
            for j in leaves:
                ind = dec_path[:, j] == 1
                ind = numpy.asarray(ind.todense()).flatten()
                if not numpy.any(ind):
                    # No training example for this bucket.
                    continue
                mapping[j] = ntree
                association[ind] = ntree
                ntree += 1
        elif hasattr(binner, "transform"):
            tr = binner.transform(X)
            unique = set()
            for x in tr:
                d = tuple(numpy.asarray(
                    x.todense()).ravel().astype(numpy.int32))
                unique.add(d)
            leaves = list(sorted(unique))
            association = numpy.zeros((X.shape[0],))
            association[:] = -1
            mapping = {}
            for i, le in enumerate(leaves):
                mapping[le] = i
            for i, x in enumerate(tr):
                d = tuple(numpy.asarray(
                    x.todense()).ravel().astype(numpy.int32))
                association[i] = mapping.get(d, -1)
        else:
            raise NotImplementedError(
                "binner is not a decision tree or a transform")
        return association, mapping, leaves

    def transform_bins(self, X):
        """
        Maps every row to a tree in *self.estimators_*.
        """
        check_is_fitted(self, 'mapping_')
        binner = self.binner_
        if hasattr(binner, "tree_"):
            dec_path = self.binner_.decision_path(X)
            association = numpy.zeros((X.shape[0],))
            association[:] = -1
            for j in self.leaves_:
                ind = dec_path[:, j] == 1
                ind = numpy.asarray(ind.todense()).flatten()
                if not numpy.any(ind):
                    # No training example for this bucket.
                    continue
                association[ind] = self.mapping_.get(j, -1)
        elif hasattr(binner, "transform"):
            association = numpy.zeros((X.shape[0],))
            association[:] = -1
            tr = binner.transform(X)
            for i, x in enumerate(tr):
                d = tuple(numpy.asarray(
                    x.todense()).ravel().astype(numpy.int32))
                association[i] = self.mapping_.get(d, -1)
        else:
            raise NotImplementedError(
                "binner is not a decision tree or a transform")
        return association

    def fit(self, X, y, sample_weight=None):
        """
        Trains the binner and an estimator on every bucket.

        Parameters
        ----------
        X: features, *X* is converted into an array if *X* is a dataframe

        y: target

        sample_weight: sample weights

        Returns
        -------
        self: returns an instance of self.

        Attributes
        ----------
        binner_ : binner

        estimators_ : list of estimators, one per bucket,
            each of them mapped to a leaf of the tree

        mean_estimator_ : estimator trained on the whole dataset,
            used when the binner cannot find a bucket for a new observation

        dim_: dimension of the output
        """
        if isinstance(X, pandas.DataFrame):
            X = X.values
        if isinstance(X, list):
            raise TypeError("X cannot be a list.")
        binner = clone(self.binner)
        if sample_weight is None:
            self.binner_ = binner.fit(X, y)
        else:
            self.binner_ = binner.fit(X, y, sample_weight=sample_weight)

        association, self.mapping_, self.leaves_ = self._mapping_train(
            X, self.binner_)

        estimators = [clone(self.estimator) for _ in self.mapping_]

        loop = (tqdm(range(len(estimators)))
                if self.verbose == 'tqdm' else range(len(estimators)))
        verbose = 1 if self.verbose else 0

        self.mean_estimator_ = clone(self.estimator).fit(X, y, sample_weight)

        nb_classes = (None if not hasattr(self.mean_estimator_, 'classes_')
                      else len(set(self.mean_estimator_.classes_)))

        if hasattr(self, 'random_state') and self.random_state is not None:  # pylint: disable=E1101
            rnd = numpy.random.RandomState(  # pylint: disable=E1101
                self.random_state)  # pylint: disable=E1101
        else:
            rnd = None

        self.estimators_ = \
            Parallel(n_jobs=self.n_jobs, verbose=verbose,
                     **_joblib_parallel_args(prefer='threads'))(
                delayed(_fit_piecewise_estimator)(
                    i, estimators[i], X, y, sample_weight, association,
                    nb_classes, rnd)
                for i in loop)

        self.dim_ = 1 if len(y.shape) == 1 else y.shape[1]
        if hasattr(self.estimators_[0], 'classes_'):
            self.classes_ = self.estimators_[0].classes_
        return self

    def _apply_predict_method(self, X, method, parallelized, dimout):
        """
        Generic *predict* method, works for *predict_proba* and
        *decision_function* as well.
        """
        check_is_fitted(self, 'estimators_')
        if len(self.estimators_) == 0:
            raise RuntimeError(
                "Estimator was apparently fitted but contains no estimator.")
        if not hasattr(self.estimators_[0], method):
            raise TypeError("Estimator {} does not have method '{}'.".format(
                type(self.estimators_[0]), method))
        if isinstance(X, pandas.DataFrame):
            X = X.values

        association = self.transform_bins(X)

        indpred = Parallel(n_jobs=self.n_jobs,
                           **_joblib_parallel_args(prefer='threads'))(
            delayed(parallelized)(i, model, X, association)
            for i, model in enumerate(self.estimators_))

        pred = numpy.zeros((X.shape[0], dimout)
                           if dimout > 1 else (X.shape[0],))
        indall = numpy.zeros((X.shape[0],), dtype=bool)
        for ind, p in indpred:
            if ind is None:
                continue
            pred[ind] = p
            indall = numpy.logical_or(indall, ind)  # pylint: disable=E1111

        # Rows not assigned to any bucket fall back to the estimator
        # trained on the whole dataset.
        indall = numpy.logical_not(indall)  # pylint: disable=E1111
        Xmissed = X[indall]
        if Xmissed.shape[0] > 0:
            meth = getattr(self.mean_estimator_, method)
            missed = meth(Xmissed)
            pred[indall] = missed
        return pred

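# A minimal sketch (hypothetical data) of ``transform_bins``: once the
# estimator is fitted, it maps every row to the index of the bucket, and
# therefore of the estimator in ``estimators_``, it falls into (-1 when
# no bucket matches).
#
#   import numpy
#   X = numpy.random.randn(100, 1)
#   y = X[:, 0] * 2.0 + numpy.random.randn(100) * 0.1
#   model = PiecewiseEstimator(
#       binner=DecisionTreeRegressor(min_samples_leaf=10),
#       estimator=LinearRegression()).fit(X, y)
#   buckets = model.transform_bins(X)
#   # on the training set, every bucket should be represented
#   assert len(set(buckets)) == model.n_estimators_
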
class PiecewiseRegressor(PiecewiseEstimator, RegressorMixin):
    """
    Uses a :epkg:`decision tree` to split the space of features
    into buckets and trains a linear regression (default) on each of them.
    The second estimator is usually a :epkg:`sklearn:linear_model:LinearRegression`.
    It can also be :epkg:`sklearn:dummy:DummyRegressor` to just get
    the average on each bucket.
    """

    def __init__(self, binner=None, estimator=None, n_jobs=None, verbose=False):
        """
        :param binner: transformer or predictor which creates the buckets
        :param estimator: predictor trained on every bucket
        :param n_jobs: number of parallel jobs (for training and predicting)
        :param verbose: boolean or use ``'tqdm'`` to use :epkg:`tqdm`
            to fit the estimators

        *binner* allows the following values:

        - ``'tree'``: the model is :epkg:`sklearn:tree:DecisionTreeRegressor`
        - ``'bins'``: the model is :epkg:`sklearn:preprocessing:KBinsDiscretizer`
        - any instantiated model

        *estimator* allows the following values:

        - ``None``: the model is :epkg:`sklearn:linear_model:LinearRegression`
        - any instantiated model
        """
        if estimator is None:
            estimator = LinearRegression()
        if binner in ('tree', None):
            binner = DecisionTreeRegressor(min_samples_leaf=2)
        RegressorMixin.__init__(self)
        PiecewiseEstimator.__init__(
            self, binner=binner, estimator=estimator,
            n_jobs=n_jobs, verbose=verbose)

    def predict(self, X):
        """
        Computes the predictions.

        Parameters
        ----------
        X: features, *X* is converted into an array if *X* is a dataframe

        Returns
        -------
        predictions
        """
        return self._apply_predict_method(
            X, "predict", _predict_piecewise_estimator, self.dim_)

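# Example usage, a minimal sketch with synthetic data: a decision tree
# defines the buckets and a linear regression is trained on each of them.
#
#   import numpy
#   X = numpy.random.randn(200, 2)
#   y = numpy.abs(X[:, 0]) + numpy.random.randn(200) * 0.1
#   model = PiecewiseRegressor(binner='tree').fit(X, y)
#   pred = model.predict(X)
#   print(model.n_estimators_)  # number of buckets found by the tree
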
class PiecewiseClassifier(PiecewiseEstimator, ClassifierMixin):
    """
    Uses a :epkg:`decision tree` to split the space of features
    into buckets and trains a logistic regression (default) on each of them.
    The second estimator is usually a :epkg:`sklearn:linear_model:LogisticRegression`.
    It can also be :epkg:`sklearn:dummy:DummyClassifier` to just get
    the average on each bucket.

    The main issue with the *PiecewiseClassifier* is that every bucket
    must contain at least one example of each class, which may not happen.
    To avoid that, the training picks random examples from other buckets
    so that every class is represented in every bucket.
    """

    def __init__(self, binner=None, estimator=None, n_jobs=None,
                 random_state=None, verbose=False):
        """
        :param binner: transformer or predictor which creates the buckets
        :param estimator: predictor trained on every bucket
        :param n_jobs: number of parallel jobs (for training and predicting)
        :param random_state: used to pick random examples when a bucket
            does not contain enough examples of each class
        :param verbose: boolean or use ``'tqdm'`` to use :epkg:`tqdm`
            to fit the estimators

        *binner* allows the following values:

        - ``'tree'``: the model is :epkg:`sklearn:tree:DecisionTreeClassifier`
        - ``'bins'``: the model is :epkg:`sklearn:preprocessing:KBinsDiscretizer`
        - any instantiated model

        *estimator* allows the following values:

        - ``None``: the model is :epkg:`sklearn:linear_model:LogisticRegression`
        - any instantiated model
        """
        if estimator is None:
            estimator = LogisticRegression()
        if binner in ('tree', None):
            binner = DecisionTreeClassifier(min_samples_leaf=5)
        ClassifierMixin.__init__(self)
        PiecewiseEstimator.__init__(
            self, binner=binner, estimator=estimator,
            n_jobs=n_jobs, verbose=verbose)
        self.random_state = random_state

    def predict(self, X):
        """
        Computes the predictions.

        Parameters
        ----------
        X: features, *X* is converted into an array if *X* is a dataframe

        Returns
        -------
        predictions
        """
        pred = self._apply_predict_method(
            X, "predict", _predict_piecewise_estimator, 1)
        return pred.astype(numpy.int32)

    def predict_proba(self, X):
        """
        Computes the prediction probabilities.

        Parameters
        ----------
        X: features, *X* is converted into an array if *X* is a dataframe

        Returns
        -------
        prediction probabilities
        """
        return self._apply_predict_method(
            X, "predict_proba", _predict_proba_piecewise_estimator,
            len(self.mean_estimator_.classes_))

    def decision_function(self, X):
        """
        Computes the decision function.

        Parameters
        ----------
        X: features, *X* is converted into an array if *X* is a dataframe

        Returns
        -------
        decision function values
        """
        justone = self.mean_estimator_.decision_function(X[:1])
        return self._apply_predict_method(
            X, "decision_function", _decision_function_piecewise_estimator,
            1 if len(justone.shape) == 1 else justone.shape[1])

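# Example usage, a minimal sketch with synthetic data: the tree splits the
# feature space and a logistic regression is fitted on every bucket;
# missing classes are borrowed from other buckets during training.
#
#   import numpy
#   X = numpy.random.randn(200, 2)
#   y = (X[:, 0] + X[:, 1] > 0).astype(numpy.int32)
#   model = PiecewiseClassifier(binner='tree', random_state=0).fit(X, y)
#   proba = model.predict_proba(X)   # shape: (200, 2)
#   labels = model.predict(X)        # int32 labels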