Fix amortized variational inference and TimeInState aggregator, Python 3.11 support #639

Merged 4 commits on Apr 8, 2024
2 changes: 1 addition & 1 deletion iotfunctions/__init__.py
@@ -11,5 +11,5 @@
import os
import pkgutil

-__version__ = '8.8.0'
+__version__ = '9.0.0'
__all__ = list(module for (_, module, _) in pkgutil.iter_modules([os.path.dirname(__file__)]))
66 changes: 50 additions & 16 deletions iotfunctions/anomaly.py
@@ -50,7 +50,6 @@
minmax_scale, PolynomialFeatures)
from sklearn.utils import check_array

-
# for Matrix Profile
import stumpy

Expand Down Expand Up @@ -97,6 +96,10 @@
Saliency_normalizer = 1
Generalized_normalizer = 1 / 300

+# Do away with numba logs
+numba_logger = logging.getLogger('numba')
+numba_logger.setLevel(logging.INFO)

# from
# https://stackoverflow.com/questions/44790072/sliding-window-on-time-series-data
def view_as_windows1(temperature, length, step):
@@ -669,7 +672,7 @@ def _calc(self, df):
# make sure shape is correct
try:
df[output_item] = zScoreII
-except Exception as e2:
+except Exception as e2:
df[output_item] = zScoreII.reshape(-1,1)
pass

@@ -2769,7 +2772,7 @@ def predict(self, X):


#######################################################################################
-# Variational Autoencoder
+# Amortized Variational Inference
# to approximate probability distribution of targets with respect to features
#######################################################################################
# from https://www.ritchievink.com/blog/2019/09/16/variational-inference-from-scratch/
@@ -2832,14 +2835,15 @@ def reparameterize(self, mu, log_var):
# sample from the one-dimensional normal distribution N(mu, exp(log_var))
def forward(self, x):
mu = self.q_mu(x)
+#mu_adj = torch.add(mu, torch.full(mu.shape, self.adjust_mean))
log_var = self.q_log_var(x)
-return self.reparameterize(mu, log_var), mu, log_var
+return self.reparameterize(mu, log_var), mu, log_var, mu + self.adjust_mean

-# see 2.3 in https://arxiv.org/pdf/1312.6114.pdf
+# see 2.3 in https://arxiv.org/pdf/1312.6114.pdf - *the* Kingma-Welling article
#
def elbo(self, y_pred, y, mu, log_var):
# likelihood of observing y given Variational mu and sigma - reconstruction error
-loglikelihood = ll_gaussian(y, mu, log_var)
+##loglikelihood = ll_gaussian(y, mu, log_var)

# Sample from p(x|z) by sampling from q(z|x), passing through decoder (y_pred)
# likelihood of observing y given Variational decoder mu and sigma - reconstruction error
@@ -2864,7 +2868,7 @@ def elbo(self, y_pred, y, mu, log_var):
return (log_qzCx + self.beta * (log_pz - log_pxCz)).mean()


-# from https://arxiv.org/pdf/1509.00519.pdf
+# from https://arxiv.org/pdf/1509.00519.pdf - the Burda,Grosse article
# and https://justin-tan.github.io/blog/2020/06/20/Intuitive-Importance-Weighted-ELBO-Bounds
def iwae(self, x, y, k_samples):

@@ -2874,7 +2878,7 @@ def iwae(self, x, y, k_samples):
# Encode - sample from the encoder
# Latent variables mean,variance: mu_enc, log_var_enc
# y_pred: Sample from q(z|x) by passing data through encoder and reparametrizing
-y_pred, mu_enc, log_var_enc = self.forward(x)
+y_pred, mu_enc, log_var_enc, _ = self.forward(x)

# there is not much of a decoder - hence we use the identity below as decoder 'stub'
dec_mu = mu_enc
@@ -2907,7 +2911,7 @@ class VIAnomalyScore(SupervisedLearningTransformer):
class VIAnomalyScore(SupervisedLearningTransformer):
"""
A supervised anomaly detection function.
-Uses VAE based density approximation to assign an anomaly score
+Uses amortised VI based density approximation to assign an anomaly score
"""
# set self.auto_train and self.delete_model
def __init__(self, features, targets, predictions=None, pred_stddev=None):
@@ -2950,6 +2954,24 @@ def execute(self, df):

return super().execute(df)

+    # requires onnx and onnxscript which are *not* part of requirements.txt
+    def to_onnx(self, path=None):
+
+        for key in self.active_models:
+            model = self.active_models[key]
+            print(model)
+            torch_input = torch.randn(32, 1) #_CALC torch.Size([177980, 1])
+            if path is None or len(path) == 0: path = "/tmp/amortizedVariationalInference.onnx"
+
+            #onnx_program = torch.onnx.dynamo_export(model, torch_input)
+            #onnx_program.save(path)
+            torch.onnx.export(model, torch_input,
+                              path,
+                              export_params=True, # store the trained parameter weights inside the model file
+                              input_names = ['input'],   # the model's input names
+                              output_names = ['output'], # the model's output names
+                              dynamic_axes={'input' : {0 : 'batch_size'}, # variable length axes
+                                            'output' : {0 : 'batch_size'}})

def _calc(self, df):

@@ -2992,12 +3014,22 @@ def _calc(self, df):
xy = np.hstack([features, targets])

# TODO: assumption is cardinality of One for features and targets !!!
-        ind = np.lexsort((xy[:, 1], xy[:, 0]))
-        ind_r = np.argsort(ind)
+        # for training, sort the feature/target pairs w.r.t. the features,
+        # because the first linear layer's weights are *directly* related to the order of the input features
+        if vi_model is None:
+            ind = np.lexsort((xy[:, 1], xy[:, 0]))
+            ind_r = np.argsort(ind)
+
+        # sorting doesn't matter for inference, though
+        else:
+            # dummy operation
+            ind = range(xy.shape[0])
+            ind_r = range(xy.shape[0])

self.Input[entity] = xy[ind][:, 0]

X = torch.tensor(xy[ind][:, 0].reshape(-1, 1), dtype=torch.float)
+#print('_CALC', X.shape)
Y = torch.tensor(xy[ind][:, 1].reshape(-1, 1), dtype=torch.float)

# train new model if there is none and autotrain is set
@@ -3016,8 +3048,8 @@

for epoch in range(self.epochs):
optim.zero_grad()
-y_pred, mu, log_var = vi_model(X)
-loss = -vi_model.elbo(y_pred, Y, mu, log_var)
+y_pred, mu, log_var, _ = vi_model(X)          # ignore adjusted mean
+loss = -vi_model.elbo(y_pred, Y, mu, log_var) # compute for logging purposes only
iwae = -vi_model.iwae(X, Y, self.iwae_samples) # default is to try with 10 samples
if epoch % 10 == 0:
logger.debug('Epoch: ' + str(epoch) + ', neg ELBO: ' + str(loss.item()) + ', IWAE ELBO: ' + str(iwae.item()))
@@ -3039,14 +3071,16 @@

with torch.no_grad():
mu_and_log_sigma = vi_model(X)
-mue = mu_and_log_sigma[1]
+mue = mu_and_log_sigma[3]
sigma = torch.exp(0.5 * mu_and_log_sigma[2]) + 1e-5
+print('Sigma', sigma)
mu = sp.stats.norm.ppf(0.5, loc=mue, scale=sigma).reshape(-1,)
-q1 = sp.stats.norm.ppf(self.quantile, loc=mue, scale=sigma).reshape(-1,)
+q1 = sp.stats.norm.ppf(self.quantile, loc=0, scale=sigma).reshape(-1,)
self.mu[entity] = mu
self.quantile099[entity] = q1

-df[self.predictions] = (mu[ind_r] + vi_model.adjust_mean).reshape(-1,1)
+#df[self.predictions] = (mu[ind_r] + vi_model.adjust_mean).reshape(-1,1) - HERE
+df[self.predictions] = mu[ind_r].reshape(-1,1)
df[self.pred_stddev] = (q1[ind_r]).reshape(-1,1)
else:
logger.debug('No VI model for entity: ' + str(entity))
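Note on the reworked inference path above: `forward` now returns a fourth value, the variational mean shifted by `adjust_mean`, and `_calc` reads `mu_and_log_sigma[3]` instead of re-adding the offset afterwards. A minimal, self-contained sketch of that pattern (a standalone module with made-up layer sizes, not the repo's actual model class):

```python
import torch
import torch.nn as nn

class TinyVI(nn.Module):
    """Minimal amortized-VI head mapping x to N(mu, exp(log_var))."""

    def __init__(self, adjust_mean=0.0):
        super().__init__()
        # made-up layer sizes, for illustration only
        self.q_mu = nn.Sequential(nn.Linear(1, 20), nn.ReLU(), nn.Linear(20, 1))
        self.q_log_var = nn.Sequential(nn.Linear(1, 20), nn.ReLU(), nn.Linear(20, 1))
        self.adjust_mean = adjust_mean

    def reparameterize(self, mu, log_var):
        # z = mu + sigma * eps keeps the sample differentiable w.r.t. mu and log_var
        eps = torch.randn_like(mu)
        return mu + torch.exp(0.5 * log_var) * eps

    def forward(self, x):
        mu = self.q_mu(x)
        log_var = self.q_log_var(x)
        # fourth return value: mean shifted back to the original target scale
        return self.reparameterize(mu, log_var), mu, log_var, mu + self.adjust_mean

# usage: the caller unpacks four values, as in the updated _calc
sample, mu, log_var, mu_adjusted = TinyVI(adjust_mean=5.0)(torch.randn(8, 1))
```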
6 changes: 3 additions & 3 deletions iotfunctions/base.py
@@ -20,7 +20,7 @@
import re
import warnings
from collections import OrderedDict
-from inspect import getargspec, signature
+from inspect import getfullargspec, signature  # ready for python 3.11
import hashlib # encode feature names

import numpy as np
@@ -406,7 +406,7 @@ def _get_arg_metadata(self, isoformat_dates=True):
"""

metadata = {}
-args = (getargspec(self.__init__))[0][1:]
+args = (getfullargspec(self.__init__))[0][1:]  # ready for python 3.11
for a in args:
try:
metadata[a] = self.__dict__[a]
@@ -673,7 +673,7 @@ def _getMetadata(self, df=None, new_df=None, inputs=None, outputs=None, constant
array_inputs = []

# introspect function to get a list of arguments
-args = (getargspec(self.__init__))[0][1:]
+args = (getfullargspec(self.__init__))[0][1:]  # ready for python 3.11
for a in args:
if a is None:
msg = 'Cannot infer metadata for argument %s as it was initialized with a value of None. \
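Background on the `getfullargspec` swap: `inspect.getargspec` was removed in Python 3.11, and `getfullargspec` keeps the positional-argument list in the same first slot, so the `[0][1:]` indexing carries over unchanged. A quick sketch with a hypothetical constructor-like signature:

```python
from inspect import getfullargspec

def fake_init(self, features, targets, predictions=None):
    pass

# the first element of the spec is the argument list, exactly as with getargspec;
# [1:] drops 'self'
args = getfullargspec(fake_init)[0][1:]
print(args)  # ['features', 'targets', 'predictions']
```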
28 changes: 24 additions & 4 deletions iotfunctions/bif.py
@@ -345,30 +345,50 @@ def build_ui(cls):

return (inputs, outputs)

+    def execute(self, df):
+
+        logger.debug('Execute StateTimePrep')
+        df_copy = df  # no copy
+
+        # set output columns to zero
+        df_copy[self.state_name] = 0
+
+        # group over entities
+        group_base = [pd.Grouper(axis=0, level=0)]
+
+        if not df_copy.empty:
+            df_copy = df_copy.groupby(group_base).apply(self._calc)
+
+        logger.debug('StateTimePrep done')
+        return df_copy

def _calc(self, df):
logger.info('Execute StateTimePrep per entity')

index_names = df.index.names
+ts_name = df.index.names[1]  # TODO: deal with non-standard dataframes (no timestamp)

-logger.info('Source: ' + self.source + ', state_name ' + self.state_name + ', Name: ' + self.name +
+logger.info('Source: ' + self.source + ', ts_name ' + ts_name + ', state_name ' + self.state_name + ', Name: ' + self.name +
', Entity: ' + df.index[0][0])

df_copy = df.reset_index()

# pair of +- seconds and regular timestamp
vstate = eval("df_copy[self.source] " + self.state_name).astype(int).values.astype(int)
-vchange = eval("df_copy[self.source] " + self.state_name).astype(int).diff().values.astype(int)
+#vchange = eval("df_copy[self.source] " + self.state_name).astype(int).diff().values.astype(int)
+vchange = np.diff(vstate, prepend=2)

-logger.info(str(vstate))
-logger.info(str(vchange))
+logger.info("vstate: " + str(vstate))
+logger.info("vchange: " + str(vchange))

#v1 = np.roll(v1_, -1) # push the first element, NaN, to the end
# v1[-1] = 0

# first value is a NaN, replace it with special value for Aggregator
vchange[0] = 2

+logger.info(df_copy[ts_name].astype(int));

#logger.debug('HERE: ' + str(v1[0:600]))

df_copy['__intermediate1__'] = vchange
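On replacing pandas `.diff()` with `np.diff(vstate, prepend=2)`: the pandas version yields `NaN` in the first slot, and casting that `NaN` to int is undefined, while `numpy.diff` with `prepend` returns an integer array of the same length throughout; the code still overwrites the first element with the sentinel 2 for the aggregator. A small sketch with a made-up state vector:

```python
import numpy as np
import pandas as pd

vstate = np.array([0, 0, 1, 1, 0])

# pandas: the first difference is NaN, which forces float dtype and a fix-up
print(pd.Series(vstate).diff().values)  # [nan  0.  1.  0. -1.]

# numpy with prepend: same length, stays integer
vchange = np.diff(vstate, prepend=2)
print(vchange)                          # [-2  0  1  0 -1]

vchange[0] = 2                          # special start-marker value for the aggregator
```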
4 changes: 2 additions & 2 deletions requirements.txt
@@ -1,9 +1,9 @@
dill==0.3.5.1
ibm_db==3.1.3 # 3.02 results in "error in ibm_db setup command: use_2to3 is invalid."
ibm_db_sa==0.3.8
-ibm-watson-machine-learning==1.0.264
+ibm-watson-machine-learning==1.0.333
lxml==4.9.1
-lightgbm==3.3.2
+lightgbm==3.3.5
nose2==0.6.0
pandas==1.4.3
psycopg2-binary==2.9.5
8 changes: 6 additions & 2 deletions tests/test_vi_anomaly.py
@@ -71,7 +71,7 @@ def test_vianomaly_score():
# Now run the anomaly functions as if they were executed in a pipeline
vasi = VIAnomalyScore(['speed'], ['rms_x'])
#spsi.epochs = 1 # only for testing model storage
-vasi.epochs = 70 # 300 is far too high, it converges much faster
+vasi.epochs = 60 #70 # 300 is far too high, it converges much faster

vasi.auto_train = True
vasi.delete_model = True
Expand All @@ -85,7 +85,7 @@ def test_vianomaly_score():
print('VIAnomaly score - inference')

#vasi = VIAnomalyScore(['speed'], ['rms_x'])
-vasi.epochs = 70 # 300 is far too high, it converges much faster
+vasi.epochs = 30 # 300 is far too high, it converges much faster
vasi.auto_train = True

vasi.delete_model = False
@@ -97,6 +97,10 @@ def test_vianomaly_score():
df_i = vasi.execute(df=df_i)
print('VIAnomaly inferencing done')

+df_i.to_csv("iot.cvs")
+
+vasi.to_onnx()

pass


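For anyone wanting to sanity-check the exported model outside PyTorch: a hedged sketch of loading the file `to_onnx` writes by default, assuming `onnxruntime` is installed (it is not in requirements.txt either) and the export succeeded:

```python
import numpy as np
import onnxruntime as ort

sess = ort.InferenceSession("/tmp/amortizedVariationalInference.onnx")

# 'input' is the name passed to torch.onnx.export; the dynamic 'batch_size'
# axis means any number of rows is accepted
x = np.random.randn(8, 1).astype(np.float32)
outputs = sess.run(None, {"input": x})  # returns all model outputs as a list
print(outputs[0].shape)                 # (8, 1) - the reparameterized sample
```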