Source code for clustpy.partition.specialk

"""
@authors:
Collin Leiber

Based on the original implementation, available at:
https://github.com/Sibylse/SpecialK/blob/master/SpecialK.ipynb
"""

import numpy as np
from scipy.spatial.distance import pdist, squareform
from sklearn.neighbors import radius_neighbors_graph, kneighbors_graph
from sklearn.cluster import KMeans
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.utils import check_random_state
import scipy


def _specialk(X: np.ndarray, significance: float, n_dimensions: int, similarity_matrix: str, n_neighbors: int,
              percentage: float, n_cluster_pairs_to_consider: int, max_n_clusters: int,
              random_state: np.random.RandomState, debug: bool) -> (int, np.ndarray):
    """
    Start the actual SpecialK clustering procedure on the input data set.

    Parameters
    ----------
    X : np.ndarray
        the given data set
    significance : float
        Threshold to decide if the samples originate from a single distribution or two distributions
    n_dimensions : int
        Dimensionality of the embedding
    similarity_matrix : str
        Defines the similarity matrix to use. Can be one of the following strings or a numpy array / scipy sparse csr matrix.
        If 'NAM', a neighborhood adjacency matrix is used.
        If 'SAM' a symmetrically normalized adjacency matrix is used
    n_neighbors : int
        Number of neighbors for the construction of the similarity matrix. Does not include the object itself
    percentage : float
        The amount of data points that should have at least n_neighbors neighbors.
        Only relevant if use_neighborhood_adj_mat is set to True
    n_cluster_pairs_to_consider : int
        The number of cluster pairs to consider when calculating the probability boundary.
        The cluster pairs responsible for the highest cut. If None, all pairs will be considered.
        Smaller values for n_cluster_pairs_to_consider will decrease the computing time
    max_n_clusters : int
        Maximum number of clusters
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution
    debug : bool
        If true, additional information will be printed to the console

    Returns
    -------
    tuple : (int, np.ndarray)
        The final number of clusters,
        The labels as identified by DipMeans,
    """
    assert significance >= 0 and significance <= 1, "significance must be a value in the range [0, 1]"
    assert percentage >= 0 and percentage <= 1, "percentage must be a value in the range [0, 1]"
    if type(similarity_matrix) is str and similarity_matrix == 'NAM':
        final_similarity_matrix = _get_neighborhood_adjacency_matrix(X, percentage, n_neighbors)
    elif type(similarity_matrix) is str and similarity_matrix == 'SAM':
        final_similarity_matrix = _get_symmetrically_normalized_adjacency_matrix(X, n_neighbors)
    elif type(similarity_matrix) is np.ndarray or type(similarity_matrix) is scipy.sparse.csr_matrix:
        final_similarity_matrix = similarity_matrix
    else:
        raise ValueError(
            "similarity_matrix must be 'NAM' (Neighborhood Adjacency Matrix), 'SAM' (Symmetrically Normalized Adjacency Matrix) or a numpy array.")
    # Get eigenvalues and eigenvectors
    my_lambda, V = scipy.sparse.linalg.eigsh(final_similarity_matrix, k=n_dimensions, which="LM")
    my_lambda, D = np.absolute(my_lambda), np.absolute(V)
    D = D * np.sqrt(my_lambda)
    # Initial values
    n_clusters = 2
    stop_search = False
    best_labels = np.zeros(X.shape[0])
    while n_clusters <= max_n_clusters:
        if debug:
            print("=== n_clusters={0} ===".format(n_clusters))
        # Execute KMeans
        kmeans = KMeans(n_clusters, random_state=random_state)
        kmeans.fit(D)
        ids_in_each_cluster = [np.where(kmeans.labels_ == c)[0] for c in range(n_clusters)]
        if n_cluster_pairs_to_consider is None:
            # Get all pairs of clusters
            cluster_pairs = [(c1, c2) for c1 in range(n_clusters - 1) for c2 in range(c1 + 1, n_clusters)]
        else:
            # Only consider the pairs of clusters responsible for the maximum cuts
            # Calculate cuts
            one_hot = np.zeros((D.shape[0], n_clusters), dtype=int)
            for c in range(n_clusters):
                one_hot[ids_in_each_cluster[c], c] = 1
            cuts = one_hot.T @ final_similarity_matrix @ one_hot
            # Fill upper triangle matrix with 0s and ignore diagonal
            cuts = np.tril(cuts)
            np.fill_diagonal(cuts, -np.inf)  # operation is inplace
            # Get cluster pairs with maximum cuts
            cluster_pairs = np.column_stack(np.unravel_index(np.argsort(cuts, axis=None), cuts.shape))[::-1]
            cluster_pairs = cluster_pairs[:min(int(n_clusters * (n_clusters - 1) / 2), n_cluster_pairs_to_consider)]
        # Iterate over the cluster pairs
        for c1, c2 in cluster_pairs:
            if debug:
                print("check cluster {0} and {1} (cut={2})".format(c1, c2, cuts[
                    c1, c2] if n_cluster_pairs_to_consider is not None else None))
            ids_in_cluster_1 = ids_in_each_cluster[c1]
            ids_in_cluster_2 = ids_in_each_cluster[c2]
            # Calculate bound
            t_total = _zz_top_bound(D, ids_in_cluster_1, ids_in_cluster_2, debug)
            if debug:
                print("ZZ top:", t_total)
            if t_total > significance:
                # Stop execution -> return n_clusters - 1
                stop_search = True
                break
        if stop_search:
            break
        else:
            # Save current result as best one so far
            best_labels = kmeans.labels_
            n_clusters += 1
    # Return number of clusters and labels
    if debug:
        print("Final n_clusters={0}".format(n_clusters - 1))
    return n_clusters - 1, best_labels


