"""
@authors:
Collin Leiber
"""
import torch
import numpy as np
from clustpy.deep._utils import detect_device, encode_batchwise, run_initial_clustering, mean_squared_error
from clustpy.deep._data_utils import get_train_and_test_dataloader
from clustpy.deep._train_utils import get_trained_network
from clustpy.deep._abstract_deep_clustering_algo import _AbstractDeepClusteringAlgo
from sklearn.manifold import TSNE
from scipy.spatial.distance import pdist, squareform
from sklearn.base import TransformerMixin, BaseEstimator, ClusterMixin
from sklearn.mixture import GaussianMixture as GMM
import inspect
from collections.abc import Callable
from clustpy.utils.checks import check_parameters
from pathlib import Path
def _manifold_based_sequential_dc(X: np.ndarray, n_clusters: int, batch_size: int, pretrain_optimizer_params: dict,
                                  pretrain_epochs: int, optimizer_class: torch.optim.Optimizer,
                                  ssl_loss_fn: Callable | torch.nn.modules.loss._Loss,
                                  neural_network: torch.nn.Module | tuple,
                                  neural_network_weights: str | Path, embedding_size: int, custom_dataloaders: tuple,
                                  manifold_class: TransformerMixin, manifold_params: dict,
                                  clustering_class: ClusterMixin, clustering_params: dict, device: torch.device,
                                  random_state: np.random.RandomState) -> (
        int, np.ndarray, np.ndarray, np.ndarray, torch.nn.Module, TransformerMixin):
    """
    Execute a manifold-based sequential deep clustering procedure on the input data set.
    The steps are: obtain a trained neural network, embed the data with it, apply the manifold technique to the
    embedded data and finally run the clustering algorithm on the output of the manifold technique.

    Parameters
    ----------
    X : np.ndarray / torch.Tensor
        the given data set. Can be a np.ndarray or a torch.Tensor
    n_clusters : int
        number of clusters (can be None)
    batch_size : int
        size of the data batches
    pretrain_optimizer_params : dict
        parameters of the optimizer for the pretraining of the neural network, includes the learning rate
    pretrain_epochs : int
        number of epochs for the pretraining of the neural network
    optimizer_class : torch.optim.Optimizer
        the optimizer class
    ssl_loss_fn : Callable | torch.nn.modules.loss._Loss
        self-supervised learning (ssl) loss function for training the network, e.g. reconstruction loss for autoencoders
    neural_network : torch.nn.Module | tuple
        the input neural network.
        Can also be a tuple consisting of the neural network class (torch.nn.Module) and the initialization parameters (dict)
    neural_network_weights : str | Path
        Path to a file containing the state_dict of the neural_network.
    embedding_size : int
        size of the embedding within the neural network
    custom_dataloaders : tuple
        tuple consisting of a trainloader (random order) at the first and a test loader (non-random order) at the second position.
        Can also be a tuple of strings, where the first entry is the path to a saved trainloader and the second entry the path to a saved testloader.
        In this case the dataloaders will be loaded by torch.load(PATH).
        If None, the default dataloaders will be used
    manifold_class : TransformerMixin
        the manifold technique class
    manifold_params : dict
        Parameters for the manifold technique. Check out e.g. sklearn.manifold.TSNE for more information.
        The passed dictionary is never modified; if a random_state entry has to be added, a copy is used
    clustering_class : ClusterMixin
        clustering class to obtain the cluster labels after pretraining the neural network and learning the manifold
    clustering_params : dict
        parameters for the clustering class
    device : torch.device
        The device on which to perform the computations
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution

    Returns
    -------
    tuple : (int, np.ndarray, np.ndarray, np.ndarray, torch.nn.Module, TransformerMixin)
        The number of clusters,
        The cluster labels,
        The cluster centers in the embedding of the AE,
        The cluster centers in the embedding of the manifold algorithm,
        The final neural network,
        The Manifold object
    """
    # Get the device to train on
    device = detect_device(device)
    trainloader, testloader, _ = get_train_and_test_dataloader(X, batch_size, custom_dataloaders)
    # Get initial AE
    neural_network = get_trained_network(trainloader, n_epochs=pretrain_epochs,
                                         optimizer_params=pretrain_optimizer_params, optimizer_class=optimizer_class,
                                         device=device, ssl_loss_fn=ssl_loss_fn, embedding_size=embedding_size,
                                         neural_network=neural_network, neural_network_weights=neural_network_weights,
                                         random_state=random_state)
    # Encode data
    X_embed = encode_batchwise(testloader, neural_network)
    # Get possible input parameters of the manifold class (positional and keyword-only)
    manifold_argspec = inspect.getfullargspec(manifold_class)
    manifold_class_parameters = manifold_argspec.args + manifold_argspec.kwonlyargs
    # Forward our random state only if the manifold class accepts one and the user did not specify it
    if "random_state" not in manifold_params and "random_state" in manifold_class_parameters:
        manifold_params = manifold_params.copy()  # do not modify the dictionary of the caller
        manifold_params["random_state"] = random_state
    # Execute Manifold
    manifold = manifold_class(**manifold_params)
    X_manifold = manifold.fit_transform(X_embed)
    # Execute Clustering Algorithm
    n_clusters, labels, centers_manifold, _ = run_initial_clustering(X_manifold, n_clusters, clustering_class,
                                                                     clustering_params, random_state)
    # Calculate centers in the embedding of the AE (mean of the embedded samples of each cluster)
    centers_ae = np.array([np.mean(X_embed[labels == c], axis=0) for c in range(n_clusters)])
    return n_clusters, labels, centers_ae, centers_manifold, neural_network, manifold
