Skip to content

WIP: Add EnqueueWatching #188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions managedcache/enqueue_watching.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package managedcache

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// EnqueueWatchingObjects Enqueues all objects watching the object mentioned in the event, filtered by WatcherType.
type EnqueueWatchingObjects struct {
WatcherRefGetter ownerRefGetter
// WatcherType is the type of the Owner object to look for in OwnerReferences. Only Group and Kind are compared.
WatcherType runtime.Object

scheme *runtime.Scheme
// groupKind is the cached Group and Kind from WatcherType
groupKind schema.GroupKind
}

var _ handler.EventHandler = (*EnqueueWatchingObjects)(nil)

// NewEnqueueWatchingObjects returns a new mapper putting the objects registered watching into the workqueue.
func NewEnqueueWatchingObjects(watcherRefGetter ownerRefGetter,
watcherType runtime.Object,
scheme *runtime.Scheme,
) *EnqueueWatchingObjects {
e := &EnqueueWatchingObjects{
WatcherRefGetter: watcherRefGetter,
WatcherType: watcherType,
scheme: scheme,
}
if err := e.parseWatcherTypeGroupKind(scheme); err != nil {
// This (passing a type that is not in the scheme) HAS
// to be a programmer error and can't be recovered at runtime anyways.
panic(err)

Check warning on line 44 in managedcache/enqueue_watching.go

View check run for this annotation

Codecov / codecov/patch

managedcache/enqueue_watching.go#L35-L44

Added lines #L35 - L44 were not covered by tests
}

return e

Check warning on line 47 in managedcache/enqueue_watching.go

View check run for this annotation

Codecov / codecov/patch

managedcache/enqueue_watching.go#L47

Added line #L47 was not covered by tests
}

type ownerRefGetter interface {
getWatchersForGVK(gvk schema.GroupVersionKind) []accessManagerKey
}

// Create implements handler.EventHandler.
func (e *EnqueueWatchingObjects) Create(_ context.Context, evt event.CreateEvent,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.enqueueWatchers(evt.Object, q)

Check warning on line 58 in managedcache/enqueue_watching.go

View check run for this annotation

Codecov / codecov/patch

managedcache/enqueue_watching.go#L57-L58

Added lines #L57 - L58 were not covered by tests
}

// Update implements handler.EventHandler.
func (e *EnqueueWatchingObjects) Update(_ context.Context, evt event.UpdateEvent,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.enqueueWatchers(evt.ObjectNew, q)
e.enqueueWatchers(evt.ObjectOld, q)

Check warning on line 66 in managedcache/enqueue_watching.go

View check run for this annotation

Codecov / codecov/patch

managedcache/enqueue_watching.go#L64-L66

Added lines #L64 - L66 were not covered by tests
}

// Delete implements handler.EventHandler.
func (e *EnqueueWatchingObjects) Delete(_ context.Context, evt event.DeleteEvent,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.enqueueWatchers(evt.Object, q)

Check warning on line 73 in managedcache/enqueue_watching.go

View check run for this annotation

Codecov / codecov/patch

managedcache/enqueue_watching.go#L72-L73

Added lines #L72 - L73 were not covered by tests
}

// Generic implements handler.EventHandler.
func (e *EnqueueWatchingObjects) Generic(_ context.Context, evt event.GenericEvent,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.enqueueWatchers(evt.Object, q)

Check warning on line 80 in managedcache/enqueue_watching.go

View check run for this annotation

Codecov / codecov/patch

managedcache/enqueue_watching.go#L79-L80

Added lines #L79 - L80 were not covered by tests
}

