Skip to content

Commit

Permalink
Extend LinearStateSpace class (#569)
Browse files Browse the repository at this point in the history
* extend `LinearStateSpace` class

* Fix PEP 8 issues

* Update lss.py

Add conditions for finding the constant term in the transition matrix.

* Update docstring
  • Loading branch information
shizejin authored Apr 16, 2021
1 parent 6ffbe57 commit d975adb
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 45 deletions.
132 changes: 97 additions & 35 deletions quantecon/lss.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import numpy as np
from numpy.random import multivariate_normal
from scipy.linalg import solve
from .matrix_eqn import solve_discrete_lyapunov
from numba import jit
from .util import check_random_state

Expand Down Expand Up @@ -281,59 +282,61 @@ def moment_sequence(self):
mu_x = A.dot(mu_x)
Sigma_x = A.dot(Sigma_x).dot(A.T) + C.dot(C.T)

def stationary_distributions(self, max_iter=200, tol=1e-5):
def stationary_distributions(self):
r"""
Compute the moments of the stationary distributions of :math:`x_t` and
:math:`y_t` if possible. Computation is by iteration, starting from
the initial conditions self.mu_0 and self.Sigma_0
Parameters
----------
max_iter : scalar(int), optional(default=200)
The maximum number of iterations allowed
tol : scalar(float), optional(default=1e-5)
The tolerance level that one wishes to achieve
:math:`y_t` if possible. Computation is by solving the discrete
Lyapunov equation.
Returns
-------
mu_x_star : array_like(float)
mu_x : array_like(float)
An n x 1 array representing the stationary mean of :math:`x_t`
mu_y_star : array_like(float)
mu_y : array_like(float)
An k x 1 array representing the stationary mean of :math:`y_t`
Sigma_x_star : array_like(float)
Sigma_x : array_like(float)
An n x n array representing the stationary var-cov matrix
of :math:`x_t`
Sigma_y_star : array_like(float)
Sigma_y : array_like(float)
An k x k array representing the stationary var-cov matrix
of :math:`y_t`
Sigma_yx : array_like(float)
An k x n array representing the stationary cov matrix
between :math:`y_t` and :math:`x_t`.
"""
# == Initialize iteration == #
m = self.moment_sequence()
mu_x, mu_y, Sigma_x, Sigma_y = next(m)
i = 0
error = tol + 1
self.__partition()
num_const, sorted_idx = self.num_const, self.sorted_idx
A21, A22 = self.A21, self.A22
CC2 = self.C2 @ self.C2.T
n = self.n

if num_const > 0:
μ = solve(np.eye(n-num_const) - A22, A21)
else:
μ = solve(np.eye(n-num_const) - A22, np.zeros((n, 1)))
Σ = solve_discrete_lyapunov(A22, CC2, method='bartels-stewart')

# == Loop until convergence or failure == #
while error > tol:
mu_x = np.empty((n, 1))
mu_x[:num_const] = self.mu_0[sorted_idx[:num_const]]
mu_x[num_const:] = μ

if i > max_iter:
fail_message = 'Convergence failed after {} iterations'
raise ValueError(fail_message.format(max_iter))
Sigma_x = np.zeros((n, n))
Sigma_x[num_const:, num_const:] = Σ

else:
i += 1
mu_x1, mu_y1, Sigma_x1, Sigma_y1 = next(m)
error_mu = np.max(np.abs(mu_x1 - mu_x))
error_Sigma = np.max(np.abs(Sigma_x1 - Sigma_x))
error = max(error_mu, error_Sigma)
mu_x, Sigma_x = mu_x1, Sigma_x1
mu_x = self.P.T @ mu_x
Sigma_x = self.P.T @ Sigma_x @ self.P

# == Prepare return values == #
mu_x_star, Sigma_x_star = mu_x, Sigma_x
mu_y_star, Sigma_y_star = mu_y1, Sigma_y1
mu_y = self.G @ mu_x
Sigma_y = self.G @ Sigma_x @ self.G.T
if self.H is not None:
Sigma_y += self.H @ self.H.T
Sigma_yx = self.G @ Sigma_x

self.mu_x, self.mu_y = mu_x, mu_y
self.Sigma_x, self.Sigma_y, self.Sigma_yx = Sigma_x, Sigma_y, Sigma_yx

return mu_x_star, mu_y_star, Sigma_x_star, Sigma_y_star
return mu_x, mu_y, Sigma_x, Sigma_y, Sigma_yx

def geometric_sums(self, beta, x_t):
r"""
Expand Down Expand Up @@ -406,3 +409,62 @@ def impulse_response(self, j=5):
Apower = np.dot(Apower, A)

return xcoef, ycoef

def __partition(self):
r"""
Reorder the states by shifting the constant terms to the
top of the state vector. Then partition the linear state
space model following Appendix C in RMT Ch2 such that the
A22 matrix associated with non-constant states have eigenvalues
all strictly smaller than 1.
.. math::
\left[\begin{array}{c}
const\\
x_{2,t+1}
\end{array}\right]=\left[\begin{array}{cc}
I & 0\\
A_{21} & A_{22}
\end{array}\right]\left[\begin{array}{c}
1\\
x_{2,t}
\end{array}\right]+\left[\begin{array}{c}
0\\
C_{2}
\end{array}\right]w_{t+1}
Returns
-------
A22 : array_like(float)
Lower right part of the reordered matrix A.
A21 : array_like(float)
Lower left part of the reordered matrix A.
"""
A, C = self.A, self.C
n = self.n

sorted_idx = []
A_diag = np.diag(A)
num_const = 0
for idx in range(n):
if (A_diag[idx] == 1) and (C[idx, :] == 0).all() \
and np.linalg.norm(A[idx, :]) == 1:
sorted_idx.insert(0, idx)
num_const += 1
else:
sorted_idx.append(idx)
self.num_const = num_const
self.sorted_idx = sorted_idx

P = np.zeros((n, n))
P[range(n), sorted_idx] = 1

sorted_A = P @ A @ P.T
sorted_C = P @ C
A21 = sorted_A[num_const:, :num_const]
A22 = sorted_A[num_const:, num_const:]
C2 = sorted_C[num_const:, :]

self.P, self.A21, self.A22, self.C2 = P, A21, A22, C2

return A21, A22
43 changes: 33 additions & 10 deletions quantecon/tests/test_lss.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,58 @@
class TestLinearStateSpace(unittest.TestCase):

def setUp(self):
# Initial Values
# Example 1
A = .95
C = .05
G = 1.
mu_0 = .75

self.ss = LinearStateSpace(A, C, G, mu_0=mu_0)
self.ss1 = LinearStateSpace(A, C, G, mu_0=mu_0)

# Example 2
ρ1 = 0.5
ρ2 = 0.3
α = 0.5

A = np.array([[ρ1, ρ2, α], [1, 0, 0], [0, 0, 1]])
C = np.array([[1], [0], [0]])
G = np.array([[1, 0, 0]])
mu_0 = [0.5, 0.5, 1]

self.ss2 = LinearStateSpace(A, C, G, mu_0=mu_0)

def tearDown(self):
del self.ss
del self.ss1
del self.ss2

def test_stationarity(self):
vals = self.ss.stationary_distributions(max_iter=1000, tol=1e-9)
ssmux, ssmuy, sssigx, sssigy = vals
vals = self.ss1.stationary_distributions()
ssmux, ssmuy, sssigx, sssigy, sssigyx = vals

self.assertTrue(abs(ssmux - ssmuy) < 2e-8)
self.assertTrue(abs(sssigx - sssigy) < 2e-8)
self.assertTrue(abs(ssmux) < 2e-8)
self.assertTrue(abs(sssigx - self.ss.C**2/(1 - self.ss.A**2)) < 2e-8)
self.assertTrue(abs(sssigx - self.ss1.C**2/(1 - self.ss1.A**2)) < 2e-8)
self.assertTrue(abs(sssigyx - self.ss1.G @ sssigx) < 2e-8)

vals = self.ss2.stationary_distributions()
ssmux, ssmuy, sssigx, sssigy, sssigyx = vals

assert_allclose(ssmux.flatten(), np.array([2.5, 2.5, 1]))
assert_allclose(ssmuy.flatten(), np.array([2.5]))
assert_allclose(sssigx, self.ss2.A @ sssigx @ self.ss2.A.T + self.ss2.C @ self.ss2.C.T)
assert_allclose(sssigy, self.ss2.G @ sssigx @ self.ss2.G.T)
assert_allclose(sssigyx, self.ss2.G @ sssigx)

def test_simulate(self):
ss = self.ss
ss = self.ss1

sim = ss.simulate(ts_length=250)
for arr in sim:
self.assertTrue(len(arr[0])==250)

def test_simulate_with_seed(self):
ss = self.ss
ss = self.ss1

xval, yval = ss.simulate(ts_length=5, random_state=5)
expected_output = np.array([0.75 , 0.73456137, 0.6812898, 0.76876387,
Expand All @@ -51,14 +74,14 @@ def test_simulate_with_seed(self):
assert_allclose(yval[0], expected_output)

def test_replicate(self):
xval, yval = self.ss.replicate(T=100, num_reps=5000)
xval, yval = self.ss1.replicate(T=100, num_reps=5000)

assert_allclose(xval, yval)
self.assertEqual(xval.size, 5000)
self.assertLessEqual(abs(np.mean(xval)), .05)

def test_replicate_with_seed(self):
xval, yval = self.ss.replicate(T=100, num_reps=5, random_state=5)
xval, yval = self.ss1.replicate(T=100, num_reps=5, random_state=5)
expected_output = np.array([0.06871204, 0.06937119, -0.1478022,
0.23841252, -0.06823762])

Expand Down

0 comments on commit d975adb

Please sign in to comment.