Skip to content

Commit

Permalink
Automatically detect deleted resources
Browse files Browse the repository at this point in the history
While executing some Gnocchi optimizations (#1307), we noticed that some deleted/removed resources do not have the "ended_at" field with a datetime. This can cause slowness with time, as more and more "zombie" resources are left there, and this has a direct impact in the MySQL queries executed with the aggregates API.

This patch introduces a new parameter called `metric_inactive_after`, which defines for how long a metric can go without receiving new datapoints until we consider it as inactive. Then, when all metrics of a resource are in inactive state, we can mark/consider the resource as removed.
  • Loading branch information
rafaelweingartner committed May 28, 2024
1 parent 3bb95db commit 9d08cc0
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 8 deletions.
68 changes: 68 additions & 0 deletions gnocchi/chef.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import hashlib

import daiquiri
import datetime

from gnocchi import carbonara
from gnocchi import indexer
Expand Down Expand Up @@ -50,6 +51,73 @@ def __init__(self, coord, incoming, index, storage):
self.index = index
self.storage = storage

def resource_ended_at_normalization(self, metric_inactive_after):
"""Marks resources as ended at if needed.
This method will check all metrics that have not received new
datapoints after a given period. The period is defined by
'metric_inactive_after' parameter. If all metrics of resource are in
inactive state, we mark the ended_at field with a timestmap. Therefore,
we consider that the resource has ceased existing.
In this process we will handle only metrics that are considered as
inactive, according to `metric_inactive_after` parameter. Therefore,
we do not need to lock these metrics while processing, as they are
inactive, and chances are that they will not receive measures anymore.
We will not "undelete" the resource, if it starts receiving measures
again. If there will ever be a case like this, we need to implement
some workflow to handle it.
"""

momment_now = datetime.datetime.utcnow()
momment = momment_now - datetime.timedelta(
seconds=metric_inactive_after)

inactive_metrics = self.index.list_metrics(
attribute_filter={"<": {
"last_measure_timestamp": momment}},
resource_policy_filter={"==": {"ended_at": None}}
)

LOG.debug("Inactive metrics found for processing: [%s].",
inactive_metrics)

metrics_by_resource_id = {}
for metric in inactive_metrics:
resource_id = metric.resource_id
if metrics_by_resource_id.get(resource_id) is None:
metrics_by_resource_id[resource_id] = []

metrics_by_resource_id[resource_id].append(metric)

for resource_id in metrics_by_resource_id.keys():
resource = self.index.get_resource(
"generic", resource_id, with_metrics=True)
resource_metrics = resource.metrics
resource_inactive_metrics = metrics_by_resource_id.get(resource_id)

all_metrics_are_inactive = True
for m in resource_metrics:
if m not in resource_inactive_metrics:
all_metrics_are_inactive = False
LOG.debug("Not all metrics of resource [%s] are inactive. "
"Metric [%s] is not inactive.", resource, m)
break

if all_metrics_are_inactive:
LOG.info("All metrics [%s] of resource [%s] are inactive."
"Therefore, we will mark it as finished with an"
"ended at timestmap.")
if resource.ended_at is not None:
LOG.debug(
"Resource [%s] already has an ended at value.", resource)
else:
LOG.info("Marking ended at timestamp for resource "
"[%s] because all of its metrics are inactive.",
resource)
self.index.update_resource(
"generic", resource_id, ended_at=momment_now)

def clean_raw_data_inactive_metrics(self):
"""Cleans metrics raw data if they are inactive.
Expand Down
5 changes: 4 additions & 1 deletion gnocchi/cli/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ def api():
"No need to pass `--' in gnocchi-api command line anymore, "
"please remove")

uwsgi = spawn.find_executable("uwsgi")
uwsgi = conf.api.uwsgi_path
if not uwsgi:
uwsgi = spawn.find_executable("uwsgi")

if not uwsgi:
LOG.error("Unable to find `uwsgi'.\n"
"Be sure it is installed and in $PATH.")
Expand Down
10 changes: 10 additions & 0 deletions gnocchi/cli/metricd.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,16 @@ def _run_job(self):
LOG.debug("Finished the cleaning of raw data points for metrics that "
"are no longer receiving measures.")

if self.conf.metricd.metric_inactive_after:
LOG.debug("Starting resource ended at field normalization.")
self.chef.resource_ended_at_normalization(
self.conf.metricd.metric_inactive_after)
LOG.debug("Finished resource ended at field normalization.")
else:
LOG.debug("Resource ended at field normalization is not "
"activated. See 'metric_inactive_after' parameter if "
"you wish to activate it.")


class MetricdServiceManager(cotyledon.ServiceManager):
def __init__(self, conf):
Expand Down
6 changes: 5 additions & 1 deletion gnocchi/indexer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,11 @@ def update_backwindow_changed_for_metrics_archive_policy(
raise exceptions.NotImplementedError

@staticmethod
def update_needs_raw_data_truncation(metric_id):
def update_needs_raw_data_truncation(metric_id, value):
raise exceptions.NotImplementedError

@staticmethod
def update_last_measure_timestmap(metric_id):
raise exceptions.NotImplementedError

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@
# under the License.
#

"""create metric truncation status column
"""Create metric truncation status column
Revision ID: 18fff4509e3e
Revises: 04eba72e4f90
Create Date: 2024-04-24 09:16:00
"""
import datetime

from alembic import op
from sqlalchemy.sql import func

import sqlalchemy

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

"""Create last measure push timestamp column
Revision ID: f89ed2e3c2ec
Revises: 18fff4509e3e
Create Date: 2024-04-24 09:16:00
"""

from alembic import op

import datetime
import sqlalchemy

# revision identifiers, used by Alembic.
revision = 'f89ed2e3c2ec'
down_revision = '18fff4509e3e'
branch_labels = None
depends_on = None


def upgrade():
op.add_column(
"metric", sqlalchemy.Column(
"last_measure_timestamp", sqlalchemy.DateTime,
nullable=True, default=datetime.datetime.utcnow()))
7 changes: 7 additions & 0 deletions gnocchi/indexer/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,13 @@ def update_needs_raw_data_truncation(self, metrid_id, value=False):
if session.execute(stmt).rowcount == 0:
raise indexer.NoSuchMetric(metrid_id)

def update_last_measure_timestmap(self, metrid_id):
with self.facade.writer() as session:
stmt = update(Metric).filter(Metric.id == metrid_id).values(
last_measure_timestamp=datetime.datetime.utcnow())
if session.execute(stmt).rowcount == 0:
raise indexer.NoSuchMetric(metrid_id)

def update_backwindow_changed_for_metrics_archive_policy(
self, archive_policy_name):
with self.facade.writer() as session:
Expand Down
9 changes: 9 additions & 0 deletions gnocchi/indexer/sqlalchemy_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime

from oslo_db.sqlalchemy import models
import sqlalchemy
Expand Down Expand Up @@ -113,6 +114,14 @@ class Metric(Base, GnocchiBase, indexer.Metric):
nullable=False, default=True,
server_default=sqlalchemy.sql.true())