class DDC_density_peak_clustering(ClusterMixin, BaseEstimator):
    """
    Density Peak Clustering in the variant described in the DDC paper.

    Parameters
    ----------
    ratio : float
        Factor used to derive the cutoff distance d_c, which is computed as: average pairwise distance * ratio

    Attributes
    ----------
    n_clusters_ : int
        The number of clusters that was identified
    labels_ : np.ndarray
        The resulting cluster labels
    n_features_in_ : int
        the number of features of the data set used for the fitting

    References
    ----------
    Ren, Yazhou, et al. "Deep density-based image clustering."
    Knowledge-Based Systems 197 (2020): 105841.
    """

    def __init__(self, ratio: float):
        self.ratio = ratio

    def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'DDC_density_peak_clustering':
        """
        Run the clustering procedure on the given data set.
        Afterward, the cluster labels are available through the labels_ attribute.

        Parameters
        ----------
        X : np.ndarray
            the given data set
        y : np.ndarray
            the labels (can be ignored)

        Returns
        -------
        self : DDC_density_peak_clustering
            this instance of the DDC variant of the Density Peak Clustering algorithm
        """
        # Only the checked data set is needed; the two remaining return values are not used here
        X, _unused_second, _unused_third = check_parameters(X=X, y=y)
        self.n_clusters_, self.labels_ = _density_peak_clustering(X, self.ratio)
        self.n_features_in_ = X.shape[1]
        return self
