Code source de mlstatpy.ml.matrices

# -*- coding: utf-8 -*-
"""
Algorithms about matrices.


:githublink:`%|py|6`
"""
import warnings
import numpy
import numpy.linalg
from scipy.linalg.lapack import dtrtri  # pylint: disable=E0611


[docs]def gram_schmidt(mat, change=False): """ Applies the `Gram–Schmidt process <https://en.wikipedia.org/wiki/Gram%E2%80%93Schmidt_process>`_. Due to performance, every row is considered as a vector. :param mat: matrix :param change: returns the matrix to change the basis :return: new matrix or (new matrix, change matrix) The function assumes the matrix *mat* is horizontal: it has more columns than rows. .. note:: The implementation could be improved by directly using :epkg:`BLAS` function. .. runpython:: :showcode: import numpy from mlstatpy.ml.matrices import gram_schmidt X = numpy.array([[1., 2., 3., 4.], [5., 6., 6., 6.], [5., 6., 7., 8.]]) T, P = gram_schmidt(X, change=True) print(T) print(P) :githublink:`%|py|41` """ if len(mat.shape) != 2: raise ValueError("mat must be a matrix.") # pragma: no cover if mat.shape[1] < mat.shape[0]: raise RuntimeError( # pragma: no cover "The function only works if the number of rows is less " "than the number of columns.") if change: base = numpy.identity(mat.shape[0]) # The following code is equivalent to: # res = numpy.empty(mat.shape) # for i in range(0, mat.shape[0]): # res[i, :] = mat[i, :] # for j in range(0, i): # d = numpy.dot(res[j, :], mat[i, :]) # res[i, :] -= res[j, :] * d # if change: # base[i, :] -= base[j, :] * d # d = numpy.dot(res[i, :], res[i, :]) # if d > 0: # d **= 0.5 # res[i, :] /= d # if change: # base[i, :] /= d # But it is faster to write it this way: res = numpy.empty(mat.shape) for i in range(0, mat.shape[0]): res[i, :] = mat[i, :] if i > 0: d = numpy.dot(res[:i, :], mat[i, :]) m = numpy.multiply(res[:i, :], d.reshape((len(d), 1))) m = numpy.sum(m, axis=0) res[i, :] -= m if change: m = numpy.multiply(base[:i, :], d.reshape((len(d), 1))) m = numpy.sum(m, axis=0) base[i, :] -= m d = numpy.dot(res[i, :], res[i, :]) if d > 0: d **= 0.5 res[i, :] /= d if change: base[i, :] /= d return (res, base) if change else res
[docs]def linear_regression(X, y, algo=None): """ Solves the linear regression problem, find :math:`\\beta` which minimizes :math:`\\norme{y - X\\beta}`, based on the algorithm :ref:`Arbre de décision optimisé pour les régressions linéaires <algo_decision_tree_mselin>`. :param X: features :param y: targets :param algo: None to use the standard algorithm :math:`\\beta = (X'X)^{-1} X'y`, `'gram'`, `'qr'` :return: beta .. runpython:: :showcode: import numpy from mlstatpy.ml.matrices import linear_regression X = numpy.array([[1., 2., 3., 4.], [5., 6., 6., 6.], [5., 6., 7., 8.]]).T y = numpy.array([0.1, 0.2, 0.19, 0.29]) beta = linear_regression(X, y, algo="gram") print(beta) ``algo=None`` computes :math:`\\beta = (X'X)^{-1} X'y`. ``algo='qr'`` uses a `QR <https://docs.scipy.org/doc/numpy/reference /generated/numpy.linalg.qr.html>`_ decomposition and calls function `dtrtri <https://docs.scipy.org/doc/scipy/reference/generated/scipy. linalg.lapack.dtrtri.html>`_ to invert an upper triangular matrix. ``algo='gram'`` uses :func:`gram_schmidt <mlstatpy.ml.matrices.gram_schmidt>` and then computes the solution of the linear regression (see above for a link to the algorithm). :githublink:`%|py|124` """ if len(y.shape) != 1: warnings.warn( # pragma: no cover "This function is not tested for a multidimensional linear regression.") if algo is None: inv = numpy.linalg.inv(X.T @ X) return inv @ (X.T @ y) if algo == "gram": T, P = gram_schmidt(X.T, change=True) # T = P X return (y.T @ T.T @ P).ravel() if algo == "qr": Q, R = numpy.linalg.qr(X, "full") Ri = dtrtri(R)[0] gamma = (y.T @ Q).ravel() return (gamma @ Ri.T).ravel() raise ValueError( # pragma: no cover "Unknwown algo='{}'.".format(algo))
[docs]def norm2(X): """ Computes the square norm for all rows of a matrix. :githublink:`%|py|148` """ res = numpy.empty(X.shape[1]) for i in range(X.shape[1]): res[i] = numpy.dot(X[i], X[i]) return res
[docs]def streaming_gram_schmidt_update(Xk, Pk): """ Updates matrix :math:`P_k` to produce :math:`P_{k+1}` which is the matrix *P* in algorithm :ref:`Streaming Linear Regression <algo_reg_lin_gram_schmidt_streaming>`. The function modifies the matrix *Pk* given as an input. :param Xk: kth row :param Pk: matrix *P* at iteration *k-1* :githublink:`%|py|166` """ tki = Pk @ Xk idi = numpy.identity(Pk.shape[0]) for i in range(0, Pk.shape[0]): val = tki[i] if i > 0: # for j in range(0, i): # d = tki[j] * val # tki[i] -= tki[j] * d # Pk[i, :] -= Pk[j, :] * d # idi[i, :] -= idi[j, :] * d dv = tki[:i] * val tki[i] -= numpy.dot(dv, tki[:i]) dv = dv.reshape((i, 1)) Pk[i, :] -= numpy.multiply(Pk[:i, :], dv).sum(axis=0) idi[i, :] -= numpy.multiply(idi[:i, :], dv).sum(axis=0) d = numpy.square(idi[i, :]).sum() # pylint: disable=E1101 d = tki[i] ** 2 + d if d > 0: d **= 0.5 d = 1. / d tki[i] *= d Pk[i, :] *= d idi[i, :] *= d
[docs]def streaming_gram_schmidt(mat, start=None): """ Solves the linear regression problem, find :math:`\\beta` which minimizes :math:`\\norme{y - X\\beta}`, based on algorithm :ref:`Streaming Gram-Schmidt <algo_reg_lin_gram_schmidt_streaming>`. :param mat: matrix :param start: first row to start iteration, ``X.shape[1]`` by default :return: iterator on The function assumes the matrix *mat* is horizontal: it has more columns than rows. .. runpython:: :showcode: import numpy from mlstatpy.ml.matrices import streaming_gram_schmidt X = numpy.array([[1, 0.5, 10., 5., -2.], [0, 0.4, 20, 4., 2.], [0, 0.7, 20, 4., 2.]], dtype=float).T for i, p in enumerate(streaming_gram_schmidt(X.T)): print("iteration", i, "\\n", p) t = X[:i+3] @ p.T print(t.T @ t) :githublink:`%|py|225` """ if len(mat.shape) != 2: raise ValueError("mat must be a matrix.") # pragma: no cover if mat.shape[1] < mat.shape[0]: raise RuntimeError("The function only works if the number of rows is less " "than the number of columns.") if start is None: start = mat.shape[0] mats = mat[:, :start] _, Pk = gram_schmidt(mats, change=True) yield Pk k = start while k < mat.shape[1]: streaming_gram_schmidt_update(mat[:, k], Pk) yield Pk k += 1
[docs]def streaming_linear_regression_update(Xk, yk, XkXk, bk): """ Updates coefficients :math:`\\beta_k` to produce :math:`\\beta_{k+1}` in :ref:`l-piecewise-linear-regression`. The function modifies the matrix *Pk* given as an input. :param Xk: kth row :param yk: kth target :param XkXk: matrix :math:`X_{1..k}'X_{1..k}', updated by the function :param bk: current coefficient (updated by the function) :githublink:`%|py|255` """ Xk = Xk.reshape((1, XkXk.shape[0])) xxk = Xk.T @ Xk XkXk += xxk err = Xk.T * (yk - Xk @ bk) bk[:] += (numpy.linalg.inv(XkXk) @ err).flatten()
[docs]def streaming_linear_regression(mat, y, start=None): """ Streaming algorithm to solve a linear regression. See :ref:`l-piecewise-linear-regression`. :param mat: features :param y: expected target :return: iterator on coefficients .. runpython:: :showcode: import numpy from mlstatpy.ml.matrices import streaming_linear_regression, linear_regression X = numpy.array([[1, 0.5, 10., 5., -2.], [0, 0.4, 20, 4., 2.], [0, 0.7, 20, 4., 3.]], dtype=float).T y = numpy.array([1., 0.3, 10, 5.1, -3.]) for i, bk in enumerate(streaming_linear_regression(X, y)): bk0 = linear_regression(X[:i+3], y[:i+3]) print("iteration", i, bk, bk0) :githublink:`%|py|286` """ if len(mat.shape) != 2: raise ValueError("mat must be a matrix.") # pragma: no cover if mat.shape[0] < mat.shape[1]: raise RuntimeError("The function only works if the number of rows is more " "than the number of columns.") if len(y.shape) != 1: warnings.warn( # pragma: no cover "This function is not tested for a multidimensional linear regression.") if start is None: start = mat.shape[1] Xk = mat[:start] XkXk = Xk.T @ Xk bk = numpy.linalg.inv(XkXk) @ (Xk.T @ y[:start]) yield bk k = start while k < mat.shape[0]: streaming_linear_regression_update(mat[k], y[k], XkXk, bk) yield bk k += 1
[docs]def streaming_linear_regression_gram_schmidt_update(Xk, yk, Xkyk, Pk, bk): """ Updates coefficients :math:`\\beta_k` to produce :math:`\\beta_{k+1}` in :ref:`Streaming Linear Regression <l-piecewise-linear-regression-gram_schmidt>`. The function modifies the matrix *Pk* given as an input. :param Xk: kth row :param yk: kth target :param Xkyk: matrix :math:`X_{1..k}' y_{1..k}' (updated by the function) :param Pk: Gram-Schmidt matrix produced by the streaming algorithm (updated by the function) :param bk: current coefficient (updated by the function) :githublink:`%|py|324` """ Xk = Xk.T streaming_gram_schmidt_update(Xk, Pk) Xkyk += (Xk * yk).reshape(Xkyk.shape) bk[:] = Pk @ Xkyk @ Pk
[docs]def streaming_linear_regression_gram_schmidt(mat, y, start=None): """ Streaming algorithm to solve a linear regression with Gram-Schmidt algorithm. See :ref:`l-piecewise-linear-regression-gram_schmidt`. :param mat: features :param y: expected target :return: iterator on coefficients .. runpython:: :showcode: import numpy from mlstatpy.ml.matrices import streaming_linear_regression, linear_regression X = numpy.array([[1, 0.5, 10., 5., -2.], [0, 0.4, 20, 4., 2.], [0, 0.7, 20, 4., 3.]], dtype=float).T y = numpy.array([1., 0.3, 10, 5.1, -3.]) for i, bk in enumerate(streaming_linear_regression(X, y)): bk0 = linear_regression(X[:i+3], y[:i+3]) print("iteration", i, bk, bk0) :githublink:`%|py|355` """ if len(mat.shape) != 2: raise ValueError("mat must be a matrix.") # pragma: no cover if mat.shape[0] < mat.shape[1]: raise RuntimeError("The function only works if the number of rows is more " "than the number of columns.") if len(y.shape) != 1: warnings.warn( # pragma: no cover "This function is not tested for a multidimensional linear regression.") if start is None: start = mat.shape[1] Xk = mat[:start] xyk = Xk.T @ y[:start] _, Pk = gram_schmidt(Xk.T, change=True) bk = Pk @ xyk @ Pk yield bk k = start while k < mat.shape[0]: streaming_linear_regression_gram_schmidt_update( mat[k], y[k], xyk, Pk, bk) yield bk k += 1