Source code for clustpy.deep._utils

from sklearn.base import ClusterMixin
import inspect
import torch
import numpy as np
import random
import os
import subprocess


[docs]def set_torch_seed(random_state: np.random.RandomState | int) -> None: """ Set the random state for torch applications. Parameters ---------- random_state : np.random.RandomState | int use a fixed random state or an integer to get a repeatable solution """ if type(random_state) is int: seed = random_state elif type(random_state) is np.random.RandomState: seed = random_state.randint(np.iinfo(np.int32).max) else: raise ValueError("random_state must be of type int or np.random.RandomState") torch.manual_seed(seed) torch.cuda.manual_seed(seed) torch.cuda.manual_seed_all(seed) np.random.seed(seed) random.seed(seed)
def mean_squared_error(tensor1: torch.Tensor, tensor2: torch.Tensor, weights: torch.Tensor = None) -> torch.Tensor:
    """
    Calculate the mean squared error between two tensors.
    Rows are treated as objects and columns as their features.
    The squared differences are summed over all entries and divided by the number of rows.
    Features can optionally be weighted; the weights are multiplied onto the differences before squaring.
    Without weights every feature counts with weight 1.

    Parameters
    ----------
    tensor1 : torch.Tensor
        the first tensor
    tensor2 : torch.Tensor
        the second tensor
    weights : torch.Tensor
        tensor containing the weights of the features (default: None)

    Returns
    -------
    mse : torch.Tensor
        the mean squared error
    """
    assert tensor1.shape == tensor2.shape, "The two input tensors must have the same shape."
    residuals = tensor1 - tensor2
    if weights is not None:
        assert tensor1.shape[1] == weights.shape[0], "The weight tensor must have one entry for each feature"
        residuals = residuals * weights
    n_objects = tensor1.shape[0]
    total_squared_error = torch.sum(residuals ** 2)
    return total_squared_error / n_objects
def squared_euclidean_distance(tensor1: torch.Tensor, tensor2: torch.Tensor,
                               weights: torch.Tensor = None) -> torch.Tensor:
    """
    Calculate the pairwise squared Euclidean distance between two tensors.
    Rows are treated as objects and columns as their features, so a (4x3) and a (12x3) tensor
    produce a (4x12) result.
    Features can optionally be weighted; the weights are multiplied onto the differences before squaring.
    Without weights every feature counts with weight 1.

    Parameters
    ----------
    tensor1 : torch.Tensor
        the first tensor
    tensor2 : torch.Tensor
        the second tensor
    weights : torch.Tensor
        tensor containing the weights of the features (default: None)

    Returns
    -------
    squared_diffs : torch.Tensor
        the pairwise squared Euclidean distances
    """
    assert tensor1.shape[1] == tensor2.shape[1], "The number of features of the two input tensors must match."
    # Insert singleton axes so that the subtraction broadcasts over every (row of tensor1, row of tensor2) pair
    pairwise_diffs = tensor1[:, None] - tensor2[None]
    if weights is not None:
        assert tensor1.shape[1] == weights.shape[0], "The weight tensor must have one entry for each feature"
        pairwise_diffs = pairwise_diffs * weights[None, None]
    # Sum the squared differences over the feature axis
    return torch.sum(pairwise_diffs ** 2, dim=2)
