Source code for clustpy.partition.gapstatistic

"""
@authors:
Collin Leiber
"""

import numpy as np
from sklearn.decomposition import PCA
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.utils import check_random_state
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
from scipy.spatial.distance import pdist


def _gap_statistic(X: np.ndarray, min_n_clusters: int, max_n_clusters: int, n_boots: int,
                   use_principal_components: bool, use_log: bool, random_state: np.random.RandomState) -> (
        int, np.ndarray, np.ndarray, np.ndarray, np.ndarray):
    """
    Start the actual Gap Statistic procedure on the input data set.

    Parameters
    ----------
    X : np.ndarray
        the given data set
    min_n_clusters : int
        Minimum number of clusters. Must be smaller than max_n_clusters
    max_n_clusters : int
        Maximum number of clusters. Must be larger than min_n_clusters
    n_boots : int
        Number of random data sets that should be created to calculate Gap Statistic
    use_principal_components : bool
        True, if the random data sets should be created using the feature-wise minimum and maximum value of the Principle Components.
        Else, the minimum and maximum value of the regular data set will be used
    use_log : bool
        True, if the logarithm of the within cluster dispersion should be used
        For more information see Mohajer et al.
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution

    Returns
    -------
    tuple : (int, np.ndarray, np.ndarray, np.ndarray, np.ndarray)
        The first number of clusters that fulfills the Gap condition (can be None),
        The labels as identified by the Gap Statistic (can be None),
        The cluster centers as identified by the Gap Statistic (Can be None),
        The Gap values,
        The sk values
    """
    assert max_n_clusters >= min_n_clusters, "max_n_clusters can not be smaller than min_n_clusters"
    assert n_boots > 0, "n_boots must be larger than 0"
    # Get min and max values for each dimension
    if use_principal_components:
        pca = PCA(n_components=X.shape[1])
        X_transformed = pca.fit_transform(X)
    else:
        pca = None
        X_transformed = X
    mins = np.min(X_transformed, axis=0)
    maxs = np.max(X_transformed, axis=0)
    # Prepare parameters
    gaps = np.zeros(max_n_clusters + 2 - min_n_clusters)
    sks = np.zeros(max_n_clusters + 2 - min_n_clusters)
    all_labels = np.zeros((X.shape[0], max_n_clusters + 2 - min_n_clusters), dtype=np.int32)
    random_datasets = [_generate_random_data(X.shape, mins, maxs, pca, random_state) for _ in
                       range(n_boots)]
    for n_clusters in range(min_n_clusters, max_n_clusters + 2):  # +1 because we need to calculate Gap(k+1)
        # Execute KMeans on the original data
        labels, W_k = _execute_kmeans(X, n_clusters, use_log, random_state)
        # Save labels
        all_labels[:, n_clusters - min_n_clusters] = labels
        # Create random data
        W_kbs = np.zeros(n_boots)
        for b in range(n_boots):
            # Execute KMeans on random data
            labels, W_kb = _execute_kmeans(random_datasets[b], n_clusters, use_log, random_state)
            # Save within cluster dispersion
            W_kbs[b] = W_kb
        # Calculate Gap Statistic
        gaps[n_clusters - min_n_clusters] = np.mean(W_kbs) - W_k
        sks[n_clusters - min_n_clusters] = np.std(W_kbs) * np.sqrt(1 + 1 / n_boots)
    # Check if any result fulfills gap condition
    fulfills_gap = gaps[:-1] >= gaps[1:] - sks[1:]
    # Prepare final result
    if np.any(fulfills_gap):
        best_index = np.where(fulfills_gap)[0][0]
        best_n_clusters = best_index + min_n_clusters
        best_labels = all_labels[:, best_index]
        best_centers = np.array([np.mean(X[best_labels == c], axis=0) for c in range(best_n_clusters)])
    else:
        best_n_clusters = None
        best_labels = None
        best_centers = None
    return best_n_clusters, best_labels, best_centers, gaps, sks


def _generate_random_data(data_shape: tuple, mins: np.ndarray, maxs: np.ndarray, pca: PCA,
                          random_state: np.random.RandomState) -> np.ndarray:
    """
    Create a random data set using a uniform distribution and the feature-wise min and max values of the data set.
    If a PCA was used, rotate the data set back into the original feature space.

    Parameters
    ----------
    data_shape : tuple
        The data shape
    mins : np.ndarray
        The feature-wise minimum values
    maxs : np.ndarray
        The feature-wise maximum values
    pca : PCA
        The PCA object used to calculate mins and maxs. Can be None, if principle components are not used
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution

    Returns
    -------
    random_samples : np.ndarray
        The randomly created data set
    """
    random_dataset = random_state.random(size=data_shape) * (maxs - mins) + mins
    if pca is not None:
        random_dataset = pca.inverse_transform(random_dataset)
    return random_dataset


