"""
@authors:
Collin Leiber
"""
from sklearn.neighbors import NearestNeighbors
import numpy as np
from sklearn.base import BaseEstimator, ClusterMixin
def _multi_density_dbscan(X: np.ndarray, k: int, var: float, min_cluster_size: int) -> (int, np.ndarray, list):
"""
Start the actual Multiple Density DBSCAN clustering procedure on the input data set.
Parameters
----------
X : np.ndarray
the given data set
k : int
the number of neighbors to consider
var : float
Defines the factor that the density of a point may deviate from the average cluster density
min_cluster_size : int
The minimum cluster size (if a cluster is smaller, all contained points will be labeled as noise)
Returns
-------
tuple : (int, np.ndarray, list)
The identified number of clusters
The cluster labels
The final cluster densities
"""
assert k <= X.shape[0], "The number of nearest neighbors k can not be larger than the number of data points"
assert var >= 1, "var must be >= 1"
assert min_cluster_size > 1, "min_cluster_size must be > 1"
# Get k nearest neighbors and densities for each point
nearest_neighbors = NearestNeighbors(n_neighbors=k + 1).fit(X)
densities, knns = nearest_neighbors.kneighbors(X, n_neighbors=k + 1)
knns = knns[:, 1:]
densities = np.mean(densities[:, 1:], axis=1)
# Order densities
order = np.argsort(densities)
# Start parameters
labels = -np.ones(X.shape[0], dtype=np.int32)
cluster_densities = []
c_id = 0
# Iterate over all points
for p1 in order:
if labels[p1] != -1:
# Point is already assigned to a cluster
continue
cluster_points, cluster_density = _gather(p1, c_id, densities, knns, labels, var)
# Check if cluster is large enough
if len(cluster_points) >= min_cluster_size:
c_id += 1
cluster_densities.append(cluster_density)
else:
labels[cluster_points] = -1
n_clusters = c_id
return n_clusters, labels, cluster_densities
def _gather(p1: int, c_id: int, densities: np.ndarray, knns: np.ndarray, labels: np.ndarray, var: float) -> (
list, float):
"""
Expand the current cluster (consisting of a single most dense point).
Check each added point's neighbors to see if their density is low enough to add them the cluster.
Parameters
----------
p1 : int
The id of the starting point of the cluster (most dense point)
c_id : int
The cluster id of the current cluster
densities : np.ndarray
The densities of all points
knns : np.ndarray
The k-nearest neighbors of all points
labels : np.ndarray
The current cluster labels
var : float
Defines the factor that the density of a point may deviate from the average cluster density
Returns
-------
tuple : (list, float)
The ids of the points in this cluster
The density of the cluster
"""
# Add point to cluster and assign Label
cluster_points = [p1]
labels[p1] = c_id
# Get neighbors of point 1
neighbors = [kn for kn in knns[p1, :] if labels[kn] == -1]
neighbors = _sort_neighbors_by_densities(neighbors, densities)
# Set start density of the cluster
cluster_density = densities[p1]
while len(neighbors) > 0:
p2 = neighbors.pop(0)
if labels[p2] == -1:
density_p2 = densities[p2]
# Is density of point 2 high enough?
if density_p2 <= var * cluster_density:
# Add point to cluster and assign Label
cluster_points.append(p2)
labels[p2] = c_id
# Update Cluster density
cluster_density = (cluster_density * (len(cluster_points) - 1) + density_p2) / len(cluster_points)
# Add new neighbors
neighbors = _add_neighbors_to_neighbor_list(densities, labels, neighbors, knns[p2, :])
return cluster_points, cluster_density
def _sort_neighbors_by_densities(neighbors: list, densities: np.ndarray) -> list:
"""
Sort the available neighbors by their densities. Sort in ascending order.
If densities are equal samples will be sorted by their id.
Parameters
----------
neighbors : list
the ids of the neighbors
densities : np.ndarray
the densities
Returns
-------
neighbors : list
the ids of the neighbors sorted by densities
"""
neighbors = sorted(neighbors, key=lambda x: (densities[x], x))
return neighbors
def _add_neighbors_to_neighbor_list(densities: np.ndarray, labels: np.ndarray, current_neighbors: list,
new_neighbors: np.ndarray) -> list:
"""
Add the new neighbors to the neighboring list.
Make sure that they are correctly sorted according to their density.
Result should be equal to:
current_neighbors += [kn for kn in new_neighbors if labels[kn] == -1]
current_neighbors = list(set(current_neighbors))
current_neighbors = _sort_neighbors_by_densities(neighbors, densities)
Parameters
----------
densities : np.ndarray
The densities of all points
labels : np.ndarray
The current cluster labels
current_neighbors : list
The current list of neighbors of cluster objects
new_neighbors : list
The new neighbors that should be added to the neighbor list
Returns
-------
current_neighbors : list
The updated neighbor list
"""
# ignore points that are already assigned
new_neighbors = new_neighbors[labels[new_neighbors] == -1]
# Sort new neighbors by density
sorted_new_neighbors = _sort_neighbors_by_densities(new_neighbors, densities)
# Get densities of current neighbors
index = 0
for p in sorted_new_neighbors:
while index < len(current_neighbors) and (densities[p] > densities[current_neighbors[index]] or (
densities[p] == densities[current_neighbors[index]] and p > current_neighbors[index])):
index += 1
if index == len(current_neighbors) or current_neighbors[index] != p:
current_neighbors.insert(index, p) # Add new neighbor, ignore if point already in neighbor list
index += 1
return current_neighbors
[docs]class MultiDensityDBSCAN(BaseEstimator, ClusterMixin):
"""
The Multi Density DBSCAN algorithm.
First, the densities of all data points will be calculated.
Afterwards, clusters will be expanded starting with the most dense point.
Density is defined as the average distance to the k-nearest neighbors.
Parameters
----------
k : int
The number of nearest neighbors. Does not include the objects itself (default: 15)
var : float
Defines the factor that the density of a point may deviate from the average cluster density (default: 2.5)
min_cluster_size : int
The minimum cluster size (if a cluster is smaller, all contained points will be labeled as noise) (default: 2)
Attributes
----------
n_clusters_ : int
The identified number of clusters
labels_ : np.ndarray
The final labels
cluster_densities_ : list
The final cluster densities
References
----------
Ashour, Wesam, and Saad Sunoallah. "Multi density DBSCAN."
International Conference on Intelligent Data Engineering and Automated Learning. Springer, Berlin, Heidelberg, 2011.
"""
def __init__(self, k: int = 15, var: float = 2.5, min_cluster_size: int = 2):
self.k = k
self.var = var
self.min_cluster_size = min_cluster_size
[docs] def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'MultiDensityDBSCAN':
"""
Initiate the actual clustering process on the input data set.
The resulting cluster labels will be stored in the labels_ attribute.
Parameters
----------
X : np.ndarray
the given data set
y : np.ndarray
the labels (can be ignored)
Returns
-------
self : MultiDensityDBSCAN
this instance of the Multi Density DBSCAN algorithm
"""
n_clusters, labels, cluster_densities = _multi_density_dbscan(X, self.k, self.var, self.min_cluster_size)
self.n_clusters_ = n_clusters
self.labels_ = labels
self.cluster_densities_ = cluster_densities
return self