[docs]def detect_device(device: torch.device | int | str = None) -> torch.device: """ Automatically detects if you have a cuda enabled GPU. Device can also be read from environment variable "CLUSTPY_DEVICE". It can be set using, e.g., os.environ["CLUSTPY_DEVICE"] = "cuda:1" Parameters ---------- device : torch.device | int | str the input device. Will be returned if it is not None (default: None) Returns ------- device : torch.device device on which the prediction should take place """ assert device is None or type(device) is torch.device or type(device) is int or type(device) is str, "device must be None or of type torch.device, int or str" if device == -1: # Special case device = torch.device('cpu') elif device is None: env_device = os.environ.get("CLUSTPY_DEVICE", None) # Check if environment device is None - in that case CLUSTPY_DEVICE has not been specified if env_device is None: if torch.cuda.is_available(): # Try to automatically identify best GPU try: shell_output = (subprocess.check_output("nvidia-smi -q -d Utilization |grep Memory", shell=True)).decode('utf-8')[:-1] entries = shell_output.split("\n")[::2] used_memory = [int(e.split(":")[1].replace(" %", "")) for e in entries] device = torch.device("cuda:{0}".format(np.argmin(used_memory))) print(device, "was automatically chosen as device for the computation.") except Exception: # Default: Use first available GPU device = torch.device('cuda') else: device = torch.device('cpu') else: device = torch.device(env_device) elif type(device) is int or type(device) is str: device = torch.device(device) return device
def get_device_from_module(neural_network: torch.nn.Module) -> torch.device:
    """
    Get the device from a given module.

    The device is read from the first parameter of the module. If the module has no parameters,
    its first buffer is used instead. A module with neither parameters nor buffers is reported as
    living on the CPU.

    Parameters
    ----------
    neural_network : torch.nn.Module
        the neural network that is used for the encoding (e.g. an autoencoder)

    Returns
    -------
    device : torch.device
        device of the module
    """
    example_tensor = next(neural_network.parameters(), None)
    if example_tensor is None:
        # Modules without trainable parameters can still hold device-bound buffers
        example_tensor = next(neural_network.buffers(), None)
    if example_tensor is None:
        # Nothing is stored on any device, so fall back to the CPU instead of leaking StopIteration
        return torch.device('cpu')
    # Use the tensor's own device so that non-CUDA devices are not misreported as CPU
    return example_tensor.device
def encode_batchwise(dataloader: torch.utils.data.DataLoader, neural_network: torch.nn.Module) -> np.ndarray:
    """
    Utility function for embedding the whole data set in a mini-batch fashion.

    Each batch is expected to be indexable, with batch[1] holding the data to encode and batch[0] holding
    the positions at which the results are written into the output array
    (presumably the sample indices within the data set - verify against the dataloader in use).
    The computation runs without gradient tracking.

    Parameters
    ----------
    dataloader : torch.utils.data.DataLoader
        data to embed
    neural_network : torch.nn.Module
        the neural network that is used for the encoding (e.g. an autoencoder)

    Returns
    -------
    embeddings_numpy : np.ndarray
        The embedded data set. Is None if the dataloader does not yield any batch
    """
    device = get_device_from_module(neural_network)
    embeddings_numpy = None
    # Pure inference: do not build an autograd graph for every batch
    with torch.no_grad():
        for batch in dataloader:
            batch_data = batch[1].to(device)
            embedded_data = neural_network.encode(batch_data)
            # In case encode() returns more than one value (e.g., for a variational autoencoder), pick the first
            if isinstance(embedded_data, tuple):
                embedded_data = embedded_data[0]
            if embeddings_numpy is None:
                # The output is allocated lazily because the embedding shape is only known after the first batch
                embeddings_numpy = np.zeros([len(dataloader.dataset)] + list(embedded_data.shape[1:]), dtype=float)
            embeddings_numpy[batch[0]] = embedded_data.detach().cpu().numpy()
    return embeddings_numpy
def decode_batchwise(dataloader: torch.utils.data.DataLoader, neural_network: torch.nn.Module) -> np.ndarray:
    """
    Utility function for decoding the whole data set in a mini-batch fashion, e.g., with an autoencoder.
    Every batch is first encoded and the result is then passed to the decode function.
    Note: Assumes an implemented decode function

    Each batch is expected to be indexable, with batch[1] holding the data and batch[0] holding
    the positions at which the results are written into the output array
    (presumably the sample indices within the data set - verify against the dataloader in use).
    The computation runs without gradient tracking.

    Parameters
    ----------
    dataloader : torch.utils.data.DataLoader
        data to decode
    neural_network : torch.nn.Module
        the neural network that is used for the decoding (e.g. an autoencoder)

    Returns
    -------
    decodings_numpy : np.ndarray
        The decoded data set. Is None if the dataloader does not yield any batch
    """
    device = get_device_from_module(neural_network)
    decodings_numpy = None
    # Pure inference: do not build an autograd graph for every batch
    with torch.no_grad():
        for batch in dataloader:
            batch_data = batch[1].to(device)
            embedded_data = neural_network.encode(batch_data)
            # In case encode() returns more than one value (e.g., for a variational autoencoder),
            # all of them will be used for decoding
            if isinstance(embedded_data, tuple):
                decoded_data = neural_network.decode(*embedded_data)
            else:
                decoded_data = neural_network.decode(embedded_data)
            if decodings_numpy is None:
                # The output is allocated lazily because the decoded shape is only known after the first batch
                decodings_numpy = np.zeros([len(dataloader.dataset)] + list(decoded_data.shape[1:]), dtype=float)
            decodings_numpy[batch[0]] = decoded_data.detach().cpu().numpy()
    return decodings_numpy
