Source code for clustpy.hierarchical.diana

"""
@authors:
Collin Leiber
"""

from sklearn.base import BaseEstimator, ClusterMixin
import numpy as np
from scipy.spatial.distance import pdist, squareform
from clustpy.hierarchical._cluster_tree import BinaryClusterTree
import copy


def _diana(X: np.ndarray, n_clusters: int, distance_threshold: float, construct_full_tree: bool, metric: str) -> (
        np.ndarray, BinaryClusterTree):
    """
    Start the actual DIANA clustering procedure on the input data set.

    Starting with a single cluster that contains all objects, the cluster with the largest diameter is split
    repeatedly. Splitting stops when n_clusters is reached (unless construct_full_tree is True) or when no cluster
    with more than one object and a diameter >= distance_threshold is left.

    Parameters
    ----------
    X : np.ndarray
        the given data set. If metric is "precomputed", X must be the square matrix of pairwise distances
    n_clusters : int
        The number of clusters (can be None, in which case the number of objects is used)
    distance_threshold : float
        The distance thresholds defines the minimum diameter that is considered (can be 0)
    construct_full_tree : bool
        Defines whether the full tree should be constructed after n_clusters has been reached
    metric : str
        Metric used to compute the dissimilarity. Can be "euclidean", "l1", "l2", "manhattan", "cosine", "precomputed",
        or any other metric accepted by scipy.spatial.distance.pdist

    Returns
    -------
    tuple : (np.ndarray, BinaryClusterTree)
        The final cluster labels,
        The resulting tree containing the cluster hierarchy

    Raises
    ------
    ValueError
        If metric is "precomputed" and X is not a square two-dimensional matrix
    """
    n_objects = X.shape[0]
    if n_clusters is None:
        # Without a target the hierarchy may grow until every object is its own cluster
        n_clusters = n_objects
    labels = np.zeros(n_objects, dtype=np.int32)
    # All zeros is the correct result as long as no split has been performed
    final_labels = labels.copy()
    # Calculate pairwise distances (must only be done once)
    if isinstance(metric, str) and metric == "precomputed":
        # pdist has no "precomputed" mode -> X already is the distance matrix
        global_distance_matrix = np.asarray(X)
        if global_distance_matrix.ndim != 2 or global_distance_matrix.shape[0] != global_distance_matrix.shape[1]:
            raise ValueError("If metric is 'precomputed', X must be a square distance matrix. Shape of X is {0}".format(
                global_distance_matrix.shape))
    else:
        pdist_metric = metric
        if isinstance(metric, str):
            # Translate the sklearn-style names that pdist does not know
            metric_aliases = {"l1": "cityblock", "manhattan": "cityblock", "l2": "euclidean"}
            pdist_metric = metric_aliases.get(metric.lower(), metric)
        global_distance_matrix = squareform(pdist(X, metric=pdist_metric))
    # Start with a single cluster
    current_n_clusters = 1
    tree = BinaryClusterTree()
    while current_n_clusters < n_clusters or construct_full_tree:
        # Get cluster with maximum diameter (largest distance between two points within a cluster)
        split_cluster_id, cluster_distance_matrix = _get_cluster_with_max_diameter(
            global_distance_matrix, labels, current_n_clusters, distance_threshold)
        # Check if we only have clusters of size one or only clusters with diameter < distance_threshold
        if split_cluster_id is None:
            break
        # Split cluster by updating labels and tree
        labels_new = _split_cluster(cluster_distance_matrix, split_cluster_id, current_n_clusters)
        labels[labels == split_cluster_id] = labels_new
        tree.split_cluster(split_cluster_id)
        current_n_clusters += 1
        if current_n_clusters == n_clusters:
            # Save current labels in final labels -> relevant if n_clusters is specified and construct_full_tree is True
            final_labels = labels.copy()
    if current_n_clusters < n_clusters:
        # The procedure stopped before n_clusters was reached (nothing left to split).
        # In that case the snapshot above was never taken, so the last state is the final result
        final_labels = labels.copy()
    return final_labels, tree