def _density_peak_clustering(X: np.ndarray, ratio: float) -> (int, np.ndarray):
    """
    Execute the variant of the Density Peak Algorithm as proposed in the paper.
    Each sample follows the chain of its nearest neighbors with a strictly higher density until a local cluster
    center or an already labeled sample is reached. Afterward, clusters that contain core points which are closer
    to each other than the cutoff distance d_c are merged.

    Parameters
    ----------
    X : np.ndarray
        The given data set
    ratio : float
        The ratio parameter, defining the cutoff distance d_c by calculating: average pairwise distance * ratio

    Returns
    -------
    tuple : (int,np.ndarray)
        The number of clusters,
        The cluster labels
    """
    distances = pdist(X)
    max_dist = np.max(distances)
    d_c = np.mean(distances) * ratio
    if d_c >= max_dist:
        d_c = max_dist - 1e-8  # d_c can not be larger than the max distance
        print(
            "[WARNING] ratio parameter was chosen too large (ratio={0}). It is recommended to set ratio smaller than 1. d_c will be set to the maximum possible value of {1}".format(
                ratio, d_c))
    # Calculate rho_i
    adj_distances = np.exp(-((distances / d_c) ** 2))  # Equation 7
    rhos = np.sum(squareform(adj_distances), axis=1)
    avg_rho = np.mean(rhos)  # Below Equation 9
    # Calculate delta_i and search for local cluster centers
    distances = squareform(distances)  # Convert distances to symmetric matrix
    n_samples = X.shape[0]
    labels = np.full(n_samples, -1, np.int32)
    cluster_rho_sums = []  # sum of rho over all members, one entry per cluster
    cluster_sizes = []  # number of members, one entry per cluster
    for start in range(n_samples):
        if labels[start] != -1:
            # Sample has already been labeled as part of an earlier chain
            continue
        chain_of_ids = []
        i = start
        while True:
            chain_of_ids.append(i)
            not_denser = rhos <= rhos[i]
            distances_i = distances[i].copy()
            distances_i[not_denser] = max_dist  # Equation 8
            nn_with_higher_dens = int(np.argmin(distances_i))  # Equation 8
            delta_i = distances_i[nn_with_higher_dens]  # Equation 8
            if delta_i > d_c and rhos[i] > avg_rho:  # Equation 9
                # i is a local cluster center => the chain forms a new cluster
                target = len(cluster_rho_sums)
            elif labels[nn_with_higher_dens] != -1:
                # The chain reached an already labeled sample => join its cluster
                target = int(labels[nn_with_higher_dens])
            elif not_denser.all():
                # No sample has a strictly higher density than i (e.g. all densities are tied), so the chain can
                # not be continued. Following the argmin (which is index 0 in this case) would loop forever.
                # Therefore, the chain is used as a new cluster
                target = len(cluster_rho_sums)
            else:
                # Continue the chain with the nearest neighbor of higher density
                i = nn_with_higher_dens
                continue
            if target == len(cluster_rho_sums):
                cluster_rho_sums.append(0.0)
                cluster_sizes.append(0)
            labels[chain_of_ids] = target
            cluster_rho_sums[target] += np.sum(rhos[chain_of_ids])
            cluster_sizes[target] += len(chain_of_ids)
            break
    n_clusters = len(cluster_rho_sums)
    # ==> Start Merging of clusters
    # Average rho of clusters
    avg_cluster_rho = np.asarray(cluster_rho_sums, dtype=float) / np.asarray(cluster_sizes, dtype=float)
    # Get core points
    ids_core_points = np.where(rhos > avg_cluster_rho[labels])[0]  # Equation 10
    # Are clusters density connected?
    for pos, core_point_i in enumerate(ids_core_points[:-1]):
        for core_point_j in ids_core_points[pos + 1:]:
            if distances[core_point_i, core_point_j] < d_c and labels[core_point_i] != labels[
                core_point_j]:  # Equation 11
                min_label = min(labels[core_point_i], labels[core_point_j])
                max_label = max(labels[core_point_i], labels[core_point_j])
                # Merge the cluster with the larger id into the other one and close the gap in the ids
                labels[labels == max_label] = min_label
                labels[labels > max_label] -= 1
                n_clusters -= 1
    return n_clusters, labels