func (e *EnqueueWatchingObjects) enqueueWatchers(obj client.Object,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
if obj == nil {
return
}

Check warning on line 88 in managedcache/enqueue_watching.go

View check run for this annotation

Codecov / codecov/patch

managedcache/enqueue_watching.go#L85-L88

Added lines #L85 - L88 were not covered by tests

gvk, err := apiutil.GVKForObject(obj, e.scheme)
if err != nil {
// TODO: error reporting?
panic(err)

Check warning on line 93 in managedcache/enqueue_watching.go

View check run for this annotation

Codecov / codecov/patch

managedcache/enqueue_watching.go#L90-L93

Added lines #L90 - L93 were not covered by tests
}

ownerRefs := e.WatcherRefGetter.getWatchersForGVK(gvk)
for _, ownerRef := range ownerRefs {
if ownerRef.Kind != e.groupKind.Kind ||
ownerRef.Group != e.groupKind.Group {
continue

Check warning on line 100 in managedcache/enqueue_watching.go

View check run for this annotation

Codecov / codecov/patch

managedcache/enqueue_watching.go#L96-L100

Added lines #L96 - L100 were not covered by tests
}

q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{
Name: ownerRef.Name,
Namespace: ownerRef.Namespace,
},
})

Check warning on line 108 in managedcache/enqueue_watching.go

View check run for this annotation

Codecov / codecov/patch

managedcache/enqueue_watching.go#L103-L108

Added lines #L103 - L108 were not covered by tests
}
}

// parseOwnerTypeGroupKind parses the WatcherType into a Group and Kind and caches the result. Returns false
// if the WatcherType could not be parsed using the scheme.
func (e *EnqueueWatchingObjects) parseWatcherTypeGroupKind(scheme *runtime.Scheme) error {
// Get the kinds of the type
kinds, _, err := scheme.ObjectKinds(e.WatcherType)
if err != nil {
return err
}

Check warning on line 119 in managedcache/enqueue_watching.go

View check run for this annotation

Codecov / codecov/patch

managedcache/enqueue_watching.go#L114-L119

Added lines #L114 - L119 were not covered by tests
// Expect only 1 kind. If there is more than one kind this is probably an edge case such as ListOptions.
if len(kinds) != 1 {
panic(fmt.Sprintf("Expected exactly 1 kind for WatcherType %T, but found %s kinds", e.WatcherType, kinds))

Check warning on line 122 in managedcache/enqueue_watching.go

View check run for this annotation

Codecov / codecov/patch

managedcache/enqueue_watching.go#L121-L122

Added lines #L121 - L122 were not covered by tests
}
// Cache the Group and Kind for the WatcherType
e.groupKind = schema.GroupKind{Group: kinds[0].Group, Kind: kinds[0].Kind}

return nil

Check warning on line 127 in managedcache/enqueue_watching.go

View check run for this annotation

Codecov / codecov/patch

managedcache/enqueue_watching.go#L125-L127

Added lines #L125 - L127 were not covered by tests
}
94 changes: 75 additions & 19 deletions managedcache/objectboundaccess.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
cacheSourcer: &cacheSource{},
newClient: client.New,

accessors: map[types.UID]accessorEntry{},
accessors: map[accessManagerKey]accessorEntry{},

Check warning on line 76 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L76

Added line #L76 was not covered by tests
accessorRequestCh: make(chan accessorRequest[T]),
accessorStopCh: make(chan accessorRequest[T]),
}
Expand Down Expand Up @@ -103,14 +103,22 @@
cacheSourcer cacheSourcer
newClient newClientFunc

accessors map[types.UID]accessorEntry
accessorsLock sync.RWMutex
accessors map[accessManagerKey]accessorEntry
accessorRequestCh chan accessorRequest[T]
accessorStopCh chan accessorRequest[T]
}

type accessManagerKey struct {
// UID ensures a re-created object also gets it's own cache.
UID types.UID
schema.GroupVersionKind
client.ObjectKey
}

type accessorEntry struct {
accessor Accessor
users map[types.UID]sets.Set[schema.GroupVersionKind]
users map[accessManagerKey]sets.Set[schema.GroupVersionKind]
cancel func()
}

