Source code for clustpy.data.real_video_data

try:
    import cv2
except:
    print("[WARNING] Could not import cv2 in clustpy.data.real_video_data. Please install cv2 by 'pip install opencv-python' if necessary")
from clustpy.data._utils import _download_file, _get_download_dir, _load_image_data, flatten_images
import numpy as np
import os
import zipfile
from sklearn.datasets._base import Bunch

"""
Helpers
"""


def _load_video(path: str, image_size: tuple) -> np.ndarray:
    """
    Load a video by saving each frame within a numpy array.

    Parameters
    ----------
    path : str
        Path to the video
    image_size : tuple
        The single frames can be downsized. This is necessary for large datasets.
        The tuple equals (width, height) of the images.
        Can also be None if the image size should not be changed

    Returns
    -------
    video_array : np.ndarray
        The array containing the frames
    """
    # Load video
    vid = cv2.VideoCapture(path)
    video_array = []
    # Iterate over frames
    successful = True
    while successful:
        successful, frame_array = vid.read()
        if successful:
            is_color_image = frame_array.ndim == 3 and frame_array.shape[2] == 3
            if is_color_image:
                frame_array = cv2.cvtColor(frame_array, cv2.COLOR_BGR2RGB)
            if image_size is not None:
                frame_array = _load_image_data(frame_array, image_size, is_color_image)
            video_array.append(frame_array)
    vid.release()
    # Transform list to numpy array
    video_array = np.array(video_array, dtype="uint8")
    return video_array


def _downsample_frames(data: np.ndarray, labels: np.ndarray, frame_sampling_ratio: float = 1) -> (
        np.ndarray, np.ndarray):
    """
    Downsample the number of frames within a video.

    Parameters
    ----------
    data : np.ndarray
        The data array containing the frames
    labels : np.ndarray
        The labels array
    frame_sampling_ratio : float
        Ratio to downsample the number of frames. If it is set to 1 all frames will be returned.
        Can take values within (0, 1] (default: 1)

    Returns
    -------
    data, labels : (np.ndarray, np.ndarray)
        The updated data array, the updated labels array
    """
    assert frame_sampling_ratio > 0 and frame_sampling_ratio <= 1, "frame_sampling_ratio must be within (0, 1]"
    # Downsample array
    if frame_sampling_ratio != 1:
        n_samples_orig = data.shape[0]
        n_to_delete = int(n_samples_orig - frame_sampling_ratio * n_samples_orig)
        indices_to_delete = np.round(np.linspace(0, n_samples_orig - 1, n_to_delete)).astype(int)
        data = np.delete(data, indices_to_delete, axis=0)
        labels = np.delete(labels, indices_to_delete, axis=0)
        assert frame_sampling_ratio <= data.shape[
            0] / n_samples_orig, "Difference between frame_sampling_ratio ({0}) and actual sampling ratio ({1}) is too large".format(
            frame_sampling_ratio, data.shape[0] / n_samples_orig)
    return data, labels


"""
Actual datasets
"""


