Source code for clustpy.data._utils

try:
    import requests
except:
    print("[WARNING] Could not import requests in clustpy.data._utils. Please install requests by 'pip install requests' if necessary")
try:
    from nltk.stem import SnowballStemmer
except:
    print(
        "[WARNING] Could not import nltk in clustpy.data.real_world_data to use the SnowballStemmer. Please install nltk by 'pip install nltk' if necessary")
try:
    from PIL import Image
except:
    print(
        "[WARNING] Could not import PIL in clustpy.data.real_world_data. Please install PIL by 'pip install Pillow' if necessary")
import numpy as np
import os
from pathlib import Path, PurePath
from sklearn.feature_extraction.text import TfidfTransformer, CountVectorizer
from sklearn.feature_selection import VarianceThreshold
from sklearn.datasets import fetch_file
import subprocess


DEFAULT_DOWNLOAD_PATH = Path.home() / "Downloads" / "clustpy_datafiles"


def _get_download_dir(downloads_path: str | Path) -> Path:
    """
    Helper function to define the path where the data files should be stored. If downloads_path is None then default path
    '[USER]/Downloads/clustpy_datafiles' will be used. If the directory does not exists it will be created.

    Parameters
    ----------
    downloads_path : str | Path
        path to the directory where the data will be stored. Can be None

    Returns
    -------
    downloads_path : str
        path to the directory where the data will be stored. If input was None this will be equal to
        '[USER]/Downloads/clustpy_datafiles'
    """
    if downloads_path is None:
        env_data_path = os.environ.get("CLUSTPY_DATA", None)
        if env_data_path is None:
            downloads_path = DEFAULT_DOWNLOAD_PATH
        else:
            downloads_path = Path(env_data_path)
    elif isinstance(downloads_path, str):
        # Cast str to Path
        downloads_path = Path(downloads_path)
    if not downloads_path.is_dir():
        downloads_path.mkdir(parents=True, exist_ok=False)
        with open(downloads_path / "info.txt", "w") as f:
            f.write("This directory was created by the ClustPy python package to store real world data sets.\n"
                    "The default directory is '[USER]/Downloads/clustpy_datafiles' and can be changed with the "
                    "'downloads_path' parameter when loading a data set.\n"
                    "Alternatively, a global python environment variable for the path can be defined with os.environ['CLUSTPY_DATA'] = 'PATH'.")
    return downloads_path


def _download_file(file_url: str, filename_local: str | Path) -> None:
    """
    Helper function to download a file into a specified location.

    Parameters
    ----------
    file_url : str
        URL of the file
    filename_local : str | Path
        local name of the file after it has been downloaded
    """
    if isinstance(filename_local, str):
        filename_local = Path(filename_local)
    local_dir = filename_local.parent
    local_filename = filename_local.name
    print("Downloading data set from {0} to {1}".format(file_url, filename_local))
    fetch_file(file_url, folder=local_dir, local_filename=local_filename)


def _download_file_from_google_drive(file_id: str, filename_local: str | Path, chunk_size: int = 32768) -> None:
    """
    Download a file from google drive.
    Code taken from:
    https://stackoverflow.com/questions/38511444/python-download-files-from-google-drive-using-url

    Parameters
    ----------
    file_id : str
        ID of the file on google drive
    filename_local : str | Path
        local name of the file after it has been downloaded
    chunk_size : int
        chink size when downloading the file (default: 32768)
    """
    print("Downloading data set {0} from Google Drive to {1}".format(file_id, filename_local))
    URL = "https://drive.google.com/uc"
    session = requests.Session()
    response = session.get(URL, params={"id": file_id, "confirm": "t"}, stream=True)
    if response.text.startswith("<!DOCTYPE"):
        # Large files can not be obtained automatically but need a second request
        try:
            URL_extracted = response.text.split("download-form\" action=\"")[1].split("\" method=\"get\"")[0]
            uuid = response.text.split("name=\"uuid\" value=\"")[1].split("\">")[0]
        except:
            raise Exception("[ERROR] New URL and UUID could not be extracted from first request in _download_file_from_google_drive")
        response = session.get(URL_extracted, params={"id": file_id, "confirm": "t", "uuid": uuid}, stream=True)
    with open(filename_local, "wb") as f:
        for chunk in response.iter_content(chunk_size):
            if chunk:  # filter out keep-alive new chunks
                f.write(chunk)
    session.close()


