"""
@authors:
Collin Leiber
"""
import torch
import numpy as np
from scipy.spatial.distance import cdist
from clustpy.deep.neural_networks.feedforward_autoencoder import FeedforwardAutoencoder
from clustpy.deep.neural_networks._abstract_autoencoder import FullyConnectedBlock
from collections.abc import Callable
[docs]def get_neighbors_batchwise(X: np.ndarray, n_neighbors: int, metric: str = "sqeuclidean",
batch_size: int = 10000) -> list:
"""
For large datasets it is often not possible to determine the nearest neighbors in a trivial manner.
Therefore, here is an implementation that calculates the nearest neighbors in batches.
Ignores the objects themselves (with distance of 0) as nearest neighbors.
It reduces the memory consumption of a trivial nearest neighbor implementation from (data_size x data_size) to (batch_size x data_size).
A list is returned, which can be given as additional input into a DataLoader and is therefore directly compatible with the NeighborEncoder.
Due to runtime concerns it is still recommended to use a more complex nearest neighbor retrieval implementation (e.g. from sklearn.neighbor)!
Parameters
----------
X : np.ndarray
The given data set
n_neighbors : int
The number of nearest neighbors to identify
metric : str
The distance metric to be used. See scipy.spatial.distance.cdist for more information (default: sqeuclidean)
batch_size : int
The size of the batches (default: 10000)
Returns
-------
nearest_neigbors : list
A list containing the nearest neighbors as torch.Tensors, i.e. [1-nearest-neighbor tensor, 2-nearest-neighbor tensor, ...]
Examples
--------
>>> from clustpy.data import create_subspace_data
>>> from clustpy.deep import get_dataloader
>>> X, L = create_subspace_data(1500, subspace_features=(3, 50), random_state=1)
>>> n_neighbors = 3
>>> neighbors = get_neighbors_batchwise(X, n_neighbors)
>>> dataloader = get_dataloader(X, 256, True, additional_inputs=neighbors)
>>> neighbor_encoder = NeighborEncoder(layers=[X.shape[1], 512, 256, 10], n_neighbors=n_neighbors)
>>> neighbor_encoder.fit(dataloader=dataloader, n_epochs=5, lr=1e-3)
"""
# batch_size should not be larger than the dataset
batch_size = min(X.shape[0], batch_size)
# Create list containing the nearest neighbors
nearest_neigbors = [np.zeros(X.shape) for _ in range(n_neighbors)]
# Check if last batch is a complete batch
add_one_batch = 0 if X.shape[0] % batch_size == 0 else 1
n_iterations = X.shape[0] // batch_size + add_one_batch
for i in range(n_iterations):
index_0 = i * batch_size
if i != n_iterations - 1 or add_one_batch == 0:
index_1 = index_0 + batch_size
else:
index_1 = index_0 + X.shape[0] % batch_size
distances = cdist(X[index_0:index_1], X, metric=metric)
arg_distances = np.argsort(distances, axis=1)
for k in range(n_neighbors):
nearest_neigbors[k][index_0:index_1] = X[arg_distances[:, k + 1]]
return nearest_neigbors
[docs]class NeighborEncoder(FeedforwardAutoencoder):
"""
A NeighborEncoder. Does not compare the reconstruction of an object to itself but to its nearest neighbors.
For more information see the stated reference.
If n_neighbors is 0 and decode_self is true, the NeighborEncoder will work as a regular FeedforwardAutoencoder.
Parameters
----------
layers : list
list of the different layer sizes from input to embedding, e.g. an example architecture for MNIST [784, 512, 256, 10], where 784 is the input dimension and 10 the embedding dimension.
If decoder_layers are not specified then the decoder is symmetric and goes in the same order from embedding to input.
n_neighbors : int
the number of nearest neighbors to be considered
decode_self : bool
specifies whether a point itself should also be decoded (default: False)
batch_norm : bool
Set True if you want to use torch.nn.BatchNorm1d (default: False)
dropout : float
Set the amount of dropout you want to use (default: None)
activation_fn : torch.nn.Module
activation function from torch.nn, set the activation function for the hidden layers, if None then it will be linear (default: torch.nn.LeakyReLU)
bias : bool
set False if you do not want to use a bias term in the linear layers (default: True)
decoder_layers : list
list of different layer sizes from embedding to output of the decoder. If set to None, will be symmetric to layers (default: None)
decoder_output_fn : torch.nn.Module
activation function from torch.nn, set the activation function for the decoder output layer, if None then it will be linear.
E.g. set to torch.nn.Sigmoid if you want to scale the decoder output between 0 and 1 (default: None)
work_on_copy : bool
If set to true, deep clustering algorithms will optimize a copy of the autoencoder and not the autoencoder itself.
Ensures that the same autoencoder can be used by multiple deep clustering algorithms.
As copies of this object are created, the memory requirement increases (default: True)
random_state : np.random.RandomState | int
use a fixed random state to get a repeatable solution. Can also be of type int (default: None)
Attributes
----------
encoder : FullyConnectedBlock
encoder part of the autoencoder, responsible for embedding data points (class is FullyConnectedBlock)
decoder : FullyConnectedBlock
decoder part of the autoencoder, responsible for reconstructing the data point itself (class is FullyConnectedBlock).
Only used if decode_self is true.
neighbor_decoders : list
list containing one decoder network (class is FullyConnectedBlock) for each nearest neighbor
fitted : bool
boolean value indicating whether the autoencoder is already fitted
work_on_copy : bool
indicates whether deep clustering algorithms should work on a copy of the original autoencoder
Examples
--------
>>> from clustpy.data import create_subspace_data
>>> from clustpy.deep import get_dataloader
>>> from scipy.spatial.distance import pdist, squareform
>>> X, L = create_subspace_data(1500, subspace_features=(3, 50), random_state=1)
>>> n_neighbors = 3
>>> dist_matrix = squareform(pdist(X))
>>> neighbor_ids = np.argsort(dist_matrix, axis=1)
>>> neighbors = [X[neighbor_ids[:, 1 + i]] for i in range(n_neighbors)]
>>> # Alternatively: neighbors = get_neighbors_batchwise(X, n_neighbors)
>>> dataloader = get_dataloader(X, 256, True, additional_inputs=neighbors)
>>> neighbor_encoder = NeighborEncoder(layers=[X.shape[1], 512, 256, 10], n_neighbors=n_neighbors, decode_self=False)
>>> neighbor_encoder.fit(dataloader=dataloader, n_epochs=5, lr=1e-3)
References
----------
Yeh, Chin-Chia Michael, et al. "Representation Learning by Reconstructing Neighborhoods." arXiv preprint arXiv:1811.01557 (2018).
"""
def __init__(self, layers: list, n_neighbors: int, decode_self: bool = False, batch_norm: bool = False,
dropout: float = None, activation_fn: torch.nn.Module = torch.nn.LeakyReLU, bias: bool = True,
decoder_layers: list = None, decoder_output_fn: torch.nn.Module = None, work_on_copy: bool = True,
random_state: np.random.RandomState | int = None):
assert n_neighbors > 0 or decode_self, "n_neighbors must be an integer larger than 0 or decode_self must be true"
super().__init__(layers, batch_norm, dropout, activation_fn, bias, decoder_layers, decoder_output_fn,
work_on_copy, random_state)
self.n_neighbors = n_neighbors
self.decode_self = decode_self
neighbor_decoders = torch.nn.ModuleList([FullyConnectedBlock(layers=self.decoder.layers, batch_norm=batch_norm,
dropout=dropout, activation_fn=activation_fn,
bias=bias,
output_fn=decoder_output_fn) for _ in
range(n_neighbors)])
self.neighbor_decoders = neighbor_decoders
[docs] def decode(self, embedded: torch.Tensor) -> torch.Tensor:
"""
Apply the decoder function of each neighbor network to embedded.
Returns a (n_neighbors x batch_size x dimensionality) tensor if decode_self is false, else a (n_neighbors + 1 x batch_size x dimensionality) tensor
Parameters
----------
embedded : torch.Tensor
embedded data point, can also be a mini-batch of embedded points
Returns
-------
decoded_neighbors : torch.Tensor
returns the reconstructions of the embedded sample concerning its neighbor decoders
"""
assert embedded.shape[1] == self.decoder.layers[0], "Input layer of the decoder does not match input sample"
n_decoded_objects = self.n_neighbors + 1 if self.decode_self else self.n_neighbors
decoded_neighbors = torch.zeros((n_decoded_objects, embedded.shape[0], self.encoder.layers[0]))
for i in range(self.n_neighbors): # TODO: Maybe use functorch.vmap in the future for vectorization
decoded_neighbors[i] = self.neighbor_decoders[i](embedded)
if self.decode_self:
decoded_neighbors[-1] = self.decoder(embedded)
return decoded_neighbors
[docs] def loss(self, batch: list, ssl_loss_fn: torch.nn.modules.loss._Loss, device: torch.device,
corruption_fn: Callable = None) -> (torch.Tensor, torch.Tensor, torch.Tensor):
"""
Calculate the loss of a single batch of data.
Corresponds to the sum of losses concerning each neighbor.
batch must contain the data object at the first position and the neighbors at the following positions.
Parameters
----------
batch: list
the different parts of a dataloader (id, samples, 1-nearest-neighbor, 2-nearest-neighbor, ...)
ssl_loss_fn : torch.nn.modules.loss._Loss
self-supervised learning (ssl) loss function for training the network, e.g. reconstruction loss
device : torch.device
device to be trained on
corruption_fn : Callable
Can be used to corrupt the input data, e.g., when using a denoising autoencoder.
Note that the function must match the data and the data loaders.
For example, if the data is normalized, this may have to be taken into account in the corruption function - e.g. in case of salt and pepper noise (default: None)
Returns
-------
loss : (torch.Tensor, torch.Tensor, torch.Tensor)
the sum of the reconstruction losses of the input sample,
the embedded input sample,
the reconstructions of the embedded sample concerning its neighbor decoders
"""
assert type(batch) is list, "batch must come from a dataloader and therefore be of type list"
batch_data = batch[1].to(device)
batch_data_adj = batch_data if corruption_fn is None else corruption_fn(batch_data)
embedded = self.encode(batch_data_adj)
decoded_neighbors = self.decode(embedded)
loss = torch.tensor(0.)
for i in range(self.n_neighbors): # TODO: Maybe use functorch.vmap in the future for vectorization
neighbors = batch[2 + i].to(device)
loss = loss + ssl_loss_fn(decoded_neighbors[i], neighbors)
if self.decode_self:
reconstruction = decoded_neighbors[-1]
loss = loss + ssl_loss_fn(reconstruction, batch_data)
return loss, embedded, decoded_neighbors
[docs] def fit(self, n_epochs: int, optimizer_params: dict, batch_size: int = 128,
dataloader: torch.utils.data.DataLoader = None, evalloader: torch.utils.data.DataLoader = None,
optimizer_class: torch.optim.Optimizer = torch.optim.Adam,
ssl_loss_fn: torch.nn.modules.loss._Loss = torch.nn.MSELoss(), patience: int = 5,
scheduler: torch.optim.lr_scheduler = None, scheduler_params: dict = None,
corruption_fn: Callable = None, model_path: str = None) -> 'NeighborEncoder':
"""
Trains the NeighborEncoder in place.
Equal to fit function of the FeedforwardAutoencoder but does only work with a dataloader (not with a regular data array).
This is because the dataloader must contain the nearest neighbors of each point at the positions 2, 3, ....
Parameters
----------
n_epochs : int
number of epochs for training
optimizer_params : dict
parameters of the optimizer, includes the learning rate
batch_size : int
size of the data batches (default: 128)
dataloader : torch.utils.data.DataLoader
dataloader to be used for training (default: default=None)
evalloader : torch.utils.data.DataLoader
dataloader to be used for evaluation, early stopping and learning rate scheduling if scheduler=torch.optim.lr_scheduler.ReduceLROnPlateau (default: None)
optimizer_class : torch.optim.Optimizer
optimizer to be used (default: torch.optim.Adam)
ssl_loss_fn : torch.nn.modules.loss._Loss
self-supervised learning (ssl) loss function for training the network, e.g. reconstruction loss (default: torch.nn.MSELoss())
patience : int
patience parameter for EarlyStopping (default: 5)
scheduler : torch.optim.lr_scheduler
learning rate scheduler that should be used.
If torch.optim.lr_scheduler.ReduceLROnPlateau is used then the behaviour is matched by providing the validation_loss calculated based on samples from evalloader (default: None)
scheduler_params : dict
dictionary of the parameters of the scheduler object (default: None)
corruption_fn : Callable
Can be used to corrupt the input data, e.g., when using a denoising autoencoder.
Note that the function must match the data and the data loaders.
For example, if the data is normalized, this may have to be taken into account in the corruption function - e.g. in case of salt and pepper noise (default: None)
model_path : str
if specified will save the trained model to the location. If evalloader is used, then only the best model w.r.t. evaluation loss is saved (default: None)
Returns
-------
self : NeighborEncoder
this instance of the NeighborEncoder
"""
super().fit(n_epochs, optimizer_params, batch_size, None, None, dataloader, evalloader, optimizer_class,
ssl_loss_fn, patience, scheduler, scheduler_params, corruption_fn, model_path)
return self