def _zz_top_bound(D: np.ndarray, ids_in_cluster_1: np.ndarray, ids_in_cluster_2: np.ndarray, debug: bool) -> float:
    """
    Calculate the ZZ Top bound

    Parameters
    ----------
    D : np.ndarray
        Approximated similarity matrix
    ids_in_cluster_1 : np.ndarray
        The ids of the objects in cluster 1
    ids_in_cluster_2 : np.ndarray
        The ids of the objects in cluster 2
    debug : bool
        If true, additional information will be printed to the console

    Returns
    -------
    t_total : float
        The calculated bound
    """
    Dj = D[np.r_[ids_in_cluster_1, ids_in_cluster_2],]
    norms = np.linalg.norm(Dj, axis=0)
    norms[norms == 0] = 1
    Z = (Dj - np.mean(Dj, axis=0)) / norms
    sigma2 = np.mean(Z * Z)
    t1 = np.linalg.norm(np.sum(Z[:ids_in_cluster_1.shape[0]], axis=0)) ** 2 / ids_in_cluster_1.shape[0]
    t2 = np.linalg.norm(np.sum(Z[ids_in_cluster_1.shape[0]:], axis=0)) ** 2 / ids_in_cluster_2.shape[0]
    t = max(t1, t2) - sigma2 * Dj.shape[1]
    if debug:
        print("sigma={0} / t={1}".format(sigma2, t))
    t_total = Dj.shape[0] * np.exp(-0.5 * t ** 2 / (Dj.shape[1] * sigma2 + t / 3))
    return t_total


def _get_neighborhood_adjacency_matrix(X: np.ndarray, percentage: float = 0.99,
                                       n_neighbors: int = 10) -> scipy.sparse.csr_matrix:
    """
    Get a neighborhood adjacency matrix, so that p% of the data points have at least n_neighbors neighbors.
    Here, p can be chosen using the 'percentage' parameter.

    Parameters
    ----------
    X : np.ndarray
        the given data set
    percentage : float
        The amount of data points that should have at least n_neighbors neighbors (default: 0.99)
    n_neighbors : int
        The number of neighbors, not including the object itself (default: 10)

    Returns
    -------
    similarity_matrix : scipy.sparse.csr_matrix
        The resulting similarity matrix
    """
    # Get pairwise distances
    dist_matrix = squareform(pdist(X, 'euclidean'))
    # Get kNN distances (+1 because self is not included in n_neighbors)
    knn_distances = np.sort(dist_matrix, axis=1)[:, n_neighbors + 1]
    # Get knn dist so that more than 'percentage' points have 'n_neighbors' neighbors
    knn_dist_sorted = np.sort(knn_distances)
    eps = knn_dist_sorted[int((X.shape[0] - 1) * percentage)]
    # Get neighbor graph
    similarity_matrix = radius_neighbors_graph(X, radius=eps)
    return similarity_matrix


