try:
import cv2
except:
print("[WARNING] Could not import cv2 in clustpy.data.real_video_data. Please install cv2 by 'pip install opencv-python' if necessary")
from clustpy.data._utils import _download_file, _get_download_dir, flatten_images
import numpy as np
import zipfile
from sklearn.datasets._base import Bunch
from pathlib import Path
"""
Helpers
"""
def _load_video(path: str | Path, image_size: tuple) -> np.ndarray:
"""
Load a video by saving each frame within a numpy array.
Parameters
----------
path : Path | Path
Path to the video
image_size : tuple
The single frames can be downsized. This is necessary for large datasets.
The tuple equals (width, height) of the images.
Can also be None if the image size should not be changed
Returns
-------
video_array : np.ndarray
The array containing the frames
"""
# Load video
vid = cv2.VideoCapture(path)
if not vid.isOpened():
vid.release()
raise IOError(f"OpenCV could not open {path}. This usually indicates missing codecs (ffmpeg/libav).")
video_array = []
# Iterate over frames
try:
while True:
successful, frame_array = vid.read()
if not successful:
break
is_color_image = frame_array.ndim == 3 and frame_array.shape[2] == 3
if is_color_image:
frame_array = cv2.cvtColor(frame_array, cv2.COLOR_BGR2RGB)
if image_size is not None:
frame_array = cv2.resize(frame_array, image_size, interpolation=cv2.INTER_AREA)
video_array.append(frame_array.copy())
finally:
vid.release()
if len(video_array) == 0:
raise ValueError(f"Video at {path} yielded 0 frames. File might be corrupted.")
# Transform list to numpy array
video_array = np.array(video_array, dtype="uint8")
return video_array
def _downsample_frames(data: np.ndarray, labels: np.ndarray, frame_sampling_ratio: float = 1) -> (
np.ndarray, np.ndarray):
"""
Downsample the number of frames within a video.
Parameters
----------
data : np.ndarray
The data array containing the frames
labels : np.ndarray
The labels array
frame_sampling_ratio : float
Ratio to downsample the number of frames. If it is set to 1 all frames will be returned.
Can take values within (0, 1] (default: 1)
Returns
-------
data, labels : (np.ndarray, np.ndarray)
The updated data array, the updated labels array
"""
assert frame_sampling_ratio > 0 and frame_sampling_ratio <= 1, "frame_sampling_ratio must be within (0, 1]"
# Downsample array
if frame_sampling_ratio != 1:
n_samples_orig = data.shape[0]
n_to_delete = int(n_samples_orig - frame_sampling_ratio * n_samples_orig)
indices_to_delete = np.round(np.linspace(0, n_samples_orig - 1, n_to_delete)).astype(int)
data = np.delete(data, indices_to_delete, axis=0)
labels = np.delete(labels, indices_to_delete, axis=0)
assert frame_sampling_ratio <= data.shape[
0] / n_samples_orig, "Difference between frame_sampling_ratio ({0}) and actual sampling ratio ({1}) is too large".format(
frame_sampling_ratio, data.shape[0] / n_samples_orig)
return data, labels
"""
Actual datasets
"""
[docs]def load_video_weizmann(use_actions : tuple = None, use_persons : tuple = None,
image_size: tuple = None, frame_sampling_ratio: float = 1, return_X_y: bool = False,
downloads_path: str | Path = None) -> Bunch:
"""
Load the Weizmann video data set.
It consists of 93 videos showing 9 different persons performing 10 different activities.
We transform the data set by extracting the 5687 144x180 colored frames.
Note that the number of frames can differ depending on the used machine and version of opencv.
The two label sets are the activities and the persons.
N=5687, d=77760, k=[10, 9].
Parameters
----------
use_actions : tuple
Specify the actions. Can be None if all actions should be used (default: None)
use_persons : tuple
Specify the persons. Can be None if all persons should be used (default: None)
image_size : tuple
The single frames can be downsized. This is necessary for large datasets.
The tuple equals (width, height) of the images.
Can also be None if the image size should not be changed (default: None)
frame_sampling_ratio : float
Ratio to downsample the number of frames of each video. If it is set to 1 all frames will be returned.
Can take values within (0, 1] (default: 1)
return_X_y : bool
If True, returns (data, target) instead of a Bunch object. See below for more information about the data and target object (default: False)
downloads_path : str | Path
path to the directory where the data is stored (default: None -> [USER]/Downloads/clustpy_datafiles)
Returns
-------
bunch : Bunch
A Bunch object containing the data in the 'data' attribute and the labels in the 'target' attribute.
Furthermore, the original images are contained in the 'images' attribute.
Note that the data within 'data' is in HWC format and within 'images' in the CHW format.
Alternatively, if return_X_y is True two arrays will be returned:
the data numpy array (5687 x 77760), the labels numpy array (5687 x 2)
References
-------
https://www.wisdom.weizmann.ac.il/~vision/SpaceTimeActions.html
"""
directory = _get_download_dir(downloads_path) / "Video_Weizmann"
all_actions = ["walk", "run", "jump", "side", "bend", "wave1", "wave2", "pjump", "jack", "skip"]
if use_actions is None:
use_actions = all_actions.copy()
assert all([action in all_actions for action in use_actions])
all_persons = ["daria", "denis", "eli", "ido", "ira", "lena", "lyova", "moshe", "shahar"]
if use_persons is None:
use_persons = all_persons.copy()
assert all([person in all_persons for person in use_persons])
all_data_list = []
labels_list = []
# Download data
for action in use_actions:
my_zip_file = action + ".zip"
filename = directory / my_zip_file
if not filename.is_file():
directory.mkdir(parents=False, exist_ok=True)
_download_file(
"https://www.wisdom.weizmann.ac.il/~vision/VideoAnalysis/Demos/SpaceTimeActions/DB/" + my_zip_file,
filename)
# Unpack zipfile
with zipfile.ZipFile(filename, 'r') as zipf:
zipf.extractall(directory)
# Load data, iterate over all video files
for v_file in directory.iterdir():
# Ignore zip files
if v_file.suffix == ".avi":
# Get name of person and type of activity
relevant_parts = v_file.name.split(".")[0]
person = relevant_parts.split("_")[0]
action = relevant_parts.split("_")[1]
# Sometimes a person performs an action twice. In that case a 1/2 is appended to the action
if not action.startswith("wave") and (action.endswith("1") or action.endswith("2")):
action = action[:-1]
assert person in all_persons, "Wrong person. {0} is unknown".format(person)
assert action in all_actions, "Wrong action. {0} is unknown".format(action)
if person not in use_persons or action not in use_actions:
continue
# Load video
data_local = _load_video(directory / v_file, image_size)
# Transform string to label
label_person = use_persons.index(person)
label_action = use_actions.index(action)
labels_local = np.array([[label_action, label_person]] * data_local.shape[0], dtype="int32")
# Downsample frames
data_local, labels_local = _downsample_frames(data_local, labels_local, frame_sampling_ratio)
# Update data and labels
all_data_list.append(data_local)
labels_list.append(labels_local)
all_data = np.concatenate(all_data_list, axis=0)
labels = np.concatenate(labels_list, axis=0)
del all_data_list
del labels_list
# Flatten data
data_flatten = flatten_images(all_data, "HWC")
# Return values
if return_X_y:
return data_flatten, labels
else:
# Get images in correct format
data_image = np.transpose(all_data, [0, 3, 1, 2])
image_format = "CHW"
return Bunch(dataset_name="VideoWeizmann", data=data_flatten, target=labels, images=data_image,
image_format=image_format, classes=(use_actions, use_persons))
[docs]def load_video_keck_gesture(subset: str = "all", image_size: tuple = (200, 200), frame_sampling_ratio: float = 1,
return_X_y: bool = False, downloads_path: str | Path = None) -> Bunch:
"""
Load the Keck Gesture video data set.
It consists of 42 training and 56 testing videos showing 4 different persons performing 14 different gestures.
We assign the label '0' to the gesture 'no gesture', which describes the frames between the actual gestures.
This results in 15 different gestures.
Note, that the person with label '3' is only contained in the testing data.
We transform the data set by extracting the 25457 480x640 colored frames.
Note that the number of frames can differ depending on the used machine and version of opencv.
Further, we recommend to downsize the frames due to possible memory issues.
The final data set is divided into 13546 training and 11911 test images.
The two label sets are the gestures and the persons.
N=25457, d=120000 (for image_size (200, 200)), k=[15, 4].
Parameters
----------
subset : str
can be 'all', 'test' or 'train'. 'all' combines test and train data (default: 'all')
image_size : tuple
The single frames can be downsized. This is necessary for large datasets.
The tuple equals (width, height) of the images.
Can also be None if the image size should not be changed (default: (200, 200)))
frame_sampling_ratio : float
Ratio to downsample the number of frames of each video. If it is set to 1 all frames will be returned.
Can take values within (0, 1] (default: 1)
return_X_y : bool
If True, returns (data, target) instead of a Bunch object. See below for more information about the data and target object (default: False)
downloads_path : str | Path
path to the directory where the data is stored (default: None -> [USER]/Downloads/clustpy_datafiles)
Returns
-------
bunch : Bunch
A Bunch object containing the data in the 'data' attribute and the labels in the 'target' attribute.
Furthermore, the original images are contained in the 'images' attribute.
Note that the data within 'data' is in HWC format and within 'images' in the CHW format.
Alternatively, if return_X_y is True two arrays will be returned:
the data numpy array (25457 x 120000 (for image_size (200, 200))), the labels numpy array (25457 x 2)
References
-------
http://www.zhuolin.umiacs.io/Keckgesturedataset.html
"""
def parse_frames_file(frames_file: str) -> (dict, dict):
"""
Get the specific frames for each gesture from the frames.txt.
Parameters
----------
frames_file : str
path to the frames txt.
Returns
-------
train_dict, test_dict : (dict, dict)
The dictionary for the training data, the dictionary for the testing data
"""
train_dict = {}
test_dict = {}
train_data = True
with open(frames_file, "r") as f:
all_lines = f.readlines()
for line in all_lines:
# Read infos from file
if line.startswith("person"):
person = int(line.split("_")[0].replace("person ", "")) - 1
gesture = int(line.split("_")[1].replace("gesture", ""))
frame_limits = line.split("frames ")[1].split(",")
frames = [(int(single_limit.split("-")[0]), int(single_limit.split("-")[1]) + 1) for single_limit in
frame_limits]
if train_data:
# Train data entry
train_dict[(gesture, person)] = frames
else:
# Test data entry
test_dict[(gesture, person)] = frames
# Switch to test data
if line.startswith("Testing set:"):
train_data = False
return train_dict, test_dict
# Start loading the dataset
subset = subset.lower()
assert subset in ["all", "train",
"test"], "subset must match 'all', 'train' or 'test'. Your input {0}".format(subset)
directory = _get_download_dir(downloads_path) / "Video_Keck_Gesture"
filename = directory / "Keck_Dataset.zip"
frames_file = directory / "sequences.txt"
if not filename.is_file():
directory.mkdir(parents=False, exist_ok=True)
_download_file("http://www.zhuolin.umiacs.io/PrototypeTree/Keck_Dataset.zip", filename)
# Unpack zipfile
with zipfile.ZipFile(filename, 'r') as zipf:
zipf.extractall(directory)
# Get Relevant frames
_download_file("http://www.zhuolin.umiacs.io/PrototypeTree/sequences.txt", frames_file)
# Load data and labels
all_data_list = []
labels_list = []
# Get frame limits from sequences file
frames_train_dict, frames_test_dict = parse_frames_file(frames_file)
# Get necessary directories
file_directories = []
if subset == "all" or subset == "train":
file_directories.append((True, "training files/"))
if subset == "all" or subset == "test":
file_directories.append((False, "testingfiles/"))
# load videos
for train_data, file_directory in file_directories:
directory_files = directory / "Keck Dataset" / file_directory
# Iterate over all video files
for v_file in directory_files.iterdir():
v_file_str = v_file.name
data_local = _load_video(v_file, image_size)
# Transform string to label
label_gesture = int(v_file_str.split("_")[1].replace("gesture", ""))
label_person = int(v_file_str.split("_")[0].replace("person", "")) - 1
labels_local = np.array([[0, label_person]] * data_local.shape[0], dtype="int32")
# Use frames_dicts to set gestures correctly
if train_data:
for start, end in frames_train_dict[(label_gesture, label_person)]:
labels_local[start:end, 0] = label_gesture
else:
for start, end in frames_test_dict[(label_gesture, label_person)]:
labels_local[start:end, 0] = label_gesture
# Downsample frames
data_local, labels_local = _downsample_frames(data_local, labels_local, frame_sampling_ratio)
# Update data and labels
all_data_list.append(data_local)
labels_list.append(labels_local)
all_data = np.concatenate(all_data_list, axis=0)
labels = np.concatenate(labels_list, axis=0)
del all_data_list
del labels_list
# Flatten data
data_flatten = flatten_images(all_data, "HWC")
# Return values
if return_X_y:
return data_flatten, labels
else:
# Get images in correct format
data_image = np.transpose(all_data, [0, 3, 1, 2])
image_format = "CHW"
return Bunch(dataset_name="VideoKeckGesture", data=data_flatten, target=labels, images=data_image,
image_format=image_format)