Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Monitoring Metrics with aioprometheus for ResourcePools #103

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions helm/templates/service_metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{{- if .Values.deploy -}}
# Service exposing the operator's Prometheus /metrics endpoint for scraping.
# NOTE(review): this template reads .Values.serviceMetrics.*, but the
# values.yaml change in this PR adds the metrics port under the existing
# `service.ports` list instead — confirm a `serviceMetrics` section
# (with `type` and `ports`) actually exists in values.yaml.
apiVersion: v1
kind: Service
metadata:
name: {{ include "poolboy.name" . }}-metrics
namespace: {{ include "poolboy.namespaceName" . }}
labels:
{{- include "poolboy.labels" . | nindent 4 }}
spec:
type: {{ .Values.serviceMetrics.type }}
{{- with .Values.serviceMetrics.ports }}
ports:
{{- toYaml . | nindent 2 }}
{{- end }}
selector:
{{- include "poolboy.selectorLabels" . | nindent 4 }}
sessionAffinity: None
{{- end -}}
18 changes: 18 additions & 0 deletions helm/templates/service_monitor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# ServiceMonitor directing the Prometheus Operator to scrape the metrics Service.
# NOTE(review): unlike service_metrics.yaml this is not wrapped in
# `{{- if .Values.deploy -}}`, so it renders unconditionally and will fail on
# clusters without the monitoring.coreos.com CRDs — confirm this is intended.
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: {{ include "poolboy.name" . }}
namespace: {{ include "poolboy.namespaceName" . }}
labels:
{{- include "poolboy.labels" . | nindent 4 }}
spec:
selector:
matchLabels:
app.kubernetes.io/name: {{ include "poolboy.name" . }}
namespaceSelector:
matchNames:
- {{ include "poolboy.namespaceName" . }}
endpoints:
# NOTE(review): `port` must equal the *name* of a port on the target Service;
# the values.yaml change in this PR names the port "metrics-service", not the
# chart name — verify these match or scraping will silently find no targets.
- port: {{ include "poolboy.name" . }}
interval: "30s"
path: /metrics
6 changes: 6 additions & 0 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ service:
port: 8000
protocol: TCP
targetPort: 8000
- name: metrics-service
port: 8000
protocol: web
targetPort: 8000
path: /metrics


resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious
Expand Down
4 changes: 4 additions & 0 deletions operator/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .app_metrics import AppMetrics # noqa F401
from .metrics_manager import MetricsManager # noqa F401
from .metrics_service import MetricsService # noqa F401
from .resourcepool_metrics import ResourcePoolMetrics # noqa F401
38 changes: 38 additions & 0 deletions operator/metrics/app_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import annotations
import time
from functools import wraps
from aioprometheus import Summary


class AppMetrics:
    """Container for application-wide (request-level) Prometheus metrics.

    Metrics are declared as class attributes so a single instance exists per
    process and can be discovered by name via ``getattr(AppMetrics, ...)``.
    """

    # Summary: similar to a histogram, a summary samples observations
    # (usually things like request durations and response sizes). It provides
    # a total count of observations and a sum of all observed values, and can
    # calculate configurable quantiles over a sliding time window.
    #
    # NOTE(review): in aioprometheus the third positional argument is
    # `const_labels`, not a label schema — these description strings become
    # constant label values attached to every sample. Confirm that is intended.
    response_time_seconds = Summary("response_time_seconds",
                                    "Response time in seconds",
                                    {"method": "Method used for the request",
                                     "resource_type": "Type of resource requested"
                                     }
                                    )

    @staticmethod
    def measure_execution_time(metric_name, **labels):
        """Decorator factory: record the wall-clock duration of an async
        callable into the metric named ``metric_name`` on ``AppMetrics``.

        Args:
            metric_name: Name of a class attribute on AppMetrics that
                supports ``observe(labels=..., value=...)``.
            **labels: Static label values attached to every observation.

        Returns:
            A decorator for async functions; the wrapped function's return
            value is passed through unchanged.
        """
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                metric = getattr(AppMetrics, metric_name, None)
                # perf_counter is monotonic: the measured duration can never
                # go negative if the system clock is adjusted mid-call,
                # which time.time() does not guarantee.
                start_time = time.perf_counter()
                result = await func(*args, **kwargs)
                duration = time.perf_counter() - start_time
                if metric and callable(getattr(metric, 'observe', None)):
                    metric.observe(labels=labels, value=duration)
                else:
                    print(f"Metric {metric_name} not found or doesn't support observe()")
                return result

            return wrapper

        return decorator
