"""
@authors:
Pascal Weber
"""
# Implementation of the dc-distance with a DCTree by
# - Author: Pascal Weber
# - Source: https://github.com/pasiweber/SHADE
# Paper: Connecting the Dots -- Density-Connectivity Distance unifies DBSCAN, k-Center and Spectral Clustering
# Authors: Anna Beer, Andrew Draganov, Ellen Hohma, Philipp Jahn, Christian M.M. Frey, and Ira Assent
# Link: https://doi.org/10.1145/3580305.3599283
from __future__ import annotations
import numpy as np
from typing import List, Optional, Sequence, Tuple, Union
from scipy.spatial.distance import pdist, cdist, squareform
[docs]class DCTree:
"""
The DCTree computes the dc_distances and stores them in a tree structure.
By using the euler tour of the tree and a sparse table for range minimum queries,
the lca-elements in the tree (i.e. the dc_distances) can be computed in O(1) time.
The function `dc_dist(i, j)` returns the dc_distance between `points[i]` and `points[j]`
in O(1) time.
The function `dc_distance(idx_X, idx_Y=None, access_method="tree")` returns a dc_distance matrix
with the dc_distance from each pair of `points[idx_X]` and `points[idx_Y]`.
See the section Functions for more details.
Parameters
----------
X : np.ndarray
points, of which the dc_distances should be computed of.
min_points : int, optional
min_points parameter used for the computation of the dc_distances (default: 5).
use_less_memory: bool
Use less memory when constructing the DCTree.
This will, however, increase the runtime (default: False).
Functions
---------
DCTree[x]:
Returns the _DCNode of given index.
dc_dist : (i, j) -> distance
returns the dc_distance between points[i] and points[j] in O(1) time.
dc_distances : (idx_X, idx_Y) -> np.ndarray
Computes the dc_distance matrix between each pair of points[idx_X] and points[idx_Y].
Returns dc_dists as ndarray of shape (n_samples_X, n_samples_Y)
Examples
--------
>>> points = np.array([[1,6],[2,6],[6,2],[14,17],[123,3246],[52,8323],[265,73]])
>>> dc_tree = dcdist.DCTree(points, 5)
>>> print(dc_tree.dc_dist(2,5))
>>> print(dc_tree.dc_distances(range(len(points))))
>>> print(dc_tree.dc_distances([0,1], [2,3]))
References
----------
Anna Beer, Andrew Draganov, Ellen Hohma, Philipp Jahn, Christian M.M. Frey, and Ira Assent.
"Connecting the Dots -- Density-Connectivity Distance unifies DBSCAN, k-Center and Spectral
Clustering." KDD '23. 2023.
"""
def __init__(
self,
X: np.ndarray,
min_points: int = 5,
use_less_memory: bool = False
):
self.n = X.shape[0]
self.min_points = min_points
if not use_less_memory:
# Calculate pair-wise reachability distance
X = reachability_distances(X, min_points)
mst_edges = minimum_spanning_tree_prims(X, use_less_memory=use_less_memory, min_points=min_points)
self.root = self._build_tree(mst_edges)
self._init_fast_index()
def _build_tree(self, mst_edges: np.ndarray) -> _DCNode:
"""
Build the final DCTree using the MST edges.
Parameters
----------
mst_edges : np.ndarray
The edges of the MST, including the dc-distances
Returns
-------
new_root_node : _DCNode
The root node of the tree
"""
mst_edges.sort(order="dist")
# Create leaf nodes
leaf_nodes = [_DCNode(idx, 0.0, [idx]) for idx in range(self.n)]
idx = self.n
for i, j, dist in mst_edges:
root_i = self._get_root(leaf_nodes[i])
root_j = self._get_root(leaf_nodes[j])
new_leaves = root_i.leaves + root_j.leaves
new_root_node = _DCNode(
id=idx,
dist=dist,
left=root_i,
right=root_j,
leaves=new_leaves,
)
root_i.parent = new_root_node
root_i.root = new_root_node
root_j.parent = new_root_node
root_j.root = new_root_node
idx += 1
return new_root_node
def _get_root(self, node: _DCNode) -> '_DCNode':
"""
Get the root of a node and update all root_ entries that have been visited.
Parameters
----------
node : _DCNode
The input node
Returns
-------
root_node : _DCNode
The root node
"""
root_node = node
while root_node.root is not None:
root_node = root_node.root
# Override old roots
if root_node is not node:
current_node = node
while current_node.root != root_node:
next_node = current_node.root
current_node.root = root_node
current_node = next_node
return root_node
def _init_fast_index(self) -> None:
"""
Build a fast index structure for the DCTree.
"""
n_nodes = 2 * self.n - 1
self.euler = []
self.level = []
self.f_occur = [None] * n_nodes
# Euler tour to get the euler, level, and f_occur lists in O(n) time.
DOWN, UP = 0, 1
stack = [(self.root, 0, DOWN)] # (node, level, DOWN / UP)
while len(stack) > 0:
(node, level, status) = stack.pop()
if status == DOWN:
if self.f_occur[node.id] is None:
self.f_occur[node.id] = len(self.euler)
self.euler.append(node)
self.level.append(level)
if node.right is not None:
stack.append((node, level, UP))
stack.append((node.right, level + 1, DOWN))
if node.left is not None:
stack.append((node, level, UP))
stack.append((node.left, level + 1, DOWN))
elif status == UP:
self.euler.append(node)
self.level.append(level)
# SparseTable structure which finds the index of the minimum value within the range [l,r] in self.level
levels = np.array(self.level, dtype=int)
n_elements = len(levels)
log_n = n_elements.bit_length()
self.sparse_table = np.zeros((log_n, n_elements), dtype=int)
self.sparse_table[0] = np.arange(n_elements)
for j in range(1, log_n):
stride = 1 << (j - 1)
limit = n_elements - (1 << j) + 1
if limit <= 0:
break
left_indices = self.sparse_table[j - 1, :limit]
right_indices = self.sparse_table[j - 1, stride : stride + limit]
choose_left = levels[left_indices] <= levels[right_indices]
self.sparse_table[j, :limit] = np.where(choose_left, left_indices, right_indices)
def __getitem__(
self,
point_idx: Union[int, Sequence[int], np.ndarray]
) -> Union[_DCNode, List[_DCNode]]:
"""
Returns the _DCNode of given index if `point_idx` is an integer or a list of _DCNodes if `point_idx' is a Sequence.
Parameters
----------
point_idx : Union[int, Sequence[int], np.ndarray]
The input parameter as described above
Returns
-------
result : Union[_DCNode, List[_DCNode]]
The querried nodes in the tree
"""
if isinstance(point_idx, int):
return self.euler[self.f_occur[point_idx]]
elif isinstance(point_idx, (Sequence, np.ndarray)):
return [self.euler[self.f_occur[i]] for i in point_idx]
else:
raise IndexError(f"`{point_idx}` needs to be an integer, Sequence or np.ndarray!")
[docs] def dc_dist(self, i: int, j: int) -> float:
"""
Returns the dc_distance between points[i] and points[j] in O(1) time.
Parameters
----------
i : int
index of the first point
j : int
index of the second point
Returns
-------
dc_distance : float
The dc_distance of the two points
"""
if i == j:
return 0.0
l = self.f_occur[i]
r = self.f_occur[j]
if l > r:
l, r = r, l
k = (r - l + 1).bit_length() - 1
idx_l = self.sparse_table[k, l]
idx_r = self.sparse_table[k, r - (1 << k) + 1]
ancestor = self.euler[idx_l if self.level[idx_l] <= self.level[idx_r] else idx_r]
dc_distance = ancestor.dist
return dc_distance
[docs] def dc_distances(
self,
idx_X: Union[Sequence[int], np.ndarray, None] = None,
idx_Y: Union[Sequence[int], np.ndarray, None] = None,
access_method: str = "tree",
) -> np.ndarray:
"""
Computes the dc_distance matrix between each pair of points[idx_X] and points[idx_Y].
If idx_X=None, idx_X=range(n) is used.
If idx_Y=None, idx_Y=idx_X is used.
Parameters
----------
idx_X : Union[Sequence[int], np.ndarray, None]
the first set of indices
idx_Y : Union[Sequence[int], np.ndarray, None]
the second set of indices
access_method : str
"tree": traverses the tree in O(n) time (n = len(points)), no matter the size of X / Y.
"dc_dist": uses the dc_dist function and needs O(k*l) time (k = len(X), l = len(Y))).
(default: "tree")
Returns
-------
dc_dists : np.array
ndarray of shape (n_samples_X, n_samples_Y) containing the distances
"""
if idx_X is None:
idx_X = range(self.n)
if idx_Y is None:
idx_Y = idx_X
dc_dists = np.zeros((len(idx_X), len(idx_Y)))
if access_method == "dc_dist":
# Get distances from the index structure
for i in range(len(idx_X)):
start_index = i + 1 if idx_X is idx_Y else 0
for j in range(start_index, len(idx_Y)):
dc_dists[i, j] = self.dc_dist(idx_X[i], idx_Y[j])
elif access_method == "tree":
# Get distances from the tree
idx_rev_X = np.full(self.n, -1, dtype=int)
idx_rev_X[idx_X] = range(len(idx_X))
if idx_X is idx_Y:
idx_rev_Y = idx_rev_X
else:
idx_rev_Y = np.full(self.n, -1, dtype=int)
idx_rev_Y[idx_Y] = range(len(idx_Y))
node_list = [self.root]
while len(node_list) > 0:
node = node_list.pop()
if node.left is not None:
node_list.append(node.left)
if node.right is not None:
node_list.append(node.right)
if node.left is not None and node.right is not None:
i_leaves = node.left.leaves
j_leaves = node.right.leaves
(i_, j_) = (idx_rev_X[i_leaves], idx_rev_Y[j_leaves])
(i_, j_) = (i_[i_ != -1], j_[j_ != -1])
if len(i_) > 0 and len(j_) > 0:
dc_dists[(i_[:, np.newaxis], j_[np.newaxis, :])] = node.dist
# In case of asymmetric call
if idx_X is not idx_Y:
(i_, j_) = (idx_rev_Y[i_leaves], idx_rev_X[j_leaves])
(i_, j_) = (i_[i_ != -1], j_[j_ != -1])
if len(i_) > 0 and len(j_) > 0:
dc_dists[(j_[:, np.newaxis], i_[np.newaxis, :])] = node.dist
else:
raise ValueError(f"'{access_method}' is no valid `access_method`")
if idx_X is idx_Y:
# Mirror values
dc_dists = dc_dists + dc_dists.T
return dc_dists
def _traverse_until_k_clusters(self, n_clusters: int) -> List[_DCNode]:
"""
Traverse the tree to identify n_clusters nodes that minimize the maximum within-cluster distance.
Parameters
----------
n_clusters : int
the number of desired clusters
Returns
-------
result_nodes : List[_DCNode]
List of nodes, where each node represents the root of a cluster
"""
assert n_clusters <= self.n, "n_clusters can not be larger than the number of input points"
node_list = [self.root]
result_nodes = []
id_threshold = 2 * self.n - n_clusters - 1
while len(node_list) > 0:
node = node_list.pop()
if node.id <= id_threshold:
result_nodes.append(node)
else:
if node.left is not None:
node_list.append(node.left)
if node.right is not None:
node_list.append(node.right)
assert len(result_nodes) == n_clusters, "Length of the result_nodes list is not equal to k"
return result_nodes
[docs] def get_k_center(self, n_clusters: int) -> np.ndarray:
"""
Extract the k center approach.
It identifies clusters such that the maximum distance within a cluster is minimized.
Parameters
----------
n_clusters : int
the number of desired clusters
Returns
-------
labels : np.ndarray
The cluster labels
"""
nodes = self._traverse_until_k_clusters(n_clusters)
labels = np.zeros(self.n, dtype=np.int32)
for i, node in enumerate(nodes):
labels[np.array(node.leaves)] = i
return labels
[docs] def get_eps_for_k(self, n_clusters: int, eps: float = 1e-10):
"""
Receive the largest possible eps value such that one receives exectly n_clusters.
Parameters
----------
n_clusters : int
the number of desired clusters
eps : float
a small constaint used to differentiate from the next potential cluster (default: 1e-10)
Returns
-------
min_eps : float
The resulting epsilon value
"""
assert eps > 0, "Eps has to be a positive number"
nodes = self._traverse_until_k_clusters(n_clusters)
min_eps = np.inf
for node in nodes:
if node.parent is not None:
min_eps = min(min_eps, node.parent.dist)
return min_eps - eps
def __repr__(self) -> str:
"""
Return a string describing the DCTree.
Returns
-------
repr_string : str
The string representation of the tree
"""
if self.root is None:
return ""
pointer_right = " "
pointer_left = " " if self.root.right else " "
repr_string = f"{self.root}"
repr_string += f"{self.__repr__help(self.root.left, pointer_left, '', self.root.right is not None)}"
repr_string += f"{self.__repr__help(self.root.right, pointer_right, '', False)}"
return repr_string
def __repr__help(self, node: Optional[_DCNode], pointer: str, padding: str, has_right_sibling: bool) -> str:
"""
Helper for the __repr__ function.
Parameters
----------
node : Optional[_DCNode]
The current node
pointer : str
The pointer string
padding : str
The padding string
has_right_sibling : bool
True if node has a right sibbling
Returns
-------
repr_string : str
The string for this node
"""
if node is None:
return ""
padding_for_both = padding + (" " if has_right_sibling else " ")
pointer_right = " "
pointer_left = " " if node.right else " "
repr_string = f"\n {padding.replace('|', ' ')}// #region"
repr_string += f"\n{padding}{pointer}{node}"
repr_string += f"{self.__repr__help(node.left, pointer_left, padding_for_both, node.right is not None)}"
repr_string += f"{self.__repr__help(node.right, pointer_right, padding_for_both, False)}"
repr_string += f"\n {padding.replace('|', ' ')}// #endregion"
return repr_string
class _DCNode:
"""
A node in a DCTree.
Each node contains a set of leaves and can have a left and a right child node.
Parameters
----------
id : int
the id of the node
dist : float
the dc distance
leaves : List[int]
list of the ids of its child nodes
left : Optional[_DCNode]
the left child node (default: None)
right : Optional[_DCNode]
the right child node (default: None)
parent : Optional[_DCNode]
the parent node. If None, it will be set to itself (default: None)
root : Optional[_DCNode]
the root node (default: None)
"""
def __init__(
self,
id: int,
dist: float,
leaves: List[int],
left: Optional[_DCNode] = None,
right: Optional[_DCNode] = None,
parent: Optional[_DCNode] = None,
root: Optional[_DCNode] = None
):
self.id = id
self.dist = dist
self.leaves = leaves
self.left = left
self.right = right
if not parent:
self.parent = self
else:
self.parent = parent
self.root = root
def __repr__(self) -> str:
"""
Return a string containing the id and dist of this node.
Returns
-------
to_str : str
The string
"""
to_str = f"DCNode #{self.id} ({self.dist})"
return to_str
def __lt__(self, other_node: _DCNode) -> bool:
"""
Less than method of the node. Compares the dist with respect to the other input node.
Parameters
----------
other_node : _DCNode
the other node
Returns
-------
is_less : bool
True if dist is smaller than dist of the other node
"""
less_than = self.dist < other_node.dist
return less_than
[docs]def reachability_distances(X: np.ndarray, min_points: int = 5) -> np.ndarray:
"""
Calculates the reachability distances between points using the min_points threshold in O(n^2) time.
Parameters
----------
X : np.ndarray
the points
min_points : int, optional
min_points parameter (default: 5)
Returns
-------
reach_distances : np.ndarray
Array containing the pair-wise reachability distances
Raises
------
Raises a ValueError if min_points is larger than the number of points.
"""
if min_points > X.shape[0]:
raise ValueError(f"Min points ({min_points}) can't exceed the size of the dataset ({X.shape[0]})")
eucl_distances = squareform(pdist(X, metric="euclidean"))
if min_points > 1:
core_distances = np.partition(eucl_distances, min_points - 1, axis=1)[:, min_points - 1]
reach_distances = np.maximum(eucl_distances, np.maximum.outer(core_distances, core_distances))
np.fill_diagonal(reach_distances, 0)
else:
reach_distances = eucl_distances
return reach_distances
[docs]def minimum_spanning_tree_prims(matrix: np.ndarray, use_less_memory: bool = False, min_points: int = None) -> np.ndarray:
"""
Create a Minimum-spanning-tree of a given matrix using Prim's algorithm.
The tree will be build in O(n^2) time.
Parameters
----------
matrix : np.ndarray
The input matrix
use_less_memory : bool
If true, the MST will not directly be build for the input matrix but the matrix will be used to construct a distance matrix first.
Saves quadratic RAM usage, but also needs double the time for computing (default: False)
min_points : int
Min_points for calculating the reachability distance. Only relevant if use_less_memory is True.
If min_points is None, the euclidean distance will be used (default: None)
Returns
-------
mst_edges : np.ndarray
The edges of the Minimum-spanning-tree, represented as a (n-1, 3) matrix with entries corresponding to (node_i, node_j, dist_ij)
"""
assert (matrix.shape[0] == matrix.shape[1]) or use_less_memory, "Input matrix must be quadratic or use_less_memory must be True."
n = matrix.shape[0]
nodes_min_dist = np.full(n, np.inf)
parent = np.zeros(n, dtype=int)
not_in_mst = np.ones(n, dtype=bool)
mst_edges = np.empty((n - 1), dtype=([("i", int), ("j", int), ("dist", float)]))
# If min_points is not None, use reachability distance => calculate core distances of all points
if use_less_memory and min_points is not None:
core_distances = np.zeros(n)
for i in range(n):
eucl_distances = cdist([matrix[i]], matrix, metric="euclidean").ravel()
core_distances[i] = np.partition(eucl_distances, min_points - 1)[min_points - 1]
# Start building the MST
u = 0
nodes_min_dist[u] = 0
not_in_mst[u] = False
for i in range(n - 1):
if use_less_memory:
eucl_distances = cdist([matrix[u]], matrix, metric="euclidean").ravel()
if min_points is None:
dist_u = eucl_distances
else:
dist_u = np.maximum(eucl_distances, np.maximum(core_distances[u], core_distances))
else:
# If use_less_memory=False, 'matrix' is expected to be the precomputed distance matrix
dist_u = matrix[u]
update_mask = not_in_mst & (dist_u < nodes_min_dist)
# Update distances and parents
nodes_min_dist[update_mask] = dist_u[update_mask]
parent[update_mask] = u
# Select next closest unvisited node
masked_dists = np.where(not_in_mst, nodes_min_dist, np.inf)
u = np.argmin(masked_dists)
mst_edges[i] = (parent[u], u, nodes_min_dist[u])
not_in_mst[u] = False
return mst_edges