Updates in ODAC code (#1584)
* update the description, fix a bug when calculating rnomc, add a draw() method to show the hierarchical cluster's structure as a Graphviz graph, and use Var when a cluster has only one time-series

* change the description of the draw() method

* add the cluster's name in the draw() method

* correct version

* Update river/cluster/odac.py

* Update river/cluster/odac.py

* update docs/releases/unreleased.md

* Update docs/releases/unreleased.md

---------

Co-authored-by: gonfa3003 <[email protected]>
Co-authored-by: Saulo Martiello Mastelini <[email protected]>
3 people authored Aug 1, 2024
1 parent df257b8 commit d606d7b
Showing 2 changed files with 163 additions and 70 deletions.
7 changes: 7 additions & 0 deletions docs/releases/unreleased.md
@@ -2,6 +2,13 @@

- The units used in River have been corrected to be based on powers of 2 (KiB, MiB). This only changes the display; the behaviour is unchanged.

## cluster

- Update the description of `cluster.ODAC`.
- Change `draw` in `cluster.ODAC` to draw the hierarchical cluster's structure as a Graphviz graph.
- Add `render_ascii` in `cluster.ODAC` to render the hierarchical cluster's structure in text format.
- Work with `stats.Var` in `cluster.ODAC` when a cluster has only one time series.

## tree

- Instead of letting trees grow indefinitely, setting the `max_depth` parameter to `None` will stop the trees from growing when they reach the system recursion limit.
226 changes: 156 additions & 70 deletions river/cluster/odac.py
@@ -9,50 +9,22 @@


class ODAC(base.Clusterer):
"""The Online Divisive-Agglomerative Clustering (ODAC)[^1] aims at continuously
maintaining a hierarchical cluster structure from evolving time series data
streams.
ODAC continuosly monitors the evolution of clusters' diameters and split or merge
them by gathering more data or reacting to concept drift. Such changes are supported
by a confidence level that comes from the Hoeffding bound. ODAC relies on keeping
the linear correlation between series to evaluate whether or not the time series
hierarchy has changed.
"""The Online Divisive-Agglomerative Clustering (ODAC)[^1] aims at continuously maintaining
a hierarchical cluster structure from evolving time series data streams.
The distance between time-series a and b is given by `rnomc(a, b) = sqrt((1 - corr(a, b)) / 2)`,
where `corr(a, b)` is the Pearson Correlation coefficient.
In the following topics, ε stands for the Hoeffding bound and considers clusters cj
with descendants ck and cs.
**The Merge Operator**
The Splitting Criteria guarantees that cluster's diameters monotonically decrease.
- If diameter (ck) - diameter (cj) > ε OR diameter (cs) - diameter (cj ) > ε:
* There is a change in the correlation structure, so merge clusters ck and cs into cj.
**Splitting Criteria**
Consider:
- d0: the minimum distance;
where `corr(a, b)` is the Pearson Correlation coefficient. If the cluster has only one time-series,
the diameter is given by the time-series variance. The cluster's diameter is given by the largest
distance between the cluster's time-series.
- d1: the farthest distance;
- d_avg: the average distance;
- d2: the second farthest distance.
Then:
- if d1 - d2 > εk or t > εk then
- if (d1 - d0)|(d1 - d_avg) - (d_avg - d0) > εk then
* Split the cluster
ODAC continuously monitors the evolution of diameters, only of the leaves, and splits or merges them
by gathering more data or reacting to concept drift - a confidence level from the Hoeffding bound
supports such changes.
So, the split operator, where the Hoeffding bound is applied, occurs when the difference between
the largest distance (diameter) and the second largest difference is greater than a constant.
Furthermore, the merge operator checks if one of the cluster's children has a diameter bigger
than their parent - applying the Hoeffding bound again.
Parameters
----------
@@ -88,15 +60,15 @@ class ODAC(base.Clusterer):
Structure changed at observation 200
Structure changed at observation 300
>>> print(model.render_ascii())
ROOT d1=0.79 d2=0.76 [NOT ACTIVE]
├── CH1_LVL_1 d1=0.74 d2=0.72 [NOT ACTIVE]
│ ├── CH1_LVL_2 d1=0.08 [3]
│ └── CH2_LVL_2 d1=0.73 [2, 4]
└── CH2_LVL_1 d1=0.81 d2=0.78 [NOT ACTIVE]
├── CH1_LVL_2 d1=0.73 d2=0.67 [NOT ACTIVE]
│ ├── CH1_LVL_3 d1=0.72 [0, 9]
│ └── CH2_LVL_3 d1=0.08 [1]
└── CH2_LVL_2 d1=0.74 d2=0.73 [NOT ACTIVE]
├── CH1_LVL_3 d1=0.71 [5, 6]
└── CH2_LVL_3 d1=0.71 [7, 8]
@@ -119,7 +91,8 @@ class ODAC(base.Clusterer):
References
----------
[^1]: P. P. Rodrigues, J. Gama and J. Pedroso, "Hierarchical Clustering of Time-Series Data Streams," in IEEE Transactions
on Knowledge and Data Engineering, vol. 20, no. 5, pp. 615-627, May 2008, doi: 10.1109/TKDE.2007.190727.
"""

@@ -231,7 +204,7 @@ def learn_one(self, x: dict):
# Time to time approach
if self._update_timer == 0:
# Calculate all the crucial variables to the next procedure
leaf.calculate_coefficients(confidence_level=self.confidence_level)

if leaf.test_aggregate() or leaf.test_split(tau=self.tau):
# Put the flag change_detected to true to indicate to the user that the structure changed
@@ -251,10 +224,10 @@ def predict_one(self, x: dict):
A dictionary of features.
"""
raise NotImplementedError("ODAC does not predict anything. It builds a hierarchical cluster's structure.")
raise NotImplementedError()

def render_ascii(self, n_decimal_places: int = 2) -> str:
    """Method to render the hierarchical cluster's structure in text format.
Parameters
----------
@@ -268,11 +241,120 @@ def draw(self, n_decimal_places: int = 2) -> str:

return self._root_node.design_structure(n_decimal_places).rstrip("\n")

def draw(
    self,
    max_depth: int | None = None,
    show_clusters_info: list[typing.Hashable] = ["timeseries_names", "d1", "d2", "e"],
    n_decimal_places: int = 2,
):
"""Method to draw the hierarchical cluster's structure as a Graphviz graph.
Parameters
----------
max_depth
The maximum depth of the tree to display.
show_clusters_info
List of cluster information to show. Valid options are:
- "timeseries_indexes": Shows the indexes of the timeseries in the cluster.
- "timeseries_names": Shows the names of the timeseries in the cluster.
- "name": Shows the cluster's name.
- "d1": Shows the d1 (the largest distance in the cluster).
- "d2": Shows the d2 (the second largest distance in the cluster).
- "e": Shows the error bound.
n_decimal_places
The number of decimal places to show for numerical values.
"""
if not (n_decimal_places > 0 and n_decimal_places < 10):
raise ValueError("n_decimal_places must be between 1 and 9.")

try:
import graphviz
except ImportError as e:
raise ValueError("You have to install graphviz to use the draw method.") from e

counter = 0

dot = graphviz.Digraph(
graph_attr={"splines": "ortho", "forcelabels": "true", "overlap": "false"},
node_attr={
"shape": "box",
"penwidth": "1.2",
"fontname": "trebuchet",
"fontsize": "11",
"margin": "0.1,0.0",
},
edge_attr={"penwidth": "0.6", "center": "true", "fontsize": "7 "},
)

def iterate(node: ODACCluster, parent_node: str | None = None, depth: int = 0):
nonlocal counter, max_depth, show_clusters_info, n_decimal_places

if max_depth is not None and depth > max_depth:
return

node_n = str(counter)
counter += 1

label = ""

# checks if user wants to see information about clusters
if len(show_clusters_info) > 0:
show_clusters_info_copy = show_clusters_info.copy()

if "name" in show_clusters_info_copy:
label += f"{node.name}"
show_clusters_info_copy.remove("name")
if len(show_clusters_info_copy) > 0:
label += "\n"
if "timeseries_indexes" in show_clusters_info_copy:
# Convert timeseries names to indexes
name_to_index = {name: index for index, name in enumerate(self._root_node.timeseries_names)}
timeseries_indexes = [name_to_index[_name] for _name in node.timeseries_names if _name in name_to_index]

label += f"{timeseries_indexes}"
show_clusters_info_copy.remove("timeseries_indexes")
if len(show_clusters_info_copy) > 0:
label += "\n"
if "timeseries_names" in show_clusters_info_copy:
label += f"{node.timeseries_names}"
show_clusters_info_copy.remove("timeseries_names")
if len(show_clusters_info_copy) > 0:
label += "\n"
if "d1" in show_clusters_info_copy:
if node.d1 is not None:
label += f"d1={node.d1:.{n_decimal_places}f}"
else:
label += "d1=<Not calculated>"
show_clusters_info_copy.remove("d1")
if len(show_clusters_info_copy) > 0:
label += "\n"
if "d2" in show_clusters_info_copy and node.d2 is not None:
label += f"d2={node.d2:.{n_decimal_places}f}"
show_clusters_info_copy.remove("d2")
if len(show_clusters_info_copy) > 0:
label += "\n"
if "e" in show_clusters_info_copy:
label += f"e={node.e:.{n_decimal_places}f}"

show_clusters_info_copy.clear()

# Creates a node with different color to differentiate the active clusters from the non-active
if node.active:
dot.node(node_n, label, style="filled", fillcolor="#76b5c5")
else:
dot.node(node_n, label, style="filled", fillcolor="#f2f2f2")

if parent_node is not None:
dot.edge(parent_node, node_n)

if node.children is not None:
iterate(node=node.children.first, parent_node=node_n, depth=depth + 1)
iterate(node.children.second, parent_node=node_n, depth=depth + 1)

iterate(node=self._root_node)

return dot
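For orientation (again, not part of the diff): `draw` returns a `graphviz.Digraph`, so the result can be rendered with Graphviz's own API. The `model` variable below is the one from the docstring example above, and the output filename is arbitrary.

```python
# Hypothetical usage of the new draw() method.
dot = model.draw(max_depth=2, show_clusters_info=["name", "d1", "e"])
dot.render("odac_structure", format="png", cleanup=True)  # writes odac_structure.png
```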

@property
def structure_changed(self) -> bool:
return self._structure_changed


class ODACCluster(base.Base):
"""Cluster class for representing individual clusters."""

@@ -284,7 +366,7 @@ def __init__(self, name: str, parent: ODACCluster | None = None):
self.children: ODACChildren | None = None

self.timeseries_names: list[typing.Hashable] = []
self._statistics: dict[tuple[typing.Hashable, typing.Hashable], stats.PearsonCorr] | stats.Var | None

self.d1: float | None = None
self.d2: float | None = None
@@ -348,14 +430,14 @@ def __str__(self) -> str:
def __repr__(self) -> str:
return self.design_structure()

def _init_stats(self) -> dict[tuple[typing.Hashable, typing.Hashable], stats.PearsonCorr] | stats.Var:
return collections.defaultdict(
stats.PearsonCorr,
{
(k1, k2): stats.PearsonCorr()
for k1, k2 in itertools.combinations(self.timeseries_names, 2)
},
) if len(self.timeseries_names) > 1 else stats.Var()

# TODO: not sure if this is the best design
def __call__(self, ts_names: list[typing.Hashable]):
@@ -364,12 +446,15 @@ def __call__(self, ts_names: list[typing.Hashable]):
self._statistics = self._init_stats()

def update_statistics(self, x: dict) -> None:
if len(self.timeseries_names) > 1:
    # For each pair of time-series in the cluster update the correlation
    # values with the data received
    for (k1, k2), item in self._statistics.items():  # type: ignore
        if x.get(k1, None) is None or x.get(k2, None) is None:
            continue
        item.update(float(x[k1]), float(x[k2]))
else:
    self._statistics.update(float(x.get(self.timeseries_names[0])))  # type: ignore

# Increment the number of observations in the cluster
self.n += 1
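A short side note on the single-series branch above (not part of the diff): it relies on river's running variance, `stats.Var`. A minimal illustration with made-up values:

```python
from river import stats

# Running (sample) variance, used as the cluster "diameter" when only one time series is present.
var = stats.Var()
for value in [1.0, 2.0, 4.0, 7.0]:
    var.update(value)

print(var.get())  # variance of the values seen so far
```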
@@ -380,16 +465,17 @@ def _calculate_rnomc_dict(self)-> dict[tuple[typing.Hashable, typing.Hashable],
rnomc_dict = {}

for k1, k2 in itertools.combinations(self.timeseries_names, 2):
value = abs((1 - self._statistics[(k1, k2)].get()) / 2.0)  # type: ignore
rnomc_dict[(k1, k2)] = math.sqrt(value)

return rnomc_dict
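One note on the `abs()` introduced above (not part of the diff): it reads like a guard against floating-point overshoot, where a correlation estimate marginally above 1 would make the radicand negative. A tiny illustration with a made-up correlation value:

```python
import math

corr = 1.0000000000000002   # correlation estimate that overshoots 1.0 by one ulp
value = (1 - corr) / 2.0    # slightly negative
# math.sqrt(value) would raise "ValueError: math domain error"
print(math.sqrt(abs(value)))  # the abs() guard keeps rnomc well defined (~1.05e-08 here)
```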

# Method to calculate coefficients for splitting or aggregation
def calculate_coefficients(self, confidence_level: float) -> None:
if len(self.timeseries_names) > 1:
    # Get the rnomc values
    rnomc_dict = self._calculate_rnomc_dict()
# Get the average distance in the cluster
self.avg = sum(rnomc_dict.values()) / self.n

@@ -405,13 +491,13 @@ def calculate_coefficients(self, confidence_level: float) -> None:
self.pivot_2, self.d2 = max(remaining.items(), key=lambda x: x[1])
else:
self.pivot_2 = self.d2 = None # type: ignore
else:
    self.d1 = self._statistics.get()  # type: ignore

# Calculate the Hoeffding bound in the cluster
self.e = math.sqrt(math.log(1 / confidence_level) / (2 * self.n))

def _get_closest_cluster(self, pivot_1, pivot_2, current, rnormc_dict: dict) -> int:
    """Method that gives the closest cluster where the current time series is located."""

dist_1 = rnormc_dict.get((min(pivot_1, current), max(pivot_1, current)), 0)
dist_2 = rnormc_dict.get((min(pivot_2, current), max(pivot_2, current)), 0)
return 2 if dist_1 >= dist_2 else 1
@@ -444,8 +530,9 @@ def _split_this_cluster(self, pivot_1: typing.Hashable, pivot_2: typing.Hashable

# Set the active flag to False, since this cluster is no longer an active cluster.
self.active = False

# Reset some attributes
self.avg = self.d0 = self.pivot_0 = self.pivot_1 = self.pivot_2 = self._statistics = None  # type: ignore

# Method that proceeds to merge on this cluster
def _aggregate_this_cluster(self):
@@ -485,7 +572,6 @@ def test_aggregate(self):
return True
return False


class ODACChildren(base.Base):
"""Children class representing child clusters."""