def _load_data_file(filename_local: Path, file_url: str, delimiter: str = ",", last_column_are_labels: bool = True) -> tuple[
        np.ndarray, np.ndarray]:
    """
    Helper function to load a data file. Either the first or last column, depending on last_column_are_labels, of the
    data file is used as the label column.
    If file does not exist on the local machine it will be downloaded.

    Parameters
    ----------
    filename_local : Path
        local name of the file after it has been downloaded
    file_url : str
        URL of the file
    delimiter : str
        delimiter in the data file (default: ";")
    last_column_are_labels : bool
        specifies if the last column contains the labels. If false labels should be contained in the first column (default: True)

    Returns
    -------
    data, labels : tuple[np.ndarray, np.ndarray]
        the data numpy array, the labels numpy array
    """
    if not filename_local.is_file():
        _download_file(file_url, filename_local)
    datafile = np.genfromtxt(filename_local, delimiter=delimiter)
    if last_column_are_labels:
        data = datafile[:, :-1]
        labels = datafile[:, -1]
    else:
        data = datafile[:, 1:]
        labels = datafile[:, 0]
    # Convert labels to int32 format
    labels = labels.astype(np.int32)
    return data, labels


def _decompress_z_file(filename: str | Path, directory: str | Path) -> bool:
    """
    Helper function to decompress a 7z file. The function uses an installed version of 7zip to decompress the file.
    If 7zip is not installed on this machine, the function will return False and a warning is printed.

    Parameters
    ----------
    filename : str
        name of the file that should be decompressed
    directory : str
        directory of the file that should be decompressed

    Returns
    -------
    successful : bool
        True if decompression was successful, else False
    """
    if isinstance(filename, str):
        filename = Path(filename)
    if isinstance(directory, str):
        directory = Path(directory)
    cmd = ["7z", "x", filename.as_posix(), f"-o{directory.as_posix()}"]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("[WARNING] 7Zip extraction failed or 7z executable is missing!")
        return False
    if not filename.with_suffix('').is_file():
        # If no file without .z exists, decompression was not successful
        print("[WARNING] Decompression check failed: expected file not found.")
        return False
    return True


def _load_image_data(image: str | Path | np.ndarray, image_size: tuple, color_image: bool) -> np.ndarray:
    """
    Load image and convert it into a coherent size. Returns a numpy array containing the image data.

    Parameters
    ----------
    image : str | Path | np.ndarray
        Path to the image. Can also be a numpy array containing the specific pixels
    image_size : tuple
        images of various sizes can be converted into a coherent size.
        The tuple equals (width, height) of the images.
        Can also be None if the image size should not be changed
    color_image : bool
        Specifies if the loaded image is a color image

    Returns
    -------
    image_data : np.ndarray
        The numpy array containing the image data
    """
    if isinstance(image, (str, PurePath)):
        pil_image = Image.open(image)
    else:
        pil_image = Image.fromarray(np.uint8(image))
    if color_image:
        pil_image = pil_image.convert("RGB")
    # Convert to coherent size
    if image_size is not None:
        pil_image = pil_image.resize(image_size)
    image_data = np.array(pil_image).copy()
    pil_image.close()
    assert image_size is None or image_data.shape == (
        image_size[0], image_size[1], 3), "Size of image is not correct. Should be {0} but is {1}".format(image_size,
                                                                                                          image_data.shape)
    return image_data


class _StemmedCountVectorizer(CountVectorizer):
    """
    Helper class to apply the stemming when counting words in a corpus. Combines the sklearn CountVectorizer with the nltk SnowballStemmer.
    See: https://stackoverflow.com/questions/36182502/add-stemming-support-to-countvectorizer-sklearn
    """

    def build_analyzer(self):
        """
        Custom build_analyzer method. Calls the build_analyzer of the CountVectorizer parent class and then applies
        SnowballStemmer('english')

        Returns
        -------
        stemmed_words : Generator
            the stemmed words in the document
        """
        stemmer = SnowballStemmer('english')
        analyzer = super(_StemmedCountVectorizer, self).build_analyzer()
        stemmed_words = lambda doc: (stemmer.stem(word) for word in analyzer(doc))
        return stemmed_words


