update description, fix a bug when calculating rnomc, add draw() method to see the hierarchical cluster's structure as a Graphviz graph, and support working with Var when a cluster has only one time-series
Bezum30 committed Jul 26, 2024
1 parent bb77e55 commit 27b4a93
Showing 1 changed file with 147 additions and 69 deletions.
216 changes: 147 additions & 69 deletions river/cluster/odac.py
@@ -9,50 +9,22 @@


class ODAC(base.Clusterer):
"""The Online Divisive-Agglomerative Clustering (ODAC)[^1] aims at continuously
maintaining a hierarchical cluster structure from evolving time series data
streams.
ODAC continuosly monitors the evolution of clusters' diameters and split or merge
them by gathering more data or reacting to concept drift. Such changes are supported
by a confidence level that comes from the Hoeffding bound. ODAC relies on keeping
the linear correlation between series to evaluate whether or not the time series
hierarchy has changed.
"""The Online Divisive-Agglomerative Clustering (ODAC)[^1] aims at continuously maintaining
a hierarchical cluster structure from evolving time series data streams.
The distance between time-series a and b is given by `rnomc(a, b) = sqrt((1 - corr(a, b)) / 2)`,
where `corr(a, b)` is the Pearson Correlation coefficient.
In the following topics, ε stands for the Hoeffding bound and considers clusters cj
with descendants ck and cs.
**The Merge Operator**
The Splitting Criteria guarantees that cluster's diameters monotonically decrease.
- If diameter (ck) - diameter (cj) > ε OR diameter (cs) - diameter (cj ) > ε:
* There is a change in the correlation structure, so merge clusters ck and cs into cj.
**Splitting Criteria**
Consider:
- d0: the minimum distance;
where `corr(a, b)` is the Pearson Correlation coefficient. If the cluster has only one time-series,
the diameter is given by the time-series variance. The cluster's diameter is given by the largest
distance between the cluster's time-series.
- d1: the farthest distance;
- d_avg: the average distance;
- d2: the second farthest distance.
Then:
- if d1 - d2 > εk or t > εk then
- if (d1 - d0)|(d1 - d_avg) - (d_avg - d0) > εk then
* Split the cluster
ODAC continuously monitors the evolution of diameters, only of the leaves, and splits or merges them
by gathering more data or reacting to concept drift - a confidence level from the Hoeffding bound
supports such changes.
So, the split operator, where the Hoeffding bound is applied, occurs when the difference between
the largest distance (diameter) and the second largest difference is greater than a constant.
Furthermore, the merge operator checks if one of the cluster's children has a diameter greater
than him - applying the Hoeffding bound again.
Parameters
----------
@@ -88,15 +60,15 @@ class ODAC(base.Clusterer):
Structure changed at observation 200
Structure changed at observation 300
>>> print(model.render_ascii())
ROOT d1=0.79 d2=0.76 [NOT ACTIVE]
├── CH1_LVL_1 d1=0.74 d2=0.72 [NOT ACTIVE]
│ ├── CH1_LVL_2 d1=0.08 [3]
│ └── CH2_LVL_2 d1=0.73 [2, 4]
└── CH2_LVL_1 d1=0.81 d2=0.78 [NOT ACTIVE]
├── CH1_LVL_2 d1=0.73 d2=0.67 [NOT ACTIVE]
│ ├── CH1_LVL_3 d1=0.72 [0, 9]
│ └── CH2_LVL_3 d1=0.08 [1]
└── CH2_LVL_2 d1=0.74 d2=0.73 [NOT ACTIVE]
├── CH1_LVL_3 d1=0.71 [5, 6]
└── CH2_LVL_3 d1=0.71 [7, 8]
@@ -119,7 +91,8 @@ class ODAC(base.Clusterer):
References
----------
[^1]: P. P. Rodrigues, J. Gama and J. Pedroso, "Hierarchical Clustering of Time-Series Data Streams," in IEEE Transactions
on Knowledge and Data Engineering, vol. 20, no. 5, pp. 615-627, May 2008, doi: 10.1109/TKDE.2007.190727.
"""

@@ -231,7 +204,7 @@ def learn_one(self, x: dict):
# Time to time approach
if self._update_timer == 0:
# Calculate all the crucial variables to the next procedure
leaf.calculate_coefficients(confidence_level=self.confidence_level)

if leaf.test_aggregate() or leaf.test_split(tau=self.tau):
# Put the flag change_detected to true to indicate to the user that the structure changed
@@ -251,10 +224,10 @@ def predict_one(self, x: dict):
A dictionary of features.
"""
raise NotImplementedError("ODAC does not predict anything. It builds a hierarchical cluster's structure.")
raise NotImplementedError()

def render_ascii(self, n_decimal_places: int = 2) -> str:
"""Method to render the hierarchical cluster's structure in text format.
Parameters
----------
@@ -268,11 +241,114 @@ def draw(self, n_decimal_places: int = 2) -> str:

return self._root_node.design_structure(n_decimal_places).rstrip("\n")

def draw(self, max_depth: int | None = None, show_clusters_info: list[str] = ["timeseries_names", "d1", "d2", "e"], n_decimal_places: int = 2):
"""Method to draw the hierarchical cluster's structure as a Graphviz graph.
Parameters
----------
max_depth
The maximum depth of the tree to display.
show_clusters_info
List of cluster information to show. Valid options are:
- "timeseries_indexes": Shows the indexes of the timeseries.
- "timeseries_names": Shows the names of the timeseries.
- "d1": Shows the d1 distance.
- "d2": Shows the d2 distance.
- "e": Shows the error measure.
n_decimal_places
The number of decimal places to show for numerical values.
"""
if not (n_decimal_places > 0 and n_decimal_places < 10):
raise ValueError("n_decimal_places must be between 1 and 9.")

try:
import graphviz
except ImportError as e:
raise ValueError("You have to install graphviz to use the draw method.") from e

counter = 0

dot = graphviz.Digraph(
graph_attr={"splines": "ortho", "forcelabels": "true", "overlap": "false"},
node_attr={
"shape": "box",
"penwidth": "1.2",
"fontname": "trebuchet",
"fontsize": "11",
"margin": "0.1,0.0",
},
edge_attr={"penwidth": "0.6", "center": "true", "fontsize": "7 "},
)

def iterate(node: ODACCluster, parent_node: str | None = None, depth: int = 0):
nonlocal counter, max_depth, show_clusters_info, n_decimal_places

if max_depth is not None and depth > max_depth:
return

node_n = str(counter)
counter += 1

label = ""

show_clusters_info_copy = show_clusters_info.copy()

# Build the node label from the requested cluster information
if len(show_clusters_info_copy) > 0:
if "timeseries_indexes" in show_clusters_info_copy:
# Convert timeseries names to indexes
name_to_index = {name: index for index, name in enumerate(self._root_node.timeseries_names)}
timeseries_indexes = [name_to_index[_name] for _name in node.timeseries_names if _name in name_to_index]

label += f"{timeseries_indexes}"
show_clusters_info_copy.remove("timeseries_indexes")
if len(show_clusters_info_copy) > 0:
label += "\n"
if "timeseries_names" in show_clusters_info_copy:
label += f"{node.timeseries_names}"
show_clusters_info_copy.remove("timeseries_names")
if len(show_clusters_info_copy) > 0:
label += "\n"
if "d1" in show_clusters_info_copy:
if node.d1 is not None:
label += f"d1={node.d1:.{n_decimal_places}f}"
else:
label += "d1=<Not calculated>"
show_clusters_info_copy.remove("d1")
if len(show_clusters_info_copy) > 0:
label += "\n"
if "d2" in show_clusters_info_copy and node.d2 is not None:
label += f"d2={node.d2:.{n_decimal_places}f}"
show_clusters_info_copy.remove("d2")
if len(show_clusters_info_copy) > 0:
label += "\n"
if "e" in show_clusters_info_copy:
label += f"e={node.e:.{n_decimal_places}f}"

show_clusters_info_copy.clear()

# Creates a node with different colors to differentiate the active clusters from the non-active
if node.active:
dot.node(node_n, label, style="filled", fillcolor="#76b5c5")
else:
dot.node(node_n, label, style="filled", fillcolor="#f2f2f2")

if parent_node is not None:
dot.edge(parent_node, node_n)

if node.children is not None:
iterate(node=node.children.first, parent_node=node_n, depth=depth + 1)
iterate(node.children.second, parent_node=node_n, depth=depth + 1)

iterate(node=self._root_node)

return dot
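
A minimal usage sketch for the method above, assuming the graphviz Python package is installed; the toy stream below is made up, and any stream of dict observations mapping series names to values would do:

from river import cluster

model = cluster.ODAC()

# Feed a small, made-up stream where each observation maps series name -> value.
for t in range(300):
    x = {"a": t % 7, "b": (t * 2) % 7, "c": -(t % 5)}
    model.learn_one(x)

dot = model.draw(show_clusters_info=["timeseries_names", "d1"], n_decimal_places=2)
print(dot.source)                                      # DOT description of the hierarchy
# dot.render("odac_tree", format="png", cleanup=True)  # optionally write an image to disk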

@property
def structure_changed(self) -> bool:
return self._structure_changed


class ODACCluster(base.Base):
"""Cluster class for representing individual clusters."""

@@ -284,7 +360,7 @@ def __init__(self, name: str, parent: ODACCluster | None = None):
self.children: ODACChildren | None = None

self.timeseries_names: list[typing.Hashable] = []
self._statistics: dict[tuple[typing.Hashable, typing.Hashable], stats.PearsonCorr] | stats.Var

self.d1: float | None = None
self.d2: float | None = None
@@ -348,14 +424,14 @@ def __str__(self) -> str:
def __repr__(self) -> str:
return self.design_structure()

def _init_stats(self) -> dict[tuple[typing.Hashable, typing.Hashable], stats.PearsonCorr] | stats.Var:
return collections.defaultdict(
stats.PearsonCorr,
{
(k1, k2): stats.PearsonCorr()
for k1, k2 in itertools.combinations(self.timeseries_names, 2)
},
) if len(self.timeseries_names) > 1 else stats.Var()

# TODO: not sure if this is the best design
def __call__(self, ts_names: list[typing.Hashable]):
@@ -364,12 +440,15 @@ def __call__(self, ts_names: list[typing.Hashable]):
self._statistics = self._init_stats()

def update_statistics(self, x: dict) -> None:
if len(self.timeseries_names) > 1:
# For each pair of time-series in the cluster update the correlation
# values with the data received
for (k1, k2), item in self._statistics.items(): # type: ignore
if x.get(k1, None) is None or x.get(k2, None) is None:
continue
item.update(float(x[k1]), float(x[k2]))
else:
self._statistics.update(float(x.get(self.timeseries_names[0]))) # type: ignore

# Increment the number of observation in the cluster
self.n += 1
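
For context, here is a small sketch of the two statistic objects the branch above relies on - river's PearsonCorr for pairs of series and Var when the cluster holds a single series; the values are made up:

from river import stats

corr = stats.PearsonCorr()
for xa, xb in [(1.0, 2.0), (2.0, 4.1), (3.0, 5.9)]:
    corr.update(xa, xb)
print(corr.get())   # close to 1 for these nearly proportional values

var = stats.Var()
for v in [1.0, 2.0, 3.0]:
    var.update(v)
print(var.get())    # variance of the lone series, used as the cluster diameter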
Expand All @@ -380,16 +459,17 @@ def _calculate_rnomc_dict(self)-> dict[tuple[typing.Hashable, typing.Hashable],
rnomc_dict = {}

for k1, k2 in itertools.combinations(self.timeseries_names, 2):
# Take the absolute value before the square root so that a tiny negative intermediate value
# (e.g. from floating-point error in the correlation estimate) does not raise a math domain error.
value = abs((1 - self._statistics[(k1, k2)].get()) / 2.0)  # type: ignore
rnomc_dict[(k1, k2)] = math.sqrt(value)

return rnomc_dict

# Method to calculate coefficients for splitting or aggregation
def calculate_coefficients(self, confidence_level: float) -> None:
if len(self.timeseries_names) > 1:
# Get the rnomc values
rnomc_dict = self._calculate_rnomc_dict()

# Get the average distance in the cluster
self.avg = sum(rnomc_dict.values()) / self.n

@@ -405,13 +485,13 @@ def calculate_coefficients(self, confidence_level: float) -> None:
self.pivot_2, self.d2 = max(remaining.items(), key=lambda x: x[1])
else:
self.pivot_2 = self.d2 = None # type: ignore
else:
self.d1 = self._statistics.get() # type: ignore

# Calculate the Hoeffding bound in the cluster
self.e = math.sqrt(math.log(1 / confidence_level) / (2 * self.n))

def _get_closest_cluster(self, pivot_1, pivot_2, current, rnormc_dict: dict) -> int:
"""Method that gives the closest cluster where the current time series is located."""

dist_1 = rnormc_dict.get((min(pivot_1, current), max(pivot_1, current)), 0)
dist_2 = rnormc_dict.get((min(pivot_2, current), max(pivot_2, current)), 0)
return 2 if dist_1 >= dist_2 else 1
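
A tiny worked example of the rule above, with a made-up rnormc dict: the series is assigned to the child whose pivot it is closer to (1 for pivot_1, 2 for pivot_2):

rnormc = {("a", "c"): 0.2, ("b", "c"): 0.7}
pivot_1, pivot_2, current = "a", "b", "c"
dist_1 = rnormc.get((min(pivot_1, current), max(pivot_1, current)), 0)
dist_2 = rnormc.get((min(pivot_2, current), max(pivot_2, current)), 0)
print(2 if dist_1 >= dist_2 else 1)   # -> 1, i.e. "c" stays with pivot "a"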
@@ -445,7 +525,6 @@ def _split_this_cluster(self, pivot_1: typing.Hashable, pivot_2: typing.Hashable
# Set the active flag to false. Since this cluster is not an active cluster anymore.
self.active = False
self.avg = self.d0 = self.pivot_0 = self.pivot_1 = self.pivot_2 = None # type: ignore
self._statistics = None

# Method that proceeds to merge on this cluster
def _aggregate_this_cluster(self):
@@ -485,7 +564,6 @@ def test_aggregate(self):
return True
return False


class ODACChildren(base.Base):
"""Children class representing child clusters."""

