Skip to content

Commit 36b3357

Browse files
author
Per Goncalves da Silva
committed
queue item fix
Signed-off-by: Per Goncalves da Silva <[email protected]>
1 parent 25e0255 commit 36b3357

File tree

12 files changed

+116
-83
lines changed

12 files changed

+116
-83
lines changed

Diff for: pkg/controller/operators/catalog/operator.go

+35-22
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"sync"
1212
"time"
1313

14+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
15+
1416
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
1517
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
1618
errorwrap "github.com/pkg/errors"
@@ -114,7 +116,7 @@ type Operator struct {
114116
subQueueSet *queueinformer.ResourceQueueSet
115117
ipQueueSet *queueinformer.ResourceQueueSet
116118
ogQueueSet *queueinformer.ResourceQueueSet
117-
nsResolveQueue workqueue.TypedRateLimitingInterface[any]
119+
nsResolveQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
118120
namespace string
119121
recorder record.EventRecorder
120122
sources *grpc.SourceStore
@@ -268,8 +270,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
268270
// Wire InstallPlans
269271
ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans()
270272
op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister())
271-
ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
272-
workqueue.TypedRateLimitingQueueConfig[any]{
273+
ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
274+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
275+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
273276
Name: "ips",
274277
})
275278
op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue)
@@ -290,8 +293,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
290293

291294
operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
292295
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
293-
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
294-
workqueue.TypedRateLimitingQueueConfig[any]{
296+
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
297+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
298+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
295299
Name: "ogs",
296300
})
297301
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
@@ -312,8 +316,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
312316
// Wire CatalogSources
313317
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
314318
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
315-
catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
316-
workqueue.TypedRateLimitingQueueConfig[any]{
319+
catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
320+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
321+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
317322
Name: "catsrcs",
318323
})
319324
op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue)
@@ -341,8 +346,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
341346
subIndexer := subInformer.Informer().GetIndexer()
342347
op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer
343348

344-
subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
345-
workqueue.TypedRateLimitingQueueConfig[any]{
349+
subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
350+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
351+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
346352
Name: "subs",
347353
})
348354
op.subQueueSet.Set(metav1.NamespaceAll, subQueue)
@@ -415,9 +421,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
415421
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx})
416422
logger.Info("registering labeller")
417423

418-
queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
419-
Name: gvr.String(),
420-
})
424+
queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
425+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
426+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
427+
Name: gvr.String(),
428+
},
429+
)
421430
queueInformer, err := queueinformer.NewQueueInformer(
422431
ctx,
423432
queueinformer.WithQueue(queue),
@@ -560,9 +569,12 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
560569
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String()})
561570
logger.Info("registering owner reference fixer")
562571

563-
queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
564-
Name: gvr.String(),
565-
})
572+
queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
573+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
574+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
575+
Name: gvr.String(),
576+
},
577+
)
566578
queueInformer, err := queueinformer.NewQueueInformer(
567579
ctx,
568580
queueinformer.WithQueue(queue),
@@ -745,8 +757,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
745757
// Namespace sync for resolving subscriptions
746758
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
747759
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
748-
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
749-
workqueue.TypedRateLimitingQueueConfig[any]{
760+
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
761+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
762+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
750763
Name: "resolve",
751764
})
752765
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
@@ -787,12 +800,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {
787800

788801
if err == nil {
789802
for ns := range namespaces {
790-
o.nsResolveQueue.Add(ns)
803+
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(ns))
791804
}
792805
}
793806
}
794807

795-
o.nsResolveQueue.Add(state.Key.Namespace)
808+
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(state.Key.Namespace))
796809
}
797810
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
798811
o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
@@ -1411,7 +1424,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
14111424
}
14121425

14131426
logger.Info("unpacking is not complete yet, requeueing")
1414-
o.nsResolveQueue.AddAfter(namespace, 5*time.Second)
1427+
o.nsResolveQueue.AddAfter(kubestate.NewUpdateEvent(namespace), 5*time.Second)
14151428
return nil
14161429
}
14171430
}
@@ -1506,7 +1519,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
15061519
return fmt.Errorf("casting Subscription failed")
15071520
}
15081521

1509-
o.nsResolveQueue.Add(sub.GetNamespace())
1522+
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(sub.GetNamespace()))
15101523

15111524
return nil
15121525
}
@@ -1520,7 +1533,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error {
15201533
return fmt.Errorf("casting OperatorGroup failed")
15211534
}
15221535

1523-
o.nsResolveQueue.Add(og.GetNamespace())
1536+
o.nsResolveQueue.Add(kubestate.NewUpdateEvent(og.GetNamespace()))
15241537

15251538
return nil
15261539
}

Diff for: pkg/controller/operators/catalog/operator_test.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"testing/quick"
1414
"time"
1515

16+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
17+
1618
"k8s.io/utils/ptr"
1719

