From 7670620a86d6a5f7736e8da69ffbc2600c98dd47 Mon Sep 17 00:00:00 2001 From: alvarocabanas Date: Fri, 18 Nov 2022 08:15:30 +0100 Subject: [PATCH] Prometheus exporter operator POC --- .../templates/clusterrole.yaml | 5 + .../templates/kubelet/scraper-configmap.yaml | 9 +- cmd/nri-kubernetes/main.go | 300 +++++++++++++++--- internal/config/config.go | 5 + 4 files changed, 261 insertions(+), 58 deletions(-) diff --git a/charts/newrelic-infrastructure/templates/clusterrole.yaml b/charts/newrelic-infrastructure/templates/clusterrole.yaml index 4913448e71..152c3fa07c 100644 --- a/charts/newrelic-infrastructure/templates/clusterrole.yaml +++ b/charts/newrelic-infrastructure/templates/clusterrole.yaml @@ -19,6 +19,11 @@ rules: - "nodes" - "namespaces" verbs: [ "get", "list", "watch" ] + - apiGroups: [ "" ] + resources: + - "pods" + - "secrets" + verbs: [ "get", "list", "watch", "create"] - nonResourceURLs: ["/metrics"] verbs: ["get"] {{- if .Values.rbac.pspEnabled }} diff --git a/charts/newrelic-infrastructure/templates/kubelet/scraper-configmap.yaml b/charts/newrelic-infrastructure/templates/kubelet/scraper-configmap.yaml index e43b5227fc..2498eb4dc7 100644 --- a/charts/newrelic-infrastructure/templates/kubelet/scraper-configmap.yaml +++ b/charts/newrelic-infrastructure/templates/kubelet/scraper-configmap.yaml @@ -1,4 +1,3 @@ -{{- if .Values.kubelet.enabled -}} --- apiVersion: v1 kind: ConfigMap @@ -10,9 +9,5 @@ metadata: data: nri-kubernetes.yml: | {{- (merge .Values.common.config (include "newrelic.integrationConfigDefaults" . | fromYaml)) | toYaml | nindent 4 }} - kubelet: - enabled: true - {{- if .Values.kubelet.config }} - {{- toYaml .Values.kubelet.config | nindent 6 }} - {{- end }} -{{- end }} + operator: + enabled: true diff --git a/cmd/nri-kubernetes/main.go b/cmd/nri-kubernetes/main.go index 56d8ff2a8c..3e60a99e15 100644 --- a/cmd/nri-kubernetes/main.go +++ b/cmd/nri-kubernetes/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "os" "path" @@ -9,22 +10,24 @@ import ( "time" sdk "github.com/newrelic/infra-integrations-sdk/integration" - log "github.com/sirupsen/logrus" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/util/homedir" - "github.com/newrelic/nri-kubernetes/v3/internal/config" "github.com/newrelic/nri-kubernetes/v3/internal/discovery" "github.com/newrelic/nri-kubernetes/v3/src/client" "github.com/newrelic/nri-kubernetes/v3/src/controlplane" - "github.com/newrelic/nri-kubernetes/v3/src/integration" "github.com/newrelic/nri-kubernetes/v3/src/ksm" ksmClient "github.com/newrelic/nri-kubernetes/v3/src/ksm/client" "github.com/newrelic/nri-kubernetes/v3/src/kubelet" kubeletClient "github.com/newrelic/nri-kubernetes/v3/src/kubelet/client" "github.com/newrelic/nri-kubernetes/v3/src/prometheus" + log "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/kubernetes" + v1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" ) const ( @@ -75,36 +78,38 @@ func main() { } } - integrationOptions := []integration.OptionFunc{ - integration.WithLogger(logger), - integration.WithMetadata(integration.Metadata{ - Name: integrationName, - Version: integrationVersion, - }), - } + /* + integrationOptions := []integration.OptionFunc{ + integration.WithLogger(logger), + integration.WithMetadata(integration.Metadata{ + Name: integrationName, + Version: integrationVersion, + }), + } - switch c.Sink.Type { - case config.SinkTypeHTTP: - integrationOptions = append(integrationOptions, integration.WithHTTPSink(c.Sink.HTTP)) - case config.SinkTypeStdout: - // We don't need to do anything here to sink to stdout, as it's the default behavior of integration.Wrapper. - logger.Warn("Sinking metrics to stdout") - default: - log.Errorf("Unknown sink type %s", c.Sink.Type) - os.Exit(exitConfig) - } + switch c.Sink.Type { + case config.SinkTypeHTTP: + integrationOptions = append(integrationOptions, integration.WithHTTPSink(c.Sink.HTTP)) + case config.SinkTypeStdout: + // We don't need to do anything here to sink to stdout, as it's the default behavior of integration.Wrapper. + logger.Warn("Sinking metrics to stdout") + default: + log.Errorf("Unknown sink type %s", c.Sink.Type) + os.Exit(exitConfig) + } - iw, err := integration.NewWrapper(integrationOptions...) - if err != nil { - logger.Errorf("creating integration wrapper: %v", err) - os.Exit(exitIntegration) - } + iw, err := integration.NewWrapper(integrationOptions...) + if err != nil { + logger.Errorf("creating integration wrapper: %v", err) + os.Exit(exitIntegration) + } - i, err := iw.Integration() - if err != nil { - logger.Errorf("creating integration with http sink: %v", err) - os.Exit(exitIntegration) - } + i, err := iw.Integration() + if err != nil { + logger.Errorf("creating integration with http sink: %v", err) + os.Exit(exitIntegration) + } + */ logger.Infof( "New Relic %s integration Version: %s, Platform: %s, GoVersion: %s, GitCommit: %s, BuildDate: %s\n", @@ -123,14 +128,16 @@ func main() { namespaceCache := discovery.NewNamespaceInMemoryStore(logger) - var kubeletScraper *kubelet.Scraper - if c.Kubelet.Enabled { - kubeletScraper, err = setupKubelet(c, clients, namespaceCache) - if err != nil { - logger.Errorf("setting up ksm scraper: %v", err) - os.Exit(exitSetup) + /* + var kubeletScraper *kubelet.Scraper + if c.Kubelet.Enabled { + kubeletScraper, err = setupKubelet(c, clients, namespaceCache) + if err != nil { + logger.Errorf("setting up ksm scraper: %v", err) + os.Exit(exitSetup) + } } - } + */ var ksmScraper *ksm.Scraper if c.KSM.Enabled { @@ -152,24 +159,38 @@ func main() { defer controlplaneScraper.Close() } + var o *operator + if c.Operator.Enabled { + o = createOperator(clients.k8s) + o.log = logger + } + for { start := time.Now() logger.Debugf("scraping data from all the scrapers defined: KSM: %t, Kubelet: %t, ControlPlane: %t", c.KSM.Enabled, c.Kubelet.Enabled, c.ControlPlane.Enabled) - // TODO think carefully to the signature of this function - err := runScrapers(c, ksmScraper, kubeletScraper, controlplaneScraper, i) - if err != nil { - logger.Errorf("retrieving scraper data: %v", err) - os.Exit(exitLoop) - } + /* + // TODO think carefully to the signature of this function + err := runScrapers(c, ksmScraper, kubeletScraper, controlplaneScraper, i) + if err != nil { + logger.Errorf("retrieving scraper data: %v", err) + os.Exit(exitLoop) + } + */ logger.Debugf("publishing data") - err = i.Publish() - if err != nil { - logger.Errorf("publishing integration: %v", err) - os.Exit(exitLoop) + /* + err = i.Publish() + if err != nil { + logger.Errorf("publishing integration: %v", err) + os.Exit(exitLoop) + } + */ + if c.Operator.Enabled { + o.run() + o.log = logger } namespaceCache.Vacuum() @@ -206,6 +227,183 @@ func runScrapers(c *config.Config, ksmScraper *ksm.Scraper, kubeletScraper *kube return nil } +type operator struct { + lister v1.PodNamespaceLister + client kubernetes.Interface + log *log.Logger +} + +const operatorNamespace = "redis" + +func createOperator(client kubernetes.Interface) *operator { + listMap, _ := discovery.NewNamespacePodListerer(discovery.PodListererConfig{Client: client, Namespaces: []string{operatorNamespace}}) + + l, _ := listMap.Lister(operatorNamespace) + + return &operator{ + lister: l, + client: client, + } +} + +func (o operator) run() { + logger.Debugf("running operator") + + listWorkLoads, listIntegrations := o.listInterestingPods() + + logger.Debugf("CREATING INTEGRATIONS") + + for _, w := range listWorkLoads { + found := false + for _, i := range listIntegrations { + if strings.Contains(i.Name, w.Name) { + logger.Debugf("workload already monitored %s by %s", w.Name, i.Name) + + found = true + } else { + logger.Debugf("%q does not contain %q", i.Name, w.Name) + } + } + + if w.Status.PodIP == "" { + logger.Warnf("SKIPPIN FOR NOW %q since IP is empty", w.Name) + } + + if !found && w.Status.PodIP != "" { + o.deployIntegration(w) + } + } + + logger.Debugf("CLEANING INTEGRATIONS") + + for _, i := range listIntegrations { + found := false + for _, w := range listWorkLoads { + if strings.Contains(i.Name, w.Name) { //TODO this is an example, we should also check that config did not change + logger.Debugf("integration monitoring workload %s by %s", w.Name, i.Name) + + found = true + } + } + + if !found { + o.deleteIntegration(i) + } + + } +} + +func (o operator) listInterestingPods() ([]*corev1.Pod, []*corev1.Pod) { + listWorkLoads, err := o.lister.List( + labels.SelectorFromSet(labels.Set{ + "monitoring-role": "workload-to-monitor", + })) + if err != nil { + o.log.Errorf("listing workloads %v", err) + } + + logger.Debugf("found pods %d", len(listWorkLoads)) + + listIntegrations, err := o.lister.List( + labels.SelectorFromSet(labels.Set{ + "monitoring-role": "integration-monitoring-workload", + })) + if err != nil { + o.log.Errorf("listing integrations %v", err) + } + + logger.Debugf("found integrations %d", len(listIntegrations)) + + return listWorkLoads, listIntegrations +} + +func (o operator) deleteIntegration(pod *corev1.Pod) { + logger.Debugf("deleting integrations and secret %q", pod.Name) + err := o.client.CoreV1().Pods(operatorNamespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}) + if err != nil { + o.log.Errorf("deleting integration %v", err) + } + err = o.client.CoreV1().Secrets(operatorNamespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}) + if err != nil { + o.log.Errorf("deleting secret %v", err) + } +} + +func (o operator) deployIntegration(pod *corev1.Pod) { + o.log.Infof("creating a new integration for %s", pod.Name) + + // The image to grab will come from an in the monitored service's pod annotation + containerIntegration := corev1.Container{ + Name: "integration", + Image: "acabanas977/nri-redis:latest", + Env: []corev1.EnvVar{ + { + Name: "WORKLOAD_NODE_IP", + Value: pod.Status.HostIP, + }, + }, + EnvFrom: []corev1.EnvFromSource{ + { + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: pod.Name + "-integration", + }, + }, + }, + }, + Resources: corev1.ResourceRequirements{}, + } + + // The port exposed can always be the same in all our prometheus-exporter images + p := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name + "-integration", + Namespace: operatorNamespace, + Labels: map[string]string{ + "monitoring-role": "integration-monitoring-workload", + }, + Annotations: map[string]string{ + "prometheus.io/scrape": "true", + "prometheus.io/port": "9121", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + containerIntegration, + }, + }, + } + + _, err := o.client.CoreV1().Pods(operatorNamespace).Create(context.Background(), p, metav1.CreateOptions{}) + if err != nil { + o.log.Errorf("creating pod %v", err) + } + + data := map[string][]byte{} + populatedString := strings.Replace(pod.Annotations["config"], "${discovery.ip}", pod.Status.PodIP, 100) + rows := strings.Split(populatedString, "\n") + for _, r := range rows { + val := strings.Split(r, ": ") + if len(val) == 2 { + data[val[0]] = []byte(val[1]) + } else { + o.log.Errorf("Unexpected string %q", rows) + } + + } + + _, err = o.client.CoreV1().Secrets(operatorNamespace).Create(context.Background(), &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name + "-integration", + Namespace: operatorNamespace, + }, + Data: data, + }, metav1.CreateOptions{}) + if err != nil { + o.log.Errorf("creating secret %v", err) + } +} + func setupKSM(c *config.Config, clients *clusterClients, namespaceCache *discovery.NamespaceInMemoryStore) (*ksm.Scraper, error) { providers := ksm.Providers{ K8s: clients.k8s, diff --git a/internal/config/config.go b/internal/config/config.go index c0e51c89d9..7195414fea 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -49,6 +49,7 @@ type Config struct { HTTP HTTPSink `mapstructure:"http"` } `mapstructure:"sink"` + Operator `mapstructure:"operator"` // ControlPlane defines config options for the control plane scraper. ControlPlane `mapstructure:"controlPlane"` // Kubelet defines config options for the kubelet scraper. @@ -84,6 +85,10 @@ type TLSConfig struct { CAPath string `mapstructure:"caPath"` } +type Operator struct { + Enabled bool `mapstructure:"enabled"` +} + // KSM contains configuration options for the KSM scraper. type KSM struct { // Enabled controls whether KSM scraping will be attempted.