# Source code for clustpy.deep.dec

"""
@authors:
Lukas Miklautz,
Dominik Mautz,
Collin Leiber
"""

from clustpy.deep._utils import detect_device, encode_batchwise, squared_euclidean_distance, predict_batchwise, \
    set_torch_seed, run_initial_clustering, embedded_kmeans_prediction
from clustpy.deep._data_utils import get_dataloader, augmentation_invariance_check
from clustpy.deep._train_utils import get_trained_autoencoder
import torch
import numpy as np
from sklearn.cluster import KMeans
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.utils import check_random_state


def _dec(X: np.ndarray, n_clusters: int, alpha: float, batch_size: int, pretrain_optimizer_params: dict,
         clustering_optimizer_params: dict, pretrain_epochs: int, clustering_epochs: int,
         optimizer_class: torch.optim.Optimizer, loss_fn: torch.nn.modules.loss._Loss,
         autoencoder: torch.nn.Module, embedding_size: int, use_reconstruction_loss: bool,
         cluster_loss_weight: float, custom_dataloaders: tuple, augmentation_invariance: bool,
         initial_clustering_class: ClusterMixin,
         initial_clustering_params: dict, random_state: np.random.RandomState) -> (
        np.ndarray, np.ndarray, np.ndarray, np.ndarray, torch.nn.Module):
    """
    Run the full DEC pipeline on a data set.

    The steps are: obtain a trained autoencoder, run the initial clustering on the embedded data,
    jointly optimize autoencoder and cluster centers with the DEC loss and finally re-cluster the
    resulting embedding with KMeans.

    Parameters
    ----------
    X : np.ndarray / torch.Tensor
        the data set to cluster. Only used to build the default dataloaders when custom_dataloaders is None
    n_clusters : int
        number of clusters. Can be None if a corresponding initial_clustering_class is given, e.g. DBSCAN
    alpha : float
        alpha value used when computing the soft cluster assignments
    batch_size : int
        size of the data batches of the default dataloaders
    pretrain_optimizer_params : dict
        optimizer parameters for the autoencoder pretraining, includes the learning rate
    clustering_optimizer_params : dict
        optimizer parameters for the clustering phase, includes the learning rate
    pretrain_epochs : int
        number of epochs for the autoencoder pretraining
    clustering_epochs : int
        number of epochs for the clustering phase
    optimizer_class : torch.optim.Optimizer
        the optimizer class; it is instantiated here for the clustering phase
    loss_fn : torch.nn.modules.loss._Loss
        loss function for the reconstruction
    autoencoder : torch.nn.Module
        the input autoencoder. If None a new FeedforwardAutoencoder will be created
    embedding_size : int
        size of the embedding within the autoencoder
    use_reconstruction_loss : bool
        whether the reconstruction loss is added to the clustering loss during the clustering phase
    cluster_loss_weight : float
        weight of the clustering loss compared to the reconstruction loss
    custom_dataloaders : tuple
        tuple consisting of a trainloader (random order) at the first and a test loader (non-random order) at the
        second position. If None, the default dataloaders will be used
    augmentation_invariance : bool
        If True, augmented samples provided in custom_dataloaders[0] will be used to learn
        cluster assignments that are invariant to the augmentation transformations
    initial_clustering_class : ClusterMixin
        clustering class used to obtain the initial cluster centers after the pretraining
    initial_clustering_params : dict
        parameters for the initial clustering class
    random_state : np.random.RandomState
        random state shared by the initial clustering and the final KMeans run

    Returns
    -------
    tuple : (np.ndarray, np.ndarray, np.ndarray, np.ndarray, torch.nn.Module)
        The labels as identified by a final KMeans execution,
        The cluster centers as identified by a final KMeans execution,
        The labels as identified by DEC after the training terminated,
        The cluster centers as identified by DEC after the training terminated,
        The final autoencoder
    """
    device = detect_device()
    # User-supplied loaders take precedence; otherwise build one train and one test loader from X
    if custom_dataloaders is not None:
        trainloader, testloader = custom_dataloaders
    else:
        trainloader = get_dataloader(X, batch_size, True, False)
        testloader = get_dataloader(X, batch_size, False, False)
    autoencoder = get_trained_autoencoder(trainloader, pretrain_optimizer_params, pretrain_epochs, device,
                                          optimizer_class, loss_fn, embedding_size, autoencoder)
    # Initial clustering is executed on the embedding of the test loader
    initial_embedding = encode_batchwise(testloader, autoencoder, device)
    n_clusters, _, init_centers, _ = run_initial_clustering(initial_embedding, n_clusters,
                                                            initial_clustering_class,
                                                            initial_clustering_params, random_state)
    dec_module = _DEC_Module(init_centers, alpha, augmentation_invariance).to(device)
    # A single optimizer updates the autoencoder weights and the cluster centers together
    trainable_parameters = [*autoencoder.parameters(), *dec_module.parameters()]
    optimizer = optimizer_class(trainable_parameters, **clustering_optimizer_params)
    dec_module.fit(autoencoder, trainloader, clustering_epochs, device, optimizer, loss_fn,
                   use_reconstruction_loss, cluster_loss_weight)
    # Results of DEC itself
    dec_labels = predict_batchwise(testloader, autoencoder, dec_module, device)
    dec_centers = dec_module.centers.detach().cpu().numpy()
    # The autoencoder changed during the clustering phase, so embed again before the final KMeans
    final_embedding = encode_batchwise(testloader, autoencoder, device)
    kmeans = KMeans(n_clusters=n_clusters, random_state=random_state).fit(final_embedding)
    return kmeans.labels_, kmeans.cluster_centers_, dec_labels, dec_centers, autoencoder


