from sklearn.base import ClusterMixin
import inspect
import torch
import numpy as np
import random
import os
import subprocess
[docs]def set_torch_seed(random_state: np.random.RandomState | int) -> None:
"""
Set the random state for torch applications.
Parameters
----------
random_state : np.random.RandomState | int
use a fixed random state or an integer to get a repeatable solution
"""
if type(random_state) is int:
seed = random_state
elif type(random_state) is np.random.RandomState:
seed = random_state.randint(np.iinfo(np.int32).max)
else:
raise ValueError("random_state must be of type int or np.random.RandomState")
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
def mean_squared_error(tensor1: torch.Tensor, tensor2: torch.Tensor, weights: torch.Tensor = None) -> torch.Tensor:
    """
    Compute the mean squared error of two equally shaped tensors.

    Rows are treated as objects and columns as features.
    The squared (optionally weighted) differences are summed over all entries and
    divided by the number of rows.
    Note that the weights are multiplied onto the differences before squaring.

    Parameters
    ----------
    tensor1 : torch.Tensor
        the first tensor
    tensor2 : torch.Tensor
        the second tensor
    weights : torch.Tensor
        one weight per feature. If None, the differences are used unweighted (default: None)

    Returns
    -------
    mse : torch.Tensor
        the mean squared error
    """
    assert tensor1.shape == tensor2.shape, "The two input tensors must have the same shape."
    n_objects = tensor1.shape[0]
    if weights is None:
        residuals = tensor1 - tensor2
    else:
        assert tensor1.shape[1] == weights.shape[0], "The weight tensor must have one entry for each feature"
        residuals = (tensor1 - tensor2) * weights
    return torch.sum(residuals ** 2) / n_objects
def squared_euclidean_distance(tensor1: torch.Tensor, tensor2: torch.Tensor,
                               weights: torch.Tensor = None) -> torch.Tensor:
    """
    Compute all pairwise squared Euclidean distances between the rows of two tensors.

    Rows are treated as objects and columns as features, e.g., a (4x3) and a (12x3) tensor
    yield a (4x12) result.
    If weights are given, each feature difference is multiplied by its weight before squaring.

    Parameters
    ----------
    tensor1 : torch.Tensor
        the first tensor
    tensor2 : torch.Tensor
        the second tensor
    weights : torch.Tensor
        one weight per feature. If None, the differences are used unweighted (default: None)

    Returns
    -------
    squared_diffs : torch.Tensor
        the pairwise squared Euclidean distances
    """
    assert tensor1.shape[1] == tensor2.shape[1], "The number of features of the two input tensors must match."
    # Broadcast (n, 1, d) against (1, m, d) to get every pairwise feature difference
    pairwise_diffs = tensor1[:, None] - tensor2[None]
    if weights is not None:
        assert tensor1.shape[1] == weights.shape[0], "The weight tensor must have one entry for each feature"
        pairwise_diffs = pairwise_diffs * weights[None, None]
    return (pairwise_diffs ** 2).sum(dim=2)
[docs]def detect_device(device: torch.device | int | str = None) -> torch.device:
"""
Automatically detects if you have a cuda enabled GPU.
Device can also be read from environment variable "CLUSTPY_DEVICE".
It can be set using, e.g., os.environ["CLUSTPY_DEVICE"] = "cuda:1"
Parameters
----------
device : torch.device | int | str
the input device. Will be returned if it is not None (default: None)
Returns
-------
device : torch.device
device on which the prediction should take place
"""
assert device is None or type(device) is torch.device or type(device) is int or type(device) is str, "device must be None or of type torch.device, int or str"
if device == -1:
# Special case
device = torch.device('cpu')
elif device is None:
env_device = os.environ.get("CLUSTPY_DEVICE", None)
# Check if environment device is None - in that case CLUSTPY_DEVICE has not been specified
if env_device is None:
if torch.cuda.is_available():
# Try to automatically identify best GPU
try:
shell_output = (subprocess.check_output("nvidia-smi -q -d Utilization |grep Memory", shell=True)).decode('utf-8')[:-1]
entries = shell_output.split("\n")[::2]
used_memory = [int(e.split(":")[1].replace(" %", "")) for e in entries]
device = torch.device("cuda:{0}".format(np.argmin(used_memory)))
print(device, "was automatically chosen as device for the computation.")
except Exception:
# Default: Use first available GPU
device = torch.device('cuda')
else:
device = torch.device('cpu')
else:
device = torch.device(env_device)
elif type(device) is int or type(device) is str:
device = torch.device(device)
return device
def get_device_from_module(neural_network: torch.nn.Module) -> torch.device:
    """
    Get the device from a given module.

    The device is read from the first parameter of the module.
    If the module has no parameters, its first buffer is used instead.
    If it has neither parameters nor buffers, the cpu is returned.

    Parameters
    ----------
    neural_network : torch.nn.Module
        the neural network that is used for the encoding (e.g. an autoencoder)

    Returns
    -------
    device : torch.device
        device of the module
    """
    # next() with a default avoids a StopIteration for modules without parameters
    example_tensor = next(neural_network.parameters(), None)
    if example_tensor is None:
        example_tensor = next(neural_network.buffers(), None)
    if example_tensor is None:
        return torch.device('cpu')
    # Use the tensor's own device so that non-cuda accelerators are not misreported as cpu
    return example_tensor.device
