Automatically detect deleted resources
While executing some Gnocchi optimizations (#1307), we noticed that some deleted/removed resources do not have the "ended_at" field set with a datetime. This can cause slowness over time, as more and more "zombie" resources accumulate, which has a direct impact on the MySQL queries executed by the aggregates API.

This patch introduces a new parameter called `metric_inactive_after`, which defines how long a metric can go without receiving new datapoints before we consider it inactive. When all metrics of a resource are inactive, we mark the resource as removed.
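
For illustration, a minimal sketch of how an operator might enable this in gnocchi.conf (the one-hour value is hypothetical; the option lives in the [metricd] section and defaults to 0, which keeps the process disabled):

[metricd]
# Hypothetical value: consider a metric inactive after one hour without
# new measures. The default of 0 keeps the process disabled.
metric_inactive_after = 3600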
rafaelweingartner committed Jul 3, 2024
1 parent 57b9693 commit 6e76708
Showing 10 changed files with 183 additions and 10 deletions.
75 changes: 75 additions & 0 deletions gnocchi/chef.py
@@ -18,9 +18,11 @@

import daiquiri
import random
import datetime

from gnocchi import carbonara
from gnocchi import indexer
from gnocchi import utils

LOG = daiquiri.getLogger(__name__)

@@ -51,6 +53,79 @@ def __init__(self, coord, incoming, index, storage):
self.index = index
self.storage = storage

def resource_ended_at_normalization(self, metric_inactive_after):
"""Marks resources as ended at if needed.
This method will check all metrics that have not received new
datapoints after a given period. The period is defined by
'metric_inactive_after' parameter. If all metrics of resource are in
inactive state, we mark the ended_at field with a timestmap. Therefore,
we consider that the resource has ceased existing.
In this process we will handle only metrics that are considered as
inactive, according to `metric_inactive_after` parameter. Therefore,
we do not need to lock these metrics while processing, as they are
inactive, and chances are that they will not receive measures anymore.
"""

moment_now = utils.utcnow()
moment = moment_now - datetime.timedelta(
    seconds=metric_inactive_after)

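# The attribute filter selects metrics whose last measure predates the
# cutoff; the resource policy filter restricts the search to metrics
# whose resource has not already been marked as ended.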
inactive_metrics = self.index.list_metrics(
    attribute_filter={"<": {
        "last_measure_timestamp": moment}},
    resource_policy_filter={"==": {"ended_at": None}}
)

LOG.debug("Inactive metrics found for processing: [%s].",
inactive_metrics)

metrics_by_resource_id = {}
for metric in inactive_metrics:
    metrics_by_resource_id.setdefault(
        metric.resource_id, []).append(metric)

for resource_id in metrics_by_resource_id:
    if resource_id is None:
        LOG.debug("We do not need to process inactive metrics that do "
                  "not have a resource. Therefore, these metrics [%s] "
                  "will be considered inactive, but there is nothing "
                  "else we can do in this process.",
                  metrics_by_resource_id[resource_id])
        continue
resource = self.index.get_resource(
"generic", resource_id, with_metrics=True)
resource_metrics = resource.metrics
resource_inactive_metrics = metrics_by_resource_id.get(resource_id)

all_metrics_are_inactive = True
for m in resource_metrics:
if m not in resource_inactive_metrics:
all_metrics_are_inactive = False
LOG.debug("Not all metrics of resource [%s] are inactive. "
"Metric [%s] is not inactive. The inactive "
"metrics are [%s].",
resource, m, resource_inactive_metrics)
break

if all_metrics_are_inactive:
LOG.info("All metrics [%s] of resource [%s] are inactive."
"Therefore, we will mark it as finished with an"
"ended at timestmap.", resource_metrics, resource)
if resource.ended_at is not None:
LOG.debug(
"Resource [%s] already has an ended at value.", resource)
else:
LOG.info("Marking ended at timestamp for resource "
"[%s] because all of its metrics are inactive.",
resource)
self.index.update_resource(
"generic", resource_id, ended_at=momment_now)

def clean_raw_data_inactive_metrics(self):
"""Cleans metrics raw data if they are inactive.
11 changes: 11 additions & 0 deletions gnocchi/cli/metricd.py
@@ -278,6 +278,17 @@ def _run_job(self):
LOG.debug("Finished the cleaning of raw data points for metrics that "
"are no longer receiving measures.")

if (self.conf.metricd.metric_inactive_after and
self.conf.metricd.metric_inactive_after > 0):
LOG.debug("Starting resource ended at field normalization.")
self.chef.resource_ended_at_normalization(
self.conf.metricd.metric_inactive_after)
LOG.debug("Finished resource ended at field normalization.")
else:
LOG.debug("Resource ended at field normalization is not "
"activated. See 'metric_inactive_after' parameter if "
"you wish to activate it.")


class MetricdServiceManager(cotyledon.ServiceManager):
def __init__(self, conf):
6 changes: 5 additions & 1 deletion gnocchi/indexer/__init__.py
@@ -446,7 +446,11 @@ def update_backwindow_changed_for_metrics_archive_policy(
raise exceptions.NotImplementedError

@staticmethod
def update_needs_raw_data_truncation(metric_id):
def update_needs_raw_data_truncation(metric_id, value):
raise exceptions.NotImplementedError

@staticmethod
def update_last_measure_timestamp(metric_id):
raise exceptions.NotImplementedError

@staticmethod
@@ -13,17 +13,15 @@
# under the License.
#

"""create metric truncation status column
"""Create metric truncation status column
Revision ID: 18fff4509e3e
Revises: 04eba72e4f90
Create Date: 2024-04-24 09:16:00
"""
import datetime

from alembic import op
from sqlalchemy.sql import func

import sqlalchemy

@@ -0,0 +1,39 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

"""Create last measure push timestamp column
Revision ID: f89ed2e3c2ec
Revises: 18fff4509e3e
Create Date: 2024-04-24 09:16:00
"""

from alembic import op

import sqlalchemy

from sqlalchemy.sql import func

# revision identifiers, used by Alembic.
revision = 'f89ed2e3c2ec'
down_revision = '18fff4509e3e'
branch_labels = None
depends_on = None


def upgrade():
op.add_column(
"metric", sqlalchemy.Column(
"last_measure_timestamp", sqlalchemy.DateTime,
nullable=False, server_default=func.current_timestamp()))
7 changes: 7 additions & 0 deletions gnocchi/indexer/sqlalchemy.py
@@ -1403,6 +1403,13 @@ def update_needs_raw_data_truncation(self, metric_id, value=False):
if session.execute(stmt).rowcount == 0:
raise indexer.NoSuchMetric(metric_id)

def update_last_measure_timestamp(self, metric_id):
    with self.facade.writer() as session:
        stmt = update(Metric).filter(Metric.id == metric_id).values(
            last_measure_timestamp=datetime.datetime.utcnow())
        if session.execute(stmt).rowcount == 0:
            raise indexer.NoSuchMetric(metric_id)

def update_backwindow_changed_for_metrics_archive_policy(
self, archive_policy_name):
with self.facade.writer() as session:
15 changes: 13 additions & 2 deletions gnocchi/indexer/sqlalchemy_base.py
@@ -19,6 +19,7 @@
import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func

import sqlalchemy_utils

@@ -113,6 +114,14 @@ class Metric(Base, GnocchiBase, indexer.Metric):
nullable=False, default=True,
server_default=sqlalchemy.sql.true())

# Timestamp of the last measure push received for the given metric. This
# allows us to identify when a metric stopped receiving measurements;
# thus, if all metrics of a resource are in this situation, chances are
# that the resource ceased existing in the backend.
last_measure_timestamp = sqlalchemy.Column(
"last_measure_timestamp", sqlalchemy.DateTime,
nullable=False, server_default=func.current_timestamp())

def jsonify(self):
d = {
"id": self.id,
@@ -256,7 +265,8 @@ def type(cls):
creator = sqlalchemy.Column(sqlalchemy.String(255))
started_at = sqlalchemy.Column(types.TimestampUTC, nullable=False,
default=lambda: utils.utcnow())
revision_start = sqlalchemy.Column(types.TimestampUTC, nullable=False,
revision_start = sqlalchemy.Column(types.TimestampUTC,
nullable=False,
default=lambda: utils.utcnow())
ended_at = sqlalchemy.Column(types.TimestampUTC)
user_id = sqlalchemy.Column(sqlalchemy.String(255))
@@ -298,7 +308,8 @@ class ResourceHistory(ResourceMixin, Base, GnocchiBase):
ondelete="CASCADE",
name="fk_rh_id_resource_id"),
nullable=False)
revision_end = sqlalchemy.Column(types.TimestampUTC, nullable=False,
revision_end = sqlalchemy.Column(types.TimestampUTC,
nullable=False,
default=lambda: utils.utcnow())
metrics = sqlalchemy.orm.relationship(
Metric, primaryjoin="Metric.resource_id == ResourceHistory.id",
14 changes: 11 additions & 3 deletions gnocchi/opts.py
@@ -57,7 +57,6 @@ def __getitem__(self, key):
for opt in _INCOMING_OPTS:
opt.default = '${storage.%s}' % opt.name


API_OPTS = (
cfg.HostAddressOpt('host',
default="0.0.0.0",
@@ -73,7 +72,7 @@ def __getitem__(self, key):
but not chunked encoding (InfluxDB)
* http-socket/socket: support chunked encoding, but require a upstream HTTP
Server for HTTP/1.1, keepalive and HTTP protocol correctness.
""")
"""),
)


@@ -172,7 +171,16 @@ def list_opts():
default=10000,
min=1,
help="Number of metrics that should be deleted "
"simultaneously by one janitor.")
"simultaneously by one janitor."),
cfg.IntOpt('metric_inactive_after',
default=0,
help="Number of seconds to wait before we consider a "
"metric inactive. An inactive metric is a metric "
"that has not received new measurements for a "
"given period. If all metrics of a resource are "
"inactive, we mark the resource with the "
"'ended_at' timestamp. The default is 0 (zero), "
"which means that we never execute process.")
)),
("api", (
cfg.StrOpt('paste_config',
20 changes: 20 additions & 0 deletions gnocchi/storage/__init__.py
@@ -688,6 +688,26 @@ def _map_compute_splits_operations(bound_timeserie):
if metric.needs_raw_data_truncation:
indexer_driver.update_needs_raw_data_truncation(metric.id)

# Record when the metric last received measures
indexer_driver.update_last_measure_timestamp(metric.id)

resource_id = metric.resource_id
if resource_id:
resource = indexer_driver.get_resource('generic', resource_id)
LOG.debug("Checking if resource [%s] of metric [%s] with "
"resource ID [%s] needs to be 'undeleted.'",
resource, metric.id, resource_id)
if resource.ended_at is not None:
LOG.info("Resource [%s] was marked with a timestamp for the "
"'ended_at' field. However, it received a "
"measurement for metric [%s]. Therefore, we undelete "
"it.", resource, metric)
indexer_driver.update_resource(
"generic", resource_id, ended_at=None)
else:
LOG.debug("Metric [%s] does not have a resource "
"assigned to it.", metric)

with self.statistics.time("splits delete"):
self._delete_metric_splits(splits_to_delete)
self.statistics["splits delete"] += len(splits_to_delete)
2 changes: 1 addition & 1 deletion run-upgrade-tests.sh
@@ -107,7 +107,7 @@ export GNOCCHI_USER=$GNOCCHI_USER_ID
# needs to be released. Otherwise, the logs stop being written, and the
# execution of the code is "frozen" due to the lack of buffering in the
# process output. To work around that, we can read the buffer, and dump it
# into a lof file. Then, we can cat the log file content at the end of the
# into a log file. Then, we can cat the log file content at the end of the
# process.
UWSGI_LOG_FILE=/tmp/uwsgi-new-version.log
METRICD_LOG_FILE=/tmp/gnocchi-metricd-new-version.log
