Source code for conninfpy.topologies

"""
Topology-based synthetic connectivity scenarios.

Built on top of :class:`conninfpy.synth_datasets.ModularDatasetGenerator`,
this module adds a library of named topological effect patterns (hub,
chain, rich-club, checkerboard, gradient, etc.) plus a scenario registry
so callers can request a scenario by name.

Public API
----------
- :class:`TopologyScenario` — scenario specification
- :class:`TopologyDataset`  — generated output container
- :class:`TopologyDatasetGenerator` — the main entry point
- :func:`list_scenarios`, :func:`get_scenario`, :func:`get_scenarios` — registry

Mask generator functions (``_mask_*``, ``_scenario_mask_*``) are private;
access them indirectly via the scenario registry.

Example
-------
>>> from conninfpy.topologies import TopologyDatasetGenerator, list_scenarios
>>> list_scenarios()[:3]
['within_module_dense', 'between_modules_dense', 'hub']
>>> gen = TopologyDatasetGenerator(n_nodes=60, n_modules=4, seed=1)
>>> ds = gen.generate("chain", effect_size=0.3, n_samples=20, time_points=30)
>>> ds.group1.shape
(20, 60, 60)
"""

from __future__ import annotations

import hashlib
from dataclasses import dataclass, field, replace
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union

import numpy as np

from .synth_datasets import ModularDatasetGenerator
from .utils import fisher_r_to_z


__all__ = [
    "TopologyScenario",
    "TopologyDataset",
    "TopologyDatasetGenerator",
    "list_scenarios",
    "get_scenarios",
    "get_scenario",
]


ArrayF = np.ndarray


def _stable_seed(*parts: object) -> int:
    """
    Deterministic 32-bit seed from arbitrary parts (order-independent of Python hash).

    This makes per-scenario generation reproducible regardless of scenario iteration order.
    """
    h = hashlib.sha256()
    for part in parts:
        h.update(str(part).encode("utf-8"))
        h.update(b"|")
    return int.from_bytes(h.digest()[:4], "little", signed=False)


def _params_token(params: Mapping[str, object]) -> str:
    items = sorted(params.items(), key=lambda kv: kv[0])
    return ",".join(f"{k}={v}" for k, v in items)