def encode_batchwise(dataloader: torch.utils.data.DataLoader, neural_network: torch.nn.Module) -> np.ndarray:
    """
    Utility function for embedding the whole data set in a mini-batch fashion.

    Each batch is expected to provide the positions of its samples within the data set as first
    entry (batch[0]) and the samples themselves as second entry (batch[1]).
    The encoding runs without gradient tracking.

    Parameters
    ----------
    dataloader : torch.utils.data.DataLoader
        data to embed
    neural_network : torch.nn.Module
        the neural network that is used for the encoding (e.g. an autoencoder)

    Returns
    -------
    embeddings_numpy : np.ndarray
        The embedded data set (None if the dataloader does not yield any batch)
    """
    device = get_device_from_module(neural_network)
    embeddings_numpy = None
    # Pure inference: do not build an autograd graph that would be thrown away immediately
    with torch.no_grad():
        for batch in dataloader:
            batch_data = batch[1].to(device)
            embedded_data = neural_network.encode(batch_data)
            # In case encode() returns more than one value (e.g., for a variational autoencoder), we will pick the first
            if isinstance(embedded_data, tuple):
                embedded_data = embedded_data[0]
            if embeddings_numpy is None:
                # The output shape is only known after the first batch has been encoded
                embeddings_numpy = np.zeros([len(dataloader.dataset)] + list(embedded_data.shape[1:]), dtype=float)
            embeddings_numpy[batch[0]] = embedded_data.detach().cpu().numpy()
    return embeddings_numpy
def decode_batchwise(dataloader: torch.utils.data.DataLoader, neural_network: torch.nn.Module) -> np.ndarray:
    """
    Utility function for decoding the whole data set in a mini-batch fashion, e.g., with an autoencoder.
    Each batch is first encoded and the result is passed on to the decode function.
    Note: Assumes an implemented decode function

    Each batch is expected to provide the positions of its samples within the data set as first
    entry (batch[0]) and the samples themselves as second entry (batch[1]).
    The computation runs without gradient tracking.

    Parameters
    ----------
    dataloader : torch.utils.data.DataLoader
        data to decode
    neural_network : torch.nn.Module
        the neural network that is used for the decoding (e.g. an autoencoder)

    Returns
    -------
    decodings_numpy : np.ndarray
        The decoded data set (None if the dataloader does not yield any batch)
    """
    device = get_device_from_module(neural_network)
    decodings_numpy = None
    # Pure inference: do not build an autograd graph that would be thrown away immediately
    with torch.no_grad():
        for batch in dataloader:
            batch_data = batch[1].to(device)
            embedded_data = neural_network.encode(batch_data)
            # In case encode() returns more than one value (e.g., for a variational autoencoder), all of them are passed to decode()
            if isinstance(embedded_data, tuple):
                decoded_data = neural_network.decode(*embedded_data)
            else:
                decoded_data = neural_network.decode(embedded_data)
            if decodings_numpy is None:
                # The output shape is only known after the first batch has been decoded
                decodings_numpy = np.zeros([len(dataloader.dataset)] + list(decoded_data.shape[1:]), dtype=float)
            decodings_numpy[batch[0]] = decoded_data.detach().cpu().numpy()
    return decodings_numpy
