From de3aad298d630cabd0be6b8adfe9abe576a84640 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Fri, 10 Jan 2025 11:14:08 +0100 Subject: [PATCH 01/19] wait for replica set informer before starting pod informer Signed-off-by: Florian Bacher --- .../internal/kube/client.go | 33 ++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index a589d5a1c1f5..971f688ed73f 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -210,9 +210,11 @@ func New( func (c *WatchClient) Start() error { synced := make([]cache.InformerSynced, 0) + waitForReplicaSets := false // start the replicaSet informer first, as the replica sets need to be // present at the time the pods are handled, to correctly establish the connection between pods and deployments if c.Rules.DeploymentName || c.Rules.DeploymentUID { + waitForReplicaSets = true reg, err := c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleReplicaSetAdd, UpdateFunc: c.handleReplicaSetUpdate, @@ -222,7 +224,7 @@ func (c *WatchClient) Start() error { return err } synced = append(synced, reg.HasSynced) - go c.replicasetInformer.Run(c.stopCh) + go c.runInformer(c.replicasetInformer) } reg, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -234,7 +236,12 @@ func (c *WatchClient) Start() error { return err } synced = append(synced, reg.HasSynced) - go c.informer.Run(c.stopCh) + + if waitForReplicaSets { + go c.runInformer(c.informer, c.replicasetInformer) + } else { + go c.runInformer(c.informer) + } reg, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleNamespaceAdd, @@ -245,7 +252,7 @@ func (c *WatchClient) Start() error { return err } synced = append(synced, reg.HasSynced) - go c.namespaceInformer.Run(c.stopCh) + go c.runInformer(c.namespaceInformer) if c.nodeInformer != nil { reg, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -257,7 +264,7 @@ func (c *WatchClient) Start() error { return err } synced = append(synced, reg.HasSynced) - go c.nodeInformer.Run(c.stopCh) + go c.runInformer(c.nodeInformer) } if c.waitForMetadata { @@ -1123,6 +1130,24 @@ func (c *WatchClient) getReplicaSet(uid string) (*ReplicaSet, bool) { return nil, false } +// runInformer starts the given informer. This method optionally takes a list of other informers that should complete +// before the informer is started. This is necessary e.g. for the pod informer which requires the replica set informer +// to be finished to correctly establish the connection to the replicaset/deployment it belongs to. +func (c *WatchClient) runInformer(informer cache.SharedInformer, dependencies ...cache.SharedInformer) { + if len(dependencies) > 0 { + timeoutCh := make(chan struct{}) + // TODO hard coding the timeout for now, check if we should make this configurable + t := time.AfterFunc(5*time.Second, func() { + close(timeoutCh) + }) + defer t.Stop() + for _, dependency := range dependencies { + cache.WaitForCacheSync(timeoutCh, dependency.HasSynced) + } + } + informer.Run(c.stopCh) +} + // ignoreDeletedFinalStateUnknown returns the object wrapped in // DeletedFinalStateUnknown. Useful in OnDelete resource event handlers that do // not need the additional context. From b4d76363e55b26307e09bfd33613619b5f7c0b2b Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Fri, 10 Jan 2025 11:14:58 +0100 Subject: [PATCH 02/19] re-enable failing test Signed-off-by: Florian Bacher --- processor/k8sattributesprocessor/e2e_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/processor/k8sattributesprocessor/e2e_test.go b/processor/k8sattributesprocessor/e2e_test.go index 147b968ec913..7de25e62e3b7 100644 --- a/processor/k8sattributesprocessor/e2e_test.go +++ b/processor/k8sattributesprocessor/e2e_test.go @@ -1099,8 +1099,6 @@ func TestE2E_NamespacedRBACNoPodIP(t *testing.T) { // make docker-otelcontribcol // KUBECONFIG=/tmp/kube-config-otelcol-e2e-testing kind load docker-image otelcontribcol:latest func TestE2E_ClusterRBACCollectorStartAfterTelemetryGen(t *testing.T) { - // TODO: Re-enable this test when the issue being tested here is fully solved: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37056 - t.Skip("Skipping test as https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37056 is not fully solved yet") testDir := filepath.Join("testdata", "e2e", "clusterrbac") k8sClient, err := k8stest.NewK8sClient(testKubeConfig) From 2d0334e5995aad9429ea2fc2231cb7510ee092f2 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 13 Jan 2025 12:08:15 +0100 Subject: [PATCH 03/19] wait for other informers before starting pod informer, add changelog Signed-off-by: Florian Bacher --- .../k8sattributes-wait-for-informers.yaml | 27 +++++++++++++ .../internal/kube/client.go | 39 +++++++++---------- 2 files changed, 46 insertions(+), 20 deletions(-) create mode 100644 .chloggen/k8sattributes-wait-for-informers.yaml diff --git a/.chloggen/k8sattributes-wait-for-informers.yaml b/.chloggen/k8sattributes-wait-for-informers.yaml new file mode 100644 index 000000000000..e0fc2dc978bb --- /dev/null +++ b/.chloggen/k8sattributes-wait-for-informers.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sattributesprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Wait for the other informers to complete their initial sync before starting the pod informers + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37056] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 971f688ed73f..b8b360242da2 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -210,11 +210,10 @@ func New( func (c *WatchClient) Start() error { synced := make([]cache.InformerSynced, 0) - waitForReplicaSets := false + waitForInformers := []cache.SharedInformer{} // start the replicaSet informer first, as the replica sets need to be // present at the time the pods are handled, to correctly establish the connection between pods and deployments if c.Rules.DeploymentName || c.Rules.DeploymentUID { - waitForReplicaSets = true reg, err := c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleReplicaSetAdd, UpdateFunc: c.handleReplicaSetUpdate, @@ -224,26 +223,11 @@ func (c *WatchClient) Start() error { return err } synced = append(synced, reg.HasSynced) + waitForInformers = append(waitForInformers, c.replicasetInformer) go c.runInformer(c.replicasetInformer) } - reg, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.handlePodAdd, - UpdateFunc: c.handlePodUpdate, - DeleteFunc: c.handlePodDelete, - }) - if err != nil { - return err - } - synced = append(synced, reg.HasSynced) - - if waitForReplicaSets { - go c.runInformer(c.informer, c.replicasetInformer) - } else { - go c.runInformer(c.informer) - } - - reg, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + reg, err := c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleNamespaceAdd, UpdateFunc: c.handleNamespaceUpdate, DeleteFunc: c.handleNamespaceDelete, @@ -252,10 +236,11 @@ func (c *WatchClient) Start() error { return err } synced = append(synced, reg.HasSynced) + waitForInformers = append(waitForInformers, c.namespaceInformer) go c.runInformer(c.namespaceInformer) if c.nodeInformer != nil { - reg, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + reg, err := c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleNodeAdd, UpdateFunc: c.handleNodeUpdate, DeleteFunc: c.handleNodeDelete, @@ -264,9 +249,23 @@ func (c *WatchClient) Start() error { return err } synced = append(synced, reg.HasSynced) + waitForInformers = append(waitForInformers, c.nodeInformer) go c.runInformer(c.nodeInformer) } + reg, err = c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.handlePodAdd, + UpdateFunc: c.handlePodUpdate, + DeleteFunc: c.handlePodDelete, + }) + if err != nil { + return err + } + synced = append(synced, reg.HasSynced) + + // start the podInformer with the prerequisite of the other informers to be finished first + go c.runInformer(c.informer, waitForInformers...) + if c.waitForMetadata { timeoutCh := make(chan struct{}) t := time.AfterFunc(c.waitForMetadataTimeout, func() { From 94bf7ac7933ca8d44a065daa3af8c70ad10e531f Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 13 Jan 2025 12:14:11 +0100 Subject: [PATCH 04/19] fix linting Signed-off-by: Florian Bacher --- processor/k8sattributesprocessor/internal/kube/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index b8b360242da2..d15e2cf1835b 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -240,7 +240,7 @@ func (c *WatchClient) Start() error { go c.runInformer(c.namespaceInformer) if c.nodeInformer != nil { - reg, err := c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + reg, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleNodeAdd, UpdateFunc: c.handleNodeUpdate, DeleteFunc: c.handleNodeDelete, From ec1c895ea5b2395e09f1d50f82238de54b601b4f Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 13 Jan 2025 12:32:15 +0100 Subject: [PATCH 05/19] trigger CI checks Signed-off-by: Florian Bacher From 3c4e6c4a5c37d69f57de61a0f56d7d2859837fc6 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 13 Jan 2025 13:35:37 +0100 Subject: [PATCH 06/19] trigger CI checks Signed-off-by: Florian Bacher From 02ad7bbcbdcdea7779bec429d864b3a3c79338f7 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 13 Jan 2025 13:51:37 +0100 Subject: [PATCH 07/19] trigger CI checks Signed-off-by: Florian Bacher From 041defd0c8964af87ce8d4dffd26c47092491ed7 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 13 Jan 2025 14:10:33 +0100 Subject: [PATCH 08/19] trigger CI checks Signed-off-by: Florian Bacher From 5fc8ca9195429b8f615eab16f3319a55643d7af2 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 13 Jan 2025 14:26:46 +0100 Subject: [PATCH 09/19] trigger CI checks Signed-off-by: Florian Bacher From 3101bd8e9bc858a2ce72c4f1aecc48f8bbe60cf9 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 13 Jan 2025 14:49:40 +0100 Subject: [PATCH 10/19] trigger CI checks Signed-off-by: Florian Bacher From 002adad4c20542654fa79cfddd49e3f75f38cf15 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 13 Jan 2025 15:43:27 +0100 Subject: [PATCH 11/19] trigger CI checks Signed-off-by: Florian Bacher From f2c8e91a7123d36b4a2a2a7d3b4272c3434a8fab Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 14 Jan 2025 06:53:39 +0100 Subject: [PATCH 12/19] trigger CI checks Signed-off-by: Florian Bacher From a3b97e962d5a07e3a3ef9d290a3f432d2de6f669 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 14 Jan 2025 08:17:44 +0100 Subject: [PATCH 13/19] trigger CI checks Signed-off-by: Florian Bacher From 76a042fe926d28b03f0797bbdd5422440251095b Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 14 Jan 2025 08:18:06 +0100 Subject: [PATCH 14/19] trigger CI checks Signed-off-by: Florian Bacher From 4ba0a32bf09396ddab12716e71eac780bfe12d94 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 14 Jan 2025 08:38:07 +0100 Subject: [PATCH 15/19] trigger CI checks Signed-off-by: Florian Bacher From 4de34d4529441ea918e5f384996bacda47042ac7 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 14 Jan 2025 08:57:02 +0100 Subject: [PATCH 16/19] trigger CI checks Signed-off-by: Florian Bacher From c86c80b4c817a2a93afa46c0421e14c353b5bfc4 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 14 Jan 2025 09:38:13 +0100 Subject: [PATCH 17/19] adapt to pr review Signed-off-by: Florian Bacher --- .../internal/kube/client.go | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index a6bc23686dcf..9d1b8a90f8c5 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -209,8 +209,6 @@ func New( // Start registers pod event handlers and starts watching the kubernetes cluster for pod changes. func (c *WatchClient) Start() error { synced := make([]cache.InformerSynced, 0) - - waitForInformers := []cache.SharedInformer{} // start the replicaSet informer first, as the replica sets need to be // present at the time the pods are handled, to correctly establish the connection between pods and deployments if c.Rules.DeploymentName || c.Rules.DeploymentUID { @@ -223,8 +221,7 @@ func (c *WatchClient) Start() error { return err } synced = append(synced, reg.HasSynced) - waitForInformers = append(waitForInformers, c.replicasetInformer) - go c.runInformer(c.replicasetInformer) + go c.replicasetInformer.Run(c.stopCh) } reg, err := c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -236,8 +233,7 @@ func (c *WatchClient) Start() error { return err } synced = append(synced, reg.HasSynced) - waitForInformers = append(waitForInformers, c.namespaceInformer) - go c.runInformer(c.namespaceInformer) + go c.namespaceInformer.Run(c.stopCh) if c.nodeInformer != nil { reg, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -249,8 +245,7 @@ func (c *WatchClient) Start() error { return err } synced = append(synced, reg.HasSynced) - waitForInformers = append(waitForInformers, c.nodeInformer) - go c.runInformer(c.nodeInformer) + go c.nodeInformer.Run(c.stopCh) } reg, err = c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -261,10 +256,9 @@ func (c *WatchClient) Start() error { if err != nil { return err } - synced = append(synced, reg.HasSynced) // start the podInformer with the prerequisite of the other informers to be finished first - go c.runInformer(c.informer, waitForInformers...) + go c.runInformerWithDependencies(c.informer, synced) if c.waitForMetadata { timeoutCh := make(chan struct{}) @@ -272,7 +266,7 @@ func (c *WatchClient) Start() error { close(timeoutCh) }) defer t.Stop() - if !cache.WaitForCacheSync(timeoutCh, synced...) { + if !cache.WaitForCacheSync(timeoutCh, reg.HasSynced) { return errors.New("failed to wait for caches to sync") } } @@ -1129,10 +1123,10 @@ func (c *WatchClient) getReplicaSet(uid string) (*ReplicaSet, bool) { return nil, false } -// runInformer starts the given informer. This method optionally takes a list of other informers that should complete +// runInformerWithDependencies starts the given informer. The second argument is a list of other informers that should complete // before the informer is started. This is necessary e.g. for the pod informer which requires the replica set informer // to be finished to correctly establish the connection to the replicaset/deployment it belongs to. -func (c *WatchClient) runInformer(informer cache.SharedInformer, dependencies ...cache.SharedInformer) { +func (c *WatchClient) runInformerWithDependencies(informer cache.SharedInformer, dependencies []cache.InformerSynced) { if len(dependencies) > 0 { timeoutCh := make(chan struct{}) // TODO hard coding the timeout for now, check if we should make this configurable @@ -1141,7 +1135,7 @@ func (c *WatchClient) runInformer(informer cache.SharedInformer, dependencies .. }) defer t.Stop() for _, dependency := range dependencies { - cache.WaitForCacheSync(timeoutCh, dependency.HasSynced) + cache.WaitForCacheSync(timeoutCh, dependency) } } informer.Run(c.stopCh) From f64e13ada6d76e5777dd1593f911e877ac76d565 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 14 Jan 2025 09:39:50 +0100 Subject: [PATCH 18/19] add comment to explain changes to waitForMetadata logic Signed-off-by: Florian Bacher --- processor/k8sattributesprocessor/internal/kube/client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 9d1b8a90f8c5..98d1917161e3 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -266,6 +266,9 @@ func (c *WatchClient) Start() error { close(timeoutCh) }) defer t.Stop() + // Wait for the Pod informer to be completed. + // The other informers will already be finished at this point, as the pod informer + // waits for them be finished before it can run if !cache.WaitForCacheSync(timeoutCh, reg.HasSynced) { return errors.New("failed to wait for caches to sync") } From 711d559990f2d57a60a65f0388ad2a478d483df2 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 14 Jan 2025 11:24:09 +0100 Subject: [PATCH 19/19] adapt to pr review Signed-off-by: Florian Bacher --- processor/k8sattributesprocessor/internal/kube/client.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 98d1917161e3..f2952046a56d 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -1137,9 +1137,7 @@ func (c *WatchClient) runInformerWithDependencies(informer cache.SharedInformer, close(timeoutCh) }) defer t.Stop() - for _, dependency := range dependencies { - cache.WaitForCacheSync(timeoutCh, dependency) - } + cache.WaitForCacheSync(timeoutCh, dependencies...) } informer.Run(c.stopCh) }