Source code of mlstatpy.ml._neural_tree_node

# -*- coding: utf-8 -*-
"""
Conversion from tree to neural network.


:githublink:`%|py|6`
"""
import numpy
import numpy.random as rnd
from scipy.special import expit, softmax, kl_div as kl_fct  # pylint: disable=E0611
from ._neural_tree_api import _TrainingAPI


class NeuralTreeNode(_TrainingAPI):
    """
    One node in a neural network.

    :githublink:`%|py|15`
    """

    @staticmethod
    def _relu(x):
        "Relu function."
        return numpy.maximum(x, 0)

    @staticmethod
    def _leakyrelu(x):
        "Leaky Relu function."
        return numpy.maximum(x, 0) + numpy.minimum(x, 0) * 0.01

    @staticmethod
    def _drelu(x):
        "Derivative of the Relu function."
        res = numpy.ones(x.shape, dtype=x.dtype)
        res[x < 0] = 0.
        return res

    @staticmethod
    def _dleakyrelu(x):
        "Derivative of the Leaky Relu function."
        res = numpy.ones(x.shape, dtype=x.dtype)
        res[x < 0] = 0.01
        return res

    @staticmethod
    def _dsigmoid(x):
        "Derivative of the sigmoid function."
        y = expit(x)
        return y * (1 - y)

    @staticmethod
    def _softmax(x):
        "Softmax function."
        if len(x.shape) == 2:
            return softmax(x, axis=1)
        return softmax(x)

    @staticmethod
    def _dsoftmax(x):
        "Derivative of the softmax function."
        soft = softmax(x)
        # Jacobian of the softmax: diag(s) - s s^T
        grad = - numpy.outer(soft, soft)
        diag = numpy.diag(soft)
        return diag + grad

    @staticmethod
    def get_activation_function(activation):
        """
        Returns the activation function.
        It returns a function *y=f(x)*.

        :githublink:`%|py|67`
        """
        if activation == 'softmax':
            return NeuralTreeNode._softmax
        if activation == 'softmax4':
            return lambda x: NeuralTreeNode._softmax(x * 4)
        if activation in {'logistic', 'expit', 'sigmoid'}:
            return expit
        if activation == 'sigmoid4':
            return lambda x: expit(x * 4)
        if activation == 'relu':
            return NeuralTreeNode._relu
        if activation == 'leakyrelu':
            return NeuralTreeNode._leakyrelu
        if activation == 'identity':
            return lambda x: x
        raise ValueError(  # pragma: no cover
            "Unknown activation function '{}'.".format(activation))

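    # Illustrative usage, not part of the original source: the factory returns a
    # plain callable which can be applied to a numpy array.
    # >>> f = NeuralTreeNode.get_activation_function('sigmoid')
    # >>> f(numpy.array([0.]))   # expit(0) -> array([0.5])
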
    @staticmethod
    def get_activation_gradient_function(activation):
        """
        Returns the gradient of the activation function.
        It returns a function *y=f'(x)*.
        About the sigmoid:

        .. math::

            \\begin{array}{lll}
            f(x) &=& \\frac{1}{1 + e^{-x}} \\\\
            f'(x) &=& \\frac{e^{-x}}{(1 + e^{-x})^2} = f(x)(1-f(x))
            \\end{array}

        :githublink:`%|py|98`
        """
        if activation == 'softmax':
            return NeuralTreeNode._dsoftmax
        if activation == 'softmax4':
            # chain rule: d/dx f(4x) = 4 f'(4x)
            return lambda x: NeuralTreeNode._dsoftmax(x * 4) * 4
        if activation in {'logistic', 'expit', 'sigmoid'}:
            return NeuralTreeNode._dsigmoid
        if activation == 'sigmoid4':
            # chain rule: d/dx f(4x) = 4 f'(4x)
            return lambda x: NeuralTreeNode._dsigmoid(x * 4) * 4
        if activation == 'relu':
            return NeuralTreeNode._drelu
        if activation == 'leakyrelu':
            return NeuralTreeNode._dleakyrelu
        if activation == 'identity':
            return lambda x: numpy.ones(x.shape, dtype=x.dtype)
        raise ValueError(  # pragma: no cover
            "Unknown activation gradient function '{}'.".format(activation))

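    # Illustrative check, not part of the original source: the sigmoid gradient
    # returned here matches the closed form f(x) * (1 - f(x)).
    # >>> df = NeuralTreeNode.get_activation_gradient_function('sigmoid')
    # >>> x = numpy.array([0.])
    # >>> df(x) == expit(x) * (1 - expit(x))   # -> array([ True])
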
    @staticmethod
    def get_activation_loss_function(activation):
        """
        Returns a default loss function based on the activation function.
        It returns a function *g=loss(x,y)*.

        :githublink:`%|py|121`
        """
        if activation in {'logistic', 'expit', 'sigmoid', 'sigmoid4'}:
            # regression + regularization
            return lambda x, y: (x - y) ** 2
        if activation in {'softmax', 'softmax4'}:
            cst = numpy.finfo(numpy.float32).eps

            # classification
            def kl_fct2(x, y):
                return kl_fct(x + cst, y + cst)
            return kl_fct2
        if activation in {'identity', 'relu', 'leakyrelu'}:
            # regression
            return lambda x, y: (x - y) ** 2
        raise ValueError(
            "Unknown activation function '{}'.".format(activation))

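    # Illustrative usage, not part of the original source: for 'identity' (and the
    # other regression activations) the default loss is the squared error.
    # >>> loss = NeuralTreeNode.get_activation_loss_function('identity')
    # >>> loss(numpy.array([1.]), numpy.array([3.]))   # (1 - 3) ** 2 -> array([4.])
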
    @staticmethod
    def get_activation_dloss_function(activation):
        """
        Returns the derivative of the default loss function based
        on the activation function. It returns a function
        *dloss(x,y)/dx*, the derivative of the loss with respect
        to the prediction *x*.

        :githublink:`%|py|144`
        """
        if activation in {'logistic', 'expit', 'sigmoid', 'sigmoid4'}:
            # regression + regularization
            def dregrdx(x, y):
                return (x - y) * 2
            return dregrdx
        if activation in {'softmax', 'softmax4'}:
            # classification
            cst = numpy.finfo(numpy.float32).eps

            def dclsdx(x, y):
                return numpy.log(x + cst) - numpy.log(y + cst)
            return dclsdx
        if activation in {'identity', 'relu', 'leakyrelu'}:
            # regression
            def dregdx(x, y):
                return (x - y) * 2
            return dregdx
        raise ValueError(  # pragma: no cover
            "Unknown activation function '{}'.".format(activation))

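    # Illustrative usage, not part of the original source: for the squared error,
    # the derivative with respect to the prediction x is 2 * (x - y).
    # >>> dloss = NeuralTreeNode.get_activation_dloss_function('identity')
    # >>> dloss(numpy.array([1.]), numpy.array([3.]))   # 2 * (1 - 3) -> array([-4.])
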
    def __init__(self, weights, bias=None, activation='sigmoid', nodeid=-1,
                 tag=None):
        """
        :param weights: weights
        :param bias: bias, if None, draws a random number
        :param activation: activation function
        :param nodeid: node id
        :param tag: unused but to add information
            on how this node was created

        :githublink:`%|py|179`
        """
        self.tag = tag
        if isinstance(weights, int):
            if activation.startswith('softmax'):
                weights = rnd.randn(2, weights)
            else:
                weights = rnd.randn(weights)
        if isinstance(weights, list):
            weights = numpy.array(weights)
        if len(weights.shape) == 1:
            self.n_outputs = 1
            if bias is None:
                bias = rnd.randn()
            self.coef = numpy.empty(len(weights) + 1)
            self.coef[1:] = weights
            self.coef[0] = bias
        elif len(weights.shape) == 2:
            self.n_outputs = weights.shape[0]
            if self.n_outputs == 1:
                raise RuntimeError(  # pragma: no cover
                    "Unexpected unsqueezed weights shape: {}".format(weights.shape))
            if bias is None:
                bias = rnd.randn(self.n_outputs)
            shape = list(weights.shape)
            shape[1] += 1
            self.coef = numpy.empty(shape)
            self.coef[:, 1:] = weights
            self.coef[:, 0] = bias
        else:
            raise RuntimeError(  # pragma: no cover
                "Unexpected weights shape: {}".format(weights.shape))
        self.activation = activation
        self.nodeid = nodeid
        self._set_fcts()

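    # Illustrative usage, not part of the original source: a neuron with two inputs,
    # an explicit bias and a sigmoid activation; the bias is stored in coef[0].
    # >>> node = NeuralTreeNode(weights=[0.5, -0.2], bias=0.1, activation='sigmoid')
    # >>> node.coef   # -> array([ 0.1,  0.5, -0.2])
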
    def _set_fcts(self):
        self.activation_ = NeuralTreeNode.get_activation_function(
            self.activation)
        self.gradient_ = NeuralTreeNode.get_activation_gradient_function(
            self.activation)
        self.losss_ = NeuralTreeNode.get_activation_loss_function(
            self.activation)
        self.dlossds_ = NeuralTreeNode.get_activation_dloss_function(
            self.activation)

    @property
    def input_weights(self):
        "Returns the weights."
        if self.n_outputs == 1:
            return self.coef[1:]
        return self.coef[:, 1:]

    @property
    def bias(self):
        "Returns the bias."
        if self.n_outputs == 1:
            return self.coef[0]
        return self.coef[:, 0]

    def __getstate__(self):
        "usual"
        return {
            'coef': self.coef, 'activation': self.activation,
            'nodeid': self.nodeid, 'n_outputs': self.n_outputs,
            'tag': self.tag}

    def __setstate__(self, state):
        "usual"
        self.coef = state['coef']
        self.activation = state['activation']
        self.nodeid = state['nodeid']
        self.n_outputs = state['n_outputs']
        self.tag = state['tag']
        self._set_fcts()

    def __eq__(self, obj):
        if self.coef.shape != obj.coef.shape:
            return False
        if any(map(lambda xy: xy[0] != xy[1],
                   zip(self.coef.ravel(), obj.coef.ravel()))):
            return False
        if self.activation != obj.activation:
            return False
        return True

    def __repr__(self):
        "usual"
        if len(self.coef.shape) == 1:
            return "%s(weights=%r, bias=%r, activation=%r)" % (
                self.__class__.__name__, self.coef[1:],
                self.coef[0], self.activation)
        return "%s(weights=%r, bias=%r, activation=%r)" % (
            self.__class__.__name__, self.coef[:, 1:],
            self.coef[:, 0], self.activation)

    def _predict(self, X):
        "Computes inputs of the activation function."
        if self.n_outputs == 1:
            return X @ self.coef[1:] + self.coef[0]
        return (X.reshape((1, -1)) @ self.coef[:, 1:].T + self.coef[:, 0]).ravel()

    def predict(self, X):
        "Computes neuron outputs."
        if self.n_outputs == 1:
            return self.activation_(X @ self.coef[1:] + self.coef[0])
        if len(X.shape) == 2:
            return self.activation_(
                X @ self.coef[:, 1:].T + self.coef[:, 0])
        return self.activation_(
            (X.reshape((1, -1)) @ self.coef[:, 1:].T + self.coef[:, 0]).ravel())

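    # Illustrative usage, not part of the original source: the prediction is the
    # activation applied to the affine combination of the inputs.
    # >>> node = NeuralTreeNode(weights=[1., 1.], bias=0., activation='identity')
    # >>> node.predict(numpy.array([2., 3.]))   # 1 * 2 + 1 * 3 + 0 -> 5.0
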
    @property
    def ndim(self):
        "Returns the input dimension."
        if len(self.coef.shape) == 1:
            return self.coef.shape[0] - 1
        return self.coef.shape[1] - 1

    @property
    def training_weights(self):
        "Returns the weights stored in the neuron."
        return self.coef.ravel()

    def update_training_weights(self, X, add=True):
        """
        Updates weights.

        :param X: vector to add to the weights, such as a gradient
        :param add: addition or replace

        :githublink:`%|py|311`
        """
        if add:
            self.coef += X.reshape(self.coef.shape)
        else:
            numpy.copyto(self.coef, X.reshape(self.coef.shape))

    def fill_cache(self, X):
        """
        Creates a cache with intermediate results.
        ``lX`` is the result before the activation function,
        ``aX`` is the result after the activation function, i.e. the prediction.

        :githublink:`%|py|322`
        """
        cache = dict(lX=self._predict(X))
        cache['aX'] = self.activation_(cache['lX'])
        return cache

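    # Illustrative usage, not part of the original source: the cache keeps both the
    # raw score ``lX`` and the activated output ``aX`` for the backward pass.
    # >>> node = NeuralTreeNode(weights=[1., 1.], bias=0., activation='identity')
    # >>> sorted(node.fill_cache(numpy.array([2., 3.])))   # -> ['aX', 'lX']
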
    def _common_loss_dloss(self, X, y, cache=None):
        """
        Common beginning to methods *loss*, *dlossds*, *dlossdw*.

        :githublink:`%|py|331`
        """
        if cache is not None and 'aX' in cache:
            act = cache['aX']
        else:
            act = self.predict(X)
        return act

    def loss(self, X, y, cache=None):
        """
        Computes the loss. Returns a float.

        :githublink:`%|py|341`
        """
        act = self._common_loss_dloss(X, y, cache=cache)
        return self.losss_(act, y)  # pylint: disable=E1120

    def dlossds(self, X, y, cache=None):
        """
        Computes the loss derivative due to prediction error.

        :githublink:`%|py|350`
        """
        act = self._common_loss_dloss(X, y, cache=cache)
        return self.dlossds_(act, y)

    def gradient_backward(self, graddx, X, inputs=False, cache=None):
        """
        Computes the gradients at point *X*.

        :param graddx: existing gradient against the inputs
        :param X: computes the gradient in X
        :param inputs: if False, derivative against the coefficients,
            otherwise against the inputs.
        :param cache: cache intermediate results
        :return: gradient

        :githublink:`%|py|364`
        """
        if cache is None:
            cache = self.fill_cache(X)
        pred = cache['aX']
        ga = self.gradient_(pred)
        if len(ga.shape) == 2:
            f = graddx @ ga
        else:
            f = graddx * ga

        if inputs:
            if len(self.coef.shape) == 1:
                rgrad = numpy.empty(X.shape)
                rgrad[:] = self.coef[1:]
                rgrad *= f
            else:
                rgrad = numpy.sum(
                    self.coef[:, 1:] * f.reshape((-1, 1)), axis=0)
            return rgrad

        rgrad = numpy.empty(self.coef.shape)
        if len(self.coef.shape) == 1:
            rgrad[0] = 1
            rgrad[1:] = X
            rgrad *= f
        else:
            rgrad[:, 0] = 1
            rgrad[:, 1:] = X
            rgrad *= f.reshape((-1, 1))
        return rgrad
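
# Illustrative sketch, not part of the original module: one gradient step on a
# single observation, combining the loss derivative with gradient_backward.
# >>> node = NeuralTreeNode(weights=[0.5, -0.2], bias=0.1, activation='sigmoid')
# >>> X, y = numpy.array([1., 2.]), 1.
# >>> cache = node.fill_cache(X)
# >>> dloss = node.dlossds(X, y, cache=cache)          # derivative of the loss
# >>> grad = node.gradient_backward(dloss, X, cache=cache)
# >>> node.update_training_weights(-0.1 * grad)        # gradient descent step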