Source code for submarit.evaluation.cluster_evaluator

"""Cluster evaluation metrics for SUBMARIT clustering."""

from typing import Dict, List, Optional, Tuple, Union
import numpy as np
from numpy.typing import NDArray
from dataclasses import dataclass

from submarit.core.base import BaseEvaluator


@dataclass
class ClusterEvaluationResult:
    """Container for cluster evaluation results.
    
    Attributes:
        assign: Cluster assignments (1 to n_clusters)
        indexes: List of arrays containing product indices for each cluster
        counts: Number of products in each cluster
        diff: Sum of (PHat - P)
        item_diff: Sum of (PHat - P) / n_items
        scaled_diff: Sum of (PHat - P) / P
        z_value: Z-value statistic as per Urban and Hauser
        max_obj: Objective function value (same as diff)
        log_lh: Log-likelihood from maximum likelihood model
        diff_sq: Sum of (PHat - P)^2
        p_hat: Estimated probabilities
        p: Theoretical probabilities
        var: Variance values for each cluster
        n_clusters: Number of clusters
        n_items: Number of items
    """
    assign: NDArray[np.int64]
    indexes: List[NDArray[np.int64]]
    counts: List[int]
    diff: float
    item_diff: float
    scaled_diff: float
    z_value: float
    max_obj: float
    log_lh: float
    diff_sq: float
    p_hat: NDArray[np.float64]
    p: NDArray[np.float64]
    var: List[NDArray[np.float64]]
    n_clusters: int
    n_items: int
    
    def __repr__(self) -> str:
        """String representation."""
        return (
            f"ClusterEvaluationResult("
            f"n_clusters={self.n_clusters}, "
            f"n_items={self.n_items}, "
            f"diff={self.diff:.6f}, "
            f"z_value={self.z_value:.4f}, "
            f"log_lh={self.log_lh:.4f})"
        )
    
    def summary(self) -> str:
        """Generate detailed summary."""
        lines = [
            f"Cluster Evaluation Results",
            f"=" * 40,
            f"Number of clusters: {self.n_clusters}",
            f"Number of items: {self.n_items}",
            f"",
            f"Cluster sizes:",
        ]
        
        for i, count in enumerate(self.counts):
            lines.append(f"  Cluster {i+1}: {count} items")
        
        lines.extend([
            f"",
            f"Evaluation metrics:",
            f"  Diff (PHat - P): {self.diff:.6f}",
            f"  Item diff: {self.item_diff:.6f}",
            f"  Scaled diff: {self.scaled_diff:.6f}",
            f"  Z-value: {self.z_value:.4f}",
            f"  Log-likelihood: {self.log_lh:.4f}",
            f"  Squared diff: {self.diff_sq:.6f}",
        ])
        
        return "\n".join(lines)


