# Source code for clustpy.deep._data_utils

import torch
import torchvision
import numpy as np
from typing import Callable, List


class _ClustpyDataset(torch.utils.data.Dataset):
    """
    Tensor-backed dataset whose samples always start with the sample index.

    Every sample is obtained by indexing each of the wrapped tensors along its first dimension.
    Augmentation and/or preprocessing transforms can be supplied per tensor. For each tensor that has an
    augmentation transform, the augmented version is placed directly in front of the (optionally preprocessed)
    original version. With a single tensor and an augmentation this yields (index, augmented, original).

    Implementation is based on torch.utils.data.Dataset.

    Parameters
    ----------
    *tensors : torch.Tensor
        tensors sharing the size of their first dimension. Usually contains the data.
    aug_transforms_list : List of torchvision.transforms
        One augmentation transform per tensor in tensors. Several torchvision.transforms can be chained with
        torchvision.transforms.Compose. Use None for a tensor that should not be augmented, e.g.,
        [transform0, None, transform1] augments the first and third tensor but not the second one.
    orig_transforms_list : List of torchvision.transforms
        One transform per original tensor in tensors, e.g., for preprocessing. Use None for a tensor that should
        be returned unchanged.

    Attributes
    ----------
    tensors : torch.Tensor
        the wrapped tensors (tuple of all positional arguments)
    aug_transforms_list : List of torchvision.transforms
    orig_transforms_list : List of torchvision.transforms
    """

    def __init__(self, *tensors: torch.Tensor, aug_transforms_list: List[Callable] = None,
                 orig_transforms_list: List[Callable] = None):
        n_tensors = len(tensors)
        same_n_samples = all(tensors[0].size(0) == other.size(0) for other in tensors)
        assert same_n_samples, "Size mismatch between tensors"
        self.tensors = tensors
        # A transform list, if given, needs exactly one entry (possibly None) per tensor
        orig_list_fits = orig_transforms_list is None or len(orig_transforms_list) == n_tensors
        assert orig_list_fits, "Size mismatch between tensors and orig_transforms_list"
        self.orig_transforms_list = orig_transforms_list
        aug_list_fits = aug_transforms_list is None or len(aug_transforms_list) == n_tensors
        assert aug_list_fits, "Size mismatch between tensors and aug_transforms_list"
        self.aug_transforms_list = aug_transforms_list

    def __getitem__(self, index: int) -> tuple:
        """
        Retrieve the sample stored at the given position.

        Parameters
        ----------
        index : int
            position of the requested sample

        Returns
        -------
        final_tuple : tuple
            The sample as (index, data1, data2, ...). The number of data entries depends on the wrapped tensors
            and on how many of them have an augmentation transform.
        """
        aug_transforms = self.aug_transforms_list
        orig_transforms = self.orig_transforms_list
        entries = [index]

        if aug_transforms is None and orig_transforms is None:
            # No transforms at all: just slice every tensor
            entries.extend(tensor[index] for tensor in self.tensors)
            return tuple(entries)

        for position, tensor in enumerate(self.tensors):
            # Augmented version (only if a transform is registered for this tensor)
            aug_fn = None if aug_transforms is None else aug_transforms[position]
            if aug_fn is not None:
                entries.append(aug_fn(tensor[index]))

            # Original version, optionally preprocessed
            orig_fn = None if orig_transforms is None else orig_transforms[position]
            original = tensor[index]
            entries.append(original if orig_fn is None else orig_fn(original))

        return tuple(entries)

    def __len__(self) -> int:
        """
        Number of samples, i.e., the size of the first dimension of the first wrapped tensor.

        Returns
        -------
        dataset_size : int
            Length of the dataset.
        """
        return self.tensors[0].size(0)