[docs] @dataclass(frozen=True) class TopologyScenario: """ Scenario specification: how to build labels + an effect mask topology. - `mask_fn(labels, rng=..., **mask_params)` returns a symmetric (N, N) matrix where: 0 means "no effect" and non-zero values scale the effect magnitude per edge. - `labels_fn(n_nodes, n_modules)` returns `net_labels` (shape (N,)). """ name: str base_kind: str # "modular" or "uniform" mask_fn: Callable[..., ArrayF] mask_params: Dict[str, object] = field(default_factory=dict) labels_fn: Callable[[int, int], np.ndarray] = lambda n_nodes, n_modules: np.sort( np.arange(n_nodes) % n_modules )
[docs] def with_mask_params(self, **overrides: object) -> "TopologyScenario": merged = dict(self.mask_params) merged.update(overrides) return replace(self, mask_params=merged)
[docs] @dataclass(frozen=True) class TopologyDataset: """ Output of a topology simulation, ready for TFNBS/NBS pipelines. - `group1`/`group2` are arrays of shape (n_samples, N, N) with zero diagonal. - `net_labels` can be passed to cNBS/NI/FBC methods. - `effect_mask` is the signed/weighted topology mask (not multiplied by effect_size). """ group1: ArrayF group2: ArrayF net_labels: np.ndarray effect_mask: ArrayF effect_size: float scenario: TopologyScenario meta: Dict[str, object] = field(default_factory=dict)
[docs] def fisher_z(self) -> Tuple[ArrayF, ArrayF]: return fisher_r_to_z(self.group1), fisher_r_to_z(self.group2)
[docs] class TopologyDatasetGenerator: """ Reusable dataset generator for topology scenarios. Example ------- >>> gen = TopologyDatasetGenerator(n_nodes=60, n_modules=4, seed=1) >>> ds = gen.generate("chain", effect_size=0.3, n_samples=20, time_points=30) >>> ds.group1.shape (20, 60, 60) """ def __init__( self, n_nodes: int = 60, n_modules: int = 4, intra_corr: float = 0.3, inter_corr: float = 0.05, uniform_corr: float = 0.15, noise_level: float = 0.05, seed: int = 42, ) -> None: self.n_nodes = int(n_nodes) self.n_modules = int(n_modules) self.intra_corr = float(intra_corr) self.inter_corr = float(inter_corr) self.uniform_corr = float(uniform_corr) self.noise_level = float(noise_level) self.seed = int(seed)
[docs] def generate( self, scenario: Union[str, TopologyScenario], effect_size: float, *, n_samples: int = 20, n_samples_g1: Optional[int] = None, n_samples_g2: Optional[int] = None, time_points: int = 30, scenario_params: Optional[Dict[str, object]] = None, zero_diagonal: bool = True, ) -> TopologyDataset: scenario_obj = get_scenario(scenario) if isinstance(scenario, str) else scenario if scenario_params: scenario_obj = scenario_obj.with_mask_params(**scenario_params) n_samples_g1 = int(n_samples if n_samples_g1 is None else n_samples_g1) n_samples_g2 = int(n_samples_g1 if n_samples_g2 is None else n_samples_g2) time_points = int(time_points) param_token = _params_token(scenario_obj.mask_params) mask_seed = _stable_seed(self.seed, scenario_obj.name, "mask", param_token, self.n_nodes, self.n_modules) data_seed = _stable_seed(self.seed, scenario_obj.name, "data", param_token, self.n_nodes, self.n_modules, time_points) labels = scenario_obj.labels_fn(self.n_nodes, self.n_modules) rng_mask = np.random.default_rng(mask_seed) effect_mask = scenario_obj.mask_fn(labels, rng=rng_mask, **scenario_obj.mask_params) effect_mask = _ensure_symmetric_zero_diag(effect_mask) if scenario_obj.base_kind == "modular": intra_corr = self.intra_corr inter_corr = self.inter_corr elif scenario_obj.base_kind == "uniform": intra_corr = self.uniform_corr inter_corr = self.uniform_corr else: raise ValueError(f"Unknown base_kind: {scenario_obj.base_kind!r}") gen = ModularDatasetGenerator( N=self.n_nodes, n_modules=self.n_modules, intra_corr=intra_corr, inter_corr=inter_corr, noise_level=self.noise_level, seed=data_seed, ) if not np.array_equal(gen.labels, labels): gen.labels = labels gen.rng = np.random.default_rng(data_seed) gen.base_cov = gen._create_base_covariance(intra_corr, inter_corr, self.noise_level) g1, g2, net_labels = gen.generate_data( effect_mask=effect_mask, effect_size=float(effect_size), n_samples_g1=n_samples_g1, n_samples_g2=n_samples_g2, time_points=time_points, ) if zero_diagonal: diag = np.arange(self.n_nodes) g1[:, diag, diag] = 0.0 g2[:, diag, diag] = 0.0 meta = { "mask_seed": int(mask_seed), "data_seed": int(data_seed), "n_samples_g1": int(n_samples_g1), "n_samples_g2": int(n_samples_g2), "time_points": int(time_points), "intra_corr": float(intra_corr), "inter_corr": float(inter_corr), "noise_level": float(self.noise_level), } return TopologyDataset( group1=g1, group2=g2, net_labels=net_labels, effect_mask=effect_mask, effect_size=float(effect_size), scenario=scenario_obj, meta=meta, )
def _ensure_symmetric_zero_diag(matrix: ArrayF) -> ArrayF: matrix = np.asarray(matrix, dtype=np.float64) matrix = (matrix + matrix.T) / 2.0 np.fill_diagonal(matrix, 0.0) return matrix def _sorted_module_labels(n_nodes: int, n_modules: int) -> np.ndarray: """Balanced module labels, sorted so blocks appear as contiguous squares in plots.""" return np.sort(np.arange(n_nodes) % n_modules) def _imbalanced_module_labels(n_nodes: int, n_modules: int) -> np.ndarray: """ Create intentionally imbalanced module sizes. Example (n_modules=4): ~50% nodes in module 0, remainder split across others. """ if n_modules < 2: raise ValueError("n_modules must be >= 2 for imbalanced labels.") size0 = int(round(0.5 * n_nodes)) remainder = n_nodes - size0 base = remainder // (n_modules - 1) sizes = [size0] + [base] * (n_modules - 1) for i in range(remainder - base * (n_modules - 1)): sizes[1 + i] += 1 labels: List[int] = [] for idx, size in enumerate(sizes): labels.extend([idx] * size) return np.asarray(labels, dtype=int) def _module_nodes(labels: np.ndarray, module_idx: int) -> np.ndarray: return np.where(labels == module_idx)[0] def _mask_within_module(labels: np.ndarray, module_idx: int) -> ArrayF: n_nodes = labels.shape[0] mask = np.zeros((n_nodes, n_nodes), dtype=np.float64) nodes = _module_nodes(labels, module_idx) if nodes.size < 2: return mask ii, jj = np.triu_indices(nodes.size, k=1) mask[nodes[ii], nodes[jj]] = 1.0 return _ensure_symmetric_zero_diag(mask) def _mask_between_modules(labels: np.ndarray, module_a: int, module_b: int) -> ArrayF: n_nodes = labels.shape[0] mask = np.zeros((n_nodes, n_nodes), dtype=np.float64) nodes_a = _module_nodes(labels, module_a) nodes_b = _module_nodes(labels, module_b) if nodes_a.size == 0 or nodes_b.size == 0: return mask mask[np.ix_(nodes_a, nodes_b)] = 1.0 return _ensure_symmetric_zero_diag(mask) def _mask_hub(n_nodes: int, hub_node: int, n_spokes: int, rng: np.random.Generator) -> ArrayF: mask = np.zeros((n_nodes, n_nodes), dtype=np.float64) candidates = [i for i in range(n_nodes) if i != hub_node] n_spokes = min(n_spokes, len(candidates)) targets = rng.choice(candidates, size=n_spokes, replace=False) mask[hub_node, targets] = 1.0 return _ensure_symmetric_zero_diag(mask) def _mask_chain(n_nodes: int, length: int, rng: np.random.Generator) -> ArrayF: """Random simple path (edges share nodes; long thin component).""" mask = np.zeros((n_nodes, n_nodes), dtype=np.float64) if n_nodes < 2: return mask current = int(rng.integers(0, n_nodes)) visited = {current} for _ in range(length): candidates = list(set(range(n_nodes)) - visited) if not candidates: break nxt = int(rng.choice(candidates)) mask[current, nxt] = 1.0 visited.add(nxt) current = nxt return _ensure_symmetric_zero_diag(mask) def _mask_fragmented_within_module( labels: np.ndarray, module_idx: int, sparsity: float, rng: np.random.Generator, ) -> ArrayF: """Random subset of within-module edges (often not one connected component).""" full = _mask_within_module(labels, module_idx) rows, cols = np.triu_indices_from(full, k=1) edge_idx = np.where(full[rows, cols] > 0)[0] if edge_idx.size == 0: return full * 0.0 n_select = int(round(edge_idx.size * sparsity)) n_select = max(1, min(n_select, edge_idx.size)) chosen = rng.choice(edge_idx, size=n_select, replace=False) mask = np.zeros_like(full) mask[rows[chosen], cols[chosen]] = 1.0 return _ensure_symmetric_zero_diag(mask) def _mask_multi_clique_within_module( labels: np.ndarray, module_idx: int, n_clusters: int, nodes_per_cluster: int, rng: np.random.Generator, ) -> ArrayF: """Several disconnected cliques inside one module.""" n_nodes = labels.shape[0] mask = np.zeros((n_nodes, n_nodes), dtype=np.float64) nodes = _module_nodes(labels, module_idx).copy() rng.shuffle(nodes) for c in range(n_clusters): start = c * nodes_per_cluster end = start + nodes_per_cluster if end > nodes.size: break cluster = nodes[start:end] ii, jj = np.triu_indices(cluster.size, k=1) mask[cluster[ii], cluster[jj]] = 1.0 return _ensure_symmetric_zero_diag(mask) def _mask_scattered_cross_block( labels: np.ndarray, n_edges_per_block: int, rng: np.random.Generator, ) -> ArrayF: """Scattered edges across many blocks (noise-like stress pattern).""" n_nodes = labels.shape[0] n_modules = int(np.max(labels) + 1) mask = np.zeros((n_nodes, n_nodes), dtype=np.float64) for mod_i in range(n_modules): for mod_j in range(mod_i, n_modules): nodes_i = _module_nodes(labels, mod_i) nodes_j = _module_nodes(labels, mod_j) if nodes_i.size == 0 or nodes_j.size == 0: continue if mod_i == mod_j: rr, cc = np.triu_indices(nodes_i.size, k=1) pairs = np.stack([nodes_i[rr], nodes_i[cc]], axis=1) else: grid_i, grid_j = np.meshgrid(nodes_i, nodes_j, indexing="ij") pairs = np.stack([grid_i.ravel(), grid_j.ravel()], axis=1) if pairs.shape[0] == 0: continue n_pick = min(n_edges_per_block, pairs.shape[0]) pick = rng.choice(pairs.shape[0], size=n_pick, replace=False) chosen = pairs[pick] mask[chosen[:, 0], chosen[:, 1]] = 1.0 return _ensure_symmetric_zero_diag(mask) def _mask_perfect_matching_within_module(labels: np.ndarray, module_idx: int, rng: np.random.Generator) -> ArrayF: """ Many edges in one block but no shared nodes (topologically disconnected). Builds a single random perfect matching (or near-perfect if odd node count). """ n_nodes = labels.shape[0] mask = np.zeros((n_nodes, n_nodes), dtype=np.float64) nodes = _module_nodes(labels, module_idx).copy() rng.shuffle(nodes) n_pairs = nodes.size // 2 for k in range(n_pairs): i = int(nodes[2 * k]) j = int(nodes[2 * k + 1]) mask[i, j] = 1.0 return _ensure_symmetric_zero_diag(mask) def _mask_cross_block_connected_chain( labels: np.ndarray, length: int, rng: np.random.Generator, ) -> ArrayF: """ A single long topological component, but edges spread across many blocks. Construct a node path that alternates modules as much as possible. """ n_nodes = labels.shape[0] n_modules = int(np.max(labels) + 1) nodes_by_module = {m: _module_nodes(labels, m).copy() for m in range(n_modules)} for m in nodes_by_module: rng.shuffle(nodes_by_module[m]) # Round-robin node list across modules. rr_nodes: List[int] = [] ptr = {m: 0 for m in range(n_modules)} while len(rr_nodes) < n_nodes: progressed = False for m in range(n_modules): if ptr[m] < nodes_by_module[m].size: rr_nodes.append(int(nodes_by_module[m][ptr[m]])) ptr[m] += 1 progressed = True if not progressed: break if len(rr_nodes) < 2: return np.zeros((n_nodes, n_nodes), dtype=np.float64) length = min(length, len(rr_nodes) - 1) start = int(rng.integers(0, len(rr_nodes) - length)) path = rr_nodes[start : start + length + 1] mask = np.zeros((n_nodes, n_nodes), dtype=np.float64) for a, b in zip(path[:-1], path[1:]): mask[a, b] = 1.0 return _ensure_symmetric_zero_diag(mask) def _mask_rich_club( n_nodes: int, n_hubs: int, rng: np.random.Generator, n_spokes_per_hub: int = 0, ) -> ArrayF: """ Dense clique among hubs (rich-club). Optionally add hub->spoke edges. """ n_hubs = max(2, min(n_hubs, n_nodes)) hubs = rng.choice(n_nodes, size=n_hubs, replace=False) mask = np.zeros((n_nodes, n_nodes), dtype=np.float64) # Clique on hubs. for i in range(n_hubs): for j in range(i + 1, n_hubs): mask[int(hubs[i]), int(hubs[j])] = 1.0 # Optional spokes. if n_spokes_per_hub > 0: all_nodes = np.arange(n_nodes) for hub in hubs: candidates = all_nodes[all_nodes != hub] spokes = rng.choice(candidates, size=min(n_spokes_per_hub, candidates.size), replace=False) mask[int(hub), spokes] = 1.0 return _ensure_symmetric_zero_diag(mask) def _mask_two_disconnected_cliques( labels: np.ndarray, module_idx: int, clique_size: int, rng: np.random.Generator, ) -> ArrayF: """Two equal-size disconnected cliques within one module.""" n_nodes = labels.shape[0] nodes = _module_nodes(labels, module_idx).copy() rng.shuffle(nodes) if nodes.size < 2 * clique_size: clique_size = max(2, nodes.size // 2) if clique_size < 2: return np.zeros((n_nodes, n_nodes), dtype=np.float64) c1 = nodes[:clique_size] c2 = nodes[clique_size : 2 * clique_size] mask = np.zeros((n_nodes, n_nodes), dtype=np.float64) for cluster in (c1, c2): ii, jj = np.triu_indices(cluster.size, k=1) mask[cluster[ii], cluster[jj]] = 1.0 return _ensure_symmetric_zero_diag(mask) def _mask_partial_bipartite_between_modules( labels: np.ndarray, module_a: int, module_b: int, n_a: int, n_b: int, rng: np.random.Generator, ) -> ArrayF: """Subset A×B between-module complete bipartite block.""" n_nodes = labels.shape[0] nodes_a = _module_nodes(labels, module_a).copy() nodes_b = _module_nodes(labels, module_b).copy() rng.shuffle(nodes_a) rng.shuffle(nodes_b) nodes_a = nodes_a[: min(n_a, nodes_a.size)] nodes_b = nodes_b[: min(n_b, nodes_b.size)] mask = np.zeros((n_nodes, n_nodes), dtype=np.float64) if nodes_a.size == 0 or nodes_b.size == 0: return mask mask[np.ix_(nodes_a, nodes_b)] = 1.0 return _ensure_symmetric_zero_diag(mask) def _weighted_mask_gradient_chain( n_nodes: int, length: int, rng: np.random.Generator, min_weight: float = 0.2, ) -> ArrayF: """Connected component with a weight gradient (strong core -> weak tail).""" length = max(2, length) mask_binary = _mask_chain(n_nodes, length=length, rng=rng) rows, cols = np.where(np.triu(mask_binary, k=1) > 0) if rows.size == 0: return mask_binary # Assign a reproducible order of edges and a decreasing weight schedule. order = np.lexsort((cols, rows)) rows = rows[order] cols = cols[order] weights = np.linspace(1.0, float(min_weight), num=rows.size, dtype=np.float64) weighted = np.zeros((n_nodes, n_nodes), dtype=np.float64) weighted[rows, cols] = weights return _ensure_symmetric_zero_diag(weighted) def _mask_checkerboard_within_module(labels: np.ndarray, module_idx: int) -> ArrayF: """ Structured fragmented pattern inside a module (checkerboard-like). This creates many affected edges within the block, but with a regular pattern rather than a dense clique. """ n_nodes = labels.shape[0] mask = np.zeros((n_nodes, n_nodes), dtype=np.float64) nodes = _module_nodes(labels, module_idx) if nodes.size < 2: return mask # Use within-block coordinates (0..k-1) to define a simple checkerboard rule. k = nodes.size ii, jj = np.triu_indices(k, k=1) keep = ((ii + jj) % 2) == 0 mask[nodes[ii[keep]], nodes[jj[keep]]] = 1.0 return _ensure_symmetric_zero_diag(mask) def _weighted_mask_core_periphery_within_module( labels: np.ndarray, module_idx: int, n_core: int, core_weight: float = 1.0, core_to_periphery_weight: float = 0.4, periphery_weight: float = 0.1, ) -> ArrayF: """ Gradient inside one module: strong dense core + weaker periphery effects. This is useful for illustrating why threshold-free methods can outperform fixed-threshold NBS on connected but heterogeneous clusters. """ n_nodes = labels.shape[0] nodes = _module_nodes(labels, module_idx) if nodes.size < 2: return np.zeros((n_nodes, n_nodes), dtype=np.float64) n_core = max(2, min(int(n_core), nodes.size)) core = nodes[:n_core] periphery = nodes[n_core:] mask = np.zeros((n_nodes, n_nodes), dtype=np.float64) # Core clique. ii, jj = np.triu_indices(core.size, k=1) mask[core[ii], core[jj]] = core_weight # Core-to-periphery. if periphery.size > 0: mask[np.ix_(core, periphery)] = core_to_periphery_weight # Periphery-to-periphery. if periphery.size > 1: ii, jj = np.triu_indices(periphery.size, k=1) mask[periphery[ii], periphery[jj]] = periphery_weight return _ensure_symmetric_zero_diag(mask) def _summarize_masked_effect( group1_r: ArrayF, group2_r: ArrayF, group1_z: ArrayF, group2_z: ArrayF, effect_gt_r: ArrayF, t_signed: ArrayF, ) -> Dict[str, float]: """ Summarize realized effect magnitude on masked edges. Returns scalars computed on the upper triangle only (k=1). """ n_nodes = effect_gt_r.shape[0] tri = np.triu_indices(n_nodes, k=1) gt_ut = effect_gt_r[tri] mask_ut = gt_ut != 0 if not np.any(mask_ut): return { "n_edges": 0.0, "gt_r_mean": 0.0, "obs_r_mean": 0.0, "obs_z_mean": 0.0, "pooled_std_z_mean": 0.0, "cohen_d_z_mean": 0.0, "t_abs_median": 0.0, "t_abs_max": 0.0, } mean_g1_r = np.mean(group1_r, axis=0) mean_g2_r = np.mean(group2_r, axis=0) obs_diff_r = (mean_g2_r - mean_g1_r)[tri][mask_ut] mean_g1_z = np.mean(group1_z, axis=0) mean_g2_z = np.mean(group2_z, axis=0) obs_diff_z = (mean_g2_z - mean_g1_z)[tri][mask_ut] # Pooled std across subjects per edge (classical pooled variance), in Fisher-z domain. n1 = group1_z.shape[0] n2 = group2_z.shape[0] var1 = np.var(group1_z, axis=0, ddof=1) var2 = np.var(group2_z, axis=0, ddof=1) pooled_var = ((n1 - 1) * var1 + (n2 - 1) * var2) / max(1, (n1 + n2 - 2)) pooled_std = np.sqrt(pooled_var)[tri][mask_ut] gt_vals = gt_ut[mask_ut] t_vals = t_signed[tri][mask_ut] pooled_std_z_mean = float(np.mean(pooled_std)) obs_r_mean = float(np.mean(obs_diff_r)) obs_z_mean = float(np.mean(obs_diff_z)) cohen_d_z_mean = float(obs_z_mean / pooled_std_z_mean) if pooled_std_z_mean > 0 else 0.0 return { "n_edges": float(np.sum(mask_ut)), "gt_r_mean": float(np.mean(gt_vals)), "obs_r_mean": obs_r_mean, "obs_z_mean": obs_z_mean, "pooled_std_z_mean": pooled_std_z_mean, "cohen_d_z_mean": cohen_d_z_mean, "t_abs_median": float(np.median(np.abs(t_vals))), "t_abs_max": float(np.max(np.abs(t_vals))), } def _scenario_mask_within_module( labels: np.ndarray, *, rng: np.random.Generator, module_idx: int = 0 ) -> ArrayF: return _mask_within_module(labels, module_idx=module_idx) def _scenario_mask_between_modules( labels: np.ndarray, *, rng: np.random.Generator, module_a: int = 1, module_b: int = 2 ) -> ArrayF: return _mask_between_modules(labels, module_a=module_a, module_b=module_b) def _scenario_mask_hub( labels: np.ndarray, *, rng: np.random.Generator, hub_node: Optional[int] = None, n_spokes: int = 40 ) -> ArrayF: n_nodes = labels.shape[0] if hub_node is None: hub_node = n_nodes // 2 return _mask_hub(n_nodes, hub_node=int(hub_node), n_spokes=int(n_spokes), rng=rng) def _scenario_mask_chain( labels: np.ndarray, *, rng: np.random.Generator, length: int = 30 ) -> ArrayF: return _mask_chain(labels.shape[0], length=int(length), rng=rng) def _scenario_mask_fragmented_within_module( labels: np.ndarray, *, rng: np.random.Generator, module_idx: int = 0, sparsity: float = 0.3 ) -> ArrayF: return _mask_fragmented_within_module(labels, module_idx=int(module_idx), sparsity=float(sparsity), rng=rng) def _scenario_mask_multi_clique_within_module( labels: np.ndarray, *, rng: np.random.Generator, module_idx: int = 0, n_clusters: int = 3, nodes_per_cluster: int = 4, ) -> ArrayF: return _mask_multi_clique_within_module( labels, module_idx=int(module_idx), n_clusters=int(n_clusters), nodes_per_cluster=int(nodes_per_cluster), rng=rng, ) def _scenario_mask_checkerboard_within_module( labels: np.ndarray, *, rng: np.random.Generator, module_idx: int = 0 ) -> ArrayF: _ = rng return _mask_checkerboard_within_module(labels, module_idx=int(module_idx)) def _scenario_mask_scattered_cross_block( labels: np.ndarray, *, rng: np.random.Generator, n_edges_per_block: int = 6 ) -> ArrayF: return _mask_scattered_cross_block(labels, n_edges_per_block=int(n_edges_per_block), rng=rng) def _scenario_mask_within_plus_between( labels: np.ndarray, *, rng: np.random.Generator, within_module_idx: int = 0, between_module_a: int = 1, between_module_b: int = 2, ) -> ArrayF: _ = rng return _ensure_symmetric_zero_diag( _mask_within_module(labels, module_idx=int(within_module_idx)) + _mask_between_modules(labels, module_a=int(between_module_a), module_b=int(between_module_b)) ) def _scenario_mask_perfect_matching_within_module( labels: np.ndarray, *, rng: np.random.Generator, module_idx: int = 0 ) -> ArrayF: return _mask_perfect_matching_within_module(labels, module_idx=int(module_idx), rng=rng) def _scenario_mask_cross_block_connected_chain( labels: np.ndarray, *, rng: np.random.Generator, length: int = 30 ) -> ArrayF: return _mask_cross_block_connected_chain(labels, length=int(length), rng=rng) def _scenario_mask_rich_club( labels: np.ndarray, *, rng: np.random.Generator, n_hubs: int = 10, n_spokes_per_hub: int = 0 ) -> ArrayF: return _mask_rich_club( labels.shape[0], n_hubs=int(n_hubs), rng=rng, n_spokes_per_hub=int(n_spokes_per_hub) ) def _scenario_mask_two_disconnected_cliques( labels: np.ndarray, *, rng: np.random.Generator, module_idx: int = 0, clique_size: int = 6 ) -> ArrayF: return _mask_two_disconnected_cliques(labels, module_idx=int(module_idx), clique_size=int(clique_size), rng=rng) def _scenario_mask_focal_clique_inside_module( labels: np.ndarray, *, rng: np.random.Generator, module_idx: int = 0, clique_size: int = 5 ) -> ArrayF: return _mask_multi_clique_within_module( labels, module_idx=int(module_idx), n_clusters=1, nodes_per_cluster=int(clique_size), rng=rng, ) def _scenario_mask_partial_bipartite_between_modules( labels: np.ndarray, *, rng: np.random.Generator, module_a: int = 1, module_b: int = 2, n_a: int = 6, n_b: int = 6, ) -> ArrayF: return _mask_partial_bipartite_between_modules( labels, module_a=int(module_a), module_b=int(module_b), n_a=int(n_a), n_b=int(n_b), rng=rng ) def _scenario_mask_gradient_effect_chain( labels: np.ndarray, *, rng: np.random.Generator, length: int = 30, min_weight: float = 0.2 ) -> ArrayF: return _weighted_mask_gradient_chain(labels.shape[0], length=int(length), rng=rng, min_weight=float(min_weight)) def _scenario_mask_gradient_core_periphery_within_module( labels: np.ndarray, *, rng: np.random.Generator, module_idx: int = 0, n_core: int = 6, core_weight: float = 1.0, core_to_periphery_weight: float = 0.4, periphery_weight: float = 0.1, ) -> ArrayF: _ = rng return _weighted_mask_core_periphery_within_module( labels, module_idx=int(module_idx), n_core=int(n_core), core_weight=float(core_weight), core_to_periphery_weight=float(core_to_periphery_weight), periphery_weight=float(periphery_weight), ) _SCENARIOS: List[TopologyScenario] = [ TopologyScenario( name="within_module_dense", base_kind="modular", mask_fn=_scenario_mask_within_module, mask_params={"module_idx": 0}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="between_modules_dense", base_kind="modular", mask_fn=_scenario_mask_between_modules, mask_params={"module_a": 1, "module_b": 2}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="hub", base_kind="modular", mask_fn=_scenario_mask_hub, mask_params={"hub_node": None, "n_spokes": 40}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="chain", base_kind="modular", mask_fn=_scenario_mask_chain, mask_params={"length": 30}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="fragmented_within_module", base_kind="modular", mask_fn=_scenario_mask_fragmented_within_module, mask_params={"module_idx": 0, "sparsity": 0.3}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="multi_cluster_within_module", base_kind="modular", mask_fn=_scenario_mask_multi_clique_within_module, mask_params={"module_idx": 0, "n_clusters": 3, "nodes_per_cluster": 4}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="checkerboard_within_module", base_kind="modular", mask_fn=_scenario_mask_checkerboard_within_module, mask_params={"module_idx": 0}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="scattered_cross_block", base_kind="modular", mask_fn=_scenario_mask_scattered_cross_block, mask_params={"n_edges_per_block": 6}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="uniform_base_modular_effect", base_kind="uniform", mask_fn=_scenario_mask_within_module, mask_params={"module_idx": 0}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="within_plus_between", base_kind="modular", mask_fn=_scenario_mask_within_plus_between, mask_params={"within_module_idx": 0, "between_module_a": 1, "between_module_b": 2}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="perfect_matching_within_module", base_kind="modular", mask_fn=_scenario_mask_perfect_matching_within_module, mask_params={"module_idx": 0}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="cross_block_connected_chain", base_kind="modular", mask_fn=_scenario_mask_cross_block_connected_chain, mask_params={"length": 30}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="rich_club", base_kind="modular", mask_fn=_scenario_mask_rich_club, mask_params={"n_hubs": 10, "n_spokes_per_hub": 0}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="two_equal_disconnected_cliques", base_kind="modular", mask_fn=_scenario_mask_two_disconnected_cliques, mask_params={"module_idx": 0, "clique_size": 6}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="focal_clique_inside_module", base_kind="modular", mask_fn=_scenario_mask_focal_clique_inside_module, mask_params={"module_idx": 0, "clique_size": 5}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="partial_bipartite_between_modules", base_kind="modular", mask_fn=_scenario_mask_partial_bipartite_between_modules, mask_params={"module_a": 1, "module_b": 2, "n_a": 6, "n_b": 6}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="gradient_effect_chain", base_kind="modular", mask_fn=_scenario_mask_gradient_effect_chain, mask_params={"length": 30, "min_weight": 0.2}, labels_fn=_sorted_module_labels, ), TopologyScenario( name="gradient_core_periphery_within_module", base_kind="modular", mask_fn=_scenario_mask_gradient_core_periphery_within_module, mask_params={ "module_idx": 0, "n_core": 6, "core_weight": 1.0, "core_to_periphery_weight": 0.4, "periphery_weight": 0.1, }, labels_fn=_sorted_module_labels, ), TopologyScenario( name="imbalanced_modules_within_effect", base_kind="modular", mask_fn=_scenario_mask_within_module, mask_params={"module_idx": 0}, labels_fn=_imbalanced_module_labels, ), ] _SCENARIO_BY_NAME: Dict[str, TopologyScenario] = {s.name: s for s in _SCENARIOS}
[docs] def list_scenarios() -> List[str]: return [s.name for s in _SCENARIOS]
[docs] def get_scenarios() -> List[TopologyScenario]: return list(_SCENARIOS)
[docs] def get_scenario(scenario: Union[str, TopologyScenario]) -> TopologyScenario: if isinstance(scenario, TopologyScenario): return scenario if not isinstance(scenario, str): raise TypeError("scenario must be a scenario name (str) or TopologyScenario.") try: return _SCENARIO_BY_NAME[scenario] except KeyError as exc: raise ValueError(f"Unknown scenario: {scenario!r}. Available: {list_scenarios()}") from exc
def _build_scenarios() -> List[TopologyScenario]: """Backward-compatible alias (kept for older example scripts).""" return get_scenarios()