# Source code for clustpy.deep.dcn

"""
@authors:
Lukas Miklautz,
Dominik Mautz
"""

from clustpy.deep._utils import detect_device, encode_batchwise, squared_euclidean_distance, predict_batchwise, \
    set_torch_seed
from clustpy.deep._data_utils import get_dataloader
from clustpy.deep._train_utils import get_trained_autoencoder
import torch
import numpy as np
from sklearn.cluster import KMeans
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.utils import check_random_state


def _dcn(X: np.ndarray, n_clusters: int, batch_size: int, pretrain_learning_rate: float,
         clustering_learning_rate: float, pretrain_epochs: int,
         clustering_epochs: int, optimizer_class: torch.optim.Optimizer, loss_fn: torch.nn.modules.loss._Loss,
         autoencoder: torch.nn.Module, embedding_size: int, degree_of_space_distortion: float,
         degree_of_space_preservation: float, random_state: np.random.RandomState) -> (
        np.ndarray, np.ndarray, np.ndarray, np.ndarray, torch.nn.Module):
    """
    Start the actual DCN clustering procedure on the input data set.

    The procedure is: (pre)train the autoencoder, run KMeans in the embedded space to obtain the initial
    centers, optimize the autoencoder with the DCN loss and finally run KMeans once more on the resulting
    embedding.

    Parameters
    ----------
    X : np.ndarray / torch.Tensor
        the given data set. Can be a np.ndarray or a torch.Tensor
    n_clusters : int
        number of clusters
    batch_size : int
        size of the data batches
    pretrain_learning_rate : float
        learning rate for the pretraining of the autoencoder
    clustering_learning_rate : float
        learning rate of the actual clustering procedure
    pretrain_epochs : int
        number of epochs for the pretraining of the autoencoder
    clustering_epochs : int
        number of epochs for the actual clustering procedure
    optimizer_class : torch.optim.Optimizer
        the optimizer class
    loss_fn : torch.nn.modules.loss._Loss
         loss function for the reconstruction
    autoencoder : torch.nn.Module
        the input autoencoder. If None a new FlexibleAutoencoder will be created
    embedding_size : int
        size of the embedding within the autoencoder
    degree_of_space_distortion : float
        weight of the clustering loss
    degree_of_space_preservation : float
        weight of the reconstruction loss
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution

    Returns
    -------
    tuple : (np.ndarray, np.ndarray, np.ndarray, np.ndarray, torch.nn.Module)
        The labels as identified by a final KMeans execution,
        The cluster centers as identified by a final KMeans execution,
        The labels as identified by DCN after the training terminated,
        The cluster centers as identified by DCN after the training terminated,
        The final autoencoder
    """
    device = detect_device()
    # Two loaders over the same data: one for training, one for encoding / prediction.
    # NOTE(review): the positional flags are presumably (shuffle, drop_last) - confirm against get_dataloader
    trainloader = get_dataloader(X, batch_size, True, False)
    testloader = get_dataloader(X, batch_size, False, False)
    autoencoder = get_trained_autoencoder(trainloader, pretrain_learning_rate, pretrain_epochs, device,
                                          optimizer_class, loss_fn, X.shape[1], embedding_size, autoencoder)

    def _kmeans_in_embedded_space() -> KMeans:
        # Encode the data set with the current autoencoder and cluster the embedding.
        # The same random_state object is passed on every call, exactly as in the two inline executions before.
        embedded_data = encode_batchwise(testloader, autoencoder, device)
        return KMeans(n_clusters=n_clusters, random_state=random_state).fit(embedded_data)

    # Execute kmeans in embedded space to obtain the initial centers
    init_centers = _kmeans_in_embedded_space().cluster_centers_
    # Setup DCN Module
    dcn_module = _DCN_Module(init_centers).to_device(device)
    # Use DCN learning_rate (usually pretrain_learning_rate reduced by a magnitude of 10)
    # Only the autoencoder parameters are optimized; the centers are updated separately inside fit()
    optimizer = optimizer_class(list(autoencoder.parameters()), lr=clustering_learning_rate)
    # DCN Training loop
    dcn_module.fit(autoencoder, trainloader, clustering_epochs, device, optimizer, loss_fn,
                   degree_of_space_distortion, degree_of_space_preservation)
    # Get labels
    dcn_labels = predict_batchwise(testloader, autoencoder, dcn_module, device)
    dcn_centers = dcn_module.centers.detach().cpu().numpy()
    # Do reclustering with Kmeans on the final embedding
    kmeans = _kmeans_in_embedded_space()
    return kmeans.labels_, kmeans.cluster_centers_, dcn_labels, dcn_centers, autoencoder