class DDC(_AbstractDeepClusteringAlgo):
    """
    Deep Density-based Image Clustering (DDC).
    In a first step, a neural network is trained (this step is skipped if a trained neural network is given).
    In a second step, the embedded data is processed by t-SNE and clustered by a variant of the Density Peak Clustering algorithm.

    Parameters
    ----------
    ratio : float
        Factor used to derive the cutoff distance d_c, which is computed as: average pairwise distance * ratio (default: 0.1)
    batch_size : int
        size of the data batches (default: 256)
    pretrain_optimizer_params : dict
        parameters of the optimizer used to pretrain the neural network, includes the learning rate. If None, it will be set to {"lr": 1e-3} (default: None)
    pretrain_epochs : int
        number of epochs used to pretrain the neural network (default: 100)
    optimizer_class : torch.optim.Optimizer
        the optimizer class (default: torch.optim.Adam)
    ssl_loss_fn : Callable | torch.nn.modules.loss._Loss
        self-supervised learning (ssl) loss function used to train the network, e.g. reconstruction loss for autoencoders (default: mean_squared_error)
    neural_network : torch.nn.Module | tuple
        the input neural network. If None, a new FeedforwardAutoencoder will be created.
        A tuple consisting of the neural network class (torch.nn.Module) and the initialization parameters (dict) is also possible (default: None)
    neural_network_weights : str | Path
        Path to a file containing the state_dict of the neural_network (default: None)
    embedding_size : int
        size of the embedding within the neural network (default: 10)
    custom_dataloaders : tuple
        tuple with a trainloader (random order) at the first and a test loader (non-random order) at the second position.
        Alternatively, a tuple of strings, where the first entry is the path to a saved trainloader and the second entry the path to a saved testloader.
        In this case the dataloaders will be loaded by torch.load(PATH).
        If None, the default dataloaders will be used (default: None)
    tsne_params : dict
        Parameters of t-SNE. E.g., the perplexity can be changed by setting tsne_params to {"n_components": 2, "perplexity": 25}.
        See sklearn.manifold.TSNE for more information. If None, it will be set to {"n_components": 2} (default: None)
    device : torch.device
        The device on which to perform the computations.
        If device is None then it will be automatically chosen: if a gpu is available the gpu with the highest amount of free memory will be chosen (default: None)
    random_state : np.random.RandomState | int
        use a fixed random state to get a repeatable solution. Can also be of type int (default: None)

    Attributes
    ----------
    n_clusters_ : int
        The final number of clusters
    labels_ : np.ndarray
        The final labels (obtained by a variant of Density Peak Clustering)
    neural_network_trained_ : torch.nn.Module
        The final neural network
    tsne_ : TSNE
        The t-SNE object
    n_features_in_ : int
        the number of features used for the fitting
    cluster_centers_ : np.ndarray
        The final cluster centers defined as the mean of assigned samples within the AE embedding

    Examples
    ----------
    >>> from clustpy.data import create_subspace_data
    >>> from clustpy.deep import DDC
    >>> data, labels = create_subspace_data(1500, subspace_features=(3, 50), random_state=1)
    >>> ddc = DDC(pretrain_epochs=3)
    >>> ddc.fit(data)

    References
    ----------
    Ren, Yazhou, et al. "Deep density-based image clustering."
    Knowledge-Based Systems 197 (2020): 105841.
    """

    def __init__(self, ratio: float = 0.1, batch_size: int = 256, pretrain_optimizer_params: dict = None,
                 pretrain_epochs: int = 100, optimizer_class: torch.optim.Optimizer = torch.optim.Adam,
                 ssl_loss_fn: Callable | torch.nn.modules.loss._Loss = mean_squared_error,
                 neural_network: torch.nn.Module | tuple = None, neural_network_weights: str | Path = None,
                 embedding_size: int = 10, custom_dataloaders: tuple = None, tsne_params: dict = None,
                 device: torch.device = None, random_state: np.random.RandomState | int = None):
        super().__init__(batch_size, neural_network, neural_network_weights, embedding_size, device, random_state)
        self.ratio = ratio
        self.pretrain_optimizer_params = pretrain_optimizer_params
        self.pretrain_epochs = pretrain_epochs
        self.optimizer_class = optimizer_class
        self.ssl_loss_fn = ssl_loss_fn
        self.custom_dataloaders = custom_dataloaders
        self.tsne_params = tsne_params

    def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'DDC':
        """
        Run the clustering procedure on the given data set.
        Afterward, the cluster labels are available through the labels_ attribute.

        Parameters
        ----------
        X : np.ndarray
            the given data set
        y : np.ndarray
            the labels (can be ignored)

        Returns
        -------
        self : DDC
            this instance of the DDC algorithm
        """
        X, _, random_state, pretrain_optimizer_params, _, _ = self._check_parameters(X, y=y)
        # Fall back to a two-dimensional t-SNE embedding if no parameters were specified
        if self.tsne_params is None:
            tsne_params = {"n_components": 2}
        else:
            tsne_params = self.tsne_params
        if self.ratio > 1:
            print("[WARNING] ratio for DDC algorithm has been set to a value > 1 which can cause poor results")
        # The number of clusters is unknown in advance (None); it is identified by the density peak clustering
        result = _manifold_based_sequential_dc(X=X, n_clusters=None, batch_size=self.batch_size,
                                               pretrain_optimizer_params=pretrain_optimizer_params,
                                               pretrain_epochs=self.pretrain_epochs,
                                               optimizer_class=self.optimizer_class,
                                               ssl_loss_fn=self.ssl_loss_fn,
                                               neural_network=self.neural_network,
                                               neural_network_weights=self.neural_network_weights,
                                               embedding_size=self.embedding_size,
                                               custom_dataloaders=self.custom_dataloaders,
                                               manifold_class=TSNE,
                                               manifold_params=tsne_params,
                                               clustering_class=DDC_density_peak_clustering,
                                               clustering_params={"ratio": self.ratio},
                                               device=self.device,
                                               random_state=random_state)
        # The centers within the t-SNE embedding (fourth entry) are not stored
        n_clusters, labels, centers_ae, _, neural_network, tsne = result
        self.labels_ = labels
        self.n_clusters_ = n_clusters
        self.cluster_centers_ = centers_ae
        self.neural_network_trained_ = neural_network
        self.tsne_ = tsne
        self.set_n_featrues_in(X)
        return self

    def predict(self, X: np.ndarray, ) -> np.ndarray:
        """
        Predict the labels of the given data.
        Be aware that the result is only a very imprecise estimation because the manifold does not learn a function f() that maps the data into the final embedding.
        Hence, the prediction is obtained by checking the distance to the closest mean of the samples of a cluster within the embedding of the AE.

        Parameters
        ----------
        X : np.ndarray
            input data

        Returns
        -------
        predicted_labels : np.ndarray
            The predicted labels
        """
        print("WARNING: predict does not use the embedding of the manifold and is, therefore, just a very rough estimate")
        return super().predict(X)