def _get_cluster_with_max_diameter(global_distance_matrix: np.ndarray, labels: np.ndarray, n_clusters: int,
                                   distance_threshold: float) -> (int, np.ndarray):
    """
    Find the cluster that should be split next, i.e. the cluster with the largest diameter.
    The diameter of a cluster is the largest pairwise distance between two objects assigned to it.
    Clusters with a single object and clusters whose diameter is smaller than distance_threshold are skipped.
    If no cluster qualifies, both return values are None.
    If several clusters share the largest diameter, the one with the smallest id is returned.

    Parameters
    ----------
    global_distance_matrix : np.ndarray
        The global distance matrix containing the pairwise distances of all objects
    labels : np.ndarray
        The current cluster labels
    n_clusters : int
        The current number of clusters (cluster ids 0, ..., n_clusters - 1 are checked)
    distance_threshold : float
        The distance thresholds defines the minimum diameter that is considered

    Returns
    -------
    tuple: (int, np.ndarray)
        The id of the cluster that should be split,
        The pairwise distances of all points within that cluster
    """
    # Best candidate found so far (diameter, id and distance sub-matrix)
    best_diameter = -1
    best_cluster_id = None
    best_sub_matrix = None
    for candidate_id in range(n_clusters):
        member_mask = labels == candidate_id
        if np.sum(member_mask) <= 1:
            # A cluster needs at least two objects to be split
            continue
        # Restrict the global distances to the members of the candidate cluster
        sub_matrix = global_distance_matrix[np.ix_(member_mask, member_mask)]
        candidate_diameter = np.max(sub_matrix)
        # Strict comparison -> on ties the cluster found first is kept
        if candidate_diameter > best_diameter and candidate_diameter >= distance_threshold:
            best_diameter = candidate_diameter
            best_cluster_id = candidate_id
            best_sub_matrix = sub_matrix
    return best_cluster_id, best_sub_matrix


def _split_cluster(cluster_distance_matrix: np.ndarray, split_cluster_id: int, new_cluster_id: int) -> np.ndarray:
    """
    Divide the specified cluster into two subclusters.
    The object with the largest summed distance to all other objects starts the new subcluster (splinter group).
    Afterwards, all remaining objects whose average dissimilarity to the other remaining objects is larger than their
    average dissimilarity to the splinter group are moved to the splinter group.
    This is repeated until no object changes its subcluster or only a single object remains in the original one.

    Parameters
    ----------
    cluster_distance_matrix : np.ndarray
        The distance matrix of the specified cluster containing the pairwise distances of respective objects
    split_cluster_id: int
        The id of the cluster that should be split
    new_cluster_id : int
        The resulting id of the new cluster

    Returns
    -------
    labels_new : np.ndarray
        The updated cluster labels
    """
    n_objects = cluster_distance_matrix.shape[0]
    # Initially every object keeps the id of the original cluster
    labels_new = split_cluster_id + np.zeros(n_objects, dtype=np.int32)
    # Summed distances of each object to the remaining objects and to the splinter group
    dist_to_remaining = np.sum(cluster_distance_matrix, axis=1)
    dist_to_splinter = np.zeros(n_objects)
    # For an object of the original cluster, the number of OTHER objects in that cluster (hence the -1)
    n_other_remaining = n_objects - 1
    n_splintered = 0
    # The object that is farthest away from everything else seeds the splinter group
    movers = np.array([np.argmax(dist_to_remaining)])
    while movers.shape[0] > 0:
        n_moved = movers.shape[0]
        labels_new[movers] = new_cluster_id
        n_other_remaining -= n_moved
        n_splintered += n_moved
        # Shift the distance mass of the moved objects from one subcluster to the other
        moved_distances = np.sum(cluster_distance_matrix[:, movers], axis=1)
        dist_to_remaining -= moved_distances
        dist_to_splinter += moved_distances
        if n_other_remaining <= 0:
            # No average can be calculated for the original cluster anymore
            break
        # Only objects that still belong to the original cluster can move
        still_original = labels_new == split_cluster_id
        movers = np.where(still_original &
                          (dist_to_remaining / n_other_remaining > dist_to_splinter / n_splintered))[0]
    return labels_new


