# -*- coding: utf-8 -*-
"""
Implémente la classe :class:`ConstraintKMeans <mlinsights.mlmodel.kmeans_constraint.ConstraintKMeans>`.
:githublink:`%|py|6`
"""
import numpy
from scipy.spatial import Delaunay # pylint: disable=E0611
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import euclidean_distances
from ._kmeans_constraint_ import constraint_kmeans, constraint_predictions
[docs]class ConstraintKMeans(KMeans):
"""
Defines a constraint :epkg:`k-means`.
Clusters are modified to have an equal size.
The algorithm is initialized with a regular :epkg:`KMeans`
and continues with a modified version of it.
Computing the predictions offer a choice.
The first one is to keep the predictions
from the regular :epkg:`k-means`
algorithm but with the balanced clusters.
The second is to compute balanced predictions
over the test set. That implies that the predictions
for the same observations might change depending
on the set it belongs to.
The parameter *strategy* determines how
obseervations should be assigned to a cluster.
The value can be:
* ``'distance'``: observations are ranked by distance to a cluster,
the algorithm assigns first point to the closest center unless it reached
the maximum size, it deals first with the further point and maps it to
the closest center
* ``'gain'``: follows the algorithm described at
see `Same-size k-Means Variation
<https://elki-project.github.io/tutorial/same-size_k_means>`_,
* ``'weights'``: estimates weights attached to each cluster,
it weights the distance to each cluster in order
to balance the number of points mapped to every cluster,
the strategy uses a learning rate.
The first two strategies cannot reach a good compromise
without using function :func:`_switch_clusters <mlinsights.mlmodel._kmeans_constraint_._switch_clusters>` which
tries every switch between clusters: two points
change clusters. It keeps the number of points and checks
that the inertia is reduced.
:githublink:`%|py|50`
"""
_strategy_value = {'distance', 'gain', 'weights'}
[docs] def __init__(self, n_clusters=8, init='k-means++', n_init=10, max_iter=500,
tol=0.0001, precompute_distances='deprecated', verbose=0,
random_state=None, copy_x=True, n_jobs=1, algorithm='auto',
balanced_predictions=False, strategy='gain', kmeans0=True,
learning_rate=1., history=False):
"""
:param n_clusters: number of clusters
:param init: used by :epkg:`k-means`
:param n_init: used by :epkg:`k-means`
:param max_iter: used by :epkg:`k-means`
:param tol: used by :epkg:`k-means`
:param precompute_distances: used by :epkg:`k-means`
:param verbose: used by :epkg:`k-means`
:param random_state: used by :epkg:`k-means`
:param copy_x: used by :epkg:`k-means`
:param n_jobs: used by :epkg:`k-means`
:param algorithm: used by :epkg:`k-means`
:param balanced_predictions: produced balanced prediction
or the regular ones
:param strategy: strategy or algorithm used to abide
by the constraint
:param kmeans0: if True, applies *k-means* algorithm first
:param history: keeps centers accress iterations
:param learning_rate: learning rate, used by strategy `'weights'`
:githublink:`%|py|78`
"""
KMeans.__init__(self, n_clusters=n_clusters, init=init, n_init=n_init,
max_iter=max_iter, tol=tol, precompute_distances=precompute_distances,
verbose=verbose, random_state=random_state, copy_x=copy_x,
n_jobs=n_jobs, algorithm=algorithm)
self.balanced_predictions = balanced_predictions
self.strategy = strategy
self.kmeans0 = kmeans0
self.history = history
self._n_threads = None
self.learning_rate = learning_rate
if strategy not in ConstraintKMeans._strategy_value:
raise ValueError('strategy must be in {0}'.format(
ConstraintKMeans._strategy_value))
if precompute_distances == 'deprecated':
km = KMeans()
if km.precompute_distances != precompute_distances:
self.precompute_distances = km.precompute_distances
[docs] def fit(self, X, y=None, sample_weight=None, fLOG=None):
"""
Compute k-means clustering.
:param X: array-like or sparse matrix, shape=(n_samples, n_features)
Training instances to cluster. It must be noted that the data
will be converted to C ordering, which will cause a memory
copy if the given data is not C-contiguous.
:param y: Ignored
:param sample_weight: sample weight
:param fLOG: logging function
:githublink:`%|py|108`
"""
max_iter = self.max_iter
self.max_iter //= 2
if self.kmeans0:
KMeans.fit(self, X, y, sample_weight=sample_weight)
state = None
else:
state = numpy.random.RandomState( # pylint: disable=E1101
self.random_state)
labels = state.randint(
0, self.n_clusters, X.shape[0], dtype=numpy.int32)
centers = numpy.empty((self.n_clusters, X.shape[1]), dtype=X.dtype)
choice = state.randint(0, self.n_clusters, self.n_clusters)
for i, c in enumerate(choice):
centers[i, :] = X[c, :]
self.labels_ = labels
self.cluster_centers_ = centers
self.inertia_ = float(X.shape[0])
self.n_iter_ = 0
self.max_iter = max_iter
return self.constraint_kmeans(
X, sample_weight=sample_weight, state=state,
learning_rate=self.learning_rate,
history=self.history, fLOG=fLOG)
[docs] def constraint_kmeans(self, X, sample_weight=None, state=None,
learning_rate=1., history=False, fLOG=None):
"""
Completes the constraint k-means.
:param X: features
:param sample_weight: sample weight
:param state: state
:param history: keeps evolution of centers
:param fLOG: logging function
:githublink:`%|py|144`
"""
labels, centers, inertia, weights, iter_, all_centers = constraint_kmeans(
X, self.labels_, sample_weight, self.cluster_centers_,
inertia=self.inertia_, iter=self.n_iter_,
max_iter=self.max_iter, verbose=self.verbose,
strategy=self.strategy, state=state,
learning_rate=learning_rate, history=history,
fLOG=fLOG)
self.labels_ = labels
self.cluster_centers_ = centers
self.cluster_centers_iter_ = (
None if len(all_centers) == 0 else numpy.dstack(all_centers))
self.inertia_ = inertia
self.n_iter_ = iter_
self.weights_ = weights
return self
[docs] def predict(self, X, sample_weight=None):
"""
Computes the predictions.
:param X: features.
:return: prediction
:githublink:`%|py|167`
"""
if self.weights_ is None:
if self.balanced_predictions:
labels, _, __ = constraint_predictions(
X, self.cluster_centers_, strategy=self.strategy + '_p')
return labels
return KMeans.predict(self, X, sample_weight=sample_weight)
else:
if self.balanced_predictions:
raise RuntimeError( # pragma: no cover
"balanced_predictions and weights_ cannot be used together.")
return KMeans.predict(self, X, sample_weight=sample_weight)
[docs] def score(self, X, y=None, sample_weight=None):
"""
Returns the distances to all clusters.
:param X: features
:param y: unused
:param sample_weight: sample weight
:return: distances
:githublink:`%|py|218`
"""
if self.weights_ is None:
if self.balanced_predictions:
_, __, dist_close = constraint_predictions(
X, self.cluster_centers_, strategy=self.strategy)
return dist_close
res = euclidean_distances(self.cluster_centers_, X, squared=True)
else:
if self.balanced_predictions:
raise RuntimeError( # pragma: no cover
"balanced_predictions and weights_ cannot be used together.")
res = euclidean_distances(X, self.cluster_centers_, squared=True)
res *= self.weights_.reshape((1, -1))
return res.max(axis=1)
[docs] def cluster_edges(self):
"""
Computes edges between clusters based on a
`Delaunay <https://docs.scipy.org/doc/scipy/reference/
generated/scipy.spatial.Delaunay.html>`_
graph.
:githublink:`%|py|239`
"""
tri = Delaunay(self.cluster_centers_)
triangles = tri.simplices # pylint: disable=E1101
edges = set()
for row in triangles:
for j in range(1, row.shape[-1]):
a, b = row[j - 1:j + 1]
if a < b:
edges.add((a, b))
else:
edges.add((b, a))
a, b = row[0], row[-1]
if a < b:
edges.add((a, b))
else:
edges.add((b, a))
return edges