Code source de mlstatpy.ml.roc

# -*- coding: utf-8 -*-
"""
About ROC


:githublink:`%|py|6`
"""
import math
import itertools
from enum import Enum
import pandas
import numpy


[docs]class ROC: """ Helper to draw a :epkg:`ROC` curve. :githublink:`%|py|16` """
[docs] class CurveType(Enum): """ Curve types: * *PROBSCORE*: 1 - False Positive / True Positive * *ERRPREC*: error / recall * *RECPREC*: precision / recall * *ROC*: False Positive / True Positive * *SKROC*: False Positive / True Positive (`scikit-learn <http://scikit-learn.org/stable/modules/generated/sklearn.metrics.roc_curve.html>`_) :githublink:`%|py|29` """ PROBSCORE = 2 ERRREC = 3 RECPREC = 4 ROC = 5 SKROC = 6
[docs] def __init__(self, y_true=None, y_score=None, sample_weight=None, df=None): """ Initialisation with a dataframe and two or three columns: * column 1: score (y_score) * column 2: expected answer (boolean) (y_true) * column 3: weight (optional) (sample_weight) :param y_true: if *df* is None, *y_true*, *y_score*, *sample_weight* must be filled, *y_true* is whether or None the answer is true. *y_true* means the prediction is right. :param y_score: score prediction :param sample_weight: weights :param df: dataframe or array or list, it must contains 2 or 3 columns always in the same order :githublink:`%|py|51` """ if df is None: df = pandas.DataFrame() df["score"] = y_score df["label"] = y_true if sample_weight is not None: df["weight"] = sample_weight self.data = df elif isinstance(df, list): if len(df[0]) == 2: self.data = pandas.DataFrame(df, columns=["score", "label"]) else: self.data = pandas.DataFrame( df, columns=["score", "label", "weight"]) elif isinstance(df, numpy.ndarray): if df.shape[1] == 2: self.data = pandas.DataFrame(df, columns=["score", "label"]) else: self.data = pandas.DataFrame( df, columns=["score", "label", "weight"]) elif not isinstance(df, pandas.DataFrame): raise TypeError( "df should be a DataFrame, not {0}".format(type(df))) else: self.data = df.copy() self.data.sort_values(self.data.columns[0], inplace=True) self.data.reset_index(drop=True, inplace=True) if self.data.shape[1] == 2: self.data["weight"] = 1.0
@property def Data(self): """ Returns the underlying dataframe. :githublink:`%|py|85` """ return self.data
[docs] def __len__(self): """ usual :githublink:`%|py|91` """ return len(self.data)
[docs] def __repr__(self): """ Shows first elements, precision rate. :githublink:`%|py|97` """ return self.__str__()
[docs] def __str__(self): """ Shows first elements, precision rate. :githublink:`%|py|103` """ rows = [] rows.append("Overall precision: %3.2f - AUC=%f" % (self.precision(), self.auc())) rows.append("--------------") rows.append(str(self.data.head(min(5, len(self))))) rows.append("--------------") rows.append(str(self.data.tail(min(5, len(self))))) rows.append("--------------") roc = self.compute_roc_curve(10, ROC.CurveType.ROC) rows.append(str(roc)) rows.append("--------------") roc = self.compute_roc_curve(10, ROC.CurveType.ERRREC) rows.append(str(roc)) return "\n".join(rows)
[docs] def confusion(self, score=None, nb=10, curve=CurveType.ROC, bootstrap=False): """ Computes the confusion matrix for a specific *score* or all if *score* is None. :param score: score or None. :param nb: number of scores (if *score* is None) :param curve: see :class:`CurveType <mlstatpy.ml.roc.ROC.CurveType>` :param boostrap: builds the curve after resampling :return: One row if score is precised, many roww is score is None :githublink:`%|py|129` """ if not bootstrap: cloud = self.data.copy() else: cloud = self.random_cloud() if score is None: sum_weights = cloud[cloud.columns[2]].sum() if nb <= 0: nb = len(cloud) else: nb = min(nb, len(cloud)) seuil = numpy.arange(nb + 1) * sum_weights / nb cloud = cloud.iloc[::-1].copy() cloud["lw"] = cloud[cloud.columns[2]] * cloud[cloud.columns[1]] cloud["cw"] = cloud[cloud.columns[2]].cumsum() cloud["clw"] = cloud["lw"].cumsum() if cloud.columns[4] != "cw": raise ValueError("Column 4 should be 'cw'.") if cloud.columns[5] != "clw": raise ValueError("Column 5 should be 'clw'.") pos_roc = 0 pos_seuil = 0 if curve is ROC.CurveType.ROC: roc = pandas.DataFrame(0, index=numpy.arange( nb + 2), columns=["True Positive", "False Positive", "False Negative", "True Negative", "threshold"]) sum_good_weights = cloud.iloc[-1, 5] sum_bad_weights = sum_weights - sum_good_weights roc.iloc[0, 0] = 0 roc.iloc[0, 1] = 0 roc.iloc[0, 2] = sum_good_weights roc.iloc[0, 3] = sum_bad_weights roc.iloc[0, 4] = max(cloud.iloc[:, 0]) pos_roc += 1 for i in range(len(cloud)): if cloud.iloc[i, 4] > seuil[pos_seuil]: tp = cloud.iloc[i, 5] fp = cloud.iloc[i, 4] - cloud.iloc[i, 5] roc.iloc[pos_roc, 0] = tp roc.iloc[pos_roc, 1] = fp roc.iloc[pos_roc, 2] = sum_good_weights - tp roc.iloc[pos_roc, 3] = sum_bad_weights - fp roc.iloc[pos_roc, 4] = cloud.iloc[i, 0] pos_roc += 1 pos_seuil += 1 roc.iloc[pos_roc:, 0] = sum_good_weights roc.iloc[pos_roc:, 1] = sum_bad_weights roc.iloc[pos_roc:, 2] = 0 roc.iloc[pos_roc:, 3] = 0 roc.iloc[pos_roc:, 4] = min(cloud.iloc[:, 0]) return roc else: raise NotImplementedError( "Unexpected type '{0}', only ROC is allowed.".format(curve)) else: roc = self.confusion(nb=len(self), curve=curve, bootstrap=False, score=None) roc = roc[roc["threshold"] <= score] if len(roc) == 0: raise ValueError( "The requested confusion is empty for score={0}.".format(score)) return roc[:1]
[docs] def precision(self): """ Computes the precision. :githublink:`%|py|200` """ score, weight = self.data.columns[0], self.data.columns[2] return (self.data[score] * self.data[weight] * 1.0).sum() / self.data[weight].sum()
[docs] def compute_roc_curve(self, nb=100, curve=CurveType.ROC, bootstrap=False): """ Computes a ROC curve with *nb* points avec nb, if *nb == -1*, there are as many as points as the data contains, if *bootstrap == True*, it draws random number to create confidence interval based on bootstrap method. :param nb: number of points for the curve :param curve: see :class:`CurveType <mlstatpy.ml.roc.ROC.CurveType>` :param boostrap: builds the curve after resampling :return: DataFrame (metrics and threshold) If *curve* is *SKROC*, the parameter *nb* is not taken into account. It should be set to 0. :githublink:`%|py|218` """ if curve is ROC.CurveType.ERRREC: roc = self.compute_roc_curve( nb=nb, curve=ROC.CurveType.RECPREC, bootstrap=bootstrap) roc["error"] = - roc["precision"] + 1 return roc[["error", "recall", "threshold"]] elif curve is ROC.CurveType.PROBSCORE: roc = self.compute_roc_curve( nb=nb, curve=ROC.CurveType.ROC, bootstrap=bootstrap) roc["P(->s)"] = roc["False Positive Rate"] roc["P(+<s)"] = - roc["True Positive Rate"] + 1 return roc[["P(+<s)", "P(->s)", "threshold"]] if not bootstrap: cloud = self.data.copy() else: cloud = self.random_cloud() if curve is ROC.CurveType.SKROC: if nb > 0: raise NotImplementedError("nb must be <= 0 si curve is SKROC") from sklearn.metrics import roc_curve fpr, tpr, thresholds = roc_curve(y_true=cloud[cloud.columns[1]], y_score=cloud[cloud.columns[0]], sample_weight=cloud[cloud.columns[2]]) roc = pandas.DataFrame(0, index=numpy.arange(len(fpr)), columns=["False Positive Rate", "True Positive Rate", "threshold"]) roc_cols = list(roc.columns) roc[roc_cols[0]] = fpr roc[roc_cols[1]] = tpr roc[roc_cols[2]] = thresholds return roc sum_weights = cloud[cloud.columns[2]].sum() if nb <= 0: nb = len(cloud) else: nb = min(nb, len(cloud)) seuil = numpy.arange(nb + 1) * sum_weights / nb cloud = cloud.iloc[::-1].copy() cloud["lw"] = cloud[cloud.columns[2]] * cloud[cloud.columns[1]] cloud["cw"] = cloud[cloud.columns[2]].cumsum() cloud["clw"] = cloud["lw"].cumsum() sum_weights_ans = cloud["lw"].sum() if cloud.columns[4] != "cw": raise ValueError("Column 4 should be 'cw'.") if cloud.columns[5] != "clw": raise ValueError("Column 5 should be 'clw'.") pos_roc = 0 pos_seuil = 0 if curve is ROC.CurveType.ROC: roc = pandas.DataFrame(0, index=numpy.arange( nb + 1), columns=["False Positive Rate", "True Positive Rate", "threshold"]) sum_good_weights = cloud.iloc[-1, 5] sum_bad_weights = sum_weights - sum_good_weights for i in range(len(cloud)): if cloud.iloc[i, 4] > seuil[pos_seuil]: ds = cloud.iloc[i, 4] - cloud.iloc[i, 5] roc.iloc[pos_roc, 0] = ds / sum_bad_weights roc.iloc[pos_roc, 1] = cloud.iloc[i, 5] / sum_good_weights roc.iloc[pos_roc, 2] = cloud.iloc[i, 0] pos_roc += 1 pos_seuil += 1 roc.iloc[pos_roc:, 0] = ( cloud.iloc[-1, 4] - cloud.iloc[-1, 5]) / sum_bad_weights roc.iloc[pos_roc:, 1] = cloud.iloc[-1, 5] / sum_good_weights roc.iloc[pos_roc:, 2] = cloud.iloc[-1, 0] elif curve is ROC.CurveType.RECPREC: roc = pandas.DataFrame(0, index=numpy.arange( nb + 1), columns=["recall", "precision", "threshold"]) for i in range(len(cloud)): if cloud.iloc[i, 4] > seuil[pos_seuil]: roc.iloc[pos_roc, 0] = cloud.iloc[i, 4] / sum_weights if cloud.iloc[i, 4] > 0: roc.iloc[pos_roc, 1] = cloud.iloc[ i, 5] / cloud.iloc[i, 4] else: roc.iloc[pos_roc, 1] = 0.0 roc.iloc[pos_roc, 2] = cloud.iloc[i, 0] pos_roc += 1 pos_seuil += 1 roc.iloc[pos_roc:, 0] = 1.0 roc.iloc[pos_roc:, 1] = sum_weights_ans / sum_weights roc.iloc[pos_roc:, 2] = cloud.iloc[-1, 0] else: raise NotImplementedError("Unknown curve type '{}'.".format(curve)) return roc
[docs] def random_cloud(self): """ Resamples among the data. :return: DataFrame :githublink:`%|py|317` """ res = self.data.sample(len(self.data), weights=self.data[ self.data.columns[2]], replace=True) return res.sort_values(res.columns[0])
[docs] def plot(self, nb=100, curve=CurveType.ROC, bootstrap=0, ax=None, thresholds=False, **kwargs): """ Plots a :epkg:`ROC` curve. :param nb: number of points :param curve: see :class:`CurveType <mlstatpy.ml.roc.ROC.CurveType>` :param boostrap: number of curves for the boostrap (0 for None) :param ax: axis :param thresholds: use thresholds for the X axis :param kwargs: sent to `pandas.plot <http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.plot.html>`_ :return: ax :githublink:`%|py|334` """ nb_bootstrap = 0 if bootstrap > 0: ckwargs = kwargs.copy() if 'color' not in ckwargs: ckwargs['color'] = 'r' if 'linewidth' not in kwargs: ckwargs['linewidth'] = 0.2 ckwargs['legend'] = False if 'label' in ckwargs: del ckwargs['label'] for _ in range(0, bootstrap): roc = self.compute_roc_curve(nb, curve=curve, bootstrap=True) if thresholds: cols = list(_ for _ in roc.columns if _ != "threshold") roc = roc.sort_values("threshold").reset_index(drop=True) ax = roc.plot(x="threshold", y=cols, ax=ax, label=['_nolegend_' for i in cols], **ckwargs) else: cols = list(_ for _ in roc.columns[ 1:] if _ != "threshold") roc = roc.sort_values( roc.columns[0]).reset_index(drop=True) ax = roc.plot(x=roc.columns[0], y=cols, ax=ax, label=['_nolegend_' for i in cols], **ckwargs) nb_bootstrap += len(cols) bootstrap = 0 if bootstrap <= 0: if 'legend' not in kwargs: kwargs['legend'] = False roc = self.compute_roc_curve(nb, curve=curve) if not thresholds: roc = roc[[_ for _ in roc.columns if _ != "threshold"]] cols = list(_ for _ in roc.columns if _ != "threshold") final = 0 if thresholds: if 'label' in kwargs and len(cols) != len(kwargs['label']): raise ValueError( 'label must have {0} values'.format(len(cols))) roc = roc.sort_values("threshold").reset_index(drop=True) ax = roc.plot(x="threshold", y=cols, ax=ax, **kwargs) ax.set_ylim([0, 1]) ax.set_xlabel("thresholds") final += len(cols) diag = 0 else: if 'label' in kwargs and len(cols) - 1 != len(kwargs['label']): raise ValueError( 'label must have {0} values'.format(len(cols) - 1)) final += len(cols) - 1 roc = roc.sort_values(cols[0]).reset_index(drop=True) ax = roc.plot(x=cols[0], y=cols[1:], ax=ax, **kwargs) if curve is ROC.CurveType.ROC or curve is ROC.CurveType.SKROC: ax.plot([0, 1], [0, 1], "k--", linewidth=1) diag = 1 else: diag = 0 ax.set_xlabel(roc.columns[0]) if len(roc.columns) == 2: ax.set_ylabel(roc.columns[1]) ax.set_xlim([0, 1]) ax.set_ylim([0, 1]) # legend handles, labels = ax.get_legend_handles_labels() tot = final + nb_bootstrap diag = len(handles) - diag handles = handles[:-tot] + handles[-final:diag] new_labels = labels[:-tot] + labels[-final:diag] ax.legend(handles, new_labels) return ax
[docs] def auc(self, cloud=None): """ Computes the area under the curve (:epkg:`AUC`). :param cloud: data or None to use ``self.data``, the function assumes the data is sorted. :return: AUC The first column is the label, the second one is the score, the third one is the weight. :githublink:`%|py|419` """ if cloud is None: cloud = self.data good = cloud[cloud[cloud.columns[1]] == 1] wrong = cloud[cloud[cloud.columns[1]] == 0] auc = 0.0 for a, b in itertools.product(good.itertuples(False), wrong.itertuples(False)): if a[0] > b[0]: auc += a[2] * b[2] elif a[0] >= b[0]: auc += a[2] * b[2] / 2 if auc == 0 and good.shape[0] + wrong.shape[0] < self.data.shape[0]: raise ValueError("Label are not right, expect 0 and 1 not {0}".format( set(cloud[cloud.columns[1]]))) n = len(wrong) * len(good) if n > 0: auc /= float(n) return auc
[docs] def auc_interval(self, bootstrap=10, alpha=0.95): """ Determines a confidence interval for the :epkg:`AUC` with bootstrap. :param bootstrap: number of random estimation :param alpha: define the confidence interval :return: dictionary of values :githublink:`%|py|445` """ if bootstrap <= 1: raise ValueError("Use auc instead, bootstrap < 2") rate = [] for _ in range(0, bootstrap): cloud = self.random_cloud() auc = self.auc(cloud) rate.append(auc) rate.sort() ra = self.auc(self.data) i1 = int(alpha * len(rate) / 2) i2 = max(i1, len(rate) - i1 - 1) med = rate[len(rate) // 2] moy = float(sum(rate)) / len(rate) var = 0 for r in rate: var += r * r var = float(var) / len(rate) var = var - moy * moy return dict(auc=ra, interval=(rate[i1], rate[i2]), min=rate[0], max=rate[len(rate) - 1], mean=moy, var=math.sqrt(var), mediane=med)
[docs] def roc_intersect(self, roc, x): """ The :epkg:`ROC` curve is defined by a set of points. This function interpolates those points to determine *y* for any *x*. :param roc: ROC curve :param x: x :return: y :githublink:`%|py|479` """ below = roc[roc[roc.columns[0]] <= x] i = len(below) if i == len(roc): return 0.0 p2 = tuple(roc.iloc[i, :]) if i - 1 <= 0: p1 = (1, 1) else: p1 = tuple(roc.iloc[i - 1, :]) if p1[0] == p2[0]: return (p1[1] + p2[0]) / 2 else: return (x - p1[0]) / (p2[0] - p1[0]) * (p2[1] - p1[1]) + p1[1]
[docs] def roc_intersect_interval(self, x, nb, curve=CurveType.ROC, bootstrap=10, alpha=0.05): """ Computes a confidence interval for the value returned by :meth:`roc_intersect <mlstatpy.ml.roc.ROC.roc_intersect>`. :param roc: ROC curve :param x: x :param curve: see :class:`CurveType <mlstatpy.ml.roc.ROC.CurveType>` :return: dictionary :githublink:`%|py|505` """ rate = [] for _ in range(0, bootstrap): roc = self.compute_roc_curve(nb, curve=curve, bootstrap=True) r = self.roc_intersect(roc, x) rate.append(r) rate.sort() roc = self.compute_roc_curve(nb, curve=curve) ra = self.roc_intersect(roc, x) i1 = int(alpha * len(rate) / 2) i2 = max(i1, len(rate) - i1 - 1) med = rate[len(rate) // 2] moy = float(sum(rate)) / len(rate) var = 0 for r in rate: var += r * r var = float(var) / len(rate) var = var - moy * moy return dict(y=ra, interval=(rate[i1], rate[i2]), min=rate[0], max=rate[len(rate) - 1], mean=moy, var=math.sqrt(var), mediane=med)