def _compute_centroids(centers: torch.Tensor, embedded: torch.Tensor, count: torch.Tensor, labels: torch.Tensor) -> (
        torch.Tensor, torch.Tensor):
    """
    Update the centers and amount of object ever assigned to a center.

    New center is calculated by (see Eq. 8 in the paper):
    center - eta (center - embedded[i])
    => center - eta * center + eta * embedded[i]
    => (1 - eta) center + eta * embedded[i]

    Parameters
    ----------
    centers : torch.Tensor
        The current cluster centers
    embedded : torch.Tensor
        The embedded samples
    count : torch.Tensor
        The total amount of objects that ever got assigned to a cluster. Affects the learning rate of the center update
    labels : torch.Tensor
        The current hard labels

    Returns
    -------
    centers, count : (torch.Tensor, torch.Tensor)
        The updated centers and the updated counts
    """
    for i in range(embedded.shape[0]):
        c = labels[i].item()
        count[c] += 1
        eta = 1.0 / count[c].item()
        centers[c] = (1 - eta) * centers[c] + eta * embedded[i]
    return centers, count


class _DCN_Module(torch.nn.Module):
    """
    The _DCN_Module. Contains most of the algorithm specific procedures like the loss and prediction functions.

    Parameters
    ----------
    init_np_centers : np.ndarray
        The initial numpy centers

    Attributes
    ----------
    centers : torch.Tensor
        the cluster centers
    """

    def __init__(self, init_np_centers: np.ndarray):
        super().__init__()
        self.centers = torch.tensor(init_np_centers)

    def dcn_loss(self, embedded: torch.Tensor, weights: torch.Tensor = None) -> torch.Tensor:
        """
        Calculate the DCN loss of given embedded samples.

        Parameters
        ----------
        embedded : torch.Tensor
            the embedded samples
        weights : torch.Tensor
            feature weights for the squared euclidean distance (default: None)

        Returns
        -------
        loss: torch.Tensor
            the final DCN loss
        """
        dist = squared_euclidean_distance(embedded, self.centers, weights=weights)
        loss = (dist.min(dim=1)[0]).mean()
        return loss

    def predict_hard(self, embedded: torch.Tensor, weights: torch.Tensor = None) -> torch.Tensor:
        """
        Hard prediction of the given embedded samples. Returns the corresponding hard labels.
        Uses the minimum squared euclidean distance to the cluster centers to get the labels.

        Parameters
        ----------
        embedded : torch.Tensor
            the embedded samples
        weights : torch.Tensor
            feature weights for the squared euclidean distance (default: None)

        Returns
        -------
        labels : torch.Tensor
            the final labels
        """
        dist = squared_euclidean_distance(embedded, self.centers, weights=weights)
        labels = (dist.min(dim=1)[1])
        return labels

    def update_centroids(self, embedded: torch.Tensor, count: torch.Tensor, labels: torch.Tensor) -> torch.Tensor:
        """
        Update the cluster centers of the _DCN_Module.

        Parameters
        ----------
        embedded : torch.Tensor
            the embedded samples
        count : torch.Tensor
            The total amount of objects that ever got assigned to a cluster. Affects the learning rate of the center update
        labels : torch.Tensor
            The current hard labels

        Returns
        -------
        count : torch.Tensor
            The new amount of objects that ever got assigned to a cluster
        """
        self.centers, count = _compute_centroids(self.centers, embedded, count, labels)
        return count

    def to_device(self, device: torch.device) -> '_DCN_Module':
        """
        Move the _DCN_Module and the cluster centers to the specified device (cpu or cuda).

        Parameters
        ----------
        device : torch.device
            device to be trained on

        Returns
        -------
        self : _DCN_Module
            this instance of the _DCN_Module
        """
        self.centers = self.centers.to(device)
        self.to(device)
        return self

    def fit(self, autoencoder: torch.nn.Module, trainloader: torch.utils.data.DataLoader, n_epochs: int,
            device: torch.device, optimizer: torch.optim.Optimizer, loss_fn: torch.nn.modules.loss._Loss,
            degree_of_space_distortion: float, degree_of_space_preservation: float) -> '_DCN_Module':
        """
        Trains the _DCN_Module in place.

        Parameters
        ----------
        autoencoder : torch.nn.Module
            the autoencoder
        trainloader : torch.utils.data.DataLoader
            dataloader to be used for training
        n_epochs : int
            number of epochs for the clustering procedure
        device : torch.device
            device to be trained on
        optimizer : torch.optim.Optimizer
            the optimizer for training
        loss_fn : torch.nn.modules.loss._Loss
            loss function for the reconstruction
        degree_of_space_distortion : float
            weight of the clustering loss
        degree_of_space_preservation : float
            weight of the reconstruction loss

        Returns
        -------
        self : _DCN_Module
            this instance of the _DCN_Module
        """
        # DCN training loop
        # Init for count from original DCN code (not reported in Paper)
        # This means centroid learning rate at the beginning is scaled by a hundred
        count = torch.ones(self.centers.shape[0], dtype=torch.int32) * 100
        for _ in range(n_epochs):
            # Update Network
            for batch in trainloader:
                batch_data = batch[1].to(device)
                embedded = autoencoder.encode(batch_data)
                reconstruction = autoencoder.decode(embedded)
                # compute reconstruction loss
                ae_loss = loss_fn(batch_data, reconstruction)
                # compute cluster loss
                cluster_loss = self.dcn_loss(embedded)
                # compute total loss
                loss = degree_of_space_preservation * ae_loss + 0.5 * degree_of_space_distortion * cluster_loss
                # Backward pass - update weights
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            # Update Assignments and Centroids
            with torch.no_grad():
                for batch in trainloader:
                    batch_data = batch[1].to(device)
                    embedded = autoencoder.encode(batch_data)

                    ## update centroids [on gpu] About 40 seconds for 1000 iterations
                    ## No overhead from loading between gpu and cpu
                    # count = cluster_module.update_centroid(embedded, count, s)

                    # update centroids [on cpu] About 30 Seconds for 1000 iterations
                    # with additional overhead from loading between gpu and cpu
                    embedded = embedded.cpu()
                    self.centers = self.centers.cpu()

                    # update assignments
                    labels = self.predict_hard(embedded)

                    # update centroids
                    count = self.update_centroids(embedded, count.cpu(), labels.cpu())
                    # count = count.to(device)
                    self.centers = self.centers.to(device)
        return self


