Source code for clustpy.partition.xmeans

"""
@authors:
Collin Leiber
"""

from sklearn.cluster import KMeans
import numpy as np
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.utils import check_random_state
from clustpy.utils._information_theory import bic_costs

"""
HELPERS also used by other classes
"""


def _initial_kmeans_clusters(X: np.ndarray, n_clusters_init: int | np.ndarray, random_state: np.random.RandomState) -> (
        int, np.ndarray, np.ndarray, float):
    """
    Get the initial cluster centers and cluster labels based on the n_clusters_init parameter.
    If n_clusters_init is an integer, the cluster parameters are identified by KMeans with n_clusters_init als single input.
    If n_clusters_init is of type np.ndarray, the cluster parameters are identified by KMeans with the initial cluster centers given by n_clusters_init.

    Parameters
    ----------
    X : np.ndarray
        the given data set
    n_clusters_init : int | np.ndarray
        The initial number of clusters. Can also be of type np.ndarray if initial cluster centers are specified
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution

    Returns
    -------
    tuple : (int, np.ndarray, np.ndarray, float)
        The initial number of clusters,
        The initial cluster labels,
        The initial cluster centers,
        The Kmeans error of the initial clustering result
    """
    if type(n_clusters_init) is int and n_clusters_init == 1:
        n_clusters = n_clusters_init
        labels = np.zeros(X.shape[0], dtype=np.int32)
        centers = np.mean(X, axis=0).reshape(1, -1)
        kmeans_error = np.sum((X - centers) ** 2)
    else:
        if type(n_clusters_init) is int:
            # Normally, n_clusters_init is int
            n_clusters = n_clusters_init
            kmeans = KMeans(n_clusters=n_clusters, random_state=random_state)
        else:
            # If n_clusters_init is array, this should be equal to the initial cluster centers
            n_clusters = n_clusters_init.shape[0]
            kmeans = KMeans(n_clusters=n_clusters, init=n_clusters_init, n_init=1, random_state=random_state)
        kmeans.fit(X)
        labels = kmeans.labels_
        centers = kmeans.cluster_centers_
        kmeans_error = kmeans.inertia_
    return n_clusters, labels, centers, kmeans_error


def _execute_two_means(X: np.ndarray, ids_in_each_cluster: list, cluster_id_to_split: int, centers: np.ndarray,
                       n_split_trials: int, random_state: np.random.RandomState) -> (np.ndarray, np.ndarray, float):
    """
    Execute 2-Means.
    Splits a cluster into two by first selecting a random object from the data set as first new cluster and then selects the coordinate on the opposite site of the original center as the second new center.
    Afterwards, KMeans will be executed.
    This procedure is repeated n_split_trials times and the result with the lowest KMeans-error will be returned.

    Parameters
    ----------
    X : np.ndarray
        the given data set
    ids_in_each_cluster : list
        List that contains for each cluster an array with the ids of all objects within this cluster
    cluster_id_to_split : int
        The id of the cluster that should be split
    centers : np.ndarray
        The original cluster centers
    n_split_trials : int
        Number tries to split a cluster. For each try 2-KMeans is executed with different cluster centers
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution

    Returns
    -------
    tuple : (np.ndarray, np.ndarray, float)
        The resulting cluster labels,
        The resuling cluster centers,
        The Kmeans error of the clustering result
    """
    assert X.shape[0] >= 2, "X must contain at least 2 elements"
    # Prepare cluster for splitting
    old_center = centers[cluster_id_to_split, :]
    reduced_centers = np.delete(centers, cluster_id_to_split, axis=0)
    ids_in_cluster = ids_in_each_cluster[cluster_id_to_split]
    # Try to find kmeans result with smallest squared distances
    best_kmeans = None
    # Get random points in cluster as new centers
    n_split_trials = min(n_split_trials, ids_in_cluster.shape[0])
    random_centers = X[random_state.choice(ids_in_cluster, n_split_trials, replace=False), :]
    # Calculate second new centers as: new2 = old - (new1 - old)
    adjusted_centers = old_center - (random_centers - old_center)
    # Get Kmeans result with minimum Kmeans-error
    for i in range(n_split_trials):
        # Run kmeans with new centers
        tmp_centers = np.r_[reduced_centers, [random_centers[i]], [adjusted_centers[i]]]
        kmeans = KMeans(n_clusters=tmp_centers.shape[0], init=tmp_centers, n_init=1, random_state=random_state)
        kmeans.fit(X)
        # Check squared distances to find best kmeans result
        if best_kmeans is None or kmeans.inertia_ < best_kmeans.inertia_:
            best_kmeans = kmeans
    return best_kmeans.labels_, best_kmeans.cluster_centers_, best_kmeans.inertia_


