"""
@authors:
Collin Leiber
"""
from sklearn.base import BaseEstimator, ClusterMixin
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.utils import check_random_state
from sklearn.metrics.pairwise import pairwise_distances_argmin_min
def _clustering_via_orthogonalization(X: np.ndarray, n_clusters: list, explained_variance_for_clustering: float,
do_orthogonal_clustering: bool, random_state: np.random.RandomState) -> (
np.ndarray, list, list, list, np.ndarray):
"""
Start the actual Orthogonal Clustering (Orth1) or Clustering in Orthogonal Spaces (Orth2) procedure on the input data set.
Parameters
----------
X : np.ndarray
the given data set
n_clusters : list
list containing number of clusters for each subspace
explained_variance_for_clustering : float
Defines the variances that is contained in the subspace used for clustering. If this value is 1, PCA will not be executed before performing KMeans
do_orthogonal_clustering : bool
Defines if the feature transformation of 'Orthogonal Clustering' or 'Clustering in Orthogonal Spaces' should be applied
random_state : np.random.RandomState
use a fixed random state to get a repeatable solution
Returns
-------
tuple : (np.ndarray, list, list, list, np.ndarray)
The labels,
The cluster centers,
The projections,
The PCA transformations,
The mean value of the data set
"""
assert explained_variance_for_clustering > 0 and explained_variance_for_clustering <= 1, "explained_variancefor_clustering must be within (0, 1)"
labels = np.zeros((X.shape[0], len(n_clusters)), dtype=np.int32)
cluster_centers = []
projections = []
PCAs = [] if explained_variance_for_clustering != 1 else None
# Center data
global_mean = np.mean(X, axis=0)
X = X - global_mean
for subspace, k in enumerate(n_clusters):
# (Optional) Execute PCA before clustering
if explained_variance_for_clustering != 1:
pca = PCA(explained_variance_for_clustering)
X_subspace = pca.fit_transform(X)
PCAs.append(pca)
else:
X_subspace = X
# Execute clustering
km = KMeans(k, random_state=random_state)
km.fit(X_subspace)
# Save labels
labels[:, subspace] = km.labels_
# Get orthogonal space. Note that due to PCA, KMeans centers can be lower-dimensional
if do_orthogonal_clustering:
X, proj, centers_subspace = _orthogonal_clustering_transform(X, km)
else:
X, proj, centers_subspace = _clustering_in_orthogonal_spaces_transform(X, km)
cluster_centers.append(centers_subspace)
projections.append(proj)
return labels, cluster_centers, projections, PCAs, global_mean
def _orthogonal_clustering_transform(X: np.ndarray, km: KMeans) -> (np.ndarray, np.ndarray, np.ndarray):
"""
Execute the Orthogonal clustering (Orth1) feature transformation.
Parameters
----------
X : np.ndarray
the given data set
km : KMeans
The current KMeans result
Returns
-------
tuple : (np.ndarray, np.ndarray, np.ndarray)
The transformed data set,
The executed projection,
The full-dimensional cluster centers
"""
centers_subspace = np.zeros((km.n_clusters, X.shape[1]))
projections_subspace = np.zeros((km.n_clusters, X.shape[1], X.shape[1]))
for c in range(km.n_clusters):
# Get full-dimensional center
center = np.mean(X[km.labels_ == c], axis=0)
# Execute transformation
proj = np.identity(X.shape[1]) - center.reshape(-1, 1) @ center.reshape(1, -1) / (
center.reshape(1, -1) @ center.reshape(-1, 1))
X[km.labels_ == c] = X[km.labels_ == c] @ proj
centers_subspace[c] = center
projections_subspace[c] = proj
return X, projections_subspace, centers_subspace
def _clustering_in_orthogonal_spaces_transform(X: np.ndarray, km: KMeans) -> (np.ndarray, np.ndarray, np.ndarray):
"""
Execute the Clustering in Orthogonal Spaces (Orth2) feature transformation.
Parameters
----------
X : np.ndarray
the given data set
km : KMeans
The current KMeans result
Returns
-------
tuple : (np.ndarray, np.ndarray, np.ndarray)
The transformed data set,
The executed projection,
The full-dimensional cluster centers
"""
# Get full-dimensional center
centers_subspace = np.array([np.mean(X[km.labels_ == c], axis=0) for c in range(km.n_clusters)])
# Execute transformation
pca_subspace = PCA()
pca_subspace.fit(centers_subspace)
A = pca_subspace.components_[:min(km.n_clusters - 1, X.shape[1])]
P = np.identity(A.shape[1]) - A.T @ np.linalg.inv(A @ A.T) @ A
X = X @ P
return X, P, centers_subspace
[docs]class OrthogonalClustering(BaseEstimator, ClusterMixin):
"""
Execute the Orthogonal Clustering procedure (Orth1).
The algorithm will search for multiple clustering solutions by transforming the feature space after each KMeans execution.
The number of subspaces will automatically be traced by the length of the input n_clusters array.
Parameters
----------
n_clusters : list
list containing number of clusters for each subspace
explained_variance_for_clustering : float
Defines the variance that is contained in the subspace used for clustering. This subspace is received by performing PCA.
If explained_variance_for_clustering is 1, PCA will not be executed before performing KMeans (default: 0.9)
random_state : np.random.RandomState
use a fixed random state to get a repeatable solution. Can also be of type int (default: None)
Attributes
----------
labels_ : np.ndarray
The final labels
cluster_centers_ : list
The final cluster centers
projections_ : list
The orthogonal projections
PCAs_ : list
The PCA transformations
global_mean_ : np.ndarray
The mean value of the fitted data set
References
----------
Cui, Ying, Xiaoli Z. Fern, and Jennifer G. Dy. "Non-redundant multi-view clustering via orthogonalization."
Seventh IEEE international conference on data mining (ICDM 2007). IEEE, 2007.
"""
def __init__(self, n_clusters: list, explained_variance_for_clustering: float = 0.9,
random_state: np.random.RandomState = None):
self.n_clusters = n_clusters
self.explained_variance_for_clustering = explained_variance_for_clustering
self.random_state = check_random_state(random_state)
[docs] def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'OrthogonalClustering':
"""
Initiate the actual clustering process on the input data set.
The resulting cluster labels will be stored in the labels_ attribute.
Parameters
----------
X : np.ndarray
the given data set
y : np.ndarray
the labels (can be ignored)
Returns
-------
self : OrthogonalClustering
this instance of the OrthogonalClustering algorithm
"""
labels, centers, projections, pcas, global_mean = _clustering_via_orthogonalization(X, self.n_clusters,
self.explained_variance_for_clustering,
True, self.random_state)
self.labels_ = labels
self.cluster_centers_ = centers
self.projections_ = projections
self.PCAs_ = pcas
self.global_mean_ = global_mean
return self
[docs] def predict(self, X: np.ndarray) -> np.ndarray:
"""
Predict the labels of an input dataset. For this method the results from the fit() method will be used.
Parameters
----------
X : np.ndarray
the given data set
Returns
-------
predicted_labels : np.ndarray
the predicted labels of the input data set for each subspace. Shape equals (n_samples x n_subspaces)
"""
# Check if algorithm has run
assert hasattr(self, "labels_"), "The algorithm has not run yet. Use the fit() function first."
predicted_labels = np.zeros((X.shape[0], len(self.n_clusters)), dtype=np.int32)
# Get labels for each subspace
for subspace in range(len(self.n_clusters)):
X_transform = self.transform_subspace(X, subspace)
if self.PCAs_ is not None:
X_transform = self.PCAs_[subspace].transform(X_transform)
centers_subspace = self.PCAs_[subspace].transform(self.cluster_centers_[subspace])
else:
centers_subspace = self.cluster_centers_[subspace]
labels_tmp, _ = pairwise_distances_argmin_min(X=X_transform, Y=centers_subspace,
metric='euclidean',
metric_kwargs={'squared': True})
predicted_labels[:, subspace] = labels_tmp
# Return the predicted labels
return predicted_labels
[docs]class ClusteringInOrthogonalSpaces(OrthogonalClustering):
"""
Execute the Clustering In Orthogonal Spaces procedure (Orth2).
The algorithm will search for multiple clustering solutions by transforming the feature space after each KMeans execution.
The number of subspaces will automatically be traced by the length of the input n_clusters array.
Parameters
----------
n_clusters : list
list containing number of clusters for each subspace
explained_variance_for_clustering : float
Defines the variance that is contained in the subspace used for clustering. This subspace is received by performing PCA.
If explained_variance_for_clustering is 1, PCA will not be executed before performing KMeans (default: 0.9)
random_state : np.random.RandomState | int
use a fixed random state to get a repeatable solution. Can also be of type int (default: None)
Attributes
----------
labels_ : np.ndarray
The final labels
cluster_centers_ : list
The final cluster centers
projections_ : list
The orthogonal projections
PCAs_ : list
The PCA transformations
global_mean_ : np.ndarray
The mean value of the fitted data set
References
----------
Cui, Ying, Xiaoli Z. Fern, and Jennifer G. Dy. "Non-redundant multi-view clustering via orthogonalization."
Seventh IEEE international conference on data mining (ICDM 2007). IEEE, 2007.
"""
def __init__(self, n_clusters: list, explained_variance_for_clustering: float = 0.9,
random_state: np.random.RandomState | int = None):
super().__init__(n_clusters, explained_variance_for_clustering, random_state)
[docs] def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'ClusteringInOrthogonalSpaces':
"""
Initiate the actual clustering process on the input data set.
The resulting cluster labels will be stored in the labels_ attribute.
Parameters
----------
X : np.ndarray
the given data set
y : np.ndarray
the labels (can be ignored)
Returns
-------
self : ClusteringInOrthogonalSpaces
this instance of the ClusteringInOrthogonalSpaces algorithm
"""
labels, centers, projections, pcas, global_mean = _clustering_via_orthogonalization(X, self.n_clusters,
self.explained_variance_for_clustering,
False, self.random_state)
self.labels_ = labels
self.cluster_centers_ = centers
self.projections_ = projections
self.PCAs_ = pcas
self.global_mean_ = global_mean
return self