"""Statistical utilities for ConnInfPy.
Includes:
- :func:`fisher_r_to_z` / :func:`fisher_z_to_r` — Fisher transformations.
- :func:`get_components` — connected components (BCT-style).
- :func:`binarize` — threshold-and-binarize a connectivity matrix.
- :func:`create_prior_weights` — block-density priors used by NI-TFNBS.
"""
from typing import Optional
import numpy as np
import numpy.typing as npt
import warnings
__all__ = [
"fisher_r_to_z",
"fisher_z_to_r",
"get_components",
"binarize",
"create_prior_weights",
]
[docs]
def fisher_r_to_z(r: npt.NDArray[np.float64],
handle_bounds: bool = True,
max_z: float = 5) -> npt.NDArray[np.float64]:
"""
Apply Fisher r-to-z transformation to correlation coefficients, handling boundary cases.
Parameters
----------
r : np.ndarray
Array of correlation coefficients with values in ``[-1, 1]``.
Can be any shape (scalar, 1D, 2D, ...).
handle_bounds : bool, optional
If True, replace infinite z-values (from ``r = ±1``) with finite
values. If False, allow infinities and raise a warning.
(default: True).
max_z : float
Maximum absolute z-value to use when ``handle_bounds=True``
(default: 5).
Returns
-------
z : np.ndarray
Array of z-scores with the same shape as ``r``.
Raises
------
ValueError
If any value in ``r`` is outside ``[-1, 1]``.
Warnings
--------
UserWarning
If ``r`` contains ``±1``, indicating boundary values were
encountered.
Notes
-----
The transformation is :math:`z = \\operatorname{arctanh}(r)`. For
:math:`r = \\pm 1`, :math:`z` approaches :math:`\\pm\\infty`. When
``handle_bounds=True``, these are capped at ``±max_z``.
Examples
--------
>>> r_vals = np.array([1.0, -1.0])
>>> fisher_r_to_z(r_vals, handle_bounds=True, max_z=4.0)
array([ 4., -4.])
"""
# Convert input to numpy array and ensure float type
r = np.asarray(r, dtype=np.float64)
# Check that all values are in [-1, 1]
if np.any((r < -1) | (r > 1)):
raise ValueError("Correlation coefficients must be in the range [-1, 1].")
# Apply Fisher transformation
with np.errstate(invalid='ignore'): # Suppress warnings for arctanh at ±1
z = np.arctanh(r)
# Check for boundary values (r = ±1)
bounds_mask = np.isclose(r, 1.0) | np.isclose(r, -1.0)
if np.any(bounds_mask):
warnings.warn(
"Input contains r = ±1, resulting in infinite z-values. "
f"{'Capped at ±' + str(max_z) if handle_bounds else 'Left as infinity.'}",
UserWarning
)
if handle_bounds:
# Replace inf with finite values
z = np.where(bounds_mask, np.sign(r) * max_z, z)
return z
# Inverse function (z to r) for completeness
[docs]
def fisher_z_to_r(z: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
"""
Convert Fisher z-scores back to correlation coefficients.
Parameters:
z (np.ndarray): Array of Fisher z-scores (any shape).
Returns:
(np.ndarray): Array of correlation coefficients in (-1, 1) with the same shape as 'z' input.
>>> z_vals = np.array([0.0, 1.0, -1.0, 2.0])
>>> fisher_z_to_r(z_vals)
array([ 0. , 0.76159416, -0.76159416, 0.96402758])
"""
z = np.asarray(z, dtype=np.float64)
return np.tanh(z)
[docs]
def get_components(A, no_depend=False):
'''
Returns the components of an undirected graph specified by the binary and
undirected adjacency matrix adj. Components and their constitutent nodes
are assigned the same index and stored in the vector, comps. The vector,
comp_sizes, contains the number of nodes beloning to each component.
Parameters
----------
A (np.ndarray): A binary undirected adjacency matrix of dimension (N, N)
no_depend (bool, optional): Does nothing, included for backwards compatibility
Returns
-------
comps (np.ndarray): Vector of component assignments for each node with dimension (N, 1)
comp_sizes (np.ndarray): Vector of component sizes with dimension (M ,1)
Notes
-----
Note: disconnected nodes will appear as components with a component
size of 1
Note: The identity of each component (i.e. its numerical value in the
result) is not guaranteed to be identical the value returned in BCT,
matlab code, although the component topology is.
Many thanks to Nick Cullen for providing this implementation
>>> A = np.eye(3)
>>> comps, sizes = get_components(A)
>>> comps
array([1, 2, 3])
>>> sizes
array([1, 1, 1])
'''
if not np.all(A == A.T): # ensure matrix is undirected
raise AssertionError('get_components can only be computed for undirected'
' matrices. If your matrix is noisy, correct it with np.around')
A = binarize(A, copy=True)
n = len(A)
np.fill_diagonal(A, 1)
edge_map = [{u, v} for u in range(n) for v in range(n) if A[u, v] == 1]
union_sets = []
for item in edge_map:
temp = []
for s in union_sets:
if not s.isdisjoint(item):
item = s.union(item)
else:
temp.append(s)
temp.append(item)
union_sets = temp
comps = np.array([i + 1 for v in range(n) for i in
range(len(union_sets)) if v in union_sets[i]])
comp_sizes = np.array([len(s) for s in union_sets])
return comps, comp_sizes
[docs]
def binarize(W, copy=True):
'''
Binarizes an input weighted connection matrix. If copy is not set, this
function will *modify W in place.*
Parameters
----------
W : NxN np.ndarray
weighted connectivity matrix
copy : bool
if True, returns a copy of the matrix. Otherwise, modifies the matrix
in place. Default value=True.
Returns
-------
W : NxN np.ndarray
binary connectivity matrix
>>> W = np.array([
... [0.0, 2.5, 0.0],
... [1.1, 0.0, 0.3],
... [0.0, 0.0, 0.0]
... ])
>>> binarize(W)
array([[0., 1., 0.],
[1., 0., 1.],
[0., 0., 0.]])
'''
if copy:
W = W.copy()
W[W != 0] = 1
return W
[docs]
def create_prior_weights(node_labels: npt.NDArray[np.int_],
target_network_id: Optional[int] = None,
boost_factor: float = 2.0) -> npt.NDArray[np.float64]:
"""
Create an (N, N) symmetric weight matrix based on node network labels.
Parameters
----------
node_labels : np.ndarray[int]
1D array of integer network IDs for each node (length N).
target_network_id : int or None, optional
If provided, only edges where both nodes belong to this network are
boosted. If None (default), all intra-network edges (nodes sharing
the same label) are boosted.
boost_factor : float, optional
Multiplicative weight for priority edges (default 2.0).
Returns
-------
weights : np.ndarray[float]
Symmetric (N, N) weight matrix with background value 1.0 and
`boost_factor` for priority intra-network edges. The diagonal is
left at 1.0.
Examples
--------
>>> labels = np.array([1, 1, 2, 2])
>>> create_prior_weights(labels, boost_factor=3.0)
array([[1., 3., 1., 1.],
[3., 1., 1., 1.],
[1., 1., 1., 3.],
[1., 1., 3., 1.]])
"""
labels = np.asarray(node_labels)
if labels.ndim != 1:
raise ValueError("`node_labels` must be a 1D array of integer labels")
N = labels.shape[0]
# Start with background weight 1.0
weights = np.ones((N, N), dtype=np.float64)
# Build mask of same-network pairs
same_network = labels[:, None] == labels[None, :]
if target_network_id is not None:
# Only boost pairs where both belong to the target network
target_mask = labels == target_network_id
same_network = np.logical_and(same_network, target_mask[:, None] & target_mask[None, :])
# Exclude diagonal (self-connections) from boosting
np.fill_diagonal(same_network, False)
# Apply boost
weights[same_network] = float(boost_factor)
return weights