Source code for clustpy.partition.dipnsub

"""
@authors:
Collin Leiber
"""

import numpy as np
from clustpy.utils import dip_test, dip_pval, dip_pval_gradient
from clustpy.partition import UniDip
from sklearn.decomposition import PCA
from clustpy.partition.dipext import _angle, _n_starting_vectors_default, _ambiguous_modal_triangle_random
from sklearn.utils import check_random_state
from sklearn.base import BaseEstimator, ClusterMixin


def _dip_n_sub(X: np.ndarray, significance: float, threshold: float, step_size: float, momentum: float,
               n_starting_vectors: int, add_tails: bool, outliers: bool, consider_duplicates: bool,
               random_state: np.random.RandomState, debug: bool) -> (int, np.ndarray, np.ndarray):
    """
    Start the actual DipNSub clustering procedure on the input data set.

    Parameters
    ----------
    X : np.ndarray
        the given data set
    significance : float
            Threshold to decide if the result of the dip-test is unimodal or multimodal
    threshold : float
        Defines the amount of objects that must be contained in multimodal clusters for the algorithm to continue
    step_size : float
        Step size used for gradient descent
    momentum : float
        Momentum used for gradient descent
    n_starting_vectors : int
        The number of starting vectors for gradient descent
    add_tails : bool
        Defines if TailoredDip should try to add tails to the surrounding clusters
    outliers : bool
        Defines if outliers should be identified as described by UniDip
    consider_duplicates : bool
        If multiple instances on the projection axis share a value, the gradient is ambiguous. If those duplicate values should be considered a random instances will be choses for furhter calculations. Beware: The calculation will not be deterministic anymore
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution
    debug : bool
        If true, additional information will be printed to the console

    Returns
    -------
    tuple : (int, np.ndarray, np.ndarray)
        The final number of clusters,
        The labels as identified by DipNSub,
        The resulting feature space (Number of samples x number of components)
    """
    assert significance >= 0 and significance <= 1, "significance must be a value in the range [0, 1]"
    assert threshold >= 0 and threshold <= 1, "threshold must be a value in the range [0, 1]"
    # Initial values
    n_clusters = 1
    labels = np.zeros(X.shape[0], dtype=np.int32)
    subspace = np.zeros((X.shape[0], 0))
    remaining_X = X
    cluster_sizes = np.array([X.shape[0]])
    # Find multimodal axes
    while remaining_X.shape[1] > 0:
        pvalues, projection, projected_data = _find_min_dippvalue_by_grouped_sgd(remaining_X, labels, n_clusters,
                                                                                 step_size, momentum,
                                                                                 n_starting_vectors,
                                                                                 cluster_sizes, consider_duplicates,
                                                                                 random_state, debug)
        # Check how many objects are contained in multimodal axes on the identified axis
        amount_multimodal_objects = np.sum((pvalues < significance) * cluster_sizes) / X.shape[0]
        if amount_multimodal_objects >= threshold:
            if debug:
                print(
                    "amount_multimodal_objects is {0} >= {1}. Use dimension {2} for clustering. pvalues = {3} / cluster_sizes = {4}".format(
                        amount_multimodal_objects, threshold, subspace.shape[1] + 1, pvalues, cluster_sizes))
            n_clusters_old = n_clusters
            for i in range(n_clusters_old):
                # Operation only sensible if pvalue is below significance level
                if pvalues[i] < significance:
                    points_in_cluster = np.where(labels == i)[0]
                    unidip = UniDip(outliers=outliers, significance=significance, add_tails=add_tails,
                                    pval_strategy="function", random_state=random_state)
                    unidip.fit(projected_data[points_in_cluster])
                    # Update cluster sizes
                    for j in range(unidip.n_clusters_):
                        new_clusters_size = unidip.labels_[unidip.labels_ == j].shape[0]
                        if j == 0:
                            cluster_sizes[i] = new_clusters_size
                        else:
                            cluster_sizes = np.append(cluster_sizes, new_clusters_size)
                    # Update labels
                    labels[points_in_cluster[unidip.labels_ == -1]] = -1
                    labels[points_in_cluster[unidip.labels_ == 0]] = i
                    labels[points_in_cluster[unidip.labels_ > 0]] = unidip.labels_[
                                                                        unidip.labels_ > 0] + n_clusters - 1
                    n_clusters += unidip.n_clusters_ - 1
            if debug:
                print("-> new n_clusters:", n_clusters)
            # Add dimension to subspace
            subspace = np.c_[subspace, projected_data]
            if subspace.shape[0] == X.shape[1]:
                break
            # Prepare next iteration
            orthogonal_space, _ = np.linalg.qr(projection.reshape(-1, 1), mode="complete")
            remaining_X = np.matmul(remaining_X, orthogonal_space[:, 1:])
        else:
            if debug:
                print(
                    "amount_multimodal_objects is {0} < {1}. Abort clustering. pvalues = {2} / cluster_sizes = {3}".format(
                        amount_multimodal_objects, threshold, pvalues, cluster_sizes))
            # Never return an empty subspace
            if subspace.shape[1] == 0:
                subspace = np.c_[subspace, projected_data]
            break
    return n_clusters, labels, subspace


