Source code for submarit.validation.p_values

"""P-value calculation utilities for SUBMARIT validation.

This module provides utilities for calculating p-values from empirical
distributions, supporting both one-tailed and two-tailed tests, as well
as multiple testing corrections.
"""

from typing import Dict, List, Optional, Tuple, Union
from dataclasses import dataclass
import numpy as np
from numpy.typing import NDArray
import warnings
from scipy import stats


[docs] @dataclass class PValueResult: """Container for p-value calculation results. Attributes: observed: Observed test statistic value p_value: Raw p-value p_value_corrected: Corrected p-value (if correction applied) percentile: Percentile of observed value in distribution ci_lower: Lower confidence interval bound ci_upper: Upper confidence interval bound direction: Test direction ('greater', 'less', or 'two-sided') n_simulations: Number of simulations in empirical distribution """ observed: float p_value: float p_value_corrected: Optional[float] = None percentile: Optional[float] = None ci_lower: Optional[float] = None ci_upper: Optional[float] = None direction: str = 'greater' n_simulations: Optional[int] = None
[docs] def is_significant(self, alpha: float = 0.05) -> bool: """Check if result is statistically significant. Parameters ---------- alpha : float, default=0.05 Significance level Returns ------- bool True if p-value < alpha """ p = self.p_value_corrected if self.p_value_corrected is not None else self.p_value return p < alpha
[docs] def summary(self) -> str: """Generate summary of p-value results.""" lines = [ f"P-value Analysis Results", f"=======================", f"Observed value: {self.observed:.6f}", f"P-value ({self.direction}): {self.p_value:.4f}", ] if self.p_value_corrected is not None: lines.append(f"Corrected p-value: {self.p_value_corrected:.4f}") if self.percentile is not None: lines.append(f"Percentile: {self.percentile:.1f}%") if self.ci_lower is not None and self.ci_upper is not None: lines.append(f"95% CI: [{self.ci_lower:.6f}, {self.ci_upper:.6f}]") if self.n_simulations is not None: lines.append(f"Based on {self.n_simulations} simulations") return "\n".join(lines)
[docs] def calculate_empirical_p_value( observed: float, null_distribution: NDArray[np.float64], direction: str = 'greater', return_percentile: bool = True ) -> PValueResult: """Calculate p-value from empirical distribution. Parameters ---------- observed : float Observed test statistic null_distribution : ndarray Null distribution values (should be sorted) direction : {'greater', 'less', 'two-sided'}, default='greater' Direction of the test: - 'greater': P(X >= observed) - 'less': P(X <= observed) - 'two-sided': 2 * min(P(X >= observed), P(X <= observed)) return_percentile : bool, default=True Whether to calculate percentile of observed value Returns ------- result : PValueResult P-value calculation results """ n = len(null_distribution) if n == 0: raise ValueError("Null distribution is empty") # Ensure distribution is sorted if not np.all(null_distribution[:-1] <= null_distribution[1:]): null_distribution = np.sort(null_distribution) # Calculate p-value based on direction if direction == 'greater': # P(X >= observed) n_greater_equal = np.sum(null_distribution >= observed) p_value = (n_greater_equal + 1) / (n + 1) # Add 1 for continuity correction elif direction == 'less': # P(X <= observed) n_less_equal = np.sum(null_distribution <= observed) p_value = (n_less_equal + 1) / (n + 1) elif direction == 'two-sided': # Two-sided test n_greater_equal = np.sum(null_distribution >= observed) n_less_equal = np.sum(null_distribution <= observed) p_greater = (n_greater_equal + 1) / (n + 1) p_less = (n_less_equal + 1) / (n + 1) p_value = 2 * min(p_greater, p_less) p_value = min(p_value, 1.0) # Ensure p <= 1 else: raise ValueError(f"Invalid direction: {direction}") # Calculate percentile if requested percentile = None if return_percentile: n_below = np.sum(null_distribution < observed) percentile = 100 * n_below / n # Calculate confidence interval (2.5% and 97.5% percentiles) ci_lower = np.percentile(null_distribution, 2.5) ci_upper = np.percentile(null_distribution, 97.5) return PValueResult( observed=observed, p_value=p_value, percentile=percentile, ci_lower=ci_lower, ci_upper=ci_upper, direction=direction, n_simulations=n )
[docs] def calculate_p_value_range( observed: float, null_distribution: NDArray[np.float64] ) -> Tuple[float, float]: """Calculate upper and lower p-value bounds. This implements the methodology from MATLAB code where both upper and lower p-values are returned as a tuple. Parameters ---------- observed : float Observed test statistic null_distribution : ndarray Sorted null distribution values Returns ------- p_range : tuple (upper_p, lower_p) where: - upper_p: Probability of values >= observed - lower_p: Probability of values <= observed """ n = len(null_distribution) # Find indices with values greater than observed gr_indices = np.where(null_distribution > observed)[0] if len(gr_indices) == 0: # Observed is better than all values return (0.0, 1.0) else: # Calculate proportion below observed below = (gr_indices[0] - 1) / n return (1 - below, below)
[docs] def multiple_testing_correction( p_values: Union[List[float], NDArray[np.float64]], method: str = 'bonferroni', alpha: float = 0.05 ) -> Dict[str, Union[NDArray[np.float64], float]]: """Apply multiple testing correction to p-values. Parameters ---------- p_values : array-like Raw p-values to correct method : {'bonferroni', 'holm', 'fdr_bh', 'fdr_by'}, default='bonferroni' Correction method: - 'bonferroni': Bonferroni correction - 'holm': Holm-Bonferroni method - 'fdr_bh': Benjamini-Hochberg FDR - 'fdr_by': Benjamini-Yekutieli FDR alpha : float, default=0.05 Family-wise error rate or false discovery rate Returns ------- results : dict Dictionary containing: - 'corrected': Corrected p-values - 'rejected': Boolean array of rejected hypotheses - 'threshold': Adjusted significance threshold - 'method': Correction method used """ p_values = np.asarray(p_values) n_tests = len(p_values) if n_tests == 0: raise ValueError("No p-values provided") if method == 'bonferroni': # Bonferroni correction corrected = np.minimum(p_values * n_tests, 1.0) threshold = alpha / n_tests rejected = p_values < threshold elif method == 'holm': # Holm-Bonferroni method sorted_idx = np.argsort(p_values) sorted_p = p_values[sorted_idx] corrected = np.zeros_like(p_values) rejected = np.zeros(n_tests, dtype=bool) for i in range(n_tests): adjusted_p = sorted_p[i] * (n_tests - i) corrected[sorted_idx[i]] = min(adjusted_p, 1.0) if adjusted_p < alpha: rejected[sorted_idx[i]] = True else: break # Stop at first non-rejection threshold = alpha / (n_tests - np.sum(rejected) + 1) elif method == 'fdr_bh': # Benjamini-Hochberg FDR control sorted_idx = np.argsort(p_values) sorted_p = p_values[sorted_idx] # Find largest i such that P(i) <= (i/m) * alpha thresh_idx = np.where(sorted_p <= (np.arange(1, n_tests + 1) / n_tests) * alpha)[0] if len(thresh_idx) > 0: max_idx = thresh_idx[-1] threshold = sorted_p[max_idx] rejected = p_values <= threshold else: threshold = 0.0 rejected = np.zeros(n_tests, dtype=bool) # Adjusted p-values corrected = np.zeros_like(p_values) for i in range(n_tests): corrected[sorted_idx[i]] = min( sorted_p[i] * n_tests / (i + 1), 1.0 ) elif method == 'fdr_by': # Benjamini-Yekutieli FDR control c_m = np.sum(1.0 / np.arange(1, n_tests + 1)) # Harmonic sum sorted_idx = np.argsort(p_values) sorted_p = p_values[sorted_idx] thresh_idx = np.where( sorted_p <= (np.arange(1, n_tests + 1) / (n_tests * c_m)) * alpha )[0] if len(thresh_idx) > 0: max_idx = thresh_idx[-1] threshold = sorted_p[max_idx] rejected = p_values <= threshold else: threshold = 0.0 rejected = np.zeros(n_tests, dtype=bool) corrected = np.minimum(p_values * n_tests * c_m, 1.0) else: raise ValueError(f"Unknown correction method: {method}") return { 'corrected': corrected, 'rejected': rejected, 'threshold': threshold, 'method': method, 'n_tests': n_tests }
[docs] def combine_p_values( p_values: Union[List[float], NDArray[np.float64]], method: str = 'fisher', weights: Optional[NDArray[np.float64]] = None ) -> Dict[str, float]: """Combine multiple p-values into a single test statistic. Parameters ---------- p_values : array-like P-values to combine method : {'fisher', 'stouffer', 'min', 'max'}, default='fisher' Combination method: - 'fisher': Fisher's combined probability test - 'stouffer': Stouffer's Z-score method - 'min': Minimum p-value - 'max': Maximum p-value weights : array-like, optional Weights for weighted combination (only for Stouffer's method) Returns ------- result : dict Dictionary containing: - 'statistic': Combined test statistic - 'p_value': Combined p-value - 'method': Combination method used """ p_values = np.asarray(p_values) n = len(p_values) if n == 0: raise ValueError("No p-values provided") # Check for invalid p-values if np.any((p_values < 0) | (p_values > 1)): raise ValueError("P-values must be between 0 and 1") if method == 'fisher': # Fisher's combined probability test # -2 * sum(log(p_i)) ~ chi-squared(2n) with np.errstate(divide='ignore'): log_p = np.log(p_values) log_p[p_values == 0] = -np.inf # Handle p=0 statistic = -2 * np.sum(log_p) p_value = 1 - stats.chi2.cdf(statistic, df=2*n) elif method == 'stouffer': # Stouffer's Z-score method # sum(Phi^(-1)(1-p_i)) / sqrt(n) ~ N(0,1) z_scores = stats.norm.ppf(1 - p_values) if weights is not None: weights = np.asarray(weights) if len(weights) != n: raise ValueError("Weights must have same length as p_values") weights = weights / np.sqrt(np.sum(weights**2)) statistic = np.sum(weights * z_scores) else: statistic = np.sum(z_scores) / np.sqrt(n) p_value = 1 - stats.norm.cdf(statistic) elif method == 'min': # Minimum p-value (Tippett's method) statistic = np.min(p_values) p_value = 1 - (1 - statistic)**n elif method == 'max': # Maximum p-value statistic = np.max(p_values) p_value = statistic**n else: raise ValueError(f"Unknown combination method: {method}") return { 'statistic': statistic, 'p_value': p_value, 'method': method, 'n_tests': n }
[docs] def permutation_test( data1: NDArray[np.float64], data2: NDArray[np.float64], statistic_func: callable, n_permutations: int = 10000, random_state: Optional[int] = None ) -> PValueResult: """Perform permutation test for difference between two groups. Parameters ---------- data1 : ndarray First group data data2 : ndarray Second group data statistic_func : callable Function to compute test statistic. Should accept two arrays and return a scalar statistic. n_permutations : int, default=10000 Number of permutations random_state : int, optional Random seed for reproducibility Returns ------- result : PValueResult Permutation test results """ # Compute observed statistic observed = statistic_func(data1, data2) # Combine data combined = np.concatenate([data1, data2]) n1 = len(data1) n_total = len(combined) # Generate permutation distribution rng = np.random.RandomState(random_state) perm_stats = np.zeros(n_permutations) for i in range(n_permutations): # Permute combined data perm_idx = rng.permutation(n_total) perm_data1 = combined[perm_idx[:n1]] perm_data2 = combined[perm_idx[n1:]] # Compute permutation statistic perm_stats[i] = statistic_func(perm_data1, perm_data2) # Calculate p-value (two-sided by default) return calculate_empirical_p_value( observed, perm_stats, direction='two-sided' )