class DCN(BaseEstimator, ClusterMixin):
    """
    The Deep Clustering Network (DCN) algorithm.
    First, an autoencoder (AE) will be trained (will be skipped if input autoencoder is given).
    Afterwards, KMeans identifies the initial clusters.
    Last, the AE will be optimized using the DCN loss function.

    Parameters
    ----------
    n_clusters : int
        number of clusters
    batch_size : int
        size of the data batches (default: 256)
    pretrain_learning_rate : float
        learning rate for the pretraining of the autoencoder (default: 1e-3)
    clustering_learning_rate : float
        learning rate of the actual clustering procedure (default: 1e-4)
    pretrain_epochs : int
        number of epochs for the pretraining of the autoencoder (default: 100)
    clustering_epochs : int
        number of epochs for the actual clustering procedure (default: 150)
    optimizer_class : torch.optim.Optimizer
        the optimizer class (default: torch.optim.Adam)
    loss_fn : torch.nn.modules.loss._Loss
        loss function for the reconstruction (default: torch.nn.MSELoss())
    degree_of_space_distortion : float
        weight of the clustering loss (default: 0.05)
    degree_of_space_preservation : float
        weight of the reconstruction loss (default: 1.0)
    autoencoder : torch.nn.Module
        the input autoencoder. If None a new FlexibleAutoencoder will be created (default: None)
    embedding_size : int
        size of the embedding within the autoencoder (default: 10)
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution. Can also be of type int (default: None)

    Attributes
    ----------
    labels_ : np.ndarray
        The final labels (obtained by a final KMeans execution)
    cluster_centers_ : np.ndarray
        The final cluster centers (obtained by a final KMeans execution)
    dcn_labels_ : np.ndarray
        The final DCN labels
    dcn_cluster_centers_ : np.ndarray
        The final DCN cluster centers
    autoencoder : torch.nn.Module
        The final autoencoder

    Examples
    ----------
    from clustpy.data import load_mnist
    from clustpy.deep import DCN
    data, labels = load_mnist()
    dcn = DCN(n_clusters=10)
    dcn.fit(data)

    References
    ----------
    Yang, Bo, et al. "Towards k-means-friendly spaces: Simultaneous deep learning and clustering."
    international conference on machine learning. PMLR, 2017.
    """

    # NOTE: the default loss_fn instance is created once at definition time and is shared by every DCN object
    # that does not pass its own loss function.
    def __init__(self, n_clusters: int, batch_size: int = 256, pretrain_learning_rate: float = 1e-3,
                 clustering_learning_rate: float = 1e-4, pretrain_epochs: int = 100,
                 clustering_epochs: int = 150, optimizer_class: torch.optim.Optimizer = torch.optim.Adam,
                 loss_fn: torch.nn.modules.loss._Loss = torch.nn.MSELoss(),
                 degree_of_space_distortion: float = 0.05, degree_of_space_preservation: float = 1.0,
                 autoencoder: torch.nn.Module = None, embedding_size: int = 10,
                 random_state: np.random.RandomState = None):
        self.n_clusters = n_clusters
        self.batch_size = batch_size
        self.pretrain_learning_rate = pretrain_learning_rate
        self.clustering_learning_rate = clustering_learning_rate
        self.pretrain_epochs = pretrain_epochs
        self.clustering_epochs = clustering_epochs
        self.optimizer_class = optimizer_class
        self.loss_fn = loss_fn
        self.degree_of_space_distortion = degree_of_space_distortion
        self.degree_of_space_preservation = degree_of_space_preservation
        self.autoencoder = autoencoder
        self.embedding_size = embedding_size
        # Normalise None / int / RandomState input with sklearn's check_random_state
        self.random_state = check_random_state(random_state)
        # NOTE(review): presumably seeds torch from the random state; this happens at construction time
        set_torch_seed(self.random_state)

    def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'DCN':
        """
        Initiate the actual clustering process on the input data set.
        The resulting cluster labels will be stored in the labels_ attribute.

        Parameters
        ----------
        X : np.ndarray
            the given data set
        y : np.ndarray
            the labels (can be ignored)

        Returns
        -------
        self : DCN
            this instance of the DCN algorithm
        """
        kmeans_labels, kmeans_centers, dcn_labels, dcn_centers, autoencoder = _dcn(
            X,
            self.n_clusters,
            self.batch_size,
            self.pretrain_learning_rate,
            self.clustering_learning_rate,
            self.pretrain_epochs,
            self.clustering_epochs,
            self.optimizer_class,
            self.loss_fn,
            self.autoencoder,
            self.embedding_size,
            self.degree_of_space_distortion,
            self.degree_of_space_preservation,
            self.random_state)
        self.labels_ = kmeans_labels
        self.cluster_centers_ = kmeans_centers
        self.dcn_labels_ = dcn_labels
        self.dcn_cluster_centers_ = dcn_centers
        # The autoencoder parameter is replaced by the trained autoencoder returned from _dcn
        self.autoencoder = autoencoder
        return self