def _dec_predict(centers: torch.Tensor, embedded: torch.Tensor, alpha: float,
                 weights: torch.Tensor = None) -> torch.Tensor:
    """
    Predict soft cluster labels given embedded samples.

    The squared distances between samples and centers are turned into similarities of the form
    (1 + d / alpha) ^ (-(alpha + 1) / 2) and normalized so that each row sums to one.

    Parameters
    ----------
    centers : torch.Tensor
        the cluster centers
    embedded : torch.Tensor
        the embedded samples
    alpha : float
        the alpha value
    weights : torch.Tensor
        feature weights for the squared euclidean distance; passed on unchanged to
        squared_euclidean_distance (default: None)

    Returns
    -------
    prob : torch.Tensor
        The predicted soft labels
    """
    squared_diffs = squared_euclidean_distance(embedded, centers, weights)
    numerator = (1.0 + squared_diffs / alpha).pow(-1.0 * (alpha + 1.0) / 2.0)
    # Normalize over the clusters (dimension 1) to obtain one distribution per sample
    denominator = numerator.sum(1)
    prob = numerator / denominator.unsqueeze(1)
    return prob


def _dec_compression_value(pred_labels: torch.Tensor) -> torch.Tensor:
    """
    Compute the DEC compression values (the sharpened target distribution) from soft labels.

    Each soft label is squared, divided by the summed soft label of its cluster (column sum) and the
    result is normalized so that every row sums to one.

    Parameters
    ----------
    pred_labels : torch.Tensor
        the predictions of the embedded samples.

    Returns
    -------
    p : torch.Tensor
        The compression values
    """
    cluster_frequencies = pred_labels.sum(dim=0, keepdim=True)
    frequency_scaled_squares = pred_labels.pow(2) / cluster_frequencies
    row_totals = frequency_scaled_squares.sum(dim=1, keepdim=True)
    return frequency_scaled_squares / row_totals