def encode_decode_batchwise(dataloader: torch.utils.data.DataLoader, neural_network: torch.nn.Module) -> (
        np.ndarray, np.ndarray):
    """
    Utility function for encoding and decoding the whole data set in a mini-batch fashion, e.g., with an autoencoder.
    Note: Assumes an implemented decode function

    Each batch is expected to be indexable, with batch[1] holding the data and batch[0] holding
    the positions at which the results are written into the output arrays
    (presumably the sample indices within the data set - verify against the dataloader in use).
    The computation runs without gradient tracking.

    Parameters
    ----------
    dataloader : torch.utils.data.DataLoader
        dataloader to be used
    neural_network : torch.nn.Module
        the neural network that is used for the encoding and decoding (e.g. an autoencoder)

    Returns
    -------
    tuple : (np.ndarray, np.ndarray)
        The embedded data set,
        The decoded data set.
        Both are None if the dataloader does not yield any batch
    """
    device = get_device_from_module(neural_network)
    embeddings_numpy = None
    decodings_numpy = None
    # Pure inference: do not build an autograd graph for every batch
    with torch.no_grad():
        for batch in dataloader:
            batch_data = batch[1].to(device)
            embedded_data = neural_network.encode(batch_data)
            # In case encode() returns more than one value (e.g., for a variational autoencoder),
            # all of them will be used for decoding, but only the first is stored as the embedding
            if isinstance(embedded_data, tuple):
                decoded_data = neural_network.decode(*embedded_data)
                embedded_data = embedded_data[0]
            else:
                decoded_data = neural_network.decode(embedded_data)
            if embeddings_numpy is None:
                # Both outputs are allocated lazily because their shapes are only known after the first batch
                n_samples = len(dataloader.dataset)
                embeddings_numpy = np.zeros([n_samples] + list(embedded_data.shape[1:]), dtype=float)
                decodings_numpy = np.zeros([n_samples] + list(decoded_data.shape[1:]), dtype=float)
            embeddings_numpy[batch[0]] = embedded_data.detach().cpu().numpy()
            decodings_numpy[batch[0]] = decoded_data.detach().cpu().numpy()
    return embeddings_numpy, decodings_numpy
def predict_batchwise(dataloader: torch.utils.data.DataLoader, neural_network: torch.nn.Module,
                      cluster_module: torch.nn.Module) -> np.ndarray:
    """
    Utility function for predicting the cluster labels over the whole data set in a mini-batch fashion.
    Method calls the predict_hard method of the cluster_module for each batch of encoded data.

    Each batch is expected to be indexable, with batch[1] holding the data and batch[0] holding
    the positions at which the predictions are written into the output array
    (presumably the sample indices within the data set - verify against the dataloader in use).
    The computation runs without gradient tracking.

    Parameters
    ----------
    dataloader : torch.utils.data.DataLoader
        dataloader to be used
    neural_network : torch.nn.Module
        the neural network that is used for the encoding (e.g. an autoencoder)
    cluster_module : torch.nn.Module
        the cluster module that is used for the prediction (e.g. DEC). Must provide a predict_hard method.

    Returns
    -------
    predictions_numpy : np.ndarray
        The predictions of the cluster_module for the data set (dtype np.int32)
    """
    device = get_device_from_module(neural_network)
    predictions_numpy = np.zeros(len(dataloader.dataset), dtype=np.int32)
    # Pure inference: do not build an autograd graph for every batch
    with torch.no_grad():
        for batch in dataloader:
            batch_data = batch[1].to(device)
            embedded_data = neural_network.encode(batch_data)
            prediction = cluster_module.predict_hard(embedded_data)
            # Convert explicitly to numpy before writing into the numpy result array
            predictions_numpy[batch[0]] = prediction.detach().cpu().numpy()
    return predictions_numpy
# def add_noise(batch):
#     mask = torch.empty(
#         batch.shape, device=batch.device).bernoulli_(0.8)
#     return batch * mask


