Skip to content

Commit

Permalink
initial suppression of deprecation warnings for progpy internal code
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-sweet committed Oct 22, 2024
1 parent f030eb8 commit 326dfc9
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 43 deletions.
30 changes: 18 additions & 12 deletions src/progpy/state_estimators/unscented_kalman_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from filterpy import kalman
from numpy import diag, array
from warnings import warn
from warnings import warn, catch_warnings, simplefilter

from progpy.state_estimators import state_estimator
from progpy.uncertain_data import MultivariateNormalDist, UncertainData
Expand Down Expand Up @@ -54,22 +54,28 @@ def __init__(self, model, x0, **kwargs):
# Saving for reduce pickling

def measure(x):
x = model.StateContainer({key: value for (key, value) in zip(x0.keys(), x)})
R_err = model.parameters['measurement_noise'].copy()
model.parameters['measurement_noise'] = dict.fromkeys(R_err, 0)
z = model.output(x)
model.parameters['measurement_noise'] = R_err
return array(list(z.values())).ravel()
# Disable deprecation warnings for internal progpy code.
with catch_warnings():
simplefilter("ignore", DeprecationWarning)
x = model.StateContainer({key: value for (key, value) in zip(x0.keys(), x)})
R_err = model.parameters['measurement_noise'].copy()
model.parameters['measurement_noise'] = dict.fromkeys(R_err, 0)
z = model.output(x)
model.parameters['measurement_noise'] = R_err
return array(list(z.values())).ravel()

if 'Q' not in self.parameters:
self.parameters['Q'] = diag([1.0e-3 for _ in x0.keys()])

def state_transition(x, dt):
x = model.StateContainer({key: value for (key, value) in zip(x0.keys(), x)})
Q_err = model.parameters['process_noise'].copy()
model.parameters['process_noise'] = dict.fromkeys(Q_err, 0)
x = model.next_state(x, self.__input, dt)
return array(list(x.values())).ravel()
# Disable deprecation warnings for internal progpy code.
with catch_warnings():
simplefilter("ignore", DeprecationWarning)
x = model.StateContainer({key: value for (key, value) in zip(x0.keys(), x)})
Q_err = model.parameters['process_noise'].copy()
model.parameters['process_noise'] = dict.fromkeys(Q_err, 0)
x = model.next_state(x, self.__input, dt)
return array(list(x.values())).ravel()

num_states = len(x0.keys())
num_measurements = model.n_outputs
Expand Down
22 changes: 14 additions & 8 deletions src/progpy/utils/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import numpy as np
import pandas as pd
from typing import Union
from warnings import warn
from warnings import warn, catch_warnings, simplefilter


class DictLikeMatrixWrapper():
Expand Down Expand Up @@ -82,10 +82,13 @@ def __getitem__(self, key: str) -> int:
"""
get all values associated with a key, ex: all values of 'i'
"""
row = self.matrix[self._keys.index(key)] # creates list from a row of matrix
if len(row) == 1: # list contains 1 value, returns that value (non-vectorized)
return row[0]
return row # returns entire row/list (vectorized case)
# Disable deprecation warnings for internal progpy code.
with catch_warnings():
simplefilter("ignore", DeprecationWarning)
row = self.matrix[self._keys.index(key)] # creates list from a row of matrix
if len(row) == 1: # list contains 1 value, returns that value (non-vectorized)
return row[0]
return row # returns entire row/list (vectorized case)

def __setitem__(self, key: str, value: int) -> None:
"""
Expand Down Expand Up @@ -185,10 +188,13 @@ def items(self) -> zip:
"""
returns keys and values as a list of tuples (for iterating)
"""
if len(self.matrix) > 0 and len(
# Disable deprecation warnings for internal progpy code.
with catch_warnings():
simplefilter("ignore", DeprecationWarning)
if len(self.matrix) > 0 and len(
self.matrix[0]) == 1: # first row of the matrix has one value (non-vectorized case)
return zip(self._keys, np.array([value[0] for value in self.matrix]))
return zip(self._keys, self.matrix)
return zip(self._keys, np.array([value[0] for value in self.matrix]))
return zip(self._keys, self.matrix)

def update(self, other: "DictLikeMatrixWrapper") -> None:
"""
Expand Down
60 changes: 37 additions & 23 deletions src/progpy/utils/parameters.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright © 2021 United States Government as represented by the Administrator of the
# National Aeronautics and Space Administration. All Rights Reserved.