def _dec_compression_loss_fn(pred_labels: torch.Tensor, target_p: torch.Tensor = None) -> torch.Tensor:
    """
    Compute the DEC loss between the soft labels and their (pseudo) targets.

    The loss is the negative mean over the samples of sum_j target_p[i, j] * log(pred_labels[i, j] + 1e-8).

    Parameters
    ----------
    pred_labels : torch.Tensor
        the predictions of the embedded samples.
    target_p : torch.Tensor
        dec_compression_value used as pseudo target labels. If None, the targets are derived from
        pred_labels and detached so that no gradient flows through them (default: None)

    Returns
    -------
    loss : torch.Tensor
        The final loss
    """
    if target_p is None:
        target_p = _dec_compression_value(pred_labels).detach().data
    # The small constant avoids log(0) for vanishing soft labels
    log_predictions = torch.log(pred_labels + 1e-8)
    per_sample = (target_p * log_predictions).sum(dim=1)
    return -1.0 * per_sample.mean()


class _DEC_Module(torch.nn.Module):
    """
    Torch module holding the DEC specific parts: the trainable cluster centers, the soft and hard
    prediction functions, the clustering losses and the training loop.

    Parameters
    ----------
    init_centers : np.ndarray
        The initial cluster centers
    alpha : double
        alpha value for the prediction method
    augmentation_invariance : bool
        If True, augmented samples provided in the training batches will be used to learn
        cluster assignments that are invariant to the augmentation transformations (default: False)

    Attributes
    ----------
    alpha : float
        the alpha value
    centers : torch.Tensor:
        the cluster centers (registered as a trainable parameter)
    augmentation_invariance : bool
        Is augmentation invariance used
    """

    def __init__(self, init_centers: np.ndarray, alpha: float, augmentation_invariance: bool = False):
        super().__init__()
        # Wrapping the centers in a Parameter exposes them to the optimizer via parameters()
        centers_tensor = torch.tensor(init_centers)
        self.centers = torch.nn.Parameter(centers_tensor, requires_grad=True)
        self.augmentation_invariance = augmentation_invariance
        self.alpha = alpha

    def predict(self, embedded: torch.Tensor, weights: torch.Tensor = None) -> torch.Tensor:
        """
        Compute the soft cluster labels of embedded samples with respect to the current centers.

        Parameters
        ----------
        embedded : torch.Tensor
            the embedded samples
        weights : torch.Tensor
            feature weights for the squared euclidean distance within the dec_predict method (default: None)

        Returns
        -------
        pred : torch.Tensor
            The predicted soft labels
        """
        return _dec_predict(self.centers, embedded, self.alpha, weights=weights)

    def predict_hard(self, embedded: torch.Tensor, weights=None) -> torch.Tensor:
        """
        Compute the hard cluster labels of embedded samples.
        The hard label is the index of the largest soft label of each sample.

        Parameters
        ----------
        embedded : torch.Tensor
            the embedded samples
        weights : torch.Tensor
            feature weights for the squared euclidean distance within the dec_predict method (default: None)

        Returns
        -------
        pred_hard : torch.Tensor
            The predicted hard labels
        """
        soft_labels = self.predict(embedded, weights=weights)
        return torch.argmax(soft_labels, dim=1)

    def dec_loss(self, embedded: torch.Tensor, weights: torch.Tensor = None) -> torch.Tensor:
        """
        Compute the DEC loss of embedded samples. The pseudo targets are derived from the predictions themselves.

        Parameters
        ----------
        embedded : torch.Tensor
            the embedded samples
        weights : torch.Tensor
            feature weights for the squared euclidean distance within the dec_predict method (default: None)

        Returns
        -------
        loss : torch.Tensor
            the final DEC loss
        """
        soft_labels = _dec_predict(self.centers, embedded, self.alpha, weights=weights)
        return _dec_compression_loss_fn(soft_labels)

    def dec_augmentation_invariance_loss(self, embedded: torch.Tensor, embedded_aug: torch.Tensor,
                                         weights: torch.Tensor = None) -> torch.Tensor:
        """
        Compute the DEC loss with augmentation invariance.
        The pseudo targets are obtained from the original samples only and are used for both the original
        and the augmented predictions. The two resulting losses are averaged.

        Parameters
        ----------
        embedded : torch.Tensor
            the embedded samples
        embedded_aug : torch.Tensor
            the embedded augmented samples
        weights : torch.Tensor
            feature weights for the squared euclidean distance within the dec_predict method (default: None)

        Returns
        -------
        loss : torch.Tensor
            the final DEC loss
        """
        # Original samples: prediction, detached pseudo targets and the corresponding loss
        soft_labels_clean = _dec_predict(self.centers, embedded, self.alpha, weights=weights)
        shared_targets = _dec_compression_value(soft_labels_clean).detach().data
        loss_clean = _dec_compression_loss_fn(soft_labels_clean, shared_targets)
        # Augmented samples are scored against the same targets, which pushes both views to the same cluster
        soft_labels_aug = _dec_predict(self.centers, embedded_aug, self.alpha, weights=weights)
        loss_aug = _dec_compression_loss_fn(soft_labels_aug, shared_targets)
        return (loss_clean + loss_aug) / 2

    def _loss(self, batch, autoencoder, cluster_loss_weight, use_reconstruction_loss, loss_fn, device) -> torch.Tensor:
        """
        Compute the total loss of a minibatch: weighted DEC loss plus, optionally, the reconstruction loss.

        Parameters
        ----------
        batch : list
            the minibatch. The samples are read from position 1; with augmentation invariance the augmented
            samples are read from position 1 and the original samples from position 2
        autoencoder : torch.nn.Module
            the autoencoder
        cluster_loss_weight : float
            weight of the clustering loss compared to the reconstruction loss
        use_reconstruction_loss : bool
            defines whether the reconstruction loss will be used during clustering training
        loss_fn : torch.nn.modules.loss._Loss
            loss function for the reconstruction
        device : torch.device
            device to be trained on

        Returns
        -------
        loss : torch.Tensor
            the final DEC loss
        """
        total = torch.tensor(0.).to(device)
        if self.augmentation_invariance:
            if use_reconstruction_loss:
                # Original samples are processed first, then the augmented ones; both reuse batch[0]
                rec_loss, embedded, _ = autoencoder.loss([batch[0], batch[2]], loss_fn, device)
                rec_loss_aug, embedded_aug, _ = autoencoder.loss([batch[0], batch[1]], loss_fn, device)
                total += ((rec_loss + rec_loss_aug) / 2)
            else:
                # Plain DEC: only the embeddings are needed
                embedded_aug = autoencoder.encode(batch[1].to(device))
                embedded = autoencoder.encode(batch[2].to(device))
            cluster_term = self.dec_augmentation_invariance_loss(embedded, embedded_aug)
        else:
            if use_reconstruction_loss:
                rec_loss, embedded, _ = autoencoder.loss(batch, loss_fn, device)
                total += rec_loss
            else:
                embedded = autoencoder.encode(batch[1].to(device))
            cluster_term = self.dec_loss(embedded)
        total += cluster_term * cluster_loss_weight
        return total

    def fit(self, autoencoder: torch.nn.Module, trainloader: torch.utils.data.DataLoader, n_epochs: int,
            device: torch.device, optimizer: torch.optim.Optimizer, loss_fn: torch.nn.modules.loss._Loss,
            use_reconstruction_loss: bool, cluster_loss_weight: float) -> '_DEC_Module':
        """
        Train the _DEC_Module in place by iterating n_epochs times over the trainloader.

        Parameters
        ----------
        autoencoder : torch.nn.Module
            the autoencoder
        trainloader : torch.utils.data.DataLoader
            dataloader to be used for training
        n_epochs : int
            number of epochs for the clustering procedure
        device : torch.device
            device to be trained on
        optimizer : torch.optim.Optimizer
            the optimizer for training
        loss_fn : torch.nn.modules.loss._Loss
            loss function for the reconstruction
        use_reconstruction_loss : bool
            defines whether the reconstruction loss will be used during clustering training
        cluster_loss_weight : float
            weight of the clustering loss compared to the reconstruction loss

        Returns
        -------
        self : _DEC_Module
            this instance of the _DEC_Module
        """
        epochs_done = 0
        while epochs_done < n_epochs:
            for minibatch in trainloader:
                batch_loss = self._loss(minibatch, autoencoder, cluster_loss_weight, use_reconstruction_loss,
                                        loss_fn, device)
                # Standard update step: reset gradients, backpropagate, apply the optimizer
                optimizer.zero_grad()
                batch_loss.backward()
                optimizer.step()
            epochs_done += 1
        return self


