Source code for conninfpy.synth_datasets

"""Synthetic connectivity datasets for benchmarking and testing."""
import numpy as np
from scipy import linalg

try:
    from sklearn.datasets import make_sparse_spd_matrix
    HAS_SKLEARN = True
except ImportError:
    HAS_SKLEARN = False


__all__ = [
    "generate_fc_matrices",
    "generate_multisite_glm_dataset",
    "ModularDatasetGenerator",
]


[docs] def generate_multisite_glm_dataset( n_subjects: int = 30, N: int = 100, n_sites: int = 3, effect_size: float = 0.0, site_shift_sigma: float = 0.2, corr_site_interest: float = 0.0, n_signal_edges: int = 0, base_corr_sparsity: float = 0.9, seed: int | None = None, ) -> dict: """Multi-site GLM connectivity dataset, sized to mimic a Schaefer-100 study. Built for the v2.1 full-pipeline calibration tests (`tests/test_full_pipeline.py`) and for the matrix-level phase of the SC-prior validation work in `Projects/NetworkStatistics/_wiki/pseudo_real_validation.md`. Output is in **Fisher-z** units already (so callers should pass ``fisher_z=False`` to ``analyze()``). Site effects are additive on Fisher-z; signal is linear in the regressor of interest at a fixed set of signal edges. Parameters ---------- n_subjects : int, default 30 Total subjects, evenly distributed across ``n_sites``. N : int, default 100 Number of ROIs. Default 100 mimics Schaefer-100. n_sites : int, default 3 Number of distinct sites; each contributes a fixed symmetric Fisher-z offset on all edges. effect_size : float, default 0.0 Slope of the linear-in-interest perturbation at signal edges. Set to ``0.0`` for an H₀ dataset (no group / regressor effect). site_shift_sigma : float, default 0.2 Scale of the per-site additive Fisher-z offset. Each site's offset is drawn from ``N(0, σ_site²)`` once per simulation. corr_site_interest : float in [0, 1], default 0.0 Population correlation between ``sites`` (as an integer code) and the ``interest`` regressor. ``0.0`` = independent (the H₀-friendly regime); ``0.6`` = the regime where unrestricted permutation after harmonisation leaks Type-I. n_signal_edges : int, default 0 Number of edges carrying the planted signal. Ignored when ``effect_size == 0``. Edges are drawn uniformly from the upper triangle (excluding diagonal). base_corr_sparsity : float, default 0.9 ``alpha`` for ``make_sparse_spd_matrix`` controlling baseline connectivity sparsity (higher → sparser). seed : int, optional Reproducibility handle. Returns ------- dict with keys - ``"Y"`` — ``(n_subjects, N, N)`` Fisher-z connectivity tensor, symmetric with zero diagonal. - ``"interest"`` — ``(n_subjects,)`` regressor of interest. - ``"sites"`` — ``(n_subjects,)`` integer site labels. - ``"signal_mask"`` — ``(N, N)`` boolean upper-triangle mask of true positive edges (all False when ``effect_size == 0`` or ``n_signal_edges == 0``). Notes ----- Designed for end-to-end exercise of ``analyze(Y, interest=..., sites=...)``: it triggers the auto- preserve, auto-strata, ComBat-with-preserve, Freedman-Lane with within-block exchangeability path in a single call. Calibration tests use ``effect_size=0.0`` (H₀); strata-vs-no-strata sanity tests use ``corr_site_interest > 0`` to bring the leak into view. Examples -------- >>> data = generate_multisite_glm_dataset( ... n_subjects=24, N=20, n_sites=3, ... site_shift_sigma=0.3, corr_site_interest=0.6, seed=0, ... ) >>> data["Y"].shape (24, 20, 20) >>> data["sites"].shape, data["interest"].shape ((24,), (24,)) """ if not HAS_SKLEARN: raise ImportError( "scikit-learn is required for generate_multisite_glm_dataset(). " "Install it with `pip install conninfpy[full]`." ) if not 0.0 <= corr_site_interest <= 1.0: raise ValueError( f"corr_site_interest must be in [0, 1], got {corr_site_interest}." ) if n_subjects < n_sites: raise ValueError( f"Need at least one subject per site (n_subjects={n_subjects}, " f"n_sites={n_sites})." ) rng = np.random.default_rng(seed) # ---- Site assignment (round-robin) ---- sites = np.tile(np.arange(n_sites), n_subjects // n_sites + 1)[:n_subjects] # ---- Regressor of interest with target corr(site, interest) ---- # Project site labels to mean-zero, unit-variance to make the # correlation parameter well-defined. site_codes = sites.astype(np.float64) site_centered = (site_codes - site_codes.mean()) / max(site_codes.std(), 1e-12) noise = rng.standard_normal(n_subjects) interest = ( corr_site_interest * site_centered + np.sqrt(max(1.0 - corr_site_interest ** 2, 0.0)) * noise ) # ---- Base covariance + per-subject sampled correlation ---- base_cov = make_sparse_spd_matrix( N, alpha=base_corr_sparsity, norm_diag=True, random_state=rng.integers(0, 2**31 - 1), ) n_obs_for_corr = max(N + 1, 200) # ensure stable per-subject correlation Y = np.zeros((n_subjects, N, N), dtype=np.float64) for i in range(n_subjects): samples = rng.multivariate_normal( mean=np.zeros(N), cov=base_cov, size=n_obs_for_corr, ) Y[i] = np.corrcoef(samples.T) # Fisher r→z (clip to avoid arctanh blow-up at |r|→1) Y = np.clip(Y, -0.9999, 0.9999) Y = np.arctanh(Y) for i in range(n_subjects): np.fill_diagonal(Y[i], 0.0) # ---- Per-site additive shift (symmetric, zero diagonal) ---- if site_shift_sigma > 0: iu = np.triu_indices(N, k=1) site_shifts = np.zeros((n_sites, N, N), dtype=np.float64) for s in range(n_sites): shift_upper = rng.standard_normal(iu[0].size) * site_shift_sigma site_shifts[s][iu] = shift_upper site_shifts[s] = site_shifts[s] + site_shifts[s].T for i in range(n_subjects): Y[i] += site_shifts[sites[i]] # ---- Signal injection ---- signal_mask = np.zeros((N, N), dtype=bool) if effect_size != 0.0 and n_signal_edges > 0: iu = np.triu_indices(N, k=1) chosen = rng.choice(iu[0].size, size=n_signal_edges, replace=False) signal_rows = iu[0][chosen] signal_cols = iu[1][chosen] for i in range(n_subjects): delta = effect_size * interest[i] Y[i, signal_rows, signal_cols] += delta Y[i, signal_cols, signal_rows] += delta signal_mask[signal_rows, signal_cols] = True signal_mask[signal_cols, signal_rows] = True # Re-zero diagonal (site shifts and signal already preserve it) for i in range(n_subjects): np.fill_diagonal(Y[i], 0.0) return { "Y": Y, "interest": interest, "sites": sites, "signal_mask": signal_mask, }
[docs] def generate_fc_matrices(N, effect_size, mask=None, n_samples_group1=50, n_samples_group2=50, repeated_measures=False, seed=None): """ Generate synthetic functional connectivity correlation matrices for groupwise comparisons or repeated measures. Parameters ---------- N : int Number of ROIs (regions of interest), i.e. an ``N x N`` matrix. effect_size : float Magnitude of correlation difference between groups. mask : np.ndarray, optional Binary mask ``(N, N)`` to apply correlation differences. n_samples_group1 : int Number of matrices in group 1 (default: 50). n_samples_group2 : int Number of matrices in group 2 (default: 50). repeated_measures : bool If True, generate within-subject repeated-measures (paired) data, otherwise independent groups (default: False). seed : int, optional Random seed for reproducibility. Returns ------- group1 : np.ndarray Array of FC matrices for group 1, shape ``(n_samples_group1, N, N)``. group2 : np.ndarray Array of FC matrices for group 2, shape ``(n_samples_group2, N, N)``. (base_cov, mod_cov) : tuple of np.ndarray Original covariance matrix and modified covariance matrix with the ``effect_size`` introduced; used for group 1 and group 2 matrices respectively. Examples -------- >>> N = 6; e = 0.2; mask = np.zeros((N, N)) >>> mask[0:2, 0:2] = 1; mask[2:4, 2:4] = -1 >>> g1, g2, (c1, c2) = generate_fc_matrices(N, e, mask, 5, 10, seed=0) >>> g1.shape == (5, 6, 6) True >>> g2.shape == (10, 6, 6) True >>> np.allclose(c1, c1.T) True """ if not HAS_SKLEARN: raise ImportError( "scikit-learn is required for generate_fc_matrices(). " "Install it with `pip install conninfpy[full]`." ) if seed is not None: np.random.seed(seed) if mask is None: mask = np.zeros((N, N)) N_pos_block = N//3 mask[:N_pos_block, :N_pos_block] = 1 N_neg_block = N//3 mask[N_pos_block:N_pos_block+N_neg_block, N_pos_block:N_pos_block+N_neg_block] = -1 # Generate base covariance matrix base_cov = make_sparse_spd_matrix(N, alpha=0.8, norm_diag=True, random_state=seed) # Create modified covariance for the second condition or group mod_cov = base_cov.copy() mod_cov[mask == 1] += effect_size + np.abs(np.random.normal(0,0.05, mod_cov[mask == 1].shape)) # Increase correlations in masked regions mod_cov[mask == -1] -= effect_size - np.abs(np.random.normal(0,0.1, mod_cov[mask == 1].shape)) # Decrease correlations in masked regions np.fill_diagonal(mod_cov, 1.0) mod_cov = (mod_cov+mod_cov.T)/2 def enforce_spd(matrix, eps=1e-6): """ Ensures a matrix is symmetric positive definite (SPD) by adjusting eigenvalues. """ eigvals, eigvecs = np.linalg.eigh(matrix) # Get eigenvalues & eigenvectors eigvals[eigvals < eps] = eps # Replace negative eigenvalues with small positive value return eigvecs @ np.diag(eigvals) @ eigvecs.T # Reconstruct SPD matrix # Enforce SPD property mod_cov = enforce_spd(mod_cov) # Generate sample correlation matrices with variability def generate_samples(cov_matrix, n_samples): return np.array([np.corrcoef(np.random.multivariate_normal(np.zeros(N), cov_matrix, size=N).T) for _ in range(n_samples)]) if repeated_measures: # Generate paired data (same subjects measured twice) group1 = generate_samples(base_cov, n_samples_group1) group2 = generate_samples(mod_cov, n_samples_group1) # Same number as group1 else: # Generate independent groups group1 = generate_samples(base_cov, n_samples_group1) group2 = generate_samples(mod_cov, n_samples_group2) return group1, group2, (base_cov, mod_cov)
[docs] class ModularDatasetGenerator: """ A generator for synthetic functional connectivity data with a modular (block) structure. This class simulates brain connectivity matrices where nodes are organized into distinct functional modules (e.g., Visual, DMN, Motor). It allows for the injection of specific topological effects (within-module or between-module changes) to simulate pathological conditions. """ def __init__( self, N: int, n_modules: int = 5, intra_corr: float = 0.6, inter_corr: float = 0.1, noise_level: float = 0.05, seed: int = None ): """ Initialize the generator. Parameters ---------- N : int Number of nodes (Regions of Interest). n_modules : int Number of functional modules (networks). intra_corr : float Base correlation strength between nodes within the same module. inter_corr : float Base correlation strength between nodes of different modules. noise_level : float Standard deviation of Gaussian noise added to the base covariance structure. seed : int, optional Random seed for reproducibility. """ self.N = N self.n_modules = n_modules self.rng = np.random.default_rng(seed) self.noise_level = noise_level # Assign nodes to modules (evenly distributed) # e.g., for N=10, n=2: [0, 1, 0, 1, ...] sorted to [0, 0, ..., 1, 1, ...] self.labels = np.sort(np.arange(N) % n_modules) # Create the underlying ground-truth covariance for "Healthy" controls self.base_cov = self._create_base_covariance(intra_corr, inter_corr, noise_level) def _create_base_covariance(self, intra: float, inter: float, noise: float) -> np.ndarray: """ Constructs a modular covariance matrix. """ cov = np.zeros((self.N, self.N)) for i in range(self.N): for j in range(self.N): if i == j: cov[i, j] = 1.0 elif self.labels[i] == self.labels[j]: # Within-module connection val = intra + self.rng.normal(0, noise) cov[i, j] = val else: # Between-module connection val = inter + self.rng.normal(0, noise) cov[i, j] = val # Symmetrize cov = (cov + cov.T) / 2 # Clip to ensure values are roughly within correlation bounds before SPD enforcement cov = np.clip(cov, -0.95, 0.95) np.fill_diagonal(cov, 1.0) # Ensure the matrix is mathematically valid (Symmetric Positive Definite) return self._enforce_spd(cov) def _enforce_spd(self, matrix: np.ndarray, eps: float = 1e-6) -> np.ndarray: """ Projects a matrix onto the cone of Symmetric Positive Definite matrices. Replaces negative eigenvalues with a small epsilon and normalizes the diagonal. """ # 1. Symmetrize matrix = (matrix + matrix.T) / 2 # 2. Eigen-decomposition eigvals, eigvecs = np.linalg.eigh(matrix) # 3. Clip negative eigenvalues eigvals[eigvals < eps] = eps # 4. Reconstruct reconstructed = eigvecs @ np.diag(eigvals) @ eigvecs.T # 5. Rescale to ensure 1.0 on diagonal (Correlation matrix property) d = np.sqrt(np.diag(reconstructed)) d_inv = 1.0 / d # D^-1 * M * D^-1 corr = reconstructed * np.outer(d_inv, d_inv) # Final safety clip np.fill_diagonal(corr, 1.0) return np.clip(corr, -1.0, 1.0)
[docs] def get_mask_within_module(self, module_idx: int) -> np.ndarray: """ Returns a binary mask for all edges WITHIN a specific module. """ mask = np.zeros((self.N, self.N), dtype=int) nodes = np.where(self.labels == module_idx)[0] # Create block mask using broadcasting indices # We exclude the diagonal (self-connections) for i in nodes: for j in nodes: if i != j: mask[i, j] = 1 return mask
[docs] def get_mask_between_modules(self, module_idx_A: int, module_idx_B: int) -> np.ndarray: """ Returns a binary mask for all edges connecting module A and module B. """ mask = np.zeros((self.N, self.N), dtype=int) nodes_A = np.where(self.labels == module_idx_A)[0] nodes_B = np.where(self.labels == module_idx_B)[0] for i in nodes_A: for j in nodes_B: mask[i, j] = 1 mask[j, i] = 1 # Symmetrize return mask
[docs] def generate_data( self, effect_mask: np.ndarray, effect_size: float, n_samples_g1: int = 50, n_samples_g2: int = 50, time_points: int = 200 ): """ Generates sample correlation matrices for two groups. Group 1 is sampled from the base modular covariance. Group 2 is sampled from a modified covariance (base + effect). Parameters ---------- effect_mask : np.ndarray Effect mask matrix of shape (N, N). - 0 means "no effect" for that edge. - Non-zero values scale the effect magnitude per edge. - The sign of the value controls effect direction (positive/negative). The matrix is treated as undirected: it will be symmetrized internally and its diagonal will be set to 0. effect_size : float Magnitude of the effect (Cohen's d-like shift in correlation). Positive values increase correlation, negative values decrease it. n_samples_g1 : int Number of subjects in Group 1 (Control). n_samples_g2 : int Number of subjects in Group 2 (Test). time_points : int Length of the simulated BOLD time-series. Higher values reduce sampling noise. Returns ------- g1_data : np.ndarray (n_samples_g1, N, N) Correlation matrices for Group 1. g2_data : np.ndarray (n_samples_g2, N, N) Correlation matrices for Group 2. labels : np.ndarray (N,) Vector of module assignments for nodes. """ effect_mask = np.asarray(effect_mask, dtype=np.float64) if effect_mask.shape != (self.N, self.N): raise ValueError(f"effect_mask must have shape ({self.N}, {self.N}).") # Enforce undirected mask semantics. effect_mask = (effect_mask + effect_mask.T) / 2 np.fill_diagonal(effect_mask, 0.0) # Group 1: Base Covariance cov_g1 = self.base_cov.copy() # Group 2: Modified Covariance cov_g2 = self.base_cov.copy() # Add random noise to the effect to simulate biological variability # This prevents the effect from being a perfect constant shift effect_noise = self.rng.normal(0, self.noise_level, size=cov_g2.shape) effect_noise = (effect_noise + effect_noise.T) / 2 # Apply effect: Delta = (Target_Shift + Noise) * Mask # - For binary masks (0/1), this reduces to a constant mean shift on masked edges. # - For signed/weighted masks, this enables mixed-sign and graded effects. update_vals = (effect_size + effect_noise) * effect_mask cov_g2 += update_vals # Re-enforce SPD property after modification cov_g2 = self._enforce_spd(cov_g2) # Helper to generate subjects by sampling time-series def sample_subject_matrices(true_cov, n_subs): matrices = [] for _ in range(n_subs): # Generate time series: shape (time_points, N) ts = self.rng.multivariate_normal( mean=np.zeros(self.N), cov=true_cov, size=time_points ) # Compute Pearson correlation: shape (N, N) # rowvar=False means columns are variables (nodes) corr = np.corrcoef(ts, rowvar=False) matrices.append(corr) return np.array(matrices) g1_data = sample_subject_matrices(cov_g1, n_samples_g1) g2_data = sample_subject_matrices(cov_g2, n_samples_g2) return g1_data, g2_data, self.labels