Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically detect deleted resources #1386

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
74 changes: 74 additions & 0 deletions gnocchi/chef.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import hashlib

import daiquiri
import random

from collections import defaultdict
from gnocchi import carbonara
from gnocchi import indexer
from gnocchi import utils

LOG = daiquiri.getLogger(__name__)

Expand Down Expand Up @@ -51,6 +54,77 @@ def __init__(self, coord, incoming, index, storage):
self.index = index
self.storage = storage

def resource_ended_at_normalization(self, metric_inactive_after):
    """Mark resources as ended if all of their metrics are inactive.

    This method checks all metrics that have not received new
    datapoints after a given period. The period is defined by the
    ``metric_inactive_after`` parameter. If all metrics of a resource
    are in the inactive state, we set the ``ended_at`` field to the
    current timestamp; therefore, we consider that the resource has
    ceased existing.

    In this process we handle only metrics that are considered
    inactive, according to the ``metric_inactive_after`` parameter.
    Therefore, we do not need to lock these metrics while processing,
    as they are inactive, and chances are that they will not receive
    measures anymore. Moreover, we are only touching metadata, and not
    the actual data.

    :param metric_inactive_after: number of seconds without new
        measures after which a metric is considered inactive.
    """
    moment_now = utils.utcnow()
    moment = moment_now - datetime.timedelta(
        seconds=metric_inactive_after)

    # Metrics whose last measure is older than the cutoff, restricted
    # to resources that have not already been marked as ended.
    inactive_metrics = self.index.list_metrics(
        attribute_filter={"<": {
            "last_measure_timestamp": moment}},
        resource_policy_filter={"==": {"ended_at": None}}
    )

    LOG.debug("Inactive metrics found for processing: [%s].",
              inactive_metrics)

    inactive_metrics_by_resource_id = defaultdict(list)
    for metric in inactive_metrics:
        inactive_metrics_by_resource_id[metric.resource_id].append(metric)

    # Iterate items() so we get the metric list alongside the key
    # instead of a second dict lookup per resource.
    for resource_id, resource_inactive_metrics in (
            inactive_metrics_by_resource_id.items()):
        if resource_id is None:
            LOG.debug("We do not need to process inactive metrics that do "
                      "not have resource. Therefore, these metrics [%s] "
                      "will be considered inactive, but there is nothing "
                      "else we can do in this process.",
                      resource_inactive_metrics)
            continue
        resource = self.index.get_resource(
            "generic", resource_id, with_metrics=True)

        # A resource is ended only when every one of its metrics is in
        # the inactive set; a single active metric keeps it alive.
        all_metrics_are_inactive = True
        for m in resource.metrics:
            if m not in resource_inactive_metrics:
                all_metrics_are_inactive = False
                LOG.debug("Not all metrics of resource [%s] are inactive. "
                          "Metric [%s] is not inactive. The inactive "
                          "metrics are [%s].",
                          resource, m, resource_inactive_metrics)
                break

        if all_metrics_are_inactive:
            LOG.info("All metrics [%s] of resource [%s] are inactive. "
                     "Therefore, we will mark it as finished with an "
                     "ended at timestamp.", resource.metrics, resource)
            if resource.ended_at is not None:
                # Guard against races: the resource may have been ended
                # elsewhere since we listed the inactive metrics.
                LOG.debug(
                    "Resource [%s] already has an ended at value.", resource)
            else:
                LOG.info("Marking ended at timestamp for resource "
                         "[%s] because all of its metrics are inactive.",
                         resource)
                self.index.update_resource(
                    "generic", resource_id, ended_at=moment_now)