def encode_decode_batchwise(dataloader: torch.utils.data.DataLoader, neural_network: torch.nn.Module) -> (
        np.ndarray, np.ndarray):
    """
    Utility function for encoding and decoding the whole data set in a mini-batch fashion, e.g., with an autoencoder.
    Note: Assumes an implemented decode function

    Each batch is expected to provide the positions of its samples within the data set as first
    entry (batch[0]) and the samples themselves as second entry (batch[1]).
    The computation runs without gradient tracking.

    Parameters
    ----------
    dataloader : torch.utils.data.DataLoader
        dataloader to be used
    neural_network : torch.nn.Module
        the neural network that is used for the encoding and decoding (e.g. an autoencoder)

    Returns
    -------
    tuple : (np.ndarray, np.ndarray)
        The embedded data set,
        The decoded data set
        (both are None if the dataloader does not yield any batch)
    """
    device = get_device_from_module(neural_network)
    embeddings_numpy = None
    decodings_numpy = None
    # Pure inference: do not build an autograd graph that would be thrown away immediately
    with torch.no_grad():
        for batch in dataloader:
            batch_data = batch[1].to(device)
            embedded_data = neural_network.encode(batch_data)
            # In case encode() returns more than one value (e.g., for a variational autoencoder), all of them are
            # passed to decode(), but only the first one is stored as the embedding
            if isinstance(embedded_data, tuple):
                decoded_data = neural_network.decode(*embedded_data)
                embedded_data = embedded_data[0]
            else:
                decoded_data = neural_network.decode(embedded_data)
            if embeddings_numpy is None:
                # The output shapes are only known after the first batch has been processed
                embeddings_numpy = np.zeros([len(dataloader.dataset)] + list(embedded_data.shape[1:]), dtype=float)
                decodings_numpy = np.zeros([len(dataloader.dataset)] + list(decoded_data.shape[1:]), dtype=float)
            embeddings_numpy[batch[0]] = embedded_data.detach().cpu().numpy()
            decodings_numpy[batch[0]] = decoded_data.detach().cpu().numpy()
    return embeddings_numpy, decodings_numpy
def predict_batchwise(dataloader: torch.utils.data.DataLoader, neural_network: torch.nn.Module,
                      cluster_module: torch.nn.Module) -> np.ndarray:
    """
    Utility function for predicting the cluster labels over the whole data set in a mini-batch fashion.
    Method calls the predict_hard method of the cluster_module for each encoded batch of data.

    Each batch is expected to provide the positions of its samples within the data set as first
    entry (batch[0]) and the samples themselves as second entry (batch[1]).
    The computation runs without gradient tracking.

    Parameters
    ----------
    dataloader : torch.utils.data.DataLoader
        dataloader to be used
    neural_network : torch.nn.Module
        the neural network that is used for the encoding (e.g. an autoencoder)
    cluster_module : torch.nn.Module
        the cluster module that is used for the prediction (e.g. DEC). Must provide a predict_hard method.

    Returns
    -------
    predictions_numpy : np.ndarray
        The predictions of the cluster_module for the data set (dtype np.int32)
    """
    device = get_device_from_module(neural_network)
    predictions_numpy = np.zeros(len(dataloader.dataset), dtype=np.int32)
    # Pure inference: do not build an autograd graph that would be thrown away immediately
    with torch.no_grad():
        for batch in dataloader:
            batch_data = batch[1].to(device)
            prediction = cluster_module.predict_hard(neural_network.encode(batch_data))
            predictions_numpy[batch[0]] = prediction.detach().cpu().numpy()
    return predictions_numpy
