From 8f8b5a98461f1d4075dd71900720ce9141929a04 Mon Sep 17 00:00:00 2001 From: Tobias Urdin Date: Tue, 29 Jun 2021 01:46:29 +0200 Subject: [PATCH 01/11] Fix bytes to str decode in Authorization header (cherry picked from commit 260463ebe8a01053fc7a80d63260ed799e4077d0) (cherry picked from commit 2d1f9e83c029055ed783befb3ae32b833676c60b) --- gnocchi/rest/auth_helper.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/gnocchi/rest/auth_helper.py b/gnocchi/rest/auth_helper.py index 1362f83ae..355846b4f 100644 --- a/gnocchi/rest/auth_helper.py +++ b/gnocchi/rest/auth_helper.py @@ -118,8 +118,10 @@ def get_metric_policy_filter(request, rule): class BasicAuthHelper(object): @staticmethod def get_current_user(request): - auth = werkzeug.http.parse_authorization_header( - request.headers.get("Authorization")) + hdr = request.headers.get("Authorization") + auth_hdr = (hdr.decode('utf-8') if type(hdr) == bytes + else hdr) + auth = werkzeug.http.parse_authorization_header(auth_hdr) if auth is None: api.abort(401) return auth.username From 8a14ff245d106e12891b047c4f497bfa24f96856 Mon Sep 17 00:00:00 2001 From: Tobias Urdin Date: Tue, 29 Jun 2021 01:47:06 +0200 Subject: [PATCH 02/11] Fix DB migration issue on MySQL >= 8 Trying to alter drop a table that has a check constraint is not allowed. (cherry picked from commit bcd916e8ee94fe960faf6a6b42ea68c64830ae14) (cherry picked from commit 76fa465e5314ed0c71c5ba9ffad1c847e18b067a) --- .../5c4f93e5bb4_mysql_float_to_timestamp.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/gnocchi/indexer/alembic/versions/5c4f93e5bb4_mysql_float_to_timestamp.py b/gnocchi/indexer/alembic/versions/5c4f93e5bb4_mysql_float_to_timestamp.py index 74a790a61..46fe6548c 100644 --- a/gnocchi/indexer/alembic/versions/5c4f93e5bb4_mysql_float_to_timestamp.py +++ b/gnocchi/indexer/alembic/versions/5c4f93e5bb4_mysql_float_to_timestamp.py @@ -24,6 +24,7 @@ """ from alembic import op +from sqlalchemy.engine.reflection import Inspector import sqlalchemy as sa from sqlalchemy.sql import func @@ -38,8 +39,21 @@ def upgrade(): bind = op.get_bind() + inspector = Inspector.from_engine(bind) + if bind and bind.engine.name == "mysql": op.execute("SET time_zone = '+00:00'") + + previous_cks = {"resource": [], "resource_history": []} + for table in ("resource", "resource_history"): + existing_cks = [ + c['name'] for c in inspector.get_check_constraints(table) + ] + ck_name = "ck_{}_started_before_ended".format(table) + if ck_name in existing_cks: + op.drop_constraint(ck_name, table, type_="check") + previous_cks[table].append(ck_name) + # NOTE(jd) So that crappy engine that is MySQL does not have "ALTER # TABLE … USING …". We need to copy everything and convert… for table_name, column_name in (("resource", "started_at"), @@ -75,3 +89,7 @@ def upgrade(): existing_nullable=nullable, existing_type=existing_type, new_column_name=column_name) + + for table in ("resource", "resource_history"): + for ck_name in previous_cks[table]: + op.create_check_constraint(ck_name, table, "started_at <= ended_at") From 56353dcd47946f01706cf9263fba2ce49dfaa9e6 Mon Sep 17 00:00:00 2001 From: Tobias Urdin Date: Tue, 29 Jun 2021 01:49:19 +0200 Subject: [PATCH 03/11] Fix SQLAlchemy 1.4 support * Fix list_metrics in amqp1d not properly validating list/set is not empty. * Dont set database or drivername on the new immutable SQLAlchemy URL object. * Validate values is not None before using. (cherry picked from commit 5ed9cc9a042d2b5717bd935956a99a2ba58d2e4a) (cherry picked from commit 58c78bf063f9ca4ef1f320d880ed4cf3fe156f62) # Conflicts: # gnocchi/amqp1d.py --- gnocchi/amqp1d.py | 237 +++++++++++++++++++++++++++++ gnocchi/indexer/sqlalchemy.py | 39 +++-- gnocchi/indexer/sqlalchemy_base.py | 8 +- 3 files changed, 273 insertions(+), 11 deletions(-) create mode 100644 gnocchi/amqp1d.py diff --git a/gnocchi/amqp1d.py b/gnocchi/amqp1d.py new file mode 100644 index 000000000..9a3ba148c --- /dev/null +++ b/gnocchi/amqp1d.py @@ -0,0 +1,237 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import itertools +import uuid + +import daiquiri +import proton.handlers +import proton.reactor +import six +import ujson + +from gnocchi import incoming +from gnocchi import indexer +from gnocchi import service +from gnocchi import utils + +LOG = daiquiri.getLogger(__name__) + + +class BatchProcessor(object): + def __init__(self, conf): + self.conf = conf + self.incoming = incoming.get_driver(self.conf) + self.indexer = indexer.get_driver(self.conf) + self._ensure_resource_type_exists() + + self._hosts = {} + self._measures = collections.defaultdict( + lambda: collections.defaultdict(list)) + + def reset(self): + self._hosts.clear() + self._measures.clear() + + def add_measures(self, host, name, measures): + host_id = "%s:%s" % (self.conf.amqp1d.resource_type, + host.replace("/", "_")) + self._hosts[host_id] = host + self._measures[host_id][name].extend(measures) + + def flush(self): + try: + self._flush() + except Exception: + LOG.error("Unepected error during flush()", exc_info=True) + self.reset() + + def _flush(self): + archive_policies = {} + resources = self._get_resources(self._measures.keys()) + for host_id, measures_by_names in six.iteritems(self._measures): + resource = resources[host_id] + + mbn_keys = measures_by_names.keys() + names = (set(mbn_keys) if len(mbn_keys) + else set()) + + for name in names: + if name not in archive_policies: + archive_policies[name] = ( + self.indexer.get_archive_policy_for_metric(name)) + known_metrics = self.indexer.list_metrics(attribute_filter={ + "and": [{"=": {"resource_id": resource.id}}, + {"in": {"name": list(names)}}] + }) + known_names = set((m.name for m in known_metrics)) + already_exists_names = [] + for name in (names - known_names): + try: + m = self.indexer.create_metric( + uuid.uuid4(), + creator=self.conf.amqp1d.creator, + resource_id=resource.id, + name=name, + archive_policy_name=archive_policies[name].name) + except indexer.NamedMetricAlreadyExists as e: + already_exists_names.append(e.metric) + except indexer.IndexerException as e: + LOG.error("Unexpected error, dropping metric %s", + name, exc_info=True) + else: + known_metrics.append(m) + + if already_exists_names: + # Add metrics created in the meantime + known_names.extend(already_exists_names) + known_metrics.extend( + self.indexer.list_metrics(attribute_filter={ + "and": [{"=": {"resource_id": resource.id}}, + {"in": {"name": already_exists_names}}] + })) + + self.incoming.add_measures_batch( + dict((metric.id, + measures_by_names[metric.name]) + for metric in known_metrics)) + + def _get_resources(self, host_ids): + + resource_ids = set((utils.ResourceUUID(host_id, + self.conf.amqp1d.creator) + for host_id in host_ids)) + + resources = self.indexer.list_resources( + resource_type=self.conf.amqp1d.resource_type, + attribute_filter={"in": {"id": resource_ids}}) + + resources_by_host_id = {r.original_resource_id: r for r in resources} + + missing_host_ids = set(host_ids) - set(resources_by_host_id.keys()) + + for host_id in missing_host_ids: + resource_id = utils.ResourceUUID(host_id, + self.conf.amqp1d.creator) + try: + r = self.indexer.create_resource( + self.conf.amqp1d.resource_type, + resource_id, + self.conf.amqp1d.creator, + original_resource_id=host_id, + host=self._hosts[host_id]) + except indexer.ResourceAlreadyExists: + r = self.indexer.get_resource( + self.conf.amqp1d.resource_type, + resource_id) + resources_by_host_id[host_id] = r + + return resources_by_host_id + + def _ensure_resource_type_exists(self): + try: + self.resource_type = self.indexer.get_resource_type( + self.conf.amqp1d.resource_type) + except indexer.NoSuchResourceType: + try: + mgr = self.indexer.get_resource_type_schema() + rtype = mgr.resource_type_from_dict( + self.conf.amqp1d.resource_type, { + "host": {"type": "string", "required": True, + "min_length": 0, "max_length": 255}, + }, "creating") + self.indexer.create_resource_type(rtype) + except indexer.ResourceTypeAlreadyExists: + LOG.debug("Resource type %s already exists", + self.conf.amqp1d.resource_type) + else: + LOG.info("Created resource type %s", + self.conf.amqp1d.resource_type) + self.resource_type = self.indexer.get_resource_type( + self.conf.amqp1d.resource_type) + else: + LOG.info("Found resource type %s", + self.conf.amqp1d.resource_type) + + +class CollectdFormatHandler(object): + def __init__(self, processor): + self.processor = processor + + @staticmethod + def _serialize_identifier(index, message): + """Based of FORMAT_VL from collectd/src/daemon/common.h. + + The biggest difference is that we don't prepend the host and append the + index of the value, and don't use slash. + + """ + suffix = ("-%s" % message["dsnames"][index] + if len(message["dsnames"]) > 1 else "") + return (message["plugin"] + ("-" + message["plugin_instance"] + if message["plugin_instance"] else "") + + "@" + + message["type"] + ("-" + message["type_instance"] + if message["type_instance"] else "") + + suffix) + + def on_message(self, event): + json_message = ujson.loads(event.message.body) + timestamp = utils.dt_in_unix_ns(utils.utcnow()) + measures_by_host_and_name = sorted(( + (message["host"], + self._serialize_identifier(index, message), + value) + for message in json_message + for index, value in enumerate(message["values"]) + )) + for (host, name), values in itertools.groupby( + measures_by_host_and_name, key=lambda x: x[0:2]): + measures = (incoming.Measure(timestamp, v[2]) for v in values) + self.processor.add_measures(host, name, measures) + + +class AMQP1Server(proton.handlers.MessagingHandler): + + def __init__(self, conf): + super(AMQP1Server, self).__init__() + self.peer_close_is_error = True + self.conf = conf + + self.processor = BatchProcessor(conf) + + # Only collectd format is supported for now + self.data_source_handler = { + "collectd": CollectdFormatHandler + }[self.conf.amqp1d.data_source](self.processor) + + def on_start(self, event): + event.container.schedule(self.conf.amqp1d.flush_delay, self) + + def on_message(self, event): + self.data_source_handler.on_message(event) + + def on_timer_task(self, event): + event.container.schedule(self.conf.amqp1d.flush_delay, self) + self.processor.flush() + + +def start(): + conf = service.prepare_service() + server = proton.reactor.Container(AMQP1Server(conf)) + try: + server.run() + except KeyboardInterrupt: + pass diff --git a/gnocchi/indexer/sqlalchemy.py b/gnocchi/indexer/sqlalchemy.py index 1265194ab..a842f32f4 100644 --- a/gnocchi/indexer/sqlalchemy.py +++ b/gnocchi/indexer/sqlalchemy.py @@ -260,27 +260,46 @@ def _safe_execute(self, connection, works): class SQLAlchemyIndexer(indexer.IndexerDriver): _RESOURCE_TYPE_MANAGER = ResourceClassMapper() + @staticmethod + def _set_url_database(url, database): + if hasattr(url, "set"): + return url.set(database=database) + else: + url.database = database + return url + + @staticmethod + def _set_url_drivername(url, drivername): + if hasattr(url, "set"): + return url.set(drivername=drivername) + else: + url.drivername = drivername + return url + @classmethod def _create_new_database(cls, url): """Used by testing to create a new database.""" purl = sqlalchemy_url.make_url( cls.dress_url( url)) - purl.database = purl.database + str(uuid.uuid4()).replace('-', '') + new_database = purl.database + str(uuid.uuid4()).replace('-', '') + purl = cls._set_url_database(purl, new_database) new_url = str(purl) sqlalchemy_utils.create_database(new_url) return new_url - @staticmethod - def dress_url(url): + @classmethod + def dress_url(cls, url): # If no explicit driver has been set, we default to pymysql if url.startswith("mysql://"): url = sqlalchemy_url.make_url(url) - url.drivername = "mysql+pymysql" + new_drivername = "mysql+pymysql" + url = cls._set_url_drivername(url, new_drivername) return str(url) if url.startswith("postgresql://"): url = sqlalchemy_url.make_url(url) - url.drivername = "postgresql+psycopg2" + new_drivername = "postgresql+psycopg2" + url = cls._set_url_drivername(url, new_drivername) return str(url) return url @@ -951,8 +970,6 @@ def delete_resources(self, resource_type='generic', target_cls = self._resource_type_to_mappers( session, resource_type)["resource"] - q = session.query(target_cls.id) - engine = session.connection() try: f = QueryTransformer.build_filter(engine.dialect.name, @@ -964,12 +981,16 @@ def delete_resources(self, resource_type='generic', raise indexer.ResourceAttributeError(resource_type, e.attribute) - q = q.filter(f) + q1 = session.query(target_cls.id) + q1 = q1.filter(f) session.query(Metric).filter( - Metric.resource_id.in_(q) + Metric.resource_id.in_(q1) ).update({"status": "delete"}, synchronize_session=False) + + q = session.query(target_cls) + q = q.filter(f) return q.delete(synchronize_session=False) @retry_on_deadlock diff --git a/gnocchi/indexer/sqlalchemy_base.py b/gnocchi/indexer/sqlalchemy_base.py index c9aae589e..331b51ef0 100644 --- a/gnocchi/indexer/sqlalchemy_base.py +++ b/gnocchi/indexer/sqlalchemy_base.py @@ -51,13 +51,17 @@ def process_bind_param(self, value, dialect): def process_result_value(self, value, dialect): values = super(ArchivePolicyDefinitionType, self).process_result_value(value, dialect) + if values is None: + return [] return [archive_policy.ArchivePolicyItem(**v) for v in values] class SetType(sqlalchemy_utils.JSONType): def process_result_value(self, value, dialect): - return set(super(SetType, - self).process_result_value(value, dialect)) + values = super(SetType, self).process_result_value(value, dialect) + if values is None: + return set() + return set(values) class ArchivePolicy(Base, GnocchiBase, archive_policy.ArchivePolicy): From a80f4a5abaec5a3ac89ed7c9d0491f9a4d964ab8 Mon Sep 17 00:00:00 2001 From: Tobias Urdin Date: Tue, 29 Jun 2021 11:04:19 +0200 Subject: [PATCH 04/11] Ignore H216 for flake8 We need to move to unittest.mock before enabling it. (cherry picked from commit 952f577f86bc7d58946847b1bd7c2b734d94a4ad) # Conflicts: # tox.ini (cherry picked from commit 4d2e0d914765405bd33c6f5224d018b329e7ca22) # Conflicts: # tox.ini --- tox.ini | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tox.ini b/tox.ini index 6930aab59..e46cc8fc9 100644 --- a/tox.ini +++ b/tox.ini @@ -117,7 +117,15 @@ commands = pifpaf -g GNOCCHI_INDEXER_URL run postgresql -- python setup.py testr exclude = .tox,.eggs,doc,gnocchi/rest/prometheus/remote_pb2.py,gnocchi/indexer/alembic/versions/ show-source = true enable-extensions = H904 +<<<<<<< HEAD ignore = E501,E731,W503,W504 +======= +<<<<<<< HEAD +======= +# TODO(tobias-urdin): Remove H216 when we use unittest.mock +ignore = E501,E731,W503,W504,H216 +>>>>>>> 952f577f (Ignore H216 for flake8) +>>>>>>> 4d2e0d91 (Ignore H216 for flake8) [testenv:docs] basepython = python3 From 65386282a196a4d6aef38e040377027478aa7856 Mon Sep 17 00:00:00 2001 From: Tobias Urdin Date: Tue, 29 Jun 2021 12:03:09 +0200 Subject: [PATCH 05/11] Only convert set to list in amqp1d _flush() (cherry picked from commit d11fc325cf8c85e00df818969de9dd743b5f4bdf) (cherry picked from commit 4b7ac058dfb13b002fd6a29bc7d70aee7f5d6a6b) --- gnocchi/amqp1d.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/gnocchi/amqp1d.py b/gnocchi/amqp1d.py index 9a3ba148c..4976185b1 100644 --- a/gnocchi/amqp1d.py +++ b/gnocchi/amqp1d.py @@ -64,10 +64,7 @@ def _flush(self): for host_id, measures_by_names in six.iteritems(self._measures): resource = resources[host_id] - mbn_keys = measures_by_names.keys() - names = (set(mbn_keys) if len(mbn_keys) - else set()) - + names = set(measures_by_names.keys()) for name in names: if name not in archive_policies: archive_policies[name] = ( From 2b12fbe837b3b4bdfc17916bb890b58982b8c6ce Mon Sep 17 00:00:00 2001 From: Tobias Urdin Date: Wed, 30 Jun 2021 14:54:27 +0200 Subject: [PATCH 06/11] Use isinstance() instead of type() for auth header (cherry picked from commit daa1e3e8a91eaf5c39913ae0c923da75d2f6eccf) (cherry picked from commit c1e6bb5d4c1401b947a9bb14dc713fd578077ede) --- gnocchi/rest/auth_helper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gnocchi/rest/auth_helper.py b/gnocchi/rest/auth_helper.py index 355846b4f..d11fadf1b 100644 --- a/gnocchi/rest/auth_helper.py +++ b/gnocchi/rest/auth_helper.py @@ -119,7 +119,7 @@ class BasicAuthHelper(object): @staticmethod def get_current_user(request): hdr = request.headers.get("Authorization") - auth_hdr = (hdr.decode('utf-8') if type(hdr) == bytes + auth_hdr = (hdr.decode('utf-8') if isinstance(hdr, bytes) else hdr) auth = werkzeug.http.parse_authorization_header(auth_hdr) if auth is None: From 215368a1537fef35794faa835656859fd9249abf Mon Sep 17 00:00:00 2001 From: Tobias Urdin Date: Wed, 30 Jun 2021 15:18:29 +0200 Subject: [PATCH 07/11] Unpin SQLAlchemy (cherry picked from commit 62ee223b456fa8e185720c18439d929d0f8cb0d4) (cherry picked from commit 7a04631d1041be8506cc2f38dd12ba97e999b8a0) --- setup.cfg | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.cfg b/setup.cfg index ef858a59b..0a1b0e2fb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -58,13 +58,13 @@ keystone = mysql = pymysql oslo.db>=4.29.0 - sqlalchemy<1.4.0 + sqlalchemy sqlalchemy-utils alembic>=0.7.6,!=0.8.1,!=0.9.0 postgresql = psycopg2 oslo.db>=4.29.0 - sqlalchemy<1.4.0 + sqlalchemy sqlalchemy-utils alembic>=0.7.6,!=0.8.1,!=0.9.0 s3 = From c6c573f86e75ae89f81c7b9d55d533e2e0e96d10 Mon Sep 17 00:00:00 2001 From: Matthias Runge Date: Thu, 10 Dec 2020 17:19:17 +0100 Subject: [PATCH 08/11] Update hacking and use hacking later than 0.13. (cherry picked from commit c40c9db607adde84ffcb5e5fedfd14ddbd4255c5) (cherry picked from commit 2b01ed1b67a279624eb35a1089fc5baec5158686) # Conflicts: # gnocchi/tests/test_storage.py # tox.ini --- gnocchi/amqp1d.py | 2 +- gnocchi/tests/test_storage.py | 5 +++++ tox.ini | 8 ++++++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/gnocchi/amqp1d.py b/gnocchi/amqp1d.py index 4976185b1..86e53e159 100644 --- a/gnocchi/amqp1d.py +++ b/gnocchi/amqp1d.py @@ -85,7 +85,7 @@ def _flush(self): archive_policy_name=archive_policies[name].name) except indexer.NamedMetricAlreadyExists as e: already_exists_names.append(e.metric) - except indexer.IndexerException as e: + except indexer.IndexerException: LOG.error("Unexpected error, dropping metric %s", name, exc_info=True) else: diff --git a/gnocchi/tests/test_storage.py b/gnocchi/tests/test_storage.py index 7d6846c30..2a5f1bf0b 100644 --- a/gnocchi/tests/test_storage.py +++ b/gnocchi/tests/test_storage.py @@ -637,7 +637,12 @@ def test_rewrite_measures(self): (datetime64(2016, 1, 6, 15, 12), numpy.timedelta64(1, 'm'), 44), (datetime64(2016, 1, 10, 16, 18), numpy.timedelta64(1, 'm'), 45), (datetime64(2016, 1, 10, 17, 12), numpy.timedelta64(1, 'm'), 46), +<<<<<<< HEAD ]}, self.storage.get_measures(self.metric, [aggregation])) +======= + ]}, get_measures_list(self.storage.get_aggregated_measures( + {self.metric: [aggregation]})[self.metric])) +>>>>>>> 2b01ed1b (Update hacking) def test_rewrite_measures_multiple_granularities(self): apname = str(uuid.uuid4()) diff --git a/tox.ini b/tox.ini index e46cc8fc9..b040e3513 100644 --- a/tox.ini +++ b/tox.ini @@ -105,7 +105,10 @@ commands = {toxinidir}/run-upgrade-tests.sh mysql-ceph [testenv:pep8] basepython = python3 deps = hacking>=0.12 +<<<<<<< HEAD +======= +>>>>>>> 2b01ed1b (Update hacking) commands = flake8 allowlist_externals = /usr/bin/flake8 @@ -118,6 +121,7 @@ exclude = .tox,.eggs,doc,gnocchi/rest/prometheus/remote_pb2.py,gnocchi/indexer/a show-source = true enable-extensions = H904 <<<<<<< HEAD +<<<<<<< HEAD ignore = E501,E731,W503,W504 ======= <<<<<<< HEAD @@ -126,6 +130,10 @@ ignore = E501,E731,W503,W504 ignore = E501,E731,W503,W504,H216 >>>>>>> 952f577f (Ignore H216 for flake8) >>>>>>> 4d2e0d91 (Ignore H216 for flake8) +======= +# TODO(tobias-urdin): Remove H216 when we use unittest.mock +ignore = E501,E731,W503,W504,H216 +>>>>>>> 2b01ed1b (Update hacking) [testenv:docs] basepython = python3 From bcde9055fcbc42881ce60ec5abbc680507f422d2 Mon Sep 17 00:00:00 2001 From: Tobias Urdin Date: Wed, 30 Jun 2021 17:19:36 +0200 Subject: [PATCH 09/11] Dont introduce amqp1d --- gnocchi/amqp1d.py | 234 ---------------------------------------------- 1 file changed, 234 deletions(-) delete mode 100644 gnocchi/amqp1d.py diff --git a/gnocchi/amqp1d.py b/gnocchi/amqp1d.py deleted file mode 100644 index 86e53e159..000000000 --- a/gnocchi/amqp1d.py +++ /dev/null @@ -1,234 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import collections -import itertools -import uuid - -import daiquiri -import proton.handlers -import proton.reactor -import six -import ujson - -from gnocchi import incoming -from gnocchi import indexer -from gnocchi import service -from gnocchi import utils - -LOG = daiquiri.getLogger(__name__) - - -class BatchProcessor(object): - def __init__(self, conf): - self.conf = conf - self.incoming = incoming.get_driver(self.conf) - self.indexer = indexer.get_driver(self.conf) - self._ensure_resource_type_exists() - - self._hosts = {} - self._measures = collections.defaultdict( - lambda: collections.defaultdict(list)) - - def reset(self): - self._hosts.clear() - self._measures.clear() - - def add_measures(self, host, name, measures): - host_id = "%s:%s" % (self.conf.amqp1d.resource_type, - host.replace("/", "_")) - self._hosts[host_id] = host - self._measures[host_id][name].extend(measures) - - def flush(self): - try: - self._flush() - except Exception: - LOG.error("Unepected error during flush()", exc_info=True) - self.reset() - - def _flush(self): - archive_policies = {} - resources = self._get_resources(self._measures.keys()) - for host_id, measures_by_names in six.iteritems(self._measures): - resource = resources[host_id] - - names = set(measures_by_names.keys()) - for name in names: - if name not in archive_policies: - archive_policies[name] = ( - self.indexer.get_archive_policy_for_metric(name)) - known_metrics = self.indexer.list_metrics(attribute_filter={ - "and": [{"=": {"resource_id": resource.id}}, - {"in": {"name": list(names)}}] - }) - known_names = set((m.name for m in known_metrics)) - already_exists_names = [] - for name in (names - known_names): - try: - m = self.indexer.create_metric( - uuid.uuid4(), - creator=self.conf.amqp1d.creator, - resource_id=resource.id, - name=name, - archive_policy_name=archive_policies[name].name) - except indexer.NamedMetricAlreadyExists as e: - already_exists_names.append(e.metric) - except indexer.IndexerException: - LOG.error("Unexpected error, dropping metric %s", - name, exc_info=True) - else: - known_metrics.append(m) - - if already_exists_names: - # Add metrics created in the meantime - known_names.extend(already_exists_names) - known_metrics.extend( - self.indexer.list_metrics(attribute_filter={ - "and": [{"=": {"resource_id": resource.id}}, - {"in": {"name": already_exists_names}}] - })) - - self.incoming.add_measures_batch( - dict((metric.id, - measures_by_names[metric.name]) - for metric in known_metrics)) - - def _get_resources(self, host_ids): - - resource_ids = set((utils.ResourceUUID(host_id, - self.conf.amqp1d.creator) - for host_id in host_ids)) - - resources = self.indexer.list_resources( - resource_type=self.conf.amqp1d.resource_type, - attribute_filter={"in": {"id": resource_ids}}) - - resources_by_host_id = {r.original_resource_id: r for r in resources} - - missing_host_ids = set(host_ids) - set(resources_by_host_id.keys()) - - for host_id in missing_host_ids: - resource_id = utils.ResourceUUID(host_id, - self.conf.amqp1d.creator) - try: - r = self.indexer.create_resource( - self.conf.amqp1d.resource_type, - resource_id, - self.conf.amqp1d.creator, - original_resource_id=host_id, - host=self._hosts[host_id]) - except indexer.ResourceAlreadyExists: - r = self.indexer.get_resource( - self.conf.amqp1d.resource_type, - resource_id) - resources_by_host_id[host_id] = r - - return resources_by_host_id - - def _ensure_resource_type_exists(self): - try: - self.resource_type = self.indexer.get_resource_type( - self.conf.amqp1d.resource_type) - except indexer.NoSuchResourceType: - try: - mgr = self.indexer.get_resource_type_schema() - rtype = mgr.resource_type_from_dict( - self.conf.amqp1d.resource_type, { - "host": {"type": "string", "required": True, - "min_length": 0, "max_length": 255}, - }, "creating") - self.indexer.create_resource_type(rtype) - except indexer.ResourceTypeAlreadyExists: - LOG.debug("Resource type %s already exists", - self.conf.amqp1d.resource_type) - else: - LOG.info("Created resource type %s", - self.conf.amqp1d.resource_type) - self.resource_type = self.indexer.get_resource_type( - self.conf.amqp1d.resource_type) - else: - LOG.info("Found resource type %s", - self.conf.amqp1d.resource_type) - - -class CollectdFormatHandler(object): - def __init__(self, processor): - self.processor = processor - - @staticmethod - def _serialize_identifier(index, message): - """Based of FORMAT_VL from collectd/src/daemon/common.h. - - The biggest difference is that we don't prepend the host and append the - index of the value, and don't use slash. - - """ - suffix = ("-%s" % message["dsnames"][index] - if len(message["dsnames"]) > 1 else "") - return (message["plugin"] + ("-" + message["plugin_instance"] - if message["plugin_instance"] else "") - + "@" - + message["type"] + ("-" + message["type_instance"] - if message["type_instance"] else "") - + suffix) - - def on_message(self, event): - json_message = ujson.loads(event.message.body) - timestamp = utils.dt_in_unix_ns(utils.utcnow()) - measures_by_host_and_name = sorted(( - (message["host"], - self._serialize_identifier(index, message), - value) - for message in json_message - for index, value in enumerate(message["values"]) - )) - for (host, name), values in itertools.groupby( - measures_by_host_and_name, key=lambda x: x[0:2]): - measures = (incoming.Measure(timestamp, v[2]) for v in values) - self.processor.add_measures(host, name, measures) - - -class AMQP1Server(proton.handlers.MessagingHandler): - - def __init__(self, conf): - super(AMQP1Server, self).__init__() - self.peer_close_is_error = True - self.conf = conf - - self.processor = BatchProcessor(conf) - - # Only collectd format is supported for now - self.data_source_handler = { - "collectd": CollectdFormatHandler - }[self.conf.amqp1d.data_source](self.processor) - - def on_start(self, event): - event.container.schedule(self.conf.amqp1d.flush_delay, self) - - def on_message(self, event): - self.data_source_handler.on_message(event) - - def on_timer_task(self, event): - event.container.schedule(self.conf.amqp1d.flush_delay, self) - self.processor.flush() - - -def start(): - conf = service.prepare_service() - server = proton.reactor.Container(AMQP1Server(conf)) - try: - server.run() - except KeyboardInterrupt: - pass From 9a826686fe5410612a8ccc1bd8ca2cc46cc8a1f4 Mon Sep 17 00:00:00 2001 From: Tobias Urdin Date: Wed, 30 Jun 2021 17:20:49 +0200 Subject: [PATCH 10/11] Fix update hacking test_storage.py conflict --- gnocchi/tests/test_storage.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/gnocchi/tests/test_storage.py b/gnocchi/tests/test_storage.py index 2a5f1bf0b..7d6846c30 100644 --- a/gnocchi/tests/test_storage.py +++ b/gnocchi/tests/test_storage.py @@ -637,12 +637,7 @@ def test_rewrite_measures(self): (datetime64(2016, 1, 6, 15, 12), numpy.timedelta64(1, 'm'), 44), (datetime64(2016, 1, 10, 16, 18), numpy.timedelta64(1, 'm'), 45), (datetime64(2016, 1, 10, 17, 12), numpy.timedelta64(1, 'm'), 46), -<<<<<<< HEAD ]}, self.storage.get_measures(self.metric, [aggregation])) -======= - ]}, get_measures_list(self.storage.get_aggregated_measures( - {self.metric: [aggregation]})[self.metric])) ->>>>>>> 2b01ed1b (Update hacking) def test_rewrite_measures_multiple_granularities(self): apname = str(uuid.uuid4()) From 2a3205661038e98d5dd249776f453f548e617b5f Mon Sep 17 00:00:00 2001 From: Tobias Urdin Date: Wed, 30 Jun 2021 17:22:04 +0200 Subject: [PATCH 11/11] Fix tox.ini conflict --- tox.ini | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/tox.ini b/tox.ini index b040e3513..2a51a3dac 100644 --- a/tox.ini +++ b/tox.ini @@ -105,10 +105,6 @@ commands = {toxinidir}/run-upgrade-tests.sh mysql-ceph [testenv:pep8] basepython = python3 deps = hacking>=0.12 -<<<<<<< HEAD - -======= ->>>>>>> 2b01ed1b (Update hacking) commands = flake8 allowlist_externals = /usr/bin/flake8 @@ -120,20 +116,8 @@ commands = pifpaf -g GNOCCHI_INDEXER_URL run postgresql -- python setup.py testr exclude = .tox,.eggs,doc,gnocchi/rest/prometheus/remote_pb2.py,gnocchi/indexer/alembic/versions/ show-source = true enable-extensions = H904 -<<<<<<< HEAD -<<<<<<< HEAD -ignore = E501,E731,W503,W504 -======= -<<<<<<< HEAD -======= -# TODO(tobias-urdin): Remove H216 when we use unittest.mock -ignore = E501,E731,W503,W504,H216 ->>>>>>> 952f577f (Ignore H216 for flake8) ->>>>>>> 4d2e0d91 (Ignore H216 for flake8) -======= # TODO(tobias-urdin): Remove H216 when we use unittest.mock ignore = E501,E731,W503,W504,H216 ->>>>>>> 2b01ed1b (Update hacking) [testenv:docs] basepython = python3