def _get_symmetrically_normalized_adjacency_matrix(X: np.ndarray, n_neighbors: int = 10) -> scipy.sparse.csr_matrix:
    """
    Get a symmetrically normalized adjacency matrix of the kNN graph.

    Parameters
    ----------
    X : np.ndarray
        the given data set
    n_neighbors : int
        The number of neighbors, not including the object itself (default: 10)

    Returns
    -------
    similarity_matrix : scipy.sparse.csr_matrix
        The resulting similarity matrix
    """
    # Get neighbor graph
    W = kneighbors_graph(X, n_neighbors=n_neighbors, mode="distance", include_self=False, n_jobs=-1)
    W = 0.5 * (W + W.T)
    d = np.sum(W, axis=1)
    # Convert np.matrix to np.array
    d = np.array(d).reshape(-1)
    d = np.power(d, -0.5)
    D = scipy.sparse.diags(d)
    similarity_matrix = D @ W @ D
    return similarity_matrix


[docs]class SpecialK(BaseEstimator, ClusterMixin): """ Execute the SpecialK clustering procedure. SpecialK is able to autonomously identify a suitable number of clusters for spectral clustering procedures. Therefore, it uses probability bounds on the operator norm of centered, symmetric decompositions based on the matrix Bernstein concentration inequality. It iteratively increases the number of clusters until the probability of objects originating from two instead of one distribution does not increase further. Can be based on an epsilon-neighborhood adjacency matrix or the symmetrically normalized adjacency matrix of the kNN graph. Parameters ---------- significance : float Threshold to decide if the samples originate from a single distribution or two distributions (default: 0.01) n_dimensions : int Dimensionality of the embedding (default: 200) similarity_matrix : str Defines the similarity matrix to use. Can be one of the following strings or a numpy array / scipy sparse csr matrix. If 'NAM', a neighborhood adjacency matrix is used. If 'SAM' a symmetrically normalized adjacency matrix is used (default: 'NAM') n_neighbors : int Number of neighbors for the construction of the similarity matrix. Does not include the object itself (default: 10) percentage : float The amount of data points that should have at least n_neighbors neighbors. Only relevant if use_neighborhood_adj_mat is set to True (default: 0.99) n_cluster_pairs_to_consider : int The number of cluster pairs to consider when calculating the probability boundary. The cluster pairs responsible for the highest cut. If None, all pairs will be considered. Smaller values for n_cluster_pairs_to_consider will decrease the computing time (default: 10) max_n_clusters : int Maximum number of clusters. Must be larger than n_clusters_init (default: np.inf) random_state : np.random.RandomState | int use a fixed random state to get a repeatable solution (default: None) debug : bool If true, additional information will be printed to the console (default: False) Attributes ---------- n_clusters_ : int The final number of clusters labels_ : np.ndarray The final labels References ---------- Hess, Sibylle, and Wouter Duivesteijn. "k is the magic number—inferring the number of clusters through nonparametric concentration inequalities." Machine Learning and Knowledge Discovery in Databases: European Conference, ECML PKDD 2019, Würzburg, Germany, September 16–20, 2019, Proceedings, Part I. Springer International Publishing, 2020. """ def __init__(self, significance: float = 0.01, n_dimensions: int = 200, similarity_matrix: str = 'NAM', n_neighbors: int = 10, percentage: float = 0.99, n_cluster_pairs_to_consider: int = 10, max_n_clusters: int = np.inf, random_state: np.random.RandomState | int = None, debug: bool = False): self.significance = significance self.n_dimensions = n_dimensions self.similarity_matrix = similarity_matrix self.n_neighbors = n_neighbors self.percentage = percentage self.n_cluster_pairs_to_consider = n_cluster_pairs_to_consider self.max_n_clusters = max_n_clusters self.random_state = check_random_state(random_state) self.debug = debug
[docs] def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'SpecialK': """ Initiate the actual clustering process on the input data set. The resulting cluster labels will be stored in the labels_ attribute. Parameters ---------- X : np.ndarray the given data set y : np.ndarray the labels (can be ignored) Returns ------- self : SpecialK this instance of the SpecialK algorithm """ n_clusters, labels = _specialk(X, self.significance, self.n_dimensions, self.similarity_matrix, self.n_neighbors, self.percentage, self.n_cluster_pairs_to_consider, self.max_n_clusters, self.random_state, self.debug) self.n_clusters_ = n_clusters self.labels_ = labels return self