class N2D(_AbstractDeepClusteringAlgo):
    """
    The Not 2 Deep (N2D) clustering algorithm.
    First, a neural network will be trained (will be skipped if input neural network is given).
    Afterward, t-SNE/UMAP/ISOMAP is executed on the embedded data and the EM algorithm is executed.

    Parameters
    ----------
    n_clusters : int
        number of clusters (default: 8)
    batch_size : int
        size of the data batches (default: 256)
    pretrain_optimizer_params : dict
        parameters of the optimizer for the pretraining of the neural network, includes the learning rate. If None, it will be set to {"lr": 1e-3} (default: None)
    pretrain_epochs : int
        number of epochs for the pretraining of the neural network (default: 100)
    optimizer_class : torch.optim.Optimizer
        the optimizer class (default: torch.optim.Adam)
    ssl_loss_fn : Callable | torch.nn.modules.loss._Loss
        self-supervised learning (ssl) loss function for training the network, e.g. reconstruction loss for autoencoders (default: mean_squared_error)
    neural_network : torch.nn.Module | tuple
        the input neural network. If None, a new FeedforwardAutoencoder will be created.
        Can also be a tuple consisting of the neural network class (torch.nn.Module) and the initialization parameters (dict) (default: None)
    neural_network_weights : str | Path
        Path to a file containing the state_dict of the neural_network (default: None)
    embedding_size : int
        size of the embedding within the neural network (default: 10)
    custom_dataloaders : tuple
        tuple consisting of a trainloader (random order) at the first and a test loader (non-random order) at the second position.
        Can also be a tuple of strings, where the first entry is the path to a saved trainloader and the second entry the path to a saved testloader.
        In this case the dataloaders will be loaded by torch.load(PATH).
        If None, the default dataloaders will be used (default: None)
    manifold_class : TransformerMixin
        the manifold technique class (default: TSNE)
    manifold_params : dict
        Parameters for the manifold execution. For example, perplexity can be changed for TSNE by setting manifold_params to {"n_components": 2, "perplexity": 25}.
        Check out e.g. sklearn.manifold.TSNE for more information. If None, it will be set to {"n_components": n_clusters}.
        If None and manifold_class is sklearn.manifold.TSNE with n_clusters > 3, the entry "method": "exact" is added because the Barnes-Hut t-SNE implementation only supports less than 4 components (default: None)
    initial_clustering_params : dict
        parameters for the GMM clustering class. If None, it will be set to {} (default: None)
    device : torch.device
        The device on which to perform the computations.
        If device is None then it will be automatically chosen: if a gpu is available the gpu with the highest amount of free memory will be chosen (default: None)
    random_state : np.random.RandomState | int
        use a fixed random state to get a repeatable solution. Can also be of type int (default: None)

    Attributes
    ----------
    labels_ : np.ndarray
        The final labels
    cluster_centers_manifold_ : np.ndarray
        The final cluster centers within the embedding of the manifold
    neural_network_trained_ : torch.nn.Module
        The final neural network
    manifold_ : TransformerMixin
        The manifold object
    n_features_in_ : int
        the number of features used for the fitting
    cluster_centers_ : np.ndarray
        The final cluster centers defined as the mean of assigned samples within the AE embedding

    References
    ----------
    McConville, Ryan, et al. "N2d:(not too) deep clustering via clustering the local manifold of an autoencoded embedding."
    2020 25th international conference on pattern recognition (ICPR). IEEE, 2021.
    """

    def __init__(self, n_clusters: int = 8, batch_size: int = 256, pretrain_optimizer_params: dict = None,
                 pretrain_epochs: int = 100, optimizer_class: torch.optim.Optimizer = torch.optim.Adam,
                 ssl_loss_fn: Callable | torch.nn.modules.loss._Loss = mean_squared_error,
                 neural_network: torch.nn.Module | tuple = None, neural_network_weights: str | Path = None,
                 embedding_size: int = 10, custom_dataloaders: tuple = None, manifold_class: TransformerMixin = TSNE,
                 manifold_params: dict = None, initial_clustering_params: dict = None, device: torch.device = None,
                 random_state: np.random.RandomState | int = None):
        super().__init__(batch_size, neural_network, neural_network_weights, embedding_size, device, random_state)
        self.n_clusters = n_clusters
        self.pretrain_optimizer_params = pretrain_optimizer_params
        self.pretrain_epochs = pretrain_epochs
        self.optimizer_class = optimizer_class
        self.ssl_loss_fn = ssl_loss_fn
        self.custom_dataloaders = custom_dataloaders
        self.manifold_class = manifold_class
        self.manifold_params = manifold_params
        self.initial_clustering_params = initial_clustering_params

    def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'N2D':
        """
        Initiate the actual clustering process on the input data set.
        The resulting cluster labels will be stored in the labels_ attribute.

        Parameters
        ----------
        X : np.ndarray
            the given data set
        y : np.ndarray
            the labels (can be ignored)

        Returns
        -------
        self : N2D
            this instance of the N2D algorithm
        """
        X, _, random_state, pretrain_optimizer_params, _, initial_clustering_params = self._check_parameters(X, y=y)
        if self.manifold_params is None:
            # By default, the manifold has as many dimensions as there are clusters
            manifold_params = {"n_components": self.n_clusters}
            if self.manifold_class is TSNE and self.n_clusters is not None and self.n_clusters > 3:
                # The default Barnes-Hut t-SNE of sklearn raises a ValueError for n_components > 3.
                # Therefore, the exact method is used for the automatically created parameters
                manifold_params["method"] = "exact"
        else:
            manifold_params = self.manifold_params
        _, labels, centers_ae, centers_manifold, neural_network, manifold = _manifold_based_sequential_dc(
            X, self.n_clusters,
            self.batch_size,
            pretrain_optimizer_params,
            self.pretrain_epochs,
            self.optimizer_class,
            self.ssl_loss_fn,
            self.neural_network,
            self.neural_network_weights,
            self.embedding_size,
            self.custom_dataloaders,
            self.manifold_class,
            manifold_params,
            GMM, initial_clustering_params,
            self.device,
            random_state)
        self.labels_ = labels.astype(np.int32)
        self.cluster_centers_manifold_ = centers_manifold
        self.cluster_centers_ = centers_ae
        self.neural_network_trained_ = neural_network
        self.manifold_ = manifold
        self.set_n_featrues_in(X)
        return self

    def predict(self, X: np.ndarray, ) -> np.ndarray:
        """
        Predicts the labels of the input data.
        Note that this is just a very imprecise estimation as the manifold does not learn a function f() to map the data into the final embedding.
        Therefore, the prediction is calculated by checking the distance to the closest mean of samples in a cluster within the embedding of the AE.

        Parameters
        ----------
        X : np.ndarray
            input data

        Returns
        -------
        predicted_labels : np.ndarray
            The predicted labels
        """
        print("WARNING: predict does not use the embedding of the manifold and is, therefore, just a very rough estimate")
        predicted_labels = super().predict(X)
        return predicted_labels