def int_to_one_hot(int_tensor: torch.Tensor, n_integers: int) -> torch.Tensor:
    """
    Convert a tensor containing integers (e.g. labels) to a one-hot encoding.
    Here, each integer gets its own feature in the resulting tensor, where only the values 0 or 1 occur.
    E.g. [0,0,1,2,1] gets
    [[1,0,0],
    [1,0,0],
    [0,1,0],
    [0,0,1],
    [0,1,0]]

    Parameters
    ----------
    int_tensor : torch.Tensor
        The original tensor containing integers
    n_integers : int
        The number of different integers within int_tensor

    Returns
    -------
    onehot : torch.Tensor
        The final one-hot encoding tensor (dtype torch.float, same device as int_tensor)
    """
    onehot = torch.zeros([int_tensor.shape[0], n_integers], dtype=torch.float, device=int_tensor.device)
    # Write a 1 into the column given by each row's integer value
    onehot.scatter_(1, int_tensor.unsqueeze(1).long(), 1)
    return onehot


def run_initial_clustering(X: np.ndarray, n_clusters: int, clustering_class: ClusterMixin, clustering_params: dict,
                           random_state: np.random.RandomState) -> (int, np.ndarray, np.ndarray, ClusterMixin):
    """
    Get an initial clustering result for a deep clustering algorithm.
    This result can then be refined by the optimization of the neural network.

    Parameters
    ----------
    X : np.ndarray
        the embedded data set
    n_clusters : int
        number of clusters. Can be None if a corresponding initial_clustering_class is given, e.g. DBSCAN
    clustering_class : ClusterMixin
        the class of the initial clustering algorithm. If it is None, random labels will be chosen
    clustering_params : dict
        the parameters for the initial clustering algorithm. None is treated as an empty dictionary
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution.
        It is passed to the clustering algorithm if that accepts a random_state parameter and none is contained
        in clustering_params. It is also used to draw the random labels if clustering_class is None

    Returns
    -------
    tuple : (int, np.ndarray, np.ndarray, ClusterMixin)
        The number of clusters (can change if e.g. DBSCAN is used),
        The initial cluster labels,
        The initial cluster centers,
        The clustering object
    """
    if clustering_params is None:
        clustering_params = {}
    if clustering_class is None:
        # Draw the random labels from the given random state so that the result is repeatable
        if isinstance(random_state, np.random.RandomState):
            rng = random_state
        elif random_state is None:
            # No random state given: keep using numpy's global generator
            rng = np.random
        else:
            rng = np.random.RandomState(random_state)
        clustering_algo = ClusterMixin()
        clustering_algo.labels_ = rng.randint(n_clusters, size=X.shape[0])
    else:
        # Get possible input parameters of the clustering algorithm
        argspec = inspect.getfullargspec(clustering_class)
        clustering_class_parameters = argspec.args + argspec.kwonlyargs
        # Collect the keyword arguments that are derived from this function's own arguments
        derived_params = {}
        if "n_clusters" in clustering_class_parameters:
            derived_params["n_clusters"] = n_clusters
        elif "n_components" in clustering_class_parameters:
            # in case of GMM
            derived_params["n_components"] = n_clusters
        # Otherwise (e.g., DBSCAN) the algorithm does not take the number of clusters as input
        if "random_state" in clustering_class_parameters and "random_state" not in clustering_params:
            derived_params["random_state"] = random_state
        # A key that occurs in both dictionaries raises a TypeError (duplicate keyword argument)
        clustering_algo = clustering_class(**derived_params, **clustering_params)
        # Run algorithm
        clustering_algo.fit(X)
    # Check if clustering algorithm returns cluster centers
    if hasattr(clustering_algo, "cluster_centers_"):
        labels = clustering_algo.labels_
        centers = clustering_algo.cluster_centers_
    elif hasattr(clustering_algo, "means_"):
        # in case of GMM
        labels = clustering_algo.predict(X)
        centers = clustering_algo.means_
    else:
        # in case of e.g., DBSCAN or random labels: use the mean of each cluster, ignoring negative (noise) labels
        labels = clustering_algo.labels_
        centers = np.array([np.mean(X[labels == i], axis=0) for i in np.unique(labels) if i >= 0])
    # Count the non-negative labels that actually occur. Needed for DBSCAN, XMeans, GMeans, ...
    n_clusters = np.sum(np.unique(labels) >= 0)
    return n_clusters, labels, centers, clustering_algo