def _find_min_dippvalue_by_grouped_sgd(X: np.ndarray, labels: np.ndarray, n_clusters: int, step_size: float,
                                       momentum: float, n_starting_vectors: int,
                                       cluster_sizes: np.ndarray, consider_duplicates: bool,
                                       random_state: np.random.RandomState, debug: bool) -> (
        np.ndarray, np.ndarray, np.ndarray):
    """
    Find the axes with n_starting_vectors highest weighted dip-p-values and start gradient descent from there.

    Parameters
    ----------
    X : np.ndarray
        the given data set
    labels : np.ndarray
        The current cluster labels
    n_clusters : int
        The current number of clusters
    step_size : float
        Step size used for gradient descent
    momentum : float
        Momentum used for gradient descent
    n_starting_vectors : int
        The number of starting vectors for gradient descent
    cluster_sizes : np.ndarray
        List containing the number of samples in each cluster
    consider_duplicates : bool
        If multiple instances on the projection axis share a value, the gradient is ambiguous. If those duplicate values should be considered a random instances will be choses for furhter calculations. Beware: The calculation will not be deterministic anymore
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution
    debug : bool
        If true, additional information will be printed to the console

    Returns
    -------
    tuple : (np.ndarray, np.ndarray, np.ndarray)
        The best identified weighted dip-p-values,
        The corresponing projection axis responsible for the dip-p-values,
        The data projected onto that projection axis
    """
    # Get dip-p-value of each cluster on each axis
    axis_dips = [
        np.array([dip_test(X[labels == j, d], just_dip=True, is_data_sorted=False) for j in range(n_clusters)]) for d in
        range(X.shape[1])]
    axis_pvalues = [np.array(
        [dip_pval(d_inner, cluster_sizes[j], pval_strategy="function") for j, d_inner in enumerate(single_axis_dips)])
        for single_axis_dips in axis_dips]
    if X.shape[1] == 1:
        # Return axis_pvales and trivial projection
        return axis_pvalues[0], np.array([1]), X
    # Calculate weighted sum of p-values (use negative dip-value if all p-values are 0)
    sum_weighted_pvalues_per_axis = np.array([
        np.sum(axis_pvalues[j] * cluster_sizes) if np.sum(axis_pvalues[j]) != 0 else -np.sum(
            axis_dips[j] * cluster_sizes) for j in range(len(axis_pvalues))])
    # Sort axes by weighted sum of p-values
    argsorted_dimensions = np.argsort(sum_weighted_pvalues_per_axis)
    min_sum_weighted_pvalues = sum_weighted_pvalues_per_axis[argsorted_dimensions[0]]
    best_pvalues = axis_pvalues[argsorted_dimensions[0]]
    best_projection = np.zeros(X.shape[1])
    best_projection[argsorted_dimensions[0]] = 1
    best_projected_data = X[:, argsorted_dimensions[0]]
    n_equal_results = 1
    # Start from n_starting_vectors features (max is current total number of features)
    n_starting_vectors = min(n_starting_vectors, X.shape[1])
    # Include features from PCA
    pca = PCA(n_starting_vectors)
    pca.fit(X)
    for i in range(n_starting_vectors):
        for mode in ["axis", "pca"]:
            # Initial projection vector
            if mode == "axis":
                start_projection = np.zeros(X.shape[1])
                start_projection[argsorted_dimensions[i]] = 1
            else:
                start_projection = pca.components_[i]
            pvalues, projection, projected_data, sum_weighted_pvalues = _find_min_dippvalue_by_grouped_sgd_with_start(X,
                                                                                                                      labels,
                                                                                                                      n_clusters,
                                                                                                                      start_projection,
                                                                                                                      step_size,
                                                                                                                      momentum,
                                                                                                                      cluster_sizes,
                                                                                                                      consider_duplicates,
                                                                                                                      random_state,
                                                                                                                      debug)
            # In rare cases the weighted p-values can be equal to current best result -> pick better result randomly
            if sum_weighted_pvalues == min_sum_weighted_pvalues:
                n_equal_results += 1
                random_number = random_state.rand()
                if debug and not np.array_equal(projection, best_projection):
                    # Only used for prints
                    picked_pvalues = (pvalues, best_pvalues) if random_number <= 1 / n_equal_results else (
                        best_pvalues, pvalues)
                    picked_projection = (projection, best_projection) if random_number <= 1 / n_equal_results else (
                        best_projection, projection)
                    print(
                        "--> Current and best costs are both {0}. Therefore, pvalues = {1}, projection = {2} was randomly picked instead of pvalues = {3}, projection = {4}.".format(
                            sum_weighted_pvalues, picked_pvalues[0], picked_projection[0], picked_pvalues[1],
                            picked_projection[1]))
            if sum_weighted_pvalues < min_sum_weighted_pvalues or (
                    sum_weighted_pvalues == min_sum_weighted_pvalues and random_number <= 1 / n_equal_results):
                min_sum_weighted_pvalues = sum_weighted_pvalues
                best_pvalues = pvalues
                best_projection = projection
                best_projected_data = projected_data
    return best_pvalues, best_projection, best_projected_data