1820
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
@@ -2156,13 +2158,13 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
21562158
client: clientFake,
21572159
lister: lister,
21582160
namespace: namespace,
2159-
nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
2160-
workqueue.NewTypedMaxOfRateLimiter[any](
2161-
workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Second, 1000*time.Second),
2161+
nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
2162+
workqueue.NewTypedMaxOfRateLimiter[kubestate.ResourceEvent](
2163+
workqueue.NewTypedItemExponentialFailureRateLimiter[kubestate.ResourceEvent](1*time.Second, 1000*time.Second),
21622164
// 1 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
2163-
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
2165+
&workqueue.TypedBucketRateLimiter[kubestate.ResourceEvent]{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
21642166
),
2165-
workqueue.TypedRateLimitingQueueConfig[any]{
2167+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
21662168
Name: "resolver",
21672169
}),
21682170
resolver: config.resolver,

Diff for: pkg/controller/operators/catalog/subscription/config.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type syncerConfig struct {
2323
subscriptionInformer cache.SharedIndexInformer
2424
catalogInformer cache.SharedIndexInformer
2525
installPlanInformer cache.SharedIndexInformer
26-
subscriptionQueue workqueue.TypedRateLimitingInterface[any]
26+
subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
2727
reconcilers kubestate.ReconcilerChain
2828
registryReconcilerFactory reconciler.RegistryReconcilerFactory
2929
globalCatalogNamespace string
@@ -97,7 +97,7 @@ func WithOperatorLister(lister operatorlister.OperatorLister) SyncerOption {
9797
}
9898

9999
// WithSubscriptionQueue sets a syncer's subscription queue.
100-
func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[any]) SyncerOption {
100+
func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]) SyncerOption {
101101
return func(config *syncerConfig) {
102102
config.subscriptionQueue = subscriptionQueue
103103
}

Diff for: pkg/controller/operators/catalogtemplate/operator.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"strings"
77
"time"
88

9+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
10+
911
"github.com/distribution/reference"
1012
"github.com/operator-framework/api/pkg/operators/v1alpha1"
1113
"github.com/sirupsen/logrus"
@@ -101,8 +103,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, logger *logrus.Logg
101103
// Wire CatalogSources
102104
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
103105
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
104-
catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
105-
workqueue.TypedRateLimitingQueueConfig[any]{
106+
catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
107+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
108+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
106109
Name: "catalogSourceTemplate",
107110
})
108111
op.catalogSourceTemplateQueueSet.Set(metav1.NamespaceAll, catalogTemplateSrcQueue)

Diff for: pkg/controller/operators/olm/operator.go

+35-26
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"sync"
99
"time"
1010

11+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
12+
1113
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
1214
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/olm/plugins"
1315
"github.com/sirupsen/logrus"
@@ -83,11 +85,11 @@ type Operator struct {
8385
copiedCSVLister metadatalister.Lister
8486
ogQueueSet *queueinformer.ResourceQueueSet
8587
csvQueueSet *queueinformer.ResourceQueueSet
86-
olmConfigQueue workqueue.TypedRateLimitingInterface[any]
88+
olmConfigQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
8789
csvCopyQueueSet *queueinformer.ResourceQueueSet
8890
copiedCSVGCQueueSet *queueinformer.ResourceQueueSet
89-
nsQueueSet workqueue.TypedRateLimitingInterface[any]
90-
apiServiceQueue workqueue.TypedRateLimitingInterface[any]
91+
nsQueueSet workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
92+
apiServiceQueue workqueue.TypedRateLimitingInterface[kubestate.ResourceEvent]
9193
csvIndexers map[string]cache.Indexer
9294
recorder record.EventRecorder
9395
resolver install.StrategyResolverInterface
@@ -198,17 +200,17 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
198200
client: config.externalClient,
199201
ogQueueSet: queueinformer.NewEmptyResourceQueueSet(),
200202
csvQueueSet: queueinformer.NewEmptyResourceQueueSet(),
201-
olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
202-
workqueue.DefaultTypedControllerRateLimiter[any](),
203-
workqueue.TypedRateLimitingQueueConfig[any]{
203+
olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
204+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
205+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
204206
Name: "olmConfig",
205207
}),
206208

207209
csvCopyQueueSet: queueinformer.NewEmptyResourceQueueSet(),
208210
copiedCSVGCQueueSet: queueinformer.NewEmptyResourceQueueSet(),
209-
apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
210-
workqueue.DefaultTypedControllerRateLimiter[any](),
211-
workqueue.TypedRateLimitingQueueConfig[any]{
211+
apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
212+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
213+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
212214
Name: "apiservice",
213215
}),
214216
resolver: config.strategyResolver,
@@ -246,9 +248,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
246248
).Operators().V1alpha1().ClusterServiceVersions()
247249
informersByNamespace[namespace].CSVInformer = csvInformer
248250
op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister())
249-
csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](
250-
workqueue.DefaultTypedControllerRateLimiter[any](),
251-
workqueue.TypedRateLimitingQueueConfig[any]{
251+
csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
252+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
253+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
252254
Name: fmt.Sprintf("%s/csv", namespace),
253255
})
254256
op.csvQueueSet.Set(namespace, csvQueue)
@@ -273,7 +275,11 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
273275
op.csvIndexers[namespace] = csvIndexer
274276