Expand All @@ -128,7 +136,7 @@

type cacheDone struct {
err error
uid types.UID
key accessManagerKey
}

// implements Accessor interface.
Expand All @@ -154,11 +162,8 @@
for {
select {
case done := <-doneCh:
// Remove accessor from list.
delete(m.accessors, done.uid)

if done.err != nil && !errors.Is(done.err, context.Canceled) {
return fmt.Errorf("cache for UID %s crashed: %w", done.uid, done.err)
if err := m.handleCacheDone(ctx, done); err != nil {
return err

Check warning on line 166 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L165-L166

Added lines #L165 - L166 were not covered by tests
}

case req := <-m.accessorRequestCh:
Expand Down Expand Up @@ -191,17 +196,36 @@
}
}

func (m *objectBoundAccessManagerImpl[T]) handleCacheDone(
_ context.Context, done cacheDone,
) error {
m.accessorsLock.Lock()
defer m.accessorsLock.Unlock()

// Remove accessor from list.
delete(m.accessors, done.key)

if done.err != nil && !errors.Is(done.err, context.Canceled) {
return fmt.Errorf("cache for Key %s crashed: %w", done.key, done.err)
}

Check warning on line 210 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L201-L210

Added lines #L201 - L210 were not covered by tests

return nil

Check warning on line 212 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L212

Added line #L212 was not covered by tests
}

func (m *objectBoundAccessManagerImpl[T]) handleAccessorStop(
ctx context.Context, req accessorRequest[T],
) error {
cache, ok := m.accessors[req.owner.GetUID()]
m.accessorsLock.Lock()
defer m.accessorsLock.Unlock()

cache, ok := m.accessors[toAccessManagerKey(req.owner)]

Check warning on line 221 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L218-L221

Added lines #L218 - L221 were not covered by tests
if !ok {
// nothing todo.
return nil
}

if req.user != nil {
delete(cache.users, req.owner.GetUID())
delete(cache.users, toAccessManagerKey(req.user))

Check warning on line 228 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L228

Added line #L228 was not covered by tests
}

return m.gcCache(ctx, req.owner)
Expand All @@ -210,7 +234,9 @@
func (m *objectBoundAccessManagerImpl[T]) gcCache(ctx context.Context, owner T) error {
log := logr.FromContextOrDiscard(ctx)

entry, ok := m.accessors[owner.GetUID()]
key := toAccessManagerKey(owner)

entry, ok := m.accessors[key]

Check warning on line 239 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L237-L239

Added lines #L237 - L239 were not covered by tests
if !ok {
return nil
}
Expand All @@ -219,7 +245,7 @@
// no users left -> close
log.Info("no users left, closing cache")
entry.cancel()
delete(m.accessors, owner.GetUID())
delete(m.accessors, key)

Check warning on line 248 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L248

Added line #L248 was not covered by tests

return nil
}
Expand All @@ -236,18 +262,22 @@
ctx context.Context, req accessorRequest[T],
doneCh chan<- cacheDone, wg *sync.WaitGroup,
) (Accessor, error) {
m.accessorsLock.Lock()
defer m.accessorsLock.Unlock()

Check warning on line 267 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L265-L267

Added lines #L265 - L267 were not covered by tests
log := logr.FromContextOrDiscard(ctx)
log = log.WithValues(
"ownerUID", req.owner.GetUID(),
)
ctx = logr.NewContext(ctx, log)
key := toAccessManagerKey(req.owner)

Check warning on line 273 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L273

Added line #L273 was not covered by tests

entry, ok := m.accessors[req.owner.GetUID()]
entry, ok := m.accessors[key]

Check warning on line 275 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L275

Added line #L275 was not covered by tests
if ok {
log.V(-1).Info("reusing cache for owner")

if req.user != nil {
entry.users[req.user.GetUID()] = req.gvks
entry.users[toAccessManagerKey(req.user)] = req.gvks

Check warning on line 280 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L280

Added line #L280 was not covered by tests
}

return entry.accessor, m.gcCache(ctx, req.owner)
Expand Down Expand Up @@ -281,25 +311,25 @@

entry = accessorEntry{
accessor: a,
users: map[types.UID]sets.Set[schema.GroupVersionKind]{},
users: map[accessManagerKey]sets.Set[schema.GroupVersionKind]{},

Check warning on line 314 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L314

Added line #L314 was not covered by tests
cancel: cancel,
}
if req.user != nil {
entry.users[req.user.GetUID()] = req.gvks
entry.users[toAccessManagerKey(req.user)] = req.gvks

Check warning on line 318 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L318

Added line #L318 was not covered by tests
log = log.WithValues(
"userUID", req.user.GetUID(),
"usedForGVKs", req.gvks.UnsortedList(),
)
}

m.accessors[req.owner.GetUID()] = entry
m.accessors[key] = entry

Check warning on line 325 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L325

Added line #L325 was not covered by tests

log.V(-1).Info("starting new cache")
wg.Add(1)

go func(ctx context.Context, doneCh chan<- cacheDone) {
defer wg.Done()
doneCh <- cacheDone{uid: req.owner.GetUID(), err: ctrlcache.Start(ctx)}
doneCh <- cacheDone{key: key, err: ctrlcache.Start(ctx)}

Check warning on line 332 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L332

Added line #L332 was not covered by tests
}(ctx, doneCh)

return a, nil
Expand Down Expand Up @@ -381,3 +411,29 @@

return err
}

func (m *objectBoundAccessManagerImpl[T]) getWatchersForGVK(gvk schema.GroupVersionKind) (out []accessManagerKey) {
m.accessorsLock.RLock()
defer m.accessorsLock.RUnlock()

for k, a := range m.accessors {
if !sets.New(a.accessor.GetGVKs()...).Has(gvk) {
continue

Check warning on line 421 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L415-L421

Added lines #L415 - L421 were not covered by tests
}

out = append(out, k)
for u := range a.users {
out = append(out, u)
}

Check warning on line 427 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L424-L427

Added lines #L424 - L427 were not covered by tests
}

return out

Check warning on line 430 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L430

Added line #L430 was not covered by tests
}

func toAccessManagerKey[T RefType](owner T) accessManagerKey {
return accessManagerKey{
UID: owner.GetUID(),
ObjectKey: client.ObjectKeyFromObject(owner),
GroupVersionKind: owner.GetObjectKind().GroupVersionKind(),
}

Check warning on line 438 in managedcache/objectboundaccess.go

View check run for this annotation

Codecov / codecov/patch

managedcache/objectboundaccess.go#L433-L438

Added lines #L433 - L438 were not covered by tests
}
11 changes: 11 additions & 0 deletions managedcache/trackingcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

// RemoveOtherInformers stops all informers that are not needed to watch the given list of object types.
RemoveOtherInformers(ctx context.Context, gvks ...schema.GroupVersionKind) error

// GetGVKs returns a list of GVKs known by this trackingCache.
GetGVKs() []schema.GroupVersionKind
}

type cacheSourcer interface {
Expand Down Expand Up @@ -112,6 +115,14 @@
return wehc, nil
}

// GetGVKs returns a list of GVKs known by this trackingCache.
func (c *trackingCache) GetGVKs() []schema.GroupVersionKind {
c.accessLock.RLock()
defer c.accessLock.RUnlock()

return c.knownInformers.UnsortedList()

Check warning on line 123 in managedcache/trackingcache.go

View check run for this annotation

Codecov / codecov/patch

managedcache/trackingcache.go#L119-L123

Added lines #L119 - L123 were not covered by tests
}

func (c *trackingCache) Source(handler handler.EventHandler, predicates ...predicate.Predicate) source.Source {
return c.cacheSourcer.Source(handler, predicates...)
}
Expand Down