def _find_min_dippvalue_by_grouped_sgd_with_start(X: np.ndarray, labels: np.ndarray, n_clusters: int,
                                                  projection: np.ndarray, step_size: float, momentum: float,
                                                  cluster_sizes: np.ndarray, consider_duplicates: bool,
                                                  random_state: np.random.RandomState,
                                                  debug: bool) -> (np.ndarray, np.ndarray, np.ndarray, float):
    """
    Perform gradient descent to find the projection vector with the minimum weighted dip-p-value.

    Parameters
    ----------
    X : np.ndarray
        the given data set
    labels : np.ndarray
        The current cluster labels
    n_clusters : int
        The current number of clusters
    projection : np.ndarray
        The starting projection axis
    step_size : float
        Step size used for gradient descent
    momentum : float
        Momentum used for gradient descent
    cluster_sizes : np.ndarray
        List containing the number of samples in each cluster
    consider_duplicates : bool
        If multiple instances on the projection axis share a value, the gradient is ambiguous. If those duplicate values should be considered a random instances will be choses for furhter calculations. Beware: The calculation will not be deterministic anymore
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution
    debug : bool
        If true, additional information will be printed to the console

    Returns
    -------
    tuple : (np.ndarray, np.ndarray, np.ndarray, float)
        The best identified weighted dip-p-values,
        The corresponing projection axis responsible for the dip-p-values,
        The data projected onto that projection axis,
        The sum of the weighted dip-p-values
    """
    # Initial values
    total_angle = 0
    best_projection = None
    best_projected_data = None
    best_pvalues = None
    direction = np.zeros(X.shape[1])
    min_sum_weighted_pvalues = np.inf
    n_equal_results = 1
    # Perform SGD
    while True:
        # Ensure unit vector
        projection = projection / np.linalg.norm(projection)
        gradient, dip_values, projected_data = _get_min_dippvalue_using_grouped_gradient(X, labels, n_clusters,
                                                                                         projection, cluster_sizes,
                                                                                         consider_duplicates,
                                                                                         random_state)
        # Calculate p-values
        vecotrize_pvalue = np.vectorize(dip_pval, excluded=["pval_strategy"])
        pvalues = vecotrize_pvalue(dip_values, cluster_sizes, pval_strategy="function")
        sum_weighted_pvalues = np.sum(pvalues * cluster_sizes)
        # Use negative dip-value if all p-values are 0
        if sum_weighted_pvalues == 0:
            sum_weighted_pvalues = -np.sum(dip_values * cluster_sizes)
        # In rare cases the weighted p-values can be equal to current best result -> pick better result randomly
        if sum_weighted_pvalues == min_sum_weighted_pvalues:
            n_equal_results += 1
            random_number = random_state.rand()
            if debug and not np.array_equal(projection, best_projection):
                # Only used for prints
                picked_pvalues = (pvalues, best_pvalues) if random_number <= 1 / n_equal_results else (
                    best_pvalues, pvalues)
                picked_projection = (projection, best_projection) if random_number <= 1 / n_equal_results else (
                    best_projection, projection)
                print(
                    "--> Current and best costs are both {0}. Therefore, pvalues = {1}, projection = {2} was randomly picked instead of pvalues = {3}, projection = {4}.".format(
                        sum_weighted_pvalues, picked_pvalues[0], picked_projection[0], picked_pvalues[1],
                        picked_projection[1]))
        if sum_weighted_pvalues < min_sum_weighted_pvalues or (
                sum_weighted_pvalues == min_sum_weighted_pvalues and random_number <= 1 / n_equal_results):
            min_sum_weighted_pvalues = sum_weighted_pvalues
            best_pvalues = pvalues
            best_projection = projection
            best_projected_data = projected_data
        # Update parameters
        direction = momentum * direction + step_size * gradient
        new_projection = projection + direction
        new_angle = _angle(projection, new_projection)
        total_angle += new_angle
        projection = new_projection
        # We converge if the projection vector barely moves anymore and has no intention (momentum) to do so in the future
        if (new_angle <= 0.1 and np.linalg.norm(direction) < 0.1) or total_angle > 360:
            break
    return best_pvalues, best_projection, best_projected_data, min_sum_weighted_pvalues