def _transform_text_data(data: np.ndarray, use_tfidf: bool, use_stemming: bool, use_stop_words: bool, max_df: float | int, 
                         min_df: float | int, max_features: int, min_variance : float, sublinear_tf: bool, 
                         data_all: np.ndarray | None = None) -> tuple[np.ndarray, list[str]]:
    """
    Transform a set of texts into a data matrix.
    Result can be either a raw count matrix or the result of tf-idf.
    The pipeline is: creation of the count matrix -> (optional) remove words/features with low variance -> (optional) apply tf-idf

    Parameters
    ----------
    data : np.ndarray
        The given data set containing the raw texts
    use_tfidf : bool
        If true, tf-idf will be applied as the last step of the pipeline
    use_stemming : bool
        If true, the SnowballStemmer from nltk will be used when creating the count matrix
    use_stop_words : bool
        If true, the list of English stopwords from sklearn CountVectorizer will be used
    max_df : float | int
        Ignore words that have a document frequency strictly higher than max_df. 
        If float, the parameter represents a proportion of documents, integer corresponds to absolute counts (see sklearn CountVectorizer)
    min_df : float | int
        Ignore words that have a document frequency strictly lower than min_df.
        If float, the parameter represents a proportion of documents, integer corresponds to absolute counts (see sklearn CountVectorizer)
    max_features : int
        If not None, the resulting count matric will ony contain the top max_features ordered by term frequency across the corpus (see sklearn CountVectorizer).
        Note that this value could be further reduced if min_variance is smaller than one
    min_variance : float
        Features with a variance lower than min_variance will be removed (see sklearn VarianceThreshold). 
        The default is to keep all features with non-zero variance, i.e. remove only the features that have the same value in all samples 
    sublinear_tf : bool
        Apply sublinear term frequency scaling, i.e. replace tf with 1 + log(tf) (see sklearn TfidfTransformer)
    data_all : np.ndarray | None
        The complete data set, i.e., if no subset is used. If it is None, it will be equal to data (default: None)

    Returns
    -------
    tuple : tuple[np.ndarray, list[str]]
        The resulting data array,
        The vocabulary of the data output
    """
    if data_all is None:
        data_all = data
    # Create count matrix
    if use_stemming:
        vectorizer = _StemmedCountVectorizer(dtype=np.float64, stop_words="english" if use_stop_words else None, min_df=min_df, max_df=max_df, max_features=max_features)
    else:
        vectorizer = CountVectorizer(dtype=np.float64, stop_words="english" if use_stop_words else None, min_df=min_df, max_df=max_df, max_features=max_features)
    data_sparse_all = vectorizer.fit_transform(data_all)
    data_sparse = vectorizer.transform(data)
    vocabulary = vectorizer.get_feature_names_out()
    # (Optional) Check for variance threshold
    if min_variance != 0:
        selector = VarianceThreshold(min_variance)
        data_sparse_all = selector.fit_transform(data_sparse_all)
        data_sparse = selector.transform(data_sparse)
        vocabulary_mask = selector._get_support_mask()
        vocabulary = vocabulary[vocabulary_mask]
    # (Optional) Apply tf-idf
    if use_tfidf:
        tfidf = TfidfTransformer(sublinear_tf=sublinear_tf)
        tfidf.fit(data_sparse_all)
        data_sparse = tfidf.transform(data_sparse)
    data = np.asarray(data_sparse.todense())
    return data, vocabulary


[docs]def flatten_images(data: np.ndarray, format: str) -> np.ndarray: """ Convert data array from image to numerical vector. Before flattening, color images will be converted to the HWC/HWDC (height, width, color channels) format. Parameters ---------- data : np.ndarray The given data set format : str Format of the images with the data array. Can be: "HW", "HWD", "CHW", "CHWD", "HWC", "HWDC". Abbreviations stand for: H: Height, W: Width, D: Depth, C: Color-channels Returns ------- data : np.ndarray The flatten data array """ format_possibilities = ["HW", "HWD", "CHW", "CHWD", "HWC", "HWDC"] assert format in format_possibilities, "Format must be within {0}".format(format_possibilities) if format == "HW": assert data.ndim == 3, f"ndim has to be 3 but is {data.ndim}" elif format in ["HWD", "CHW", "HWC"]: assert data.ndim == 4, f"ndim has to be 4 but is {data.ndim}" elif format in ["CHWD", "HWDC"]: assert data.ndim == 5, f"ndim has to be 5 but is {data.ndim}" # Flatten shape if format != "HW" and format != "HWD": if format == "CHW": # Change representation to HWC data = np.transpose(data, [0, 2, 3, 1]) elif format == "CHWD": # Change representation to HWDC data = np.transpose(data, [0, 2, 3, 4, 1]) assert data.shape[ -1] == 3, "Color-channels must be in the last position and contain three channels not {0} ({1})".format( data.shape[-1], data.shape) data = data.reshape(data.shape[0], -1) return data
[docs]def unflatten_images(data_flatten: np.ndarray, image_size: tuple) -> np.ndarray: """ Convert data array from numerical vector to image. After unflattening, color images will be converted to the CHW/CHWD (color channels, height, width) format. Parameters ---------- data_flatten : np.ndarray The given flatten data set image_size : str The size of a single image, e.g., (28,28,3) for a colored image of size 28 x 28 Returns ------- data_image : np.ndarray The unflatten data array corresponding to an image """ new_shape = tuple([-1] + [i for i in image_size]) data_image = data_flatten.reshape(new_shape) # Change image from HWC/HWDC to CHW/CHWD if data_image.ndim == 4 and image_size[-1] == 3: data_image = np.transpose(data_image, (0, 3, 1, 2)) elif data_image.ndim == 5 and image_size[-1] == 3: data_image = np.transpose(data_image, (0, 4, 1, 2, 3)) return data_image