def get_dataloader(X: np.ndarray | torch.Tensor, batch_size: int, shuffle: bool = True, drop_last: bool = False,
                   additional_inputs: list | np.ndarray | torch.Tensor = None,
                   dataset_class: torch.utils.data.Dataset = _ClustpyDataset, ds_kwargs: dict = None,
                   dl_kwargs: dict = None) -> torch.utils.data.DataLoader:
    """
    Create a dataloader for Deep Clustering algorithms.
    First entry always contains the indices of the data samples.
    Second entry always contains the actual data samples.
    If for example labels are desired, they can be passed through the additional_inputs parameter (should be a list).
    Other customizations (e.g. augmentation) can be implemented using a custom dataset_class.
    This custom class should stick to the conventions, [index, data, ...].

    Parameters
    ----------
    X : np.ndarray | torch.Tensor
        the actual data set (can be np.ndarray or torch.Tensor). A np.ndarray is converted to a float tensor
    batch_size : int
        the batch size
    shuffle : bool
        boolean that defines if the data set should be shuffled (default: True)
    drop_last : bool
        boolean that defines if the last batch should be ignored (default: False)
    additional_inputs : list | np.ndarray | torch.Tensor
        additional inputs for the dataloader, e.g. labels. Can be None, np.ndarray, torch.Tensor or a list containing
        np.ndarrays/torch.Tensors. np.ndarrays are converted to float tensors (default: None)
    dataset_class : torch.utils.data.Dataset
        defines the class of the tensor dataset that is contained in the dataloader (default: _ClustpyDataset)
    ds_kwargs : dict
        other arguments for dataset_class. An example usage would be to include augmentation or preprocessing
        transforms to the _ClustpyDataset by passing
        ds_kwargs={"aug_transforms_list":[aug_transforms], "orig_transforms_list":[orig_transforms]},
        where aug_transforms and orig_transforms are transforming the input X, e.g., using
        torchvision.transforms.Compose to combine multiple transformations.
        Important: If aug_transforms_list is passed via ds_kwargs the returned values of the dataloader change.
        The first entry will still be the indices of the data sample, but the second entry will be the transformed
        version of the actual data samples and the third entry will be the original data samples.
        If orig_transforms_list is passed as well then the third entry will be transformed accordingly, this might
        be needed for preprocessing the data. An example for MNIST is shown below.
    dl_kwargs : dict
        other arguments for torch.utils.data.DataLoader

    Examples
    ----------
    >>> # Example for usage of data transformations with get_dataloader
    >>> from clustpy.data import load_mnist
    >>> import torch
    >>> import torchvision
    >>> # load and prepare data for torchvision.transforms
    >>> data, labels = load_mnist()
    >>> data = data.reshape(-1, 1, 28, 28)
    >>> data /= 255.0
    >>> data = torch.from_numpy(data).float()
    >>> #
    >>> # preprocessing functions
    >>> mean = data.mean()
    >>> std = data.std()
    >>> normalize_fn = torchvision.transforms.Normalize([mean], [std])
    >>> # flatten is only needed if a FeedForward network is used, otherwise this can be skipped.
    >>> flatten_fn = torchvision.transforms.Lambda(torch.flatten)
    >>> #
    >>> # augmentation transforms
    >>> transform_list = [
    ...     # transform input tensor to PIL image for augmentation
    ...     torchvision.transforms.ToPILImage(),
    ...     # apply transformations
    ...     torchvision.transforms.RandomAffine(degrees=(-16,+16),
    ...                                         translate=(0.1, 0.1),
    ...                                         shear=(-8, 8),
    ...                                         fill=0),
    ...     # transform back to torch.tensor
    ...     torchvision.transforms.ToTensor(),
    ...     # preprocess and flatten
    ...     normalize_fn,
    ...     flatten_fn,
    ... ]
    >>> #
    >>> # augmentation transforms
    >>> aug_transforms = torchvision.transforms.Compose(transform_list)
    >>> # preprocessing transforms without augmentation
    >>> orig_transforms = torchvision.transforms.Compose([normalize_fn, flatten_fn])
    >>> #
    >>> # pass transforms to dataloader
    >>> aug_dl = get_dataloader(data, batch_size=32, shuffle=True,
    ...                         ds_kwargs={"aug_transforms_list":[aug_transforms], "orig_transforms_list":[orig_transforms]},
    ...                         )

    Returns
    -------
    dataloader : torch.utils.data.DataLoader
        The final dataloader

    Raises
    ------
    AssertionError
        If X or additional_inputs has an unsupported type
    TypeError
        If an element of a list passed as additional_inputs is neither np.ndarray nor torch.Tensor
    """
    # isinstance (instead of an exact type comparison) also admits subclasses such as torch.nn.Parameter
    assert isinstance(X, (np.ndarray, torch.Tensor)), "X must be of type np.ndarray or torch.Tensor."
    assert additional_inputs is None or isinstance(additional_inputs, (np.ndarray, torch.Tensor, list)), \
        "additional_input must be None or of type np.ndarray, torch.Tensor or list."
    ds_kwargs = {} if ds_kwargs is None else ds_kwargs
    dl_kwargs = {} if dl_kwargs is None else dl_kwargs
    if isinstance(X, np.ndarray):
        # Convert np.ndarray to torch.Tensor (np.asarray reduces ndarray subclasses to a plain ndarray)
        X = torch.from_numpy(np.asarray(X)).float()
    dataset_input = [X]
    if additional_inputs is not None:
        # Treat a single array/tensor like a list with one element
        if isinstance(additional_inputs, (np.ndarray, torch.Tensor)):
            additional_inputs = [additional_inputs]
        for additional_input in additional_inputs:
            if isinstance(additional_input, np.ndarray):
                additional_input = torch.from_numpy(np.asarray(additional_input)).float()
            elif not isinstance(additional_input, torch.Tensor):
                raise TypeError(
                    "inputs of additional_inputs must be of type np.ndarray or torch.Tensor. Your input type: {0}".format(
                        type(additional_input)))
            dataset_input.append(additional_input)
    dataset = dataset_class(*dataset_input, **ds_kwargs)
    # Create dataloader using the dataset
    dataloader = torch.utils.data.DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        drop_last=drop_last,
        **dl_kwargs)
    return dataloader