def clean_raw_data_inactive_metrics(self):
"""Cleans metrics raw data if they are inactive.

Expand Down
11 changes: 11 additions & 0 deletions gnocchi/cli/metricd.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,17 @@ def _run_job(self):
LOG.debug("Finished the cleaning of raw data points for metrics that "
"are no longer receiving measures.")

if (self.conf.metricd.metric_inactive_after and
self.conf.metricd.metric_inactive_after > 0):
LOG.debug("Starting resource ended at field normalization.")
self.chef.resource_ended_at_normalization(
self.conf.metricd.metric_inactive_after)
LOG.debug("Finished resource ended at field normalization.")
else:
LOG.debug("Resource ended at field normalization is not "
"activated. See 'metric_inactive_after' parameter if "
"you wish to activate it.")


class MetricdServiceManager(cotyledon.ServiceManager):
def __init__(self, conf):
Expand Down
6 changes: 5 additions & 1 deletion gnocchi/indexer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,11 @@ def update_backwindow_changed_for_metrics_archive_policy(
raise exceptions.NotImplementedError

@staticmethod
def update_needs_raw_data_truncation(metric_id):
def update_needs_raw_data_truncation(metric_id, value):
raise exceptions.NotImplementedError

@staticmethod
def update_last_measure_timestamp(metric_id):
    """Update the last measure timestamp of a metric to the current time.

    Abstract on the base indexer; concrete drivers (e.g. the
    SQLAlchemy indexer) provide the actual implementation.

    :param metric_id: ID of the metric to touch.
    """
    raise exceptions.NotImplementedError

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@
# under the License.
#

"""create metric truncation status column
"""Create metric truncation status column
Revision ID: 18fff4509e3e
Revises: 04eba72e4f90
Create Date: 2024-04-24 09:16:00
"""
import datetime

from alembic import op
from sqlalchemy.sql import func

import sqlalchemy

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

"""Create last measure push timestamp column
Revision ID: f89ed2e3c2ec
Revises: 18fff4509e3e
Create Date: 2024-04-24 09:16:00
"""

from alembic import op

import sqlalchemy

from sqlalchemy.sql import func

# revision identifiers, used by Alembic.
revision = 'f89ed2e3c2ec'
down_revision = '18fff4509e3e'
branch_labels = None
depends_on = None


def upgrade():
    """Add the ``last_measure_timestamp`` column to the ``metric`` table."""
    last_measure_column = sqlalchemy.Column(
        "last_measure_timestamp", sqlalchemy.DateTime,
        nullable=False, server_default=func.current_timestamp())
    op.add_column("metric", last_measure_column)
Copy link
Member

@chungg chungg Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't know if it makes sense to make this nullable? it's possible that this is disabled at first and gets a default value and is enabled later and immediately all of the resources can potentially get set to ended (until next update).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually nevermind. i just realised last_measure_timestamp is updated regardless if auto end logic is enabled or not.

7 changes: 7 additions & 0 deletions gnocchi/indexer/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,13 @@ def update_needs_raw_data_truncation(self, metrid_id, value=False):
if session.execute(stmt).rowcount == 0:
raise indexer.NoSuchMetric(metrid_id)

def update_last_measure_timestamp(self, metrid_id):
    """Set the metric's ``last_measure_timestamp`` to the current time.

    Called when new measures are processed for a metric, so that
    inactive metrics can later be found by comparing this column
    against a cutoff timestamp.

    :param metrid_id: ID of the metric to touch.
    :raises indexer.NoSuchMetric: if no metric row matches the given ID.
    """
    with self.facade.writer() as session:
        # datetime.utcnow() yields a naive UTC timestamp; the column is
        # a plain (timezone-less) DateTime, per migration f89ed2e3c2ec.
        stmt = update(Metric).filter(Metric.id == metrid_id).values(
            last_measure_timestamp=datetime.datetime.utcnow())
        if session.execute(stmt).rowcount == 0:
            raise indexer.NoSuchMetric(metrid_id)

def update_backwindow_changed_for_metrics_archive_policy(
self, archive_policy_name):
with self.facade.writer() as session:
Expand Down
15 changes: 13 additions & 2 deletions gnocchi/indexer/sqlalchemy_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import sqlalchemy
from sqlalchemy.ext import declarative
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func

import sqlalchemy_utils

Expand Down Expand Up @@ -113,6 +114,14 @@ class Metric(Base, GnocchiBase, indexer.Metric):
nullable=False, default=True,
server_default=sqlalchemy.sql.true())

# Timestamp that represents when the last measure push was received for the
# given metric. This allows us to identify when a metric ceased receiving
# measurements; thus, if all metric for a resource are in this situation,
# chances are that the resource ceased existing in the backend.
last_measure_timestamp = sqlalchemy.Column(
"last_measure_timestamp", sqlalchemy.DateTime,
nullable=False, server_default=func.current_timestamp())