def _get_min_dippvalue_using_grouped_gradient(X: np.ndarray, labels: np.ndarray, n_clusters: int,
                                              projection_vector: np.ndarray, cluster_sizes: np.ndarray,
                                              consider_duplicates: bool, random_state: np.random.RandomState) -> (
        np.ndarray, np.ndarray, np.ndarray):
    """
    Use current projection_vector to calculate the dip-value and a corresponding modal_triangle.
    The modal_triangle is then used to calculate the gradient of the used projection axis.

    Parameters
    ----------
    X : np.ndarray
        the given data set
    labels : np.ndarray
        The current cluster labels
    n_clusters : int
        The current number of clusters
    projection_vector : np.ndarray
        The current projection axis
    cluster_sizes : np.ndarray
        List containing the number of samples in each cluster
    consider_duplicates : bool
        If multiple instances on the projection axis share a value, the gradient is ambiguous. If those duplicate values should be considered a random instances will be choses for furhter calculations. Beware: The calculation will not be deterministic anymore
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution

    Returns
    -------
    tuple : (np.ndarray, np.ndarray, np.ndarray)
        The gradient,
        List containing the dip-value of each cluster,
        The data set projected onto the projection axis
    """
    # Project data (making a univariate sample)
    projected_data = np.matmul(X, projection_vector)
    # Create result objects
    dip_values = np.zeros(n_clusters)
    gradient = np.zeros(X.shape[1])
    for i in range(n_clusters):
        points_in_cluster = (labels == i)
        if np.sum(points_in_cluster) < 4:
            # Clusters can in theory get very small
            continue
        sorted_indices = np.argsort(projected_data[points_in_cluster])
        sorted_projected_data_in_cluster = projected_data[points_in_cluster][sorted_indices]
        dip_value, _, modal_triangle = dip_test(sorted_projected_data_in_cluster, just_dip=False,
                                                is_data_sorted=True)
        if modal_triangle[0] == -1:
            continue
        if consider_duplicates:
            # If duplicate values should be considered, get random ordering of the objects
            sorted_indices = _ambiguous_modal_triangle_random(sorted_projected_data_in_cluster, sorted_indices,
                                                              modal_triangle, random_state)
        # Calculate the partial derivative for all dimensions regarding this cluster
        gradient_tmp = -dip_pval_gradient(X[points_in_cluster], projected_data[points_in_cluster],
                                          sorted_indices, modal_triangle, dip_value)
        dip_values[i] = dip_value
        # Weight gradient by cluster size
        gradient = gradient + cluster_sizes[i] * gradient_tmp
    gradient = gradient / X.shape[0]
    return gradient, dip_values, projected_data


