Skip to content

Commit

Permalink
style: made variable names of Whittaker base more concise with specia…
Browse files Browse the repository at this point in the history
…l focus on single character names; unified variable names globally
  • Loading branch information
MothNik committed May 20, 2024
1 parent face185 commit 6885f7c
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 93 deletions.
4 changes: 2 additions & 2 deletions chemotools/smooth/_whittaker_smooth.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
from sklearn.base import BaseEstimator, OneToOneFeatureMixin, TransformerMixin
from sklearn.utils.validation import check_is_fitted

from chemotools.utils.check_inputs import check_input, check_weights
from chemotools.utils._types import RealNumeric
from chemotools.utils._whittaker_base import (
WhittakerLikeSolver,
WhittakerSmoothLambda,
WhittakerSmoothMethods,
)
from chemotools.utils.check_inputs import check_input, check_weights

### Main Class ###

Expand Down Expand Up @@ -288,7 +288,7 @@ def transform(

# Calculate the whittaker smooth
return self._whittaker_solve(
X=X_, w=sample_weight_checked, use_same_w_for_all=use_same_w_for_all
X=X_, weights=sample_weight_checked, use_same_w_for_all=use_same_w_for_all
)[0]

def fit_transform(
Expand Down
25 changes: 15 additions & 10 deletions chemotools/utils/_whittaker_base/auto_lambda/logml.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
def get_log_marginal_likelihood_constant_term(
differences: int,
penalty_mat_log_pseudo_det: float,
w: np.ndarray,
weights: np.ndarray,
zero_weight_tol: float,
) -> float:
"""
Expand Down Expand Up @@ -64,9 +64,9 @@ def get_log_marginal_likelihood_constant_term(
# first, the constant terms of the log marginal likelihood are computed starting
# from the log pseudo-determinant of the weight matrix, i.e., the product of the
# non-zero elements of the weight vector
nonzero_w_flags = w > w.max() * zero_weight_tol
nonzero_w_flags = weights > weights.max() * zero_weight_tol
nnz_w = nonzero_w_flags.sum()
log_pseudo_det_w = np.log(w[nonzero_w_flags]).sum()
log_pseudo_det_w = np.log(weights[nonzero_w_flags]).sum()

# the constant term of the log marginal likelihood is computed
return (
Expand All @@ -82,9 +82,9 @@ def get_log_marginal_likelihood(
lam: float,
differences: int,
diff_kernel_flipped: np.ndarray,
b: np.ndarray,
b_smooth: np.ndarray,
w: Union[float, np.ndarray],
rhs_b: np.ndarray,
rhs_b_smooth: np.ndarray,
weights: Union[float, np.ndarray],
w_plus_penalty_plus_n_samples_term: float,
) -> float:
"""
Expand Down Expand Up @@ -153,13 +153,18 @@ def get_log_marginal_likelihood(
""" # noqa: E501

# first, the weighted Sum of Squared Residuals is computed ...
wrss = get_smooth_wrss(b=b, b_smooth=b_smooth, w=w)
wrss = get_smooth_wrss(
rhs_b=rhs_b,
rhs_b_smooth=rhs_b_smooth,
weights=weights,
)
# ... followed by the Penalty Sum of Squares which requires the squared forward
# finite differences of the smoothed series
# NOTE: ``np.convolve`` is used to compute the forward finite differences and
# since it flips the provided kernel, an already flipped kernel is used
pss = (
lam * np.square(np.convolve(b_smooth, diff_kernel_flipped, mode="valid")).sum()
lam
* np.square(np.convolve(rhs_b_smooth, diff_kernel_flipped, mode="valid")).sum()
)

# besides the determinant of the combined left hand side matrix has to be
Expand All @@ -174,7 +179,7 @@ def get_log_marginal_likelihood(
return -0.5 * (
wrss
+ pss
- (b.size - differences) * log_lam
- (rhs_b.size - differences) * log_lam
+ lhs_logabsdet
+ w_plus_penalty_plus_n_samples_term
)
Expand All @@ -183,7 +188,7 @@ def get_log_marginal_likelihood(
# ill-conditioned and the log marginal likelihood cannot be computed
# NOTE: since it is very hard to trigger this exception, it is not covered by the
# tests
raise RuntimeError( # pragma: no cover
raise RuntimeError( # pragma: no cover
"\nThe determinant of the combined left hand side matrix "
"W + lambda * D.T @ D is negative, indicating that the system is extremely "
"ill-conditioned.\n"
Expand Down
12 changes: 6 additions & 6 deletions chemotools/utils/_whittaker_base/auto_lambda/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@


def get_smooth_wrss(
b: np.ndarray,
b_smooth: np.ndarray,
w: Union[float, np.ndarray],
rhs_b: np.ndarray,
rhs_b_smooth: np.ndarray,
weights: Union[float, np.ndarray],
) -> float:
"""
Computes the (weighted) Sum of Squared Residuals (w)RSS between the original and
Expand All @@ -34,8 +34,8 @@ def get_smooth_wrss(
"""

# Case 1: no weights are provided
if isinstance(w, float):
return np.square(b - b_smooth).sum()
if isinstance(weights, float):
return np.square(rhs_b - rhs_b_smooth).sum()

# Case 2: weights are provided
return (w * np.square(b - b_smooth)).sum()
return (weights * np.square(rhs_b - rhs_b_smooth)).sum()
79 changes: 43 additions & 36 deletions chemotools/utils/_whittaker_base/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ def _setup_for_fit(
def _solve(
self,
lam: float,
b_weighted: np.ndarray,
w: Union[float, np.ndarray],
rhs_b_weighted: np.ndarray,
weights: Union[float, np.ndarray],
) -> tuple[np.ndarray, _models.BandedSolvers, auto._Factorization]:
"""
Internal wrapper for the solver methods to solve the linear system of equations
Expand All @@ -197,8 +197,8 @@ def _solve(
differences=self.differences_,
l_and_u=self._l_and_u_,
penalty_mat_banded=self._penalty_mat_banded_,
b_weighted=b_weighted,
w=w,
rhs_b_weighted=rhs_b_weighted,
weights=weights,
pentapy_enabled=self._pentapy_enabled_,
)

Expand All @@ -207,8 +207,8 @@ def _solve(
def _marginal_likelihood_objective(
self,
log_lam: float,
b: np.ndarray,
w: np.ndarray,
rhs_b: np.ndarray,
weights: np.ndarray,
w_plus_penalty_plus_n_samples_term: float,
) -> float:
"""
Expand All @@ -226,8 +226,8 @@ def _marginal_likelihood_objective(
# the solution of the linear system of equations is computed
b_smooth, _, factorization = self._solve(
lam=lam,
b_weighted=b * w,
w=w,
rhs_b_weighted=rhs_b * weights,
weights=weights,
)

# finally, the log marginal likelihood is computed and returned (negative since
Expand All @@ -239,18 +239,18 @@ def _marginal_likelihood_objective(
lam=lam,
differences=self.differences_,
diff_kernel_flipped=self._diff_kernel_flipped_,
b=b,
b_smooth=b_smooth,
w=w,
rhs_b=rhs_b,
rhs_b_smooth=b_smooth,
weights=weights,
w_plus_penalty_plus_n_samples_term=w_plus_penalty_plus_n_samples_term,
)

### Solver management methods ###

def _solve_single_b_fixed_lam(
self,
b: np.ndarray,
w: Union[float, np.ndarray],
rhs_b: np.ndarray,
weights: Union[float, np.ndarray],
lam: Optional[float] = None,
) -> tuple[np.ndarray, float]:
"""
Expand All @@ -271,12 +271,12 @@ def _solve_single_b_fixed_lam(
# the most efficient way around going into this method in the first place;
# in the future this might change and thus, this case is kept for now, but
# ignored for coverage
if isinstance(w, float): # pragma: no cover
if isinstance(weights, float): # pragma: no cover
return (
self._solve(
lam=lam,
b_weighted=b,
w=w,
rhs_b_weighted=rhs_b,
weights=weights,
)[0],
lam,
)
Expand All @@ -285,16 +285,16 @@ def _solve_single_b_fixed_lam(
return (
self._solve(
lam=lam,
b_weighted=b * w,
w=w,
rhs_b_weighted=rhs_b * weights,
weights=weights,
)[0],
lam,
)

def _solve_single_b_auto_lam_logml(
self,
b: np.ndarray,
w: Union[float, np.ndarray],
rhs_b: np.ndarray,
weights: Union[float, np.ndarray],
) -> tuple[np.ndarray, float]:
"""
Solves for the Whittaker-like smoother solution for a single series with an
Expand All @@ -305,7 +305,7 @@ def _solve_single_b_auto_lam_logml(

# if the weights are not provided, the log marginal likelihood cannot be
# computed - at least not in a meaningful way
if isinstance(w, (float, int)):
if isinstance(weights, (float, int)):
raise ValueError(
"\nAutomatic fitting of the penalty weight lambda by maximizing the "
"log marginal likelihood is only possible if weights are provided.\n"
Expand All @@ -316,25 +316,29 @@ def _solve_single_b_auto_lam_logml(
w_plus_n_samples_term = auto.get_log_marginal_likelihood_constant_term(
differences=self.differences_,
penalty_mat_log_pseudo_det=self._penalty_mat_log_pseudo_det_,
w=w,
weights=weights,
zero_weight_tol=self.__zero_weight_tol,
)

# the optimization of the log marginal likelihood is carried out
opt_lambda = auto.get_optimized_lambda(
fun=self._marginal_likelihood_objective,
lam=self._lam_inter_,
args=(b, w, w_plus_n_samples_term),
args=(rhs_b, weights, w_plus_n_samples_term),
)

# the optimal penalty weight lambda is returned together with the smoothed
# series
return self._solve_single_b_fixed_lam(b=b, w=w, lam=opt_lambda)
return self._solve_single_b_fixed_lam(
rhs_b=rhs_b,
weights=weights,
lam=opt_lambda,
)

def _solve_multiple_b(
self,
X: np.ndarray,
w: Optional[np.ndarray],
weights: Optional[np.ndarray],
) -> tuple[np.ndarray, np.ndarray]:
"""
Solves for the Whittaker-like smoother solution for multiple series when the
Expand All @@ -349,19 +353,19 @@ def _solve_multiple_b(
# then, the solution of the linear system of equations is computed for the
# transposed series matrix (expected right-hand side format for the solvers)
# Case 1: no weights are provided
if w is None:
if weights is None:
X_smooth, _, _ = self._solve(
lam=self._lam_inter_.fixed_lambda,
b_weighted=X.transpose(),
w=1.0,
rhs_b_weighted=X.transpose(),
weights=1.0,
)

# Case 2: weights are provided
else:
X_smooth, _, _ = self._solve(
lam=self._lam_inter_.fixed_lambda,
b_weighted=(X * w).transpose(),
w=w[0, ::],
rhs_b_weighted=(X * weights).transpose(),
weights=weights[0, ::],
)

return (
Expand All @@ -375,7 +379,7 @@ def _whittaker_solve(
self,
X: np.ndarray,
*,
w: Optional[np.ndarray] = None,
weights: Optional[np.ndarray] = None,
use_same_w_for_all: bool = False,
) -> tuple[np.ndarray, np.ndarray]:
"""
Expand All @@ -389,7 +393,7 @@ def _whittaker_solve(
----------
X : ndarray of shape (m, n)
The series to be smoothed stored as individual rows.
w : ndarray of shape(1, n) or shape(m, n) or None
weights : ndarray of shape(1, n) or shape(m, n) or None
The weights to be applied for smoothing. If only a single row is provided
and ``use_same_w_for_all`` is ``True``, the same weights can be applied
for all series in ``X``, which enhances the smoothing a lot for fixed
Expand All @@ -415,7 +419,7 @@ def _whittaker_solve(
# can be done more efficiently by leveraging LAPACK'S (not pentapy's) ability to
# perform multiple solves from the same inversion at once
if use_same_w_for_all and not self._lam_inter_.fit_auto:
return self._solve_multiple_b(X=X, w=w)
return self._solve_multiple_b(X=X, weights=weights)

# otherwise, the solution of the linear system of equations is computed for
# each series
Expand All @@ -430,8 +434,11 @@ def _whittaker_solve(
# then, the solution is computed for each series by means of a loop
X_smooth = np.empty_like(X)
lam = np.empty(shape=(X.shape[0],))
w_gen = get_weight_generator(w=w, n_series=X.shape[0])
for iter_i, (x_vect, w_vect) in enumerate(zip(X, w_gen)):
X_smooth[iter_i], lam[iter_i] = smooth_method(b=x_vect, w=w_vect)
w_gen = get_weight_generator(weights=weights, n_series=X.shape[0])
for iter_i, (x_vect, wght) in enumerate(zip(X, w_gen)):
X_smooth[iter_i], lam[iter_i] = smooth_method(
rhs_b=x_vect,
weights=wght,
)

return X_smooth, lam
16 changes: 8 additions & 8 deletions chemotools/utils/_whittaker_base/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


def get_weight_generator(
w: Any,
weights: Any,
n_series: int,
) -> Generator[Union[float, np.ndarray], None, None]:
"""
Expand All @@ -24,25 +24,25 @@ def get_weight_generator(
"""

# if the weights are neither None nor a 2D-Array, an error is raised
if not (w is None or isinstance(w, np.ndarray)):
if not (weights is None or isinstance(weights, np.ndarray)):
raise TypeError(
f"The weights must either be None or a NumPy-2D-Array, but they are of "
f"type '{type(w)}'."
f"type '{type(weights)}'."
)

# Case 1: No weights
if w is None:
if weights is None:
for _ in range(n_series):
yield 1.0

# Case 2: 2D weights
elif w.ndim == 2:
elif weights.ndim == 2:
for idx in range(0, n_series):
yield w[idx]
yield weights[idx]

# Case 3: Invalid weights
elif w.ndim != 2:
elif weights.ndim != 2:
raise ValueError(
f"If provided as an Array, the weights must be a 2D-Array, but they are "
f"{w.ndim}-dimensional with shape {w.shape}."
f"{weights.ndim}-dimensional with shape {weights.shape}."
)
Loading

0 comments on commit 6885f7c

Please sign in to comment.