From 6e76708db4533e76ef18f2a1d852b2c9b1921eea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Weing=C3=A4rtner?= Date: Thu, 23 May 2024 10:42:05 -0300 Subject: [PATCH] Automatically detect deleted resources While executing some Gnocchi optimizations (https://github.com/gnocchixyz/gnocchi/pull/1307), we noticed that some deleted/removed resources do not have the "ended_at" field with a datetime. This can cause slowness with time, as more and more "zombie" resources are left there, and this has a direct impact in the MySQL queries executed with the aggregates API. This patch introduces a new parameter called `metric_inactive_after`, which defines for how long a metric can go without receiving new datapoints until we consider it as inactive. Then, when all metrics of a resource are in inactive state, we can mark/consider the resource as removed. --- gnocchi/chef.py | 75 +++++++++++++++++++ gnocchi/cli/metricd.py | 11 +++ gnocchi/indexer/__init__.py | 6 +- ...n_for_truncate_inactive_metrics_process.py | 4 +- ...ec_create_last_measure_timestamp_column.py | 39 ++++++++++ gnocchi/indexer/sqlalchemy.py | 7 ++ gnocchi/indexer/sqlalchemy_base.py | 15 +++- gnocchi/opts.py | 14 +++- gnocchi/storage/__init__.py | 20 +++++ run-upgrade-tests.sh | 2 +- 10 files changed, 183 insertions(+), 10 deletions(-) create mode 100644 gnocchi/indexer/alembic/versions/f89ed2e3c2ec_create_last_measure_timestamp_column.py diff --git a/gnocchi/chef.py b/gnocchi/chef.py index 9c71bb1fd..36ed9bf85 100644 --- a/gnocchi/chef.py +++ b/gnocchi/chef.py @@ -18,9 +18,11 @@ import daiquiri import random +import datetime from gnocchi import carbonara from gnocchi import indexer +from gnocchi import utils LOG = daiquiri.getLogger(__name__) @@ -51,6 +53,79 @@ def __init__(self, coord, incoming, index, storage): self.index = index self.storage = storage + def resource_ended_at_normalization(self, metric_inactive_after): + """Marks resources as ended at if needed. + + This method will check all metrics that have not received new + datapoints after a given period. The period is defined by + 'metric_inactive_after' parameter. If all metrics of resource are in + inactive state, we mark the ended_at field with a timestmap. Therefore, + we consider that the resource has ceased existing. + + In this process we will handle only metrics that are considered as + inactive, according to `metric_inactive_after` parameter. Therefore, + we do not need to lock these metrics while processing, as they are + inactive, and chances are that they will not receive measures anymore. + """ + + momment_now = utils.utcnow() + momment = momment_now - datetime.timedelta( + seconds=metric_inactive_after) + + inactive_metrics = self.index.list_metrics( + attribute_filter={"<": { + "last_measure_timestamp": momment}}, + resource_policy_filter={"==": {"ended_at": None}} + ) + + LOG.debug("Inactive metrics found for processing: [%s].", + inactive_metrics) + + metrics_by_resource_id = {} + for metric in inactive_metrics: + resource_id = metric.resource_id + if metrics_by_resource_id.get(resource_id) is None: + metrics_by_resource_id[resource_id] = [] + + metrics_by_resource_id[resource_id].append(metric) + + for resource_id in metrics_by_resource_id.keys(): + if resource_id is None: + LOG.debug("We do not need to process inactive metrics that do " + "not have resource. Therefore, these metrics [%s] " + "will be considered inactive, but there is nothing " + "else we can do in this process.", + metrics_by_resource_id[resource_id]) + continue + resource = self.index.get_resource( + "generic", resource_id, with_metrics=True) + resource_metrics = resource.metrics + resource_inactive_metrics = metrics_by_resource_id.get(resource_id) + + all_metrics_are_inactive = True + for m in resource_metrics: + if m not in resource_inactive_metrics: + all_metrics_are_inactive = False + LOG.debug("Not all metrics of resource [%s] are inactive. " + "Metric [%s] is not inactive. The inactive " + "metrics are [%s].", + resource, m, resource_inactive_metrics) + break + + if all_metrics_are_inactive: + LOG.info("All metrics [%s] of resource [%s] are inactive." + "Therefore, we will mark it as finished with an" + "ended at timestmap.", resource_metrics, resource) + if resource.ended_at is not None: + LOG.debug( + "Resource [%s] already has an ended at value.", resource) + else: + LOG.info("Marking ended at timestamp for resource " + "[%s] because all of its metrics are inactive.", + resource) + self.index.update_resource( + "generic", resource_id, ended_at=momment_now) + def clean_raw_data_inactive_metrics(self): """Cleans metrics raw data if they are inactive. diff --git a/gnocchi/cli/metricd.py b/gnocchi/cli/metricd.py index a22e1646b..f3c488a1f 100644 --- a/gnocchi/cli/metricd.py +++ b/gnocchi/cli/metricd.py @@ -278,6 +278,17 @@ def _run_job(self): LOG.debug("Finished the cleaning of raw data points for metrics that " "are no longer receiving measures.") + if (self.conf.metricd.metric_inactive_after and + self.conf.metricd.metric_inactive_after > 0): + LOG.debug("Starting resource ended at field normalization.") + self.chef.resource_ended_at_normalization( + self.conf.metricd.metric_inactive_after) + LOG.debug("Finished resource ended at field normalization.") + else: + LOG.debug("Resource ended at field normalization is not " + "activated. See 'metric_inactive_after' parameter if " + "you wish to activate it.") + class MetricdServiceManager(cotyledon.ServiceManager): def __init__(self, conf): diff --git a/gnocchi/indexer/__init__.py b/gnocchi/indexer/__init__.py index 1949cd63a..7416948e8 100644 --- a/gnocchi/indexer/__init__.py +++ b/gnocchi/indexer/__init__.py @@ -446,7 +446,11 @@ def update_backwindow_changed_for_metrics_archive_policy( raise exceptions.NotImplementedError @staticmethod - def update_needs_raw_data_truncation(metric_id): + def update_needs_raw_data_truncation(metric_id, value): + raise exceptions.NotImplementedError + + @staticmethod + def update_last_measure_timestmap(metric_id): raise exceptions.NotImplementedError @staticmethod diff --git a/gnocchi/indexer/alembic/versions/18fff4509e3e_create_column_for_truncate_inactive_metrics_process.py b/gnocchi/indexer/alembic/versions/18fff4509e3e_create_column_for_truncate_inactive_metrics_process.py index d67bb6064..18a2f1910 100644 --- a/gnocchi/indexer/alembic/versions/18fff4509e3e_create_column_for_truncate_inactive_metrics_process.py +++ b/gnocchi/indexer/alembic/versions/18fff4509e3e_create_column_for_truncate_inactive_metrics_process.py @@ -13,17 +13,15 @@ # under the License. # -"""create metric truncation status column +"""Create metric truncation status column Revision ID: 18fff4509e3e Revises: 04eba72e4f90 Create Date: 2024-04-24 09:16:00 """ -import datetime from alembic import op -from sqlalchemy.sql import func import sqlalchemy diff --git a/gnocchi/indexer/alembic/versions/f89ed2e3c2ec_create_last_measure_timestamp_column.py b/gnocchi/indexer/alembic/versions/f89ed2e3c2ec_create_last_measure_timestamp_column.py new file mode 100644 index 000000000..4f503e779 --- /dev/null +++ b/gnocchi/indexer/alembic/versions/f89ed2e3c2ec_create_last_measure_timestamp_column.py @@ -0,0 +1,39 @@ +# Copyright 2015 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""Create last measure push timestamp column +Revision ID: f89ed2e3c2ec +Revises: 18fff4509e3e +Create Date: 2024-04-24 09:16:00 +""" + +from alembic import op + +import sqlalchemy + +from sqlalchemy.sql import func + +# revision identifiers, used by Alembic. +revision = 'f89ed2e3c2ec' +down_revision = '18fff4509e3e' +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "metric", sqlalchemy.Column( + "last_measure_timestamp", sqlalchemy.DateTime, + nullable=False, server_default=func.current_timestamp())) diff --git a/gnocchi/indexer/sqlalchemy.py b/gnocchi/indexer/sqlalchemy.py index 4028bca90..57b811ffb 100644 --- a/gnocchi/indexer/sqlalchemy.py +++ b/gnocchi/indexer/sqlalchemy.py @@ -1403,6 +1403,13 @@ def update_needs_raw_data_truncation(self, metrid_id, value=False): if session.execute(stmt).rowcount == 0: raise indexer.NoSuchMetric(metrid_id) + def update_last_measure_timestmap(self, metrid_id): + with self.facade.writer() as session: + stmt = update(Metric).filter(Metric.id == metrid_id).values( + last_measure_timestamp=datetime.datetime.utcnow()) + if session.execute(stmt).rowcount == 0: + raise indexer.NoSuchMetric(metrid_id) + def update_backwindow_changed_for_metrics_archive_policy( self, archive_policy_name): with self.facade.writer() as session: diff --git a/gnocchi/indexer/sqlalchemy_base.py b/gnocchi/indexer/sqlalchemy_base.py index 7880b5555..c6ea85181 100644 --- a/gnocchi/indexer/sqlalchemy_base.py +++ b/gnocchi/indexer/sqlalchemy_base.py @@ -19,6 +19,7 @@ import sqlalchemy from sqlalchemy.ext import declarative from sqlalchemy.orm import declarative_base +from sqlalchemy.sql import func import sqlalchemy_utils @@ -113,6 +114,14 @@ class Metric(Base, GnocchiBase, indexer.Metric): nullable=False, default=True, server_default=sqlalchemy.sql.true()) + # Timestamp that represents when the last measure push was received for the + # given metric. This allows us to identify when a metric ceased receiving + # measurements; thus, if all metric for a resource are in this situation, + # chances are that the resource ceased existing in the backend. + last_measure_timestamp = sqlalchemy.Column( + "last_measure_timestamp", sqlalchemy.DateTime, + nullable=False, server_default=func.current_timestamp()) + def jsonify(self): d = { "id": self.id, @@ -256,7 +265,8 @@ def type(cls): creator = sqlalchemy.Column(sqlalchemy.String(255)) started_at = sqlalchemy.Column(types.TimestampUTC, nullable=False, default=lambda: utils.utcnow()) - revision_start = sqlalchemy.Column(types.TimestampUTC, nullable=False, + revision_start = sqlalchemy.Column(types.TimestampUTC, + nullable=False, default=lambda: utils.utcnow()) ended_at = sqlalchemy.Column(types.TimestampUTC) user_id = sqlalchemy.Column(sqlalchemy.String(255)) @@ -298,7 +308,8 @@ class ResourceHistory(ResourceMixin, Base, GnocchiBase): ondelete="CASCADE", name="fk_rh_id_resource_id"), nullable=False) - revision_end = sqlalchemy.Column(types.TimestampUTC, nullable=False, + revision_end = sqlalchemy.Column(types.TimestampUTC, + nullable=False, default=lambda: utils.utcnow()) metrics = sqlalchemy.orm.relationship( Metric, primaryjoin="Metric.resource_id == ResourceHistory.id", diff --git a/gnocchi/opts.py b/gnocchi/opts.py index 1d8b7bf72..c20031f91 100644 --- a/gnocchi/opts.py +++ b/gnocchi/opts.py @@ -57,7 +57,6 @@ def __getitem__(self, key): for opt in _INCOMING_OPTS: opt.default = '${storage.%s}' % opt.name - API_OPTS = ( cfg.HostAddressOpt('host', default="0.0.0.0", @@ -73,7 +72,7 @@ def __getitem__(self, key): but not chunked encoding (InfluxDB) * http-socket/socket: support chunked encoding, but require a upstream HTTP Server for HTTP/1.1, keepalive and HTTP protocol correctness. -""") +"""), ) @@ -172,7 +171,16 @@ def list_opts(): default=10000, min=1, help="Number of metrics that should be deleted " - "simultaneously by one janitor.") + "simultaneously by one janitor."), + cfg.IntOpt('metric_inactive_after', + default=0, + help="Number of seconds to wait before we consider a " + "metric inactive. An inactive metric is a metric " + "that has not received new measurements for a " + "given period. If all metrics of a resource are " + "inactive, we mark the resource with the " + "'ended_at' timestamp. The default is 0 (zero), " + "which means that we never execute process.") )), ("api", ( cfg.StrOpt('paste_config', diff --git a/gnocchi/storage/__init__.py b/gnocchi/storage/__init__.py index e41d150a4..04c218a28 100644 --- a/gnocchi/storage/__init__.py +++ b/gnocchi/storage/__init__.py @@ -688,6 +688,26 @@ def _map_compute_splits_operations(bound_timeserie): if metric.needs_raw_data_truncation: indexer_driver.update_needs_raw_data_truncation(metric.id) + # Mark when the metric receives its latest measures + indexer_driver.update_last_measure_timestmap(metric.id) + + resource_id = metric.resource_id + if resource_id: + resource = indexer_driver.get_resource('generic', resource_id) + LOG.debug("Checking if resource [%s] of metric [%s] with " + "resource ID [%s] needs to be 'undeleted.'", + resource, metric.id, resource_id) + if resource.ended_at is not None: + LOG.info("Resource [%s] was marked with a timestamp for the " + "'ended_at' field. However, it received a " + "measurement for metric [%s]. Therefore, we undelete " + "it.", resource, metric) + indexer_driver.update_resource( + "generic", resource_id, ended_at=None) + else: + LOG.debug("Metric [%s] does not have a resource " + "assigned to it.", metric) + with self.statistics.time("splits delete"): self._delete_metric_splits(splits_to_delete) self.statistics["splits delete"] += len(splits_to_delete) diff --git a/run-upgrade-tests.sh b/run-upgrade-tests.sh index c9b7456a8..40835e742 100755 --- a/run-upgrade-tests.sh +++ b/run-upgrade-tests.sh @@ -107,7 +107,7 @@ export GNOCCHI_USER=$GNOCCHI_USER_ID # needs to be released. Otherwise, the logs stop to be writen, and the # execution of the code is "frozen", due to the lack of buffer in the # process output. To work around that, we can read the buffer, and dump it -# into a lof file. Then, we can cat the log file content at the end of the +# into a log file. Then, we can cat the log file content at the end of the # process. UWSGI_LOG_FILE=/tmp/uwsgi-new-version.log METRICD_LOG_FILE=/tmp/gnocchi-metricd-new-version.log