[docs]class DipNSub(BaseEstimator, ClusterMixin): """ Execute the Dip`n`Sub clustering procedure. It searches for projection axes in which as many samples as possible are part of multimodal clusters. Therefore, it uses the gradient of the p-value of the Dip-test to perform stochastic gradient descent. On the identified axes TailoredDip, an extension of UniDip, will be executed to assign cluster labels. Note, that clusters will always form a hypercube in the resulting subspace. Parameters ---------- significance : float Threshold to decide if the result of the dip-test is unimodal or multimodal (default: 0.01) threshold : float Defines the amount of objects that must be contained in multimodal clusters for the algorithm to continue (default: 0.15) step_size : float Step size used for gradient descent (default: 0.1) momentum : float Momentum used for gradient descent (default: 0.95) n_starting_vectors : int The number of starting vectors for gradient descent. Can be None, in that case it will be equal to np.log(data dimensionality) + 1 (default: None) add_tails : bool Defines if TailoredDip should try to add tails to the surrounding clusters (default: True) outliers : bool Defines if outliers should be identified as described by UniDip (default: False) consider_duplicates : bool If multiple instances on the projection axis share a value, the gradient is ambiguous. If those duplicate values should be considered a random instances will be choses for furhter calculations. Beware: The calculation will not be deterministic anymore (default: False) random_state : np.random.RandomState | int use a fixed random state to get a repeatable solution. Can also be of type int (default: None) debug : bool If true, additional information will be printed to the console (default: False) Attributes ---------- n_clusters_ : int The final number of clusters labels_ : np.ndarray The final labels subspace_: np.ndarray The resulting subspace References ---------- Bauer, Lena, et al. "Extension of the Dip-test Repertoire - Efficient and Differentiable p-value Calculation for Clustering." Proceedings of the 2023 SIAM International Conference on Data Mining (SDM). Society for Industrial and Applied Mathematics, 2023. """ def __init__(self, significance: float = 0.01, threshold: float = 0.15, step_size: float = 0.1, momentum: float = 0.95, n_starting_vectors: int = None, add_tails=True, outliers=False, consider_duplicates: bool = False, random_state: np.random.RandomState | int = None, debug: bool = False): self.significance = significance self.threshold = threshold self.step_size = step_size self.momentum = momentum self.n_starting_vectors = n_starting_vectors self.add_tails = add_tails self.outliers = outliers self.consider_duplicates = consider_duplicates self.random_state = check_random_state(random_state) self.debug = debug
[docs] def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'DipNSub': """ Initiate the actual clustering process on the input data set. The resulting cluster labels will be stored in the labels_ attribute. Parameters ---------- X : np.ndarray the given data set y : np.ndarray the labels (can be ignored) Returns ------- self : DipNSub this instance of the DipNSub algorithm """ if self.n_starting_vectors is None: self.n_starting_vectors = _n_starting_vectors_default(X.shape[1]) n_clusters, labels, subspace = _dip_n_sub(X, significance=self.significance, threshold=self.threshold, step_size=self.step_size, momentum=self.momentum, n_starting_vectors=self.n_starting_vectors, add_tails=self.add_tails, outliers=self.outliers, consider_duplicates=self.consider_duplicates, random_state=self.random_state, debug=self.debug) self.n_clusters_ = n_clusters self.labels_ = labels self.subspace_ = subspace return self