"""
Actual XMeans methods
"""


def _xmeans(X: np.ndarray, n_clusters_init: int, max_n_clusters: int, check_global_score: bool, allow_merging: bool,
            n_split_trials: int, random_state: np.random.RandomState) -> (int, np.ndarray, np.ndarray):
    """
    Start the actual XMeans clustering procedure on the input data set.

    Parameters
    ----------
    X : np.ndarray
        the given data set
    n_clusters_init : int
        The initial number of clusters. Can also by of type np.ndarray if initial cluster centers are specified
    max_n_clusters : int
        Maximum number of clusters. Must be larger than n_clusters_init
    check_global_score : bool
        Defines whether the global BIC score should be checked after the 'Improve-Params' step. Some implementations skip this step
    allow_merging : bool
        Try to merge clusters after the regular XMeans algorithm terminated. See Ishioka et al. for more information
    n_split_trials : int
        Number tries to split a cluster. For each try 2-KMeans is executed with different cluster centers
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution

    Returns
    -------
    tuple : (int, np.ndarray, np.ndarray)
        The final number of clusters,
        The labels as identified by XMeans,
        The cluster centers as identified by XMeans
    """
    assert max_n_clusters >= n_clusters_init, "max_n_clusters can not be smaller than n_clusters_init"
    n_dims = X.shape[1]
    n_clusters, labels, centers, global_variance = _initial_kmeans_clusters(X, n_clusters_init, random_state)
    # Get parameters of all clusters
    ids_in_each_cluster = [np.where(labels == c)[0] for c in range(n_clusters)]
    cluster_sizes = np.array([ids_in_cluster.shape[0] for ids_in_cluster in ids_in_each_cluster])
    cluster_variances = np.array([np.sum((X[ids_in_each_cluster[c]] - centers[c]) ** 2) / (
            cluster_sizes[c] - 1) for c in range(n_clusters)])  # Only used if allow_merging is True
    if check_global_score:
        # Get initial global variance
        global_variance = global_variance / (X.shape[0] - n_clusters)
        # Get initial global BIC score
        best_global_bic_score = _bic_score(X.shape[0], cluster_sizes, n_dims, global_variance)
        # Save best result
        best_result = (n_clusters, labels, centers, ids_in_each_cluster, cluster_sizes, cluster_variances)
    while n_clusters < max_n_clusters:
        n_clusters_old = n_clusters
        # Split Clusters => Improve-Structure
        for c in range(n_clusters_old):
            ids_in_cluster = ids_in_each_cluster[c]
            original_cluster_size = cluster_sizes[c]
            if ids_in_cluster.shape[0] <= 2:
                # Cluster can not be split because it is too small
                continue
            # Get variance of original cluster
            cluster_variance = cluster_variances[c]
            # Get BIC score of original cluster
            cluster_bic_score = _bic_score(original_cluster_size, original_cluster_size, n_dims, cluster_variance)
            # Split cluster into two
            labels_split, centers_split, split_variance = _execute_two_means(X[ids_in_cluster],
                                                                             [np.arange(original_cluster_size)], 0,
                                                                             np.array([centers[c]]), n_split_trials,
                                                                             random_state)
            # Get variance of splitted clusters
            split_variance = split_variance / (ids_in_cluster.shape[0] - 2)
            cluster_sizes_split = np.array([np.sum(labels_split == c) for c in range(2)])
            # Get BIC score of splitted clusters
            split_cluster_bic_score = _bic_score(original_cluster_size, cluster_sizes_split, n_dims, split_variance)
            if cluster_bic_score < split_cluster_bic_score:
                # Keep new clusters
                centers[c] = centers_split[0]
                centers = np.r_[centers, [centers_split[1]]]
                labels[ids_in_cluster[labels_split == 1]] = n_clusters
                n_clusters += 1
                # If maximum number of clusters is reached, stop iterating over the current clusters
                if n_clusters == max_n_clusters:
                    break
        # If no cluster changed, XMeans terminates
        if n_clusters == n_clusters_old:
            break
        else:
            # Prepare the clusters for the next iteration => Improve-Params
            kmeans = KMeans(n_clusters=n_clusters, init=centers, n_init=1, random_state=random_state)
            kmeans.fit(X)
            centers = kmeans.cluster_centers_
            labels = kmeans.labels_
            # Update parameters of all clusters
            ids_in_each_cluster = [np.where(labels == c)[0] for c in range(n_clusters)]
            cluster_sizes = np.array([ids_in_cluster.shape[0] for ids_in_cluster in ids_in_each_cluster])
            cluster_variances = [np.sum((X[ids_in_each_cluster[c]] - centers[c]) ** 2) / (
                    cluster_sizes[c] - 1) if cluster_sizes[c] > 1 else 0 for c in
                                 range(n_clusters)]  # Only used if allow_merging is True
            if check_global_score:
                # Get new global variance
                global_variance = kmeans.inertia_ / (X.shape[0] - n_clusters)
                # Get new global BIC score
                new_global_bic_score = _bic_score(X.shape[0], cluster_sizes, n_dims, global_variance)
                if best_global_bic_score < new_global_bic_score:
                    # If score improved, save new best model
                    best_global_bic_score = new_global_bic_score
                    best_result = (
                        n_clusters, labels.copy(), centers.copy(), ids_in_each_cluster.copy(), cluster_sizes.copy(),
                        cluster_variances.copy())
    if check_global_score:
        # Exchange latest result with best overall result
        n_clusters, labels, centers, ids_in_each_cluster, cluster_sizes, cluster_variances = best_result
    # OPTIONAL: try to merge clusters
    if allow_merging:
        n_clusters, labels, centers = _merge_clusters(X, n_clusters, labels, centers, ids_in_each_cluster,
                                                      cluster_sizes, cluster_variances)
    return n_clusters, labels, centers