def _execute_kmeans(X: np.ndarray, n_clusters: int, use_log: bool, random_state: np.random.RandomState) -> (
        np.ndarray, float):
    """
    Execute KMeans on the given data set.

    Parameters
    ----------
    X : np.ndarray
        the given data set
    n_clusters : int
        The number of clusters
    use_log : bool
        True, if the logarithm of the within cluster dispersion should be used
        For more information see Mohajer et al.
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution

    Returns
    -------
    tuple : (np.ndarray, float)
        The cluster labels,
        The within cluster dispersion
    """
    if n_clusters > 1:
        kmeans = KMeans(n_clusters, random_state=random_state)
        kmeans.fit(X)
        labels = kmeans.labels_
        # Calculate within cluster dispersion
        W_k = np.log(kmeans.inertia_) if use_log else kmeans.inertia_  # Equal to D_k = sum_k(D_r / (2n))
    else:
        labels = np.zeros(X.shape[0])
        # Calculate within cluster dispersion
        W_k = np.sum(pdist(X, metric="sqeuclidean")) / X.shape[0]
        W_k = np.log(W_k) if use_log else W_k
    return labels, W_k


[docs]class GapStatistic(BaseEstimator, ClusterMixin): """ Estimate the number of cluster for KMeans using the Gar Statistic. Calculate the Gap Statistic by comparing within cluster dispersion of the input data set with that of ranomly sampled data. The Gap Statistic is evaluated for multiple numebers of clusters. First clustering result that fulfills the Gap condition 'Gap(k) >= Gap(k+1)-s_{k+1}' will be returned. Beware: Result can be None if no clustering result fulfills that condition! Parameters ---------- min_n_clusters : int Minimum number of clusters. Must be smaller than max_n_clusters (default: 1) max_n_clusters : int Maximum number of clusters. Must be larger than min_n_clusters (default: 10) n_boots : int Number of random data sets that should be created to calculate Gap Statistic (default: 10) use_principal_components : bool True, if the random data sets should be created using the feature-wise minimum and maximum value of the Principle Components. Else, the minimum and maximum value of the regular data set will be used (default: True) use_log : bool True, if the logarithm of the within cluster dispersion should be used. For more information see Mohajer et al. (default: True) random_state : np.random.RandomState | int use a fixed random state to get a repeatable solution. Can also be of type int (default: None) Attributes ---------- n_clusters_ : int The first number of clusters that fulfills the Gap condition (can be None) labels_ : np.ndarray The labels as identified by the Gap Statistic (can be None) cluster_centers_ : np.ndarray The cluster centers as identified by the Gap Statistic (Can be None) gaps_ : np.ndarray The Gap values, sks_ : np.ndarray The sk values Examples ---------- >>> from sklearn.datasets import make_blobs >>> X, L = make_blobs(1000, 2, centers=7, cluster_std=0.7) >>> gs = GapStatistic() >>> gs.fit(X) >>> print(gs.n_clusters_) >>> gs.plot() References ---------- Tibshirani, Robert, Guenther Walther, and Trevor Hastie. "Estimating the number of clusters in a data set via the gap statistic." Journal of the Royal Statistical Society: Series B (Statistical Methodology) 63.2 (2001): 411-423. and Mohajer, Mojgan, Karl-Hans Englmeier, and Volker J. Schmid. "A comparison of Gap statistic definitions with and without logarithm function." arXiv preprint arXiv:1103.4767 (2011). """ def __init__(self, min_n_clusters: int = 1, max_n_clusters: int = 10, n_boots: int = 10, use_principal_components: bool = True, use_log: bool = True, random_state: np.random.RandomState | int = None): self.min_n_clusters = min_n_clusters self.max_n_clusters = max_n_clusters self.n_boots = n_boots self.use_principal_components = use_principal_components self.use_log = use_log self.random_state = check_random_state(random_state)
[docs] def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'GapStatistic': """ Initiate the actual clustering process on the input data set. The resulting cluster labels will be stored in the labels_ attribute. Parameters ---------- X : np.ndarray the given data set y : np.ndarray the labels (can be ignored) Returns ------- self : GapStatistic this instance of the GapStatistic algorithm """ n_clusters, labels, centers, gaps, sks = _gap_statistic(X, self.min_n_clusters, self.max_n_clusters, self.n_boots, self.use_principal_components, self.use_log, self.random_state) self.n_clusters_ = n_clusters self.labels_ = labels self.cluster_centers_ = centers self.gaps_ = gaps self.sks_ = sks return self
[docs] def plot(self) -> None: """ Plot the result of the Gap Statistic. Shows the number of the clusters on the x-axis and the Gap values on the y-axis. """ assert hasattr(self, "gaps_"), "The Gap Statistic algorithm has not run yet. Use the fit() function first." plt.plot(np.arange(self.min_n_clusters, self.max_n_clusters + 1), self.gaps_[:-1]) plt.errorbar(np.arange(self.min_n_clusters, self.max_n_clusters + 1), self.gaps_[:-1], self.sks_[:-1], capsize=3, linestyle='None') plt.ylabel("Gap Statistic") plt.xlabel("n_clusters") plt.show()