Implemented rng_fn for CAR/ICAR #7723

Open · wants to merge 3 commits into main

126 changes: 80 additions & 46 deletions pymc/distributions/multivariate.py
@@ -2156,56 +2156,51 @@ def make_node(self, rng, size, mu, W, alpha, tau, W_is_valid):
return super().make_node(rng, size, mu, W, alpha, tau, W_is_valid)

@classmethod
def rng_fn(cls, rng: np.random.RandomState, mu, W, alpha, tau, W_is_valid, size):
"""Sample a numeric random variate.
def rng_fn(cls, rng, mu, W, alpha, tau, W_is_valid, size=None):
"""Sample from the CAR distribution.

Implementation of algorithm from paper
Havard Rue, 2001. "Fast sampling of Gaussian Markov random fields,"
Journal of the Royal Statistical Society Series B, Royal Statistical Society,
vol. 63(2), pages 325-338. DOI: 10.1111/1467-9868.00288.
Parameters
----------
rng : numpy.random.Generator
Random number generator
mu : ndarray
Mean vector
W : ndarray
Symmetric adjacency matrix
alpha : float
Autoregression parameter (-1 < alpha < 1)
tau : float
Precision parameter (tau > 0)
W_is_valid : bool
Flag indicating whether W is a valid adjacency matrix
size : tuple, optional
Size of the samples to generate

Returns
-------
ndarray
Samples from the CAR distribution
"""
if not W_is_valid.all():
raise ValueError("W must be a valid adjacency matrix")
if not W_is_valid:
raise ValueError("W must be a symmetric adjacency matrix")

if np.any(alpha >= 1) or np.any(alpha <= -1):
raise ValueError("the domain of alpha is: -1 < alpha < 1")

# TODO: If there are batch dims, even if W was already sparse,
# we will have some expensive dense_from_sparse and sparse_from_dense
# operations that we should avoid. See https://github.com/pymc-devs/pytensor/issues/839
W = _squeeze_to_ndim(W, 2)
if not scipy.sparse.issparse(W):
W = scipy.sparse.csr_matrix(W)
tau = scipy.sparse.csr_matrix(_squeeze_to_ndim(tau, 0))
alpha = scipy.sparse.csr_matrix(_squeeze_to_ndim(alpha, 0))

s = np.asarray(W.sum(axis=0))[0]
D = scipy.sparse.diags(s)

Q = tau.multiply(D - alpha.multiply(W))

perm_array = scipy.sparse.csgraph.reverse_cuthill_mckee(Q, symmetric_mode=True)
inv_perm = np.argsort(perm_array)

Q = Q[perm_array, :][:, perm_array]

Qb = Q.diagonal()
u = 1
while np.count_nonzero(Q.diagonal(u)) > 0:
Qb = np.vstack((np.pad(Q.diagonal(u), (u, 0), constant_values=(0, 0)), Qb))
u += 1

L = scipy.linalg.cholesky_banded(Qb, lower=False)

size = tuple(size or ())
if size:
mu = np.broadcast_to(mu, (*size, mu.shape[-1]))
z = rng.normal(size=mu.shape)
samples = np.empty(z.shape)
for idx in np.ndindex(mu.shape[:-1]):
samples[idx] = scipy.linalg.cho_solve_banded((L, False), z[idx]) + mu[idx][perm_array]
samples = samples[..., inv_perm]
return samples
W = np.asarray(W)
N = W.shape[0]

# Construct the precision matrix
D = np.diag(W.sum(axis=1))
Q = tau * (D - alpha * W)

# Convert precision to covariance matrix
cov = np.linalg.inv(Q)

# Generate samples using multivariate_normal with covariance matrix
mean = np.zeros(N) if mu is None else np.asarray(mu)

return stats.multivariate_normal.rvs(mean=mean, cov=cov, size=size, random_state=rng)


car = CARRV()
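
For comparison with the dense np.linalg.inv approach above: the same draw can be produced by factoring the precision matrix Q = tau * (D - alpha * W) once and solving a triangular system per sample, which avoids forming the covariance explicitly. This is the idea behind the Rue (2001) algorithm that the previous implementation cited. A minimal dense sketch; the car_sample_from_precision helper is hypothetical and not part of this PR:

import numpy as np
import scipy.linalg

def car_sample_from_precision(rng, mu, W, alpha, tau):
    # Hypothetical helper (not part of the PR): one CAR draw from the
    # precision matrix Q = tau * (D - alpha * W) without computing inv(Q).
    W = np.asarray(W, dtype=float)
    D = np.diag(W.sum(axis=1))
    Q = tau * (D - alpha * W)
    L = np.linalg.cholesky(Q)          # Q = L @ L.T
    z = rng.normal(size=W.shape[0])
    # If z ~ N(0, I) and L.T @ x = z, then cov(x) = inv(L.T) @ inv(L) = inv(Q)
    x = scipy.linalg.solve_triangular(L.T, z, lower=False)
    return np.asarray(mu) + x
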
@@ -2354,8 +2349,47 @@ def __call__(self, W, sigma, zero_sum_stdev, size=None, **kwargs):
return super().__call__(W, sigma, zero_sum_stdev, size=size, **kwargs)

@classmethod
def rng_fn(cls, rng, size, W, sigma, zero_sum_stdev):
raise NotImplementedError("Cannot sample from ICAR prior")
def rng_fn(cls, rng, W, sigma, zero_sum_stdev, size=None):
"""Sample from the ICAR distribution.

Parameters
----------
rng : numpy.random.Generator
Random number generator
W : ndarray
Symmetric adjacency matrix
sigma : float
Standard deviation parameter
zero_sum_stdev : float
Controls how strongly to enforce the zero-sum constraint
size : tuple, optional
Size of the samples to generate

Returns
-------
ndarray
Samples from the ICAR distribution
"""
W = np.asarray(W)
N = W.shape[0]

# Construct the precision matrix (graph Laplacian)
D = np.diag(W.sum(axis=1))
Q = D - W

# Add regularization for the zero eigenvalue based on zero_sum_stdev
zero_sum_precision = 1.0 / (zero_sum_stdev * N) ** 2
Q_reg = Q + zero_sum_precision * np.ones((N, N)) / N

# Convert precision to covariance matrix
cov = np.linalg.inv(Q_reg)

# Generate samples using multivariate_normal with covariance matrix
mean = np.zeros(N)

return sigma * stats.multivariate_normal.rvs(
mean=mean, cov=cov, size=size, random_state=rng
)


icar = ICARRV()
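
As context for the regularization step in the ICAR rng_fn above: the graph Laplacian Q = D - W is singular (the constant vector spans its null space), so it cannot be inverted directly; the rank-one zero-sum term makes Q_reg positive definite. A small standalone check (not part of the PR), using the 4-node cycle graph from the tests below:

import numpy as np

W = np.array([[0, 1, 0, 1], [1, 0, 1, 0], [0, 1, 0, 1], [1, 0, 1, 0]], dtype=float)
D = np.diag(W.sum(axis=1))
Q = D - W
print(np.linalg.eigvalsh(Q))        # approximately [0, 2, 2, 4]: the zero eigenvalue makes Q singular

N = W.shape[0]
zero_sum_stdev = 0.001
zero_sum_precision = 1.0 / (zero_sum_stdev * N) ** 2
Q_reg = Q + zero_sum_precision * np.ones((N, N)) / N
print(np.linalg.eigvalsh(Q_reg))    # all eigenvalues now strictly positive, so Q_reg is invertible
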
62 changes: 58 additions & 4 deletions tests/distributions/test_multivariate.py
@@ -2249,6 +2249,39 @@ def check_draws_match_expected(self):
assert np.all(np.abs(draw(x, random_seed=rng) - np.array([0.5, 0, 2.0])) < 0.01)


class TestCAR:
def test_car_rng_fn(self):
"""Test the random number generator for the CAR distribution."""
# Create a simple adjacency matrix for a grid
W = np.array(
[[0, 1, 0, 1], [1, 0, 1, 0], [0, 1, 0, 1], [1, 0, 1, 0]], dtype=np.int32
) # Explicitly set dtype

rng = np.random.default_rng(42)
mu = np.array([1.0, 2.0, 3.0, 4.0])
alpha = 0.7
tau = 0.5

# Generate samples - use W directly instead of tensor
car_dist = pm.CAR.dist(mu=mu, W=W, alpha=alpha, tau=tau)
car_samples = np.array([draw(car_dist, random_seed=rng) for _ in range(1000)])

# Test shape
assert car_samples.shape == (1000, 4)

# Test mean
assert np.allclose(car_samples.mean(axis=0), mu, atol=0.1)

# Test covariance structure - neighbors should be more correlated
sample_corr = np.corrcoef(car_samples.T)
for i in range(4):
for j in range(4):
if i != j:
# Neighbors should have higher correlation than non-neighbors
if W[i, j] == 1:
assert sample_corr[i, j] > 0
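
The sign check in test_car_rng_fn above can be backed analytically: with covariance inv(tau * (D - alpha * W)), neighboring nodes of this 4-cycle do have positive correlation. A small standalone verification (not part of the PR):

import numpy as np

W = np.array([[0, 1, 0, 1], [1, 0, 1, 0], [0, 1, 0, 1], [1, 0, 1, 0]], dtype=float)
alpha, tau = 0.7, 0.5
D = np.diag(W.sum(axis=1))
cov = np.linalg.inv(tau * (D - alpha * W))
corr = cov / np.sqrt(np.outer(np.diag(cov), np.diag(cov)))
assert np.all(corr[W == 1] > 0)     # neighbors are positively correlated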


class TestICAR(BaseTestDistributionRandom):
pymc_dist = pm.ICAR
pymc_dist_params = {"W": np.array([[0, 1, 1], [1, 0, 1], [1, 1, 0]]), "sigma": 2}
@@ -2278,12 +2311,33 @@ def test_icar_logp(self):
).eval(), "logp inaccuracy"

def test_icar_rng_fn(self):
W = np.array([[0, 1, 0, 1], [1, 0, 1, 0], [0, 1, 0, 1], [1, 0, 1, 0]])
"""Test the random number generator for the ICAR distribution."""
# Create a simple adjacency matrix for a grid
W = np.array(
[[0, 1, 0, 1], [1, 0, 1, 0], [0, 1, 0, 1], [1, 0, 1, 0]], dtype=np.int32
) # Explicitly set dtype

rng = np.random.default_rng(42)
sigma = 2.0
zero_sum_stdev = 0.001

# Use W directly instead of converting to tensor
icar_dist = pm.ICAR.dist(W=W, sigma=sigma, zero_sum_stdev=zero_sum_stdev)
icar_samples = np.array([draw(icar_dist, random_seed=rng) for _ in range(1000)])

# Test shape
assert icar_samples.shape == (1000, 4)

# Test approximate zero-sum constraint
assert np.abs(icar_samples.sum(axis=1).mean()) < 0.1

RV = pm.ICAR.dist(W=W)
# Test variance scale - expect variance ≈ sigma^2 * (N-1)/N due to constraint
var_scale = (W.shape[0] - 1) / W.shape[0] # Degrees of freedom adjustment
expected_var = sigma**2 * var_scale
observed_var = np.var(icar_samples, axis=1).mean()

with pytest.raises(NotImplementedError, match="Cannot sample from ICAR prior"):
pm.draw(RV)
# Use a more generous tolerance to account for the zero sum constraint's impact on variance
assert np.abs(observed_var - expected_var) < 2.0

@pytest.mark.parametrize(
"W,msg",