class Diana(BaseEstimator, ClusterMixin):
    """
    The DIvisive ANAlysis (DIANA) clustering algorithm.
    DIANA builds a top-down clustering hierarchy by considering pairwise dissimilarity of objects.
    It recursively splits the clusters with maximum dissimilarity, whereby the dissimilarity is based on a specified distance metric (e.g., Euclidean distance).

    Parameters
    ----------
    n_clusters : int
        The number of clusters. If n_clusters is None the tree will be constructed until the max diameter is below distance_threshold (default: None)
    distance_threshold : float
        The distance thresholds defines the minimum diameter that is considered. Must be 0 if n_clusters is specified (default: 0)
    construct_full_tree : bool
        Defines whether the full tree should be constructed after n_clusters has been reached (default: False)
    metric : str
        Metric used to compute the dissimilarity. Can be "euclidean", "l1", "l2", "manhattan", "cosine", or "precomputed" (see scipy.spatial.distance.pdist) (default: euclidean)

    Attributes
    ----------
    labels_ : np.ndarray
        The final labels
    tree_ : BinaryClusterTree
        The resulting cluster tree

    References
    ----------
    Kaufman, Rousseeuw "Divisive Analysis (Program DIANA)"
    Chapter six from Finding Groups in Data: An Introduction to Cluster Analysis. 1990.
    """

    def __init__(self, n_clusters: int = None, distance_threshold: float = 0, construct_full_tree: bool = False,
                 metric: str = "euclidean"):
        self.n_clusters = n_clusters
        self.distance_threshold = distance_threshold
        self.construct_full_tree = construct_full_tree
        self.metric = metric

    def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'Diana':
        """
        Initiate the actual clustering process on the input data set.
        The resulting cluster labels will be stored in the labels_ attribute and the cluster tree in the tree_ attribute.
        The constructor parameters are not modified, so the same instance can be fitted again on another data set.

        Parameters
        ----------
        X : np.ndarray
            the given data set
        y : np.ndarray
            the labels (can be ignored)

        Returns
        -------
        self : Diana
            this instance of the Diana algorithm
        """
        assert self.n_clusters is None or self.distance_threshold == 0, "If n_clusters is set, distance_threshold must be 0. Else the number of identified clusters can be incorrect"
        # Work on a local copy: overwriting self.n_clusters would leak the size of this data set into later fits
        n_clusters = self.n_clusters
        if n_clusters is None or n_clusters > X.shape[0]:
            # There can never be more clusters than objects
            n_clusters = X.shape[0]
        labels, tree = _diana(X, n_clusters, self.distance_threshold, self.construct_full_tree, self.metric)
        self.labels_ = labels
        self.tree_ = tree
        return self

    def flat_clustering(self, n_leaf_nodes_to_keep: int) -> np.ndarray:
        """
        Transform the predicted labels into a flat clustering result by only keeping n_leaf_nodes_to_keep leaf nodes in the tree.
        Returns labels as if the clustering procedure would have stopped at the specified number of nodes.
        Note that each leaf node corresponds to a cluster.
        The pruning is applied to a deep copy of tree_, the fitted tree itself is not passed to the pruning routine.

        Parameters
        ----------
        n_leaf_nodes_to_keep : int
            The number of leaf nodes to keep in the cluster tree

        Returns
        -------
        labels_pruned : np.ndarray
            The new cluster labels
        """
        # getattr -> an unfitted estimator triggers the assertion message instead of an AttributeError
        assert getattr(self, "labels_", None) is not None, "The DIANA algorithm has not run yet. Use the fit() function first."
        tree_copy = copy.deepcopy(self.tree_)
        labels_pruned = tree_copy.prune_to_n_leaf_nodes(n_leaf_nodes_to_keep, self.labels_)
        return labels_pruned