Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source/service): watch headless endpoints #4427

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,4 +717,31 @@ func (sc *serviceSource) AddEventHandler(ctx context.Context, handler func()) {
// Right now there is no way to remove event handler from informer, see:
// https://github.com/kubernetes/kubernetes/issues/79610
sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler))

// Check updates in headless endpoints.
// It's ignores adds and deletes to avoid unnecessary handler processing.
sc.endpointsInformer.Informer().AddEventHandler(headlessEndpointFilter{eventHandlerFunc(handler)})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather like to see an opt-in approach rather than having this added.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I can add label selector to a shared informer factory, but I need to create another one, something like this:

	headlessLabelSelector, err := labels.NewRequirement(v1.IsHeadlessService, selection.Exists, nil)
	if err != nil {
		return nil, err
	}

	headlessEndpointInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0,
		kubeinformers.WithNamespace(namespace),
		kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.LabelSelector = headlessLabelSelector.String()
		}),
	)
	
	endpointsInformer := headlessEndpointInformerFactory.Core().V1().Endpoints()
	
	// duplicate Start and waitForCacheSync funcs for this factory

Is that OK?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, do I need to do something?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szuecs @mloiseleur Can you please reply?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mloiseleur Are you sure it’s not enabled by default? It would be strange behavior if you don’t use the filter but still don’t get everything. Additionally, this filter seems to work differently compared to the others.

Copy link
Collaborator

@mloiseleur mloiseleur Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On v0.15.0, it's all:

  --service-type-filter=SERVICE-TYPE-FILTER ...  
         The service types to take care about (default: all, 
               expected: ClusterIP, NodePort, LoadBalancer or ExternalName)

I think you're right: it works quite differently from the other service types. It would be quite unexpected to enable/disable an informer with this filter.

So I guess it should be opt-in with a new option on --source (something like headless-service for instance)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I think it better to add explicit type in --service-type-filter, enabled by default.
Anyway, it's breaking changes.

With a new source, 2 sources are triggered when creating/updating/deleting services (headless or not) because there isn't any filter in the event handler. Is that OK?

With a new service type (enabled by default), the source can be triggered more often.
For users, who use external-dns for headless services, this change would be good (allows increasing sync interval)
For users, who don't have headless services in a cluster, nothing is changed
Bad case, when a user has a headless service, but doesn't use external-dns for managing records

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the last proposal from @tufitko makes sense to me. I think this is more of a bug fix than something we need to carefully feature flag and it's gonna be in the new release only anyway. wdyt @mloiseleur ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Raffo that works for me.

@tufitko any questions left ?

}

type headlessEndpointFilter struct {
handler cache.ResourceEventHandler
}

func (f headlessEndpointFilter) OnAdd(obj interface{}, isInInitialList bool) {}

func (f headlessEndpointFilter) OnDelete(obj interface{}) {}

func (f headlessEndpointFilter) OnUpdate(oldObj, newObj interface{}) {
if f.isHeadless(oldObj) || f.isHeadless(newObj) {
f.handler.OnUpdate(oldObj, newObj)
}
}

func (f headlessEndpointFilter) isHeadless(obj interface{}) bool {
meta, ok := obj.(metav1.ObjectMetaAccessor)
if !ok {
return false
}
_, ok = meta.GetObjectMeta().GetLabels()[v1.IsHeadlessService]
return ok
}
54 changes: 54 additions & 0 deletions source/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3770,3 +3770,57 @@ func BenchmarkServiceEndpoints(b *testing.B) {
require.NoError(b, err)
}
}

func TestHeadlessEndpointFilter(t *testing.T) {
mockHandler := &MockHandler{}
filter := &headlessEndpointFilter{
handler: mockHandler,
}

headlessEndpoint := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"service.kubernetes.io/headless": "",
"some": "label",
},
},
}

nonHeadlessEndpoint := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"some": "label",
},
},
}

filter.OnAdd(headlessEndpoint, false)
filter.OnAdd(nonHeadlessEndpoint, false)
filter.OnDelete(headlessEndpoint)
filter.OnDelete(nonHeadlessEndpoint)
filter.OnUpdate(headlessEndpoint, headlessEndpoint)
filter.OnUpdate(nonHeadlessEndpoint, nonHeadlessEndpoint) // noop
filter.OnUpdate(headlessEndpoint, nonHeadlessEndpoint)

require.Zero(t, mockHandler.OnAddCalledCount, "OnAdd should not be called")
require.Zero(t, mockHandler.OnDeleteCalledCount, "OnDelete should not be called")
require.EqualValues(t, 2, mockHandler.OnUpdateCalledCount, "OnUpdate should be called twice")
}

type MockHandler struct {
OnAddCalledCount int
OnUpdateCalledCount int
OnDeleteCalledCount int
}

func (m *MockHandler) OnAdd(_ interface{}, _ bool) {
m.OnAddCalledCount++
}

func (m *MockHandler) OnUpdate(_, _ interface{}) {
m.OnUpdateCalledCount++
}

func (m *MockHandler) OnDelete(_ interface{}) {
m.OnDeleteCalledCount++
}
Loading