diff --git a/helm/templates/service_monitor.yaml b/helm/templates/service_monitor.yaml new file mode 100644 index 0000000..8914658 --- /dev/null +++ b/helm/templates/service_monitor.yaml @@ -0,0 +1,22 @@ +{{- if .Values.serviceMonitor.create }} +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: {{ include "poolboy.name" . }} + namespace: {{ include "poolboy.namespaceName" . }} + labels: + {{- include "poolboy.labels" . | nindent 4 }} +spec: + endpoints: + - interval: {{ .Values.serviceMonitor.interval }} + path: {{ .Values.serviceMonitor.path }} + port: {{ .Values.serviceMonitor.portName }} + scrapeTimeout: {{ .Values.serviceMonitor.scrapeTimeout }} + jobLabel: {{ include "poolboy.name" . }} + namespaceSelector: + matchNames: + - {{ include "poolboy.namespaceName" . }} + selector: + matchLabels: + {{- include "poolboy.selectorLabels" . | nindent 6 }} +{{- end }} diff --git a/helm/values.yaml b/helm/values.yaml index b215b30..a4ce1f0 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -51,15 +51,31 @@ serviceAccount: # If not set and create is true, a name is generated using the fullname template name: +serviceMonitor: + # ServiceMonitor requires prometheus-operator installed in the cluster + create: true + # The port name of the service monitor to use. + portName: metrics + # The path to scrape metrics from + path: /metrics + # How often Prometheus should scrape + interval: 30s + # How long Prometheus should wait for a scrape to complete + scrapeTimeout: 15s + service: type: ClusterIP - port: 8000 + port: 8080 ports: + - name: healthz + port: 8080 + protocol: TCP + targetPort: 8080 - name: metrics - port: 8000 + port: 80 protocol: TCP targetPort: 8000 - + resources: {} # We usually recommend not to specify default resources and to leave this as a conscious # choice for the user. 
This also increases chances charts run on environments with little diff --git a/operator/metrics/__init__.py b/operator/metrics/__init__.py new file mode 100644 index 0000000..064f66d --- /dev/null +++ b/operator/metrics/__init__.py @@ -0,0 +1,4 @@ +from .app_metrics import AppMetrics # noqa: F401 +from .metrics_manager import MetricsManager # noqa: F401 +from .metrics_service import MetricsService # noqa: F401 +from .resourcepool_metrics import ResourcePoolMetrics # noqa: F401 diff --git a/operator/metrics/app_metrics.py b/operator/metrics/app_metrics.py new file mode 100644 index 0000000..a567dd9 --- /dev/null +++ b/operator/metrics/app_metrics.py @@ -0,0 +1,19 @@ +from __future__ import annotations +import time +from functools import wraps +from aioprometheus import Summary, Counter + + +class AppMetrics: + response_time_seconds = Summary("response_time_seconds", + "Response time in seconds", + {"method": "Method used for the request", + "resource_type": "Type of resource requested" + } + ) + + request_handler_exceptions = Counter( + "request_handler_exceptions", + "Count of exceptions by handler function.", + {"handler": "Handler function name"} + ) diff --git a/operator/metrics/metrics_manager.py b/operator/metrics/metrics_manager.py new file mode 100644 index 0000000..6757aad --- /dev/null +++ b/operator/metrics/metrics_manager.py @@ -0,0 +1,18 @@ +from __future__ import annotations +from aioprometheus import REGISTRY, Counter, Gauge, Summary + +from .app_metrics import AppMetrics +from .resourcepool_metrics import ResourcePoolMetrics + + +class MetricsManager: + metrics_classes = [AppMetrics, ResourcePoolMetrics] + + @classmethod + def register(cls): + for metrics_class in cls.metrics_classes: + for attr_name in dir(metrics_class): + attr = getattr(metrics_class, attr_name) + if isinstance(attr, (Counter, Gauge, Summary)): + if attr.name not in REGISTRY.collectors: + REGISTRY.register(attr) diff --git a/operator/metrics/metrics_service.py
b/operator/metrics/metrics_service.py new file mode 100644 index 0000000..2a89beb --- /dev/null +++ b/operator/metrics/metrics_service.py @@ -0,0 +1,22 @@ +from __future__ import annotations +import os +import logging +from aioprometheus.service import Service + + +logger = logging.getLogger(__name__) + + +class MetricsService: + service = Service() + + @classmethod + async def start(cls, addr="0.0.0.0", port=8000) -> None: + port = int(os.environ.get("METRICS_PORT", port)) + await cls.service.start(addr=addr, port=port, metrics_url="/metrics") + logger.info(f"Serving metrics on: {cls.service.metrics_url}") + + @classmethod + async def stop(cls) -> None: + logger.info("Stopping metrics service") + await cls.service.stop() diff --git a/operator/metrics/resourcepool_metrics.py b/operator/metrics/resourcepool_metrics.py new file mode 100644 index 0000000..a2491cb --- /dev/null +++ b/operator/metrics/resourcepool_metrics.py @@ -0,0 +1,38 @@ +from __future__ import annotations +from aioprometheus import Counter, Gauge + +from .app_metrics import AppMetrics + + +class ResourcePoolMetrics(AppMetrics): + resource_pool_labels = {'name': "Resource pool name", + 'namespace': "Resource pool namespace" + } + + resource_pool_min_available = Gauge( + 'resource_pool_min_available', + 'Minimum number of available environments in each resource pool', + resource_pool_labels + ) + + resource_pool_available = Gauge( + 'resource_pool_available', + 'Number of available environments in each resource pool', + resource_pool_labels + ) + + resource_pool_used_total = Counter( + 'resource_pool_used_total', + 'Total number of environments used in each resource pool', + resource_pool_labels + ) + + resource_pool_state_labels = {'name': "Resource pool name", + 'namespace': "Resource pool namespace", + 'state': "State of the resource pool" + } + resource_pool_state = Gauge( + 'resource_pool_state', + 'State of each resource pool, including available and used resources', + resource_pool_state_labels + )
diff --git a/operator/operator.py b/operator/operator.py index 8ce280e..eedd2ba 100755 --- a/operator/operator.py +++ b/operator/operator.py @@ -3,6 +3,8 @@ import kopf import logging +from aioprometheus import timer + from copy import deepcopy from datetime import datetime, timedelta from typing import Any, Mapping, Optional @@ -17,8 +19,14 @@ from resourceprovider import ResourceProvider from resourcewatcher import ResourceWatcher +from metrics import MetricsManager, MetricsService, AppMetrics + @kopf.on.startup() +@timer( + AppMetrics.response_time_seconds, + labels={'method': 'startup', 'resource_type': 'poolboy_operator'} +) async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, **_): # Store last handled configuration in status settings.persistence.diffbase_storage = kopf.StatusDiffBaseStorage(field='status.diffBase') @@ -41,6 +49,12 @@ async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, ** # Configure logging configure_kopf_logging() + # Start metrics service + await MetricsService.start() + + # Register metrics + MetricsManager.register() + # Preload for matching ResourceClaim templates await Poolboy.on_startup() await ResourceProvider.preload(logger=logger) @@ -48,9 +62,14 @@ async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, ** @kopf.on.cleanup() +@timer( + AppMetrics.response_time_seconds, + labels={'method': 'cleanup', 'resource_type': 'poolboy_operator'} +) async def cleanup(logger: kopf.ObjectLogger, **_): await ResourceWatcher.stop_all() await Poolboy.on_cleanup() + await MetricsService.stop() @kopf.on.create( @@ -65,6 +84,10 @@ async def cleanup(logger: kopf.ObjectLogger, **_): Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims', id='resource_claim_update', labels={Poolboy.ignore_label: kopf.ABSENT}, ) +@timer( + AppMetrics.response_time_seconds, + labels={'method': 'resource_claim_event', 'resource_type': 'resourceclaims'} +) async def resource_claim_event( 
annotations: kopf.Annotations, labels: kopf.Labels, @@ -94,6 +117,10 @@ async def resource_claim_event( Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims', labels={Poolboy.ignore_label: kopf.ABSENT}, ) +@timer( + AppMetrics.response_time_seconds, + labels={'method': 'resource_claim_delete', 'resource_type': 'resourceclaims'} +) async def resource_claim_delete( annotations: kopf.Annotations, labels: kopf.Labels, @@ -173,6 +200,10 @@ async def resource_claim_daemon( Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles', id='resource_handle_update', labels={Poolboy.ignore_label: kopf.ABSENT}, ) +@timer( + AppMetrics.response_time_seconds, + labels={'method': 'resource_handle_event', 'resource_type': 'resourcehandles'} +) async def resource_handle_event( annotations: kopf.Annotations, labels: kopf.Labels, @@ -202,6 +233,10 @@ async def resource_handle_event( Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles', labels={Poolboy.ignore_label: kopf.ABSENT}, ) +@timer( + AppMetrics.response_time_seconds, + labels={'method': 'resource_handle_delete', 'resource_type': 'resourcehandles'} +) async def resource_handle_delete( annotations: kopf.Annotations, labels: kopf.Labels, @@ -281,6 +316,10 @@ async def resource_handle_daemon( Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools', id='resource_pool_update', labels={Poolboy.ignore_label: kopf.ABSENT}, ) +@timer( + AppMetrics.response_time_seconds, + labels={'method': 'resource_pool_event', 'resource_type': 'resourcepools'} +) async def resource_pool_event( annotations: kopf.Annotations, labels: kopf.Labels, @@ -310,6 +349,10 @@ async def resource_pool_event( Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools', labels={Poolboy.ignore_label: kopf.ABSENT}, ) +@timer( + AppMetrics.response_time_seconds, + labels={'method': 'resource_pool_delete', 'resource_type': 'resourcepools'} +) async def resource_pool_delete( annotations: kopf.Annotations, labels: 
kopf.Labels, @@ -337,6 +380,10 @@ async def resource_pool_delete( @kopf.on.event(Poolboy.operator_domain, Poolboy.operator_version, 'resourceproviders') +@timer( + AppMetrics.response_time_seconds, + labels={'method': 'resource_provider_event', 'resource_type': 'resourceproviders'} +) async def resource_provider_event(event: Mapping, logger: kopf.ObjectLogger, **_) -> None: definition = event['object'] if event['type'] == 'DELETED': diff --git a/operator/resourcepool.py b/operator/resourcepool.py index e577680..b2d6b8c 100644 --- a/operator/resourcepool.py +++ b/operator/resourcepool.py @@ -3,6 +3,9 @@ import kubernetes_asyncio import pytimeparse +from metrics import ResourcePoolMetrics + + from datetime import timedelta from typing import List, Mapping, Optional, TypeVar @@ -119,6 +122,17 @@ def lifespan_unclaimed_timedelta(self): def metadata(self) -> Mapping: return self.meta + @property + def metrics_labels(self) -> Mapping: + return { + 'name': self.name, + 'namespace': self.namespace, + } + + @property + def metric_state_labels(self) -> Mapping: + return {'name': self.name, 'namespace': self.namespace, 'state': ''} + @property def min_available(self) -> int: return self.spec.get('minAvailable', 0) @@ -161,6 +175,42 @@ def refresh(self, self.status = status self.uid = uid + async def handle_metrics(self, logger: kopf.ObjectLogger, resource_handles): + logger.info("Handling metrics for resource pool") + resource_handle_deficit = self.min_available - len(resource_handles) + + try: + ResourcePoolMetrics.resource_pool_min_available.set( + labels=self.metrics_labels, + value=self.min_available + ) + + ResourcePoolMetrics.resource_pool_available.set( + labels=self.metrics_labels, + value=len(resource_handles) + ) + + if resource_handle_deficit < 0: + ResourcePoolMetrics.resource_pool_used_total.inc( + labels=self.metrics_labels, + ) + + state_labels = self.metric_state_labels + state_labels['state'] = 'available' + ResourcePoolMetrics.resource_pool_state.set( + 
labels=state_labels, + value=len(resource_handles) + ) + + state_labels['state'] = 'used' + ResourcePoolMetrics.resource_pool_state.set( + labels=state_labels, + value=resource_handle_deficit + ) + except Exception as e: + logger.error(f"Error handling metrics for resource pool: {e}") + return + async def handle_delete(self, logger: kopf.ObjectLogger): await resourcehandle.ResourceHandle.delete_unbound_handles_for_pool(logger=logger, resource_pool=self) @@ -168,6 +218,9 @@ async def manage(self, logger: kopf.ObjectLogger): async with self.lock: resource_handles = await resourcehandle.ResourceHandle.get_unbound_handles_for_pool(resource_pool=self, logger=logger) resource_handle_deficit = self.min_available - len(resource_handles) + + await self.handle_metrics(logger=logger, resource_handles=resource_handles) + if resource_handle_deficit <= 0: return for i in range(resource_handle_deficit): diff --git a/requirements.txt b/requirements.txt index a986470..511096f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ jsonpointer==2.2 jsonschema==3.2.0 openapi-schema-validator==0.1.5 prometheus-client==0.11.0 +aioprometheus==23.12.0 pyasn1==0.4.8 pyasn1-modules==0.2.8 pydantic==1.9.0