Skip to content

Commit f036dd9

Browse files
[extension/k8s leader elector] Extension leader election implementation (#38015)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Added the actual implementation for leader election. The initial structure was added in [basic structure](#37266). <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #34460 <!--Describe what testing was performed and which tests were added.--> #### Testing Unit tests were added. <!--Describe the documentation added.--> #### Documentation Documentation is provided in the README. <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 20a49b7 commit f036dd9

File tree

7 files changed

+228
-9
lines changed

7 files changed

+228
-9
lines changed

extension/k8sleaderelector/README.md

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,26 @@ service:
4242
| **lease_namespace** | The namespace of the lease object. | none (required) |
4343
| **lease_duration** | The duration of the lease. | 15s |
4444
| **renew_deadline** | The deadline for renewing the lease. It must be less than the lease duration. | 10s |
45-
| **retry_period** | The period for retrying the leader election. | 2s |
45+
| **retry_period** | The period for retrying the leader election. | 2s |
46+
47+
### Suggested RBAC
48+
```yaml
49+
apiVersion: rbac.authorization.k8s.io/v1
50+
kind: Role
51+
metadata:
52+
name: my-lease
53+
namespace: default
54+
rules:
55+
- apiGroups:
56+
- coordination.k8s.io
57+
resources:
58+
- leases
59+
verbs:
60+
- get
61+
- list
62+
- watch
63+
- create
64+
- update
65+
- patch
66+
- delete
67+
```

extension/k8sleaderelector/extension.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package k8sleaderelector // import "github.com/open-telemetry/opentelemetry-coll
55

66
import (
77
"context"
8+
"sync"
89

910
"go.opentelemetry.io/collector/component"
1011
"go.opentelemetry.io/collector/extension"
@@ -36,17 +37,64 @@ type leaderElectionExtension struct {
3637
client kubernetes.Interface
3738
logger *zap.Logger
3839
leaseHolderID string
40+
cancel context.CancelFunc
41+
waitGroup sync.WaitGroup
3942

4043
onStartedLeading []StartCallback
4144
onStoppedLeading []StopCallback
4245
}
4346

47+
// If the receiver sets a callback function then it would be invoked when the leader wins the election
48+
func (lee *leaderElectionExtension) startedLeading(ctx context.Context) {
49+
for _, callback := range lee.onStartedLeading {
50+
callback(ctx)
51+
}
52+
}
53+
54+
// If the receiver sets a callback function then it would be invoked when the leader loss the election
55+
func (lee *leaderElectionExtension) stoppedLeading() {
56+
for _, callback := range lee.onStoppedLeading {
57+
callback()
58+
}
59+
}
60+
4461
// Start begins the extension's processing.
4562
func (lee *leaderElectionExtension) Start(_ context.Context, _ component.Host) error {
63+
lee.logger.Info("Starting k8s leader elector with UUID", zap.String("UUID", lee.leaseHolderID))
64+
65+
ctx := context.Background()
66+
ctx, lee.cancel = context.WithCancel(ctx)
67+
// Create the K8s leader elector
68+
leaderElector, err := newK8sLeaderElector(lee.config, lee.client, lee.startedLeading, lee.stoppedLeading, lee.leaseHolderID)
69+
if err != nil {
70+
lee.logger.Error("Failed to create k8s leader elector", zap.Error(err))
71+
return err
72+
}
73+
lee.waitGroup.Add(1)
74+
go func() {
75+
// Leader election loop stops if context is canceled or the leader elector loses the lease.
76+
// The loop allows continued participation in leader election, even if the lease is lost.
77+
defer lee.waitGroup.Done()
78+
for {
79+
leaderElector.Run(ctx)
80+
81+
if ctx.Err() != nil {
82+
break
83+
}
84+
85+
lee.logger.Info("Leader lease lost. Returning to standby mode...")
86+
}
87+
}()
88+
4689
return nil
4790
}
4891

4992
// Shutdown ends the extension's processing.
5093
func (lee *leaderElectionExtension) Shutdown(context.Context) error {
94+
lee.logger.Info("Stopping k8s leader elector with UUID", zap.String("UUID", lee.leaseHolderID))
95+
if lee.cancel != nil {
96+
lee.cancel()
97+
}
98+
lee.waitGroup.Wait()
5199
return nil
52100
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package k8sleaderelector
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"testing"
10+
"time"
11+
12+
"github.com/stretchr/testify/require"
13+
"go.opentelemetry.io/collector/component/componenttest"
14+
"go.uber.org/zap"
15+
"go.uber.org/zap/zaptest/observer"
16+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17+
"k8s.io/client-go/kubernetes"
18+
"k8s.io/client-go/kubernetes/fake"
19+
"k8s.io/utils/ptr"
20+
21+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
22+
)
23+
24+
func TestExtension(t *testing.T) {
25+
config := &Config{
26+
LeaseName: "foo",
27+
LeaseNamespace: "default",
28+
LeaseDuration: 15 * time.Second,
29+
RenewDuration: 10 * time.Second,
30+
RetryPeriod: 2 * time.Second,
31+
}
32+
33+
iamInvokedOnLeading := false
34+
35+
ctx := context.TODO()
36+
fakeClient := fake.NewClientset()
37+
config.makeClient = func(_ k8sconfig.APIConfig) (kubernetes.Interface, error) {
38+
return fakeClient, nil
39+
}
40+
41+
observedZapCore, _ := observer.New(zap.WarnLevel)
42+
43+
leaderElection := leaderElectionExtension{
44+
config: config,
45+
client: fakeClient,
46+
logger: zap.New(observedZapCore),
47+
leaseHolderID: "foo",
48+
}
49+
50+
leaderElection.SetCallBackFuncs(
51+
func(_ context.Context) {
52+
iamInvokedOnLeading = true
53+
fmt.Printf("LeaderElection started leading")
54+
},
55+
func() {
56+
fmt.Printf("LeaderElection stopped leading")
57+
},
58+
)
59+
60+
require.NoError(t, leaderElection.Start(ctx, componenttest.NewNopHost()))
61+
62+
expectedLeaseDurationSeconds := ptr.To(int32(15))
63+
64+
require.Eventually(t, func() bool {
65+
lease, err := fakeClient.CoordinationV1().Leases("default").Get(ctx, "foo", metav1.GetOptions{})
66+
require.NoError(t, err)
67+
require.NotNil(t, lease)
68+
require.Equal(t, expectedLeaseDurationSeconds, lease.Spec.LeaseDurationSeconds)
69+
return true
70+
}, 10*time.Second, 100*time.Millisecond)
71+
72+
require.True(t, iamInvokedOnLeading)
73+
require.NoError(t, leaderElection.Shutdown(ctx))
74+
}

extension/k8sleaderelector/factory.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ package k8sleaderelector // import "github.com/open-telemetry/opentelemetry-coll
66
import (
77
"context"
88
"errors"
9-
"os"
9+
"sync"
1010
"time"
1111

12+
"github.com/google/uuid"
1213
"go.opentelemetry.io/collector/component"
1314
"go.opentelemetry.io/collector/extension"
1415

@@ -52,16 +53,14 @@ func createExtension(
5253
return nil, errors.New("failed to create k8s client")
5354
}
5455

55-
leaseHolderID, err := os.Hostname()
56-
if err != nil {
57-
return nil, err
58-
}
56+
leaseHolderID := uuid.New().String()
5957

6058
return &leaderElectionExtension{
6159
config: baseCfg,
6260
logger: set.Logger,
6361
client: client,
6462
leaseHolderID: leaseHolderID,
63+
waitGroup: sync.WaitGroup{},
6564
}, nil
6665
}
6766

extension/k8sleaderelector/go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sle
33
go 1.23.0
44

55
require (
6+
github.com/google/uuid v1.6.0
67
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.120.1
78
github.com/stretchr/testify v1.10.0
89
go.opentelemetry.io/collector/component v0.120.1-0.20250226024140-8099e51f9a77
@@ -12,7 +13,9 @@ require (
1213
go.opentelemetry.io/collector/extension/extensiontest v0.120.1-0.20250226024140-8099e51f9a77
1314
go.uber.org/goleak v1.3.0
1415
go.uber.org/zap v1.27.0
16+
k8s.io/apimachinery v0.32.2
1517
k8s.io/client-go v0.32.2
18+
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
1619
)
1720

1821
require (
@@ -30,7 +33,6 @@ require (
3033
github.com/google/gnostic-models v0.6.8 // indirect
3134
github.com/google/go-cmp v0.6.0 // indirect
3235
github.com/google/gofuzz v1.2.0 // indirect
33-
github.com/google/uuid v1.6.0 // indirect
3436
github.com/josharian/intern v1.0.0 // indirect
3537
github.com/json-iterator/go v1.1.12 // indirect
3638
github.com/knadh/koanf/maps v0.1.1 // indirect
@@ -69,10 +71,8 @@ require (
6971
gopkg.in/inf.v0 v0.9.1 // indirect
7072
gopkg.in/yaml.v3 v3.0.1 // indirect
7173
k8s.io/api v0.32.2 // indirect
72-
k8s.io/apimachinery v0.32.2 // indirect
7374
k8s.io/klog/v2 v2.130.1 // indirect
7475
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
75-
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect
7676
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
7777
sigs.k8s.io/structured-merge-diff/v4 v4.4.3 // indirect
7878
sigs.k8s.io/yaml v1.4.0 // indirect
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package k8sleaderelector // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector"
5+
6+
import (
7+
"context"
8+
9+
"k8s.io/client-go/kubernetes"
10+
"k8s.io/client-go/tools/leaderelection"
11+
"k8s.io/client-go/tools/leaderelection/resourcelock"
12+
)
13+
14+
func newK8sLeaderElector(
15+
cfg *Config,
16+
client kubernetes.Interface,
17+
onStartedLeading func(context.Context),
18+
onStoppedLeading func(),
19+
identity string,
20+
) (*leaderelection.LeaderElector, error) {
21+
resourceLock, err := resourcelock.New(
22+
resourcelock.LeasesResourceLock,
23+
cfg.LeaseNamespace,
24+
cfg.LeaseName,
25+
client.CoreV1(),
26+
client.CoordinationV1(),
27+
resourcelock.ResourceLockConfig{
28+
Identity: identity,
29+
})
30+
if err != nil {
31+
return nil, err
32+
}
33+
34+
leConfig := leaderelection.LeaderElectionConfig{
35+
Lock: resourceLock,
36+
LeaseDuration: cfg.LeaseDuration,
37+
RenewDeadline: cfg.RenewDuration,
38+
RetryPeriod: cfg.RetryPeriod,
39+
Callbacks: leaderelection.LeaderCallbacks{
40+
OnStartedLeading: onStartedLeading,
41+
OnStoppedLeading: onStoppedLeading,
42+
},
43+
}
44+
45+
return leaderelection.NewLeaderElector(leConfig)
46+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package k8sleaderelector
2+
3+
// Copyright The OpenTelemetry Authors
4+
// SPDX-License-Identifier: Apache-2.0
5+
6+
import (
7+
"context"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
"k8s.io/client-go/kubernetes/fake"
13+
)
14+
15+
func TestLeaderElector(t *testing.T) {
16+
fakeClient := fake.NewClientset()
17+
onStartedLeading := func(_ context.Context) {}
18+
onStoppedLeading := func() {}
19+
leConfig := Config{
20+
LeaseName: "foo",
21+
LeaseNamespace: "bar",
22+
LeaseDuration: 20 * time.Second,
23+
RenewDuration: 10 * time.Second,
24+
RetryPeriod: 2 * time.Second,
25+
}
26+
27+
leaderElector, err := newK8sLeaderElector(&leConfig, fakeClient, onStartedLeading, onStoppedLeading, "host1")
28+
require.NoError(t, err)
29+
require.NotNil(t, leaderElector)
30+
}

0 commit comments

Comments
 (0)