"""Statistical tests and utilities for SUBMARIT evaluation."""
from typing import Tuple, Optional, Dict, List
import numpy as np
from numpy.typing import NDArray
from scipy import stats
import warnings
from submarit.evaluation.cluster_evaluator import ClusterEvaluationResult
[docs]
class StatisticalTests:
"""Statistical testing utilities for cluster evaluation."""
[docs]
@staticmethod
def permutation_test(
swm: NDArray[np.float64],
assign1: NDArray[np.int64],
assign2: NDArray[np.int64],
n_permutations: int = 1000,
metric: str = "z_value",
random_state: Optional[int] = None
) -> Dict[str, float]:
"""Perform permutation test to compare two clustering solutions.
Args:
swm: Switching matrix
assign1: First cluster assignment
assign2: Second cluster assignment
n_permutations: Number of permutations
metric: Metric to compare ('z_value', 'diff', 'log_lh')
random_state: Random seed
Returns:
Dictionary with test statistics and p-value
"""
from submarit.evaluation.cluster_evaluator import ClusterEvaluator
if random_state is not None:
np.random.seed(random_state)
evaluator = ClusterEvaluator()
n_clusters1 = len(np.unique(assign1))
n_clusters2 = len(np.unique(assign2))
# Evaluate original assignments
result1 = evaluator.evaluate(swm, n_clusters1, assign1)
result2 = evaluator.evaluate(swm, n_clusters2, assign2)
# Get original difference
orig_diff = getattr(result1, metric) - getattr(result2, metric)
# Perform permutations
perm_diffs = []
n_items = len(assign1)
for _ in range(n_permutations):
# Randomly swap assignments between solutions
swap_mask = np.random.rand(n_items) < 0.5
perm_assign1 = assign1.copy()
perm_assign2 = assign2.copy()
perm_assign1[swap_mask] = assign2[swap_mask]
perm_assign2[swap_mask] = assign1[swap_mask]
# Evaluate permuted assignments
try:
perm_result1 = evaluator.evaluate(swm, n_clusters1, perm_assign1)
perm_result2 = evaluator.evaluate(swm, n_clusters2, perm_assign2)
perm_diff = getattr(perm_result1, metric) - getattr(perm_result2, metric)
perm_diffs.append(perm_diff)
except:
# Skip invalid permutations
continue
perm_diffs = np.array(perm_diffs)
# Calculate p-value (two-tailed)
p_value = np.mean(np.abs(perm_diffs) >= np.abs(orig_diff))
return {
"original_diff": orig_diff,
"p_value": p_value,
"n_permutations": len(perm_diffs),
"metric": metric,
"result1_value": getattr(result1, metric),
"result2_value": getattr(result2, metric),
"perm_mean": np.mean(perm_diffs),
"perm_std": np.std(perm_diffs)
}
[docs]
@staticmethod
def bootstrap_confidence_interval(
swm: NDArray[np.float64],
cluster_assign: NDArray[np.int64],
metric: str = "z_value",
n_bootstrap: int = 1000,
confidence: float = 0.95,
random_state: Optional[int] = None
) -> Dict[str, float]:
"""Calculate bootstrap confidence intervals for evaluation metrics.
Args:
swm: Switching matrix
cluster_assign: Cluster assignments
metric: Metric to analyze
n_bootstrap: Number of bootstrap samples
confidence: Confidence level
random_state: Random seed
Returns:
Dictionary with confidence intervals and statistics
"""
from submarit.evaluation.cluster_evaluator import ClusterEvaluator
if random_state is not None:
np.random.seed(random_state)
evaluator = ClusterEvaluator()
n_clusters = len(np.unique(cluster_assign))
n_items = swm.shape[0]
# Original evaluation
orig_result = evaluator.evaluate(swm, n_clusters, cluster_assign)
orig_value = getattr(orig_result, metric)
# Bootstrap samples
bootstrap_values = []
for _ in range(n_bootstrap):
# Resample items with replacement
boot_indices = np.random.choice(n_items, size=n_items, replace=True)
# Create bootstrap switching matrix
boot_swm = swm[np.ix_(boot_indices, boot_indices)]
# Map cluster assignments
boot_assign = cluster_assign[boot_indices]
# Ensure all clusters are represented
unique_clusters = np.unique(boot_assign)
if len(unique_clusters) < n_clusters:
continue
# Remap to consecutive integers
remap = {old: new+1 for new, old in enumerate(unique_clusters)}
boot_assign = np.array([remap[x] for x in boot_assign])
try:
boot_result = evaluator.evaluate(boot_swm, len(unique_clusters), boot_assign)
bootstrap_values.append(getattr(boot_result, metric))
except:
continue
bootstrap_values = np.array(bootstrap_values)
# Calculate confidence intervals
alpha = 1 - confidence
lower_percentile = (alpha/2) * 100
upper_percentile = (1 - alpha/2) * 100
ci_lower = np.percentile(bootstrap_values, lower_percentile)
ci_upper = np.percentile(bootstrap_values, upper_percentile)
return {
"metric": metric,
"original_value": orig_value,
"mean": np.mean(bootstrap_values),
"std": np.std(bootstrap_values),
"ci_lower": ci_lower,
"ci_upper": ci_upper,
"confidence": confidence,
"n_bootstrap": len(bootstrap_values)
}
[docs]
@staticmethod
def stability_analysis(
swm: NDArray[np.float64],
cluster_func,
n_clusters: int,
n_runs: int = 10,
min_items: int = 1
) -> Dict[str, float]:
"""Analyze stability of clustering results across multiple runs.
Args:
swm: Switching matrix
cluster_func: Clustering function
n_clusters: Number of clusters
n_runs: Number of runs to perform
min_items: Minimum items per cluster
Returns:
Dictionary with stability metrics
"""
from submarit.evaluation.cluster_evaluator import ClusterEvaluator
evaluator = ClusterEvaluator()
# Store results from multiple runs
z_values = []
log_lh_values = []
diff_values = []
assignments = []
for i in range(n_runs):
# Run clustering
cluster_result = cluster_func(swm, n_clusters, min_items, 1)
# Evaluate
eval_result = evaluator.evaluate(swm, n_clusters, cluster_result.assign)
z_values.append(eval_result.z_value)
log_lh_values.append(eval_result.log_lh)
diff_values.append(eval_result.diff)
assignments.append(cluster_result.assign)
# Calculate stability metrics
z_values = np.array(z_values)
log_lh_values = np.array(log_lh_values)
diff_values = np.array(diff_values)
# Calculate pairwise agreement between assignments
n_items = len(assignments[0])
agreement_scores = []
for i in range(n_runs):
for j in range(i+1, n_runs):
# Calculate Rand index or similar
agreement = StatisticalTests._calculate_agreement(
assignments[i], assignments[j]
)
agreement_scores.append(agreement)
return {
"z_value_mean": np.mean(z_values),
"z_value_std": np.std(z_values),
"z_value_cv": np.std(z_values) / (np.mean(z_values) + 1e-10),
"log_lh_mean": np.mean(log_lh_values),
"log_lh_std": np.std(log_lh_values),
"diff_mean": np.mean(diff_values),
"diff_std": np.std(diff_values),
"mean_agreement": np.mean(agreement_scores) if agreement_scores else 0,
"min_agreement": np.min(agreement_scores) if agreement_scores else 0,
"n_runs": n_runs
}
@staticmethod
def _calculate_agreement(assign1: NDArray[np.int64], assign2: NDArray[np.int64]) -> float:
"""Calculate agreement between two cluster assignments (simplified Rand index)."""
n = len(assign1)
agreements = 0
for i in range(n):
for j in range(i+1, n):
# Check if pair is in same cluster in both assignments
same1 = assign1[i] == assign1[j]
same2 = assign2[i] == assign2[j]
if same1 == same2:
agreements += 1
total_pairs = n * (n - 1) / 2
return agreements / total_pairs if total_pairs > 0 else 0
[docs]
@staticmethod
def cluster_validity_indices(
swm: NDArray[np.float64],
cluster_assign: NDArray[np.int64]
) -> Dict[str, float]:
"""Calculate various cluster validity indices.
Args:
swm: Switching matrix
cluster_assign: Cluster assignments
Returns:
Dictionary with validity indices
"""
n_items = swm.shape[0]
n_clusters = len(np.unique(cluster_assign))
# Calculate within-cluster and between-cluster switching
within_switching = 0
between_switching = 0
for i in range(n_items):
for j in range(i+1, n_items):
if cluster_assign[i] == cluster_assign[j]:
within_switching += swm[i, j]
else:
between_switching += swm[i, j]
total_switching = within_switching + between_switching
# Silhouette-like coefficient for switching data
silhouette_values = []
for i in range(n_items):
# Average switching to items in same cluster
same_cluster = cluster_assign == cluster_assign[i]
same_cluster[i] = False # Exclude self
if np.sum(same_cluster) > 0:
a_i = np.mean(swm[i, same_cluster])
else:
a_i = 0
# Average switching to items in other clusters
b_values = []
for c in range(1, n_clusters + 1):
if c != cluster_assign[i]:
other_cluster = cluster_assign == c
if np.sum(other_cluster) > 0:
b_values.append(np.mean(swm[i, other_cluster]))
b_i = np.min(b_values) if b_values else 0
# Silhouette coefficient
s_i = (b_i - a_i) / (max(a_i, b_i) + 1e-10)
silhouette_values.append(s_i)
# Dunn-like index
min_between = np.inf
max_within = 0
for c1 in range(1, n_clusters + 1):
cluster1 = cluster_assign == c1
# Within-cluster maximum
if np.sum(cluster1) > 1:
within_swm = swm[np.ix_(cluster1, cluster1)]
max_within = max(max_within, np.max(within_swm))
# Between-cluster minimum
for c2 in range(c1 + 1, n_clusters + 1):
cluster2 = cluster_assign == c2
if np.sum(cluster1) > 0 and np.sum(cluster2) > 0:
between_swm = swm[np.ix_(cluster1, cluster2)]
min_between = min(min_between, np.min(between_swm[between_swm > 0]))
dunn_index = min_between / (max_within + 1e-10) if max_within > 0 else 0
return {
"within_switching_ratio": within_switching / (total_switching + 1e-10),
"between_switching_ratio": between_switching / (total_switching + 1e-10),
"silhouette_coefficient": np.mean(silhouette_values),
"dunn_index": dunn_index,
"n_clusters": n_clusters,
"n_items": n_items
}