Source code for mlinsights.mlmodel.sklearn_transform_inv_fct

"""
Implements a transform which modifies the target
and applies the reverse transformation on the target.


:githublink:`%|py|6`
"""
import numpy
from sklearn.exceptions import NotFittedError
from sklearn.neighbors import NearestNeighbors
from .sklearn_transform_inv import BaseReciprocalTransformer


class FunctionReciprocalTransformer(BaseReciprocalTransformer):
    """
    The transform is used to apply a function on the target,
    predict, then transform the target back before scoring.
    The transform implements a series of predefined functions:

    .. runpython::
        :showcode:

        import pprint
        from mlinsights.mlmodel.sklearn_transform_inv_fct import FunctionReciprocalTransformer
        pprint.pprint(FunctionReciprocalTransformer.available_fcts())
    """

    @staticmethod
    def available_fcts():
        """
        Returns the predefined functions.

        :return: dictionary ``{name: (callable, name of the reciprocal)}``,
            every reciprocal name is itself a key of the dictionary
        """
        return {
            'log': (numpy.log, 'exp'),
            'exp': (numpy.exp, 'log'),
            'log(1+x)': (lambda x: numpy.log(x + 1), 'exp(x)-1'),
            'log1p': (numpy.log1p, 'expm1'),
            # The reciprocal of x -> exp(x) - 1 is x -> log(1 + x),
            # not x -> log(x).
            'exp(x)-1': (lambda x: numpy.exp(x) - 1, 'log(1+x)'),
            'expm1': (numpy.expm1, 'log1p'),
        }

    def __init__(self, fct, fct_inv=None):
        """
        :param fct: function name (one of the keys returned by
            :meth:`available_fcts`) or numerical function
        :param fct_inv: must be left to None if *fct* is a function name,
            reciprocal function otherwise
        :raises ValueError: if *fct* is an unknown name, if *fct* is a name
            and *fct_inv* is specified, or if *fct* is not a name
            and *fct_inv* is missing
        """
        BaseReciprocalTransformer.__init__(self)
        if isinstance(fct, str):
            if fct_inv is not None:
                raise ValueError(
                    "If fct is a function name, fct_inv must not be specified.")
            opts = self.__class__.available_fcts()
            if fct not in opts:
                raise ValueError("Unknown fct '{}', it should in {}.".format(
                    fct, sorted(opts)))
        elif fct_inv is None:
            raise ValueError(
                "If fct is callable, fct_inv must be specified.")
        self.fct = fct
        self.fct_inv = fct_inv

    def fit(self, X=None, y=None, sample_weight=None):
        """
        Just defines *fct_* and *fct_inv_*, the data is not used.

        :param X: ignored
        :param y: ignored
        :param sample_weight: ignored
        :return: self
        """
        if callable(self.fct):
            self.fct_ = self.fct
            self.fct_inv_ = self.fct_inv
        else:
            # fct is a name: fct_ becomes the callable, fct_inv_ remains
            # the *name* of the reciprocal, resolved by get_fct_inv.
            opts = self.__class__.available_fcts()
            self.fct_, self.fct_inv_ = opts[self.fct]
        return self

    def get_fct_inv(self):
        """
        Returns a trained transform which reverses the target
        after a predictor. The transform must be fitted first
        as the method reads *fct_* and *fct_inv_*.

        :return: fitted :class:`FunctionReciprocalTransformer`
        """
        if isinstance(self.fct_inv_, str):
            res = FunctionReciprocalTransformer(self.fct_inv_)
        else:
            # Both directions are callables, they are simply swapped.
            res = FunctionReciprocalTransformer(self.fct_inv_, self.fct_)
        return res.fit()

    def transform(self, X, y):
        """
        Transforms *X* and *y*.
        Returns transformed *X* and *y*.
        If *y* is None, the returned value for *y*
        is None as well.

        :param X: features, returned unchanged
        :param y: targets or None
        :return: tuple ``(X, fct_(y))`` or ``(X, None)``
        """
        if y is None:
            return X, None
        return X, self.fct_(y)
