# Source code for clustpy.partition.dipmeans

"""
@authors:
Collin Leiber
"""

import numpy as np
from scipy.spatial.distance import pdist, squareform
from clustpy.utils import dip_test, dip_pval, dip_boot_samples
from clustpy.partition.xmeans import _initial_kmeans_clusters, _execute_two_means
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.utils import check_random_state


def _dipmeans(X: np.ndarray, significance: float, split_viewers_threshold: float, pval_strategy: str, n_boots: int,
              n_split_trials: int, n_clusters_init: int, max_n_clusters: int, random_state: np.random.RandomState) -> (
        int, np.ndarray, np.ndarray):
    """
    Start the actual DipMeans clustering procedure on the input data set.

    Starting from an initial clustering, the cluster with the highest split score is repeatedly divided into two
    clusters until no cluster qualifies for a split or max_n_clusters is reached.

    Parameters
    ----------
    X : np.ndarray
        the given data set
    significance : float
        Threshold to decide if the result of the dip-test is unimodal or multimodal. Must be within [0, 1]
    split_viewers_threshold : float
        Threshold to decide whether a cluster has a unimodal or multimodal structure. Must be within [0, 1]
    pval_strategy : str
        Defines which strategy to use to receive dip-p-values. Possibilities are 'table', 'function' and 'bootstrap'
    n_boots : int
        Number of bootstraps used to calculate dip-p-values. Only necessary if pval_strategy is 'bootstrap'
    n_split_trials : int
        Number of tries to split a cluster. For each try 2-KMeans is executed with different cluster centers
    n_clusters_init : int
        The initial number of clusters. Can also be of type np.ndarray if initial cluster centers are specified
    max_n_clusters : int
        Maximum number of clusters. Must not be smaller than n_clusters_init
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution

    Returns
    -------
    tuple : (int, np.ndarray, np.ndarray)
        The final number of clusters,
        The labels as identified by DipMeans,
        The cluster centers as identified by DipMeans

    Raises
    ------
    AssertionError
        If max_n_clusters is smaller than the initial number of clusters or significance is outside of [0, 1]
    """
    # If initial centers are given as an array, the initial number of clusters is taken to be its number of rows
    # (one center per row) so that the comparison below stays a scalar comparison
    if isinstance(n_clusters_init, np.ndarray):
        n_initial = n_clusters_init.shape[0]
    else:
        n_initial = n_clusters_init
    assert max_n_clusters >= n_initial, "max_n_clusters can not be smaller than n_clusters_init"
    assert significance >= 0 and significance <= 1, "significance must be a value in the range [0, 1]"
    # Calculate distance matrix
    data_dist_matrix = squareform(pdist(X, 'euclidean'))
    # Initialize parameters
    n_clusters, labels, centers, _ = _initial_kmeans_clusters(X, n_clusters_init, random_state)
    # Strictly smaller: every iteration can add one cluster, so splitting while n_clusters == max_n_clusters
    # would exceed the allowed maximum
    while n_clusters < max_n_clusters:
        # Default score is 0 for all clusters
        cluster_scores = np.zeros(n_clusters)
        ids_in_each_cluster = [np.where(labels == c)[0] for c in range(n_clusters)]
        for c, ids_in_cluster in enumerate(ids_in_each_cluster):
            n_points = ids_in_cluster.shape[0]
            if n_points == 0:
                # An empty cluster can not be split; keep its score at 0 and avoid a division by zero below
                continue
            # Get pairwise distances of points in cluster
            cluster_dist_matrix = data_dist_matrix[np.ix_(ids_in_cluster, ids_in_cluster)]
            # Calculate dip values for the distances of each point
            cluster_dips = np.array([dip_test(cluster_dist_matrix[p, :], just_dip=True, is_data_sorted=False)
                                     for p in range(n_points)])
            # Calculate p-values
            if pval_strategy == "bootstrap":
                # Bootstrap values here so it is not needed for each pval separately
                boot_dips = dip_boot_samples(n_points, n_boots, random_state)
                # p-value = share of bootstrapped dip values that are at least as large as the dip of the point
                cluster_pvals = np.array([np.mean(point_dip <= boot_dips) for point_dip in cluster_dips])
            else:
                cluster_pvals = np.array([dip_pval(point_dip, n_points, pval_strategy=pval_strategy,
                                                   random_state=random_state) for point_dip in cluster_dips])
            # Get split viewers (points with dip-p-value of < significance)
            split_viewers = cluster_dips[cluster_pvals < significance]
            # Check if percentage share of split viewers in cluster is larger than threshold
            if split_viewers.shape[0] / n_points > split_viewers_threshold:
                # Cluster score is the mean dip value of its split viewers
                cluster_scores[c] = np.mean(split_viewers)
        # Get cluster with maximum score
        cluster_id_to_split = np.argmax(cluster_scores)
        # A maximum score of 0 means that no cluster has enough split viewers -> terminate
        if cluster_scores[cluster_id_to_split] <= 0:
            break
        # Split cluster using bisecting kmeans
        labels, centers, _ = _execute_two_means(X, ids_in_each_cluster, cluster_id_to_split, centers,
                                                n_split_trials, random_state)
        n_clusters += 1
    return n_clusters, labels, centers