[docs]def load_video_weizmann(image_size: tuple = None, frame_sampling_ratio: float = 1, return_X_y: bool = False, downloads_path: str = None) -> Bunch: """ Load the Weizmann video data set. It consists of 93 videos showing 9 different persons performing 10 different activities. We transform the data set by extracting the 5687 144x180 colored frames. Note that the number of frames can differ depending on the used machine and version of opencv. The two label sets are the activities and the persons. N=5687, d=77760, k=[10, 9]. Parameters ---------- image_size : tuple The single frames can be downsized. This is necessary for large datasets. The tuple equals (width, height) of the images. Can also be None if the image size should not be changed (default: None) frame_sampling_ratio : float Ratio to downsample the number of frames of each video. If it is set to 1 all frames will be returned. Can take values within (0, 1] (default: 1) return_X_y : bool If True, returns (data, target) instead of a Bunch object. See below for more information about the data and target object (default: False) downloads_path : str path to the directory where the data is stored (default: None -> [USER]/Downloads/clustpy_datafiles) Returns ------- bunch : Bunch A Bunch object containing the data in the 'data' attribute and the labels in the 'target' attribute. Furthermore, the original images are contained in the 'images' attribute. Note that the data within 'data' is in HWC format and within 'images' in the CHW format. Alternatively, if return_X_y is True two arrays will be returned: the data numpy array (5687 x 77760), the labels numpy array (5687 x 2) References ------- https://www.wisdom.weizmann.ac.il/~vision/SpaceTimeActions.html """ directory = _get_download_dir(downloads_path) + "/Video_Weizmann/" all_actions = ["walk", "run", "jump", "side", "bend", "wave1", "wave2", "pjump", "jack", "skip"] all_persons = ["daria", "denis", "eli", "ido", "ira", "lena", "lyova", "moshe", "shahar"] all_data = np.zeros( (0, 144 if image_size is None else image_size[0], 180 if image_size is None else image_size[1], 3), dtype="uint8") labels = np.zeros((0, 2), dtype="int32") # Download data for action in all_actions: my_zip_file = action + ".zip" filename = directory + my_zip_file if not os.path.isfile(filename): if not os.path.isdir(directory): os.mkdir(directory) _download_file( "https://www.wisdom.weizmann.ac.il/~vision/VideoAnalysis/Demos/SpaceTimeActions/DB/" + my_zip_file, filename) # Unpack zipfile with zipfile.ZipFile(filename, 'r') as zipf: zipf.extractall(directory) # Load data, iterate over all video files for v_file in os.listdir(directory): # Ignore zip files if v_file.endswith(".avi"): data_local = _load_video(directory + "/" + v_file, image_size) # Get name of person and type of activity relevant_parts = v_file.split(".")[0] person = relevant_parts.split("_")[0] action = relevant_parts.split("_")[1] # Sometimes a person performs an action twice. In that case a 1/2 is appended to the action if not action.startswith("wave") and (action.endswith("1") or action.endswith("2")): action = action[:-1] assert person in all_persons, "Wrong person. {0} is unknown".format(person) assert action in all_actions, "Wrong action. {0} is unknown".format(action) # Transform string to label label_person = all_persons.index(person) label_action = all_actions.index(action) labels_local = np.array([[label_action, label_person]] * data_local.shape[0], dtype="int32") # Downsample frames data_local, labels_local = _downsample_frames(data_local, labels_local, frame_sampling_ratio) # Update data and labels all_data = np.append(all_data, data_local, axis=0) labels = np.append(labels, labels_local, axis=0) # Flatten data data_flatten = flatten_images(all_data, "HWC") # Return values if return_X_y: return data_flatten, labels else: # Get images in correct format data_image = np.transpose(all_data, [0, 3, 1, 2]) image_format = "CHW" return Bunch(dataset_name="VideoWeizmann", data=data_flatten, target=labels, images=data_image, image_format=image_format)
[docs]def load_video_keck_gesture(subset: str = "all", image_size: tuple = (200, 200), frame_sampling_ratio: float = 1, return_X_y: bool = False, downloads_path: str = None) -> Bunch: """ Load the Keck Gesture video data set. It consists of 42 training and 56 testing videos showing 4 different persons performing 14 different gestures. We assign the label '0' to the gesture 'no gesture', which describes the frames between the actual gestures. This results in 15 different gestures. Note, that the person with label '3' is only contained in the testing data. We transform the data set by extracting the 25457 480x640 colored frames. Note that the number of frames can differ depending on the used machine and version of opencv. Further, we recommend to downsize the frames due to possible memory issues. The final data set is divided into 13546 training and 11911 test images. The two label sets are the gestures and the persons. N=25457, d=120000 (for image_size (200, 200)), k=[15, 4]. Parameters ---------- subset : str can be 'all', 'test' or 'train'. 'all' combines test and train data (default: 'all') image_size : tuple The single frames can be downsized. This is necessary for large datasets. The tuple equals (width, height) of the images. Can also be None if the image size should not be changed (default: (200, 200))) frame_sampling_ratio : float Ratio to downsample the number of frames of each video. If it is set to 1 all frames will be returned. Can take values within (0, 1] (default: 1) return_X_y : bool If True, returns (data, target) instead of a Bunch object. See below for more information about the data and target object (default: False) downloads_path : str path to the directory where the data is stored (default: None -> [USER]/Downloads/clustpy_datafiles) Returns ------- bunch : Bunch A Bunch object containing the data in the 'data' attribute and the labels in the 'target' attribute. Furthermore, the original images are contained in the 'images' attribute. Note that the data within 'data' is in HWC format and within 'images' in the CHW format. Alternatively, if return_X_y is True two arrays will be returned: the data numpy array (25457 x 120000 (for image_size (200, 200))), the labels numpy array (25457 x 2) References ------- http://www.zhuolin.umiacs.io/Keckgesturedataset.html """ def parse_frames_file(frames_file: str) -> (dict, dict): """ Get the specific frames for each gesture from the frames.txt. Parameters ---------- frames_file : str path to the frames txt. Returns ------- train_dict, test_dict : (dict, dict) The dictionary for the training data, the dictionary for the testing data """ train_dict = {} test_dict = {} train_data = True with open(frames_file, "r") as f: all_lines = f.readlines() for line in all_lines: # Read infos from file if line.startswith("person"): person = int(line.split("_")[0].replace("person ", "")) - 1 gesture = int(line.split("_")[1].replace("gesture", "")) frame_limits = line.split("frames ")[1].split(",") frames = [(int(single_limit.split("-")[0]), int(single_limit.split("-")[1]) + 1) for single_limit in frame_limits] if train_data: # Train data entry train_dict[(gesture, person)] = frames else: # Test data entry test_dict[(gesture, person)] = frames # Switch to test data if line.startswith("Testing set:"): train_data = False return train_dict, test_dict # Start loading the dataset subset = subset.lower() assert subset in ["all", "train", "test"], "subset must match 'all', 'train' or 'test'. Your input {0}".format(subset) directory = _get_download_dir(downloads_path) + "/Video_Keck_Gesture/" filename = directory + "Keck_Dataset.zip" frames_file = directory + "sequences.txt" if not os.path.isfile(filename): if not os.path.isdir(directory): os.mkdir(directory) _download_file("http://www.zhuolin.umiacs.io/PrototypeTree/Keck_Dataset.zip", filename) # Unpack zipfile with zipfile.ZipFile(filename, 'r') as zipf: zipf.extractall(directory) # Get Relevant frames _download_file("http://www.zhuolin.umiacs.io/PrototypeTree/sequences.txt", frames_file) # Load data and labels all_data = np.zeros( (0, 480 if image_size is None else image_size[0], 640 if image_size is None else image_size[1], 3), dtype="uint8") labels = np.zeros((0, 2), dtype="int32") # Get frame limits from sequences file frames_train_dict, frames_test_dict = parse_frames_file(frames_file) # Get necessary directories file_directories = [] if subset == "all" or subset == "train": file_directories.append((True, "training files/")) if subset == "all" or subset == "test": file_directories.append((False, "testingfiles/")) # load videos for train_data, file_directory in file_directories: directory_files = directory + "Keck Dataset/" + file_directory # Iterate over all video files for v_file in os.listdir(directory_files): data_local = _load_video(directory_files + v_file, image_size) # Transform string to label label_gesture = int(v_file.split("_")[1].replace("gesture", "")) label_person = int(v_file.split("_")[0].replace("person", "")) - 1 labels_local = np.array([[0, label_person]] * data_local.shape[0], dtype="int32") # Use frames_dicts to set gestures correctly if train_data: for start, end in frames_train_dict[(label_gesture, label_person)]: labels_local[start:end, 0] = label_gesture else: for start, end in frames_test_dict[(label_gesture, label_person)]: labels_local[start:end, 0] = label_gesture # Downsample frames data_local, labels_local = _downsample_frames(data_local, labels_local, frame_sampling_ratio) # Update data and labels all_data = np.append(all_data, data_local, axis=0) labels = np.append(labels, labels_local, axis=0) # Flatten data data_flatten = flatten_images(all_data, "HWC") # Return values if return_X_y: return data_flatten, labels else: # Get images in correct format data_image = np.transpose(all_data, [0, 3, 1, 2]) image_format = "CHW" return Bunch(dataset_name="VideoKeckGesture", data=data_flatten, target=labels, images=data_image, image_format=image_format)