Skip to content

Commit

Permalink
Upgrade knative.dev/eventing to latest 1.15 (#4112)
Browse files Browse the repository at this point in the history
* Upgrade knative.dev/eventing to latest 1.15

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Pass PodLister as expected

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Migrate to library SchedulerFunc to use the new signature

Signed-off-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Sep 24, 2024
1 parent f62e51f commit 0c77381
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr
return cg.MarkScheduleConsumerFailed("Schedule", err)
}

placements, err := statefulSetScheduler.Schedule(cg)
placements, err := statefulSetScheduler.Schedule(ctx, cg)
if err != nil {
return cg.MarkScheduleConsumerFailed("Schedule", err)
}
Expand Down
48 changes: 21 additions & 27 deletions control-plane/pkg/reconciler/consumergroup/consumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,6 @@ import (
kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client/fake"
)

type SchedulerFunc func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error)

func (f SchedulerFunc) Schedule(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return f(vpod)
}

const (
testSchedulerKey = "scheduler"
noTestScheduler = "no-scheduler"
Expand Down Expand Up @@ -102,7 +96,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -189,7 +183,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -307,7 +301,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -402,7 +396,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -528,7 +522,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -702,7 +696,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -877,7 +871,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1034,7 +1028,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
}, nil
Expand Down Expand Up @@ -1121,7 +1115,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1208,7 +1202,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1303,7 +1297,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 2},
Expand Down Expand Up @@ -1426,7 +1420,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 2},
Expand Down Expand Up @@ -1533,7 +1527,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1630,7 +1624,7 @@ func TestReconcileKind(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, io.EOF
}),
},
Expand Down Expand Up @@ -1758,7 +1752,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) {
},
Key: ConsumerGroupTestKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
{PodName: "p2", VReplicas: 1},
Expand Down Expand Up @@ -1922,7 +1916,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
},
Expand Down Expand Up @@ -1991,7 +1985,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
},
Expand Down Expand Up @@ -2117,7 +2111,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
},
Expand Down Expand Up @@ -2163,7 +2157,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrUnknownTopicOrPartition,
Expand Down Expand Up @@ -2210,7 +2204,7 @@ func TestFinalizeKind(t *testing.T) {
},
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrGroupIDNotFound,
Expand Down Expand Up @@ -2258,7 +2252,7 @@ func TestFinalizeKind(t *testing.T) {
WantErr: true,
Key: testKey,
OtherTestData: map[string]interface{}{
testSchedulerKey: SchedulerFunc(func(vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
testSchedulerKey: scheduler.SchedulerFunc(func(ctx context.Context, vpod scheduler.VPod) ([]eventingduckv1alpha1.Placement, error) {
return nil, nil
}),
kafkatesting.ErrorOnDeleteConsumerGroupTestKey: sarama.ErrClusterAuthorizationFailed,
Expand Down
18 changes: 11 additions & 7 deletions control-plane/pkg/reconciler/consumergroup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/storage/names"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
Expand Down Expand Up @@ -106,6 +107,9 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
logger.Panicf("unable to process required environment variables: %v", err)
}

dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr)
dispatcherPodLister := dispatcherPodInformer.Lister()

c := SchedulerConfig{
RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second,
Capacity: env.PodCapacity,
Expand All @@ -114,13 +118,11 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
}

schedulers := map[string]Scheduler{
KafkaSourceScheduler: createKafkaScheduler(ctx, c, kafkainternals.SourceStatefulSetName),
KafkaTriggerScheduler: createKafkaScheduler(ctx, c, kafkainternals.BrokerStatefulSetName),
KafkaChannelScheduler: createKafkaScheduler(ctx, c, kafkainternals.ChannelStatefulSetName),
KafkaSourceScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.SourceStatefulSetName),
KafkaTriggerScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.BrokerStatefulSetName),
KafkaChannelScheduler: createKafkaScheduler(ctx, c, dispatcherPodLister, kafkainternals.ChannelStatefulSetName),
}

dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr)

r := &Reconciler{
SchedulerFunc: func(s string) (Scheduler, bool) { sched, ok := schedulers[strings.ToLower(s)]; return sched, ok },
ConsumerLister: consumer.Get(ctx).Lister(),
Expand Down Expand Up @@ -325,10 +327,11 @@ func enqueueConsumerGroupFromConsumer(enqueue func(name types.NamespacedName)) f
}
}