# def add_noise(batch):
# mask = torch.empty(
# batch.shape, device=batch.device).bernoulli_(0.8)
# return batch * mask
def int_to_one_hot(int_tensor: torch.Tensor, n_integers: int) -> torch.Tensor:
    """
    Transform a one-dimensional tensor of integers (e.g. labels) into a one-hot encoding.

    The result has one row per entry of int_tensor and one column per possible integer.
    Each row contains a single 1 at the column given by the integer, all other values are 0.
    E.g. [0,0,1,2,1] becomes
    [[1,0,0],
    [1,0,0],
    [0,1,0],
    [0,0,1],
    [0,1,0]]

    Parameters
    ----------
    int_tensor : torch.Tensor
        The original tensor containing integers
    n_integers : int
        The number of different integers within int_tensor, i.e., the number of columns of the result

    Returns
    -------
    onehot : torch.Tensor
        The final one hot encoding tensor (dtype torch.float, same device as int_tensor)
    """
    n_rows = int_tensor.shape[0]
    column_index = int_tensor.unsqueeze(1).long()
    encoding = torch.zeros([n_rows, n_integers], dtype=torch.float, device=int_tensor.device)
    # scatter_ works in place and returns the modified tensor
    return encoding.scatter_(1, column_index, 1)
def run_initial_clustering(X: np.ndarray, n_clusters: int, clustering_class: ClusterMixin, clustering_params: dict,
                           random_state: np.random.RandomState) -> (int, np.ndarray, np.ndarray, ClusterMixin):
    """
    Get an initial clustering result for a deep clustering algorithm.
    This result can then be refined by the optimization of the neural network.

    Parameters
    ----------
    X : np.ndarray
        the embedded data set
    n_clusters : int
        number of clusters. Can be None if a corresponding initial_clustering_class is given, e.g. DBSCAN
    clustering_class : ClusterMixin
        the class of the initial clustering algorithm.
        If it is None, random labels will be chosen
    clustering_params : dict
        the parameters for the initial clustering algorithm. None is treated as an empty dict
    random_state : np.random.RandomState
        use a fixed random state to get a repeatable solution.
        It is passed to the clustering algorithm if that accepts a random_state parameter and
        clustering_params does not already contain one.
        It is also used to draw the random labels if clustering_class is None

    Returns
    -------
    tuple : (int, np.ndarray, np.ndarray, ClusterMixin)
        The number of clusters (can change if e.g. DBSCAN is used),
        The initial cluster labels,
        The initial cluster centers,
        The clustering object
    """
    if clustering_class is None:
        clustering_algo = ClusterMixin()
        # Draw the labels from the given random state so that the result is repeatable.
        # Anything that is not a RandomState falls back to the global numpy generator
        rng = random_state if isinstance(random_state, np.random.RandomState) else np.random
        clustering_algo.labels_ = rng.randint(n_clusters, size=X.shape[0])
    else:
        params = {} if clustering_params is None else clustering_params
        # Get possible input parameters of the clustering algorithm
        argspec = inspect.getfullargspec(clustering_class)
        accepted_parameters = argspec.args + argspec.kwonlyargs
        init_kwargs = {}
        # Check if n_clusters or n_components is contained in the possible parameters
        if "n_clusters" in accepted_parameters:
            init_kwargs["n_clusters"] = n_clusters
        elif "n_components" in accepted_parameters:  # in case of GMM
            init_kwargs["n_components"] = n_clusters
        # Otherwise (e.g., DBSCAN) the number of clusters is not passed on
        if "random_state" in accepted_parameters and "random_state" not in params:
            init_kwargs["random_state"] = random_state
        clustering_algo = clustering_class(**init_kwargs, **params)
        # Run algorithm
        clustering_algo.fit(X)
    # Check if clustering algorithm return cluster centers
    if hasattr(clustering_algo, "cluster_centers_"):
        labels = clustering_algo.labels_
        centers = clustering_algo.cluster_centers_
    elif hasattr(clustering_algo, "means_"):  # in case of GMM
        labels = clustering_algo.predict(X)
        centers = clustering_algo.means_
    else:  # in case of e.g., DBSCAN
        labels = clustering_algo.labels_
        # Negative labels are not treated as clusters, so no center is computed for them
        centers = np.array([np.mean(X[labels == i], axis=0) for i in np.unique(labels) if i >= 0])
    # Count the non-negative label ids that actually occur. Needed for DBSCAN, XMeans, GMeans, ...
    n_clusters = np.sum(np.unique(labels) >= 0)
    return n_clusters, labels, centers, clustering_algo