From 6fe74defdd6d38021da64f3be8087f5f3751eb85 Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Wed, 26 Jul 2023 10:46:21 +0200 Subject: [PATCH 01/16] Implement MultivariateGaussian --- docs/releases/unreleased.md | 4 + river/proba/__init__.py | 3 +- river/proba/gaussian.py | 207 +++++++++++++++++++++++++++++++++++- 3 files changed, 212 insertions(+), 2 deletions(-) diff --git a/docs/releases/unreleased.md b/docs/releases/unreleased.md index f4c8c9eea7..233a0399e5 100644 --- a/docs/releases/unreleased.md +++ b/docs/releases/unreleased.md @@ -26,6 +26,10 @@ Calling `learn_one` in a pipeline will now update each part of the pipeline in t - Added `preprocessing.OrdinalEncoder`, to map string features to integers. +## proba + +- Added `proba.MultivariateGaussian`. + ## utils - Added `utils.random.exponential` to retrieve random samples following an exponential distribution. diff --git a/river/proba/__init__.py b/river/proba/__init__.py index a7a3263bce..49536c626e 100644 --- a/river/proba/__init__.py +++ b/river/proba/__init__.py @@ -4,6 +4,7 @@ from . import base from .beta import Beta from .gaussian import Gaussian +from .gaussian import MultivariateGaussian from .multinomial import Multinomial -__all__ = ["base", "Beta", "Gaussian", "Multinomial"] +__all__ = ["base", "Beta", "Gaussian", "Multinomial", "MultivariateGaussian"] diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index 632d5e1888..af32985a74 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -1,11 +1,16 @@ from __future__ import annotations import math +import warnings +import numpy as np +from scipy.stats import multivariate_normal + +from river import covariance from river import stats from river.proba import base -__all__ = ["Gaussian"] +__all__ = ["Gaussian", "MultivariateGaussian"] class Gaussian(base.ContinuousDistribution): @@ -90,3 +95,203 @@ def sample(self): @property def mode(self): return self.mu + + +class MultivariateGaussian(base.ContinuousDistribution): + """Multivariate normal distribution with parameters mu and var. + + Parameters + ---------- + seed + Random number generator seed for reproducibility. + + Examples + -------- + >>> import numpy as np + >>> import pandas as pd + + >>> np.random.seed(42) + >>> X = pd.DataFrame(np.random.random((8, 3)), + ... columns=["red", "green", "blue"]) + >>> X + red green blue + 0 0.374540 0.950714 0.731994 + 1 0.598658 0.156019 0.155995 + 2 0.058084 0.866176 0.601115 + 3 0.708073 0.020584 0.969910 + 4 0.832443 0.212339 0.181825 + 5 0.183405 0.304242 0.524756 + 6 0.431945 0.291229 0.611853 + 7 0.139494 0.292145 0.366362 + + >>> p = MultivariateGaussian() + >>> p.n_samples + 0.0 + >>> for x in X.to_dict(orient="records"): + ... p = p.update(x) + >>> p._var + blue green red + blue 0.076 0.020 -0.010 + green 0.020 0.113 -0.053 + red -0.010 -0.053 0.079 + + Weighted samples are currently not implemented. Updates with default w = 1. + >>> p = p.update(x, w=2) + + Retrieving current state in nice format is simple + >>> p + 𝒩(μ=(0.385, 0.376, 0.501), + σ^2=([0.069 0.019 -0.004] + [0.019 0.100 -0.044] + [-0.004 -0.044 0.078])) + >>> p.n_samples + 9.0 + + To retrieve pdf and cdf + >>> p(x) # doctest: +ELLIPSIS + 1.70399123552737... + >>> p.cdf(x) # doctest: +ELLIPSIS + 0.01421620021072799... + + MultivariateGaussian works with Rolling + >>> from river import utils + >>> p = utils.Rolling(MultivariateGaussian(), window_size=5) + >>> for x in X.to_dict(orient="records"): + ... p = p.update(x) + >>> p._var + blue green red + blue 0.087 -0.023 0.008 + green -0.023 0.014 -0.025 + red 0.008 -0.025 0.095 + + MultivariateGaussian works with TimeRolling + >>> from datetime import datetime as dt, timedelta as td + >>> X.index = [dt(2023, 3, 28, 0, 0, 0) + td(seconds=x) for x in range(8)] + >>> p = utils.TimeRolling(MultivariateGaussian(), period=td(seconds=5)) + >>> for t, x in X.iterrows(): + ... p = p.update(x.to_dict(), t=t) + >>> p._var + blue green red + blue 0.087 -0.023 0.008 + green -0.023 0.014 -0.025 + red 0.008 -0.025 0.095 + + Weighted samples are currently not implemented. Updates with default w = 1. + >>> p = p.update(x.to_dict(), t=t + td(seconds=1), **{"w":2}) + + >>> from river.proba import Gaussian + >>> p = MultivariateGaussian() + >>> p_ = Gaussian() + >>> for t, x in X.iterrows(): + ... p = p.update(x.to_dict()) + ... p_ = p_.update(x['blue']) + >>> p.sigma[0][0] == p_.sigma + True + + Initiation of class from state is currently not implemented + >>> p = MultivariateGaussian()._from_state( + ... 0, 0, 0, 0) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ... + NotImplementedError: from_state_ not implemented yet. + + """ # noqa: W291 + + def __init__(self, seed=None): + super().__init__(seed) + self._var = covariance.EmpiricalCovariance(ddof=1) + self.feature_names_in_ = None + + @classmethod + def _from_state(cls, n, m, sig, ddof): + raise NotImplementedError("_from_state not implemented yet.") + + @property + def n_samples(self): + if not self._var.matrix: + return 0.0 + else: + return list(self._var.matrix.values())[-1].mean.n + + @property + def mu(self): + """The mean value of the distribution.""" + return list( + { + key1: values.mean.get() + for (key1, key2), values in self._var.matrix.items() + if key1 == key2 + }.values() + ) + + @property + def var(self): + """The variance of the distribution.""" + variables = sorted(list({var for cov in self._var.matrix.keys() for var in cov})) + # Initialize the covariance matrix array + cov_array = np.zeros((len(variables), len(variables))) + + # Fill in the covariance matrix array + for i in range(len(variables)): + for j in range(i, len(variables)): + if i == j: + # Fill in the diagonal with variances + cov_array[i, j] = self._var[(variables[i], variables[j])].get() + else: + # Fill in the off-diagonal with covariances + cov_array[i, j] = self._var[(variables[i], variables[j])].get() + cov_array[j, i] = self._var[(variables[i], variables[j])].get() + return cov_array + + @property + def sigma(self): + """The standard deviation of the distribution.""" + cov_array = self.var + return [[x**0.5 if x > 0 else float("nan") for x in row] for row in cov_array] + + def __repr__(self): + mu_str = ", ".join(f"{m:.3f}" for m in self.mu) + var_str = "\n ".join("[" + " ".join(f"{s:.3f}" for s in row) + "]" for row in self.var) + return f"𝒩(μ=({mu_str}),\nσ^2=({var_str}))" + + def update(self, x, w=1.0): + if w != 1.0: + warnings.warn("Weights not implemented yet.", RuntimeWarning) + self._var.update(x) + return self + + def revert(self, x, w=1.0): + if w != 1.0: + # TODO: find out why not called during TimeRolling usage test + warnings.warn("Weights not implemented yet.", RuntimeWarning) # pragma: no cover + self._var.revert(x) + return self + + def __call__(self, x): + x = list(x.values()) + var = self.var + if var is not None: + try: + return multivariate_normal(self.mu, var).pdf(x) + # TODO: validate occurence of ValueError + # The input matrix must be symmetric positive semidefinite. + except ValueError: # pragma: no cover + return 0.0 + # TODO: validate occurence of OverflowError + except OverflowError: # pragma: no cover + return 0.0 + return 0.0 # pragma: no cover + + def cdf(self, x): + x = list(x.values()) + return multivariate_normal(self.mu, self.var, allow_singular=True).cdf(x) + + def sample(self): + return multivariate_normal( + self.mu, + self.var, + ).rvs() + + @property + def mode(self): + return self.mu From 470eca788fad9e95e05b36af1611ce018b7e1689 Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Wed, 26 Jul 2023 11:03:15 +0200 Subject: [PATCH 02/16] Updated GaussianScorer API to work with Pipelines --- river/anomaly/gaussian.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/river/anomaly/gaussian.py b/river/anomaly/gaussian.py index fb9ccfd0da..1044fe7bc1 100644 --- a/river/anomaly/gaussian.py +++ b/river/anomaly/gaussian.py @@ -57,11 +57,11 @@ def __init__(self, window_size=None, grace_period=100): ) self.grace_period = grace_period - def learn_one(self, _, y): + def learn_one(self, x, y): self.gaussian.update(y) return self - def score_one(self, _, y): + def score_one(self, x, y): if self.gaussian.n_samples < self.grace_period: return 0 return 2 * abs(self.gaussian.cdf(y) - 0.5) From 33a2b3ef5aa77b2adc276cae2961480b708d0066 Mon Sep 17 00:00:00 2001 From: Marek Wadinger <50716630+MarekWadinger@users.noreply.github.com> Date: Tue, 1 Aug 2023 08:26:02 +0200 Subject: [PATCH 03/16] Apply suggestions from code review Co-authored-by: Max Halford --- river/proba/gaussian.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index af32985a74..7201f45f8d 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -127,6 +127,7 @@ class MultivariateGaussian(base.ContinuousDistribution): >>> p = MultivariateGaussian() >>> p.n_samples 0.0 + >>> for x in X.to_dict(orient="records"): ... p = p.update(x) >>> p._var @@ -144,8 +145,6 @@ class MultivariateGaussian(base.ContinuousDistribution): σ^2=([0.069 0.019 -0.004] [0.019 0.100 -0.044] [-0.004 -0.044 0.078])) - >>> p.n_samples - 9.0 To retrieve pdf and cdf >>> p(x) # doctest: +ELLIPSIS @@ -153,7 +152,8 @@ class MultivariateGaussian(base.ContinuousDistribution): >>> p.cdf(x) # doctest: +ELLIPSIS 0.01421620021072799... - MultivariateGaussian works with Rolling + MultivariateGaussian works with `utils.Rolling` + >>> from river import utils >>> p = utils.Rolling(MultivariateGaussian(), window_size=5) >>> for x in X.to_dict(orient="records"): @@ -164,7 +164,8 @@ class MultivariateGaussian(base.ContinuousDistribution): green -0.023 0.014 -0.025 red 0.008 -0.025 0.095 - MultivariateGaussian works with TimeRolling + MultivariateGaussian works with `utils.TimeRolling` + >>> from datetime import datetime as dt, timedelta as td >>> X.index = [dt(2023, 3, 28, 0, 0, 0) + td(seconds=x) for x in range(8)] >>> p = utils.TimeRolling(MultivariateGaussian(), period=td(seconds=5)) @@ -268,6 +269,7 @@ def revert(self, x, w=1.0): return self def __call__(self, x): + """PDF(x) method.""" x = list(x.values()) var = self.var if var is not None: @@ -287,10 +289,10 @@ def cdf(self, x): return multivariate_normal(self.mu, self.var, allow_singular=True).cdf(x) def sample(self): - return multivariate_normal( + return float(multivariate_normal( self.mu, self.var, - ).rvs() + ).rvs()) @property def mode(self): From 5d0e9a78fd3a5fe8eaadfbe4fd387acd19c06f73 Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Tue, 1 Aug 2023 08:44:38 +0200 Subject: [PATCH 04/16] UPDATE: MultivariateGaussian sample returns float --- river/proba/gaussian.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index 7201f45f8d..a9a5155cec 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -289,10 +289,10 @@ def cdf(self, x): return multivariate_normal(self.mu, self.var, allow_singular=True).cdf(x) def sample(self): - return float(multivariate_normal( + return multivariate_normal( self.mu, self.var, - ).rvs()) + ).rvs().tolist() @property def mode(self): From 1622fe55c34a594cfd0210bba25ce3e9079e5e6b Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Tue, 1 Aug 2023 08:51:35 +0200 Subject: [PATCH 05/16] Remove private attributes from doctest --- river/proba/gaussian.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index a9a5155cec..d94c4472f8 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -130,11 +130,10 @@ class MultivariateGaussian(base.ContinuousDistribution): >>> for x in X.to_dict(orient="records"): ... p = p.update(x) - >>> p._var - blue green red - blue 0.076 0.020 -0.010 - green 0.020 0.113 -0.053 - red -0.010 -0.053 0.079 + >>> p.var + array([[ 0.07611911, 0.02029152, -0.01012815], + [ 0.02029152, 0.11293148, -0.05326768], + [-0.01012815, -0.05326768, 0.0789612 ]]) Weighted samples are currently not implemented. Updates with default w = 1. >>> p = p.update(x, w=2) @@ -158,11 +157,10 @@ class MultivariateGaussian(base.ContinuousDistribution): >>> p = utils.Rolling(MultivariateGaussian(), window_size=5) >>> for x in X.to_dict(orient="records"): ... p = p.update(x) - >>> p._var - blue green red - blue 0.087 -0.023 0.008 - green -0.023 0.014 -0.025 - red 0.008 -0.025 0.095 + >>> p.var + array([[ 0.08706173, -0.02287347, 0.00776493], + [-0.02287347, 0.01427901, -0.02518146], + [ 0.00776493, -0.02518146, 0.09506639]]) MultivariateGaussian works with `utils.TimeRolling` @@ -171,11 +169,10 @@ class MultivariateGaussian(base.ContinuousDistribution): >>> p = utils.TimeRolling(MultivariateGaussian(), period=td(seconds=5)) >>> for t, x in X.iterrows(): ... p = p.update(x.to_dict(), t=t) - >>> p._var - blue green red - blue 0.087 -0.023 0.008 - green -0.023 0.014 -0.025 - red 0.008 -0.025 0.095 + >>> p.var + array([[ 0.08706173, -0.02287347, 0.00776493], + [-0.02287347, 0.01427901, -0.02518146], + [ 0.00776493, -0.02518146, 0.09506639]]) Weighted samples are currently not implemented. Updates with default w = 1. >>> p = p.update(x.to_dict(), t=t + td(seconds=1), **{"w":2}) From 02c67d6c64e0ebf5529c13f59282a36e0741295d Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Tue, 1 Aug 2023 09:04:47 +0200 Subject: [PATCH 06/16] UPDATE: make representation nicer --- river/proba/gaussian.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index d94c4472f8..64193d0343 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -249,8 +249,10 @@ def sigma(self): def __repr__(self): mu_str = ", ".join(f"{m:.3f}" for m in self.mu) - var_str = "\n ".join("[" + " ".join(f"{s:.3f}" for s in row) + "]" for row in self.var) - return f"𝒩(μ=({mu_str}),\nσ^2=({var_str}))" + var_str = "\n".join( + " [" + ", ".join(f"{s:.3f}" for s in row) + "]" + for row in self.var) + return f"𝒩(\n μ=({mu_str}),\n σ^2=(\n{var_str}\n )\n)" def update(self, x, w=1.0): if w != 1.0: From 693808ed4ae5a5673aba3f857b87e1b74a02cc18 Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Tue, 1 Aug 2023 09:05:32 +0200 Subject: [PATCH 07/16] UPDATE: doctest for representation --- river/proba/gaussian.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index 64193d0343..1a8757504a 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -140,10 +140,14 @@ class MultivariateGaussian(base.ContinuousDistribution): Retrieving current state in nice format is simple >>> p - 𝒩(μ=(0.385, 0.376, 0.501), - σ^2=([0.069 0.019 -0.004] - [0.019 0.100 -0.044] - [-0.004 -0.044 0.078])) + 𝒩( + μ=(0.385, 0.376, 0.501), + σ^2=( + [0.069, 0.019, -0.004] + [0.019, 0.100, -0.044] + [-0.004, -0.044, 0.078] + ) + ) To retrieve pdf and cdf >>> p(x) # doctest: +ELLIPSIS From abb911f0b3097a8e57929c1cae07f8e995b1d4e4 Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Tue, 1 Aug 2023 09:17:16 +0200 Subject: [PATCH 08/16] Replaced not implemented with TODOs --- river/proba/gaussian.py | 37 +++++++++---------------------------- 1 file changed, 9 insertions(+), 28 deletions(-) diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index 1a8757504a..d352c6f1c9 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -135,25 +135,22 @@ class MultivariateGaussian(base.ContinuousDistribution): [ 0.02029152, 0.11293148, -0.05326768], [-0.01012815, -0.05326768, 0.0789612 ]]) - Weighted samples are currently not implemented. Updates with default w = 1. - >>> p = p.update(x, w=2) - Retrieving current state in nice format is simple >>> p 𝒩( - μ=(0.385, 0.376, 0.501), + μ=(0.416, 0.387, 0.518), σ^2=( - [0.069, 0.019, -0.004] - [0.019, 0.100, -0.044] - [-0.004, -0.044, 0.078] + [0.076, 0.020, -0.010] + [0.020, 0.113, -0.053] + [-0.010, -0.053, 0.079] ) ) To retrieve pdf and cdf >>> p(x) # doctest: +ELLIPSIS - 1.70399123552737... + 1.26921953490694... >>> p.cdf(x) # doctest: +ELLIPSIS - 0.01421620021072799... + 0.00787141517849810... MultivariateGaussian works with `utils.Rolling` @@ -178,9 +175,6 @@ class MultivariateGaussian(base.ContinuousDistribution): [-0.02287347, 0.01427901, -0.02518146], [ 0.00776493, -0.02518146, 0.09506639]]) - Weighted samples are currently not implemented. Updates with default w = 1. - >>> p = p.update(x.to_dict(), t=t + td(seconds=1), **{"w":2}) - >>> from river.proba import Gaussian >>> p = MultivariateGaussian() >>> p_ = Gaussian() @@ -189,14 +183,6 @@ class MultivariateGaussian(base.ContinuousDistribution): ... p_ = p_.update(x['blue']) >>> p.sigma[0][0] == p_.sigma True - - Initiation of class from state is currently not implemented - >>> p = MultivariateGaussian()._from_state( - ... 0, 0, 0, 0) # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - ... - NotImplementedError: from_state_ not implemented yet. - """ # noqa: W291 def __init__(self, seed=None): @@ -204,9 +190,7 @@ def __init__(self, seed=None): self._var = covariance.EmpiricalCovariance(ddof=1) self.feature_names_in_ = None - @classmethod - def _from_state(cls, n, m, sig, ddof): - raise NotImplementedError("_from_state not implemented yet.") + # TODO: add method _from_state to initialize model (for warm starting) @property def n_samples(self): @@ -259,15 +243,12 @@ def __repr__(self): return f"𝒩(\n μ=({mu_str}),\n σ^2=(\n{var_str}\n )\n)" def update(self, x, w=1.0): - if w != 1.0: - warnings.warn("Weights not implemented yet.", RuntimeWarning) + # TODO: add support for weigthed samples self._var.update(x) return self def revert(self, x, w=1.0): - if w != 1.0: - # TODO: find out why not called during TimeRolling usage test - warnings.warn("Weights not implemented yet.", RuntimeWarning) # pragma: no cover + # TODO: add support for weigthed samples self._var.revert(x) return self From 7ae847f6b7b73f79a0ef4aaef3892335b5489baf Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Tue, 1 Aug 2023 09:22:13 +0200 Subject: [PATCH 09/16] Improve coverage and doctest readability --- river/proba/gaussian.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index d352c6f1c9..2bd2c943bf 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -146,12 +146,22 @@ class MultivariateGaussian(base.ContinuousDistribution): ) ) + To retrieve number of samples and mode + >>> p.n_samples + 8.0 + >>> p.mode # doctest: +ELLIPSIS + [0.415..., 0.386..., 0.517...] + To retrieve pdf and cdf >>> p(x) # doctest: +ELLIPSIS 1.26921953490694... >>> p.cdf(x) # doctest: +ELLIPSIS 0.00787141517849810... + To sample data from distribution + >>> p.sample() # doctest: +ELLIPSIS + [0.203..., -0.0532..., 0.840...] + MultivariateGaussian works with `utils.Rolling` >>> from river import utils @@ -175,6 +185,8 @@ class MultivariateGaussian(base.ContinuousDistribution): [-0.02287347, 0.01427901, -0.02518146], [ 0.00776493, -0.02518146, 0.09506639]]) + Singlevariate usage is consistent with Gaussian + >>> from river.proba import Gaussian >>> p = MultivariateGaussian() >>> p_ = Gaussian() From b3ec1379167161cf9312e96c0da2922e9f8539c9 Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Tue, 1 Aug 2023 09:58:43 +0200 Subject: [PATCH 10/16] UPDATE: make mu, var return dict, pd.DataFrame --- river/proba/gaussian.py | 64 +++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 28 deletions(-) diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index 2bd2c943bf..c2d78ededa 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -4,6 +4,7 @@ import warnings import numpy as np +import pandas as pd from scipy.stats import multivariate_normal from river import covariance @@ -131,18 +132,19 @@ class MultivariateGaussian(base.ContinuousDistribution): >>> for x in X.to_dict(orient="records"): ... p = p.update(x) >>> p.var - array([[ 0.07611911, 0.02029152, -0.01012815], - [ 0.02029152, 0.11293148, -0.05326768], - [-0.01012815, -0.05326768, 0.0789612 ]]) + blue green red + blue 0.076119 0.020292 -0.010128 + green 0.020292 0.112931 -0.053268 + red -0.010128 -0.053268 0.078961 Retrieving current state in nice format is simple >>> p 𝒩( μ=(0.416, 0.387, 0.518), σ^2=( - [0.076, 0.020, -0.010] - [0.020, 0.113, -0.053] - [-0.010, -0.053, 0.079] + [ 0.076 0.020 -0.010] + [ 0.020 0.113 -0.053] + [-0.010 -0.053 0.079] ) ) @@ -150,7 +152,7 @@ class MultivariateGaussian(base.ContinuousDistribution): >>> p.n_samples 8.0 >>> p.mode # doctest: +ELLIPSIS - [0.415..., 0.386..., 0.517...] + {'red': 0.415..., 'green': 0.386..., 'blue': 0.517...} To retrieve pdf and cdf >>> p(x) # doctest: +ELLIPSIS @@ -169,9 +171,10 @@ class MultivariateGaussian(base.ContinuousDistribution): >>> for x in X.to_dict(orient="records"): ... p = p.update(x) >>> p.var - array([[ 0.08706173, -0.02287347, 0.00776493], - [-0.02287347, 0.01427901, -0.02518146], - [ 0.00776493, -0.02518146, 0.09506639]]) + blue green red + blue 0.087062 -0.022873 0.007765 + green -0.022873 0.014279 -0.025181 + red 0.007765 -0.025181 0.095066 MultivariateGaussian works with `utils.TimeRolling` @@ -181,9 +184,10 @@ class MultivariateGaussian(base.ContinuousDistribution): >>> for t, x in X.iterrows(): ... p = p.update(x.to_dict(), t=t) >>> p.var - array([[ 0.08706173, -0.02287347, 0.00776493], - [-0.02287347, 0.01427901, -0.02518146], - [ 0.00776493, -0.02518146, 0.09506639]]) + blue green red + blue 0.087062 -0.022873 0.007765 + green -0.022873 0.014279 -0.025181 + red 0.007765 -0.025181 0.095066 Singlevariate usage is consistent with Gaussian @@ -214,13 +218,11 @@ def n_samples(self): @property def mu(self): """The mean value of the distribution.""" - return list( - { - key1: values.mean.get() - for (key1, key2), values in self._var.matrix.items() - if key1 == key2 - }.values() - ) + return { + key1: values.mean.get() + for (key1, key2), values in self._var.matrix.items() + if key1 == key2 + } @property def var(self): @@ -239,19 +241,21 @@ def var(self): # Fill in the off-diagonal with covariances cov_array[i, j] = self._var[(variables[i], variables[j])].get() cov_array[j, i] = self._var[(variables[i], variables[j])].get() + + cov_array = pd.DataFrame(cov_array, index=variables, columns=variables) return cov_array @property def sigma(self): """The standard deviation of the distribution.""" - cov_array = self.var + cov_array = self.var.values return [[x**0.5 if x > 0 else float("nan") for x in row] for row in cov_array] def __repr__(self): - mu_str = ", ".join(f"{m:.3f}" for m in self.mu) - var_str = "\n".join( - " [" + ", ".join(f"{s:.3f}" for s in row) + "]" - for row in self.var) + mu_str = ", ".join(f"{m:.3f}" for m in self.mu.values()) + var_str = self.var.to_string( + float_format="{:0.3f}".format, header=False, index=False) + var_str = " [" + var_str.replace("\n", "]\n [") + "]" return f"𝒩(\n μ=({mu_str}),\n σ^2=(\n{var_str}\n )\n)" def update(self, x, w=1.0): @@ -270,7 +274,7 @@ def __call__(self, x): var = self.var if var is not None: try: - return multivariate_normal(self.mu, var).pdf(x) + return multivariate_normal([*self.mu.values()], var).pdf(x) # TODO: validate occurence of ValueError # The input matrix must be symmetric positive semidefinite. except ValueError: # pragma: no cover @@ -282,11 +286,15 @@ def __call__(self, x): def cdf(self, x): x = list(x.values()) - return multivariate_normal(self.mu, self.var, allow_singular=True).cdf(x) + return multivariate_normal( + [*self.mu.values()], + self.var, + allow_singular=True + ).cdf(x) def sample(self): return multivariate_normal( - self.mu, + [*self.mu.values()], self.var, ).rvs().tolist() From 411af34562538f178c6abc5925e4f8ec24f0eb42 Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Tue, 1 Aug 2023 10:00:55 +0200 Subject: [PATCH 11/16] Reformatting by pre-commit --- river/proba/gaussian.py | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index c2d78ededa..7fda164817 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -128,7 +128,7 @@ class MultivariateGaussian(base.ContinuousDistribution): >>> p = MultivariateGaussian() >>> p.n_samples 0.0 - + >>> for x in X.to_dict(orient="records"): ... p = p.update(x) >>> p.var @@ -165,7 +165,7 @@ class MultivariateGaussian(base.ContinuousDistribution): [0.203..., -0.0532..., 0.840...] MultivariateGaussian works with `utils.Rolling` - + >>> from river import utils >>> p = utils.Rolling(MultivariateGaussian(), window_size=5) >>> for x in X.to_dict(orient="records"): @@ -177,7 +177,7 @@ class MultivariateGaussian(base.ContinuousDistribution): red 0.007765 -0.025181 0.095066 MultivariateGaussian works with `utils.TimeRolling` - + >>> from datetime import datetime as dt, timedelta as td >>> X.index = [dt(2023, 3, 28, 0, 0, 0) + td(seconds=x) for x in range(8)] >>> p = utils.TimeRolling(MultivariateGaussian(), period=td(seconds=5)) @@ -219,10 +219,10 @@ def n_samples(self): def mu(self): """The mean value of the distribution.""" return { - key1: values.mean.get() - for (key1, key2), values in self._var.matrix.items() - if key1 == key2 - } + key1: values.mean.get() + for (key1, key2), values in self._var.matrix.items() + if key1 == key2 + } @property def var(self): @@ -241,7 +241,7 @@ def var(self): # Fill in the off-diagonal with covariances cov_array[i, j] = self._var[(variables[i], variables[j])].get() cov_array[j, i] = self._var[(variables[i], variables[j])].get() - + cov_array = pd.DataFrame(cov_array, index=variables, columns=variables) return cov_array @@ -253,8 +253,7 @@ def sigma(self): def __repr__(self): mu_str = ", ".join(f"{m:.3f}" for m in self.mu.values()) - var_str = self.var.to_string( - float_format="{:0.3f}".format, header=False, index=False) + var_str = self.var.to_string(float_format="{:0.3f}".format, header=False, index=False) var_str = " [" + var_str.replace("\n", "]\n [") + "]" return f"𝒩(\n μ=({mu_str}),\n σ^2=(\n{var_str}\n )\n)" @@ -286,17 +285,17 @@ def __call__(self, x): def cdf(self, x): x = list(x.values()) - return multivariate_normal( - [*self.mu.values()], - self.var, - allow_singular=True - ).cdf(x) + return multivariate_normal([*self.mu.values()], self.var, allow_singular=True).cdf(x) def sample(self): - return multivariate_normal( - [*self.mu.values()], - self.var, - ).rvs().tolist() + return ( + multivariate_normal( + [*self.mu.values()], + self.var, + ) + .rvs() + .tolist() + ) @property def mode(self): From 318e973220808c5ebfa0d8a311dfa4e95d190faa Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Tue, 1 Aug 2023 10:06:35 +0200 Subject: [PATCH 12/16] Make sigma return pd.DataFrame --- river/proba/gaussian.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index 7fda164817..2e7cc8f59a 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -189,7 +189,7 @@ class MultivariateGaussian(base.ContinuousDistribution): green -0.022873 0.014279 -0.025181 red 0.007765 -0.025181 0.095066 - Singlevariate usage is consistent with Gaussian + Variance on diagonal is consistent with Gaussian >>> from river.proba import Gaussian >>> p = MultivariateGaussian() @@ -197,7 +197,7 @@ class MultivariateGaussian(base.ContinuousDistribution): >>> for t, x in X.iterrows(): ... p = p.update(x.to_dict()) ... p_ = p_.update(x['blue']) - >>> p.sigma[0][0] == p_.sigma + >>> p.sigma['blue']['blue'] == p_.sigma True """ # noqa: W291 @@ -248,8 +248,7 @@ def var(self): @property def sigma(self): """The standard deviation of the distribution.""" - cov_array = self.var.values - return [[x**0.5 if x > 0 else float("nan") for x in row] for row in cov_array] + return self.var**0.5 def __repr__(self): mu_str = ", ".join(f"{m:.3f}" for m in self.mu.values()) From cdb56c92913ae4937cc0ebd520f22a9c3fd433b0 Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Wed, 2 Aug 2023 07:53:07 +0200 Subject: [PATCH 13/16] FIX: sort named values --- river/proba/gaussian.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index 2e7cc8f59a..10577836ee 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -140,7 +140,7 @@ class MultivariateGaussian(base.ContinuousDistribution): Retrieving current state in nice format is simple >>> p 𝒩( - μ=(0.416, 0.387, 0.518), + μ=(0.518, 0.387, 0.416), σ^2=( [ 0.076 0.020 -0.010] [ 0.020 0.113 -0.053] @@ -152,17 +152,17 @@ class MultivariateGaussian(base.ContinuousDistribution): >>> p.n_samples 8.0 >>> p.mode # doctest: +ELLIPSIS - {'red': 0.415..., 'green': 0.386..., 'blue': 0.517...} + {'blue': 0.5179..., 'green': 0.3866..., 'red': 0.4158...} To retrieve pdf and cdf >>> p(x) # doctest: +ELLIPSIS - 1.26921953490694... + 0.97967086129734... >>> p.cdf(x) # doctest: +ELLIPSIS - 0.00787141517849810... + 0.00509653891791713... To sample data from distribution >>> p.sample() # doctest: +ELLIPSIS - [0.203..., -0.0532..., 0.840...] + [0.3053..., -0.0532..., 0.7388...] MultivariateGaussian works with `utils.Rolling` @@ -199,12 +199,11 @@ class MultivariateGaussian(base.ContinuousDistribution): ... p_ = p_.update(x['blue']) >>> p.sigma['blue']['blue'] == p_.sigma True - """ # noqa: W291 + """ def __init__(self, seed=None): super().__init__(seed) self._var = covariance.EmpiricalCovariance(ddof=1) - self.feature_names_in_ = None # TODO: add method _from_state to initialize model (for warm starting) @@ -220,7 +219,7 @@ def mu(self): """The mean value of the distribution.""" return { key1: values.mean.get() - for (key1, key2), values in self._var.matrix.items() + for (key1, key2), values in sorted(self._var.matrix.items()) if key1 == key2 } @@ -256,19 +255,19 @@ def __repr__(self): var_str = " [" + var_str.replace("\n", "]\n [") + "]" return f"𝒩(\n μ=({mu_str}),\n σ^2=(\n{var_str}\n )\n)" - def update(self, x, w=1.0): + def update(self, x): # TODO: add support for weigthed samples self._var.update(x) return self - def revert(self, x, w=1.0): + def revert(self, x): # TODO: add support for weigthed samples self._var.revert(x) return self def __call__(self, x): """PDF(x) method.""" - x = list(x.values()) + x = [x[i] for i in self.mu] var = self.var if var is not None: try: @@ -283,7 +282,7 @@ def __call__(self, x): return 0.0 # pragma: no cover def cdf(self, x): - x = list(x.values()) + x = [x[i] for i in self.mu] return multivariate_normal([*self.mu.values()], self.var, allow_singular=True).cdf(x) def sample(self): From c6bd61461f8d6b75f1e1975079c6df259c0d48b6 Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Wed, 2 Aug 2023 08:04:47 +0200 Subject: [PATCH 14/16] ADD: typehints; UPDATE: type in cdf, pdf --- river/proba/gaussian.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index 10577836ee..1a0ccfa078 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -208,14 +208,14 @@ def __init__(self, seed=None): # TODO: add method _from_state to initialize model (for warm starting) @property - def n_samples(self): + def n_samples(self) -> float: if not self._var.matrix: return 0.0 else: return list(self._var.matrix.values())[-1].mean.n @property - def mu(self): + def mu(self) -> dict: """The mean value of the distribution.""" return { key1: values.mean.get() @@ -224,7 +224,7 @@ def mu(self): } @property - def var(self): + def var(self) -> pd.DataFrame: """The variance of the distribution.""" variables = sorted(list({var for cov in self._var.matrix.keys() for var in cov})) # Initialize the covariance matrix array @@ -245,7 +245,7 @@ def var(self): return cov_array @property - def sigma(self): + def sigma(self) -> pd.DataFrame: """The standard deviation of the distribution.""" return self.var**0.5 @@ -255,23 +255,24 @@ def __repr__(self): var_str = " [" + var_str.replace("\n", "]\n [") + "]" return f"𝒩(\n μ=({mu_str}),\n σ^2=(\n{var_str}\n )\n)" - def update(self, x): + def update(self, x: dict): # TODO: add support for weigthed samples self._var.update(x) return self - def revert(self, x): + def revert(self, x: dict): # TODO: add support for weigthed samples self._var.revert(x) return self - def __call__(self, x): + def __call__(self, x: dict) -> float: """PDF(x) method.""" x = [x[i] for i in self.mu] var = self.var if var is not None: try: - return multivariate_normal([*self.mu.values()], var).pdf(x) + pdf_ = multivariate_normal([*self.mu.values()], var).pdf(x) + return float(pdf_) # TODO: validate occurence of ValueError # The input matrix must be symmetric positive semidefinite. except ValueError: # pragma: no cover @@ -281,11 +282,12 @@ def __call__(self, x): return 0.0 return 0.0 # pragma: no cover - def cdf(self, x): + def cdf(self, x: dict) -> float: x = [x[i] for i in self.mu] - return multivariate_normal([*self.mu.values()], self.var, allow_singular=True).cdf(x) + cdf_ = multivariate_normal([*self.mu.values()], self.var, allow_singular=True).cdf(x) + return float(cdf_) - def sample(self): + def sample(self) -> list[float]: return ( multivariate_normal( [*self.mu.values()], @@ -296,5 +298,5 @@ def sample(self): ) @property - def mode(self): + def mode(self) -> dict: return self.mu From 45ef6d8ec266e329a95ccc173133c43e4e7504c6 Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Wed, 2 Aug 2023 08:16:56 +0200 Subject: [PATCH 15/16] ADD: proba.MultivariateContinuousDistribution base class --- river/proba/base.py | 23 +++++++++++++++++++++++ river/proba/gaussian.py | 18 +++++++++--------- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/river/proba/base.py b/river/proba/base.py index 28bcb2d02f..c10766749d 100644 --- a/river/proba/base.py +++ b/river/proba/base.py @@ -104,3 +104,26 @@ def revert(self, x: float): @abc.abstractmethod def cdf(self, x: float): """Cumulative density function, i.e. P(X <= x).""" + + +class MultivariateContinuousDistribution(Distribution): + """A probability distribution for multivariate continuous values. + + Parameters + ---------- + seed + Random number generator seed for reproducibility. + + """ + + @abc.abstractmethod + def update(self, x: dict[str, float]): + """Updates the parameters of the distribution given a new observation.""" + + @abc.abstractmethod + def revert(self, x: dict[str, float]): + """Reverts the parameters of the distribution for a given observation.""" + + @abc.abstractmethod + def cdf(self, x: dict[str, float]) -> float: + """Cumulative density function, i.e. P(X <= x).""" diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index 1a0ccfa078..258966203e 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -98,7 +98,7 @@ def mode(self): return self.mu -class MultivariateGaussian(base.ContinuousDistribution): +class MultivariateGaussian(base.MultivariateContinuousDistribution): """Multivariate normal distribution with parameters mu and var. Parameters @@ -255,23 +255,23 @@ def __repr__(self): var_str = " [" + var_str.replace("\n", "]\n [") + "]" return f"𝒩(\n μ=({mu_str}),\n σ^2=(\n{var_str}\n )\n)" - def update(self, x: dict): + def update(self, x): # TODO: add support for weigthed samples self._var.update(x) return self - def revert(self, x: dict): + def revert(self, x): # TODO: add support for weigthed samples self._var.revert(x) return self - def __call__(self, x: dict) -> float: + def __call__(self, x: dict[str, float]): """PDF(x) method.""" - x = [x[i] for i in self.mu] + x_ = [x[i] for i in self.mu] var = self.var if var is not None: try: - pdf_ = multivariate_normal([*self.mu.values()], var).pdf(x) + pdf_ = multivariate_normal([*self.mu.values()], var).pdf(x_) return float(pdf_) # TODO: validate occurence of ValueError # The input matrix must be symmetric positive semidefinite. @@ -282,9 +282,9 @@ def __call__(self, x: dict) -> float: return 0.0 return 0.0 # pragma: no cover - def cdf(self, x: dict) -> float: - x = [x[i] for i in self.mu] - cdf_ = multivariate_normal([*self.mu.values()], self.var, allow_singular=True).cdf(x) + def cdf(self, x: dict[str, float]): + x_ = [x[i] for i in self.mu] + cdf_ = multivariate_normal([*self.mu.values()], self.var, allow_singular=True).cdf(x_) return float(cdf_) def sample(self) -> list[float]: From 566bcf10fb15f4400aa479fc7c59d69fe1994387 Mon Sep 17 00:00:00 2001 From: marekwadinger Date: Wed, 2 Aug 2023 08:23:10 +0200 Subject: [PATCH 16/16] UPDATE: sample of MultivariateGaussian returns dict --- river/proba/gaussian.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/river/proba/gaussian.py b/river/proba/gaussian.py index 258966203e..28a07869ba 100644 --- a/river/proba/gaussian.py +++ b/river/proba/gaussian.py @@ -162,7 +162,7 @@ class MultivariateGaussian(base.MultivariateContinuousDistribution): To sample data from distribution >>> p.sample() # doctest: +ELLIPSIS - [0.3053..., -0.0532..., 0.7388...] + {'blue': 0.3053..., 'green': -0.0532..., 'red': 0.7388...} MultivariateGaussian works with `utils.Rolling` @@ -287,15 +287,9 @@ def cdf(self, x: dict[str, float]): cdf_ = multivariate_normal([*self.mu.values()], self.var, allow_singular=True).cdf(x_) return float(cdf_) - def sample(self) -> list[float]: - return ( - multivariate_normal( - [*self.mu.values()], - self.var, - ) - .rvs() - .tolist() - ) + def sample(self) -> dict[str, float]: + sample_ = multivariate_normal([*self.mu.values()], self.var).rvs().tolist() + return dict(zip(self.mu.keys(), sample_)) @property def mode(self) -> dict: