"""
GLM-based statistical testing for brain connectivity networks.
Implements edge-wise General Linear Model with Freedman-Lane permutation
for confound-aware inference on connectivity matrices. Supports all
enhancement methods (TFNBS, NBS, cNBS, NI-TFNBS, FBC-TFNBS).
Main Functions
--------------
compute_glm_stat : Compute GLM t-statistics for connectivity data
compute_p_val_glm : Full GLM pipeline with permutation testing
build_design_matrix : Convenience builder for design matrix + contrast
References
----------
Freedman & Lane (1983). A nonstochastic interpretation of reported significance levels.
Anderson & Legendre (1999). An empirical comparison of permutation methods.
Winkler et al. (2014). Permutation inference for the general linear model.
"""
from __future__ import annotations
import logging
import warnings
from enum import Enum
from functools import partial
from multiprocessing import Pool
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import numpy as np
import numpy.typing as npt
from .defaults import (
DEFAULT_EXTENT_EXPONENT,
DEFAULT_HEIGHT_EXPONENT,
DEFAULT_N_THRESHOLDS_PERMUTATION as DEFAULT_N_THRESHOLDS,
DEFAULT_N_PERMUTATIONS,
DEFAULT_START_THRESHOLD,
DEFAULT_MIN_CLUSTER_SIZE,
DEFAULT_NBS_THRESHOLD,
DEFAULT_NBS_STAT,
)
from ._enhancement import (
apply_cnbs,
apply_fbc_tfnbs,
apply_nbs,
apply_ni_tfnbs,
apply_tfnbs,
)
from .pairwise_stats import (
_extract_max_stats,
_collect_results_to_arrays,
_compute_p_values_from_null,
_is_worker_process,
compute_p_val,
get_available_cores,
StatMethod,
CONSTRAINED_METHODS,
)
import time
from .acceleration import compute_p_values_accelerated
from ._compat import TailResult, make_tail_result, normalize_keys
from ._progress import run_permutations
from ._result import InferenceResult, OmnibusInferenceResult
from ._rng import RngLike, resolve_seed, warn_legacy_random_state
__all__ = [
"GLMStatType",
"compute_glm_stat",
"compute_p_val_glm",
"compute_p_val_paired_glm",
"build_design_matrix",
]
logger = logging.getLogger(__name__)
# =============================================================================
# Enums
# =============================================================================
[docs]
class GLMStatType(str, Enum):
"""Statistic type for GLM inference."""
TSTAT = "tstat"
"""t-statistic: beta / SE(beta). Scale-invariant, start_thres=1.65 valid."""
BETA = "beta"
"""Raw regression coefficient. Interpretable, needs adapted threshold."""
FSTAT = "fstat"
"""F-statistic for an omnibus (multi-row) contrast.
Non-negative by construction — the enhancement pipeline sees a single
``'omnibus'`` key instead of a ``positive``/``negative`` split. For a
single-row contrast, F == t² (same rejection regions, same rankings)."""
# =============================================================================
# OLS precomputation
# =============================================================================
def _precompute_ols(
X: npt.NDArray[np.float64],
) -> Tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]]:
"""
Precompute OLS quantities that are constant across permutations.
Parameters
----------
X : ndarray of shape (n_subjects, p)
Design matrix (must include intercept if desired).
Returns
-------
X_pinv : ndarray of shape (p, n_subjects)
Moore-Penrose pseudoinverse of the design matrix.
XtX_inv_diag : ndarray of shape (p,)
Diagonal of the generalized inverse of X'X, needed for SE computation.
XtX_inv : ndarray of shape (p, p)
Generalized inverse of X'X, needed for contrast SE computation.
"""
rank = np.linalg.matrix_rank(X)
if rank < X.shape[1]:
logger.warning(
"Design matrix is rank deficient "
f"(rank={rank}, columns={X.shape[1]}). "
"Using Moore-Penrose pseudoinverse; non-estimable contrasts may "
"still be unstable."
)
else:
cond = np.linalg.cond(X)
if cond > 1e12:
logger.warning(
f"Design matrix is ill-conditioned (cond={cond:.1e}). "
f"Results may be numerically unstable."
)
X_pinv = np.linalg.pinv(X)
XtX_inv = X_pinv @ X_pinv.T
XtX_inv_diag = np.diag(XtX_inv)
return X_pinv, XtX_inv_diag, XtX_inv
def _residual_degrees_of_freedom(X: npt.NDArray[np.float64]) -> int:
"""Return residual df based on design rank, not raw column count."""
df = X.shape[0] - int(np.linalg.matrix_rank(X))
if df <= 0:
raise ValueError(
"Design matrix has no residual degrees of freedom "
f"(n={X.shape[0]}, rank={np.linalg.matrix_rank(X)})."
)
return df
def _warn_beta_threshold_scale(
stat_type_str: str,
method_enum: StatMethod,
) -> None:
"""Warn when beta-scale statistics are paired with threshold enhancers."""
threshold_methods = {
StatMethod.NBS,
StatMethod.TFNBS,
StatMethod.NI_TFNBS,
StatMethod.FBC_TFNBS,
}
if (
stat_type_str == GLMStatType.BETA.value
and method_enum in threshold_methods
):
warnings.warn(
"stat_type='beta' uses raw coefficient units. Ensure NBS/TFNBS "
"thresholds are chosen on the beta scale, not the default t-stat "
"scale.",
UserWarning,
stacklevel=3,
)
# =============================================================================
# Core GLM computation
# =============================================================================
[docs]
def compute_glm_stat(
Y: npt.NDArray[np.float64],
X: npt.NDArray[np.float64],
contrast: npt.NDArray[np.float64],
stat_type: Union[str, GLMStatType] = GLMStatType.TSTAT,
X_pinv: Optional[npt.NDArray[np.float64]] = None,
XtX_inv_diag: Optional[npt.NDArray[np.float64]] = None,
XtX_inv: Optional[npt.NDArray[np.float64]] = None,
) -> Dict[str, npt.NDArray[np.float64]]:
"""
Compute edge-wise GLM statistics for connectivity matrices.
For each edge (i,j): Y_ij = X @ beta_ij + epsilon_ij.
- ``tstat``/``beta``: single-column contrast, returns ``{positive, negative}``.
- ``fstat``: multi-row contrast (omnibus F), returns ``{omnibus}`` — F is
non-negative by construction, so there is no direction to split.
Parameters
----------
Y : ndarray of shape (n_subjects, N, N)
Connectivity matrices (symmetric, zero diagonal).
X : ndarray of shape (n_subjects, p)
Design matrix (should include intercept column if needed).
contrast : ndarray of shape (p,) or (r, p)
Contrast vector (for ``tstat``/``beta``: 1D) or contrast matrix (for
``fstat``: 2D with ``r`` rows). A 1D contrast passed with ``stat_type
='fstat'`` is treated as a 1×p matrix (F == t² at each edge).
stat_type : {'tstat', 'beta', 'fstat'} or GLMStatType, default='tstat'
Type of statistic to compute.
X_pinv : ndarray of shape (p, n_subjects), optional
Precomputed Moore-Penrose pseudoinverse. If None, computed
internally by :func:`_precompute_ols`.
XtX_inv_diag : ndarray of shape (p,), optional
Precomputed diagonal of the generalized inverse of ``X'X``.
Required when ``stat_type='tstat'`` and ``X_pinv`` is provided.
XtX_inv : ndarray of shape (p, p), optional
Generalized inverse of ``X'X``. Required for contrast standard
errors when ``X_pinv`` is provided.
Returns
-------
dict
For ``tstat``/``beta``: ``{'positive': (N, N), 'negative': (N, N)}``,
both non-negative.
For ``fstat``: ``{'omnibus': (N, N)}``, non-negative.
"""
stat_type_str = stat_type.value if isinstance(stat_type, GLMStatType) else stat_type
n, N, _ = Y.shape
contrast = np.asarray(contrast, dtype=np.float64)
p = X.shape[1]
# Precompute if not provided
if X_pinv is None:
X_pinv, XtX_inv_diag, XtX_inv = _precompute_ols(X)
# Compute betas: (p, N, N)
# Y reshaped to (n, N*N), then beta = X_pinv @ Y_flat → (p, N*N) → (p, N, N)
Y_flat = Y.reshape(n, -1) # (n, N*N)
beta_flat = X_pinv @ Y_flat # (p, N*N)
beta = beta_flat.reshape(p, N, N) # (p, N, N)
if stat_type_str == GLMStatType.FSTAT.value:
# --- F-statistic: (Cβ)' [C(X'X)^+ C']^-1 (Cβ) / (r · σ²) ---
if contrast.ndim == 1:
C = contrast.reshape(1, -1)
elif contrast.ndim == 2:
C = contrast
else:
raise ValueError(
f"F-stat contrast must be 1D or 2D, got ndim={contrast.ndim}."
)
r = C.shape[0]
if C.shape[1] != p:
raise ValueError(
f"Contrast has {C.shape[1]} columns but design matrix has {p}."
)
# Residual variance per edge
residuals_flat = Y_flat - X @ beta_flat # (n, N*N)
df = _residual_degrees_of_freedom(X)
sigma2_flat = np.sum(residuals_flat ** 2, axis=0) / df
sigma2 = sigma2_flat.reshape(N, N)
if XtX_inv is None:
_, _, XtX_inv = _precompute_ols(X)
# M = C (X'X)^+ C' shape (r, r); invert once (small)
M = C @ XtX_inv @ C.T
if np.linalg.matrix_rank(M) < r:
raise ValueError(
"C(X'X)⁻¹C' is singular — contrast rows are not linearly "
"independent on the column space of X."
)
try:
M_inv = np.linalg.inv(M)
except np.linalg.LinAlgError as err:
raise ValueError(
"C(X'X)⁻¹C' is singular — contrast rows are not linearly "
"independent on the column space of X."
) from err
# Cβ shape (r, N, N); quadratic form per edge via einsum
Cbeta = np.tensordot(C, beta, axes=([1], [0])) # (r, N, N)
quad = np.einsum("ixy,ij,jxy->xy", Cbeta, M_inv, Cbeta) # (N, N)
with np.errstate(divide="ignore", invalid="ignore"):
F = quad / (r * sigma2)
F = np.where(sigma2 == 0, 0.0, F)
F = np.where(F < 0, 0.0, F) # guard numerical noise near zero
return {"omnibus": F}
# --- t-stat / beta path: single-column contrast, signed output ---
if contrast.ndim != 1:
raise ValueError(
f"stat_type='{stat_type_str}' requires a 1D contrast; got "
f"ndim={contrast.ndim}. Use stat_type='fstat' for multi-row contrasts."
)
# Contrast beta: scalar statistic per edge
# contrast @ beta → (N, N)
c_beta = np.tensordot(contrast, beta, axes=([0], [0])) # (N, N)
if stat_type_str == GLMStatType.BETA.value:
stat = c_beta
elif stat_type_str == GLMStatType.TSTAT.value:
# Residuals: (n, N*N)
residuals_flat = Y_flat - X @ beta_flat # (n, N*N)
# Residual variance per edge: sigma^2 = RSS / (n - rank(X))
df = _residual_degrees_of_freedom(X)
sigma2_flat = np.sum(residuals_flat ** 2, axis=0) / df # (N*N,)
sigma2 = sigma2_flat.reshape(N, N)
# SE of contrast beta: sqrt(c' (X'X)^+ c * sigma^2)
if XtX_inv is None:
_, _, XtX_inv = _precompute_ols(X)
c_XtX_inv_c = contrast @ XtX_inv @ contrast # scalar
se = np.sqrt(c_XtX_inv_c * sigma2)
with np.errstate(divide='ignore', invalid='ignore'):
stat = c_beta / se
stat = np.where(se == 0, 0, stat)
else:
raise ValueError(
f"Invalid stat_type: '{stat_type_str}'. "
f"Must be one of: {[s.value for s in GLMStatType]}"
)
# Separate positive and negative effects
positive = np.where(stat > 0, stat, 0.0)
negative = np.where(stat < 0, -stat, 0.0)
return make_tail_result(positive, negative)
# =============================================================================
# Design matrix builder
# =============================================================================
[docs]
def build_design_matrix(
interest: npt.NDArray[np.float64],
confounds: Optional[npt.NDArray[np.float64]] = None,
) -> Tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
"""
Build design matrix and contrast vector from interest and confound variables.
Constructs X = [intercept, confounds, interest] and contrast = [0, ..., 0, 1]
targeting the last column (interest variable).
Parameters
----------
interest : ndarray of shape (n_subjects,)
Variable of interest (continuous or binary).
confounds : ndarray of shape (n_subjects,) or (n_subjects, q), optional
Confound variables. If 1D, treated as single confound.
Returns
-------
X : ndarray of shape (n_subjects, p)
Design matrix with intercept, confounds, and interest.
contrast : ndarray of shape (p,)
Contrast vector [0, ..., 0, 1] targeting the interest column.
"""
n = len(interest)
interest = np.asarray(interest, dtype=np.float64)
if interest.ndim == 1:
interest = interest[:, np.newaxis]
intercept = np.ones((n, 1), dtype=np.float64)
if confounds is not None:
confounds = np.asarray(confounds, dtype=np.float64)
if confounds.ndim == 1:
confounds = confounds[:, np.newaxis]
X = np.hstack([intercept, confounds, interest])
else:
X = np.hstack([intercept, interest])
contrast = np.zeros(X.shape[1], dtype=np.float64)
contrast[-1] = 1.0
return X, contrast
# =============================================================================
# Freedman-Lane permutation helpers
# =============================================================================
def _compute_reduced_model(
X: npt.NDArray[np.float64],
contrast: npt.NDArray[np.float64],
) -> npt.NDArray[np.float64]:
"""
Compute the reduced design matrix for Freedman-Lane permutation.
The reduced model contains all columns NOT targeted by the contrast.
For a 2D contrast matrix (F-test), a column is targeted if ANY row of
the contrast has a non-zero entry for it.
Parameters
----------
X : ndarray of shape (n_subjects, p)
Full design matrix.
contrast : ndarray of shape (p,) or (r, p)
Contrast vector (1D, t-test) or contrast matrix (2D, F-test).
Returns
-------
X_reduced : ndarray of shape (n_subjects, p_reduced)
Reduced design matrix (columns not touched by the contrast).
"""
contrast = np.asarray(contrast)
if contrast.ndim == 1:
keep_cols = np.where(contrast == 0)[0]
else:
# 2D: keep columns that are zero across ALL contrast rows
keep_cols = np.where(np.all(contrast == 0, axis=0))[0]
if len(keep_cols) == 0:
# All columns targeted — reduced model is just intercept
return np.ones((X.shape[0], 1), dtype=np.float64)
return X[:, keep_cols]
def _freedman_lane_permutation_task(
Y_hat_reduced: npt.NDArray[np.float64],
residuals_reduced: npt.NDArray[np.float64],
X: npt.NDArray[np.float64],
contrast: npt.NDArray[np.float64],
X_pinv: npt.NDArray[np.float64],
XtX_inv_diag: npt.NDArray[np.float64],
XtX_inv: npt.NDArray[np.float64],
reference_shape: Tuple[int, ...],
stat_type: str,
enhance_func: Optional[Callable] = None,
seed: Optional[int] = None,
strata_codes: Optional[npt.NDArray[np.int_]] = None,
**enhance_kwargs,
) -> Dict[str, np.float64]:
"""
Single Freedman-Lane permutation step.
1. Permute reduced-model residuals across subjects (within-stratum
only when ``strata_codes`` is provided — PALM ``-eb`` semantics).
2. Reconstruct Y_perm = Y_hat_reduced + permuted residuals
3. Fit full model to Y_perm
4. Optionally apply enhancement
5. Return max statistics
Parameters
----------
Y_hat_reduced : ndarray of shape (n_subjects, N, N)
Predicted values from reduced model.
residuals_reduced : ndarray of shape (n_subjects, N, N)
Residuals from reduced model.
X, contrast, X_pinv, XtX_inv_diag, XtX_inv : precomputed GLM quantities
reference_shape : tuple
Shape (N, N) for _extract_max_stats.
stat_type : str
'tstat' or 'beta'.
enhance_func : callable, optional
Enhancement function to apply to GLM stats.
seed : int
Random seed for this permutation.
strata_codes : ndarray of shape (n_subjects,), optional
Integer stratum codes for within-block residual permutation.
**enhance_kwargs
Keyword arguments for enhance_func.
Returns
-------
dict
Maximum statistics per direction.
"""
rng = np.random.RandomState(seed)
if strata_codes is None:
perm_idx = rng.permutation(residuals_reduced.shape[0])
else:
from .pairwise_stats import _stratified_perm
perm_idx = _stratified_perm(strata_codes, rng)
residuals_perm = residuals_reduced[perm_idx]
Y_perm = Y_hat_reduced + residuals_perm
stat_dict = compute_glm_stat(
Y_perm, X, contrast,
stat_type=stat_type,
X_pinv=X_pinv,
XtX_inv_diag=XtX_inv_diag,
XtX_inv=XtX_inv,
)
if enhance_func is not None:
stat_dict = enhance_func(stat_dict, **enhance_kwargs)
return _extract_max_stats(stat_dict, reference_shape)
# =============================================================================
# Enhancement registry
# =============================================================================
# Wrappers live in _enhancement.py and are shared with the t-test pipeline —
# see pairwise_stats.compute_p_val.
_ENHANCE_MAP = {
StatMethod.TSTAT: None, # No enhancement
StatMethod.TFNBS: apply_tfnbs,
StatMethod.NBS: apply_nbs,
StatMethod.CNBS: apply_cnbs,
StatMethod.NI_TFNBS: apply_ni_tfnbs,
StatMethod.FBC_TFNBS: apply_fbc_tfnbs,
}
# =============================================================================
# Main orchestrator
# =============================================================================
[docs]
def compute_p_val_glm(
Y: npt.NDArray[np.float64],
# --- Advanced API ---
design_matrix: Optional[npt.NDArray[np.float64]] = None,
contrast: Optional[npt.NDArray[np.float64]] = None,
# --- Convenience API ---
interest: Optional[npt.NDArray[np.float64]] = None,
confounds: Optional[npt.NDArray[np.float64]] = None,
# --- Common parameters ---
stat_type: Union[str, GLMStatType] = GLMStatType.TSTAT,
method: Union[str, StatMethod] = StatMethod.TFNBS,
n_permutations: int = DEFAULT_N_PERMUTATIONS,
two_tailed: bool = False,
acceleration: Optional[str] = None,
use_mp: bool = True,
rng: RngLike = None,
random_state: Optional[int] = None, # deprecated alias for `rng`
n_processes: Optional[int] = None,
verbose: bool = False,
# --- Method-specific parameters ---
net_labels: Optional[npt.NDArray[np.int_]] = None,
threshold: float = DEFAULT_NBS_THRESHOLD,
nbs_stat: str = DEFAULT_NBS_STAT,
e: Union[float, List[float]] = DEFAULT_EXTENT_EXPONENT,
h: Union[float, List[float]] = DEFAULT_HEIGHT_EXPONENT,
n: int = DEFAULT_N_THRESHOLDS,
start_thres: float = DEFAULT_START_THRESHOLD,
min_cluster_size: int = DEFAULT_MIN_CLUSTER_SIZE,
normalization: str = "sqrt",
strata: Optional[npt.NDArray[Any]] = None,
) -> Union[InferenceResult, OmnibusInferenceResult]:
"""
Compute p-values for connectivity data using GLM with Freedman-Lane permutation.
Supports two APIs that share the same core engine:
- **Advanced API**: provide ``design_matrix`` and ``contrast`` directly.
- **Convenience API**: provide ``interest`` (and optional ``confounds``);
the design matrix and contrast are built automatically.
Parameters
----------
Y : ndarray of shape (n_subjects, N, N)
Connectivity matrices (symmetric, zero diagonal).
design_matrix : ndarray of shape (n_subjects, p), optional
Full design matrix (advanced API). Mutually exclusive with ``interest``.
contrast : ndarray of shape (p,) or (r, p), optional
Contrast vector (1D, t/beta) or contrast matrix (2D, F-stat).
Required with ``design_matrix``. For ``stat_type='fstat'``, pass a
2D matrix whose rows encode the linear combinations being tested
jointly (e.g., ``[[1, -1, 0], [1, 0, -1]]`` tests β₁=β₂=β₃).
interest : ndarray of shape (n_subjects,), optional
Variable of interest (convenience API). Mutually exclusive with
``design_matrix``. Produces a single-column contrast — use the
advanced API for multi-row F-contrasts.
confounds : ndarray of shape (n_subjects,) or (n_subjects, q), optional
Confound variables (convenience API).
stat_type : {'tstat', 'beta', 'fstat'} or GLMStatType, default='tstat'
Type of statistic to compute. ``'fstat'`` enables an omnibus F-test
for a multi-row contrast matrix and returns a single ``'omnibus'``
p-value map (no positive/negative split).
method : str or StatMethod, default='tfnbs'
Enhancement method.
n_permutations : int, default=1000
Number of permutations for null distribution.
two_tailed : bool, default=False
If False (default), per-tail FWER control (separate null for positive
and negative). If True, joint null from max(max_positive, max_negative).
acceleration : {'gpd', 'gamma'} or None, default=None
Permutation acceleration method (Winkler et al., 2016). If None,
use standard empirical p-values. 'gpd' fits a Generalized Pareto
Distribution to the tail of the null distribution. 'gamma' fits
a gamma distribution using method of moments. Both are
approximation accelerators with empirical fallback; use
``acceleration=None`` for exact empirical finite-permutation
p-values.
use_mp : bool, default=True
Use multiprocessing for permutation testing.
random_state : int, optional
Random seed for reproducibility.
n_processes : int, optional
Number of CPU cores for parallel computing.
net_labels : ndarray of shape (N,), optional
Network labels (required for cnbs, ni_tfnbs, fbc_tfnbs).
threshold, nbs_stat, n, start_thres, min_cluster_size, normalization
Method-specific parameters (see ``compute_p_val`` docstring).
e, h : float or sequence of float
TFNBS extent and height exponents. Pass equal-length sequences to
evaluate a whole ``(E, H)`` grid in one permutation pass. In grid
mode the returned :class:`InferenceResult` carries the parameter
axis (``result.is_grid``, ``result.e_grid``, ``result.h_grid``);
use ``result.select(param_idx)`` or pass ``param_idx=`` to
``significant_edges`` / ``to_csv`` to project to a single cell.
Returns
-------
InferenceResult or OmnibusInferenceResult
For ``stat_type`` in ``{'tstat', 'beta'}``: an
:class:`InferenceResult` with ``positive`` and ``negative`` p-value
maps of shape ``(N, N)`` or ``(N, N, K)`` in grid mode. For
``stat_type='fstat'``: an :class:`OmnibusInferenceResult` with a
single ``omnibus`` map of shape ``(N, N)``.
Raises
------
ValueError
If API arguments are inconsistent or contrast is invalid.
"""
_t0 = time.perf_counter()
# ---- Resolve rng → seed ----
if random_state is not None and rng is None:
warn_legacy_random_state("random_state")
random_state = resolve_seed(rng, legacy_random_state=random_state)
# ---- Resolve API mode ----
if design_matrix is not None and interest is not None:
raise ValueError(
"Provide either (design_matrix, contrast) or (interest, confounds), "
"not both."
)
if design_matrix is None and interest is None:
raise ValueError(
"Must provide either design_matrix or interest."
)
if interest is not None:
X, contrast_vec = build_design_matrix(interest, confounds)
else:
X = np.asarray(design_matrix, dtype=np.float64)
if contrast is None:
raise ValueError("contrast is required when using design_matrix.")
contrast_vec = np.asarray(contrast, dtype=np.float64)
# ---- Validate inputs ----
n_subjects = Y.shape[0]
N = Y.shape[1]
p = X.shape[1]
if X.shape[0] != n_subjects:
raise ValueError(
f"Design matrix has {X.shape[0]} rows but Y has {n_subjects} subjects."
)
# Contrast may be 1D (t/beta) or 2D (F-stat). Check the column dimension.
if contrast_vec.ndim == 1:
if contrast_vec.shape[0] != p:
raise ValueError(
f"Contrast vector length ({contrast_vec.shape[0]}) must match "
f"number of columns in design matrix ({p})."
)
elif contrast_vec.ndim == 2:
if contrast_vec.shape[1] != p:
raise ValueError(
f"Contrast matrix has {contrast_vec.shape[1]} columns but design "
f"matrix has {p}."
)
else:
raise ValueError(
f"Contrast must be 1D or 2D, got ndim={contrast_vec.ndim}."
)
if np.all(contrast_vec == 0):
raise ValueError("Contrast must have at least one non-zero entry.")
if acceleration is not None and acceleration not in ("gpd", "gamma"):
raise ValueError(
f"Invalid acceleration: '{acceleration}'. Must be 'gpd', 'gamma', or None."
)
stat_type_str = stat_type.value if isinstance(stat_type, GLMStatType) else stat_type
method_str = method.value if isinstance(method, StatMethod) else method
try:
method_enum = StatMethod(method_str)
except ValueError:
valid = [m.value for m in StatMethod if m in _ENHANCE_MAP]
raise ValueError(f"Invalid method: '{method_str}'. Must be one of: {valid}")
if method_enum not in _ENHANCE_MAP:
raise ValueError(
f"Method '{method_str}' is not supported in the GLM pipeline. "
f"Use compute_p_val() for parametric baselines."
)
_warn_beta_threshold_scale(stat_type_str, method_enum)
if method_enum in CONSTRAINED_METHODS and net_labels is None:
raise ValueError(
f"Method '{method_str}' requires net_labels."
)
# ---- Build enhancement kwargs ----
enhance_kwargs: Dict[str, Any] = {}
if method_enum == StatMethod.NBS:
enhance_kwargs = {"threshold": threshold, "nbs_stat": nbs_stat}
elif method_enum in {StatMethod.TFNBS, StatMethod.NI_TFNBS, StatMethod.FBC_TFNBS}:
enhance_kwargs = {"e": e, "h": h, "n": n, "start_thres": start_thres}
if method_enum in CONSTRAINED_METHODS:
enhance_kwargs["net_labels"] = net_labels
if method_enum == StatMethod.FBC_TFNBS:
enhance_kwargs["min_cluster_size"] = min_cluster_size
if method_enum == StatMethod.NI_TFNBS:
enhance_kwargs["normalization"] = normalization
elif method_enum == StatMethod.CNBS:
enhance_kwargs = {"net_labels": net_labels}
enhance_func = _ENHANCE_MAP[method_enum]
# ---- Precompute OLS ----
X_pinv, XtX_inv_diag, XtX_inv = _precompute_ols(X)
# ---- Compute observed statistics ----
emp_stat_dict = compute_glm_stat(
Y, X, contrast_vec,
stat_type=stat_type_str,
X_pinv=X_pinv,
XtX_inv_diag=XtX_inv_diag,
XtX_inv=XtX_inv,
)
# Keep the raw (pre-enhancement) stat dict so InferenceResult can
# surface the t/β/F effect map even when the enhanced score is a
# multi-(E, H) tensor.
raw_stat_dict = emp_stat_dict
if enhance_func is not None:
emp_stat_dict = enhance_func(emp_stat_dict, **enhance_kwargs)
# ---- Freedman-Lane: compute reduced model ----
X_reduced = _compute_reduced_model(X, contrast_vec)
X_red_pinv, _, _ = _precompute_ols(X_reduced)
Y_flat = Y.reshape(n_subjects, -1)
Y_hat_reduced_flat = X_reduced @ (X_red_pinv @ Y_flat)
Y_hat_reduced = Y_hat_reduced_flat.reshape(n_subjects, N, N)
residuals_reduced = Y - Y_hat_reduced
reference_shape = (N, N)
# ---- Encode strata for within-block exchangeability (PALM -eb) ----
strata_codes = None
if strata is not None:
from .pairwise_stats import _encode_strata
strata_codes = _encode_strata(strata)
if strata_codes.shape[0] != n_subjects:
raise ValueError(
f"strata length {strata_codes.shape[0]} does not match "
f"n_subjects={n_subjects}."
)
# ---- Generate seeds ----
rng = np.random.RandomState(random_state)
seeds = rng.randint(0, 2**32 - 1, size=n_permutations, dtype=np.int64)
# ---- Build permutation task ----
task_func = partial(
_freedman_lane_permutation_task,
Y_hat_reduced,
residuals_reduced,
X,
contrast_vec,
X_pinv,
XtX_inv_diag,
XtX_inv,
reference_shape,
stat_type_str,
enhance_func,
strata_codes=strata_codes,
**enhance_kwargs,
)
# ---- Run permutations ----
_use_mp = use_mp and not _is_worker_process()
if _use_mp and n_processes is None:
n_processes = get_available_cores()
if _use_mp and n_processes is not None:
n_processes = min(n_processes, n_permutations)
results = run_permutations(
task_func, list(seeds),
use_mp=_use_mp, n_processes=n_processes,
verbose=verbose, desc="glm_perm",
)
max_null_dict = _collect_results_to_arrays(results, n_permutations)
# ---- Two-tailed FWER ----
if two_tailed:
if stat_type_str == GLMStatType.FSTAT.value:
# F is non-negative; there is no sign to split and nothing to pool.
# Silently ignore rather than raise — keeps the API uniform for
# callers that pass two_tailed=True by default.
pass
else:
# Build joint null from max(max_positive, max_negative)
joint_null = np.maximum(
max_null_dict["positive"],
max_null_dict["negative"],
)
max_null_dict = {
"positive": joint_null,
"negative": joint_null,
}
# ---- Compute p-values ----
if acceleration is not None:
result = compute_p_values_accelerated(
emp_stat_dict, max_null_dict, method=acceleration,
)
else:
result = _compute_p_values_from_null(emp_stat_dict, max_null_dict)
method_str = method.value if hasattr(method, "value") else str(method)
from .pairwise_stats import _grid_from_kwargs
e_grid, h_grid = _grid_from_kwargs(method_enum, enhance_kwargs)
if set(result.keys()) == {"positive", "negative"}:
return InferenceResult(
result["positive"], result["negative"],
method=method_str, n_permutations=n_permutations,
acceleration=acceleration,
wall_time_s=time.perf_counter() - _t0,
stat_positive=raw_stat_dict["positive"],
stat_negative=raw_stat_dict["negative"],
stat_type=stat_type_str,
strata_provided=strata is not None,
e_grid=e_grid,
h_grid=h_grid,
)
# F-stat path: single 'omnibus' map → OmnibusInferenceResult
return OmnibusInferenceResult(
result["omnibus"],
method=method_str, n_permutations=n_permutations,
acceleration=acceleration,
wall_time_s=time.perf_counter() - _t0,
stat_omnibus=raw_stat_dict["omnibus"],
stat_type="fstat",
strata_provided=strata is not None,
)
# =============================================================================
# Multi-contrast GLM (shared nuisance, single permutation pass)
# =============================================================================
def _multi_contrast_perm_task(
Y_hat_reduced: npt.NDArray[np.float64],
residuals_reduced: npt.NDArray[np.float64],
X: npt.NDArray[np.float64],
contrast_names: Tuple[str, ...],
contrast_arrays: Tuple[npt.NDArray[np.float64], ...],
X_pinv: npt.NDArray[np.float64],
XtX_inv_diag: npt.NDArray[np.float64],
XtX_inv: npt.NDArray[np.float64],
reference_shape: Tuple[int, ...],
stat_type: str,
enhance_func: Optional[Callable],
enhance_kwargs: Dict[str, Any],
seed: Optional[int] = None,
strata_codes: Optional[npt.NDArray[np.int_]] = None,
) -> Dict[str, Dict[str, np.float64]]:
"""Single Freedman-Lane permutation, evaluated against multiple contrasts.
Permutes residuals once (within-stratum when ``strata_codes`` is
provided), fits the full model once, then for each contrast extracts
the per-edge statistic, optionally enhances, and records max-stats.
Returns a nested dict ``{contrast_name: {tail: max_stat}}``.
"""
rng = np.random.RandomState(seed)
if strata_codes is None:
perm_idx = rng.permutation(residuals_reduced.shape[0])
else:
from .pairwise_stats import _stratified_perm
perm_idx = _stratified_perm(strata_codes, rng)
Y_perm = Y_hat_reduced + residuals_reduced[perm_idx]
out: Dict[str, Dict[str, np.float64]] = {}
for name, contrast in zip(contrast_names, contrast_arrays):
stat_dict = compute_glm_stat(
Y_perm, X, contrast,
stat_type=stat_type,
X_pinv=X_pinv,
XtX_inv_diag=XtX_inv_diag,
XtX_inv=XtX_inv,
)
if enhance_func is not None:
stat_dict = enhance_func(stat_dict, **enhance_kwargs)
out[name] = _extract_max_stats(stat_dict, reference_shape)
return out
def compute_p_val_glm_multi(
Y: npt.NDArray[np.float64],
design_matrix: npt.NDArray[np.float64],
contrasts: Dict[str, npt.NDArray[np.float64]],
*,
nuisance_contrast: Optional[npt.NDArray[np.float64]] = None,
stat_type: Union[str, GLMStatType] = GLMStatType.TSTAT,
method: Union[str, StatMethod] = StatMethod.TFNBS,
n_permutations: int = DEFAULT_N_PERMUTATIONS,
two_tailed: bool = False,
acceleration: Optional[str] = None,
use_mp: bool = True,
rng: RngLike = None,
random_state: Optional[int] = None,
n_processes: Optional[int] = None,
verbose: bool = False,
# --- Method-specific parameters (forwarded to enhancement) ---
net_labels: Optional[npt.NDArray[np.int_]] = None,
threshold: float = DEFAULT_NBS_THRESHOLD,
nbs_stat: str = DEFAULT_NBS_STAT,
e: Union[float, List[float]] = DEFAULT_EXTENT_EXPONENT,
h: Union[float, List[float]] = DEFAULT_HEIGHT_EXPONENT,
n: int = DEFAULT_N_THRESHOLDS,
start_thres: float = DEFAULT_START_THRESHOLD,
min_cluster_size: int = DEFAULT_MIN_CLUSTER_SIZE,
normalization: str = "sqrt",
strata: Optional[npt.NDArray[Any]] = None,
) -> Dict[str, InferenceResult]:
"""Run several contrasts under a shared nuisance model in one perm pass.
For studies with multiple contrasts of interest under the same nuisance
structure (e.g. testing age, sex, and motion separately while using
the others as covariates), the Freedman-Lane reduced model and residual
permutations are shared across contrasts. The current implementation
still recomputes contrast-specific GLM statistics for each contrast, so
the speedup is amortized rather than a full K-fold reuse of one fitted
model.
Parameters
----------
Y : ndarray of shape (n_subjects, N, N)
Connectivity tensor.
design_matrix : ndarray of shape (n_subjects, p)
Full design matrix (intercept + all regressors of interest +
nuisance).
contrasts : dict[str, ndarray]
Named 1D contrast vectors (length ``p``). For a 4-column
``[intercept, age, sex, motion]`` design, pass
``{'age': [0,1,0,0], 'sex': [0,0,1,0], 'motion': [0,0,0,1]}``.
F-stat / multi-row contrasts are not supported here — use
:func:`compute_p_val_glm` for those.
nuisance_contrast : ndarray, optional
Explicit specification of the *combined* set of regressors of
interest. The reduced model excludes any column touched by
``nuisance_contrast``. Defaults to the union of ``contrasts.values()``
(a column is "of interest" if any contrast is non-zero on it).
stat_type : ``'tstat'`` or ``'beta'``
F-stat is unsupported here.
method, two_tailed, acceleration, n_permutations, use_mp, rng,
n_processes, verbose, net_labels, threshold, nbs_stat, e, h, n,
start_thres, min_cluster_size, normalization
Forwarded to the underlying GLM pipeline (see
:func:`compute_p_val_glm`).
Returns
-------
dict[str, InferenceResult]
One :class:`InferenceResult` per contrast name. Wall-time on
each result reflects the *amortised* cost (total wall-time / K),
and ``n_permutations`` is the shared count.
"""
_t0 = time.perf_counter()
if random_state is not None and rng is None:
warn_legacy_random_state("random_state")
random_state = resolve_seed(rng, legacy_random_state=random_state)
if not contrasts:
raise ValueError("contrasts must be a non-empty dict.")
Y = np.asarray(Y, dtype=np.float64)
if Y.ndim != 3 or Y.shape[1] != Y.shape[2]:
raise ValueError(
f"Y must have shape (n_subjects, N, N), got {Y.shape}."
)
n_subjects, N, _ = Y.shape
X = np.asarray(design_matrix, dtype=np.float64)
if X.shape[0] != n_subjects:
raise ValueError(
f"design_matrix has {X.shape[0]} rows but Y has {n_subjects} subjects."
)
p = X.shape[1]
# Validate contrasts: all 1D, all length p
contrast_names: List[str] = list(contrasts.keys())
contrast_arrays: List[npt.NDArray[np.float64]] = []
for name in contrast_names:
c = np.asarray(contrasts[name], dtype=np.float64)
if c.ndim != 1 or c.shape[0] != p:
raise ValueError(
f"contrast {name!r}: must be 1D of length {p}, got shape {c.shape}."
)
contrast_arrays.append(c)
stat_type_str = (
stat_type.value if isinstance(stat_type, GLMStatType) else stat_type
)
if stat_type_str == GLMStatType.FSTAT.value:
raise ValueError(
"F-stat is unsupported in compute_p_val_glm_multi (per-contrast "
"directional split is undefined). Call compute_p_val_glm once "
"per multi-row contrast for omnibus F tests."
)
method_str = method.value if isinstance(method, StatMethod) else method
try:
method_enum = StatMethod(method_str)
except ValueError:
valid = [m.value for m in StatMethod]
raise ValueError(f"Invalid method: {method_str!r}. Must be one of {valid}.")
if method_enum not in _ENHANCE_MAP:
valid = [m.value for m in _ENHANCE_MAP]
raise ValueError(
f"Method {method_str!r} is not supported in the GLM multi-contrast "
f"pipeline. Must be one of {valid}."
)
_warn_beta_threshold_scale(stat_type_str, method_enum)
if method_enum in CONSTRAINED_METHODS and net_labels is None:
raise ValueError(
f"Method {method_str!r} requires net_labels to be provided."
)
enhance_func = _ENHANCE_MAP.get(method_enum)
enhance_kwargs: Dict[str, Any] = {}
if method_enum == StatMethod.NBS:
enhance_kwargs = {"threshold": threshold, "nbs_stat": nbs_stat}
elif method_enum in {StatMethod.TFNBS, StatMethod.NI_TFNBS, StatMethod.FBC_TFNBS}:
enhance_kwargs = {"e": e, "h": h, "n": n, "start_thres": start_thres}
if method_enum in CONSTRAINED_METHODS:
enhance_kwargs["net_labels"] = net_labels
if method_enum == StatMethod.FBC_TFNBS:
enhance_kwargs["min_cluster_size"] = min_cluster_size
if method_enum == StatMethod.NI_TFNBS:
enhance_kwargs["normalization"] = normalization
elif method_enum == StatMethod.CNBS:
enhance_kwargs = {"net_labels": net_labels}
# ---- Reduced model (shared) ----
if nuisance_contrast is None:
# Default: column is "of interest" if ANY contrast is non-zero on it.
union = np.zeros(p, dtype=np.float64)
for c in contrast_arrays:
union = np.where(c != 0, 1.0, union)
reduced_marker = union # 1 where of interest → exclude from reduced model
else:
reduced_marker = np.asarray(nuisance_contrast, dtype=np.float64)
if reduced_marker.ndim == 1 and reduced_marker.shape[0] != p:
raise ValueError(
f"nuisance_contrast length {reduced_marker.shape[0]} does not "
f"match design width {p}."
)
X_reduced = _compute_reduced_model(X, reduced_marker)
X_red_pinv, _, _ = _precompute_ols(X_reduced)
X_pinv, XtX_inv_diag, XtX_inv = _precompute_ols(X)
# ---- Empirical stats per contrast ----
emp_dicts: Dict[str, Dict[str, npt.NDArray[np.float64]]] = {}
raw_stat_dicts: Dict[str, Dict[str, npt.NDArray[np.float64]]] = {}
for name, contrast in zip(contrast_names, contrast_arrays):
emp = compute_glm_stat(
Y, X, contrast, stat_type=stat_type_str,
X_pinv=X_pinv, XtX_inv_diag=XtX_inv_diag, XtX_inv=XtX_inv,
)
# Snapshot the raw (pre-enhancement) statistic for the result
# object — the user-facing effect map should be t/β, not the
# enhanced score.
raw_stat_dicts[name] = emp
if enhance_func is not None:
emp = enhance_func(emp, **enhance_kwargs)
emp_dicts[name] = emp
# ---- Fixed-residuals reduced fit (shared) ----
Y_flat = Y.reshape(n_subjects, -1)
Y_hat_reduced_flat = X_reduced @ (X_red_pinv @ Y_flat)
Y_hat_reduced = Y_hat_reduced_flat.reshape(n_subjects, N, N)
residuals_reduced = Y - Y_hat_reduced
reference_shape = (N, N)
# ---- Encode strata for within-block exchangeability (PALM -eb) ----
strata_codes = None
if strata is not None:
from .pairwise_stats import _encode_strata
strata_codes = _encode_strata(strata)
if strata_codes.shape[0] != n_subjects:
raise ValueError(
f"strata length {strata_codes.shape[0]} does not match "
f"n_subjects={n_subjects}."
)
# ---- Permutation seeds ----
rng_local = np.random.RandomState(random_state)
seeds = rng_local.randint(0, 2**32 - 1, size=n_permutations, dtype=np.int64)
task_func = partial(
_multi_contrast_perm_task,
Y_hat_reduced,
residuals_reduced,
X,
tuple(contrast_names),
tuple(contrast_arrays),
X_pinv,
XtX_inv_diag,
XtX_inv,
reference_shape,
stat_type_str,
enhance_func,
enhance_kwargs,
strata_codes=strata_codes,
)
_use_mp = use_mp and not _is_worker_process()
if _use_mp and n_processes is None:
n_processes = get_available_cores()
if _use_mp and n_processes is not None:
n_processes = min(n_processes, n_permutations)
perm_results = run_permutations(
task_func, list(seeds),
use_mp=_use_mp, n_processes=n_processes,
verbose=verbose, desc="glm_multi",
)
# ---- Re-shape into per-contrast null arrays ----
null_max_per_contrast: Dict[str, Dict[str, npt.NDArray[np.float64]]] = {}
for name in contrast_names:
per_perm = [pr[name] for pr in perm_results]
null_max_per_contrast[name] = _collect_results_to_arrays(
per_perm, n_permutations
)
# ---- Two-tailed pooling ----
if two_tailed:
for name in contrast_names:
joint = np.maximum(
null_max_per_contrast[name]["positive"],
null_max_per_contrast[name]["negative"],
)
null_max_per_contrast[name] = {"positive": joint, "negative": joint}
# ---- Per-contrast p-values + InferenceResult ----
out: Dict[str, InferenceResult] = {}
elapsed = time.perf_counter() - _t0
per_contrast_wall = elapsed / max(1, len(contrast_names))
for name in contrast_names:
emp = emp_dicts[name]
null = null_max_per_contrast[name]
if acceleration is not None:
res = compute_p_values_accelerated(emp, null, method=acceleration)
else:
res = _compute_p_values_from_null(emp, null)
raw = raw_stat_dicts[name]
out[name] = InferenceResult(
res["positive"], res["negative"],
method=method_str, n_permutations=n_permutations,
acceleration=acceleration,
wall_time_s=per_contrast_wall,
stat_positive=raw["positive"],
stat_negative=raw["negative"],
stat_type=stat_type_str,
strata_provided=strata is not None,
)
return out
# =============================================================================
# Paired-difference GLM convenience wrapper
# =============================================================================
[docs]
def compute_p_val_paired_glm(
Y_A: npt.NDArray[np.float64],
Y_B: npt.NDArray[np.float64],
confounds_A: Optional[npt.NDArray[np.float64]] = None,
confounds_B: Optional[npt.NDArray[np.float64]] = None,
**kwargs,
) -> Union[InferenceResult, Dict[str, npt.NDArray[np.float64]]]:
"""
Paired-difference GLM: test ``Y_A`` vs ``Y_B`` within-subject, optionally
controlling for per-subject differences in confounds.
Two routes depending on ``confounds_*``:
- **No confounds** → delegates to
:func:`conninfpy.compute_p_val` with ``test_type='paired'`` (sign-flip
permutation). This is numerically equivalent to a one-sample GLM on
the differences and avoids the Freedman-Lane degeneracy that occurs
when the contrast targets the only column in the design matrix.
- **With confounds** → builds ``Δ_Y = Y_A − Y_B`` and
``Δ_C = confounds_A − confounds_B``, then calls
:func:`compute_p_val_glm` with ``design_matrix = [1, Δ_C]``,
``contrast = [1, 0, ..., 0]`` (intercept as the effect of interest).
Use this when the same subjects are measured under two conditions (task
A vs task B, pre vs post, open vs close) and a confound whose value
*differs between the two conditions for the same subject* (e.g.,
motion, arousal, response time) needs to be partialled out.
Parameters
----------
Y_A, Y_B : ndarray of shape (n_subjects, N, N)
Aligned per-subject connectivity matrices for the two conditions.
``Y_A[s]`` and ``Y_B[s]`` must correspond to the same subject.
confounds_A, confounds_B : ndarray of shape (n_subjects,) or (n_subjects, k), optional
Aligned per-subject confound values for the two conditions. Either
pass both or neither. If passed, the *difference*
``confounds_A - confounds_B`` enters the GLM as a nuisance regressor.
**kwargs
Forwarded to the downstream call (``compute_p_val`` or
``compute_p_val_glm``). ``test_type`` is fixed to ``'paired'`` when
routing to the t-test pipeline and must not be supplied.
Returns
-------
dict
``{'positive', 'negative'}`` regardless of routing. For the
no-confound path, ``positive`` corresponds to ``Y_B > Y_A`` (B has
greater connectivity); for the with-confound path it corresponds
to the sign of the ``Y_A - Y_B`` intercept adjusted for ``Δ_C``.
Legacy keys ``'g2>g1'`` / ``'g1>g2'`` remain accessible with a
:class:`DeprecationWarning` until v2.1.
Raises
------
ValueError
If ``Y_A`` and ``Y_B`` shapes mismatch, only one of the confound
arrays is supplied, or confound shapes mismatch.
"""
Y_A = np.asarray(Y_A, dtype=np.float64)
Y_B = np.asarray(Y_B, dtype=np.float64)
if Y_A.shape != Y_B.shape:
raise ValueError(
f"Y_A and Y_B must have identical shapes, got {Y_A.shape} vs {Y_B.shape}."
)
if Y_A.ndim != 3 or Y_A.shape[1] != Y_A.shape[2]:
raise ValueError(
f"Y_A/Y_B must have shape (n_subjects, N, N), got {Y_A.shape}."
)
has_A = confounds_A is not None
has_B = confounds_B is not None
if has_A != has_B:
raise ValueError(
"Provide either both confounds_A and confounds_B, or neither. "
"Paired-difference inference requires matched confound values "
"in both conditions."
)
# --- No-confound path: delegate to sign-flip paired test ---
if not has_A:
if "test_type" in kwargs:
raise ValueError(
"test_type is fixed to 'paired' by compute_p_val_paired_glm; "
"do not pass it explicitly."
)
return compute_p_val(Y_A, Y_B, test_type="paired", **kwargs)
# --- With-confound path: one-sample GLM on differences ---
confounds_A = np.asarray(confounds_A, dtype=np.float64)
confounds_B = np.asarray(confounds_B, dtype=np.float64)
if confounds_A.shape != confounds_B.shape:
raise ValueError(
f"confounds_A and confounds_B must have identical shapes, "
f"got {confounds_A.shape} vs {confounds_B.shape}."
)
if confounds_A.shape[0] != Y_A.shape[0]:
raise ValueError(
f"confounds have {confounds_A.shape[0]} subjects but Y_A has "
f"{Y_A.shape[0]}."
)
Delta_Y = Y_A - Y_B
Delta_C = confounds_A - confounds_B
if Delta_C.ndim == 1:
Delta_C = Delta_C[:, np.newaxis]
n_subjects = Y_A.shape[0]
intercept = np.ones((n_subjects, 1), dtype=np.float64)
X = np.hstack([intercept, Delta_C])
contrast = np.zeros(X.shape[1], dtype=np.float64)
contrast[0] = 1.0 # test the intercept (= mean of Δ_Y)
# Disallow caller from overriding the design; these keys conflict.
for forbidden in ("design_matrix", "contrast", "interest", "confounds"):
if forbidden in kwargs:
raise ValueError(
f"compute_p_val_paired_glm builds the design internally; "
f"do not pass '{forbidden}'."
)
return compute_p_val_glm(
Delta_Y,
design_matrix=X,
contrast=contrast,
**kwargs,
)