Source code for clustpy.deep._utils

from sklearn.base import ClusterMixin
import inspect
import torch
from itertools import islice
import numpy as np
import random
from sklearn.metrics.pairwise import pairwise_distances_argmin_min
import os
import subprocess


[docs]def set_torch_seed(random_state: np.random.RandomState | int) -> None: """ Set the random state for torch applications. Parameters ---------- random_state : np.random.RandomState | int use a fixed random state or an integer to get a repeatable solution """ if type(random_state) is int: seed = random_state elif type(random_state) is np.random.RandomState: seed = random_state.randint(np.iinfo(np.int32).max) else: raise ValueError("random_state must be of type int or np.random.RandomState") torch.manual_seed(seed) torch.cuda.manual_seed(seed) torch.cuda.manual_seed_all(seed) np.random.seed(seed) random.seed(seed)
def squared_euclidean_distance(tensor1: torch.Tensor, tensor2: torch.Tensor, weights: torch.Tensor = None) -> torch.Tensor: """ Calculate the pairwise squared Euclidean distance between two tensors. Each row in the tensors is interpreted as a separate object, while each column represents its features. Therefore, the result of an (4x3) and (12x3) tensor will be a (4x12) tensor. Optionally, features can be individually weighted. The default behavior is that all features are weighted by 1. Parameters ---------- tensor1 : torch.Tensor the first tensor tensor2 : torch.Tensor the second tensor weights : torch.Tensor tensor containing the weights of the features (default: None) Returns ------- squared_diffs : torch.Tensor the pairwise squared Euclidean distances """ assert tensor1.shape[1] == tensor2.shape[1], "The number of features of the two input tensors must match." ta = tensor1.unsqueeze(1) tb = tensor2.unsqueeze(0) squared_diffs = (ta - tb) if weights is not None: assert tensor1.shape[1] == weights.shape[0] weights_unsqueezed = weights.unsqueeze(0).unsqueeze(1) squared_diffs = squared_diffs * weights_unsqueezed squared_diffs = squared_diffs.pow(2).sum(2) return squared_diffs
[docs]def detect_device(device: torch.device | int | str = None) -> torch.device: """ Automatically detects if you have a cuda enabled GPU. Device can also be read from environment variable "CLUSTPY_DEVICE". It can be set using, e.g., os.environ["CLUSTPY_DEVICE"] = "cuda:1" Parameters ---------- device : torch.device | int | str the input device. Will be returned if it is not None (default: None) Returns ------- device : torch.device device on which the prediction should take place """ if device == -1: # Special case device = torch.device('cpu') elif device is None: env_device = os.environ.get("CLUSTPY_DEVICE", None) # Check if environment device is None - in that case CLUSTPY_DEVICE has not been specified if env_device is None: if torch.cuda.is_available(): # Try to automatically identify best GPU try: shell_output = (subprocess.check_output("nvidia-smi -q -d Utilization |grep Memory", shell=True)).decode('utf-8')[:-1] entries = shell_output.split("\n")[::2] used_memory = [int(e.split(":")[1].replace(" %", "")) for e in entries] device = torch.device("cuda:{0}".format(np.argmin(used_memory))) print(device, "was automatically chosen as device for the computation.") except Exception: # Default: Use first available GPU device = torch.device('cuda') else: device = torch.device('cpu') else: device = torch.device(env_device) elif type(device) is int or type(device) is str: device = torch.device(device) return device
[docs]def get_device_from_module(neural_network: torch.nn.Module) -> torch.device: """ Get the device from a given module. Parameters ---------- neural_network : torch.nn.Module the neural network that is used for the encoding (e.g. an autoencoder) Returns ------- device : torch.device device of the module """ example_param = next(neural_network.parameters()) if example_param.is_cuda: device = torch.device('cuda:' + str(example_param.get_device())) else: device = torch.device('cpu') return device
[docs]def encode_batchwise(dataloader: torch.utils.data.DataLoader, neural_network: torch.nn.Module) -> np.ndarray: """ Utility function for embedding the whole data set in a mini-batch fashion Parameters ---------- dataloader : torch.utils.data.DataLoader dataloader to be used neural_network : torch.nn.Module the neural network that is used for the encoding (e.g. an autoencoder) Returns ------- embeddings_numpy : np.ndarray The embedded data set """ device = get_device_from_module(neural_network) embeddings = [] for batch in dataloader: batch_data = batch[1].to(device) embedded_data = neural_network.encode(batch_data) # In case encode() returns more than one value (e.g., for a variational autoencoder), we will pick the first if type(embedded_data) is tuple: embedded_data = embedded_data[0] embeddings.append(embedded_data.detach().cpu()) embeddings_numpy = torch.cat(embeddings, dim=0).numpy() return embeddings_numpy
[docs]def decode_batchwise(dataloader: torch.utils.data.DataLoader, neural_network: torch.nn.Module) -> np.ndarray: """ Utility function for decoding the whole data set in a mini-batch fashion, e.g., with an autoencoder. Note: Assumes an implemented decode function Parameters ---------- dataloader : torch.utils.data.DataLoader dataloader to be used neural_network : torch.nn.Module the neural network that is used for the decoding (e.g. an autoencoder) device : torch.device device to be trained on Returns ------- reconstructions_numpy : np.ndarray The reconstructed data set """ device = get_device_from_module(neural_network) reconstructions = [] for batch in dataloader: batch_data = batch[1].to(device) embedded_data = neural_network.encode(batch_data) # In case encode() returns more than one value (e.g., for a variational autoencoder), we all of them will be used for decoding if type(embedded_data) is tuple: decoded_data = neural_network.decode(*embedded_data) else: decoded_data = neural_network.decode(embedded_data) reconstructions.append(decoded_data.detach().cpu()) reconstructions_numpy = torch.cat(reconstructions, dim=0).numpy() return reconstructions_numpy
[docs]def encode_decode_batchwise(dataloader: torch.utils.data.DataLoader, neural_network: torch.nn.Module) -> ( np.ndarray, np.ndarray): """ Utility function for encoding and decoding the whole data set in a mini-batch fashion, e.g., with an autoencoder. Note: Assumes an implemented decode function Parameters ---------- dataloader : torch.utils.data.DataLoader dataloader to be used neural_network : torch.nn.Module the neural network that is used for the encoding and decoding (e.g. an autoencoder) Returns ------- tuple : (np.ndarray, np.ndarray) The embedded data set, The reconstructed data set """ device = get_device_from_module(neural_network) embeddings = [] reconstructions = [] for batch in dataloader: batch_data = batch[1].to(device) embedding = neural_network.encode(batch_data) embeddings.append(embedding.detach().cpu()) reconstructions.append(neural_network.decode(embedding).detach().cpu()) embeddings_numpy = torch.cat(embeddings, dim=0).numpy() reconstructions_numpy = torch.cat(reconstructions, dim=0).numpy() return embeddings_numpy, reconstructions_numpy
[docs]def predict_batchwise(dataloader: torch.utils.data.DataLoader, neural_network: torch.nn.Module, cluster_module: torch.nn.Module) -> np.ndarray: """ Utility function for predicting the cluster labels over the whole data set in a mini-batch fashion. Method calls the predict_hard method of the cluster_module for each batch of data. Parameters ---------- dataloader : torch.utils.data.DataLoader dataloader to be used neural_network : torch.nn.Module the neural network that is used for the encoding (e.g. an autoencoder) cluster_module : torch.nn.Module the cluster module that is used for the encoding (e.g. DEC). Usually contains the predict method. Returns ------- predictions_numpy : np.ndarray The predictions of the cluster_module for the data set """ device = get_device_from_module(neural_network) predictions = [] for batch in dataloader: batch_data = batch[1].to(device) prediction = cluster_module.predict_hard(neural_network.encode(batch_data)).detach().cpu() predictions.append(prediction) predictions_numpy = torch.cat(predictions, dim=0).numpy() return predictions_numpy
# def add_noise(batch): # mask = torch.empty( # batch.shape, device=batch.device).bernoulli_(0.8) # return batch * mask def int_to_one_hot(int_tensor: torch.Tensor, n_integers: int) -> torch.Tensor: """ Convert a tensor containing integers (e.g. labels) to an one hot encoding. Here, each integer gets its own features in the resulting tensor, where only the values 0 or 1 are accepted. E.g. [0,0,1,2,1] gets [[1,0,0], [1,0,0], [0,1,0], [0,0,1], [0,1,0]] Parameters ---------- int_tensor : torch.Tensor The original tensor containing integers n_integers : int The number of different integers within int_tensor Returns ------- onehot : torch.Tensor The final one hot encoding tensor """ onehot = torch.zeros([int_tensor.shape[0], n_integers], dtype=torch.float, device=int_tensor.device) onehot.scatter_(1, int_tensor.unsqueeze(1).long(), 1) return onehot def embedded_kmeans_prediction(X_embed: np.ndarray, cluster_centers: np.ndarray) -> np.ndarray: """ Predicts the labels of the given embedded data. Labels correspond to the id of the closest cluster center. Parameters ---------- X_embed : np.ndarray dataloader to be used cluster_centers : np.ndarray input cluster centers Returns ------- predicted_labels : np.ndarray The predicted labels """ predicted_labels, _ = pairwise_distances_argmin_min(X=X_embed, Y=cluster_centers, metric='euclidean', metric_kwargs={'squared': True}) predicted_labels = predicted_labels.astype(np.int32) return predicted_labels def run_initial_clustering(X: np.ndarray, n_clusters: int, clustering_class: ClusterMixin, clustering_params: dict, random_state: np.random.RandomState) -> (int, np.ndarray, np.ndarray, ClusterMixin): """ Get an initial clustering result for a deep clustering algorithm. This result can then be refined by the optimization of the neural network. Parameters ---------- X : np.ndarray the embedded data set n_clusters : int number of clusters. Can be None if a corresponding initial_clustering_class is given, e.g. DBSCAN clustering_class : ClusterMixin the class of the initial clustering algorithm. If it is None, random labels will be chosen clustering_params : dict the parameters for the initial clustering algorithm random_state : np.random.RandomState use a fixed random state to get a repeatable solution Returns ------- tuple : (int, np.ndarray, np.ndarray, ClusterMixin) The number of clusters (can change if e.g. DBSCAN is used), The initial cluster labels, The initial cluster centers, The clustering object """ if clustering_class is None: clustering_algo = ClusterMixin() clustering_algo.labels_ = np.random.randint(n_clusters, size=X.shape[0]) else: # Get possible input parameters of the clustering algorithm clustering_class_parameters = inspect.getfullargspec(clustering_class).args + inspect.getfullargspec( clustering_class).kwonlyargs # Check if n_clusters or n_components is contained in the possible parameters if "n_clusters" in clustering_class_parameters: if "random_state" in clustering_class_parameters and "random_state" not in clustering_params.keys(): clustering_algo = clustering_class(n_clusters=n_clusters, random_state=random_state, **clustering_params) else: clustering_algo = clustering_class(n_clusters=n_clusters, **clustering_params) elif "n_components" in clustering_class_parameters: # in case of GMM if "random_state" in clustering_class_parameters and "random_state" not in clustering_params.keys(): clustering_algo = clustering_class(n_components=n_clusters, random_state=random_state, **clustering_params) else: clustering_algo = clustering_class(n_components=n_clusters, **clustering_params) else: # in case of e.g., DBSCAN if "random_state" in clustering_class_parameters and "random_state" not in clustering_params.keys(): clustering_algo = clustering_class(random_state=random_state, **clustering_params) else: clustering_algo = clustering_class(**clustering_params) # Run algorithm clustering_algo.fit(X) # Check if clustering algorithm return cluster centers if hasattr(clustering_algo, "cluster_centers_"): labels = clustering_algo.labels_ centers = clustering_algo.cluster_centers_ elif hasattr(clustering_algo, "means_"): # in case of GMM labels = clustering_algo.predict(X) centers = clustering_algo.means_ else: # in case of e.g., DBSCAN labels = clustering_algo.labels_ centers = np.array([np.mean(X[labels == i], axis=0) for i in np.unique(labels) if i >= 0]) n_clusters = np.sum(np.unique(labels) >= 0) # Needed for DBSCAN, XMeans, GMeans, ... return n_clusters, labels, centers, clustering_algo