Source code for submarit.validation.topk_analysis

"""Top-k clustering solution analysis.

This module implements functionality from RunClustersTopk.m and RunClustersTopk2.m
for analyzing agreement among top-k clustering solutions.
"""

from typing import Dict, List, Optional, Tuple, Union
import numpy as np
from numpy.typing import NDArray
from dataclasses import dataclass
import warnings
from concurrent.futures import ProcessPoolExecutor, as_completed

from submarit.algorithms.local_search import LocalSearchResult
from submarit.validation.multiple_runs import run_clusters
from submarit.validation.rand_index import RandIndex, create_rand_empirical_distribution, rand_empirical_p
from submarit.utils.matlab_compat import MatlabRandom


[docs] @dataclass class TopKResult: """Container for top-k clustering analysis results. Attributes: best_solutions: List of top-k best clustering solutions avg_rand: Average Rand index among top-k solutions avg_adj_rand: Average adjusted Rand index among top-k solutions rand_p: P-value for Rand index adj_rand_p: P-value for adjusted Rand index rand_conf: Confidence interval values for Rand index adj_rand_conf: Confidence interval values for adjusted Rand pairwise_rand: Matrix of pairwise Rand indices pairwise_adj_rand: Matrix of pairwise adjusted Rand indices empirical_dist: Empirical distribution used for p-values """ best_solutions: List[LocalSearchResult] avg_rand: float avg_adj_rand: float rand_p: Tuple[float, float] adj_rand_p: Tuple[float, float] rand_conf: Optional[NDArray[np.float64]] = None adj_rand_conf: Optional[NDArray[np.float64]] = None pairwise_rand: Optional[NDArray[np.float64]] = None pairwise_adj_rand: Optional[NDArray[np.float64]] = None empirical_dist: Optional[Dict] = None
[docs] def summary(self) -> str: """Generate summary of top-k analysis results.""" k = len(self.best_solutions) return ( f"Top-{k} Clustering Analysis\n" f"===========================\n" f"Average Rand index: {self.avg_rand:.4f}\n" f"Average adjusted Rand index: {self.avg_adj_rand:.4f}\n" f"Rand p-value: {self.rand_p[0]:.4f}\n" f"Adjusted Rand p-value: {self.adj_rand_p[0]:.4f}\n" f"\nBest solution statistics:\n" f"Log-likelihood: {self.best_solutions[0].LogLH:.4f}\n" f"Diff value: {self.best_solutions[0].Diff:.4f}\n" f"Z-value: {self.best_solutions[0].ZValue:.4f}" )
[docs] def run_clusters_topk( swm: NDArray[np.float64], n_clusters: int, min_items: int, n_runs: int, top_k: int, n_random: int = 10000, algorithm: str = 'v1', random_state: Optional[int] = None, n_jobs: int = 1, verbose: bool = False ) -> TopKResult: """Run multiple clusterings and analyze agreement among top-k solutions. This function implements the functionality of RunClustersTopk.m and RunClustersTopk2.m, finding the top-k best clustering solutions and analyzing their agreement using Rand indices. Parameters ---------- swm : ndarray of shape (n_items, n_items) Product x product switching matrix n_clusters : int Number of clusters min_items : int Minimum number of items per cluster n_runs : int Number of experimental runs top_k : int Number of top solutions to return n_random : int, default=10000 Number of random values for empirical distributions algorithm : {'v1', 'v2'}, default='v1' Which algorithm version to use: - 'v1': KSMLocalSearch (PHat-P optimization) - 'v2': KSMLocalSearch2 (log-likelihood optimization) random_state : int, optional Random seed for reproducibility n_jobs : int, default=1 Number of parallel jobs (-1 for all processors) verbose : bool, default=False Whether to print progress messages Returns ------- result : TopKResult Analysis results including p-values and confidence intervals """ n_items = swm.shape[0] if verbose: print(f"Running {n_runs} clustering attempts to find top-{top_k} solutions...") # Initialize storage for best solutions best_ll = np.full(top_k, 1e7 if algorithm == 'v2' else -1e7) best_solutions = [None] * top_k # Run clustering multiple times for i in range(n_runs): if verbose and (i + 1) % max(1, n_runs // 10) == 0: print(f" Progress: {i + 1}/{n_runs} runs completed") # Set random seed for this run if random_state is not None: run_seed = random_state + i else: run_seed = None try: # Run clustering result = run_clusters( swm, n_clusters, min_items, n_runs=1, # Single run random_state=run_seed, algorithm=algorithm ) # Get objective value if algorithm == 'v1': obj_value = result.Diff # Maximize else: obj_value = result.LogLH # Maximize (note: stored as positive) # Check if this belongs in top-k for j in range(top_k): if algorithm == 'v1': # For v1, we maximize Diff if obj_value > best_ll[j]: # Shift worse solutions down for k in range(top_k - 1, j, -1): best_ll[k] = best_ll[k - 1] best_solutions[k] = best_solutions[k - 1] # Insert new solution best_ll[j] = obj_value best_solutions[j] = result break else: # For v2, we maximize LogLH (less negative is better) if obj_value > best_ll[j]: # Shift worse solutions down for k in range(top_k - 1, j, -1): best_ll[k] = best_ll[k - 1] best_solutions[k] = best_solutions[k - 1] # Insert new solution best_ll[j] = obj_value best_solutions[j] = result break except Exception as e: if verbose: warnings.warn(f"Run {i + 1} failed: {str(e)}") continue # Filter out None values (in case we didn't find enough solutions) best_solutions = [s for s in best_solutions if s is not None] if len(best_solutions) < top_k: warnings.warn(f"Only found {len(best_solutions)} valid solutions out of {top_k} requested") top_k = len(best_solutions) if verbose: print(f"Computing pairwise Rand indices for top-{top_k} solutions...") # Calculate pairwise Rand indices rand_calc = RandIndex() pairwise_rand = np.zeros((top_k, top_k)) pairwise_adj_rand = np.zeros((top_k, top_k)) sum_rand = 0.0 sum_adj_rand = 0.0 n_pairs = 0 for i in range(top_k - 1): for j in range(i + 1, top_k): rand_idx, adj_rand_idx = rand_calc.calculate( best_solutions[i].Assign, best_solutions[j].Assign ) pairwise_rand[i, j] = rand_idx pairwise_rand[j, i] = rand_idx pairwise_adj_rand[i, j] = adj_rand_idx pairwise_adj_rand[j, i] = adj_rand_idx sum_rand += rand_idx sum_adj_rand += adj_rand_idx n_pairs += 1 # Fill diagonal np.fill_diagonal(pairwise_rand, 1.0) np.fill_diagonal(pairwise_adj_rand, 1.0) # Calculate average Rand indices avg_rand = sum_rand / n_pairs if n_pairs > 0 else 0.0 avg_adj_rand = sum_adj_rand / n_pairs if n_pairs > 0 else 0.0 if verbose: print(f"Creating empirical distribution with {n_random} random samples...") # Create empirical distribution for p-values if n_jobs == 1: # Sequential version avg_rand_dist = np.zeros(n_random) avg_adj_rand_dist = np.zeros(n_random) rng = MatlabRandom(random_state) for k in range(n_random): if verbose and (k + 1) % max(1, n_random // 10) == 0: print(f" Progress: {k + 1}/{n_random} samples generated") # Generate top_k random assignments rand_assign = np.zeros((n_items, top_k), dtype=np.int64) for t in range(top_k): rand_assign[:, t] = rng.randi(n_clusters, n_items) # Calculate pairwise Rand indices k_sum_rand = 0.0 k_sum_adj_rand = 0.0 for i in range(top_k - 1): for j in range(i + 1, top_k): rand_idx, adj_rand_idx = rand_calc.calculate( rand_assign[:, i], rand_assign[:, j] ) k_sum_rand += rand_idx k_sum_adj_rand += adj_rand_idx avg_rand_dist[k] = k_sum_rand / n_pairs if n_pairs > 0 else 0.0 avg_adj_rand_dist[k] = k_sum_adj_rand / n_pairs if n_pairs > 0 else 0.0 else: # Parallel version avg_rand_dist, avg_adj_rand_dist = _parallel_empirical_distribution( n_items, n_clusters, top_k, n_random, random_state, n_jobs, verbose ) # Sort distributions avg_rand_dist.sort() avg_adj_rand_dist.sort() if verbose: print("Computing p-values and confidence intervals...") # Calculate p-values using empirical distribution p_struct = rand_empirical_p( avg_rand_dist, avg_adj_rand_dist, avg_rand, avg_adj_rand, create_confidence=True ) return TopKResult( best_solutions=best_solutions, avg_rand=avg_rand, avg_adj_rand=avg_adj_rand, rand_p=p_struct['rand_p'], adj_rand_p=p_struct['adj_rand_p'], rand_conf=p_struct.get('rand_conf'), adj_rand_conf=p_struct.get('adj_rand_conf'), pairwise_rand=pairwise_rand, pairwise_adj_rand=pairwise_adj_rand, empirical_dist={ 'avg_rand_dist': avg_rand_dist, 'avg_adj_rand_dist': avg_adj_rand_dist, 'n_random': n_random } )
def _parallel_empirical_distribution( n_items: int, n_clusters: int, top_k: int, n_random: int, random_state: Optional[int], n_jobs: int, verbose: bool ) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """Generate empirical distribution in parallel.""" n_workers = n_jobs if n_jobs > 0 else None def compute_sample(seed: int) -> Tuple[float, float]: """Compute one sample for the distribution.""" local_rng = MatlabRandom(seed) rand_calc = RandIndex() # Generate top_k random assignments rand_assign = np.zeros((n_items, top_k), dtype=np.int64) for t in range(top_k): rand_assign[:, t] = local_rng.randi(n_clusters, n_items) # Calculate pairwise Rand indices sum_rand = 0.0 sum_adj_rand = 0.0 n_pairs = 0 for i in range(top_k - 1): for j in range(i + 1, top_k): rand_idx, adj_rand_idx = rand_calc.calculate( rand_assign[:, i], rand_assign[:, j] ) sum_rand += rand_idx sum_adj_rand += adj_rand_idx n_pairs += 1 avg_rand = sum_rand / n_pairs if n_pairs > 0 else 0.0 avg_adj_rand = sum_adj_rand / n_pairs if n_pairs > 0 else 0.0 return avg_rand, avg_adj_rand # Generate unique seeds if random_state is not None: base_rng = np.random.RandomState(random_state) seeds = base_rng.randint(0, 2**31, size=n_random) else: seeds = np.random.randint(0, 2**31, size=n_random) # Initialize arrays avg_rand_dist = np.zeros(n_random) avg_adj_rand_dist = np.zeros(n_random) with ProcessPoolExecutor(max_workers=n_workers) as executor: futures = { executor.submit(compute_sample, seeds[i]): i for i in range(n_random) } completed = 0 for future in as_completed(futures): idx = futures[future] avg_rand_dist[idx], avg_adj_rand_dist[idx] = future.result() completed += 1 if verbose and completed % max(1, n_random // 10) == 0: print(f" Progress: {completed}/{n_random} samples generated") return avg_rand_dist, avg_adj_rand_dist
[docs] def analyze_solution_stability( solutions: List[LocalSearchResult], verbose: bool = False ) -> Dict[str, Union[float, NDArray[np.float64]]]: """Analyze stability across multiple clustering solutions. Parameters ---------- solutions : list of LocalSearchResult Clustering solutions to analyze verbose : bool, default=False Whether to print analysis details Returns ------- analysis : dict Dictionary containing: - 'consensus_matrix': Item x item co-clustering frequency - 'item_stability': Stability score for each item - 'cluster_stability': Stability of each cluster - 'overall_stability': Overall stability score """ if not solutions: raise ValueError("No solutions to analyze") n_items = len(solutions[0].Assign) n_solutions = len(solutions) # Build consensus matrix consensus_matrix = np.zeros((n_items, n_items)) for solution in solutions: assignments = solution.Assign for i in range(n_items): for j in range(i + 1, n_items): if assignments[i] == assignments[j]: consensus_matrix[i, j] += 1 consensus_matrix[j, i] += 1 # Normalize by number of solutions consensus_matrix /= n_solutions np.fill_diagonal(consensus_matrix, 1.0) # Calculate item stability item_stability = np.zeros(n_items) for i in range(n_items): # Stability is high if co-clustering frequencies are close to 0 or 1 co_cluster_freq = consensus_matrix[i, :] item_stability[i] = np.mean(np.minimum(co_cluster_freq, 1 - co_cluster_freq)) * 2 # Overall stability overall_stability = 1 - np.mean(item_stability) # Analyze cluster-level stability # Find most common cluster configuration from collections import Counter config_counts = Counter() for solution in solutions: # Create canonical representation of clustering # (to handle label permutations) config = tuple(sorted(Counter(solution.Assign).values())) config_counts[config] += 1 most_common_config = config_counts.most_common(1)[0] config_stability = most_common_config[1] / n_solutions if verbose: print(f"Overall stability: {overall_stability:.3f}") print(f"Most common configuration appears in {config_stability:.1%} of solutions") print(f"Configuration: {most_common_config[0]}") return { 'consensus_matrix': consensus_matrix, 'item_stability': item_stability, 'overall_stability': overall_stability, 'config_stability': config_stability, 'most_common_config': most_common_config[0] }