# Source code for mlinsights.mlmodel._kmeans_022

# pylint: disable=C0302
"""
Implements k-means with norms L1 and L2.


:githublink:`%|py|6`
"""
import numpy
from scipy.sparse import issparse
# Source: https://github.com/scikit-learn/scikit-learn/blob/95d4f0841d57e8b5f6b2a570312e9d832e69debc/sklearn/cluster/_k_means_fast.pyx
from sklearn.utils.sparsefuncs_fast import assign_rows_csr  # pylint: disable=W0611,E0611
try:
    from sklearn.cluster._kmeans import _check_sample_weight
except ImportError:
    from sklearn.cluster._kmeans import (
        _check_normalize_sample_weight as _check_sample_weight)
from sklearn.metrics.pairwise import pairwise_distances_argmin_min


def _labels_inertia_precompute_dense(norm, X, sample_weight, centers,
                                     distances):
    """
    Assigns every sample to its closest center and computes the inertia
    from a full distance matrix.

    The pre-allocated *distances* buffer is overwritten in-place when its
    length matches the number of samples.

    :param norm: 'l1' or 'l2'
    :param X: numpy array, shape (n_samples, n_features)
        Input data.
    :param sample_weight: array-like, shape (n_samples,)
        The weights for each observation in X.
    :param centers: numpy array, shape (n_clusters, n_features)
        Cluster centers which data is assigned to.
    :param distances: numpy array, shape (n_samples,)
        Pre-allocated array in which distances are stored.
    :return: labels, int32 numpy array of shape (n_samples,),
        index of the closest center for every sample
    :return: inertia, float, weighted sum of the minimal distances
    :raises NotImplementedError: for any norm other than 'l1' or 'l2'
    """
    if norm == 'l2':
        # squared euclidean distances (the squares are what the inertia sums)
        assigned, closest = pairwise_distances_argmin_min(
            X=X, Y=centers, metric='euclidean',
            metric_kwargs={'squared': True})
    elif norm == 'l1':
        assigned, closest = pairwise_distances_argmin_min(
            X=X, Y=centers, metric='manhattan')
    else:  # pragma no cover
        raise NotImplementedError(
            "Not implemented for norm '{}'.".format(norm))
    # cython k-means code assumes int32 inputs
    assigned = assigned.astype(numpy.int32, copy=False)
    if X.shape[0] == distances.shape[0]:
        # the caller expects this buffer to be updated in-place
        distances[:] = closest
    return assigned, (closest * sample_weight).sum()