def get_data_dim_from_dataloader(dataloader: torch.utils.data.DataLoader) -> int: """ Get dimensionality of the data within a dataloader. Parameters ---------- dataloader : torch.utils.data.DataLoader The specified datalaoder Returns ------- dim : int The dimensionality of the data """ dim = torch.numel(next(iter(dataloader))[1][0]) return dim def get_train_and_test_dataloader(X: np.ndarray | torch.Tensor, batch_size: int = 256, custom_dataloaders: tuple = None) -> ( torch.utils.data.DataLoader, torch.utils.data.DataLoader, int): """ Get the train- and testloader for deep clustering algorithms. In contrast to the testloader, the trainloader is usually shuffled. If a custom dataloader is not specified, the data X will be used to create the two dataloaders. Else, the parameters X and batch_size are not used but only the specified custom_dataloaders. More information can be found by checking the documentation of get_dataloader(). Parameters ---------- X : np.ndarray | torch.Tensor the actual data set (can be np.ndarray or torch.Tensor) batch_size : int the batch size (default: 256) custom_dataloaders : tuple tuple consisting of a trainloader (random order) at the first and a test loader (non-random order) at the second position. Can also be a tuple of strings, where the first entry is the path to a saved trainloader and the second entry the path to a saved testloader. In this case the dataloaders will be loaded by torch.load(PATH). 
If None, the default dataloaders will be used (default: None) Returns ------- tuple : (torch.utils.data.DataLoader, torch.utils.data.DataLoader, int) The trainloader, The testloader, The batch size (can be different from input if another value is used within custom_dataloader) """ # sample random mini-batches from the data -> shuffle = True if custom_dataloaders is None: trainloader = get_dataloader(X, batch_size, True, False) testloader = get_dataloader(X, batch_size, False, False) else: trainloader, testloader = custom_dataloaders # If train-/testloader is string, it can be loaded from a file if type(trainloader) is str: trainloader = torch.load(trainloader) if type(testloader) is str: testloader = torch.load(testloader) if trainloader.batch_size != testloader.batch_size: print( "INFO: Batch size of trainloader and testloader do not match: trainloader = {0}, testloader = {1}".format( trainloader.batch_size, testloader.batch_size)) if trainloader.batch_size != batch_size: print( "WARNING: Specified batch_size differs from trainloader.batch_size. Will use trainloader.batch_size ({0}).".format( trainloader.batch_size)) batch_size = trainloader.batch_size return trainloader, testloader, batch_size
def get_default_augmented_dataloaders(X: np.ndarray | torch.Tensor, batch_size: int = 256, conv_used: bool = False,
                                      flatten: bool = True) -> (
        torch.utils.data.DataLoader, torch.utils.data.DataLoader):
    """
    Receive a train- and a test dataloader using default augmentations.
    The data is first scaled by dividing it by its global maximum (equal to a min-max normalization if the minimum
    is 0). The augmentation consists of
    torchvision.transforms.RandomAffine(degrees=(-16, +16), translate=(0.1, 0.1), shear=(-8, 8), fill=0)
    followed by a channel-wise z-transformation.
    Optionally, the images can be flatten afterward.
    The input X is not modified; the scaling is performed on a copy.

    Parameters
    ----------
    X : np.ndarray | torch.Tensor
        the actual data set (can be np.ndarray or torch.Tensor). Must have more than two dimensions.
        Data with three dimensions is interpreted as grayscale images and reshaped to CHW format with a single channel
    batch_size : int
        the batch size (default: 256)
    conv_used : bool
        defines whether a convolutional network will be used afterward.
        In this case, grayscale images will be transformed to receive three color channels by copying the grayscale
        channel three times (default: False)
    flatten : bool
        defines whether the augmented images should be flatten afterward. Must be False if conv_used is True
        (default: True)

    Returns
    -------
    tuple : (torch.utils.data.DataLoader, torch.utils.data.DataLoader)
        The trainloader (with augmentations),
        The testloader (without augmentations)

    Raises
    ------
    AssertionError
        If conv_used and flatten are both True or if X has two or less dimensions
    """
    assert not conv_used or not flatten, "flatten must be False if conv_used is True"
    assert X.ndim > 2, "Data matrix X must have more than two dimensions. Please use a corresponding dataset (i.e., non-flatten images)"
    if isinstance(X, np.ndarray):
        # Convert np.ndarray to torch.Tensor (np.asarray reduces ndarray subclasses to a plain ndarray)
        X = torch.from_numpy(np.asarray(X)).float()
    # Scale out-of-place: torch.from_numpy shares memory with the numpy array and a passed tensor is the caller's
    # object, so an in-place division would silently rescale the caller's data (and fails for integer tensors)
    X = X / torch.max(X)
    # Reshape grayscale data to CHW format
    if X.ndim != 4:
        X = X.reshape(-1, 1, X.shape[1], X.shape[2])
    if conv_used and X.shape[1] == 1:
        # Copy the single grayscale channel three times
        X = torch.tile(X, (1, 3, 1, 1))
    # preprocessing: channel-wise statistics over samples, height and width
    channel_means = X.mean([0, 2, 3])
    channel_stds = X.std([0, 2, 3])
    normalize_fn = torchvision.transforms.Normalize(channel_means, channel_stds)
    # augmentation transforms
    transform_list = [
        torchvision.transforms.ToPILImage(),
        torchvision.transforms.RandomAffine(degrees=(-16, +16), translate=(0.1, 0.1), shear=(-8, 8), fill=0),
        torchvision.transforms.ToTensor(),
        normalize_fn
    ]
    orig_transform_list = [normalize_fn]
    if flatten:
        flatten_fn = torchvision.transforms.Lambda(torch.flatten)
        transform_list.append(flatten_fn)
        orig_transform_list.append(flatten_fn)
    aug_transforms = torchvision.transforms.Compose(transform_list)
    orig_transforms = torchvision.transforms.Compose(orig_transform_list)
    # pass transforms to dataloader
    aug_dataloader = get_dataloader(X, batch_size=batch_size, shuffle=True,
                                    ds_kwargs={"aug_transforms_list": [aug_transforms],
                                               "orig_transforms_list": [orig_transforms]})
    orig_dataloader = get_dataloader(X, batch_size=batch_size, shuffle=False,
                                     ds_kwargs={"orig_transforms_list": [orig_transforms]})
    return aug_dataloader, orig_dataloader


