Source code of mlstatpy.ml.neural_tree

# -*- coding: utf-8 -*-
"""
Conversion from tree to neural network.


:githublink:`%|py|6`
"""
from io import BytesIO
import pickle
import numpy
from ._neural_tree_api import _TrainingAPI
from ._neural_tree_node import NeuralTreeNode


def label_class_to_softmax_output(y_label):
    """
    Converts a binary class label into a matrix
    with two columns of probabilities.

    .. runpython::
        :showcode:

        import numpy
        from mlstatpy.ml.neural_tree import label_class_to_softmax_output

        y_label = numpy.array([0, 1, 0, 0])
        soft_y = label_class_to_softmax_output(y_label)
        print(soft_y)
    """
    if len(y_label.shape) != 1:
        raise ValueError(
            "y_label must be a vector but has shape {}.".format(y_label.shape))
    y = numpy.empty((y_label.shape[0], 2), dtype=numpy.float64)
    y[:, 0] = (y_label < 0.5).astype(numpy.float64)
    y[:, 1] = 1 - y[:, 0]
    return y


class NeuralTreeNet(_TrainingAPI):
    """
    Node ensemble.

    .. runpython::
        :showcode:

        import numpy
        from mlstatpy.ml.neural_tree import NeuralTreeNode, NeuralTreeNet

        w1 = numpy.array([-0.5, 0.8, -0.6])

        neu = NeuralTreeNode(w1[1:], bias=w1[0], activation='sigmoid')
        net = NeuralTreeNet(2, empty=True)
        net.append(neu, numpy.arange(2))

        ide = NeuralTreeNode(numpy.array([1.]),
                             bias=numpy.array([0.]),
                             activation='identity')
        net.append(ide, numpy.arange(2, 3))

        X = numpy.abs(numpy.random.randn(10, 2))
        pred = net.predict(X)
        print(pred)
    """

    def __init__(self, dim, empty=True):
        """
        :param dim: space dimension
        :param empty: empty network, otherwise adds an identity node
        """
        self.dim = dim
        if empty:
            self.nodes = []
            self.nodes_attr = []
        else:
            self.nodes = [
                NeuralTreeNode(
                    numpy.ones((dim,), dtype=numpy.float64),
                    bias=numpy.float64(0.),
                    activation='identity', nodeid=0)]
            self.nodes_attr = [dict(inputs=numpy.arange(0, dim), output=dim,
                                    coef_size=self.nodes[0].coef.size,
                                    first_coef=0)]
        self._update_members()

    def copy(self):
        "Returns a deep copy of the network (pickle round-trip)."
        st = BytesIO()
        pickle.dump(self, st)
        cop = BytesIO(st.getvalue())
        return pickle.load(cop)

    def _update_members(self, node=None, attr=None):
        "Updates internal members."
        if node is None or attr is None:
            if len(self.nodes_attr) == 0:
                self.size_ = self.dim
            else:
                self.size_ = max(d['output'] for d in self.nodes_attr) + 1
            self.output_to_node_ = {}
            self.input_to_node_ = {}
            for node2, attr2 in zip(self.nodes, self.nodes_attr):
                if isinstance(attr2['output'], list):
                    for o in attr2['output']:
                        self.output_to_node_[o] = node2, attr2
                else:
                    self.output_to_node_[attr2['output']] = node2, attr2
                for i in attr2['inputs']:
                    self.input_to_node_[i] = node2, attr2
        else:
            if len(node.input_weights.shape) == 1:
                self.size_ += 1
            else:
                self.size_ += node.input_weights.shape[0]
            if isinstance(attr['output'], list):
                for o in attr['output']:
                    self.output_to_node_[o] = node, attr
            else:
                self.output_to_node_[attr['output']] = node, attr
            for i in attr['inputs']:
                self.input_to_node_[i] = node, attr

    def __repr__(self):
        "usual"
        return "%s(%d)" % (self.__class__.__name__, self.dim)

    def clear(self):
        "Clears all nodes."
        del self.nodes[:]
        del self.nodes_attr[:]
        self._update_members()

    def append(self, node, inputs):
        """
        Appends a node into the graph.

        :param node: node to add
        :param inputs: index of input nodes
        """
        if len(node.input_weights.shape) == 1:
            if node.input_weights.shape[0] != len(inputs):
                raise RuntimeError(
                    "Dimension mismatch between weights [{}] and inputs [{}].".format(
                        node.input_weights.shape[0], len(inputs)))
            node.nodeid = len(self.nodes)
            self.nodes.append(node)
            first_coef = (
                0 if len(self.nodes_attr) == 0 else
                self.nodes_attr[-1]['first_coef'] +
                self.nodes_attr[-1]['coef_size'])
            attr = dict(inputs=numpy.array(inputs), output=self.size_,
                        coef_size=node.coef.size, first_coef=first_coef)
            self.nodes_attr.append(attr)
        elif len(node.input_weights.shape) == 2:
            if node.input_weights.shape[1] != len(inputs):
                raise RuntimeError(  # pragma: no cover
                    "Dimension mismatch between weights [{}] and inputs [{}].".format(
                        node.input_weights.shape[1], len(inputs)))
            node.nodeid = len(self.nodes)
            self.nodes.append(node)
            first_coef = (
                0 if len(self.nodes_attr) == 0 else
                self.nodes_attr[-1]['first_coef'] +
                self.nodes_attr[-1]['coef_size'])
            attr = dict(inputs=numpy.array(inputs),
                        output=list(range(self.size_,
                                          self.size_ + node.input_weights.shape[0])),
                        coef_size=node.coef.size, first_coef=first_coef)
            self.nodes_attr.append(attr)
        else:
            raise RuntimeError(  # pragma: no cover
                "Coefficients should have 1 or 2 dimensions, not {}.".format(
                    node.input_weights.shape))
        self._update_members(node, attr)

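    # Editor's sketch (not part of the library): appending a node whose
    # weights form a 2D matrix yields a node with several outputs; ``append``
    # then stores the list of output indices in ``attr['output']``. The
    # activation choice mirrors ``_create_from_tree_compact`` below.
    #
    #   import numpy
    #   from mlstatpy.ml.neural_tree import NeuralTreeNode, NeuralTreeNet
    #
    #   net = NeuralTreeNet(2, empty=True)
    #   coef = numpy.array([[1., 0.], [0., 1.]])   # 2 outputs x 2 inputs
    #   node = NeuralTreeNode(coef, bias=numpy.array([0., 0.]),
    #                         activation='sigmoid4')
    #   net.append(node, numpy.arange(2))
    #   print(net[0][1]['output'])                 # [2, 3]
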
    def __getitem__(self, i):
        "Retrieves node and attributes for node i."
        return self.nodes[i], self.nodes_attr[i]

    def __len__(self):
        "Returns the number of nodes."
        return len(self.nodes)

    def _predict_one(self, X):
        res = numpy.zeros((self.size_,), dtype=numpy.float64)
        res[:self.dim] = X
        for node, attr in zip(self.nodes, self.nodes_attr):
            res[attr['output']] = node.predict(res[attr['inputs']])
        return res

    def predict(self, X):
        "Computes the prediction for one observation or a matrix of them."
        if len(X.shape) == 2:
            res = numpy.zeros((X.shape[0], self.size_))
            for i, x in enumerate(X):
                res[i, :] = self._predict_one(x)
            return res
        return self._predict_one(X)

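    # Editor's note: ``predict`` returns the full state vector of size
    # ``size_`` -- the raw inputs first, then every node's output in the
    # order nodes were appended -- not only the final outputs. For instance:
    #
    #   import numpy
    #   from mlstatpy.ml.neural_tree import NeuralTreeNet
    #
    #   net = NeuralTreeNet(2, empty=False)        # one identity node
    #   pred = net.predict(numpy.array([[0.5, -1.0]]))
    #   print(pred)   # [[ 0.5 -1.  -0.5]] : the two inputs, then their sum
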
    @staticmethod
    def create_from_tree(tree, k=1., arch='one'):
        """
        Creates a :class:`NeuralTreeNet <mlstatpy.ml.neural_tree.NeuralTreeNet>`
        instance from a :epkg:`DecisionTreeClassifier`.

        :param tree: :epkg:`DecisionTreeClassifier`
        :param k: slant of the sigmoid
        :param arch: architecture, see below
        :return: :class:`NeuralTreeNet <mlstatpy.ml.neural_tree.NeuralTreeNet>`

        The function only works for binary problems.
        Available architectures:

        * `'one'`: the method adds nodes with one output each, there is
          no specific definition of layers,
        * `'compact'`: the method adds two nodes, the first computes the
          thresholds, the second one computes the leaves' output,
          a final node merges all outputs into one.

        See notebook :ref:`neuraltreerst` for examples.
        """
        if arch == 'one':
            return NeuralTreeNet._create_from_tree_one(tree, k)
        if arch == 'compact':
            return NeuralTreeNet._create_from_tree_compact(tree, k)
        raise ValueError("Unknown arch value '{}'.".format(arch))

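    # Usage sketch (assuming scikit-learn is installed; the estimator below
    # is an arbitrary example, not library code): convert a small binary
    # tree and compare it with the network, whose last two columns hold the
    # class probabilities. A larger slant ``k`` makes the network mimic the
    # tree more closely.
    #
    #   import numpy
    #   from sklearn.tree import DecisionTreeClassifier
    #
    #   X = numpy.random.randn(100, 2)
    #   y = (X[:, 0] + X[:, 1] > 0).astype(numpy.int64)
    #   dt = DecisionTreeClassifier(max_depth=2).fit(X, y)
    #   net = NeuralTreeNet.create_from_tree(dt, k=10., arch='compact')
    #   print(dt.predict_proba(X[:3]))
    #   print(net.predict(X[:3])[:, -2:])
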
    @staticmethod
    def _create_from_tree_one(tree, k=1.):
        "Implements strategy 'one'. See @see meth create_from_tree."
        if tree.n_classes_ > 2:
            raise RuntimeError(
                "The function only supports binary classification problems.")

        n_nodes = tree.tree_.node_count
        children_left = tree.tree_.children_left
        children_right = tree.tree_.children_right
        feature = tree.tree_.feature
        threshold = tree.tree_.threshold
        value = tree.tree_.value.reshape((-1, 2))
        output_class = (value[:, 1] > value[:, 0]).astype(numpy.int64)
        max_features_ = tree.max_features_

        root = NeuralTreeNet(tree.max_features_, empty=True)
        feat_index = numpy.arange(0, max_features_)
        predecessor = {}
        outputs = {i: [] for i in range(0, tree.n_classes_)}
        for i in range(n_nodes):

            if children_left[i] != children_right[i]:
                # node with a threshold
                # right side
                coef = numpy.zeros((max_features_,), dtype=numpy.float64)
                coef[feature[i]] = -k
                node_th = NeuralTreeNode(coef, bias=k * threshold[i],
                                         activation='sigmoid4',
                                         tag="N%d-th" % i)
                root.append(node_th, feat_index)

                if i in predecessor:
                    pred = predecessor[i]
                    node1 = pred
                    node2 = node_th
                    attr1 = root[node1.nodeid][1]
                    attr2 = root[node2.nodeid][1]

                    coef = numpy.ones((2,), dtype=numpy.float64) * k
                    node_true = NeuralTreeNode(coef, bias=-k * 1.5,
                                               activation='sigmoid4',
                                               tag="N%d-T" % i)
                    root.append(node_true, [attr1['output'], attr2['output']])

                    coef = numpy.zeros((2,), dtype=numpy.float64)
                    coef[0] = k
                    coef[1] = -k
                    node_false = NeuralTreeNode(coef, bias=-k * 0.25,
                                                activation='sigmoid4',
                                                tag="N%d-F" % i)
                    root.append(node_false, [attr1['output'], attr2['output']])

                    predecessor[children_left[i]] = node_true
                    predecessor[children_right[i]] = node_false
                else:
                    coef = numpy.ones((1,), dtype=numpy.float64) * -1
                    node_false = NeuralTreeNode(
                        coef, bias=1, activation='identity', tag="N%d-F" % i)
                    attr = root[node_th.nodeid][1]
                    root.append(node_false, [attr['output']])

                    predecessor[children_left[i]] = node_th
                    predecessor[children_right[i]] = node_false

            elif i in predecessor:
                # leaf
                outputs[output_class[i]].append(predecessor[i])

        # final node
        output = []
        index = [0]
        nb = []
        for i in range(0, tree.n_classes_):
            output.extend(outputs[i])
            nb.append(len(outputs[i]))
            index.append(len(outputs[i]) + index[-1])
        coef = numpy.zeros((len(nb), len(output)), dtype=numpy.float64)
        for i in range(0, tree.n_classes_):
            coef[i, index[i]:index[i + 1]] = k
        feat = [root[n.nodeid][1]['output'] for n in output]
        root.append(
            NeuralTreeNode(coef, bias=(-k / 2) * len(feat),
                           activation='softmax4', tag="Nfinal"),
            feat)

        # final
        return root

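    # Editor's note on the threshold neurons above: with weights
    # ``coef[feature[i]] = -k`` and bias ``k * threshold[i]``, each neuron
    # computes ``sigmoid(k * (threshold - x[feature]))``, a smooth version of
    # the split test ``x[feature] <= threshold`` whose sharpness grows with
    # ``k`` ('sigmoid4' is assumed to be a steeper sigmoid variant):
    #
    #   import numpy
    #   k, threshold, x = 10., 0.5, 0.4
    #   print(1. / (1. + numpy.exp(-k * (threshold - x))))   # ~0.73 (left)
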
    @staticmethod
    def _create_from_tree_compact(tree, k=1.):
        "Implements strategy 'compact'. See @see meth create_from_tree."
        if tree.n_classes_ > 2:
            raise RuntimeError(
                "The function only supports binary classification problems.")

        n_nodes = tree.tree_.node_count
        children_left = tree.tree_.children_left
        children_right = tree.tree_.children_right
        feature = tree.tree_.feature
        threshold = tree.tree_.threshold
        value = tree.tree_.value.reshape((-1, 2))
        output_class = (value[:, 1] > value[:, 0]).astype(numpy.int64)
        max_features_ = tree.max_features_
        feat_index = numpy.arange(0, max_features_)

        root = NeuralTreeNet(tree.max_features_, empty=True)
        coef1 = []
        bias1 = []
        parents = {}
        rows = {}

        # first pass: threshold
        for i in range(n_nodes):
            if children_left[i] == children_right[i]:
                # leaves
                continue
            rows[i] = len(coef1)
            parents[children_left[i]] = i
            parents[children_right[i]] = i
            coef = numpy.zeros((max_features_,), dtype=numpy.float64)
            coef[feature[i]] = -k
            coef1.append(coef)
            bias1.append(k * threshold[i])

        coef1 = numpy.vstack(coef1)
        if len(bias1) == 1:
            bias1 = bias1[0]
        node1 = NeuralTreeNode(
            coef1 if coef1.shape[0] > 1 else coef1[0],
            bias=bias1, activation='sigmoid4', tag="threshold")
        root.append(node1, feat_index)
        th_index = numpy.arange(max_features_, max_features_ + coef1.shape[0])

        # second pass: decision path
        coef2 = []
        bias2 = []
        output = []

        for i in range(n_nodes):
            if children_left[i] != children_right[i]:
                # not a leaf
                continue
            path = []
            last = i
            lr = "class", output_class[i]
            output.append(output_class[i])
            while last is not None:
                path.append((last, lr))
                if last not in parents:
                    break
                par = parents[last]
                if children_right[par] == last:
                    lr = 'right'
                elif children_left[par] == last:
                    lr = 'left'
                else:
                    raise RuntimeError(  # pragma: no cover
                        "Inconsistent tree structure.")
                last = par

            coef = numpy.zeros((coef1.shape[0], ), dtype=numpy.float64)
            bias = 0.
            for ip, lr in path:
                if isinstance(lr, tuple):
                    lr, value = lr
                    if lr != 'class':
                        raise RuntimeError(  # pragma: no cover
                            "algorithm issue")
                else:
                    r = rows[ip]
                    if lr == 'right':
                        coef[r] = k
                        bias -= k / 2
                    else:
                        coef[r] = -k
                        bias += k / 2
            coef2.append(coef)
            bias2.append(bias)

        coef2 = numpy.vstack(coef2)
        if len(bias2) == 1:
            bias2 = bias2[0]
        node2 = NeuralTreeNode(
            coef2 if coef2.shape[0] > 1 else coef2[0],
            bias=bias2, activation='sigmoid4', tag="pathes")
        root.append(node2, th_index)

        # final node
        coef = numpy.zeros(
            (tree.n_classes_, coef2.shape[0]), dtype=numpy.float64)
        bias = [0. for i in range(tree.n_classes_)]
        for i, cls in enumerate(output):
            coef[cls, i] = -k
            coef[1 - cls, i] = k
            bias[cls] += k / 2
            bias[1 - cls] += -k / 2
        findex = numpy.arange(max_features_ + coef1.shape[0],
                              max_features_ + coef1.shape[0] + coef2.shape[0])
        root.append(
            NeuralTreeNode(coef, bias=bias, activation='softmax4',
                           tag="final"),
            findex)

        # end
        return root

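    # Editor's summary of the compact architecture: three stacked nodes --
    # one sigmoid per internal split (tag 'threshold'), one sigmoid per leaf
    # scoring its decision path (tag 'pathes'), and a final softmax merging
    # the leaves into two class probabilities. For a fitted binary tree
    # ``dt`` with S splits and L leaves, the state vector therefore has
    # max_features_ + S + L + 2 entries:
    #
    #   n_splits = int((dt.tree_.children_left != dt.tree_.children_right).sum())
    #   n_leaves = int(dt.tree_.n_leaves)
    #   assert net.size_ == dt.max_features_ + n_splits + n_leaves + 2
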
    def to_dot(self, X=None):
        """
        Exports the neural network into :epkg:`dot`.

        :param X: input as an example
        """
        y = None
        if X is not None:
            y = self.predict(X)
        rows = ['digraph Tree {',
                "node [shape=box, fontsize=10];",
                "edge [fontsize=8];"]
        for i in range(self.dim):
            if y is None:
                rows.append('{0} [label="X[{0}]"];'.format(i))
            else:
                rows.append(
                    '{0} [label="X[{0}]=\\n{1:1.2f}"];'.format(i, X[i]))

        labels = {}

        for i in range(0, len(self)):  # pylint: disable=C0200
            o = self[i][1]['output']
            if isinstance(o, int):
                lo = str(o)
                labels[o] = lo
                lof = "%s"
            else:
                lo = "s" + 'a'.join(map(str, o))
                for oo in o:
                    labels[oo] = '{}:f{}'.format(lo, oo)
                los = "|".join("<f{0}> {0}".format(oo) for oo in o)
                lof = "%s&#92;n" + los

            a = "a={}\n".format(self[i][0].activation)
            stag = "" if self[i][0].tag is None else (self[i][0].tag + "\\n")
            bias = str(numpy.array(self[i][0].bias)).replace(" ", "&#92; ")
            if y is None:
                lab = lof % '{}{}id={} b={} s={}'.format(
                    stag, a, i, bias, self[i][0].n_outputs)
            else:
                yo = numpy.array(y[o])
                lab = lof % '{}{}id={} b={} s={}\ny={}'.format(
                    stag, a, i, bias, self[i][0].n_outputs, yo)
            rows.append('{} [label="{}"];'.format(
                lo, lab.replace("\n", "&#92;n")))
            for ii, inp in enumerate(self[i][1]['inputs']):
                if isinstance(o, int):
                    w = self[i][0].input_weights[ii]
                    if w == 0:
                        c = ', color=grey, fontcolor=grey'
                    elif w < 0:
                        c = ', color=red, fontcolor=red'
                    else:
                        c = ', color=blue, fontcolor=blue'
                    rows.append(
                        '{} -> {} [label="{}"{}];'.format(inp, o, w, c))
                    continue
                w = self[i][0].input_weights[:, ii]
                for oi, oo in enumerate(o):
                    if w[oi] == 0:
                        c = ', color=grey, fontcolor=grey'
                    elif w[oi] < 0:
                        c = ', color=red, fontcolor=red'
                    else:
                        c = ', color=blue, fontcolor=blue'
                    rows.append('{} -> {} [label="{}|{}"{}];'.format(
                        labels.get(inp, inp), labels[oo], oi, w[oi], c))

        rows.append('}')
        return '\n'.join(rows)

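    # Usage sketch (assuming the optional ``graphviz`` package is installed,
    # which the library itself does not require): render the exported graph;
    # ``net`` is any NeuralTreeNet instance.
    #
    #   from graphviz import Source
    #   Source(net.to_dot()).render('net', format='png')   # writes net.png
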
    @property
    def shape(self):
        "Returns the shape of the coefficients."
        return (sum(n.coef.size for n in self.nodes), )

    @property
    def training_weights(self):
        "Returns the weights."
        sh = self.shape
        res = numpy.empty(sh[0], dtype=numpy.float64)
        pos = 0
        for n in self.nodes:
            s = n.coef.size
            res[pos: pos + s] = (
                n.coef if len(n.coef.shape) == 1 else n.coef.ravel())
            pos += s
        return res

    def update_training_weights(self, X, add=True):
        """
        Updates weights.

        :param X: vector to add to the weights such as a gradient
        :param add: addition or replace
        """
        pos = 0
        if add:
            for n in self.nodes:
                s = n.coef.size
                n.coef += X[pos: pos + s].reshape(n.coef.shape)
                pos += s
        else:
            for n in self.nodes:
                s = n.coef.size
                numpy.copyto(n.coef, X[pos: pos + s].reshape(n.coef.shape))
                pos += s

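    # Editor's sketch: the flat vector returned by ``training_weights``
    # matches the layout expected here, which allows a plain gradient step
    # and a later restore (``some_gradient`` is a hypothetical vector of
    # shape ``net.shape``):
    #
    #   w = net.training_weights                            # flat copy
    #   net.update_training_weights(-0.1 * some_gradient)   # descent step
    #   net.update_training_weights(w, add=False)           # restore weights
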
    def fill_cache(self, X):
        """
        Creates a cache with intermediate results.
        """
        big_cache = {}
        res = numpy.zeros((self.size_,), dtype=numpy.float64)
        res[:self.dim] = X
        for node, attr in zip(self.nodes, self.nodes_attr):
            cache = node.fill_cache(res[attr['inputs']])
            big_cache[node.nodeid] = cache
            res[attr['output']] = cache['aX']
        big_cache[-1] = res
        return big_cache

    def _get_output_node_attr(self, nb_last):
        """
        Retrieves the output nodes.
        *nb_last* is the number of expected outputs.
        """
        neurones = set(self.output_to_node_[i][0].nodeid
                       for i in range(self.size_ - nb_last, self.size_))
        if len(neurones) != 1:
            raise RuntimeError(  # pragma: no cover
                "Only one output node is implemented, not {}.".format(
                    len(neurones)))
        return self.output_to_node_[self.size_ - 1]

    def _common_loss_dloss(self, X, y, cache=None):
        """
        Common beginning to methods *loss*, *dlossds*, *dlossdw*.
        """
        last = 1 if len(y.shape) <= 1 else y.shape[1]
        if cache is not None and -1 in cache:
            res = cache[-1]
        else:
            res = self.predict(X)
        if len(res.shape) == 2:
            pred = res[:, -last:]
        else:
            pred = res[-last:]
        last_node, last_attr = self._get_output_node_attr(last)
        return res, pred, last_node, last_attr

    def loss(self, X, y, cache=None):
        """
        Computes the loss due to prediction error. Returns a float.
        """
        res, _, last_node, last_attr = self._common_loss_dloss(
            X, y, cache=cache)
        if len(res.shape) <= 1:
            return last_node.loss(res[last_attr['inputs']], y)  # pylint: disable=E1120
        return last_node.loss(res[:, last_attr['inputs']], y)  # pylint: disable=E1120

    def dlossds(self, X, y, cache=None):
        """
        Computes the loss derivative against the inputs.
        """
        res, _, last_node, last_attr = self._common_loss_dloss(
            X, y, cache=cache)
        if len(res.shape) <= 1:
            return last_node.dlossds(res[last_attr['inputs']], y)  # pylint: disable=E1120
        return last_node.dlossds(res[:, last_attr['inputs']], y)  # pylint: disable=E1120

    def gradient_backward(self, graddx, X, inputs=False, cache=None):
        """
        Computes the gradient in X.

        :param graddx: existing gradient against the inputs
        :param X: computes the gradient in X
        :param inputs: if False, derivative against the coefficients,
            otherwise against the inputs.
        :param cache: cache intermediate results to avoid more computation
        :return: gradient
        """
        if cache is None:
            cache = self.fill_cache(X)
        shape = self.training_weights.shape
        pred = self.predict(X)

        whole_gradx = numpy.zeros(pred.shape, dtype=numpy.float64)
        whole_gradw = numpy.zeros(shape, dtype=numpy.float64)
        if len(graddx.shape) == 0:
            whole_gradx[-1] = graddx
        else:
            whole_gradx[-graddx.shape[0]:] = graddx

        for node, attr in zip(self.nodes[::-1], self.nodes_attr[::-1]):
            ch = cache[node.nodeid]
            node_graddx = whole_gradx[attr['output']]
            xi = pred[attr['inputs']]

            temp_gradw = node.gradient_backward(
                node_graddx, xi, inputs=False, cache=ch)
            temp_gradx = node.gradient_backward(
                node_graddx, xi, inputs=True, cache=ch)

            whole_gradw[attr['first_coef']:attr['first_coef'] +
                        attr['coef_size']] += temp_gradw.reshape(
                            (attr['coef_size'],))
            whole_gradx[attr['inputs']] += temp_gradx.reshape(
                (len(attr['inputs']),))

        if inputs:
            return whole_gradx
        return whole_gradw
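
    # Editor's sketch of a finite-difference check of this backward pass,
    # assuming (it is defined outside this file) that ``dlossds`` returns
    # the gradient expected as ``graddx``:
    #
    #   cache = net.fill_cache(x)          # x, y: one observation and label
    #   dloss = net.dlossds(x, y, cache=cache)
    #   grad = net.gradient_backward(dloss, x, inputs=False, cache=cache)
    #   # perturb each weight by +/- eps with ``update_training_weights`` and
    #   # compare (loss(w+eps) - loss(w-eps)) / (2*eps) against ``grad``.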