def _merge_clusters(X: np.ndarray, n_clusters: int, labels: np.ndarray, centers: np.ndarray, ids_in_each_cluster: list,
                    cluster_sizes: np.ndarray, cluster_variances: np.ndarray) -> (int, np.ndarray, np.ndarray):
    """
    Addition to XMeans by Ishioka et al..
    Attempts to repair errors caused by an unfortunate splitting order by merging clusters.
    Tests all pairwise combinations of clusters starting with the smallest clusters.
    Here, each original cluster can only be merged once.

    Parameters
    ----------
    X : np.ndarray
        the given data set
    n_clusters : int
        The number of clusters
    labels : np.ndarray
        The cluster labels
    centers : np.ndarray
        The cluster centers
    ids_in_each_cluster : list
        List containing the ids of the samples of a cluster
    cluster_sizes : np.ndarray
        The sizes of the clusters
    cluster_variances : np.ndarray
        The variances of the clusters

    Returns
    -------
    tuple : (int, np.ndarray, np.ndarray)
        The updated number of clusters,
        The updated labels,
        The updated cluster centers

    References
    ----------
    Ishioka, Tsunenori. "An expansion of X-means for automatically determining the optimal number of clusters."
    Proceedings of International Conference on Computational Intelligence. Vol. 2. 2005.
    """
    n_dims = X.shape[1]
    argsorted_sizes = np.argsort(cluster_sizes)
    already_merged = [False] * n_clusters
    n_cluster_old = n_clusters
    # Check each combination of clusters, starting with the smallest
    for c1_not_sorted in range(n_cluster_old):
        c1 = argsorted_sizes[c1_not_sorted]
        if already_merged[c1]:
            continue
        for c2_not_sorted in range(c1_not_sorted + 1, n_cluster_old):
            c2 = argsorted_sizes[c2_not_sorted]
            if already_merged[c2]:
                continue
            combined_cluster_size = cluster_sizes[c1] + cluster_sizes[c2]
            # Get BIC score of non-merged clusters
            cluster_1_and_2_variance = (cluster_variances[c1] * (cluster_sizes[c1] - 1) +
                                        cluster_variances[c2] * (cluster_sizes[c2] - 1)) / (combined_cluster_size - 2)
            cluster_1_and_2_bic_score = _bic_score(combined_cluster_size, cluster_sizes[[c1, c2]], n_dims,
                                                   cluster_1_and_2_variance)
            # Get BIC of merged cluster
            new_center = (centers[c1] * cluster_sizes[c1] + centers[c2] * cluster_sizes[c2]) / combined_cluster_size
            cluster_merged_variance = np.sum(
                (X[np.r_[ids_in_each_cluster[c1], ids_in_each_cluster[c2]]] - new_center) ** 2) / (
                                              combined_cluster_size - 1)
            cluster_merged_bic_score = _bic_score(combined_cluster_size, combined_cluster_size,
                                                  n_dims, cluster_merged_variance)
            # Is merge improving the local BIC score?
            if cluster_merged_bic_score > cluster_1_and_2_bic_score:
                # Update labels and centers
                min_cluster_id = min(c1, c2)
                max_cluster_id = max(c1, c2)
                labels[labels == max_cluster_id] = min_cluster_id
                centers[min_cluster_id] = new_center
                cluster_sizes[max_cluster_id] = 0
                # Set already_merged for both cluster to True so they can not be merged again
                already_merged[c1] = True
                already_merged[c2] = True
                n_clusters -= 1
                break
    # Remove empty clusters. Needs to be done from max cluster id to min cluster id
    for c in range(n_cluster_old - 1, -1, -1):
        if cluster_sizes[c] == 0:
            labels[labels > c] -= 1
            centers = np.delete(centers, c, axis=0)
    return n_clusters, labels, centers