def augmentation_invariance_check(augmentation_invariance: bool, custom_dataloaders: tuple) -> None:
    """
    Verify that the given custom_dataloaders have the structure that is assumed when learning augmentation
    invariances. Only the first batch of the trainloader is inspected.

    Parameters
    ----------
    augmentation_invariance : bool
        If True, custom_dataloader will be checked. If False, nothing happens.
    custom_dataloaders : tuple
        tuple consisting of a trainloader (random order) at the first and a test loader (non-random order) at the
        second position.

    Raises
    ------
    ValueError
        If augmentation_invariance is True and custom_dataloaders is None, the first batch has fewer than three
        entries, the batch sizes or the shapes of entry 1 and 2 do not match, or entry 1 and 2 are identical.
    """
    if not augmentation_invariance:
        return
    if custom_dataloaders is None:
        raise ValueError(
            "If augmentation_invariance is True, custom_dataloaders cannot be None, but should include augmented samples, e.g., using torchvision.transforms in get_dataloader.")

    # Only the trainloader is checked
    trainloader, _ = custom_dataloaders
    first_batch = next(iter(trainloader))
    if len(first_batch) < 3:
        raise ValueError(
            f"Augmentation_invariance is True, but custom_dataloaders[0] only returns a list of size {len(first_batch)} (index, tensor)")

    # Two tensors of the same shape are expected: entry 1 is taken as the augmented and entry 2 as the original one
    n_in_batch = first_batch[0].size(0)
    same_batch_size = all(n_in_batch == entry.size(0) for entry in first_batch)
    if not same_batch_size or first_batch[1].shape != first_batch[2].shape:
        raise ValueError(
            "Augmentation_invariance is True, but the shapes of the returned batch of custom_dataloaders[0] do not match.")
    if torch.equal(first_batch[1], first_batch[2]):
        raise ValueError(
            "Augmentation_invariance is True, but custom_dataloaders[0] returns identical tensors in batch[1] and batch[2] indicating that no augmentation is applied to batch[1]")