from warnings import catch_warnings, simplefilter
from collections import UserDict, abc
from copy import deepcopy
import json
Expand All @@ -9,6 +10,7 @@
from scipy.integrate import OdeSolver
import types


from progpy.utils.containers import DictLikeMatrixWrapper
from progpy.utils.next_state import next_state_functions, SciPyIntegrateNextState
from progpy.utils.noise_functions import measurement_noise_functions, process_noise_functions
Expand Down Expand Up @@ -143,18 +145,24 @@ def __setitem__(self, key: str, value: float, _copy: bool = False) -> None:
if 'process_noise_dist' in self and self['process_noise_dist'].lower() not in process_noise_functions:
raise ValueError(f"Unsupported process noise distribution {self['process_noise_dist']}")

if all(value == 0 for value in self['process_noise'].values()):
# No noise, use none function
fcn = process_noise_functions['none']
self._m.apply_process_noise = types.MethodType(fcn, self._m)
elif 'process_noise_dist' in self:
fcn = process_noise_functions[self['process_noise_dist'].lower()]
self._m.apply_process_noise = types.MethodType(fcn, self._m)
else:
# Default to gaussian
fcn = process_noise_functions['gaussian']
self._m.apply_process_noise = types.MethodType(fcn, self._m)
# Disable deprecation warnings for internal progpy code.
with catch_warnings():
simplefilter("ignore", DeprecationWarning)

if all(value == 0 for value in self['process_noise'].values()):
# No noise, use none function
fcn = process_noise_functions['none']
self._m.apply_process_noise = types.MethodType(fcn, self._m)
elif 'process_noise_dist' in self:
fcn = process_noise_functions[self['process_noise_dist'].lower()]
self._m.apply_process_noise = types.MethodType(fcn, self._m)
else:
# Default to gaussian
fcn = process_noise_functions['gaussian']
self._m.apply_process_noise = types.MethodType(fcn, self._m)

#resetwarnings()

# Make sure every key is present
# (single value already handled above)
for key in self._m.states:
Expand Down Expand Up @@ -186,18 +194,24 @@ def __setitem__(self, key: str, value: float, _copy: bool = False) -> None:
if 'measurement_noise_dist' in self and self['measurement_noise_dist'].lower() not in measurement_noise_functions:
raise ValueError(f"Unsupported measurement noise distribution {self['measurement_noise_dist']}")

if all(value == 0 for value in self['measurement_noise'].values()):
# No noise, use none function
fcn = measurement_noise_functions['none']
self._m.apply_measurement_noise = types.MethodType(fcn, self._m)
elif 'measurement_noise_dist' in self:
fcn = measurement_noise_functions[self['measurement_noise_dist'].lower()]
self._m.apply_measurement_noise = types.MethodType(fcn, self._m)
else:
# Default to gaussian
fcn = measurement_noise_functions['gaussian']
self._m.apply_measurement_noise = types.MethodType(fcn, self._m)

# Disable deprecation warnings for internal progpy code.
with catch_warnings():
simplefilter("ignore", category=DeprecationWarning)

if all(value == 0 for value in self['measurement_noise'].values()):
# No noise, use none function
fcn = measurement_noise_functions['none']
self._m.apply_measurement_noise = types.MethodType(fcn, self._m)
elif 'measurement_noise_dist' in self:
fcn = measurement_noise_functions[self['measurement_noise_dist'].lower()]
self._m.apply_measurement_noise = types.MethodType(fcn, self._m)
else:
# Default to gaussian
fcn = measurement_noise_functions['gaussian']
self._m.apply_measurement_noise = types.MethodType(fcn, self._m)

#resetwarnings()

# Make sure every key is present
# (single value already handled above)
if not all([key in self['measurement_noise'] for key in self._m.outputs]):
Expand Down

0 comments on commit 326dfc9

Please sign in to comment.