"""K-fold cross-validation for SUBMARIT clustering.
This module implements k-fold cross-validation for SUBMARIT clustering algorithms,
following the methodology from kSMNFold.m and kSMNFold2.m. It supports:
1. Standard k-fold validation with train/test splitting
2. Empirical distribution generation for Rand index
3. Constrained clustering for test set assignment
4. Multiple clustering algorithm support
"""
from typing import Dict, List, Optional, Tuple, Union
from dataclasses import dataclass
import numpy as np
from numpy.typing import NDArray
from concurrent.futures import ProcessPoolExecutor, as_completed
import warnings
from submarit.core.base import BaseValidator
from submarit.algorithms.local_search import (
KSMLocalSearch, KSMLocalSearch2,
KSMLocalSearchConstrained, KSMLocalSearchConstrained2
)
from submarit.validation.rand_index import RandIndex
from submarit.validation.multiple_runs import run_clusters, run_clusters_constrained
@dataclass
class KFoldResult:
"""Container for k-fold validation results.
Attributes:
av_rand: Average Rand index across all fold pairs
av_adj_rand: Average adjusted Rand index across all fold pairs
av_rand_dist: Empirical distribution for average Rand index (if computed)
av_adj_rand_dist: Empirical distribution for average adjusted Rand index
fold_results: List of clustering results for each fold
rand_pairs: Rand index values for each fold pair
adj_rand_pairs: Adjusted Rand index values for each fold pair
n_folds: Number of folds used
n_random: Number of random permutations for empirical distribution
"""
av_rand: float
av_adj_rand: float
av_rand_dist: Optional[NDArray[np.float64]] = None
av_adj_rand_dist: Optional[NDArray[np.float64]] = None
fold_results: Optional[List[Dict]] = None
rand_pairs: Optional[List[Tuple[int, int, float]]] = None
adj_rand_pairs: Optional[List[Tuple[int, int, float]]] = None
n_folds: int = 0
n_random: int = 0
def summary(self) -> str:
"""Generate summary of k-fold validation results."""
lines = [
"K-Fold Cross-Validation Results",
"=" * 40,
f"Number of folds: {self.n_folds}",
f"Average Rand index: {self.av_rand:.4f}",
f"Average adjusted Rand index: {self.av_adj_rand:.4f}",
]
if self.av_rand_dist is not None:
lines.extend([
"",
"Empirical distribution statistics:",
f" Random permutations: {self.n_random}",
f" Rand index 95% CI: [{np.percentile(self.av_rand_dist, 2.5):.4f}, "
f"{np.percentile(self.av_rand_dist, 97.5):.4f}]",
f" Adjusted Rand 95% CI: [{np.percentile(self.av_adj_rand_dist, 2.5):.4f}, "
f"{np.percentile(self.av_adj_rand_dist, 97.5):.4f}]",
])
return "\n".join(lines)
[docs]
class KFoldValidator(BaseValidator):
"""K-fold cross-validation for SUBMARIT clustering.
This validator implements the k-fold validation procedure from kSMNFold.m,
where data is split into k folds, each fold is held out as test data,
and the model is trained on the remaining k-1 folds. Test items are then
assigned using constrained clustering.
Parameters
----------
n_folds : int, default=5
Number of folds for cross-validation
n_random : int, default=0
Number of random permutations for empirical distribution.
If 0, no empirical distribution is computed.
max_fold_run : int, optional
Maximum number of folds to actually run (defaults to n_folds).
Useful for testing with subset of folds.
random_state : int, optional
Random seed for reproducibility
n_jobs : int, default=1
Number of parallel jobs for empirical distribution generation.
-1 means using all processors.
algorithm : {'v1', 'v2'}, default='v1'
Which clustering algorithm to use:
- 'v1': Uses KSMLocalSearch (PHat-P optimization)
- 'v2': Uses KSMLocalSearch2 (log-likelihood optimization)
"""
def __init__(
self,
n_folds: int = 5,
n_random: int = 0,
max_fold_run: Optional[int] = None,
random_state: Optional[int] = None,
n_jobs: int = 1,
algorithm: str = 'v1'
):
"""Initialize the k-fold validator."""
super().__init__()
self.n_folds = n_folds
self.n_random = n_random
self.max_fold_run = max_fold_run or n_folds
self.random_state = random_state
self.n_jobs = n_jobs
self.algorithm = algorithm
# Validate parameters
if self.n_folds < 2:
raise ValueError("n_folds must be at least 2")
if self.max_fold_run > self.n_folds:
raise ValueError("max_fold_run cannot exceed n_folds")
if self.algorithm not in ['v1', 'v2']:
raise ValueError("algorithm must be 'v1' or 'v2'")
[docs]
def validate(
self,
swm: NDArray[np.float64],
n_clusters: int,
min_items: int = 1,
n_runs: int = 10,
**kwargs
) -> KFoldResult:
"""Perform k-fold cross-validation on switching matrix.
Parameters
----------
swm : ndarray of shape (n_items, n_items)
Product x product switching matrix
n_clusters : int
Number of clusters
min_items : int, default=1
Minimum number of items per cluster
n_runs : int, default=10
Number of SUBMARIT runs for each optimization
**kwargs : dict
Additional parameters passed to clustering algorithm
Returns
-------
result : KFoldResult
Cross-validation results including average Rand indices
and optional empirical distributions
"""
n_items = swm.shape[0]
# Generate random permutation of items
rng = np.random.RandomState(self.random_state)
col_perm = rng.permutation(n_items)
# Storage for fold results
fold_clusters = []
fold_assignments = []
random_assignments = [] if self.n_random > 0 else None
# Process each fold
for i in range(self.max_fold_run):
# Calculate fold boundaries
start = int(np.round(i * n_items / self.n_folds))
end = int(np.round((i + 1) * n_items / self.n_folds))
# Split into test and train indices
test_indexes = col_perm[start:end]
train_indexes = np.setdiff1d(np.arange(n_items), test_indexes)
# Check for zero rows/columns after removing test items
sub_swm = swm[np.ix_(train_indexes, train_indexes)]
col_sum = np.sum(sub_swm, axis=0)
row_sum = np.sum(sub_swm, axis=1)
# Find items that would create zero rows/columns
chk_items = (col_sum == 0) | (row_sum == 0)
rem_indexes = train_indexes[chk_items]
# Move problematic items to test set
if len(rem_indexes) > 0:
test_indexes = np.sort(np.concatenate([test_indexes, rem_indexes]))
train_indexes = np.setdiff1d(train_indexes, rem_indexes)
sub_swm = swm[np.ix_(train_indexes, train_indexes)]
# Run clustering on training data
train_result = run_clusters(
sub_swm, n_clusters, min_items, n_runs,
random_state=self.random_state,
algorithm=self.algorithm
)
# Assign test items using constrained algorithm
# Need to convert train assignments to full array
full_assign = np.zeros(n_items, dtype=np.int64)
full_assign[train_indexes] = train_result.Assign
# Run constrained clustering
fold_result = run_clusters_constrained(
swm, n_clusters, min_items,
full_assign[train_indexes],
train_indexes + 1, # Convert to 1-based
test_indexes + 1, # Convert to 1-based
n_runs,
random_state=self.random_state,
algorithm=self.algorithm
)
fold_clusters.append(fold_result)
fold_assignments.append(fold_result.Assign)
# Generate random assignments for empirical distribution
if self.n_random > 0:
random_fold = []
temp_col = np.zeros(n_items, dtype=np.int64)
temp_col[train_indexes] = train_result.Assign
for j in range(self.n_random):
# Random assignment for test items
temp_col[test_indexes] = rng.randint(1, n_clusters + 1,
size=len(test_indexes))
random_fold.append(temp_col.copy())
random_assignments.append(np.column_stack(random_fold))
# Calculate Rand indices for all fold pairs
rand_calc = RandIndex()
sum_rand = 0.0
sum_adj_rand = 0.0
rand_pairs = []
adj_rand_pairs = []
# Calculate pairwise Rand indices
n_pairs = 0
for i in range(self.max_fold_run - 1):
for j in range(i + 1, self.max_fold_run):
rand, adj_rand = rand_calc.calculate(
fold_assignments[i],
fold_assignments[j]
)
sum_rand += rand
sum_adj_rand += adj_rand
n_pairs += 1
rand_pairs.append((i, j, rand))
adj_rand_pairs.append((i, j, adj_rand))
# Calculate averages
av_rand = sum_rand / n_pairs if n_pairs > 0 else 0.0
av_adj_rand = sum_adj_rand / n_pairs if n_pairs > 0 else 0.0
# Generate empirical distribution if requested
av_rand_dist = None
av_adj_rand_dist = None
if self.n_random > 0:
av_rand_dist, av_adj_rand_dist = self._generate_empirical_distribution(
random_assignments, rand_calc
)
return KFoldResult(
av_rand=av_rand,
av_adj_rand=av_adj_rand,
av_rand_dist=av_rand_dist,
av_adj_rand_dist=av_adj_rand_dist,
fold_results=fold_clusters,
rand_pairs=rand_pairs,
adj_rand_pairs=adj_rand_pairs,
n_folds=self.n_folds,
n_random=self.n_random
)
def _generate_empirical_distribution(
self,
random_assignments: List[NDArray[np.int64]],
rand_calc: RandIndex
) -> Tuple[NDArray[np.float64], NDArray[np.float64]]:
"""Generate empirical distribution for random assignments.
Parameters
----------
random_assignments : list of ndarray
Random cluster assignments for each fold
rand_calc : RandIndex
Rand index calculator
Returns
-------
av_rand_dist : ndarray
Sorted distribution of average Rand indices
av_adj_rand_dist : ndarray
Sorted distribution of average adjusted Rand indices
"""
n_random = random_assignments[0].shape[1]
av_rand_dist = np.zeros(n_random)
av_adj_rand_dist = np.zeros(n_random)
# Process in parallel if requested
if self.n_jobs != 1:
av_rand_dist, av_adj_rand_dist = self._parallel_empirical_dist(
random_assignments, rand_calc
)
else:
# Sequential processing
for r_count in range(n_random):
sum_rand = 0.0
sum_adj_rand = 0.0
n_pairs = 0
for i in range(self.max_fold_run - 1):
for j in range(i + 1, self.max_fold_run):
rand, adj_rand = rand_calc.calculate(
random_assignments[i][:, r_count],
random_assignments[j][:, r_count]
)
sum_rand += rand
sum_adj_rand += adj_rand
n_pairs += 1
av_rand_dist[r_count] = sum_rand / n_pairs
av_adj_rand_dist[r_count] = sum_adj_rand / n_pairs
# Sort distributions
return np.sort(av_rand_dist), np.sort(av_adj_rand_dist)
def _parallel_empirical_dist(
self,
random_assignments: List[NDArray[np.int64]],
rand_calc: RandIndex
) -> Tuple[NDArray[np.float64], NDArray[np.float64]]:
"""Generate empirical distribution in parallel."""
n_random = random_assignments[0].shape[1]
av_rand_dist = np.zeros(n_random)
av_adj_rand_dist = np.zeros(n_random)
# Determine number of workers
n_workers = self.n_jobs
if n_workers == -1:
n_workers = None # Use all available CPUs
def compute_random_sample(r_idx: int) -> Tuple[float, float]:
"""Compute Rand indices for one random sample."""
sum_rand = 0.0
sum_adj_rand = 0.0
n_pairs = 0
for i in range(self.max_fold_run - 1):
for j in range(i + 1, self.max_fold_run):
rand, adj_rand = rand_calc.calculate(
random_assignments[i][:, r_idx],
random_assignments[j][:, r_idx]
)
sum_rand += rand
sum_adj_rand += adj_rand
n_pairs += 1
return sum_rand / n_pairs, sum_adj_rand / n_pairs
# Execute in parallel
with ProcessPoolExecutor(max_workers=n_workers) as executor:
futures = {
executor.submit(compute_random_sample, i): i
for i in range(n_random)
}
for future in as_completed(futures):
idx = futures[future]
av_rand, av_adj_rand = future.result()
av_rand_dist[idx] = av_rand
av_adj_rand_dist[idx] = av_adj_rand
return np.sort(av_rand_dist), np.sort(av_adj_rand_dist)
def k_fold_validate(
swm: NDArray[np.float64],
n_clusters: int,
min_items: int = 1,
n_runs: int = 10,
n_folds: int = 5,
n_random: int = 0,
max_fold_run: Optional[int] = None,
random_state: Optional[int] = None,
algorithm: str = 'v1'
) -> KFoldResult:
"""Convenience function for k-fold validation (MATLAB-compatible interface).
Parameters
----------
swm : ndarray of shape (n_items, n_items)
Product x product switching matrix
n_clusters : int
Number of clusters
min_items : int, default=1
Minimum number of items per cluster
n_runs : int, default=10
Number of SUBMARIT runs for each optimization
n_folds : int, default=5
Number of folds
n_random : int, default=0
Number of random permutations for empirical distribution
max_fold_run : int, optional
Maximum number of folds to run
random_state : int, optional
Random seed
algorithm : {'v1', 'v2'}, default='v1'
Which algorithm version to use
Returns
-------
result : KFoldResult
Cross-validation results
"""
validator = KFoldValidator(
n_folds=n_folds,
n_random=n_random,
max_fold_run=max_fold_run,
random_state=random_state,
algorithm=algorithm
)
return validator.validate(swm, n_clusters, min_items, n_runs)