class DEC(BaseEstimator, ClusterMixin):
    """
    The Deep Embedded Clustering (DEC) algorithm.
    First, an autoencoder (AE) will be trained (will be skipped if input autoencoder is given).
    Afterward, the initial clustering class (KMeans by default) identifies the initial clusters.
    Last, the AE will be optimized using the DEC loss function.

    Parameters
    ----------
    n_clusters : int
        number of clusters. Can be None if a corresponding initial_clustering_class is given, e.g. DBSCAN
    alpha : float
        alpha value for the prediction (default: 1.0)
    batch_size : int
        size of the data batches (default: 256)
    pretrain_optimizer_params : dict
        parameters of the optimizer for the pretraining of the autoencoder, includes the learning rate (default: {"lr": 1e-3})
    clustering_optimizer_params : dict
        parameters of the optimizer for the actual clustering procedure, includes the learning rate (default: {"lr": 1e-4})
    pretrain_epochs : int
        number of epochs for the pretraining of the autoencoder (default: 100)
    clustering_epochs : int
        number of epochs for the actual clustering procedure (default: 150)
    optimizer_class : torch.optim.Optimizer
        the optimizer class (default: torch.optim.Adam)
    loss_fn : torch.nn.modules.loss._Loss
        loss function for the reconstruction (default: torch.nn.MSELoss())
    autoencoder : torch.nn.Module
        the input autoencoder. If None a new FeedforwardAutoencoder will be created (default: None)
    embedding_size : int
        size of the embedding within the autoencoder (default: 10)
    cluster_loss_weight : float
        weight of the clustering loss compared to the reconstruction loss (default: 1)
    custom_dataloaders : tuple
        tuple consisting of a trainloader (random order) at the first and a test loader (non-random order) at the second position.
        If None, the default dataloaders will be used (default: None)
    augmentation_invariance : bool
        If True, augmented samples provided in custom_dataloaders[0] will be used to learn
        cluster assignments that are invariant to the augmentation transformations (default: False)
    initial_clustering_class : ClusterMixin
        clustering class to obtain the initial cluster labels after the pretraining (default: KMeans)
    initial_clustering_params : dict
        parameters for the initial clustering class (default: {})
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution. Can also be of type int (default: None)

    Attributes
    ----------
    labels_ : np.ndarray
        The final labels (obtained by a final KMeans execution)
    cluster_centers_ : np.ndarray
        The final cluster centers (obtained by a final KMeans execution)
    dec_labels_ : np.ndarray
        The final DEC labels
    dec_cluster_centers_ : np.ndarray
        The final DEC cluster centers
    autoencoder : torch.nn.Module
        The final autoencoder

    Examples
    ----------
    >>> from clustpy.data import create_subspace_data
    >>> from clustpy.deep import DEC
    >>> data, labels = create_subspace_data(1500, subspace_features=(3, 50), random_state=1)
    >>> dec = DEC(n_clusters=3, pretrain_epochs=3, clustering_epochs=3)
    >>> dec.fit(data)

    References
    ----------
    Xie, Junyuan, Ross Girshick, and Ali Farhadi. "Unsupervised deep embedding for clustering analysis."
    International conference on machine learning. 2016.
    """

    # NOTE: the dict defaults below are shared between all instances that rely on them,
    # so they must only be read and never be modified in place.
    def __init__(self, n_clusters: int, alpha: float = 1.0, batch_size: int = 256,
                 pretrain_optimizer_params: dict = {"lr": 1e-3}, clustering_optimizer_params: dict = {"lr": 1e-4},
                 pretrain_epochs: int = 100, clustering_epochs: int = 150,
                 optimizer_class: torch.optim.Optimizer = torch.optim.Adam,
                 loss_fn: torch.nn.modules.loss._Loss = torch.nn.MSELoss(), autoencoder: torch.nn.Module = None,
                 embedding_size: int = 10, cluster_loss_weight: float = 1, custom_dataloaders: tuple = None,
                 augmentation_invariance: bool = False, initial_clustering_class: ClusterMixin = KMeans,
                 initial_clustering_params: dict = {}, random_state: np.random.RandomState = None):
        self.n_clusters = n_clusters
        self.alpha = alpha
        self.batch_size = batch_size
        self.pretrain_optimizer_params = pretrain_optimizer_params
        self.clustering_optimizer_params = clustering_optimizer_params
        self.pretrain_epochs = pretrain_epochs
        self.clustering_epochs = clustering_epochs
        self.optimizer_class = optimizer_class
        self.loss_fn = loss_fn
        self.autoencoder = autoencoder
        self.embedding_size = embedding_size
        self.cluster_loss_weight = cluster_loss_weight
        self.custom_dataloaders = custom_dataloaders
        self.augmentation_invariance = augmentation_invariance
        self.initial_clustering_class = initial_clustering_class
        self.initial_clustering_params = initial_clustering_params
        # Converts an int / None into a RandomState instance; the same instance also seeds torch
        self.random_state = check_random_state(random_state)
        # Plain DEC optimizes the clustering loss only
        self.use_reconstruction_loss = False
        set_torch_seed(self.random_state)

    def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'DEC':
        """
        Initiate the actual clustering process on the input data set.
        The resulting cluster labels will be stored in the labels_ attribute.

        Parameters
        ----------
        X : np.ndarray
            the given data set
        y : np.ndarray
            the labels (can be ignored)

        Returns
        -------
        self : DEC
            this instance of the DEC algorithm
        """
        augmentation_invariance_check(self.augmentation_invariance, self.custom_dataloaders)
        kmeans_labels, kmeans_centers, dec_labels, dec_centers, autoencoder = _dec(X, self.n_clusters, self.alpha,
                                                                                   self.batch_size,
                                                                                   self.pretrain_optimizer_params,
                                                                                   self.clustering_optimizer_params,
                                                                                   self.pretrain_epochs,
                                                                                   self.clustering_epochs,
                                                                                   self.optimizer_class, self.loss_fn,
                                                                                   self.autoencoder,
                                                                                   self.embedding_size,
                                                                                   self.use_reconstruction_loss,
                                                                                   self.cluster_loss_weight,
                                                                                   self.custom_dataloaders,
                                                                                   self.augmentation_invariance,
                                                                                   self.initial_clustering_class,
                                                                                   self.initial_clustering_params,
                                                                                   self.random_state)
        self.labels_ = kmeans_labels
        self.cluster_centers_ = kmeans_centers
        self.dec_labels_ = dec_labels
        self.dec_cluster_centers_ = dec_centers
        # The autoencoder attribute is replaced by the trained autoencoder
        self.autoencoder = autoencoder
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predicts the labels of the input data.
        The prediction uses the KMeans cluster centers stored in cluster_centers_ and the trained autoencoder,
        so fit has to be called first.

        Parameters
        ----------
        X : np.ndarray
            input data

        Returns
        -------
        predicted_labels : np.ndarray
            The predicted labels
        """
        dataloader = get_dataloader(X, self.batch_size, False, False)
        predicted_labels = embedded_kmeans_prediction(dataloader, self.cluster_centers_, self.autoencoder)
        return predicted_labels
class IDEC(DEC):
    """
    The Improved Deep Embedded Clustering (IDEC) algorithm.
    Is equal to the DEC algorithm but uses the reconstruction loss also during the clustering optimization.
    Further, cluster_loss_weight is set to 0.1 instead of 1 when using the default settings.

    Parameters
    ----------
    n_clusters : int
        number of clusters. Can be None if a corresponding initial_clustering_class is given, e.g. DBSCAN
    alpha : float
        alpha value for the prediction (default: 1.0)
    batch_size : int
        size of the data batches (default: 256)
    pretrain_optimizer_params : dict
        parameters of the optimizer for the pretraining of the autoencoder, includes the learning rate (default: {"lr": 1e-3})
    clustering_optimizer_params : dict
        parameters of the optimizer for the actual clustering procedure, includes the learning rate (default: {"lr": 1e-4})
    pretrain_epochs : int
        number of epochs for the pretraining of the autoencoder (default: 100)
    clustering_epochs : int
        number of epochs for the actual clustering procedure (default: 150)
    optimizer_class : torch.optim.Optimizer
        the optimizer class (default: torch.optim.Adam)
    loss_fn : torch.nn.modules.loss._Loss
        loss function for the reconstruction (default: torch.nn.MSELoss())
    autoencoder : torch.nn.Module
        the input autoencoder. If None a new FeedforwardAutoencoder will be created (default: None)
    embedding_size : int
        size of the embedding within the autoencoder (default: 10)
    cluster_loss_weight : float
        weight of the clustering loss compared to the reconstruction loss (default: 0.1)
    custom_dataloaders : tuple
        tuple consisting of a trainloader (random order) at the first and a test loader (non-random order) at the second position.
        If None, the default dataloaders will be used (default: None)
    augmentation_invariance : bool
        If True, augmented samples provided in custom_dataloaders[0] will be used to learn
        cluster assignments that are invariant to the augmentation transformations (default: False)
    initial_clustering_class : ClusterMixin
        clustering class to obtain the initial cluster labels after the pretraining (default: KMeans)
    initial_clustering_params : dict
        parameters for the initial clustering class (default: {})
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution. Can also be of type int (default: None)

    Attributes
    ----------
    labels_ : np.ndarray
        The final labels (obtained by a final KMeans execution)
    cluster_centers_ : np.ndarray
        The final cluster centers (obtained by a final KMeans execution)
    dec_labels_ : np.ndarray
        The final DEC labels
    dec_cluster_centers_ : np.ndarray
        The final DEC cluster centers
    autoencoder : torch.nn.Module
        The final autoencoder

    Examples
    ----------
    >>> from clustpy.data import create_subspace_data
    >>> from clustpy.deep import IDEC
    >>> data, labels = create_subspace_data(1500, subspace_features=(3, 50), random_state=1)
    >>> idec = IDEC(n_clusters=3, pretrain_epochs=3, clustering_epochs=3)
    >>> idec.fit(data)

    References
    ----------
    Guo, Xifeng, et al. "Improved deep embedded clustering with local structure preservation." IJCAI. 2017.
    """

    # NOTE: the dict defaults below are shared between all instances that rely on them,
    # so they must only be read and never be modified in place.
    def __init__(self, n_clusters: int, alpha: float = 1.0, batch_size: int = 256,
                 pretrain_optimizer_params: dict = {"lr": 1e-3}, clustering_optimizer_params: dict = {"lr": 1e-4},
                 pretrain_epochs: int = 100, clustering_epochs: int = 150,
                 optimizer_class: torch.optim.Optimizer = torch.optim.Adam,
                 loss_fn: torch.nn.modules.loss._Loss = torch.nn.MSELoss(), autoencoder: torch.nn.Module = None,
                 embedding_size: int = 10, cluster_loss_weight: float = 0.1, custom_dataloaders: tuple = None,
                 augmentation_invariance: bool = False, initial_clustering_class: ClusterMixin = KMeans,
                 initial_clustering_params: dict = {}, random_state: np.random.RandomState = None):
        super().__init__(n_clusters, alpha, batch_size, pretrain_optimizer_params, clustering_optimizer_params,
                         pretrain_epochs, clustering_epochs, optimizer_class, loss_fn, autoencoder,
                         embedding_size, cluster_loss_weight, custom_dataloaders, augmentation_invariance,
                         initial_clustering_class, initial_clustering_params, random_state)
        # The only behavioral difference to DEC: keep the reconstruction loss during the clustering phase
        self.use_reconstruction_loss = True