"""Rand index calculations and empirical distributions.
This module implements the Rand index and adjusted Rand index for comparing
clustering solutions, following the methodology from RandIndex4.m. It also
provides functions for creating empirical distributions of Rand indices.
"""
from typing import Dict, Optional, Tuple, Union
from dataclasses import dataclass
import numpy as np
from numpy.typing import NDArray
from concurrent.futures import ProcessPoolExecutor, as_completed
from submarit.utils.matlab_compat import MatlabRandom
[docs]
@dataclass
class RandIndexResult:
"""Container for Rand index calculation results.
Attributes:
rand: Rand index value (0 to 1)
adj_rand: Adjusted Rand index value
a: Number of pairs in same cluster in both clusterings
b: Number of pairs in different clusters in both clusterings
c: Number of pairs in same cluster in C1, different in C2
d: Number of pairs in different cluster in C1, same in C2
n_items: Number of items compared
n_pairs: Total number of pairs
"""
rand: float
adj_rand: float
a: int
b: int
c: int
d: int
n_items: int
n_pairs: int
[docs]
def summary(self) -> str:
"""Generate summary of Rand index results."""
return (
f"Rand Index Results\n"
f"==================\n"
f"Rand index: {self.rand:.4f}\n"
f"Adjusted Rand index: {self.adj_rand:.4f}\n"
f"Number of items: {self.n_items}\n"
f"Total pairs: {self.n_pairs}\n"
f"Contingency table:\n"
f" a (agree same): {self.a}\n"
f" b (agree different): {self.b}\n"
f" c (disagree C1 same): {self.c}\n"
f" d (disagree C1 diff): {self.d}"
)
[docs]
@dataclass
class RandEmpiricalDistribution:
"""Container for Rand index empirical distribution.
Attributes:
rand_dist: Sorted array of Rand index values
adj_rand_dist: Sorted array of adjusted Rand index values
n_points: Number of points in distribution
n_items: Number of items in clusterings
n_clusters: Number of clusters
min_items: Minimum items per cluster constraint
"""
rand_dist: NDArray[np.float64]
adj_rand_dist: NDArray[np.float64]
n_points: int
n_items: int
n_clusters: int
min_items: int = 1
[docs]
def get_percentiles(self, percentiles: Optional[NDArray[np.float64]] = None) -> Dict[str, NDArray[np.float64]]:
"""Get percentile values from the distribution.
Parameters
----------
percentiles : array-like, optional
Percentiles to compute (0-100). Defaults to standard confidence intervals.
Returns
-------
dict
Dictionary with 'rand' and 'adj_rand' percentile values
"""
if percentiles is None:
percentiles = np.array([0.5, 2.5, 5, 25, 50, 75, 95, 97.5, 99.5])
indices = np.round(percentiles / 100 * self.n_points).astype(int)
indices = np.clip(indices, 0, self.n_points - 1)
return {
'rand': self.rand_dist[indices],
'adj_rand': self.adj_rand_dist[indices],
'percentiles': percentiles
}
[docs]
def calculate_p_values(self, rand_value: float, adj_rand_value: float) -> Dict[str, Tuple[float, float]]:
"""Calculate p-values for given Rand index values.
Parameters
----------
rand_value : float
Observed Rand index
adj_rand_value : float
Observed adjusted Rand index
Returns
-------
dict
Dictionary with p-values: {'rand': (upper_p, lower_p), 'adj_rand': (upper_p, lower_p)}
"""
# Find indices with higher values
rand_gr_idx = np.where(self.rand_dist > rand_value)[0]
adj_rand_gr_idx = np.where(self.adj_rand_dist > adj_rand_value)[0]
# Calculate p-values
if len(rand_gr_idx) == 0:
rand_p = (0.0, 1.0) # Better than all values
else:
below = (rand_gr_idx[0] - 1) / self.n_points
rand_p = (1 - below, below)
if len(adj_rand_gr_idx) == 0:
adj_rand_p = (0.0, 1.0) # Better than all values
else:
below = (adj_rand_gr_idx[0] - 1) / self.n_points
adj_rand_p = (1 - below, below)
return {
'rand': rand_p,
'adj_rand': adj_rand_p
}
[docs]
class RandIndex:
"""Calculator for Rand index and adjusted Rand index.
The Rand index measures the similarity between two clusterings by considering
all pairs of items and checking whether they are grouped consistently.
The adjusted Rand index corrects for chance agreement and is preferred
for comparing clusterings with different numbers of clusters.
"""
[docs]
def calculate(
self,
clusters1: NDArray[np.int64],
clusters2: NDArray[np.int64]
) -> Tuple[float, float]:
"""Calculate Rand index and adjusted Rand index.
This implements the methodology from RandIndex4.m, computing both
the standard Rand index (Rand, 1971) and the adjusted Rand index
(Hubert and Arabie, 1985).
Parameters
----------
clusters1 : ndarray of shape (n_items,)
First clustering assignment (1-based cluster labels)
clusters2 : ndarray of shape (n_items,)
Second clustering assignment (1-based cluster labels)
Returns
-------
rand : float
Rand index (0 to 1, where 1 indicates perfect agreement)
adj_rand : float
Adjusted Rand index (can be negative, 1 indicates perfect agreement)
"""
# Validate inputs
if len(clusters1) != len(clusters2):
raise ValueError("Clusterings must have the same number of items")
n = len(clusters1)
if n < 2:
raise ValueError("At least 2 items required for Rand index")
# Total number of pairs
N = n * (n - 1) // 2
# Get maximum cluster label to size contingency table
max_label = max(np.max(clusters1), np.max(clusters2))
# Create indicator matrices (n x max_clusters)
# CA1[i,j] = 1 if item i is in cluster j+1
CA1 = np.zeros((n, max_label), dtype=int)
CA2 = np.zeros((n, max_label), dtype=int)
# Fill indicator matrices (convert to 0-based indexing)
CA1[np.arange(n), clusters1 - 1] = 1
CA2[np.arange(n), clusters2 - 1] = 1
# Compute contingency table
match = CA1.T @ CA2 # Clusters1 x Clusters2 contingency table
# Row and column sums
row_sums = np.sum(match, axis=1)
col_sums = np.sum(match, axis=0)
# Calculate intermediate values
# P: pairs from same cluster in clustering 1
P = np.sum(row_sums * (row_sums - 1) // 2)
# Q: pairs from same cluster in clustering 2
Q = np.sum(col_sums * (col_sums - 1) // 2)
# T: pairs in same cluster in both clusterings
T = (np.sum(match ** 2) - n) // 2
# Rand index: (a + b) / (n choose 2)
# where a = T (agree same), b = N - P - Q + T (agree different)
rand = (N + 2 * T - P - Q) / N
# Adjusted Rand index
if N * (P + Q) - 2 * P * Q == 0:
# Handle edge case where denominator is 0
adj_rand = 0.0
else:
adj_rand = 2 * (N * T - P * Q) / (N * (P + Q) - 2 * P * Q)
return rand, adj_rand
[docs]
def calculate_detailed(
self,
clusters1: NDArray[np.int64],
clusters2: NDArray[np.int64]
) -> RandIndexResult:
"""Calculate Rand index with detailed contingency information.
Parameters
----------
clusters1 : ndarray of shape (n_items,)
First clustering assignment (1-based cluster labels)
clusters2 : ndarray of shape (n_items,)
Second clustering assignment (1-based cluster labels)
Returns
-------
result : RandIndexResult
Detailed results including contingency table counts
"""
n = len(clusters1)
N = n * (n - 1) // 2
# Get basic Rand indices
rand, adj_rand = self.calculate(clusters1, clusters2)
# Calculate detailed contingency counts
# a: pairs in same cluster in both
# b: pairs in different clusters in both
# c: pairs in same cluster in C1, different in C2
# d: pairs in different cluster in C1, same in C2
a = 0 # agree same
b = 0 # agree different
c = 0 # disagree (C1 same, C2 diff)
d = 0 # disagree (C1 diff, C2 same)
for i in range(n - 1):
for j in range(i + 1, n):
same_c1 = clusters1[i] == clusters1[j]
same_c2 = clusters2[i] == clusters2[j]
if same_c1 and same_c2:
a += 1
elif not same_c1 and not same_c2:
b += 1
elif same_c1 and not same_c2:
c += 1
else: # not same_c1 and same_c2
d += 1
return RandIndexResult(
rand=rand,
adj_rand=adj_rand,
a=a,
b=b,
c=c,
d=d,
n_items=n,
n_pairs=N
)
[docs]
def create_rand_empirical_distribution(
n_items: int,
n_clusters: int,
n_points: int,
min_items: int = 1,
random_state: Optional[int] = None,
n_jobs: int = 1
) -> RandEmpiricalDistribution:
"""Create empirical distribution for Rand index under random clustering.
This function implements the methodology from RandCreateDist.m, generating
random clustering pairs and computing their Rand indices to build an
empirical null distribution.
Parameters
----------
n_items : int
Number of items to cluster
n_clusters : int
Number of clusters
n_points : int
Number of points in empirical distribution
min_items : int, default=1
Minimum number of items required per cluster
random_state : int, optional
Random seed for reproducibility
n_jobs : int, default=1
Number of parallel jobs. -1 means using all processors.
Returns
-------
distribution : RandEmpiricalDistribution
Empirical distribution of Rand indices
"""
# Initialize arrays
rand_dist = np.zeros(n_points)
adj_rand_dist = np.zeros(n_points)
# Random number generator
rng = MatlabRandom(random_state)
rand_calc = RandIndex()
if n_jobs == 1:
# Sequential processing
for i_point in range(n_points):
# Generate two random clusterings with min_items constraint
clusters1 = _generate_random_clustering(n_items, n_clusters, min_items, rng)
clusters2 = _generate_random_clustering(n_items, n_clusters, min_items, rng)
# Calculate Rand indices
rand_dist[i_point], adj_rand_dist[i_point] = rand_calc.calculate(
clusters1, clusters2
)
else:
# Parallel processing
n_workers = n_jobs if n_jobs > 0 else None
def compute_sample(seed: int) -> Tuple[float, float]:
"""Compute one sample for the distribution."""
local_rng = MatlabRandom(seed)
clusters1 = _generate_random_clustering(n_items, n_clusters, min_items, local_rng)
clusters2 = _generate_random_clustering(n_items, n_clusters, min_items, local_rng)
return rand_calc.calculate(clusters1, clusters2)
# Generate unique seeds for each worker
if random_state is not None:
base_rng = np.random.RandomState(random_state)
seeds = base_rng.randint(0, 2**31, size=n_points)
else:
seeds = np.random.randint(0, 2**31, size=n_points)
with ProcessPoolExecutor(max_workers=n_workers) as executor:
futures = {
executor.submit(compute_sample, seeds[i]): i
for i in range(n_points)
}
for future in as_completed(futures):
idx = futures[future]
rand_dist[idx], adj_rand_dist[idx] = future.result()
# Sort distributions
rand_dist.sort()
adj_rand_dist.sort()
return RandEmpiricalDistribution(
rand_dist=rand_dist,
adj_rand_dist=adj_rand_dist,
n_points=n_points,
n_items=n_items,
n_clusters=n_clusters,
min_items=min_items
)
def _generate_random_clustering(
n_items: int,
n_clusters: int,
min_items: int,
rng: MatlabRandom
) -> NDArray[np.int64]:
"""Generate random clustering with minimum items constraint.
Parameters
----------
n_items : int
Number of items
n_clusters : int
Number of clusters
min_items : int
Minimum items per cluster
rng : MatlabRandom
Random number generator
Returns
-------
clusters : ndarray
Random cluster assignments (1-based)
"""
min_item_count = 0
while min_item_count < min_items:
# Generate random assignments (1-based)
clusters = rng.randi(n_clusters, n_items)
# Check minimum item count
min_item_count = n_items
for i in range(1, n_clusters + 1):
count = np.sum(clusters == i)
min_item_count = min(count, min_item_count)
return clusters
[docs]
def rand_empirical_p(
rand_dist: NDArray[np.float64],
adj_rand_dist: NDArray[np.float64],
rand: float,
adj_rand: float,
create_confidence: bool = True
) -> Dict[str, Union[float, Tuple[float, float], NDArray[np.float64]]]:
"""Calculate p-values from empirical distribution.
This implements the functionality from RandEmpiricalP.m, computing
p-values and confidence intervals from empirical distributions.
Parameters
----------
rand_dist : ndarray
Sorted empirical distribution of Rand indices
adj_rand_dist : ndarray
Sorted empirical distribution of adjusted Rand indices
rand : float
Observed Rand index
adj_rand : float
Observed adjusted Rand index
create_confidence : bool, default=True
Whether to compute confidence interval values
Returns
-------
result : dict
Dictionary containing:
- 'rand': Observed Rand index
- 'adj_rand': Observed adjusted Rand index
- 'rand_p': (upper_p, lower_p) tuple for Rand index
- 'adj_rand_p': (upper_p, lower_p) tuple for adjusted Rand
- 'rand_conf': Confidence interval values (if requested)
- 'adj_rand_conf': Adjusted Rand CI values (if requested)
"""
n_points = len(rand_dist)
result = {
'rand': rand,
'adj_rand': adj_rand
}
# Calculate p-values for Rand index
gr_indexes = np.where(rand_dist > rand)[0]
if len(gr_indexes) == 0:
result['rand_p'] = (0.0, 1.0) # Better than all values
else:
below = (gr_indexes[0] - 1) / n_points
result['rand_p'] = (1 - below, below)
# Calculate p-values for adjusted Rand index
gr_indexes = np.where(adj_rand_dist > adj_rand)[0]
if len(gr_indexes) == 0:
result['adj_rand_p'] = (0.0, 1.0) # Better than all values
else:
below = (gr_indexes[0] - 1) / n_points
result['adj_rand_p'] = (1 - below, below)
# Create confidence interval values if requested
if create_confidence:
# Standard confidence levels
conf_levels = np.array([0.005, 0.025, 0.05, 0.25, 0.5, 0.75, 0.95, 0.975, 0.995])
indices = np.round(conf_levels * n_points).astype(int)
indices = np.clip(indices, 0, n_points - 1)
result['rand_conf'] = rand_dist[indices]
result['adj_rand_conf'] = adj_rand_dist[indices]
result['conf_levels'] = conf_levels
return result