275277
// Register separate queue for copying csvs
276-
csvCopyQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), fmt.Sprintf("%s/csv-copy", namespace))
278+
csvCopyQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
279+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
280+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
281+
Name: fmt.Sprintf("%s/csv-copy", namespace),
282+
})
277283
op.csvCopyQueueSet.Set(namespace, csvCopyQueue)
278284
csvCopyQueueInformer, err := queueinformer.NewQueueInformer(
279285
ctx,
@@ -307,9 +313,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
307313
informersByNamespace[namespace].CopiedCSVLister = op.copiedCSVLister
308314

309315
// Register separate queue for gcing copied csvs
310-
copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](
311-
workqueue.DefaultTypedControllerRateLimiter[any](),
312-
workqueue.TypedRateLimitingQueueConfig[any]{
316+
copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
317+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
318+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
313319
Name: fmt.Sprintf("%s/csv-gc", namespace),
314320
})
315321
op.copiedCSVGCQueueSet.Set(namespace, copiedCSVGCQueue)
@@ -333,9 +339,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
333339
operatorGroupInformer := extInformerFactory.Operators().V1().OperatorGroups()
334340
informersByNamespace[namespace].OperatorGroupInformer = operatorGroupInformer
335341
op.lister.OperatorsV1().RegisterOperatorGroupLister(namespace, operatorGroupInformer.Lister())
336-
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](
337-
workqueue.DefaultTypedControllerRateLimiter[any](),
338-
workqueue.TypedRateLimitingQueueConfig[any]{
342+
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
343+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
344+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
339345
Name: fmt.Sprintf("%s/og", namespace),
340346
})
341347
op.ogQueueSet.Set(namespace, ogQueue)
@@ -522,9 +528,12 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
522528
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx})
523529
logger.Info("registering labeller")
524530

525-
queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
526-
Name: gvr.String(),
527-
})
531+
queue := workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
532+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
533+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
534+
Name: gvr.String(),
535+
},
536+
)
528537
queueInformer, err := queueinformer.NewQueueInformer(
529538
ctx,
530539
queueinformer.WithQueue(queue),
@@ -696,9 +705,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
696705
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod()).Core().V1().Namespaces()
697706
informersByNamespace[metav1.NamespaceAll].NamespaceInformer = namespaceInformer
698707
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
699-
op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[any](
700-
workqueue.DefaultTypedControllerRateLimiter[any](),
701-
workqueue.TypedRateLimitingQueueConfig[any]{
708+
op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[kubestate.ResourceEvent](
709+
workqueue.DefaultTypedControllerRateLimiter[kubestate.ResourceEvent](),
710+
workqueue.TypedRateLimitingQueueConfig[kubestate.ResourceEvent]{
702711
Name: "resolver",
703712
})
704713
namespaceInformer.Informer().AddEventHandler(
@@ -1665,7 +1674,7 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) {
16651674
}
16661675

16671676
if err == nil {
1668-
go a.olmConfigQueue.AddAfter(olmConfig, time.Second*5)
1677+
go a.olmConfigQueue.AddAfter(kubestate.NewUpdateEvent(olmConfig), time.Second*5)
16691678
}
16701679

16711680
logger := a.logger.WithFields(logrus.Fields{

Diff for: pkg/controller/operators/olm/operatorgroup.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"reflect"
99
"strings"
1010

11+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
12+
1113
"k8s.io/apimachinery/pkg/api/equality"
1214

1315
"github.com/sirupsen/logrus"
@@ -182,7 +184,7 @@ func (a *Operator) syncOperatorGroups(obj interface{}) error {
182184
logger.Debug("Requeueing out of sync namespaces")
183185
for _, ns := range outOfSyncNamespaces {
184186
logger.WithField("namespace", ns).Debug("requeueing")
185-
a.nsQueueSet.Add(ns)
187+
a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns))
186188
}
187189

188190
// CSV requeue is handled by the succeeding sync in `annotateCSVs`
@@ -263,7 +265,7 @@ func (a *Operator) operatorGroupDeleted(obj interface{}) {
263265
logger.Debug("OperatorGroup deleted, requeueing out of sync namespaces")
264266
for _, ns := range op.Status.Namespaces {
265267
logger.WithField("namespace", ns).Debug("requeueing")
266-
a.nsQueueSet.Add(ns)
268+
a.nsQueueSet.Add(kubestate.NewUpdateEvent(ns))
267269
}
268270
}
269271

Diff for: pkg/lib/kubestate/kubestate.go

+7
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,13 @@ func (r resourceEvent) Resource() interface{} {
163163
return r.resource
164164
}
165165

166+
func NewUpdateEvent(resource interface{}) ResourceEvent {
167+
return resourceEvent{
168+
eventType: ResourceUpdated,
169+
resource: resource,
170+
}
171+
}
172+
166173
func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent {
167174
return resourceEvent{
168175
eventType: eventType,

0 commit comments

Comments
 (0)