class PermutationReciprocalTransformer(BaseReciprocalTransformer):
    """
    The transform is used to permute targets,
    predict, then permute the target back before scoring.
    nan values remain nan values. Once fitted, the transform
    has attribute ``permutation_`` which keeps
    track of the permutation to apply.
    """

    def __init__(self, random_state=None, closest=False):
        """
        :param random_state: random state, None to rely on the global
            state of :mod:`numpy.random`, otherwise a value given to
            :class:`numpy.random.RandomState`
        :param closest: if True, a target unseen while fitting is replaced
            by the closest known target before being permuted,
            if False, an unseen target raises an exception
        """
        BaseReciprocalTransformer.__init__(self)
        self.random_state = random_state
        self.closest = closest

    def fit(self, X=None, y=None, sample_weight=None):
        """
        Defines a random permutation over the targets.

        :param X: ignored
        :param y: targets, an array (the method reads ``y.dtype``
            and calls ``y.ravel()``)
        :param sample_weight: ignored
        :return: self
        :raises RuntimeError: if *y* is None
        """
        if y is None:
            raise RuntimeError("targets cannot be empty.")
        is_float = numpy.issubdtype(y.dtype, numpy.floating)

        # Every distinct target receives an index in order of appearance,
        # nan values are left out of the permutation.
        perm = {}
        for u in y.ravel():
            if is_float and numpy.isnan(u):
                continue
            if u in perm:
                continue
            perm[u] = len(perm)

        lin = numpy.arange(len(perm))
        if self.random_state is None:
            lin = numpy.random.permutation(lin)
        else:
            rs = numpy.random.RandomState(  # pylint: disable=E1101
                self.random_state)
            lin = rs.permutation(lin)

        for u in perm:
            perm[u] = lin[perm[u]]
        self.permutation_ = perm

        # The nearest neighbours cache built by _find_closest depends on
        # the keys of the previous permutation, it must not survive a refit.
        for name in ('knn_', 'knn_perm_'):
            if hasattr(self, name):
                delattr(self, name)
        return self

    def _check_is_fitted(self):
        """
        Raises an exception if the transform was not fitted.

        :raises NotFittedError: if attribute ``permutation_`` is missing
        """
        if not hasattr(self, 'permutation_'):
            raise NotFittedError(
                "This instance {} is not fitted yet. Call 'fit' with "
                "appropriate arguments before using this method.".format(
                    type(self)))

    def get_fct_inv(self):
        """
        Returns a trained transform which reverses the target
        after a predictor.

        :return: :class:`PermutationReciprocalTransformer` whose
            ``permutation_`` is the inverse mapping of this one
        :raises NotFittedError: if the transform was not fitted
        """
        self._check_is_fitted()
        res = PermutationReciprocalTransformer(
            self.random_state, closest=self.closest)
        res.permutation_ = {v: k for k, v in self.permutation_.items()}
        return res

    def _find_closest(self, cl):
        """
        Finds the closest target among the keys of ``permutation_``.
        The nearest neighbours model is built on the first call
        and cached in ``knn_`` and ``knn_perm_``.

        :param cl: value to look for
        :return: closest key as a python *float* or *int*
        :raises NotImplementedError: if the keys are neither
            floats nor integers
        """
        if not hasattr(self, 'knn_'):
            keys = numpy.array(list(self.permutation_))
            keys = keys.reshape((len(keys), 1))
            knn = NearestNeighbors(n_neighbors=1, algorithm='kd_tree')
            knn.fit(keys)
            # The cache is only stored once the model is fitted.
            self.knn_perm_ = keys
            self.knn_ = knn
        ind = self.knn_.kneighbors([[cl]], return_distance=False)
        # ind has shape (1, 1): a scalar is extracted before the conversion
        # as converting an array with ndim > 0 into a scalar is deprecated.
        res = self.knn_perm_[ind[0, 0], 0]
        dtype = self.knn_perm_.dtype
        if numpy.issubdtype(dtype, numpy.floating):
            return float(res)
        if numpy.issubdtype(dtype, numpy.integer):
            return int(res)
        raise NotImplementedError("The function does not work for type {}.".format(
            dtype))

    def transform(self, X, y):
        """
        Transforms *X* and *y*.
        Returns transformed *X* and *y*.
        If *y* is None, the returned value for *y*
        is None as well.

        A one-dimensional *y* or a *y* holding strings or integers is
        considered as classes, every value is replaced by its image in
        ``permutation_``. Otherwise *y* must be a matrix (probabilities
        or raw scores) and its columns are permuted.

        :param X: features, returned unchanged
        :param y: targets (array) or None
        :return: tuple ``(X, permuted y)`` or ``(X, None)``
        :raises NotFittedError: if the transform was not fitted
        :raises RuntimeError: if a class is unknown and *closest* is False,
            or if *y* holds scores and is not a matrix
        """
        if y is None:
            return X, None
        self._check_is_fitted()

        # numpy.str was removed from numpy, dtypes are checked
        # with issubdtype which also works for sized strings (<U5...).
        holds_classes = (
            len(y.shape) == 1 or
            numpy.issubdtype(y.dtype, numpy.str_) or
            numpy.issubdtype(y.dtype, numpy.integer))

        if holds_classes:
            # permutes classes
            yp = y.copy().ravel()
            is_float = numpy.issubdtype(y.dtype, numpy.floating)
            for i, value in enumerate(yp):
                if is_float and numpy.isnan(value):
                    # nan values remain nan values
                    continue
                if value in self.permutation_:
                    key = value
                elif self.closest:
                    key = self._find_closest(value)
                else:
                    raise RuntimeError("Unable to find key '{}' in {}.".format(
                        value, list(sorted(self.permutation_))))
                yp[i] = self.permutation_[key]
            return X, yp.reshape(y.shape)

        # y is probabilities or raw scores
        if len(y.shape) != 2:
            raise RuntimeError(
                "yp should be a matrix but has shape {}.".format(y.shape))
        # Classes are ranked by their permuted value, new_perm maps a class
        # to the column it moves to. The loop below indexes new_perm with
        # column indices, which assumes classes are 0..n_columns-1.
        ranked = sorted((v, k) for k, v in self.permutation_.items())
        new_perm = {}
        for _, current in ranked:
            new_perm[current] = len(new_perm)
        yp = y.copy()
        for i in range(y.shape[1]):
            yp[:, new_perm[i]] = y[:, i]
        return X, yp