# Source code for clustpy.hierarchical.dctree_clusterer

"""
@authors:
Pascal Weber
"""

# - Author: Pascal Weber
# - Source: https://github.com/pasiweber/SHADE

from __future__ import annotations
import numpy as np
import sys
from clustpy.utils.dctree import DCTree, _DCNode
from typing import Optional
from sklearn.base import ClusterMixin, BaseEstimator
from clustpy.utils.checks import check_parameters


# DCTree_Clusterer._condense and DCTree_Clusterer._get_stable_nodes walk the tree
# recursively (one Python frame per tree level), so the interpreter's recursion
# limit is raised when this module is imported.
# NOTE(review): this is an interpreter-wide setting, not local to this module.
sys.setrecursionlimit(10 ** 9)


class DCTree_Clusterer(ClusterMixin, BaseEstimator):
    """
    The DCTree clustering algorithm.
    Identifies stable nodes within the DCTree and labels the data accordingly.

    Parameters
    ----------
    min_points : int
        the minimum number of points (default: 5)
    use_less_memory: bool
        Use less memory when constructing the DCTree. This will, however, increase the runtime (default: False)

    Attributes
    ----------
    n_clusters_ : int
        The final number of clusters
    labels_ : np.ndarray
        The final labels. Points that are not part of any stable node keep the label -1
    dc_tree_ : DCTree
        The resulting cluster tree
    n_features_in_ : int
        the number of features used for the fitting

    References
    ----------
    SHADE: Deep Density-based Clustering
    Anna Beer; Pascal Weber; Lukas Miklautz; Collin Leiber; Walid Durani; Christian Böhm
    IEEE International Conference on Data Mining (ICDM), Abu Dhabi, United Arab Emirates, 2024, pp. 675-680,
    doi: 10.1109/ICDM59182.2024.
    """

    def __init__(self, min_points: int = 5, use_less_memory: bool = False):
        self.min_points = min_points
        self.use_less_memory = use_less_memory

    def fit(self, X: np.ndarray, y: Optional[np.ndarray] = None) -> 'DCTree_Clusterer':
        """
        Initiate the actual clustering process on the input data set.
        The resulting cluster labels will be stored in the labels_ attribute.

        Parameters
        ----------
        X : np.ndarray
            the given data set
        y : np.ndarray
            the labels (can be ignored)

        Returns
        -------
        self : DCTree_Clusterer
            this instance of the DCTree_Clusterer algorithm
        """
        X, _, _ = check_parameters(X=X, y=y)
        self.dc_tree_ = DCTree(X, min_points=self.min_points, use_less_memory=self.use_less_memory)
        condensed_root = self._condense(self.dc_tree_.root)
        stable_nodes = self._get_stable_nodes(condensed_root)
        # Without any stable node all points end up in a single cluster with label 0.
        # Otherwise every point starts as -1 and is overwritten by the index of its stable node.
        labels = np.full(self.dc_tree_.n, -1 if stable_nodes else 0, dtype=np.int32)
        self.n_clusters_ = len(stable_nodes) if stable_nodes else 1
        for idx, node in enumerate(stable_nodes):
            labels[node.leaves] = idx
        self.labels_ = labels
        self.n_features_in_ = X.shape[1]
        return self

    def _condense(self, node: Optional[_DCNode]) -> Optional[_DCNode]:
        """
        Condense the tree to nodes, where both children contain at least min_points leaves.
        Uses a recursive strategy that checks each node separately.
        The input tree is not modified; kept nodes are rebuilt as new _DCNode objects.

        Parameters
        ----------
        node : _DCNode
            the node that is checked for its children

        Returns
        -------
        condensed_node : _DCNode
            either a condensed node or None
        """
        if node is None or len(node.leaves) < self.min_points:
            return None
        # Process children
        left = self._condense(node.left)
        right = self._condense(node.right)
        size_left = len(node.left.leaves) if node.left is not None else 0
        size_right = len(node.right.leaves) if node.right is not None else 0
        if size_left >= self.min_points and size_right >= self.min_points:
            # If both children have min_points leaves, keep this branch.
            if (left is None) == (right is None):
                # Both condensed children exist, or neither does: keep this node with them.
                return _DCNode(node.id, node.dist, node.leaves, left, right)
            # Only one child survived the condensing; it takes the place of this node.
            return left if left is not None else right
        # At most one child is large enough here, so at most one condensed child exists.
        # The node keeps its own id and leaves but takes over dist and children of that child.
        if left is not None:
            return _DCNode(node.id, left.dist, node.leaves, left.left, left.right)
        if right is not None:
            return _DCNode(node.id, right.dist, node.leaves, right.left, right.right)
        return None

    def _get_stable_nodes(self, node: Optional[_DCNode], parent_dist: Optional[float] = None) -> list[_DCNode]:
        """
        Identify stable nodes in the tree.
        As a side effect, the attribute stability_ is set on every visited node.

        Parameters
        ----------
        node : _DCNode
            the node that is checked.
        parent_dist : float
            Distance in the parent node. Can be None in the case of the root (default: None)

        Returns
        -------
        stable_nodes : list
            list of stable nodes
        """
        if node is None:
            # In case root is None
            return []
        # The root (no parent distance) gets a stability of 0.
        if parent_dist is None:
            node.stability_ = 0
        else:
            node.stability_ = (1.0 / node.dist - 1.0 / parent_dist) * len(node.leaves)
        # Calculate stability for children
        sum_child_stabilities = 0
        child_results = []
        for child in (node.left, node.right):
            if child is not None:
                child_results.extend(self._get_stable_nodes(child, node.dist))
                sum_child_stabilities += child.stability_
        # Flag stable nodes
        if node.stability_ >= sum_child_stabilities:
            return [node]
        # The children are more stable: pass their selection and their summed stability upwards.
        node.stability_ = sum_child_stabilities
        return child_results