18 changes: 18 additions & 0 deletions operator/metrics/metrics_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from __future__ import annotations
from aioprometheus import REGISTRY, Counter, Gauge

from .app_metrics import AppMetrics
from .resourcepool_metrics import ResourcePoolMetrics


class MetricsManager:
    """Registers every metric declared on the known metrics classes with the
    default aioprometheus REGISTRY, skipping any already registered."""

    # Classes whose class attributes are scanned for collector instances.
    metrics_classes = [AppMetrics, ResourcePoolMetrics]

    @classmethod
    def register(cls):
        """Idempotently register all collector attributes of the configured
        metrics classes with the default REGISTRY.

        BUGFIX: the original checked only ``(Counter, Gauge)``, so the
        ``Summary`` declared on AppMetrics (response_time_seconds) was never
        matched here. Summary and Histogram are now included.

        NOTE(review): aioprometheus collectors typically self-register with
        the default registry on construction, making this a safety net —
        confirm against the pinned aioprometheus version.
        """
        # Local import: the module header imports only Counter and Gauge.
        from aioprometheus import Histogram, Summary

        for metrics_class in cls.metrics_classes:
            for attr_name in dir(metrics_class):
                attr = getattr(metrics_class, attr_name)
                if isinstance(attr, (Counter, Gauge, Summary, Histogram)):
                    # Registry rejects duplicate names; only add new ones.
                    if attr.name not in REGISTRY.collectors:
                        REGISTRY.register(attr)
22 changes: 22 additions & 0 deletions operator/metrics/metrics_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from __future__ import annotations
import os
import logging
from aioprometheus.service import Service


logger = logging.getLogger(__name__)


class MetricsService:
    """Thin class-level wrapper around the aioprometheus HTTP service that
    exposes the ``/metrics`` endpoint for Prometheus scraping."""

    # Single shared service instance for the whole process.
    service = Service()

    @classmethod
    async def start(cls, addr="0.0.0.0", port=8000) -> None:
        """Start serving ``/metrics`` on ``addr``.

        The METRICS_PORT environment variable, when set, overrides the
        ``port`` argument.
        """
        metrics_port = int(os.environ.get("METRICS_PORT", port))
        await cls.service.start(addr=addr, port=metrics_port, metrics_url="/metrics")
        logger.info(f"Serving metrics on: {cls.service.metrics_url}")

    @classmethod
    async def stop(cls) -> None:
        """Shut down the metrics HTTP service."""
        logger.info("Stopping metrics service")
        await cls.service.stop()
38 changes: 38 additions & 0 deletions operator/metrics/resourcepool_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import annotations

from aioprometheus import Counter, Gauge

# Relative import: this module lives inside the `metrics` package;
# `from metrics import AppMetrics` re-imported the package from within its
# own __init__ (circular self-import while the package is initializing).
from .app_metrics import AppMetrics