class DipMeans(BaseEstimator, ClusterMixin):
    """
    Execute the DipMeans clustering procedure.
    In contrast to other algorithms (e.g. KMeans) DipMeans is able to identify the number of clusters by itself.
    Therefore, it uses the dip-dist criterion.
    It calculates the dip-value of the distances of each point within a cluster to all other points in that cluster
    and checks how many points are assigned a dip-p-value below the significance threshold.
    If the share of those so called split viewers is above the split_viewers_threshold, the cluster will be split
    using 2-Means.
    The algorithm terminates if all clusters show a unimodal behaviour or max_n_clusters is reached.

    Parameters
    ----------
    significance : float
        Threshold to decide if the result of the dip-test is unimodal or multimodal (default: 0.001)
    split_viewers_threshold : float
        Threshold to decide whether a cluster has a unimodal or multimodal structure. Must be within [0, 1]
        (default: 0.01)
    pval_strategy : str
        Defines which strategy to use to receive dip-p-values. Possibilities are 'table', 'function' and 'bootstrap'
        (default: 'table')
    n_boots : int
        Number of bootstraps used to calculate dip-p-values. Only necessary if pval_strategy is 'bootstrap'
        (default: 1000)
    n_split_trials : int
        Number of tries to split a cluster. For each try 2-KMeans is executed with different cluster centers
        (default: 10)
    n_clusters_init : int
        The initial number of clusters. Can also be of type np.ndarray if initial cluster centers are specified
        (default: 1)
    max_n_clusters : int
        Maximum number of clusters. Must not be smaller than n_clusters_init (default: np.inf)
    random_state : np.random.RandomState | int
        use a fixed random state to get a repeatable solution. Can also be of type int (default: None)

    Attributes
    ----------
    n_clusters_ : int
        The final number of clusters
    labels_ : np.ndarray
        The final labels
    cluster_centers_ : np.ndarray
        The final cluster centers

    References
    ----------
    Kalogeratos, Argyris, and Aristidis Likas.
    "Dip-means: an incremental clustering method for estimating the number of clusters."
    Advances in neural information processing systems. 2012.
    """

    def __init__(self, significance: float = 0.001, split_viewers_threshold: float = 0.01,
                 pval_strategy: str = "table", n_boots: int = 1000, n_split_trials: int = 10,
                 n_clusters_init: int = 1, max_n_clusters: int = np.inf,
                 random_state: np.random.RandomState | int = None):
        self.significance = significance
        self.split_viewers_threshold = split_viewers_threshold
        self.pval_strategy = pval_strategy
        self.n_boots = n_boots
        self.n_split_trials = n_split_trials
        self.n_clusters_init = n_clusters_init
        self.max_n_clusters = max_n_clusters
        # Keep the parameter exactly as passed (scikit-learn estimator convention). It is converted into a
        # RandomState in fit(), so get_params() reports the original seed and an int seed makes every fit()
        # call repeatable
        self.random_state = random_state

    def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'DipMeans':
        """
        Initiate the actual clustering process on the input data set.
        The resulting cluster labels will be stored in the labels_ attribute.

        Parameters
        ----------
        X : np.ndarray
            the given data set
        y : np.ndarray
            the labels (can be ignored)

        Returns
        -------
        self : DipMeans
            this instance of the DipMeans algorithm
        """
        # Resolve None / int / RandomState into a RandomState without overwriting the stored parameter
        random_state = check_random_state(self.random_state)
        n_clusters, labels, centers = _dipmeans(X, self.significance, self.split_viewers_threshold,
                                                self.pval_strategy, self.n_boots, self.n_split_trials,
                                                self.n_clusters_init, self.max_n_clusters, random_state)
        self.n_clusters_ = n_clusters
        self.labels_ = labels
        self.cluster_centers_ = centers
        return self