[docs] class ClusterEvaluator(BaseEvaluator): """Evaluates SUBMARIT clustering solutions using multiple criteria. This class implements the evaluation methodology from kSMEvaluateClustering.m, computing various statistics to assess clustering quality including: - Difference between estimated and theoretical probabilities - Z-value statistics - Log-likelihood values """ def __init__(self): """Initialize the cluster evaluator.""" super().__init__()
[docs] def evaluate( self, swm: NDArray[np.float64], n_clusters: int, cluster_assign: NDArray[np.int64] ) -> ClusterEvaluationResult: """Evaluate a SUBMARIT clustering solution. Args: swm: Product x product switching matrix n_clusters: Number of clusters cluster_assign: Product cluster assignments (1 to n_clusters) Returns: ClusterEvaluationResult containing all evaluation metrics """ n_items = swm.shape[0] # Calculate sales and proportions p_sales = np.sum(swm, axis=1) pswm = swm / (p_sales[:, np.newaxis] + 1e-10) # Add small value to avoid division by zero pp_sales = p_sales / np.sum(p_sales) # Initialize cluster information indexes = [] counts = [] for i in range(1, n_clusters + 1): cur_indexes = np.where(cluster_assign == i)[0] indexes.append(cur_indexes) counts.append(len(cur_indexes)) # Setup P and PHat arrays p_hat = np.zeros(n_items) p = np.zeros(n_items) var_list = [] log_lh = 0.0 # Calculate values for each cluster for i_clus in range(n_clusters): idx = indexes[i_clus] if len(idx) == 0: var_list.append(np.array([])) continue # Extract submatrix for current cluster sub_swm = pswm[np.ix_(idx, idx)] # Sum for each item to get PHat value p_hat[idx] = np.sum(sub_swm, axis=1) # Calculate P values spp_sales = pp_sales[idx] props = np.outer(np.ones(counts[i_clus]), spp_sales) - np.diag(spp_sales) p[idx] = np.sum(props, axis=1) / (1 - spp_sales + 1e-10) # Calculate variance and log-likelihood components var = p[idx] * (1 - p[idx]) * p_sales[idx] var_list.append(var) sd_comp = np.log(1 / (np.sqrt(np.sum(var) * 2 * np.pi) + 1e-10)) s_diff = np.sum(p_hat[idx] * p_sales[idx]) - np.sum(p[idx] * p_sales[idx]) # Update log-likelihood log_lh += sd_comp - (np.sign(s_diff) * (s_diff ** 2)) / (2 * np.sum(var) + 1e-10) # Calculate differences diff = np.sum(p_hat - p) diff_sq = np.sum((p_hat - p) ** 2) item_diff = diff / n_items # Scaled differences valid_ix = (~np.isinf(p)) & (~np.isnan(p)) & (np.abs(p) > 1e-10) scaled_diff = np.sum((p_hat[valid_ix] - p[valid_ix]) / p[valid_ix]) # Z-value as per Urban and Hauser m_p_hat = np.sum(p_hat * p_sales) m_p = np.sum(p * p_sales) denominator = np.sqrt(np.sum(p_hat * (1 - p_hat) * p_sales) + 1e-10) z_value = (m_p_hat - m_p) / denominator return ClusterEvaluationResult( assign=cluster_assign, indexes=indexes, counts=counts, diff=diff, item_diff=item_diff, scaled_diff=scaled_diff, z_value=z_value, max_obj=diff, log_lh=log_lh, diff_sq=diff_sq, p_hat=p_hat, p=p, var=var_list, n_clusters=n_clusters, n_items=n_items )
[docs] def evaluate_legacy( self, swm: NDArray[np.float64], n_clusters: int, cluster_assign: NDArray[np.int64] ) -> Dict[str, Union[float, NDArray[np.float64]]]: """Legacy evaluation function matching kEvaluateClustering.m. This is a simplified version without some of the advanced statistics. Args: swm: Product x product switching matrix n_clusters: Number of clusters cluster_assign: Product cluster assignments (1 to n_clusters) Returns: Dictionary containing evaluation metrics """ n_items = swm.shape[0] # Calculate sales and proportions p_sales = np.sum(swm, axis=1) pswm = swm / (p_sales[:, np.newaxis] + 1e-10) pp_sales = p_sales / np.sum(p_sales) # Initialize arrays p_hat = np.zeros(n_items) p = np.zeros(n_items) # Calculate values for each cluster for i_clus in range(1, n_clusters + 1): idx = np.where(cluster_assign == i_clus)[0] if len(idx) == 0: continue # Extract submatrix sub_swm = pswm[np.ix_(idx, idx)] # PHat values p_hat[idx] = np.sum(sub_swm, axis=1) # P values spp_sales = pp_sales[idx] n_idx = len(idx) props = np.outer(np.ones(n_idx), spp_sales) - np.diag(spp_sales) p[idx] = np.sum(props, axis=1) / (1 - spp_sales + 1e-10) # Calculate metrics diff = np.sum(p_hat - p) item_diff = diff / n_items # Scaled differences valid_ix = (~np.isinf(p)) & (~np.isnan(p)) & (np.abs(p) > 1e-10) n_valid = np.sum(valid_ix) scaled_diff = np.sum((p_hat[valid_ix] - p[valid_ix]) / p[valid_ix]) # Z-value m_p_hat = np.mean(p_hat) m_p = np.mean(p) all_sales = np.sum(p_sales) z_value = (m_p_hat - m_p) / np.sqrt(m_p * (1 - m_p) / all_sales + 1e-10) # Log-likelihood (simplified version) var = p[valid_ix] * (1 - p[valid_ix]) / (p_sales[valid_ix] + 1e-10) part1 = n_valid * np.log(2 * np.pi) part2 = np.sum(var) part3 = np.sum(((p_hat[valid_ix] - p[valid_ix]) ** 2) / np.sqrt(var + 1e-10)) log_lh = -0.5 * (part1 + part2 + part3) return { "diff": diff, "item_diff": item_diff, "scaled_diff": scaled_diff, "z_value": z_value, "log_lh": log_lh, "p": p, "p_hat": p_hat, "max_obj": diff }