class ResourcePoolMetrics(AppMetrics):
    """Prometheus metrics describing ResourcePool capacity and usage.

    NOTE(review): in aioprometheus the third positional constructor argument
    is `const_labels`; these description dicts become constant label values
    on every sample — confirm that is intended.
    """

    # Label set shared by the per-pool metrics below.
    resource_pool_labels = {
        'name': "Resource pool name",
        'namespace': "Resource pool namespace",
    }

    resource_pool_min_available = Gauge(
        'resource_pool_min_available',
        'Minimum number of available environments in each resource pool',
        resource_pool_labels,
    )

    resource_pool_available = Gauge(
        'resource_pool_available',
        'Number of available environments in each resource pool',
        resource_pool_labels,
    )

    resource_pool_used_total = Counter(
        'resource_pool_used_total',
        'Total number of environments used in each resource pool',
        resource_pool_labels,
    )

    # The state gauge carries an extra 'state' label filled in by callers.
    resource_pool_state_labels = {
        'name': "Resource pool name",
        'namespace': "Resource pool namespace",
        'state': "State of the resource pool",
    }

    resource_pool_state = Gauge(
        'resource_pool_state',
        'State of each resource pool, including available and used resources',
        resource_pool_state_labels,
    )
45 changes: 45 additions & 0 deletions operator/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
from resourceprovider import ResourceProvider
from resourcewatcher import ResourceWatcher

from metrics import MetricsManager, MetricsService, AppMetrics



@kopf.on.startup()
async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, **_):
Expand All @@ -41,6 +44,12 @@ async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, **
# Configure logging
configure_kopf_logging()

# Start metrics service
await MetricsService.start()

# Register metrics
MetricsManager.register()

# Preload for matching ResourceClaim templates
await Poolboy.on_startup()
await ResourceProvider.preload(logger=logger)
Expand All @@ -51,6 +60,7 @@ async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, **
async def cleanup(logger: kopf.ObjectLogger, **_):
await ResourceWatcher.stop_all()
await Poolboy.on_cleanup()
await MetricsService.stop()


