"""
@authors:
Collin Leiber
"""
import torch
import numpy as np
from clustpy.deep._utils import detect_device, encode_batchwise, mean_squared_error
from clustpy.deep._data_utils import get_train_and_test_dataloader
from clustpy.deep._train_utils import get_neural_network
from clustpy.deep._abstract_deep_clustering_algo import _AbstractDeepClusteringAlgo
import tqdm
from sklearn.neighbors import NearestNeighbors
from sklearn.cluster import KMeans
from collections.abc import Callable
from pathlib import Path
[docs]class DEN(_AbstractDeepClusteringAlgo):
"""
The Deep Embedding Network (DEN) algorithm.
It trains a neural network by optimizing a loss functions consisting of three components.
These are (1) the standrad loss function of the neural netork (e.g. reconstruction loss for autoencoders), (2) the locality-preserving constraint and (3) the group sparsity constraint.
Finally, k-Means is excuted in the resulting embedding.
Parameters
----------
n_clusters : int
number of clusters (default: 8)
group_size : int | list
the number of features in each group. Can also be a list, specifying the size of each group separately. Can be None if embedding_size is specified (default: 2)
n_neighbors : int
the number of nearest-neighbors (including itself) for the locality-preserving constraint. Nearest-neighbors will be calculated by using the Euclidean distance.
If another distance should be used to define the nearest-neighbors, the neighbors can be included in the custom_dataloader as additional_inputs.
In this case, it is expected that the trainloader is composed of: (sample_ids, original_samples, 1st-NNs, 2nd-NNs, ..., (n_neighbors-1)-NNs) (default: 5)
weight_locality_constraint : float
weight alpha for the locality-preserving constraint (default: 0.5)
weight_sparsity_constraint : float
weight beta for the group sparsity constraint (default: 1.)
heat_kernel_t_parameter : float
the t parameter for the heat kernel included in the locality-preserving constraint (default: 1.)
group_lasso_lambda_parameter : float
the lambda parameter for the group lasso included in the group sparsity constraint (default: 1.)
batch_size : int
size of the data batches (default: 256)
pretrain_optimizer_params : dict
parameters of the optimizer for the pretraining of the neural network, includes the learning rate. If None, it will be set to {"lr": 1e-3} (default: None)
pretrain_epochs : int
number of epochs for the pretraining of the neural network (default: 100)
optimizer_class : torch.optim.Optimizer
the optimizer class (default: torch.optim.Adam)
ssl_loss_fn : Callable | torch.nn.modules.loss._Loss
self-supervised learning (ssl) loss function for training the network, e.g. reconstruction loss for autoencoders (default: mean_squared_error)
neural_network : torch.nn.Module | tuple
the input neural network. If None, a new FeedforwardAutoencoder will be created.
Can also be a tuple consisting of the neural network class (torch.nn.Module) and the initialization parameters (dict) (default: None)
neural_network_weights : str | Path
Path to a file containing the state_dict of the neural_network (default: None)
embedding_size : int
size of the embedding within the neural network (default: None)
custom_dataloaders : tuple
tuple consisting of a trainloader (random order) at the first and a test loader (non-random order) at the second position.
Can also be a tuple of strings, where the first entry is the path to a saved trainloader and the second entry the path to a saved testloader.
In this case the dataloaders will be loaded by torch.load(PATH).
If None, the default dataloaders will be used (default: None)
device : torch.device
The device on which to perform the computations.
If device is None then it will be automatically chosen: if a gpu is available the gpu with the highest amount of free memory will be chosen (default: None)
random_state : np.random.RandomState | int
use a fixed random state to get a repeatable solution. Can also be of type int (default: None)
Attributes
----------
labels_ : np.ndarray
The final labels (obtained by KMeans)
cluster_centers_ : np.ndarray
The final cluster centers (obtained by KMeans)
neural_network_trained_ : torch.nn.Module
The final neural network
n_features_in_ : int
the number of features used for the fitting
Examples
----------
>>> from clustpy.data import create_subspace_data
>>> from clustpy.deep import DEN
>>> data, labels = create_subspace_data(1500, subspace_features=(3, 50), random_state=1)
>>> den = DEN(n_clusters=3, pretrain_epochs=3)
>>> den.fit(data)
References
----------
Huang, Peihao, et al. "Deep embedding network for clustering."
2014 22nd International conference on pattern recognition. IEEE, 2014.
"""
def __init__(self, n_clusters: int = 8, group_size : int | list | None = 2, n_neighbors: int = 5, weight_locality_constraint: float = 0.5,
weight_sparsity_constraint: float = 1., heat_kernel_t_parameter: float = 1., group_lasso_lambda_parameter: float = 1.,
batch_size: int = 256, pretrain_optimizer_params: dict = None,
pretrain_epochs: int = 100, optimizer_class: torch.optim.Optimizer = torch.optim.Adam,
ssl_loss_fn: Callable | torch.nn.modules.loss._Loss = mean_squared_error,
neural_network: torch.nn.Module | tuple = None, neural_network_weights: str | Path = None,
embedding_size: int | None = None, custom_dataloaders: tuple = None,
device: torch.device = None, random_state: np.random.RandomState | int = None):
super().__init__(batch_size, neural_network, neural_network_weights, embedding_size, device, random_state)
self.n_clusters = n_clusters
self.group_size = group_size
self.n_neighbors = n_neighbors
self.weight_locality_constraint = weight_locality_constraint
self.weight_sparsity_constraint = weight_sparsity_constraint
self.heat_kernel_t_parameter = heat_kernel_t_parameter
self.group_lasso_lambda_parameter = group_lasso_lambda_parameter
self.pretrain_optimizer_params = pretrain_optimizer_params
self.pretrain_epochs = pretrain_epochs
self.optimizer_class = optimizer_class
self.ssl_loss_fn = ssl_loss_fn
self.custom_dataloaders = custom_dataloaders
def _check_group_size_and_embedding_size(self) -> (list, int):
"""
Check if the values for group_size and embedding_size match.
Returns
-------
tuple : (list, int)
the size of each group,
the embedding size
"""
assert (type(self.group_size) is list and np.sum(self.group_size) == self.embedding_size) or (type(self.group_size) is int and self.group_size * self.n_clusters == self.embedding_size) or (self.group_size is None and self.embedding_size is not None) or (self.embedding_size is None and self.group_size is not None), "Either group_size or embedding_size must be None or group_size must be set in accordance to the embedding size. You set group_size = {0} and embedding_size = {1}".format(self.group_size, self.embedding_size)
if self.embedding_size is None:
group_size = self.group_size
if type(group_size) is int:
group_size = [group_size] * self.n_clusters
assert type(group_size) is list, "group_size must be of type int or list. Your input: {0} / type: {1}".format(group_size, type(group_size))
embedding_size = np.sum(group_size)
else:
assert self.embedding_size >= self.n_clusters, "embedding_size can not be smaller than n_clusters"
embedding_size = self.embedding_size
group_size = np.array([embedding_size // self.n_clusters] * self.n_clusters)
group_size[: embedding_size % self.n_clusters] += 1
assert len(group_size) == self.n_clusters, "group_size must have n_clusters entries"
return group_size, embedding_size
def _locality_preserving_loss(self, batch: list, embedded: torch.Tensor, neural_network: torch.nn.Module, device: torch.device) -> torch.Tensor:
"""
Calculate the DEN locality preserving loss of given embedded samples.
Parameters
----------
batch : list
the minibatch
embedded : torch.Tensor
the embedded samples
neural_network : torch.nn.Module
the neural network
device : torch.device
device to be trained on
Returns
-------
loss : torch.Tensor
the DEN locality preserving loss
"""
locality_preserving_loss = torch.tensor(0.)
samples = batch[1].to(device)
for i in range(self.n_neighbors - 1): # TODO: Maybe use functorch.vmap in the future for vectorization
neighbors = batch[2 + i].to(device)
embedded_neighbor = neural_network.encode(neighbors)
embedded_diff = (embedded - embedded_neighbor).pow(2).sum(1)
orig_diff = (samples - neighbors).pow(2).sum(1)
heat_kernel = torch.exp(-orig_diff / self.heat_kernel_t_parameter)
locality_preserving_loss = locality_preserving_loss + (heat_kernel * embedded_diff).sum()
return locality_preserving_loss / embedded.shape[0]
def _group_sparsity_loss(self, embedded: torch.Tensor, group_size: list) -> torch.Tensor:
"""
Calculate the DEN group sparsity loss of given embedded samples.
Parameters
----------
embedded : torch.Tensor
the embedded samples
group_size : list
the size of each group
Returns
-------
loss : torch.Tensor
the DEN group sparsity loss
"""
group_sparsity_loss = torch.tensor(0.)
group_index = 0
for g in range(self.n_clusters):
group_units = embedded[:, group_index:group_index+group_size[g]]
group_units_length = (group_units.pow(2) + 1e-10).sum(1).sqrt()
group_lasso_loss = self.group_lasso_lambda_parameter * torch.sqrt(torch.tensor(group_size[g])) * group_units_length
group_sparsity_loss = group_sparsity_loss + group_lasso_loss.sum()
# raise group index
group_index += group_size[g]
return group_sparsity_loss / embedded.shape[0]
def _loss(self, batch: list, group_size: list, neural_network: torch.nn.Module, device: torch.device):
"""
Calculate the complete DEN + neural network loss.
Parameters
----------
batch : list
the minibatch
group_size : list
the size of each group
neural_network : torch.nn.Module
the neural network
device : torch.device
device to be trained on
Returns
-------
loss : torch.Tensor
the final DEN loss
"""
# Calculate ssl loss
ssl_loss, embedded, _ = neural_network.loss(batch, self.ssl_loss_fn, device)
# Calculate locality-preserving constraint
locality_preserving_loss = self._locality_preserving_loss(batch, embedded, neural_network, device)
# Calculate group sparsity constraint
group_sparsity_loss = self._group_sparsity_loss(embedded, group_size)
loss = ssl_loss + self.weight_locality_constraint * locality_preserving_loss + self.weight_sparsity_constraint * group_sparsity_loss
return loss
def _get_nearest_neighbors(self, X: np.ndarray) -> list:
"""
Get a list containing the nearest neighbors of each entry in X.
The list contains the actual data points, not the data indices.
Parameters
----------
X : np.ndarray
the given data set
Returns
-------
nearest_neigbors : list
list containing the nearest neighbors of each entry in X
"""
nearest_neigbors = []
neighbors = NearestNeighbors(n_neighbors=self.n_neighbors)
neighbors.fit(X)
nearest_neighbors_ids = neighbors.kneighbors(n_neighbors=self.n_neighbors - 1, return_distance=False)
for i in range(self.n_neighbors - 1):
nearest_neigbors.append(X[nearest_neighbors_ids[:, i]])
return nearest_neigbors
[docs] def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'DEN':
"""
Initiate the actual clustering process on the input data set.
The resulting cluster labels will be stored in the labels_ attribute.
Parameters
----------
X : np.ndarray
the given data set
y : np.ndarray
the labels (can be ignored)
Returns
-------
self : DEN
this instance of the DEN algorithm
"""
assert self.n_neighbors > 0, "n_neigbors must be larger than 0"
X, _, random_state, pretrain_optimizer_params, _, _ = self._check_parameters(X, y=y)
group_size, embedding_size = self._check_group_size_and_embedding_size()
# Get the device to train on and the dataloaders
device = detect_device(self.device)
if self.custom_dataloaders is None:
nearest_neighbors = self._get_nearest_neighbors(X)
trainloader, testloader, _ = get_train_and_test_dataloader(X, self.batch_size, self.custom_dataloaders,
additional_inputs_trainloader=nearest_neighbors if self.custom_dataloaders is None else None)
# Check that the trainloader includes neighbors -> must contain n_neighbors + 1 (the ids) entries
assert len(next(iter(trainloader))) >= self.n_neighbors + 1, "Trainloader does not appear to include any neighbors."
# Get AE
neural_network = get_neural_network(input_dim=X.shape[1], embedding_size=embedding_size,
neural_network=self.neural_network, neural_network_weights=self.neural_network_weights,
device=device, random_state=random_state)
optimizer = self.optimizer_class(neural_network.parameters(), **pretrain_optimizer_params)
# DEN training loop
tbar = tqdm.trange(self.pretrain_epochs, desc="DEN training")
for _ in tbar:
# Update Network
total_loss = 0
for batch in trainloader:
loss = self._loss(batch, group_size, neural_network, device)
total_loss += loss.item()
# Backward pass - update weights
optimizer.zero_grad()
loss.backward()
optimizer.step()
postfix_str = {"Loss": total_loss}
tbar.set_postfix(postfix_str)
# Execute clustering with Kmeans
embedded_data = encode_batchwise(testloader, neural_network)
kmeans = KMeans(n_clusters=self.n_clusters, random_state=random_state)
kmeans.fit(embedded_data)
# Save parameters
self.labels_ = kmeans.labels_
self.cluster_centers_ = kmeans.cluster_centers_
self.neural_network_trained_ = neural_network
self.set_n_featrues_in(X)
return self