# Timestamp that represents when the last measure push was received for the
# given metric. This allows us to identify when a metric ceased receiving
# measurements; thus, if all metric for a resource are in this situation,
# chances are that the resource ceased existing in the backend.
last_measure_timestamp = sqlalchemy.Column(
"last_measure_timestamp", sqlalchemy.DateTime,
nullable=True, default=datetime.datetime.utcnow())

def jsonify(self):
d = {
"id": self.id,
Expand Down
17 changes: 14 additions & 3 deletions gnocchi/opts.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def __getitem__(self, key):
for opt in _INCOMING_OPTS:
opt.default = '${storage.%s}' % opt.name


API_OPTS = (
cfg.HostAddressOpt('host',
default="0.0.0.0",
Expand All @@ -73,7 +72,10 @@ def __getitem__(self, key):
but not chunked encoding (InfluxDB)
* http-socket/socket: support chunked encoding, but require a upstream HTTP
Server for HTTP/1.1, keepalive and HTTP protocol correctness.
""")
"""),
cfg.StrOpt('uwsgi-path',
default=None,
help="Custom UWSGI path to avoid auto discovery of packages.")
)


Expand Down Expand Up @@ -172,7 +174,16 @@ def list_opts():
default=10000,
min=1,
help="Number of metrics that should be deleted "
"simultaneously by one janitor.")
"simultaneously by one janitor."),
cfg.IntOpt('metric_inactive_after',
default=0,
help="Number of seconds to wait before we consider a "
"metric inactive. An inactive metric is a metric "
"that has not received new measurements for a "
"given period. If all metrics of a resource are "
"inactive, we mark the resource with the "
"'ended_at' timestamp. The default is 0 (zero), "
"which means that we never execute process.")
)),
("api", (
cfg.StrOpt('paste_config',
Expand Down
3 changes: 3 additions & 0 deletions gnocchi/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,9 @@ def _map_compute_splits_operations(bound_timeserie):
if metric.needs_raw_data_truncation:
indexer_driver.update_needs_raw_data_truncation(metric.id)

# Mark when the metric receives its latest measures
indexer_driver.update_last_measure_timestmap(metric.id)

with self.statistics.time("splits delete"):
self._delete_metric_splits(splits_to_delete)
self.statistics["splits delete"] += len(splits_to_delete)
Expand Down

0 comments on commit 9d08cc0

Please sign in to comment.