diff --git a/river/cluster/odac.py b/river/cluster/odac.py index 5c87d4666e..49b09b820f 100644 --- a/river/cluster/odac.py +++ b/river/cluster/odac.py @@ -9,50 +9,22 @@ class ODAC(base.Clusterer): - """The Online Divisive-Agglomerative Clustering (ODAC)[^1] aims at continuously - maintaining a hierarchical cluster structure from evolving time series data - streams. - - ODAC continuosly monitors the evolution of clusters' diameters and split or merge - them by gathering more data or reacting to concept drift. Such changes are supported - by a confidence level that comes from the Hoeffding bound. ODAC relies on keeping - the linear correlation between series to evaluate whether or not the time series - hierarchy has changed. + """The Online Divisive-Agglomerative Clustering (ODAC)[^1] aims at continuously maintaining + a hierarchical cluster structure from evolving time series data streams. The distance between time-series a and b is given by `rnomc(a, b) = sqrt((1 - corr(a, b)) / 2)`, - where `corr(a, b)` is the Pearson Correlation coefficient. - - In the following topics, ε stands for the Hoeffding bound and considers clusters cj - with descendants ck and cs. - - **The Merge Operator** - - The Splitting Criteria guarantees that cluster's diameters monotonically decrease. - - - If diameter (ck) - diameter (cj) > ε OR diameter (cs) - diameter (cj ) > ε: - - * There is a change in the correlation structure, so merge clusters ck and cs into cj. - - **Splitting Criteria** - - Consider: - - - d0: the minimum distance; + where `corr(a, b)` is the Pearson Correlation coefficient. If the cluster has only one time-series, + the diameter is given by the time-series variance. The cluster's diameter is given by the largest + distance between the cluster's time-series. - - d1: the farthest distance; - - - d_avg: the average distance; - - - d2: the second farthest distance. - - Then: - - - if d1 - d2 > εk or t > εk then - - - if (d1 - d0)|(d1 - d_avg) - (d_avg - d0) > εk then - - * Split the cluster + ODAC continuously monitors the evolution of diameters, only of the leaves, and splits or merges them + by gathering more data or reacting to concept drift - a confidence level from the Hoeffding bound + supports such changes. + So, the split operator, where the Hoeffding bound is applied, occurs when the difference between + the largest distance (diameter) and the second largest difference is greater than a constant. + Furthermore, the merge operator checks if one of the cluster's children has a diameter greater + than him - applying the Hoeffding bound again. Parameters ---------- @@ -88,15 +60,15 @@ class ODAC(base.Clusterer): Structure changed at observation 200 Structure changed at observation 300 - >>> print(model.draw(n_decimal_places=2)) + >>> print(model.render_ascii()) ROOT d1=0.79 d2=0.76 [NOT ACTIVE] ├── CH1_LVL_1 d1=0.74 d2=0.72 [NOT ACTIVE] - │ ├── CH1_LVL_2 d1= [3] + │ ├── CH1_LVL_2 d1=0.08 [3] │ └── CH2_LVL_2 d1=0.73 [2, 4] └── CH2_LVL_1 d1=0.81 d2=0.78 [NOT ACTIVE] ├── CH1_LVL_2 d1=0.73 d2=0.67 [NOT ACTIVE] │ ├── CH1_LVL_3 d1=0.72 [0, 9] - │ └── CH2_LVL_3 d1= [1] + │ └── CH2_LVL_3 d1=0.08 [1] └── CH2_LVL_2 d1=0.74 d2=0.73 [NOT ACTIVE] ├── CH1_LVL_3 d1=0.71 [5, 6] └── CH2_LVL_3 d1=0.71 [7, 8] @@ -119,7 +91,8 @@ class ODAC(base.Clusterer): References ---------- - [^1]: [Hierarchical clustering of time-series data streams.](http://doi.org/10.1109/TKDE.2007.190727) + [^1]: P. P. Rodrigues, J. Gama and J. Pedroso, "Hierarchical Clustering of Time-Series Data Streams" in IEEE Transactions + on Knowledge and Data Engineering, vol. 20, no. 5, pp. 615-627, May 2008, doi: 10.1109/TKDE.2007.190727. """ @@ -231,7 +204,7 @@ def learn_one(self, x: dict): # Time to time approach if self._update_timer == 0: # Calculate all the crucial variables to the next procedure - leaf.calculate_coefficients(self.confidence_level) + leaf.calculate_coefficients(confidence_level=self.confidence_level) if leaf.test_aggregate() or leaf.test_split(tau=self.tau): # Put the flag change_detected to true to indicate to the user that the structure changed @@ -251,10 +224,10 @@ def predict_one(self, x: dict): A dictionary of features. """ - raise NotImplementedError("ODAC does not predict anything. It builds a hierarchical cluster's structure.") + raise NotImplementedError() - def draw(self, n_decimal_places: int = 2) -> str: - """Method to draw the hierarchical cluster's structure. + def render_ascii(self, n_decimal_places: int = 2) -> str: + """Method to render the hierarchical cluster's structure in text format. Parameters ---------- @@ -268,11 +241,114 @@ def draw(self, n_decimal_places: int = 2) -> str: return self._root_node.design_structure(n_decimal_places).rstrip("\n") + def draw(self, max_depth: int | None = None, show_clusters_info: list[str] = ["timeseries_names", "d1", "d2", "e"], n_decimal_places: int = 2): + """Method to draw the hierarchical cluster's structure as a Graphviz graph. + + Parameters + ---------- + max_depth + The maximum depth of the tree to display. + show_clusters_info + List of cluster information to show. Valid options are: + - "timeseries_indexes": Shows the indexes of the timeseries. + - "timeseries_names": Shows the names of the timeseries. + - "d1": Shows the d1 distance. + - "d2": Shows the d2 distance. + - "e": Shows the error measure. + n_decimal_places + The number of decimal places to show for numerical values. + + """ + if not (n_decimal_places > 0 and n_decimal_places < 10): + raise ValueError("n_decimal_places must be between 1 and 9.") + + try: + import graphviz + except ImportError as e: + raise ValueError("You have to install graphviz to use the draw method.") from e + + counter = 0 + + dot = graphviz.Digraph( + graph_attr={"splines": "ortho", "forcelabels": "true", "overlap": "false"}, + node_attr={ + "shape": "box", + "penwidth": "1.2", + "fontname": "trebuchet", + "fontsize": "11", + "margin": "0.1,0.0", + }, + edge_attr={"penwidth": "0.6", "center": "true", "fontsize": "7 "}, + ) + + def iterate(node: ODACCluster, parent_node: str | None = None, depth: int = 0): + nonlocal counter, max_depth, show_clusters_info, n_decimal_places + + if max_depth is not None and depth > max_depth: + return + + node_n = str(counter) + counter += 1 + + label = "" + + show_clusters_info_copy = show_clusters_info.copy() + + # checks + if len(show_clusters_info_copy) > 0: + if "timeseries_indexes" in show_clusters_info_copy: + # Convert timeseries names to indexes + name_to_index = {name: index for index, name in enumerate(self._root_node.timeseries_names)} + timeseries_indexes = [name_to_index[_name] for _name in node.timeseries_names if _name in name_to_index] + + label += f"{timeseries_indexes}" + show_clusters_info_copy.remove("timeseries_indexes") + if len(show_clusters_info_copy) > 0: + label += "\n" + if "timeseries_names" in show_clusters_info_copy: + label += f"{node.timeseries_names}" + show_clusters_info_copy.remove("timeseries_names") + if len(show_clusters_info_copy) > 0: + label += "\n" + if "d1" in show_clusters_info_copy: + if node.d1 is not None: + label += f"d1={node.d1:.{n_decimal_places}f}" + else: + label += "d1=" + show_clusters_info_copy.remove("d1") + if len(show_clusters_info_copy) > 0: + label += "\n" + if "d2" in show_clusters_info_copy and node.d2 is not None: + label += f"d2={node.d2:.{n_decimal_places}f}" + show_clusters_info_copy.remove("d2") + if len(show_clusters_info_copy) > 0: + label += "\n" + if "e" in show_clusters_info_copy: + label += f"e={node.e:.{n_decimal_places}f}" + + show_clusters_info_copy.clear() + + # Creates a node with different colors to differentiate the active clusters from the non-active + if node.active: + dot.node(node_n, label, style="filled", fillcolor="#76b5c5") + else: + dot.node(node_n, label, style="filled", fillcolor="#f2f2f2") + + if parent_node is not None: + dot.edge(parent_node, node_n) + + if node.children is not None: + iterate(node=node.children.first, parent_node=node_n, depth=depth + 1) + iterate(node.children.second, parent_node=node_n, depth=depth + 1) + + iterate(node=self._root_node) + + return dot + @property def structure_changed(self) -> bool: return self._structure_changed - class ODACCluster(base.Base): """Cluster class for representing individual clusters.""" @@ -284,7 +360,7 @@ def __init__(self, name: str, parent: ODACCluster | None = None): self.children: ODACChildren | None = None self.timeseries_names: list[typing.Hashable] = [] - self._statistics: dict[tuple[typing.Hashable, typing.Hashable], stats.PearsonCorr] | None + self._statistics: dict[tuple[typing.Hashable, typing.Hashable], stats.PearsonCorr] | stats.Var self.d1: float | None = None self.d2: float | None = None @@ -348,14 +424,14 @@ def __str__(self) -> str: def __repr__(self) -> str: return self.design_structure() - def _init_stats(self) -> dict[tuple[typing.Hashable, typing.Hashable], stats.PearsonCorr]: + def _init_stats(self) -> dict[tuple[typing.Hashable, typing.Hashable], stats.PearsonCorr] | stats.Var: return collections.defaultdict( stats.PearsonCorr, { (k1, k2): stats.PearsonCorr() for k1, k2 in itertools.combinations(self.timeseries_names, 2) }, - ) + ) if len(self.timeseries_names) > 1 else stats.Var() # TODO: not sure if this is the best design def __call__(self, ts_names: list[typing.Hashable]): @@ -364,12 +440,15 @@ def __call__(self, ts_names: list[typing.Hashable]): self._statistics = self._init_stats() def update_statistics(self, x: dict) -> None: - # For each pair of time-series in the cluster update the correlation - # values with the data received - for (k1, k2), item in self._statistics.items(): # type: ignore - if x.get(k1, None) is None or x.get(k2, None) is None: - continue - item.update(float(x[k1]), float(x[k2])) + if len(self.timeseries_names) > 1: + # For each pair of time-series in the cluster update the correlation + # values with the data received + for (k1, k2), item in self._statistics.items(): # type: ignore + if x.get(k1, None) is None or x.get(k2, None) is None: + continue + item.update(float(x[k1]), float(x[k2])) + else: + self._statistics.update(float(x.get(self.timeseries_names[0]))) # type: ignore # Increment the number of observation in the cluster self.n += 1 @@ -380,16 +459,17 @@ def _calculate_rnomc_dict(self)-> dict[tuple[typing.Hashable, typing.Hashable], rnomc_dict = {} for k1, k2 in itertools.combinations(self.timeseries_names, 2): - rnomc_dict[(k1, k2)] = math.sqrt((1 - self._statistics[(k1, k2)].get()) / 2.0) # type: ignore + value = abs((1 - self._statistics[(k1, k2)].get()) / 2.0) # type: ignore + rnomc_dict[(k1, k2)] = math.sqrt(value) return rnomc_dict # Method to calculate coefficients for splitting or aggregation def calculate_coefficients(self, confidence_level: float) -> None: - # Get the rnomc values - rnomc_dict = self._calculate_rnomc_dict() + if len(self.timeseries_names) > 1: + # Get the rnomc values + rnomc_dict = self._calculate_rnomc_dict() - if len(rnomc_dict) > 0: # Get the average distance in the cluster self.avg = sum(rnomc_dict.values()) / self.n @@ -405,13 +485,13 @@ def calculate_coefficients(self, confidence_level: float) -> None: self.pivot_2, self.d2 = max(remaining.items(), key=lambda x: x[1]) else: self.pivot_2 = self.d2 = None # type: ignore + else: + self.d1 = self._statistics.get() # type: ignore + # Calculate the Hoeffding bound in the cluster + self.e = math.sqrt(math.log(1 / confidence_level) / (2 * self.n)) - # Calculate the Hoeffding bound in the cluster - self.e = math.sqrt(math.log(1 / confidence_level) / (2 * self.n)) - + # Method that gives the closest cluster where the current time series is located def _get_closest_cluster(self, pivot_1, pivot_2, current, rnormc_dict: dict) -> int: - """Method that gives the closest cluster where the current time series is located.""" - dist_1 = rnormc_dict.get((min(pivot_1, current), max(pivot_1, current)), 0) dist_2 = rnormc_dict.get((min(pivot_2, current), max(pivot_2, current)), 0) return 2 if dist_1 >= dist_2 else 1 @@ -445,7 +525,6 @@ def _split_this_cluster(self, pivot_1: typing.Hashable, pivot_2: typing.Hashable # Set the active flag to false. Since this cluster is not an active cluster anymore. self.active = False self.avg = self.d0 = self.pivot_0 = self.pivot_1 = self.pivot_2 = None # type: ignore - self._statistics = None # Method that proceeds to merge on this cluster def _aggregate_this_cluster(self): @@ -485,7 +564,6 @@ def test_aggregate(self): return True return False - class ODACChildren(base.Base): """Children class representing child clusters."""