# Source code for clustpy.deep.shade

"""
@authors:
Pascal Weber
"""

from __future__ import annotations
import numpy as np
import torch
from clustpy.utils import DCTree
from clustpy.hierarchical import DCTree_Clusterer
from clustpy.deep._abstract_deep_clustering_algo import _AbstractDeepClusteringAlgo
from clustpy.deep._data_utils import get_train_and_test_dataloader
from clustpy.deep._train_utils import get_trained_network
from clustpy.deep._utils import detect_device, squared_euclidean_distance, encode_batchwise, run_initial_clustering, mean_squared_error
from clustpy.utils.checks import check_parameters
import tqdm
from typing import Callable, Optional, Tuple
from sklearn.utils.validation import check_is_fitted
from sklearn.base import ClusterMixin


class SHADE(_AbstractDeepClusteringAlgo):
    """
    The Structure-preserving High-dimensional Analysis with Density-based Exploration (SHADE) algorithm.
    A neural network (autoencoder AE) is trained with the self-supervised (reconstruction) loss and the d_dc loss function.
    Afterward, the configured clustering class (by default DCTree_Clusterer) is applied to the learned embedding
    to obtain the cluster labels.

    Parameters
    ----------
    clustering_class : ClusterMixin
        clustering class to obtain the cluster labels after getting the embedding (default: DCTree_Clusterer)
    clustering_params : dict
        parameters for the clustering class.
        If None, it will be set to {"min_points": min_points, "use_less_memory": use_less_memory} (default: None)
    min_points : int
        the minimum number of points (default: 5)
    use_complete_dc_tree : bool
        Defines whether the complete DC Tree should be used instead of a batch-wise version (default: True)
    use_matrix_dc_distance : bool
        Defines whether the matrix DC distance should be stored - can cause memory issues (default: True)
    use_less_memory : bool
        Use less memory when constructing the DCTree. This will, however, increase the runtime (default: False)
    batch_size : int
        Size of the data batches. Must not be smaller than min_points (default: 500)
    pretrain_optimizer_params : dict
        parameters of the optimizer for the pretraining of the neural network, includes the learning rate.
        If None, it will be set to {"lr": 1e-3}. (default: None)
    clustering_optimizer_params : dict
        parameters of the optimizer for the actual clustering procedure, includes the learning rate.
        If None, it will be set to {"lr": 1e-3} (default: None)
    pretrain_epochs : int
        number of epochs for the pretraining of the neural network. (default: 0)
    clustering_epochs : int
        number of epochs for the actual clustering procedure (default: 100)
    optimizer_class : torch.optim.Optimizer
        the optimizer class (default: torch.optim.Adam)
    ssl_loss_fn : Callable | torch.nn.modules.loss._Loss
        self-supervised learning (ssl) loss function for training the network, e.g. reconstruction loss for
        autoencoders (default: mean_squared_error)
    neural_network : torch.nn.Module | tuple
        the input neural network. If None, a new FeedforwardAutoencoder will be created.
        Can also be a tuple consisting of the neural network class (torch.nn.Module) and the initialization
        parameters (dict) (default: None)
    neural_network_weights : str
        Path to a file containing the state_dict of the neural_network (default: None)
    embedding_size : int
        size of the embedding within the neural network (default: 10)
    density_loss_weight : float
        weight of the density loss compared to the reconstruction loss (default: 1.0)
    ssl_loss_weight : float
        weight of the self-supervised learning (ssl) loss (default: 1.0)
    custom_dataloaders : tuple
        tuple consisting of a trainloader (random order) at the first and a test loader (non-random order) at the
        second position. Can also be a tuple of strings, where the first entry is the path to a saved trainloader
        and the second entry the path to a saved testloader. In this case the dataloaders will be loaded by
        torch.load(PATH). If None, the default dataloaders will be used (default: None)
    device : torch.device
        The device on which to perform the computations. If device is None then it will be automatically chosen:
        if a gpu is available the gpu with the highest amount of free memory will be chosen (default: None)
    random_state : np.random.RandomState | int
        use a fixed random state to get a repeatable solution. Can also be of type int (default: None)

    Attributes
    ----------
    n_clusters_ : int
        The final number of clusters
    labels_ : np.ndarray
        The final labels
    cluster_centers_ : np.ndarray
        The final cluster centers defined as the mean of assigned samples within the AE embedding
    dc_tree_ : DCTree
        The dc tree built on the complete data set (None if use_complete_dc_tree is False)
    neural_network_trained_ : torch.nn.Module
        The final neural network
    n_features_in_ : int
        the number of features used for the fitting

    Examples
    --------
    >>> from clustpy.data import create_subspace_data
    >>> data, labels = create_subspace_data(1500, subspace_features=(3, 50), random_state=1)
    >>> shade = SHADE()
    >>> shade.fit(data)

    References
    ----------
    SHADE: Deep Density-based Clustering
    Anna Beer; Pascal Weber; Lukas Miklautz; Collin Leiber; Walid Durani; Christian Böhm
    IEEE International Conference on Data Mining (ICDM), Abu Dhabi, United Arab Emirates, 2024, pp. 675-680,
    doi: 10.1109/ICDM59182.2024.
    """

    def __init__(
        self,
        clustering_class: Optional[ClusterMixin] = DCTree_Clusterer,
        clustering_params: dict = None,
        min_points: int = 5,
        use_complete_dc_tree: bool = True,
        use_matrix_dc_distance: bool = True,
        use_less_memory: bool = False,
        batch_size: int = 500,
        pretrain_optimizer_params: dict = None,
        clustering_optimizer_params: dict = None,
        pretrain_epochs: int = 0,
        clustering_epochs: int = 100,
        optimizer_class: torch.optim.Optimizer = torch.optim.Adam,
        ssl_loss_fn: Callable | torch.nn.modules.loss._Loss = mean_squared_error,
        neural_network: torch.nn.Module | tuple = None,
        neural_network_weights: str = None,
        embedding_size: int = 10,
        density_loss_weight: float = 1.0,
        ssl_loss_weight: float = 1.0,
        custom_dataloaders: tuple = None,
        device: torch.device = None,
        random_state: np.random.RandomState | int = None,
    ):
        # Parameters shared by all deep clustering algorithms are handed to the base class.
        super().__init__(batch_size, neural_network, neural_network_weights, embedding_size, device, random_state)
        # Final clustering on the embedding
        self.clustering_class = clustering_class
        self.clustering_params = clustering_params
        # DC tree configuration
        self.min_points = min_points
        self.use_complete_dc_tree = use_complete_dc_tree
        self.use_matrix_dc_distance = use_matrix_dc_distance
        self.use_less_memory = use_less_memory
        # Optimization schedule
        self.pretrain_optimizer_params = pretrain_optimizer_params
        self.clustering_optimizer_params = clustering_optimizer_params
        self.pretrain_epochs = pretrain_epochs
        self.clustering_epochs = clustering_epochs
        self.optimizer_class = optimizer_class
        # Loss configuration
        self.ssl_loss_fn = ssl_loss_fn
        self.density_loss_weight = density_loss_weight
        self.ssl_loss_weight = ssl_loss_weight
        self.custom_dataloaders = custom_dataloaders

    def fit(self, X: np.ndarray, y: np.ndarray = None) -> SHADE:
        """
        Cluster the input dataset with the SHADE algorithm.
        The resulting cluster labels will be stored in the `labels_` attribute.

        Parameters
        ----------
        X : np.ndarray
            The given data set.
        y : np.ndarray
            The labels. (can be ignored)

        Returns
        -------
        self : SHADE
            This instance of the SHADE algorithm.
        """
        X, _, random_state, pretrain_optimizer_params, _, _ = self._check_parameters(X, y=y)
        device = detect_device(self.device)
        trainloader, testloader, batch_size = get_train_and_test_dataloader(
            X, self.batch_size, self.custom_dataloaders
        )
        assert batch_size >= self.min_points, f"Batch_size ({batch_size}) cannot be smaller than min_points ({self.min_points})"

        # The DC tree over the complete data set is only built on request;
        # otherwise the module falls back to batch-wise DC trees.
        if self.use_complete_dc_tree:
            self.dc_tree_ = DCTree(X, min_points=self.min_points, use_less_memory=self.use_less_memory)
        else:
            self.dc_tree_ = None

        # Create and pretrain the autoencoder
        default_network_params = {"layers": [X.shape[1], 512, 256, 128, self.embedding_size]}
        network = get_trained_network(
            trainloader,
            n_epochs=self.pretrain_epochs,
            optimizer_params=pretrain_optimizer_params,
            optimizer_class=self.optimizer_class,
            device=device,
            ssl_loss_fn=self.ssl_loss_fn,
            embedding_size=self.embedding_size,
            neural_network=self.neural_network,
            neural_network_weights=self.neural_network_weights,
            neural_network_params=default_network_params,
            random_state=random_state,
        )

        # Set up and train the SHADE module
        module = _SHADE_Module(
            n_epochs=self.clustering_epochs,
            neural_network=network,
            min_points=self.min_points,
            dc_tree=self.dc_tree_,
            use_matrix_dc_distance=self.use_matrix_dc_distance,
            device=device,
            ssl_loss_fn=self.ssl_loss_fn,
            density_loss_weight=self.density_loss_weight,
            ssl_loss_weight=self.ssl_loss_weight,
        )
        if self.clustering_optimizer_params is None:
            optimizer_params = {"lr": 1e-3}
        else:
            optimizer_params = self.clustering_optimizer_params
        optimizer = self.optimizer_class(list(network.parameters()), **optimizer_params)
        module.fit(X, trainloader, optimizer)

        # Cluster the embedded data to obtain the labels
        if self.clustering_params is None:
            final_clustering_params = {"min_points": self.min_points, "use_less_memory": self.use_less_memory}
        else:
            final_clustering_params = self.clustering_params
        embedding = encode_batchwise(testloader, network)
        n_clusters, labels, cluster_centers, _ = run_initial_clustering(
            X=embedding,
            n_clusters=None,
            clustering_class=self.clustering_class,
            clustering_params=final_clustering_params,
            random_state=random_state,
        )

        # Store the fitted state
        self.n_clusters_ = n_clusters
        self.labels_ = labels
        self.cluster_centers_ = cluster_centers
        self.neural_network_trained_ = network
        self.set_n_featrues_in(X)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predicts the labels of the input data.
        Note that this is just a very imprecise estimation as we are not using the DC Tree to predict the labels.
        The prediction is calculated by checking the distance to the closest mean of samples in a cluster within
        the embedding of the AE.

        Parameters
        ----------
        X : np.ndarray
            input data

        Returns
        -------
        predicted_labels : np.ndarray
            The predicted labels
        """
        check_is_fitted(self, ["labels_", "neural_network_trained_", "n_features_in_"])
        X, _, _ = check_parameters(
            X,
            allow_size_1=True,
            allow_nd=self.neural_network_trained_.allow_nd_input,
            estimator_obj=self,
        )
        print("WARNING: predict does not use the embedding of the manifold and is, therefore, just a very rough estimate")
        return super().predict(X)
class _SHADE_Module(torch.nn.Module):
    """
    The _SHADE_Module. Contains most of the algorithm specific procedures like the loss function.

    Parameters
    ----------
    n_epochs : int
        number of epochs for the clustering procedure
    neural_network : torch.nn.Module
        the neural network
    min_points : int
        the minimum number of points
    dc_tree : Optional[DCTree]
        the DCTree. If None, a DCTree is built for every minibatch instead
    use_matrix_dc_distance : bool
        Defines whether the matrix DC distance should be stored - can cause memory issues
    device : torch.device
        device to be trained on
    ssl_loss_fn : Callable | torch.nn.modules.loss._Loss
        self-supervised learning (ssl) loss function for training the network, e.g. reconstruction loss for
        autoencoders
    density_loss_weight : float
        weight of the clustering loss
    ssl_loss_weight : float
        weight of the self-supervised learning (ssl) loss
    """

    def __init__(
        self,
        n_epochs: int,
        neural_network: torch.nn.Module,
        min_points: int,
        dc_tree: Optional[DCTree],
        use_matrix_dc_distance: bool,
        device: torch.device,
        ssl_loss_fn: Callable | torch.nn.modules.loss._Loss,
        density_loss_weight: float,
        ssl_loss_weight: float
    ):
        super().__init__()
        self.n_epochs = n_epochs
        self.neural_network = neural_network
        self.min_points = min_points
        self.dc_tree = dc_tree
        self.use_matrix_dc_distance = use_matrix_dc_distance
        self.device = device
        self.ssl_loss_fn = ssl_loss_fn
        self.density_loss_weight = density_loss_weight
        self.ssl_loss_weight = ssl_loss_weight

    def fit(
        self,
        X: np.ndarray,
        trainloader: torch.utils.data.DataLoader,
        optimizer: torch.optim.Optimizer
    ) -> _SHADE_Module:
        """
        Trains the _SHADE_Module in place.

        Batches that contain at most min_points samples are skipped. If no batch is trained within an epoch
        (e.g. all batches are too small), the progress bar is simply not updated for that epoch.

        Parameters
        ----------
        X : np.ndarray
            The data
        trainloader : torch.utils.data.DataLoader
            dataloader to be used for training
        optimizer : torch.optim.Optimizer
            the optimizer for training

        Returns
        -------
        self : _SHADE_Module
            This instance of the _SHADE_Module.
        """
        # Precompute the full pairwise dc-distance matrix once if requested and a complete DCTree exists
        if self.dc_tree is not None and self.use_matrix_dc_distance:
            matrix_dc_distance = self.dc_tree.dc_distances()
            matrix_dc_distance_torch = torch.tensor(matrix_dc_distance, device=self.device)
        else:
            matrix_dc_distance_torch = None

        self.train()
        tbar = tqdm.trange(self.n_epochs, desc="SHADE training")
        for _ in tbar:
            # Stays None when every batch of this epoch is skipped (or the loader is empty),
            # so the progress bar never reads an undefined loss.
            last_loss = None
            # Update Network
            for batch in trainloader:
                if len(batch[0]) <= self.min_points:
                    continue
                loss = self._loss(X, batch, matrix_dc_distance_torch)
                # Backward pass - update weights
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                # Keep only the Python number, not the tensor with its autograd graph
                last_loss = loss.item()
            if last_loss is not None:
                tbar.set_postfix({"Loss": last_loss})

        self.neural_network.eval()
        self.eval()
        return self

    def _loss(
        self,
        X: np.ndarray,
        batch: list,
        matrix_dc_distance_torch: Optional[torch.Tensor]
    ) -> torch.Tensor:
        """
        Calculate the autoencoder reconstruction + d_dc loss.

        Parameters
        ----------
        X : np.ndarray
            The data
        batch : list
            The minibatch. Its first entry is used as the indices of the batch samples within X
        matrix_dc_distance_torch : Optional[torch.Tensor]
            A matrix containing pairwise dc distances.
            Only accessed if a complete DCTree is available and use_matrix_dc_distance is True

        Returns
        -------
        loss : torch.Tensor
            The final SHADE loss.
        """
        # Reconstruction
        ssl_loss, embedded, _ = self.neural_network.loss(batch, self.ssl_loss_fn, self.device)

        # Density loss
        if self.dc_tree is None:
            # Batch-wise DCTree
            dc_distances = DCTree(X[batch[0]], min_points=self.min_points).dc_distances()
            batch_dc_dists = torch.tensor(dc_distances, device=self.device)
        elif self.use_matrix_dc_distance:
            # DCTree of all data points X: slice the precomputed matrix by the batch indices
            idxs = batch[0].to(self.device)
            batch_dc_dists = matrix_dc_distance_torch[idxs[:, None], idxs[None, :]]
        else:
            # DCTree of all data points X: query the tree for the batch only
            dc_distances = self.dc_tree.dc_distances(batch[0], batch[0])
            batch_dc_dists = torch.tensor(dc_distances, device=self.device)

        # Distances within the embedding should match the dc distances of the original data
        batch_eucl_dists = squared_euclidean_distance(embedded, embedded)
        loss_dens = (batch_eucl_dists - batch_dc_dists).pow(2).mean()

        loss = self.ssl_loss_weight * ssl_loss + self.density_loss_weight * loss_dens
        return loss