@kopf.on.create(
Expand All @@ -65,6 +75,11 @@ async def cleanup(logger: kopf.ObjectLogger, **_):
Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims',
id='resource_claim_update', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@AppMetrics.measure_execution_time(
'response_time_seconds',
method='on_create',
resource_type='resourceclaims'
)
async def resource_claim_event(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -94,6 +109,11 @@ async def resource_claim_event(
Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims',
labels={Poolboy.ignore_label: kopf.ABSENT},
)
@AppMetrics.measure_execution_time(
'response_time_seconds',
method='on_delete',
resource_type='resourceclaims'
)
async def resource_claim_delete(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -173,6 +193,11 @@ async def resource_claim_daemon(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles',
id='resource_handle_update', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@AppMetrics.measure_execution_time(
'response_time_seconds',
method='on_create',
resource_type='resourcehandles'
)
async def resource_handle_event(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -202,6 +227,11 @@ async def resource_handle_event(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles',
labels={Poolboy.ignore_label: kopf.ABSENT},
)
@AppMetrics.measure_execution_time(
'response_time_seconds',
method='on_delete',
resource_type='resourcehandles'
)
async def resource_handle_delete(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -281,6 +311,11 @@ async def resource_handle_daemon(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools',
id='resource_pool_update', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@AppMetrics.measure_execution_time(
'response_time_seconds',
method='on_event',
resource_type='resourcepools'
)
async def resource_pool_event(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -310,6 +345,11 @@ async def resource_pool_event(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools',
labels={Poolboy.ignore_label: kopf.ABSENT},
)
@AppMetrics.measure_execution_time(
'response_time_seconds',
method='on_delete',
resource_type='resourcepools'
)
async def resource_pool_delete(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -337,6 +377,11 @@ async def resource_pool_delete(


@kopf.on.event(Poolboy.operator_domain, Poolboy.operator_version, 'resourceproviders')
@AppMetrics.measure_execution_time(
'response_time_seconds',
method='on_event',
resource_type='resourceproviders'
)
async def resource_provider_event(event: Mapping, logger: kopf.ObjectLogger, **_) -> None:
definition = event['object']
if event['type'] == 'DELETED':
Expand Down
54 changes: 54 additions & 0 deletions operator/resourcepool.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import kubernetes_asyncio
import pytimeparse

from metrics import ResourcePoolMetrics

from datetime import timedelta
from typing import List, Mapping, Optional, TypeVar

Expand Down Expand Up @@ -119,6 +121,17 @@ def lifespan_unclaimed_timedelta(self):
def metadata(self) -> Mapping:
return self.meta

@property
def metrics_labels(self) -> Mapping:
    """Prometheus label set identifying this pool: its name and namespace."""
    return dict(name=self.name, namespace=self.namespace)

@property
def metric_state_labels(self) -> Mapping:
    """Label set for the per-state gauge.

    The 'state' entry starts empty; callers fill it in (e.g. 'available',
    'used') before recording a sample.
    """
    return {
        'name': self.name,
        'namespace': self.namespace,
        'state': '',
    }

@property
def min_available(self) -> int:
    # Desired number of unbound handles to keep ready (spec.minAvailable, default 0).
    return self.spec.get('minAvailable', 0)
Expand Down Expand Up @@ -161,13 +174,54 @@ def refresh(self,
self.status = status
self.uid = uid

async def handle_metrics(self, logger: kopf.ObjectLogger, resource_handles):
    """Publish Prometheus metrics describing this pool's current state.

    Args:
        logger: kopf object logger for this resource pool.
        resource_handles: the currently unbound (available) ResourceHandles
            belonging to this pool.
    """
    logger.info("Handling metrics for resource pool")
    available = len(resource_handles)
    # Positive deficit: fewer unbound handles than minAvailable, meaning some
    # handles have been claimed and replacements will be created by manage().
    resource_handle_deficit = self.min_available - available

    ResourcePoolMetrics.resource_pool_min_available.set(
        labels=self.metrics_labels,
        value=self.min_available
    )

    ResourcePoolMetrics.resource_pool_available.set(
        labels=self.metrics_labels,
        value=available
    )

    if resource_handle_deficit > 0:
        # BUGFIX: the original called Counter.inc(labels=..., value=...) with
        # a *negative* value when deficit < 0. aioprometheus Counter.inc()
        # takes no value argument (TypeError) and Prometheus counters must
        # never decrease. Record consumption with add(), and only when
        # handles were actually consumed (positive deficit).
        ResourcePoolMetrics.resource_pool_used_total.add(
            labels=self.metrics_labels,
            value=resource_handle_deficit
        )

    state_labels = self.metric_state_labels
    state_labels['state'] = 'available'
    ResourcePoolMetrics.resource_pool_state.set(
        labels=state_labels,
        value=available
    )

    state_labels['state'] = 'used'
    # Clamp at zero: a surplus of unbound handles is not "negative usage".
    ResourcePoolMetrics.resource_pool_state.set(
        labels=state_labels,
        value=max(resource_handle_deficit, 0)
    )

async def handle_delete(self, logger: kopf.ObjectLogger):
    # On pool deletion, remove any unbound ResourceHandles this pool created;
    # bound handles belong to claims and are left alone.
    await resourcehandle.ResourceHandle.delete_unbound_handles_for_pool(logger=logger, resource_pool=self)

@ResourcePoolMetrics.measure_execution_time(
'response_time_seconds',
method='manage',
resource_type='resourcepool'
)
async def manage(self, logger: kopf.ObjectLogger):
async with self.lock:
resource_handles = await resourcehandle.ResourceHandle.get_unbound_handles_for_pool(resource_pool=self, logger=logger)
resource_handle_deficit = self.min_available - len(resource_handles)

await self.handle_metrics(logger=logger, resource_handles=resource_handles)

if resource_handle_deficit <= 0:
return
for i in range(resource_handle_deficit):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ jsonpointer==2.2
jsonschema==3.2.0
openapi-schema-validator==0.1.5
prometheus-client==0.11.0
aioprometheus==23.12.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
pydantic==1.9.0
Expand Down
Loading