def _bic_score(n_points: int, cluster_sizes: np.ndarray, n_dims: int, variance: float) -> float:
    """
    Calculate the BIC score of a clustering result.
    For more information see: 'X-means: Extending k-means with efficient estimation of the number of clusters'

    Parameters
    ----------
    n_points : int
        Number of samples in the data set
    cluster_sizes : np.ndarray
        Number of samples in each cluster. Can also by of type int in case of a single cluster
    n_dims : int
        Number of features in the data set
    variance : float
        The variance across all clusters

    Returns
    -------
    bic_total : float
        The BIC score of the cluster
    """
    n_clusters = cluster_sizes.shape[0] if type(cluster_sizes) is np.ndarray else 1
    # BIC of the free parameters
    n_free_params = n_clusters * (n_dims + 1)  # Equal to: (n_clusters - 1) + n_clusters * n_dims + 1
    bic_free_params = n_free_params * bic_costs(n_points, False)
    # BIC of the data using the loglikelihood
    bic_loglikelihood = np.sum(cluster_sizes * (np.log(cluster_sizes) - np.log(n_points) - np.log(
        2.0 * np.pi) / 2 - n_dims * np.log(variance) / 2) - (cluster_sizes - n_clusters) / 2)
    # Combine BIC score components
    bic_total = bic_loglikelihood - bic_free_params
    return bic_total


[docs]class XMeans(BaseEstimator, ClusterMixin): """ Parameters ---------- n_clusters_init : int The initial number of clusters. Can also by of type np.ndarray if initial cluster centers are specified (default: 2) max_n_clusters : int Maximum number of clusters. Must be larger than n_clusters_init (default: np.inf) check_global_score : bool Defines whether the global BIC score should be checked after the 'Improve-Params' step. Some implementations skip this step (default: True) allow_merging : bool Try to merge clusters after the regular XMeans algorithm terminated. See Ishioka et al. for more information. Normally, if allow_merging is True, check_global_score should be False (default: False) n_split_trials : int Number tries to split a cluster. For each try 2-KMeans is executed with different cluster centers (default: 10) random_state : np.random.RandomState | int use a fixed random state to get a repeatable solution. Can also be of type int (default: None) Attributes ---------- n_clusters_ : int The final number of clusters labels_ : np.ndarray The final labels cluster_centers_ : np.ndarray The final cluster centers Examples ---------- >>> from clustpy.partition import XMeans >>> from sklearn.datasets import make_blobs >>> from clustpy.utils import plot_with_transformation >>> rs = np.random.RandomState(11) >>> X, L = make_blobs(500, 2, centers=1, cluster_std=2, random_state=rs) >>> X2, L2 = make_blobs(1000, 2, centers=4, cluster_std=0.5, random_state=rs) >>> X = np.r_[X, X2] >>> for b in [False, True]: >>> xm = XMeans(allow_merging=b, random_state=rs) >>> xm.fit(X) >>> plot_with_transformation(X, xm.labels_, xm.cluster_centers_) References ---------- Pelleg, Dan, and Andrew W. Moore. "X-means: Extending k-means with efficient estimation of the number of clusters." Icml. Vol. 1. 2000. and Ishioka, Tsunenori. "An expansion of X-means for automatically determining the optimal number of clusters." Proceedings of International Conference on Computational Intelligence. Vol. 2. 2005. """ def __init__(self, n_clusters_init: int = 2, max_n_clusters: int = np.inf, check_global_score: bool = True, allow_merging: bool = False, n_split_trials: int = 10, random_state: np.random.RandomState | int = None): self.n_clusters_init = n_clusters_init self.max_n_clusters = max_n_clusters self.check_global_score = check_global_score self.allow_merging = allow_merging self.n_split_trials = n_split_trials self.random_state = check_random_state(random_state)
[docs] def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'XMeans': """ Initiate the actual clustering process on the input data set. The resulting cluster labels will be stored in the labels_ attribute. Parameters ---------- X : np.ndarray the given data set y : np.ndarray the labels (can be ignored) Returns ------- self : XMeans this instance of the XMeans algorithm """ n_clusters, labels, centers = _xmeans(X, self.n_clusters_init, self.max_n_clusters, self.check_global_score, self.allow_merging, self.n_split_trials, self.random_state) self.n_clusters_ = n_clusters self.labels_ = labels self.cluster_centers_ = centers return self