Merge pull request #758 from bashtage/improve-typing
TYP: Improve typing accuracy
bashtage authored Nov 5, 2024
2 parents 38beda6 + cc9580a commit 8ff5a5e
Showing 6 changed files with 84 additions and 13 deletions.
4 changes: 2 additions & 2 deletions arch/bootstrap/base.py
@@ -1271,15 +1271,15 @@ def _resample(self) -> tuple[tuple[ArrayLike, ...], dict[str, ArrayLike]]:
         pos_data: list[Union[NDArray, pd.DataFrame, pd.Series]] = []
         for values in self._args:
             if isinstance(values, (pd.Series, pd.DataFrame)):
-                assert isinstance(indices, NDArray)
+                assert isinstance(indices, np.ndarray)
                 pos_data.append(values.iloc[indices])
             else:
                 assert isinstance(values, np.ndarray)
                 pos_data.append(values[indices])
         named_data: dict[str, Union[NDArray, pd.DataFrame, pd.Series]] = {}
         for key, values in self._kwargs.items():
             if isinstance(values, (pd.Series, pd.DataFrame)):
-                assert isinstance(indices, NDArray)
+                assert isinstance(indices, np.ndarray)
                 named_data[key] = values.iloc[indices]
             else:
                 assert isinstance(values, np.ndarray)
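Note: switching the assert target from `NDArray` to `np.ndarray` matters at runtime, not only to the type checker. `isinstance` needs a concrete class, and typing aliases are often subscripted generics that `isinstance` rejects outright. A minimal sketch of the failure mode, using `numpy.typing.NDArray` as a stand-in for the `arch.typing` alias:

    import numpy as np
    import numpy.typing as npt

    indices = np.arange(5)
    assert isinstance(indices, np.ndarray)  # fine: np.ndarray is a concrete class
    # isinstance(indices, npt.NDArray)      # TypeError: subscripted generics
    #                                       # cannot be used with isinstance()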
3 changes: 2 additions & 1 deletion arch/tests/univariate/test_mean.py
@@ -29,6 +29,7 @@
 from arch.typing import Literal
 from arch.univariate.base import (
     ARCHModel,
+    ARCHModelFixedResult,
     ARCHModelForecast,
     ARCHModelResult,
     _align_forecast,
@@ -139,7 +140,7 @@ def simulated_data(request):
         for a, b in itertools.product(simple_mean_models, analytic_volatility_processes)
     ],
 )
-def forecastable_model(request):
+def forecastable_model(request) -> tuple[ARCHModelResult, ARCHModelFixedResult]:
     mod: ARCHModel
     vol: VolatilityProcess
     mod, vol = request.param
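Annotating the fixture's return type lets mypy check the fixture body against the declared tuple; a test that wants the same precision can annotate the injected parameter to match. A hypothetical test shown only to illustrate the pattern:

    def test_forecast_smoke(
        forecastable_model: tuple[ARCHModelResult, ARCHModelFixedResult],
    ) -> None:
        res, res_fixed = forecastable_model  # both unpacked names are now typed
        assert res.params.shape == res_fixed.params.shape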
16 changes: 9 additions & 7 deletions arch/unitroot/unitroot.py
@@ -17,6 +17,7 @@
     diag,
     diff,
     empty,
+    float64,
     full,
     hstack,
     inf,
@@ -235,7 +236,7 @@ def _autolag_ols_low_memory(
         trendx.append(empty((nobs, 0)))
     else:
         if "tt" in trend:
-            tt = arange(1, nobs + 1, dtype=float)[:, None] ** 2
+            tt = arange(1, nobs + 1, dtype=float64)[:, None] ** 2
             tt *= sqrt(5) / float(nobs) ** (5 / 2)
             trendx.append(tt)
         if "t" in trend:
@@ -402,6 +403,7 @@ def _df_select_lags(
     if max_lags is None:
         max_lags = int(ceil(12.0 * power(nobs / 100.0, 1 / 4.0)))
         max_lags = max(min(max_lags, max_max_lags), 0)
+        assert isinstance(max_lags, int)
         if max_lags > 119:
             warnings.warn(
                 "The value of max_lags was not specified and has been calculated as "
@@ -1970,8 +1972,8 @@ def auto_bandwidth(
     float
         The estimated optimal bandwidth.
     """
-    y = ensure1d(y, "y")
-    if y.shape[0] < 2:
+    y_arr = ensure1d(y, "y")
+    if y_arr.shape[0] < 2:
         raise ValueError("Data must contain more than one observation")
 
     lower_kernel = kernel.lower()
@@ -1987,12 +1989,12 @@
     else:
         raise ValueError("Unknown kernel")
 
-    n = int(4 * ((len(y) / 100) ** n_power))
+    n = int(4 * ((len(y_arr) / 100) ** n_power))
     sig = (n + 1) * [0]
 
     for i in range(n + 1):
-        a = list(y[i:])
-        b = list(y[: len(y) - i])
+        a = list(y_arr[i:])
+        b = list(y_arr[: len(y_arr) - i])
         sig[i] = int(npsum([i * j for (i, j) in zip(a, b)]))
 
     sigma_m1 = sig[1 : len(sig)]  # sigma without the 1st element
@@ -2018,6 +2020,6 @@
     else:  # kernel == "qs":
         gamma = 1.3221 * (((s2 / s0) ** 2) ** t_power)
 
-    bandwidth = gamma * power(len(y), t_power)
+    bandwidth = gamma * power(len(y_arr), t_power)
 
     return bandwidth
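The `y` to `y_arr` rename in `auto_bandwidth` is itself type-driven: the parameter arrives as a broad array-like, `ensure1d` returns a narrower concrete type, and rebinding the parameter would force mypy to track one name with two meanings. A fresh name gives each value exactly one stable type; a minimal sketch of the pattern (simplified signature, `np.asarray` standing in for `ensure1d`):

    from typing import Union

    import numpy as np
    import pandas as pd

    def bandwidth_input(y: Union[np.ndarray, pd.Series, list[float]]) -> int:
        y_arr = np.asarray(y, dtype=float)  # fresh name: exactly one type, ndarray
        return y_arr.shape[0]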
37 changes: 35 additions & 2 deletions arch/univariate/base.py
@@ -7,7 +7,7 @@
 from copy import deepcopy
 import datetime as dt
 from functools import cached_property
-from typing import Any, Callable, Optional, Union, cast
+from typing import Any, Callable, Optional, Union, cast, overload
 import warnings
 
 import numpy as np
@@ -410,6 +410,38 @@ def _fit_parameterless_model(
             deepcopy(self),
         )
 
+    @overload
+    def _loglikelihood(
+        self,
+        parameters: Float64Array,
+        sigma2: Float64Array,
+        backcast: Union[float, Float64Array],
+        var_bounds: Float64Array,
+    ) -> float:  # pragma: no cover
+        ...  # pragma: no cover
+
+    @overload
+    def _loglikelihood(
+        self,
+        parameters: Float64Array,
+        sigma2: Float64Array,
+        backcast: Union[float, Float64Array],
+        var_bounds: Float64Array,
+        individual: Literal[False] = ...,
+    ) -> float:  # pragma: no cover
+        ...  # pragma: no cover
+
+    @overload
+    def _loglikelihood(
+        self,
+        parameters: Float64Array,
+        sigma2: Float64Array,
+        backcast: Union[float, Float64Array],
+        var_bounds: Float64Array,
+        individual: Literal[True] = ...,
+    ) -> Float64Array:  # pragma: no cover
+        ...  # pragma: no cover
+
     def _loglikelihood(
         self,
         parameters: Float64Array,
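With these overloads, callers get a precise return type from a single implementation: omitting `individual` (or passing `Literal[False]`) resolves to `float`, while `Literal[True]` resolves to the per-observation `Float64Array`. The same dispatch pattern on a self-contained toy function (illustration only, not the arch implementation):

    from typing import Literal, Union, overload

    import numpy as np

    @overload
    def loglik(x: np.ndarray) -> float: ...
    @overload
    def loglik(x: np.ndarray, individual: Literal[False]) -> float: ...
    @overload
    def loglik(x: np.ndarray, individual: Literal[True]) -> np.ndarray: ...

    def loglik(x: np.ndarray, individual: bool = False) -> Union[float, np.ndarray]:
        contribs = -0.5 * x**2  # stand-in for per-observation contributions
        return contribs if individual else float(contribs.sum())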
@@ -706,6 +738,7 @@ def fit(
         if starting_values is not None:
             assert sv is not None
             sv = ensure1d(sv, "starting_values")
+            assert isinstance(sv, (np.ndarray, pd.Series))
             valid = sv.shape[0] == num_params
             if a.shape[0] > 0:
                 satisfies_constraints = a.dot(sv) - b >= 0
@@ -1362,7 +1395,7 @@ def _set_tight_x(axis: Axes, index: pd.Index) -> None:
         ax = fig.add_subplot(2, 1, 1)
         ax.plot(self._index.values, self.resid / self.conditional_volatility)
         ax.set_title("Standardized Residuals")
-        ax.axes.xaxis.set_ticklabels([])
+        ax.set_xticklabels([])
         _set_tight_x(ax, self._index)
 
         ax = fig.add_subplot(2, 1, 2)
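The plotting change likely follows the same theme: matplotlib documents `Artist.axes` as the containing Axes or None, so `ax.axes.xaxis` trips strict optional checking even though `ax` here is always an Axes, while `ax.set_xticklabels([])` clears the same tick labels through the directly typed Axes API (the Optional-return detail is an assumption based on matplotlib's documented behavior, not stated in the commit).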
36 changes: 35 additions & 1 deletion arch/univariate/mean.py
@@ -5,7 +5,7 @@
 
 from collections.abc import Mapping, Sequence
 import copy
-from typing import TYPE_CHECKING, Callable, Optional, Union, cast
+from typing import TYPE_CHECKING, Callable, Optional, Union, cast, overload
 
 import numpy as np
 import pandas as pd
@@ -546,7 +548,9 @@ def _check_specification(self) -> None:
         if isinstance(self._x, pd.Series):
             self._x = pd.DataFrame(self._x)
         elif self._x.ndim == 1:
+            assert isinstance(self._x, np.ndarray)
             self._x = np.asarray(self._x)[:, None]
+        assert isinstance(self._x, (np.ndarray, pd.DataFrame))
         if self._x.ndim != 2 or self._x.shape[0] != self._y.shape[0]:
             raise ValueError(
                 "x must be nobs by n, where nobs is the same as "
@@ -1723,6 +1725,38 @@ def resids(
     def starting_values(self) -> Float64Array:
         return np.r_[super().starting_values(), 0.0]
 
+    @overload
+    def _loglikelihood(
+        self,
+        parameters: Float64Array,
+        sigma2: Float64Array,
+        backcast: Union[float, Float64Array],
+        var_bounds: Float64Array,
+    ) -> float:  # pragma: no cover
+        ...  # pragma: no cover
+
+    @overload
+    def _loglikelihood(
+        self,
+        parameters: Float64Array,
+        sigma2: Float64Array,
+        backcast: Union[float, Float64Array],
+        var_bounds: Float64Array,
+        individual: Literal[False] = ...,
+    ) -> float:  # pragma: no cover
+        ...  # pragma: no cover
+
+    @overload
+    def _loglikelihood(
+        self,
+        parameters: Float64Array,
+        sigma2: Float64Array,
+        backcast: Union[float, Float64Array],
+        var_bounds: Float64Array,
+        individual: Literal[True] = ...,
+    ) -> Float64Array:  # pragma: no cover
+        ...  # pragma: no cover
+
     def _loglikelihood(
         self,
         parameters: Float64Array,
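The overload block is repeated verbatim from base.py because this class overrides `_loglikelihood`: for the type checker, an override replaces the base class's overload set rather than inheriting it, so the signatures must be restated on the subclass (inference from the diff; the commit message does not spell this out).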
1 change: 1 addition & 0 deletions arch/univariate/volatility.py
@@ -2111,6 +2111,7 @@ def compute_variance(
         var_bounds: Float64Array,
     ) -> Float64Array:
         lam = parameters[0] if self._estimate_lam else self.lam
+        assert isinstance(lam, float)
         return ewma_recursion(lam, resids, sigma2, resids.shape[0], float(backcast))
 
     def constraints(self) -> tuple[Float64Array, Float64Array]:
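Here the assert holds at runtime because indexing a float64 array yields `np.float64`, which subclasses Python's `float`; for the checker it pins `lam` to `float`, matching what `ewma_recursion` expects. A quick check of the runtime fact:

    import numpy as np

    lam = np.float64(0.94)
    assert isinstance(lam, float)  # passes: np.float64 subclasses float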
