Skip to content

Commit

Permalink
Fix Metricbeat k8s metadata sometimes not being present at startup (#…
Browse files Browse the repository at this point in the history
…41216)

* Fix Metricbeat k8s metadata sometimes not being present at startup

* Clone the whole metadata map when fetching

(cherry picked from commit 4e62fa5)
  • Loading branch information
swiatekm authored and mergify[bot] committed Oct 17, 2024
1 parent 49e49c7 commit 5ec5f39
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 254 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix fields not being parsed correctly in postgresql/database {issue}25301[25301] {pull}37720[37720]
- rabbitmq/queue - Change the mapping type of `rabbitmq.queue.consumers.utilisation.pct` to `scaled_float` from `long` because the values fall within the range of `[0.0, 1.0]`. Previously, conversion to integer resulted in reporting either `0` or `1`.
- Use namespace for GetListMetrics when exists in AWS {pull}41022[41022]
- Fix http server helper SSL config. {pull}39405[39405]
- Fix Kubernetes metadata sometimes not being present after startup {pull}41216[41216]

*Osquerybeat*

Expand Down
280 changes: 131 additions & 149 deletions metricbeat/module/kubernetes/util/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package util
import (
"errors"
"fmt"
"maps"
"strings"
"sync"
"time"
Expand All @@ -39,6 +40,10 @@ import (
"github.com/elastic/beats/v7/metricbeat/mb"
)

// Resource metadata keys are composed of multiple parts - usually just the namespace and name. This string is the
// separator between the parts when treating the key as a single string.
const resourceMetadataKeySeparator = "/"

type kubernetesConfig struct {
KubeConfig string `config:"kube_config"`
KubeClientOptions kubernetes.KubeClientOptions `config:"kube_client_options"`
Expand Down Expand Up @@ -67,12 +72,13 @@ type Enricher interface {

type enricher struct {
sync.RWMutex
metadata map[string]mapstr.M
metadataCache map[string]mapstr.M
index func(mapstr.M) string
updateFunc func(kubernetes.Resource) map[string]mapstr.M
deleteFunc func(kubernetes.Resource) []string
metricsetName string
resourceName string
watcher *metaWatcher
isPod bool
config *kubernetesConfig
log *logp.Logger
Expand All @@ -90,8 +96,7 @@ type metaWatcher struct {

metricsetsUsing []string // list of metricsets using this shared watcher(e.g. pod, container, state_pod)

enrichers map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher
metadataObjects map[string]bool // representation of a set of ids(in the form of namespace_name-resource_name) of each object received by the watcher's handler functions
enrichers map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher

nodeScope bool // whether this watcher should watch for resources in current node or in whole cluster
restartWatcher kubernetes.Watcher // whether this watcher needs a restart. Only relevant in leader nodes due to metricsets with different nodescope(pod, state_pod)
Expand Down Expand Up @@ -179,10 +184,10 @@ func getExtraWatchers(resourceName string, addResourceMetadata *metadata.AddReso
// in order to be able to retrieve 2nd layer Owner metadata like in case of:
// Deployment -> Replicaset -> Pod
// CronJob -> job -> Pod
if addResourceMetadata != nil && addResourceMetadata.Deployment {
if addResourceMetadata.Deployment {
extra = append(extra, ReplicaSetResource)
}
if addResourceMetadata != nil && addResourceMetadata.CronJob {
if addResourceMetadata.CronJob {
extra = append(extra, JobResource)
}
return extra
Expand Down Expand Up @@ -320,47 +325,82 @@ func createWatcher(
// Check if a watcher for the specific resource already exists.
resourceMetaWatcher, ok := resourceWatchers.metaWatchersMap[resourceName]

// If it does not exist, create the resourceMetaWatcher.
if !ok {
// Check if we need to add namespace to the watcher's options.
if isNamespaced(resourceName) {
options.Namespace = namespace
}
watcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
if err != nil {
return false, err
}
resourceWatchers.metaWatchersMap[resourceName] = &metaWatcher{
watcher: watcher,
started: false, // not started yet
metadataObjects: make(map[string]bool),
enrichers: make(map[string]*enricher),
metricsetsUsing: make([]string, 0),
restartWatcher: nil,
nodeScope: nodeScope,
}
return true, nil
} else if resourceMetaWatcher.nodeScope != nodeScope && resourceMetaWatcher.nodeScope {
// It might happen that the watcher already exists, but is only being used to monitor the resources
// of a single node(e.g. created by pod metricset). In that case, we need to check if we are trying to create a new watcher that will track
// the resources of whole cluster(e.g. in case of state_pod metricset).
// If it is the case, then we need to update the watcher by changing its watch options (removing options.Node)
// A running watcher cannot be updated directly. Instead, we must create a new one with the correct watch options.
// The new restartWatcher must be identical to the old watcher, including the same handler function, with the only difference being the watch options.

if isNamespaced(resourceName) {
options.Namespace = namespace
// If the watcher exists, exit
if ok {
if resourceMetaWatcher.nodeScope != nodeScope && resourceMetaWatcher.nodeScope {
// It might happen that the watcher already exists, but is only being used to monitor the resources
// of a single node(e.g. created by pod metricset). In that case, we need to check if we are trying to create a new watcher that will track
// the resources of whole cluster(e.g. in case of state_pod metricset).
// If it is the case, then we need to update the watcher by changing its watch options (removing options.Node)
// A running watcher cannot be updated directly. Instead, we must create a new one with the correct watch options.
// The new restartWatcher must be identical to the old watcher, including the same handler function, with the only difference being the watch options.

if isNamespaced(resourceName) {
options.Namespace = namespace
}
restartWatcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
if err != nil {
return false, err
}
// update the handler of the restartWatcher to match the current watcher's handler.
restartWatcher.AddEventHandler(resourceMetaWatcher.watcher.GetEventHandler())
resourceMetaWatcher.restartWatcher = restartWatcher
resourceMetaWatcher.nodeScope = nodeScope
}
restartWatcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
if err != nil {
return false, err
return false, nil
}
// Watcher doesn't exist, create it

// Check if we need to add namespace to the watcher's options.
if isNamespaced(resourceName) {
options.Namespace = namespace
}
watcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
if err != nil {
return false, err
}

resourceMetaWatcher = &metaWatcher{
watcher: watcher,
started: false, // not started yet
enrichers: make(map[string]*enricher),
metricsetsUsing: make([]string, 0),
restartWatcher: nil,
nodeScope: nodeScope,
}
resourceWatchers.metaWatchersMap[resourceName] = resourceMetaWatcher

// Add event handlers to the watcher. The only action we need to do here is invalidate the enricher cache.
addEventHandlerToWatcher(resourceMetaWatcher, resourceWatchers)

return true, nil
}

// addEventHandlerToWatcher adds an event handler to the watcher that invalidates the cache of enrichers attached
// to the watcher.
func addEventHandlerToWatcher(metaWatcher *metaWatcher, resourceWatchers *Watchers) {
notifyFunc := func(obj interface{}) {
enrichers := make(map[string]*enricher, len(metaWatcher.enrichers))

resourceWatchers.lock.Lock()
maps.Copy(enrichers, metaWatcher.enrichers)
resourceWatchers.lock.Unlock()

for _, enricher := range enrichers {
enricher.Lock()
ids := enricher.deleteFunc(obj.(kubernetes.Resource))
// update this watcher events by removing all the metadata[id]
for _, id := range ids {
delete(enricher.metadataCache, id)
}
enricher.Unlock()
}
// update the handler of the restartWatcher to match the current watcher's handler.
restartWatcher.AddEventHandler(resourceMetaWatcher.watcher.GetEventHandler())
resourceMetaWatcher.restartWatcher = restartWatcher
resourceMetaWatcher.nodeScope = nodeScope
}
return false, nil
metaWatcher.watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {}, // do nothing
UpdateFunc: notifyFunc,
DeleteFunc: notifyFunc,
})
}

// addToMetricsetsUsing adds metricset identified by metricsetUsing to the list of resources using the shared watcher
Expand Down Expand Up @@ -604,6 +644,7 @@ func NewResourceMetadataEnricher(
return &nilEnricher{}
}

_, _ = specificMetaGen, generalMetaGen // necessary for earlier versions of golangci-lint
// updateFunc to be used as the resource watchers add and update handler.
// The handler function is executed when a watcher is triggered(i.e. new/updated resource).
// It is responsible for generating the metadata for a detected resource by executing the metadata generators Generate method.
Expand Down Expand Up @@ -904,7 +945,7 @@ func getString(m mapstr.M, key string) string {
}

func join(fields ...string) string {
return strings.Join(fields, ":")
return strings.Join(fields, resourceMetadataKeySeparator)
}

// buildMetadataEnricher builds and returns a metadata enricher for a given metricset.
Expand All @@ -922,7 +963,7 @@ func buildMetadataEnricher(
log *logp.Logger) *enricher {

enricher := &enricher{
metadata: map[string]mapstr.M{},
metadataCache: map[string]mapstr.M{},
index: indexFunc,
updateFunc: updateFunc,
deleteFunc: deleteFunc,
Expand All @@ -940,104 +981,7 @@ func buildMetadataEnricher(
if resourceMetaWatcher != nil {
// Append the new enricher to watcher's enrichers map.
resourceMetaWatcher.enrichers[metricsetName] = enricher

// Check if this shared watcher has already detected resources and collected their
// metadata for another enricher.
// In that case, for each resource, call the updateFunc of the current enricher to
// generate its metadata. This is needed in cases where the watcher has already been
// notified for new/updated resources while the enricher for current metricset has not
// built yet (example is pod, state_pod metricsets).
for key := range resourceMetaWatcher.metadataObjects {
obj, exists, err := resourceMetaWatcher.watcher.Store().GetByKey(key)
if err != nil {
log.Errorf("Error trying to get the object from the store: %s", err)
} else {
if exists {
newMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource))
// add the new metadata to the watcher received metadata
for id, metadata := range newMetadataEvents {
enricher.metadata[id] = metadata
}
}
}
}

// AddEventHandler sets add, update and delete methods of watcher.
// Those methods are triggered when an event is detected for a
// resource creation, update or deletion.
resourceMetaWatcher.watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
resourceWatchers.lock.Lock()
defer resourceWatchers.lock.Unlock()

// Add object(detected resource) to the list of metadata objects of this watcher,
// so it can be used by enrichers created after the event is triggered.
// The identifier of the object is in the form of namespace/name so that
// it can be easily fetched from watcher's store in previous step.
accessor, _ := meta.Accessor(obj.(kubernetes.Resource))
id := accessor.GetName()
namespace := accessor.GetNamespace()
if namespace != "" {
id = namespace + "/" + id
}
resourceMetaWatcher.metadataObjects[id] = true
// Execute the updateFunc of each enricher associated to thos watcher.
for _, enricher := range resourceMetaWatcher.enrichers {
enricher.Lock()
newMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource))
// add the new metadata to the watcher received metadata
for id, metadata := range newMetadataEvents {
enricher.metadata[id] = metadata
}
enricher.Unlock()
}
},
UpdateFunc: func(obj interface{}) {
resourceWatchers.lock.Lock()
defer resourceWatchers.lock.Unlock()

// Add object to the list of metadata objects of this watcher
accessor, _ := meta.Accessor(obj.(kubernetes.Resource))
id := accessor.GetName()
namespace := accessor.GetNamespace()
if namespace != "" {
id = namespace + "/" + id
}
resourceMetaWatcher.metadataObjects[id] = true

for _, enricher := range resourceMetaWatcher.enrichers {
enricher.Lock()
updatedMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource))
for id, metadata := range updatedMetadataEvents {
enricher.metadata[id] = metadata
}
enricher.Unlock()
}
},
DeleteFunc: func(obj interface{}) {
resourceWatchers.lock.Lock()
defer resourceWatchers.lock.Unlock()

// Remove object from the list of metadata objects of this watcher
accessor, _ := meta.Accessor(obj.(kubernetes.Resource))
id := accessor.GetName()
namespace := accessor.GetNamespace()
if namespace != "" {
id = namespace + "/" + id
}
delete(resourceMetaWatcher.metadataObjects, id)

for _, enricher := range resourceMetaWatcher.enrichers {
enricher.Lock()
ids := enricher.deleteFunc(obj.(kubernetes.Resource))
// update this watcher events by removing all the metadata[id]
for _, id := range ids {
delete(enricher.metadata, id)
}
enricher.Unlock()
}
},
})
enricher.watcher = resourceMetaWatcher
}

return enricher
Expand Down Expand Up @@ -1124,11 +1068,8 @@ func (e *enricher) Stop(resourceWatchers *Watchers) {
// This method is executed whenever a new event is created and about to be published.
// The enricher's index method is used to retrieve the resource identifier from each event.
func (e *enricher) Enrich(events []mapstr.M) {
e.RLock()
defer e.RUnlock()

for _, event := range events {
if meta := e.metadata[e.index(event)]; meta != nil {
if meta := e.getMetadata(event); meta != nil {
k8s, err := meta.GetValue("kubernetes")
if err != nil {
continue
Expand All @@ -1145,10 +1086,9 @@ func (e *enricher) Enrich(events []mapstr.M) {
}

// don't apply pod metadata to module level
k8sMeta = k8sMeta.Clone()
delete(k8sMeta, "pod")
}
ecsMeta := meta.Clone()
ecsMeta := meta
err = ecsMeta.Delete("kubernetes")
if err != nil {
logp.Debug("kubernetes", "Failed to delete field '%s': %s", "kubernetes", err)
Expand All @@ -1162,6 +1102,48 @@ func (e *enricher) Enrich(events []mapstr.M) {
}
}

// getMetadata returns metadata for the given event. If the metadata doesn't exist in the cache, we try to get it
// from the watcher store.
// The returned map is copy to be owned by the caller.
func (e *enricher) getMetadata(event mapstr.M) mapstr.M {
e.Lock()
defer e.Unlock()
metaKey := e.index(event)
eventMeta := e.metadataCache[metaKey]
if eventMeta == nil {
e.updateMetadataCacheFromWatcher(metaKey)
eventMeta = e.metadataCache[metaKey]
}
if eventMeta != nil {
eventMeta = eventMeta.Clone()
}
return eventMeta
}

// updateMetadataCacheFromWatcher updates the metadata cache for the given key with data from the watcher.
func (e *enricher) updateMetadataCacheFromWatcher(key string) {
storeKey := getWatcherStoreKeyFromMetadataKey(key)
if res, exists, _ := e.watcher.watcher.Store().GetByKey(storeKey); exists {
eventMetaMap := e.updateFunc(res.(kubernetes.Resource))
for k, v := range eventMetaMap {
e.metadataCache[k] = v
}
}
}

// getWatcherStoreKeyFromMetadataKey returns a watcher store key for a given metadata cache key. These are identical
// for nearly all resources, and have the form `{namespace}/{name}`, with the exception of containers, where it's
// `{namespace}/{pod_name}/{container_name}`. In that case, we want the Pod key, so we drop the final part.
func getWatcherStoreKeyFromMetadataKey(metaKey string) string {
parts := strings.Split(metaKey, resourceMetadataKeySeparator)
if len(parts) <= 2 { // normal K8s resource
return metaKey
}

// container, we need to remove the final part to get the Pod key
return strings.Join(parts[:2], resourceMetadataKeySeparator)
}

func CreateEvent(event mapstr.M, namespace string) (mb.Event, error) {
var moduleFieldsMapStr mapstr.M
moduleFields, ok := event[mb.ModuleDataKey]
Expand Down
Loading

0 comments on commit 5ec5f39

Please sign in to comment.