Skip to content

Commit

Permalink
Metric Server - consolidate apiserver to one port (#4982)
Browse files Browse the repository at this point in the history
Signed-off-by: Bojan Zelic <[email protected]>
  • Loading branch information
BojanZelic authored Sep 25, 2023
1 parent d33a865 commit e253ead
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 4 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Here is an overview of all new **experimental** features:
- **General**: Add support for formula based evaluation of metric values ([#2440](https://github.com/kedacore/keda/issues/2440))

### Improvements

- **General**: Add apiserver Prometheus metrics to KEDA Metric Server ([#4460](https://github.com/kedacore/keda/issues/4460))
- **General**: Add more events for user checking ([#796](https://github.com/kedacore/keda/issues/3764))
- **General**: Add ScaledObject/ScaledJob names to output of `kubectl get triggerauthentication/clustertriggerauthentication` ([#796](https://github.com/kedacore/keda/issues/796))
- **General**: Add standalone CRD generation to release workflow ([#2726](https://github.com/kedacore/keda/issues/2726))
Expand Down
64 changes: 62 additions & 2 deletions cmd/adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@ import (
"context"
"flag"
"fmt"
"net/http"
"os"

"github.com/prometheus/client_golang/prometheus/collectors"
appsv1 "k8s.io/api/apps/v1"
apimetrics "k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/client-go/kubernetes/scheme"
kubemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
ctrl "sigs.k8s.io/controller-runtime"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
basecmd "sigs.k8s.io/custom-metrics-apiserver/pkg/cmd"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"
Expand Down Expand Up @@ -96,10 +102,9 @@ func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsPro
cfg.Burst = adapterClientRequestBurst
cfg.DisableCompression = disableCompression

metricsBindAddress := fmt.Sprintf(":%v", metricsAPIServerPort)
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Metrics: server.Options{
BindAddress: metricsBindAddress,
BindAddress: "0", // disabled since we use our own server to serve metrics
},
Scheme: scheme,
Cache: ctrlcache.Options{
Expand Down Expand Up @@ -131,6 +136,58 @@ func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsPro
return kedaprovider.NewProvider(ctx, logger, mgr.GetClient(), *grpcClient), stopCh, nil
}

// getMetricHandler returns a http handler that exposes metrics from controller-runtime and apiserver
func getMetricHandler() http.HandlerFunc {
// Register apiserver metrics in legacy registry
// this contains the apiserver_* metrics
apimetrics.Register()

// unregister duplicate collectors that are already handled by controller-runtime's registry
legacyregistry.Registerer().Unregister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
legacyregistry.Registerer().Unregister(collectors.NewGoCollector(collectors.WithGoCollectorRuntimeMetrics(collectors.MetricsAll)))

// Return handler that serves metrics from both legacy and controller-runtime registry
return func(w http.ResponseWriter, req *http.Request) {
legacyregistry.Handler().ServeHTTP(w, req)

kubemetrics.HandlerFor(ctrlmetrics.Registry, kubemetrics.HandlerOpts{}).ServeHTTP(w, req)
}
}

// RunMetricsServer runs a http listener and handles the /metrics endpoint
// this is needed to consolidate apiserver and controller-runtime metrics
// we have to use a separate http server & can't rely on the controller-runtime implementation
// because apiserver doesn't provide a way to register metrics to other prometheus registries
func RunMetricsServer(ctx context.Context, stopCh <-chan struct{}) {
h := getMetricHandler()
mux := http.NewServeMux()
mux.Handle("/metrics", h)
metricsBindAddress := fmt.Sprintf(":%v", metricsAPIServerPort)

server := &http.Server{
Addr: metricsBindAddress,
Handler: mux,
}

go func() {
logger.Info("starting /metrics server endpoint")
// nosemgrep: use-tls
err := server.ListenAndServe()
if err != http.ErrServerClosed {
panic(err)
}
}()

go func() {
<-stopCh
logger.Info("Shutting down the /metrics server gracefully...")

if err := server.Shutdown(ctx); err != nil {
logger.Error(err, "http server shutdown error")
}
}()
}

// generateDefaultMetricsServiceAddr generates default Metrics Service gRPC Server address based on the current Namespace.
// By default the Metrics Service gRPC Server runs in the same namespace on the keda-operator pod.
func generateDefaultMetricsServiceAddr() string {
Expand Down Expand Up @@ -196,6 +253,9 @@ func main() {
cmd.WithExternalMetrics(kedaProvider)

logger.Info(cmd.Message)

RunMetricsServer(ctx, stopCh)

if err = cmd.Run(stopCh); err != nil {
return
}
Expand Down
39 changes: 38 additions & 1 deletion tests/sequential/prometheus_metrics/prometheus_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,9 @@ func testWebhookMetricValues(t *testing.T) {
}

func testMetricServerMetrics(t *testing.T) {
_ = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaMetricsServerPrometheusURL))
families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaMetricsServerPrometheusURL))
checkMetricServerValues(t, families)
checkBuildInfo(t, families)
}

func testOperatorMetricValues(t *testing.T, kc *kubernetes.Clientset) {
Expand Down Expand Up @@ -768,3 +770,38 @@ func checkWebhookValues(t *testing.T, families map[string]*prommodel.MetricFamil
}
assert.GreaterOrEqual(t, metricValue, 1.0, "keda_webhook_scaled_object_validation_total has to be greater than 0")
}

func checkMetricServerValues(t *testing.T, families map[string]*prommodel.MetricFamily) {
t.Log("--- testing metric server metrics ---")

family, ok := families["workqueue_adds_total"]
if !ok {
t.Errorf("metric workqueue_adds_total not available")
return
}

metricValue := 0.0
metrics := family.GetMetric()
for _, metric := range metrics {
metricValue += *metric.Counter.Value
}
assert.GreaterOrEqual(t, metricValue, 1.0, "workqueue_adds_total has to be greater than 0")

family, ok = families["apiserver_request_total"]
if !ok {
t.Errorf("metric apiserver_request_total not available")
return
}

metricValue = 0.0
metrics = family.GetMetric()
for _, metric := range metrics {
labels := metric.GetLabel()
for _, label := range labels {
if *label.Name == "group" && *label.Value == "external.metrics.k8s.io" {
metricValue = *metric.Counter.Value
}
}
}
assert.GreaterOrEqual(t, metricValue, 1.0, "apiserver_request_total has to be greater than 0")
}

0 comments on commit e253ead

Please sign in to comment.