"""
Builds a tree of logistic regressions.
:githublink:`%|py|5`
"""
import numpy
import scipy.sparse as sparse
from sklearn.linear_model import LogisticRegression
from sklearn.base import BaseEstimator, ClassifierMixin, clone
from sklearn.linear_model._base import LinearClassifierMixin


def logistic(x):
    """
    Computes :math:`\\frac{1}{1 + e^{-x}}`.
    """
    return 1. / (1. + numpy.exp(-x))


def likelihood(x, y, theta=1., th=0.):
    """
    Computes :math:`\\sum_i y_i f(\\theta (x_i - x_0)) + (1 - y_i) (1 - f(\\theta (x_i - x_0)))`
    where :math:`f(x)` is :math:`\\frac{1}{1 + e^{-x}}` and :math:`x_0` is *th*.

    :param x: scores
    :param y: binary labels
    :param theta: scale :math:`\\theta`
    :param th: offset :math:`x_0`
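
    A small numeric check: :math:`f(0) = 0.5`, so with :math:`y = 1`
    the result is ``0.5``::

        >>> likelihood(numpy.array([0.]), numpy.array([1.]))
        array([0.5])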
"""
lr = logistic((x - th) * theta)
return y * lr + (1. - y) * (1 - lr)


class _DecisionTreeLogisticRegressionNode:
    """
    Describes the tree structure held by class
    :class:`DecisionTreeLogisticRegression <mlinsights.mlmodel.decision_tree_logreg.DecisionTreeLogisticRegression>`.
    See also notebook :ref:`decisiontreelogregrst`.
    """

    def __init__(self, estimator, threshold=0.5, depth=1, index=0):
        """
        constructor

        :param estimator: binary estimator
        :param threshold: probability threshold used to split the sample
        :param depth: depth of the node in the tree
        :param index: index of the node
        """
self.index = index
self.estimator = estimator
self.above = None
self.below = None
self.threshold = threshold
self.depth = depth

    def predict(self, X):
        """
        Predicts the binary labels.

        :param X: features
        :return: predicted labels
        """
prob = self.predict_proba(X)
return (prob[:, 1] >= 0.5).astype(numpy.int32)

    def predict_proba(self, X):
        """
        Returns the classification probabilities.

        :param X: features
        :return: probabilities
        """
prob = self.estimator.predict_proba(X)
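        # Route each sample to the subtree matching its side of the
        # threshold; samples reaching a leaf keep the probabilities
        # of this node's estimator.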
above = prob[:, 1] > self.threshold
below = ~ above
n_above = above.sum()
n_below = below.sum()
if self.above is not None and n_above > 0:
prob_above = self.above.predict_proba(X[above])
prob[above] = prob_above
if self.below is not None and n_below > 0:
prob_below = self.below.predict_proba(X[below])
prob[below] = prob_below
return prob

    def decision_path(self, X, mat, indices):
        """
        Fills the decision path of every sample.

        :param X: features
        :param mat: decision path (allocated matrix)
        :param indices: indices of the samples in the original dataset
        """
mat[indices, self.index] = 1
prob = self.estimator.predict_proba(X)
above = prob[:, 1] > self.threshold
below = ~ above
n_above = above.sum()
n_below = below.sum()
indices_above = indices[above]
indices_below = indices[below]
if self.above is not None and n_above > 0:
self.above.decision_path(X[above], mat, indices_above)
if self.below is not None and n_below > 0:
self.below.decision_path(X[below], mat, indices_below)

    def fit(self, X, y, sample_weight, dtlr, total_N):
        """
        Fits a logistic regression, then splits the sample into
        positive and negative examples, finally tries to fit
        logistic regressions on both subsamples. This method only
        works on a linear classifier.

        :param X: features
        :param y: binary labels
        :param sample_weight: weights of every sample
        :param dtlr: :class:`DecisionTreeLogisticRegression <mlinsights.mlmodel.decision_tree_logreg.DecisionTreeLogisticRegression>`
        :param total_N: total number of observations
        :return: last index
        """
self.estimator.fit(X, y, sample_weight=sample_weight)
if dtlr.verbose >= 1:
print("[DTLR ] %s trained acc %1.2f N=%d" % ( # pragma: no cover
" " * self.depth, self.estimator.score(X, y), X.shape[0]))
prob = self.fit_improve(dtlr, total_N, X, y,
sample_weight=sample_weight)
if self.depth + 1 > dtlr.max_depth:
return self.index
if X.shape[0] < dtlr.min_samples_split:
return self.index
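        # Split the sample on the estimator's output and recursively
        # grow a child node on each side when the constraints
        # (min_samples_leaf, min_weight_fraction_leaf) allow it.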
above = prob[:, 1] > self.threshold
below = ~ above
n_above = above.sum()
n_below = below.sum()
y_above = set(y[above])
y_below = set(y[below])
def _fit_side(index, y_above_below, above_below, n_above_below, side):
if dtlr.verbose >= 1:
print("[DTLR*] %s%s: n_class=%d N=%d - %d/%d" % (
" " * self.depth, side,
len(y_above_below), above_below.shape[0],
n_above_below, total_N))
if (len(y_above_below) > 1 and
above_below.shape[0] > dtlr.min_samples_leaf * 2 and
(float(n_above_below) / total_N >=
dtlr.min_weight_fraction_leaf * 2) and
n_above_below < total_N):
estimator = clone(dtlr.estimator)
sw = sample_weight[above_below] if sample_weight is not None else None
node = _DecisionTreeLogisticRegressionNode(
estimator, self.threshold, depth=self.depth + 1, index=index)
last_index = node.fit(
X[above_below], y[above_below], sw, dtlr, total_N)
return node, last_index
return None, index
self.above, last = _fit_side(
self.index + 1, y_above, above, n_above, "above")
self.below, last = _fit_side(
last + 1, y_below, below, n_below, "below")
return last
@property
def tree_depth_(self):
"""
Returns the maximum depth of the tree.
"""
dt = self.depth
if self.above is not None:
dt = max(dt, self.above.tree_depth_)
if self.below is not None:
dt = max(dt, self.below.tree_depth_)
return dt

    def fit_improve(self, dtlr, total_N, X, y, sample_weight):
        """
        The method only works on a linear classifier. It changes
        the intercept so that the split satisfies the constraints
        imposed by *min_samples_leaf* and *min_weight_fraction_leaf*.
        The algorithm has a significant cost as it sorts every observation
        and chooses the best intercept.

        :param dtlr: :class:`DecisionTreeLogisticRegression <mlinsights.mlmodel.decision_tree_logreg.DecisionTreeLogisticRegression>`
        :param total_N: total number of observations
        :param X: features
        :param y: labels
        :param sample_weight: sample weight
        :return: probabilities
        """
if self.estimator is None:
raise RuntimeError(
"Estimator was not trained.") # pragma: no cover
prob = self.estimator.predict_proba(X)
if dtlr.fit_improve_algo in (None, 'none'):
return prob
if not isinstance(self.estimator, LinearClassifierMixin):
# The classifier is not linear and cannot be improved.
if dtlr.fit_improve_algo == 'intercept_sort_always': # pragma: no cover
raise RuntimeError(
"The model is not linear ({}), "
"intercept cannot be improved.".format(self.estimator.__class__.__name__))
return prob
above = prob[:, 1] > self.threshold
below = ~ above
n_above = above.sum()
n_below = below.sum()
n_min = min(n_above, n_below)
p1p2 = float(n_above * n_below) / X.shape[0] ** 2
if dtlr.verbose >= 2:
print("[DTLRI] %s imp %d <> %d, p1p2=%1.3f <> %1.3f" % ( # pragma: no cover
" " * self.depth, n_min, dtlr.min_samples_leaf,
p1p2, dtlr.p1p2))
if (n_min >= dtlr.min_samples_leaf and
float(n_min) / total_N >= dtlr.min_weight_fraction_leaf and
p1p2 > dtlr.p1p2 and
dtlr.fit_improve_algo != 'intercept_sort_always'):
return prob
coef = self.estimator.coef_
decision_function = (X @ coef.T).ravel()
order = numpy.argsort(decision_function, axis=0)
begin = dtlr.min_samples_leaf
        sorted_df = decision_function[order]
        N = sorted_df.shape[0]
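        # Scan every admissible cut along the sorted decision function:
        # beta = -sorted_df[i] sends i samples to one side and N - i to
        # the other; the score is the average likelihood plus
        # gamma * i * (N - i) / N**2, which rewards balanced splits.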
best = None
besti = None
beta_best = None
for i in range(begin, N - begin):
beta = - sorted_df[i]
like = numpy.sum(likelihood(decision_function + beta, y)) / N
w = float(i * (N - i)) / N**2
like += w * dtlr.gamma
if besti is None or like > best:
best = like
besti = i
beta_best = beta
if beta_best is not None:
if dtlr.verbose >= 1:
print("[DTLRI] %s change intercept %f --> %f in [%f, %f]" % ( # pragma: no cover
" " * self.depth, self.estimator.intercept_, beta_best,
- sorted_df[-1], - sorted_df[0]))
self.estimator.intercept_ = beta_best
prob = self.estimator.predict_proba(X)
return prob

    def enumerate_leaves_index(self):
        """
        Enumerates the indices of the leaves.
        """
if self.above is None or self.below is None:
yield self.index
if self.above is not None:
for index in self.above.enumerate_leaves_index():
yield index
if self.below is not None:
for index in self.below.enumerate_leaves_index():
yield index


class DecisionTreeLogisticRegression(BaseEstimator, ClassifierMixin):
    """
    Fits a logistic regression, then fits two other
    logistic regressions for every observation on both sides
    of the border. It goes on until a tree is built.
    It only handles binary classification.
    The built tree cannot be deeper than the maximum recursion depth.

    :param estimator: binary classification estimator,
        if None, a logistic regression is used; the theoretical
        model is defined with a logistic regression but it could be
        any binary classifier
    :param max_depth: int, default=20
        The maximum depth of the tree. Nodes are expanded until
        all leaves are pure, until all leaves contain less than
        min_samples_split samples, or until this depth is reached.
        It cannot be None and must be below the maximum
        recursion depth allowed by Python.
    :param min_samples_split: int or float, default=2
        The minimum number of samples required to split an internal node:

        - If int, then consider `min_samples_split` as the minimum number.
        - If float, then `min_samples_split` is a fraction and
          `ceil(min_samples_split * n_samples)` is the minimum
          number of samples for each split.
    :param min_samples_leaf: int or float, default=2
        The minimum number of samples required to be at a leaf node.
        A split point at any depth will only be considered if it leaves at
        least ``min_samples_leaf`` training samples in each of the left and
        right branches. This may have the effect of smoothing the model.

        - If int, then consider `min_samples_leaf` as the minimum number.
        - If float, then `min_samples_leaf` is a fraction and
          `ceil(min_samples_leaf * n_samples)` is the minimum
          number of samples for each node.
    :param min_weight_fraction_leaf: float, default=0.0
        The minimum weighted fraction of the sum total of weights (of all
        the input samples) required to be at a leaf node. Samples have
        equal weight when sample_weight is not provided.
    :param fit_improve_algo: string, one of the following values:

        - `'auto'`: chooses the best option below, `'none'` for
          every non linear model, `'intercept_sort'` for linear models
        - `'none'`: does nothing once the binary classifier is fit
        - `'intercept_sort'`: if one side of the split is too small,
          the method chooses the best intercept satisfying
          the constraints
        - `'intercept_sort_always'`: always chooses the best intercept
          possible
    :param p1p2: threshold in [0, 1];
        for every split, we can define probabilities :math:`p_1 p_2`
        which define the ratio of samples in both splits;
        if :math:`p_1 p_2` is below the threshold,
        method *fit_improve* is called
    :param gamma: weight of the coefficient :math:`p (1-p)`.
        When the model tries to improve the linear classifier,
        it looks for a better intercept which maximizes the
        likelihood and satisfies the constraints.
        In order to force the classifier to choose a value
        which splits the dataset into two almost equal folds,
        the function maximizes :math:`likelihood + \\gamma p (1 - p)`
        where *p* is the proportion of samples falling in the first
        fold.
    :param verbose: prints out information about the training

    Fitted attributes:

    * `classes_`: ndarray of shape (n_classes,)
      The class labels (binary classification).
    * `tree_`: the root
      :class:`_DecisionTreeLogisticRegressionNode <mlinsights.mlmodel.decision_tree_logreg._DecisionTreeLogisticRegressionNode>`
      of the trained tree.
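
    A minimal usage sketch (the synthetic data below is made up
    for illustration)::

        import numpy
        from mlinsights.mlmodel.decision_tree_logreg import (
            DecisionTreeLogisticRegression)

        X = numpy.random.randn(100, 2)
        y = (X[:, 0] + X[:, 1] > 0).astype(numpy.int32)
        dtlr = DecisionTreeLogisticRegression(max_depth=3)
        dtlr.fit(X, y)
        prob = dtlr.predict_proba(X)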
"""
_fit_improve_algo_values = (
None, 'none', 'auto', 'intercept_sort', 'intercept_sort_always')

    def __init__(self, estimator=None,
                 max_depth=20, min_samples_split=2,
                 min_samples_leaf=2, min_weight_fraction_leaf=0.0,
                 fit_improve_algo='auto', p1p2=0.09,
                 gamma=1., verbose=0):
"constructor"
ClassifierMixin.__init__(self)
BaseEstimator.__init__(self)
# logistic regression
if estimator is None:
self.estimator = LogisticRegression()
else:
self.estimator = estimator
if max_depth is None:
raise ValueError("'max_depth' cannot be None.") # pragma: no cover
if max_depth > 1024:
raise ValueError(
"'max_depth' must be <= 1024.") # pragma: no cover
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.min_samples_leaf = min_samples_leaf
self.min_weight_fraction_leaf = min_weight_fraction_leaf
self.fit_improve_algo = fit_improve_algo
self.p1p2 = p1p2
self.gamma = gamma
self.verbose = verbose
if self.fit_improve_algo not in DecisionTreeLogisticRegression._fit_improve_algo_values:
raise ValueError(
"fit_improve_algo='{}' not in {}".format(
self.fit_improve_algo, DecisionTreeLogisticRegression._fit_improve_algo_values))

    def fit(self, X, y, sample_weight=None):
        """
        Builds the tree model.

        :param X: numpy array of shape [n_samples, n_features],
            training data; a dataframe is converted into an array
            through its attribute *values*
        :param y: numpy array of shape [n_samples],
            binary target values
        :param sample_weight: numpy array of shape [n_samples],
            individual weights for each sample
        :return: self, an instance of self

        Fitted attributes:

        * `classes_`: classes
        * `tree_`: tree structure, see :class:`_DecisionTreeLogisticRegressionNode <mlinsights.mlmodel.decision_tree_logreg._DecisionTreeLogisticRegressionNode>`
        * `n_nodes_`: number of nodes
        """
if not isinstance(X, numpy.ndarray):
if hasattr(X, 'values'):
X = X.values
if not isinstance(X, numpy.ndarray):
raise TypeError("'X' must be an array.")
if (sample_weight is not None and
not isinstance(sample_weight, numpy.ndarray)):
raise TypeError(
"'sample_weight' must be an array.") # pragma: no cover
self.classes_ = numpy.array(sorted(set(y)))
if len(self.classes_) != 2:
raise RuntimeError(
"The model only supports binary classification but labels are "
"{}.".format(self.classes_))
cls = (y == self.classes_[1]).astype(numpy.int32)
estimator = clone(self.estimator)
self.tree_ = _DecisionTreeLogisticRegressionNode(estimator, 0.5)
self.n_nodes_ = self.tree_.fit(
X, cls, sample_weight, self, X.shape[0]) + 1
return self

    def predict(self, X):
        """
        Runs the predictions.

        :param X: features
        :return: predicted classes
        """
labels = self.tree_.predict(X)
return numpy.take(self.classes_, labels)

    def predict_proba(self, X):
        """
        Returns the classification probabilities.

        :param X: features
        :return: probabilities
        """
return self.tree_.predict_proba(X)

    def decision_function(self, X):
        """
        Not implemented: this model does not expose a decision function.
        """
raise NotImplementedError( # pragma: no cover
"Decision function is not available for this model.")
@property
def tree_depth_(self):
"""
Returns the maximum depth of the tree.
"""
return self.tree_.tree_depth_

    def decision_path(self, X, check_input=True):
        """
        Returns the decision path.

        :param X: inputs
        :param check_input: unused
        :return: sparse matrix, one row per sample, one column per node,
            a coefficient is 1 when the sample goes through the node
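
        A sketch of how the matrix can be inspected, assuming a
        trained model ``dtlr``::

            path = dtlr.decision_path(X)
            print(path.toarray())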
"""
mat = sparse.lil_matrix((X.shape[0], self.n_nodes_), dtype=numpy.int32)
self.tree_.decision_path(X, mat, numpy.arange(X.shape[0]))
return sparse.csr_matrix(mat)

    def get_leaves_index(self):
        """
        Returns the index of every leaf.
"""
indices = self.tree_.enumerate_leaves_index()
return numpy.array(sorted(indices))