[docs]def _assign_labels_csr(X, sample_weight, x_squared_norms, centers, labels, distances): """ Compute label assignment and inertia for a CSR input Return the inertia (sum of squared distances to the centers). :githublink:`%|py|62` """ if (distances is not None and distances.shape != (X.shape[0], )): raise ValueError( # pragma: no cover "Dimension mismatch for distance got {}, expecting {}." "".format(distances.shape, (X.shape[0], centers.shape[0]))) n_clusters = centers.shape[0] n_samples = X.shape[0] store_distances = 0 inertia = 0.0 if centers.dtype == numpy.float32: center_squared_norms = numpy.zeros(n_clusters, dtype=numpy.float32) else: center_squared_norms = numpy.zeros(n_clusters, dtype=numpy.float64) if n_samples == distances.shape[0]: store_distances = 1 for center_idx in range(n_clusters): center_squared_norms[center_idx] = numpy.dot( centers[center_idx, :], centers[center_idx, :]) for sample_idx in range(n_samples): min_dist = -1 for center_idx in range(n_clusters): dist = 0.0 # hardcoded: minimize euclidean distance to cluster center: # ||a - b||^2 = ||a||^2 + ||b||^2 -2 <a, b> dist += X[sample_idx, :] @ centers[center_idx, :].reshape((-1, 1)) dist *= -2 dist += center_squared_norms[center_idx] dist += x_squared_norms[sample_idx] dist *= sample_weight[sample_idx] if min_dist == -1 or dist < min_dist: min_dist = dist labels[sample_idx] = center_idx if store_distances: distances[sample_idx] = dist inertia += min_dist return inertia
[docs]def _assign_labels_array(X, sample_weight, x_squared_norms, centers, labels, distances): """ Compute label assignment and inertia for a dense array Return the inertia (sum of squared distances to the centers). :githublink:`%|py|110` """ n_clusters = centers.shape[0] n_samples = X.shape[0] store_distances = 0 inertia = 0.0 if centers.dtype == numpy.float32: center_squared_norms = numpy.zeros(n_clusters, dtype=numpy.float32) else: center_squared_norms = numpy.zeros(n_clusters, dtype=numpy.float64) if n_samples == distances.shape[0]: store_distances = 1 for center_idx in range(n_clusters): center_squared_norms[center_idx] = numpy.dot( centers[center_idx, :], centers[center_idx, :]) for sample_idx in range(n_samples): min_dist = -1 for center_idx in range(n_clusters): dist = 0.0 # hardcoded: minimize euclidean distance to cluster center: # ||a - b||^2 = ||a||^2 + ||b||^2 -2 <a, b> dist += numpy.dot(X[sample_idx, :], centers[center_idx, :]) dist *= -2 dist += center_squared_norms[center_idx] dist += x_squared_norms[sample_idx] dist *= sample_weight[sample_idx] if min_dist == -1 or dist < min_dist: min_dist = dist labels[sample_idx] = center_idx if store_distances: distances[sample_idx] = min_dist inertia += min_dist return inertia
def _labels_inertia_skl(X, sample_weight, x_squared_norms, centers,
                        precompute_distances=True, distances=None):
    """
    E step of the K-means EM algorithm.

    Computes the labels and the inertia of the given samples and centers.
    This will compute the distances in-place.

    :param X: float64 array-like or CSR sparse matrix,
        shape (n_samples, n_features), samples to assign to the labels
    :param sample_weight: array-like, shape (n_samples,)
        The weights for each observation in X.
    :param x_squared_norms: array, shape (n_samples,)
        Precomputed squared euclidean norm of each data point,
        to speed up computations.
    :param centers: float array, shape (k, n_features)
        The cluster centers.
    :param precompute_distances: boolean, default: True
        Precompute distances (faster but takes more memory).
    :param distances: float array, shape (n_samples,) or None
        Pre-allocated array to be filled in with each sample's distance
        to the closest center.
    :return: labels, int array of shape (n_samples,), the assignment
    :return: inertia, float, sum of squared distances of samples to
        their closest cluster center
    """
    sample_weight = _check_sample_weight(sample_weight, X)
    # -1 makes any sample missed by the assignment easy to detect
    labels = numpy.full(X.shape[0], -1, numpy.int32)
    if distances is None:
        distances = numpy.zeros(shape=(0,), dtype=X.dtype)
    # distances will be changed in-place
    if issparse(X):
        return labels, _assign_labels_csr(
            X, sample_weight, x_squared_norms, centers, labels,
            distances=distances)
    if precompute_distances:
        return _labels_inertia_precompute_dense(
            norm='l2', X=X, sample_weight=sample_weight,
            centers=centers, distances=distances)
    return labels, _assign_labels_array(
        X, sample_weight, x_squared_norms, centers, labels,
        distances=distances)
[docs]def _centers_dense(X, sample_weight, labels, n_clusters, distances): """ M step of the K-means EM algorithm Computation of cluster centers / means. :param X: array-like, shape (n_samples, n_features) :param sample_weight: array-like, shape (n_samples,) The weights for each observation in X. :param labels: array of integers, shape (n_samples) Current label assignment :param n_clusters: int Number of desired clusters :param distances: array-like, shape (n_samples) Distance to closest cluster for each sample. :return: centers : array, shape (n_clusters, n_features) The resulting centers :githublink:`%|py|214` """ n_samples = X.shape[0] n_features = X.shape[1] dtype = X.dtype centers = numpy.zeros((n_clusters, n_features), dtype=dtype) weight_in_cluster = numpy.zeros((n_clusters,), dtype=dtype) for i in range(n_samples): c = labels[i] weight_in_cluster[c] += sample_weight[i] empty_clusters = numpy.where(weight_in_cluster == 0)[0] # maybe also relocate small clusters? if distances is not None and len(empty_clusters): # find points to reassign empty clusters to far_from_centers = distances.argsort()[::-1] for i, cluster_id in enumerate(empty_clusters): far_index = far_from_centers[i] new_center = X[far_index] * sample_weight[far_index] centers[cluster_id] = new_center weight_in_cluster[cluster_id] = sample_weight[far_index] for i in range(n_samples): for j in range(n_features): centers[labels[i], j] += X[i, j] * sample_weight[i] centers /= weight_in_cluster[:, numpy.newaxis] return centers
[docs]def _centers_sparse(X, sample_weight, labels, n_clusters, distances): """ M step of the K-means EM algorithm Computation of cluster centers / means. :param X: scipy.sparse.csr_matrix, shape (n_samples, n_features) :param sample_weight: array-like, shape (n_samples,) The weights for each observation in X. :param labels: array of integers, shape (n_samples) Current label assignment :param n_clusters: int Number of desired clusters :param distances: array-like, shape (n_samples) Distance to closest cluster for each sample. :return: centers, array, shape (n_clusters, n_features) The resulting centers :githublink:`%|py|263` """ n_samples = X.shape[0] n_features = X.shape[1] data = X.data indices = X.indices indptr = X.indptr dtype = X.dtype centers = numpy.zeros((n_clusters, n_features), dtype=dtype) weight_in_cluster = numpy.zeros((n_clusters,), dtype=dtype) for i in range(n_samples): c = labels[i] weight_in_cluster[c] += sample_weight[i] empty_clusters = numpy.where(weight_in_cluster == 0)[0] n_empty_clusters = empty_clusters.shape[0] # maybe also relocate small clusters? if n_empty_clusters > 0: # find points to reassign empty clusters to far_from_centers = distances.argsort()[::-1][:n_empty_clusters] assign_rows_csr(X, far_from_centers, empty_clusters, centers) for i in range(n_empty_clusters): weight_in_cluster[empty_clusters[i]] = 1 for i in range(labels.shape[0]): curr_label = labels[i] for ind in range(indptr[i], indptr[i + 1]): j = indices[ind] centers[curr_label, j] += data[ind] * sample_weight[i] centers /= weight_in_cluster[:, numpy.newaxis] return centers