def jsonify(self):
d = {
"id": self.id,
Expand Down Expand Up @@ -256,7 +265,8 @@ def type(cls):
creator = sqlalchemy.Column(sqlalchemy.String(255))
started_at = sqlalchemy.Column(types.TimestampUTC, nullable=False,
default=lambda: utils.utcnow())
revision_start = sqlalchemy.Column(types.TimestampUTC, nullable=False,
revision_start = sqlalchemy.Column(types.TimestampUTC,
nullable=False,
default=lambda: utils.utcnow())
ended_at = sqlalchemy.Column(types.TimestampUTC)
user_id = sqlalchemy.Column(sqlalchemy.String(255))
Expand Down Expand Up @@ -298,7 +308,8 @@ class ResourceHistory(ResourceMixin, Base, GnocchiBase):
ondelete="CASCADE",
name="fk_rh_id_resource_id"),
nullable=False)
revision_end = sqlalchemy.Column(types.TimestampUTC, nullable=False,
revision_end = sqlalchemy.Column(types.TimestampUTC,
nullable=False,
default=lambda: utils.utcnow())
metrics = sqlalchemy.orm.relationship(
Metric, primaryjoin="Metric.resource_id == ResourceHistory.id",
Expand Down
12 changes: 10 additions & 2 deletions gnocchi/opts.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def __getitem__(self, key):
for opt in _INCOMING_OPTS:
opt.default = '${storage.%s}' % opt.name


API_OPTS = (
cfg.HostAddressOpt('host',
default="0.0.0.0",
Expand Down Expand Up @@ -172,7 +171,16 @@ def list_opts():
default=10000,
min=1,
help="Number of metrics that should be deleted "
"simultaneously by one janitor.")
"simultaneously by one janitor."),
cfg.IntOpt('metric_inactive_after',
chungg marked this conversation as resolved.
Show resolved Hide resolved
default=0,
help="Number of seconds to wait before we consider a "
"metric inactive. An inactive metric is a metric "
"that has not received new measurements for a "
"given period. If all metrics of a resource are "
"inactive, we mark the resource with the "
"'ended_at' timestamp. The default is 0 (zero), "
"which means that we never execute process.")
)),
("api", (
cfg.StrOpt('paste_config',
Expand Down
20 changes: 20 additions & 0 deletions gnocchi/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,26 @@ def _map_compute_splits_operations(bound_timeserie):
if metric.needs_raw_data_truncation:
indexer_driver.update_needs_raw_data_truncation(metric.id)

# Mark when the metric receives its latest measures
indexer_driver.update_last_measure_timestamp(metric.id)

resource_id = metric.resource_id
if resource_id:
resource = indexer_driver.get_resource('generic', resource_id)
LOG.debug("Checking if resource [%s] of metric [%s] with "
"resource ID [%s] needs to be restored.",
resource, metric.id, resource_id)
if resource.ended_at is not None:
LOG.info("Resource [%s] was marked with a timestamp for the "
"'ended_at' field. However, it received a "
"measurement for metric [%s]. Therefore, we undelete "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"measurement for metric [%s]. Therefore, we undelete "
"measurement for metric [%s]. Therefore, restoring "

"it.", resource, metric)
indexer_driver.update_resource(
"generic", resource_id, ended_at=None)
else:
LOG.debug("Metric [%s] does not have a resource "
"assigned to it.", metric)

with self.statistics.time("splits delete"):
self._delete_metric_splits(splits_to_delete)
self.statistics["splits delete"] += len(splits_to_delete)
Expand Down
2 changes: 1 addition & 1 deletion run-upgrade-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export GNOCCHI_USER=$GNOCCHI_USER_ID
# needs to be released. Otherwise, the logs stop to be writen, and the
# execution of the code is "frozen", due to the lack of buffer in the
# process output. To work around that, we can read the buffer, and dump it
# into a lof file. Then, we can cat the log file content at the end of the
# into a log file. Then, we can cat the log file content at the end of the
rafaelweingartner marked this conversation as resolved.
Show resolved Hide resolved
# process.
UWSGI_LOG_FILE=/tmp/uwsgi-new-version.log
METRICD_LOG_FILE=/tmp/gnocchi-metricd-new-version.log
Expand Down
Loading