func createKafkaScheduler(ctx context.Context, c SchedulerConfig, ssName string) Scheduler {
func createKafkaScheduler(ctx context.Context, c SchedulerConfig, podLister corelisters.PodLister, ssName string) Scheduler {
lister := consumergroup.Get(ctx).Lister()
return createStatefulSetScheduler(
ctx,
podLister,
SchedulerConfig{
StatefulSetName: ssName,
RefreshPeriod: c.RefreshPeriod,
Expand Down Expand Up @@ -370,7 +373,7 @@ func getSelectorLabel(ssName string) map[string]string {
return selectorLabel
}

func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister scheduler.VPodLister) Scheduler {
func createStatefulSetScheduler(ctx context.Context, podLister corelisters.PodLister, c SchedulerConfig, lister scheduler.VPodLister) Scheduler {
ss, _ := statefulsetscheduler.New(ctx, &statefulsetscheduler.Config{
StatefulSetNamespace: system.Namespace(),
StatefulSetName: c.StatefulSetName,
Expand All @@ -383,6 +386,7 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s
Evictor: newEvictor(ctx, zap.String("kafka.eventing.knative.dev/component", "evictor")).evict,
VPodLister: lister,
NodeLister: nodeinformer.Get(ctx).Lister(),
PodLister: podLister.Pods(system.Namespace()),
})

return Scheduler{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
k8s.io/apiserver v0.29.2
k8s.io/client-go v0.29.2
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
knative.dev/eventing v0.42.0
knative.dev/eventing v0.42.2-0.20240923151015-7fedbd08af70
knative.dev/hack v0.0.0-20240704013904-b9799599afcf
knative.dev/pkg v0.0.0-20240716082220-4355f0c73608
knative.dev/reconciler-test v0.0.0-20240716134925-00d94f40c470
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1213,8 +1213,8 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
knative.dev/eventing v0.42.0 h1:pbPPhV4JlgpHBZxLBhJTUf+4HuZe5y/zlkOGHZfvtZ0=
knative.dev/eventing v0.42.0/go.mod h1:hW5BMYcihtCelT9pqaMtK8gmNOo1ybxcigjBY+/fU+k=
knative.dev/eventing v0.42.2-0.20240923151015-7fedbd08af70 h1:Cf6YhPrDySVcyIqHcvBonCQpyt0hlEYJuIF9pF5zIVo=
knative.dev/eventing v0.42.2-0.20240923151015-7fedbd08af70/go.mod h1:hW5BMYcihtCelT9pqaMtK8gmNOo1ybxcigjBY+/fU+k=
knative.dev/hack v0.0.0-20240704013904-b9799599afcf h1:n92FmZRywgtHso7pFAku7CW0qvRAs1hXtMQqO0R6eiE=
knative.dev/hack v0.0.0-20240704013904-b9799599afcf/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/pkg v0.0.0-20240716082220-4355f0c73608 h1:BOiRzcnRS9Z5ruxlCiS/K1/Hb5bUN0X4W3xCegdcYQE=
Expand Down
8 changes: 4 additions & 4 deletions vendor/knative.dev/eventing/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,18 @@ type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) erro
// Scheduler is responsible for placing VPods into real Kubernetes pods
type Scheduler interface {
// Schedule computes the new set of placements for vpod.
Schedule(vpod VPod) ([]duckv1alpha1.Placement, error)
Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error)
}

// SchedulerFunc type is an adapter to allow the use of
// ordinary functions as Schedulers. If f is a function
// with the appropriate signature, SchedulerFunc(f) is a
// Scheduler that calls f.
type SchedulerFunc func(vpod VPod) ([]duckv1alpha1.Placement, error)
type SchedulerFunc func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error)

// Schedule implements the Scheduler interface.
func (f SchedulerFunc) Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) {
return f(vpod)
func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) {
return f(ctx, vpod)
}

// VPod represents virtual replicas placed into real Kubernetes pods
Expand Down
9 changes: 5 additions & 4 deletions vendor/knative.dev/eventing/pkg/scheduler/state/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"

"knative.dev/eventing/pkg/scheduler"
)

Expand Down Expand Up @@ -55,10 +56,10 @@ func SatisfyZoneAvailability(feasiblePods []int32, states *State) bool {
var zoneName string
var err error
for _, podID := range feasiblePods {
wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID))
return err == nil, nil
})
zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID))
if err != nil {
continue
}
zoneMap[zoneName] = struct{}{}
}
return len(zoneMap) == int(states.NumZones)
Expand Down
Loading

0 comments on commit 0c77381

Please sign in to comment.