diff --git a/pkg/cmd/run/run.go b/pkg/cmd/run/run.go index 6601dd9..3857981 100644 --- a/pkg/cmd/run/run.go +++ b/pkg/cmd/run/run.go @@ -4,6 +4,7 @@ import ( "context" "github.com/spf13/cobra" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/errors" genericapiserver "k8s.io/apiserver/pkg/server" @@ -160,10 +161,16 @@ func (o *Options) Run(ctx context.Context, f cmdutil.Factory) error { // register with metrics collector spiceDBClusterMetrics := ctrlmetrics.NewConditionStatusCollector[*v1alpha1.SpiceDBCluster](o.MetricNamespace, "clusters", v1alpha1.SpiceDBClusterResourceName) - lister := typed.ListerFor[*v1alpha1.SpiceDBCluster](registry, typed.NewRegistryKey(controller.OwnedFactoryKey, v1alpha1ClusterGVR)) - spiceDBClusterMetrics.AddListerBuilder(func() ([]*v1alpha1.SpiceDBCluster, error) { - return lister.List(labels.Everything()) - }) + + if len(o.WatchNamespaces) == 0 { + o.WatchNamespaces = []string{corev1.NamespaceAll} + } + for _, n := range o.WatchNamespaces { + lister := typed.MustListerForKey[*v1alpha1.SpiceDBCluster](registry, typed.NewRegistryKey(controller.OwnedFactoryKey(n), v1alpha1ClusterGVR)) + spiceDBClusterMetrics.AddListerBuilder(func() ([]*v1alpha1.SpiceDBCluster, error) { + return lister.List(labels.Everything()) + }) + } legacyregistry.CustomMustRegister(spiceDBClusterMetrics) if ctx.Err() != nil { diff --git a/pkg/controller/context.go b/pkg/controller/context.go index f08cd52..bb309dc 100644 --- a/pkg/controller/context.go +++ b/pkg/controller/context.go @@ -16,6 +16,7 @@ import ( var ( QueueOps = queue.NewQueueOperationsCtx() CtxOperatorConfig = typedctx.WithDefault[*config.OperatorConfig](nil) + CtxCacheNamespace = typedctx.WithDefault("") CtxClusterNN = typedctx.WithDefault(types.NamespacedName{}) CtxSecretNN = typedctx.WithDefault(types.NamespacedName{}) CtxSecret = typedctx.WithDefault[*corev1.Secret](nil) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index bb473d4..b62a16b 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -3,9 +3,11 @@ package controller import ( "bytes" "context" + "errors" "fmt" "io" "os" + "strings" "sync" "github.com/authzed/controller-idioms/adopt" @@ -69,14 +71,26 @@ func init() { utilruntime.Must(v1alpha1.AddToScheme(scheme.Scheme)) } -var ( - v1alpha1ClusterGVR = v1alpha1.SchemeGroupVersion.WithResource(v1alpha1.SpiceDBClusterResourceName) - OwnedFactoryKey = typed.NewFactoryKey(v1alpha1.SpiceDBClusterResourceName, "local", "unfiltered") - DependentFactoryKey = typed.NewFactoryKey(v1alpha1.SpiceDBClusterResourceName, "local", "dependents") -) +var v1alpha1ClusterGVR = v1alpha1.SchemeGroupVersion.WithResource(v1alpha1.SpiceDBClusterResourceName) + +func localClusterNamespace(ns string) string { + if ns == "" { + return "local" + } + return strings.Join([]string{"local", ns}, "/") +} + +func OwnedFactoryKey(namespace string) typed.FactoryKey { + return typed.NewFactoryKey(v1alpha1.SpiceDBClusterResourceName, localClusterNamespace(namespace), "unfiltered") +} + +func DependentFactoryKey(namespace string) typed.FactoryKey { + return typed.NewFactoryKey(v1alpha1.SpiceDBClusterResourceName, localClusterNamespace(namespace), "dependents") +} type Controller struct { *manager.OwnedResourceController + namespaces []string client dynamic.Interface kclient kubernetes.Interface resources openapi.Resources @@ -89,10 +103,16 @@ type Controller struct { } func NewController(ctx context.Context, registry *typed.Registry, dclient dynamic.Interface, kclient kubernetes.Interface, resources openapi.Resources, configFilePath string, broadcaster record.EventBroadcaster, namespaces []string) (*Controller, error) { + // If no namespaces are provided, watch all namespaces + if len(namespaces) == 0 { + namespaces = []string{metav1.NamespaceAll} + } + c := Controller{ - client: dclient, - kclient: kclient, - resources: resources, + client: dclient, + kclient: kclient, + resources: resources, + namespaces: namespaces, } c.OwnedResourceController = manager.NewOwnedResourceController( textlogger.NewLogger(textlogger.NewConfig()), @@ -122,15 +142,10 @@ func NewController(ctx context.Context, registry *typed.Registry, dclient dynami logr.FromContextOrDiscard(ctx).V(3).Info("no operator configuration provided", "path", configFilePath) } - // If no namespaces are provided, watch all namespaces - if len(namespaces) == 0 { - namespaces = []string{metav1.NamespaceAll} - } - ownedInformerFactories := make([]dynamicinformer.DynamicSharedInformerFactory, 0, len(namespaces)) for _, ns := range namespaces { ownedInformerFactory := registry.MustNewFilteredDynamicSharedInformerFactory( - OwnedFactoryKey, + OwnedFactoryKey(ns), dclient, 0, ns, @@ -149,7 +164,7 @@ func NewController(ctx context.Context, registry *typed.Registry, dclient dynami externalInformerFactories := make([]dynamicinformer.DynamicSharedInformerFactory, 0, len(namespaces)) for _, ns := range namespaces { externalInformerFactory := registry.MustNewFilteredDynamicSharedInformerFactory( - DependentFactoryKey, + DependentFactoryKey(ns), dclient, 0, ns, @@ -291,14 +306,16 @@ func (c *Controller) loadConfig(path string) { logger.V(3).Info("updated config", "path", path, "config", c.config) // requeue all clusters - lister := typed.ListerFor[*v1alpha1.SpiceDBCluster](c.Registry, typed.NewRegistryKey(OwnedFactoryKey, v1alpha1ClusterGVR)) - clusters, err := lister.List(labels.Everything()) - if err != nil { - utilruntime.HandleError(err) - return - } - for _, cluster := range clusters { - c.enqueue(v1alpha1ClusterGVR, cluster) + for _, ns := range c.namespaces { + lister := typed.MustListerForKey[*v1alpha1.SpiceDBCluster](c.Registry, typed.NewRegistryKey(OwnedFactoryKey(ns), v1alpha1ClusterGVR)) + clusters, err := lister.List(labels.Everything()) + if err != nil { + utilruntime.HandleError(err) + return + } + for _, cluster := range clusters { + c.enqueue(v1alpha1ClusterGVR, cluster) + } } } @@ -313,7 +330,14 @@ func (c *Controller) enqueue(gvr schema.GroupVersionResource, obj interface{}) { // syncOwnedResource is called when SpiceDBCluster is updated func (c *Controller) syncOwnedResource(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) { - cluster, err := typed.ListerFor[*v1alpha1.SpiceDBCluster](c.Registry, typed.NewRegistryKey(OwnedFactoryKey, v1alpha1ClusterGVR)).ByNamespace(namespace).Get(name) + // configure watch namespace; used to lookup the right caches to use + if len(c.namespaces) == 1 && c.namespaces[0] == corev1.NamespaceAll { + ctx = CtxCacheNamespace.WithValue(ctx, corev1.NamespaceAll) + } else { + ctx = CtxCacheNamespace.WithValue(ctx, namespace) + } + + cluster, err := typed.MustListerForKey[*v1alpha1.SpiceDBCluster](c.Registry, typed.NewRegistryKey(OwnedFactoryKey(CtxCacheNamespace.Value(ctx)), v1alpha1ClusterGVR)).ByNamespace(namespace).Get(name) if err != nil { utilruntime.HandleError(fmt.Errorf("syncOwnedResource called on unknown object (%s::%s/%s): %w", gvr.String(), namespace, name, err)) QueueOps.Done(ctx) @@ -391,7 +415,7 @@ func (c *Controller) ensureDeployment(next ...handler.Handler) handler.Handler { }, getDeploymentPods: func(ctx context.Context) []*corev1.Pod { return component.NewIndexedComponent( - typed.IndexerFor[*corev1.Pod](c.Registry, typed.NewRegistryKey(DependentFactoryKey, corev1.SchemeGroupVersion.WithResource("pods"))), + typed.MustIndexerForKey[*corev1.Pod](c.Registry, typed.NewRegistryKey(DependentFactoryKey(CtxCacheNamespace.Value(ctx)), corev1.SchemeGroupVersion.WithResource("pods"))), metadata.OwningClusterIndex, func(ctx context.Context) labels.Selector { return metadata.SelectorForComponent(CtxClusterNN.MustValue(ctx).Name, metadata.ComponentSpiceDBLabelValue) @@ -408,7 +432,7 @@ func (c *Controller) cleanupJob(...handler.Handler) handler.Handler { registry: c.Registry, getJobs: func(ctx context.Context) []*batchv1.Job { return component.NewIndexedComponent( - typed.IndexerFor[*batchv1.Job](c.Registry, typed.NewRegistryKey(DependentFactoryKey, batchv1.SchemeGroupVersion.WithResource("jobs"))), + typed.MustIndexerForKey[*batchv1.Job](c.Registry, typed.NewRegistryKey(DependentFactoryKey(CtxCacheNamespace.Value(ctx)), batchv1.SchemeGroupVersion.WithResource("jobs"))), metadata.OwningClusterIndex, func(ctx context.Context) labels.Selector { return metadata.SelectorForComponent(CtxClusterNN.MustValue(ctx).Name, metadata.ComponentMigrationJobLabelValue) @@ -416,7 +440,7 @@ func (c *Controller) cleanupJob(...handler.Handler) handler.Handler { }, getJobPods: func(ctx context.Context) []*corev1.Pod { return component.NewIndexedComponent( - typed.IndexerFor[*corev1.Pod](c.Registry, typed.NewRegistryKey(DependentFactoryKey, corev1.SchemeGroupVersion.WithResource("pods"))), + typed.MustIndexerForKey[*corev1.Pod](c.Registry, typed.NewRegistryKey(DependentFactoryKey(CtxCacheNamespace.Value(ctx)), corev1.SchemeGroupVersion.WithResource("pods"))), metadata.OwningClusterIndex, func(ctx context.Context) labels.Selector { return metadata.SelectorForComponent(CtxClusterNN.MustValue(ctx).Name, metadata.ComponentMigrationJobLabelValue) @@ -453,42 +477,48 @@ func (c *Controller) selfPauseCluster(...handler.Handler) handler.Handler { func (c *Controller) secretAdopter(next ...handler.Handler) handler.Handler { secretsGVR := corev1.SchemeGroupVersion.WithResource("secrets") - return NewSecretAdoptionHandler( - c.Recorder, - func(ctx context.Context) (*corev1.Secret, error) { - return typed.ListerFor[*corev1.Secret](c.Registry, typed.NewRegistryKey(DependentFactoryKey, secretsGVR)).ByNamespace(CtxSecretNN.MustValue(ctx).Namespace).Get(CtxSecretNN.MustValue(ctx).Name) - }, - func(ctx context.Context, err error) { - cluster := CtxCluster.MustValue(ctx) - status := &v1alpha1.SpiceDBCluster{ - TypeMeta: metav1.TypeMeta{ - Kind: v1alpha1.SpiceDBClusterKind, - APIVersion: v1alpha1.SchemeGroupVersion.String(), - }, - ObjectMeta: metav1.ObjectMeta{Namespace: cluster.Namespace, Name: cluster.Name}, - Status: *cluster.Status.DeepCopy(), - } - status.Status.ObservedGeneration = cluster.GetGeneration() - status.SetStatusCondition(v1alpha1.NewMissingSecretCondition(types.NamespacedName{ - Namespace: cluster.Namespace, - Name: cluster.Spec.SecretRef, - })) - if err := c.PatchStatus(ctx, status); err != nil { - QueueOps.RequeueAPIErr(ctx, err) - } - // keep checking to see if the secret is added - QueueOps.RequeueErr(ctx, err) - }, - typed.IndexerFor[*corev1.Secret](c.Registry, typed.NewRegistryKey(DependentFactoryKey, secretsGVR)), - func(ctx context.Context, secret *applycorev1.SecretApplyConfiguration, options metav1.ApplyOptions) (*corev1.Secret, error) { - return c.kclient.CoreV1().Secrets(*secret.Namespace).Apply(ctx, secret, options) - }, - func(ctx context.Context, nn types.NamespacedName) error { - _, err := c.kclient.CoreV1().Secrets(nn.Namespace).Get(ctx, nn.Name, metav1.GetOptions{}) - return err - }, - handler.Handlers(next).MustOne(), - ) + return handler.NewHandlerFromFunc(func(ctx context.Context) { + NewSecretAdoptionHandler( + c.Recorder, + func(ctx context.Context) (*corev1.Secret, error) { + return typed.MustListerForKey[*corev1.Secret](c.Registry, typed.NewRegistryKey(DependentFactoryKey(CtxCacheNamespace.Value(ctx)), secretsGVR)).ByNamespace(CtxSecretNN.MustValue(ctx).Namespace).Get(CtxSecretNN.MustValue(ctx).Name) + }, + func(ctx context.Context, err error) { + cluster := CtxCluster.MustValue(ctx) + status := &v1alpha1.SpiceDBCluster{ + TypeMeta: metav1.TypeMeta{ + Kind: v1alpha1.SpiceDBClusterKind, + APIVersion: v1alpha1.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{Namespace: cluster.Namespace, Name: cluster.Name}, + Status: *cluster.Status.DeepCopy(), + } + status.Status.ObservedGeneration = cluster.GetGeneration() + status.SetStatusCondition(v1alpha1.NewMissingSecretCondition(types.NamespacedName{ + Namespace: cluster.Namespace, + Name: cluster.Spec.SecretRef, + })) + if err := c.PatchStatus(ctx, status); err != nil { + QueueOps.RequeueAPIErr(ctx, err) + } + // keep checking to see if the secret is added + QueueOps.RequeueErr(ctx, err) + }, + typed.MustIndexerForKey[*corev1.Secret](c.Registry, typed.NewRegistryKey(DependentFactoryKey(CtxCacheNamespace.Value(ctx)), secretsGVR)), + func(ctx context.Context, secret *applycorev1.SecretApplyConfiguration, options metav1.ApplyOptions) (*corev1.Secret, error) { + return c.kclient.CoreV1().Secrets(*secret.Namespace).Apply(ctx, secret, options) + }, + func(ctx context.Context, nn types.NamespacedName) error { + _, err := c.kclient.CoreV1().Secrets(nn.Namespace).Get(ctx, nn.Name, metav1.GetOptions{}) + return err + }, + handler.Handlers(next).MustOne(), + ).Handle(ctx) + if errors.Is(ctx.Err(), context.Canceled) { + return + } + handler.Handlers(next).MustOne().Handle(ctx) + }, "adoptSecret") } func (c *Controller) checkConfigChanged(next ...handler.Handler) handler.Handler { @@ -507,32 +537,44 @@ func (c *Controller) validateConfig(next ...handler.Handler) handler.Handler { }) } -func (c *Controller) getDeployments(...handler.Handler) handler.Handler { - return handler.NewHandler(component.NewComponentContextHandler[*appsv1.Deployment]( - CtxDeployments, - component.NewIndexedComponent( - typed.IndexerFor[*appsv1.Deployment](c.Registry, typed.NewRegistryKey(DependentFactoryKey, appsv1.SchemeGroupVersion.WithResource("deployments"))), - metadata.OwningClusterIndex, - func(ctx context.Context) labels.Selector { - return metadata.SelectorForComponent(CtxClusterNN.MustValue(ctx).Name, metadata.ComponentSpiceDBLabelValue) - }), - CtxClusterNN, - handler.NoopHandler, - ), "getDeployments") +func (c *Controller) getDeployments(next ...handler.Handler) handler.Handler { + return handler.NewHandlerFromFunc(func(ctx context.Context) { + component.NewComponentContextHandler[*appsv1.Deployment]( + CtxDeployments, + component.NewIndexedComponent( + typed.MustIndexerForKey[*appsv1.Deployment](c.Registry, typed.NewRegistryKey(DependentFactoryKey(CtxCacheNamespace.Value(ctx)), appsv1.SchemeGroupVersion.WithResource("deployments"))), + metadata.OwningClusterIndex, + func(ctx context.Context) labels.Selector { + return metadata.SelectorForComponent(CtxClusterNN.MustValue(ctx).Name, metadata.ComponentSpiceDBLabelValue) + }), + CtxClusterNN, + handler.NoopHandler, + ).Handle(ctx) + if errors.Is(ctx.Err(), context.Canceled) { + return + } + handler.Handlers(next).MustOne().Handle(ctx) + }, "getDeployments") } -func (c *Controller) getJobs(...handler.Handler) handler.Handler { - return handler.NewHandler(component.NewComponentContextHandler[*batchv1.Job]( - CtxJobs, - component.NewIndexedComponent( - typed.IndexerFor[*batchv1.Job](c.Registry, typed.NewRegistryKey(DependentFactoryKey, batchv1.SchemeGroupVersion.WithResource("jobs"))), - metadata.OwningClusterIndex, - func(ctx context.Context) labels.Selector { - return metadata.SelectorForComponent(CtxClusterNN.MustValue(ctx).Name, metadata.ComponentMigrationJobLabelValue) - }), - CtxClusterNN, - handler.NoopHandler, - ), "getJobs") +func (c *Controller) getJobs(next ...handler.Handler) handler.Handler { + return handler.NewHandlerFromFunc(func(ctx context.Context) { + component.NewComponentContextHandler[*batchv1.Job]( + CtxJobs, + component.NewIndexedComponent( + typed.MustIndexerForKey[*batchv1.Job](c.Registry, typed.NewRegistryKey(DependentFactoryKey(CtxCacheNamespace.Value(ctx)), batchv1.SchemeGroupVersion.WithResource("jobs"))), + metadata.OwningClusterIndex, + func(ctx context.Context) labels.Selector { + return metadata.SelectorForComponent(CtxClusterNN.MustValue(ctx).Name, metadata.ComponentMigrationJobLabelValue) + }), + CtxClusterNN, + handler.NoopHandler, + ).Handle(ctx) + if errors.Is(ctx.Err(), context.Canceled) { + return + } + handler.Handlers(next).MustOne().Handle(ctx) + }, "getJobs") } func (c *Controller) runMigration(next ...handler.Handler) handler.Handler { @@ -558,64 +600,76 @@ func (c *Controller) checkMigrations(next ...handler.Handler) handler.Handler { }) } -func (c *Controller) ensureServiceAccount(...handler.Handler) handler.Handler { - return handler.NewHandler(component.NewEnsureComponentByHash( - component.NewHashableComponent( - component.NewIndexedComponent( - typed.IndexerFor[*corev1.ServiceAccount]( - c.Registry, - typed.NewRegistryKey( - DependentFactoryKey, - corev1.SchemeGroupVersion.WithResource("serviceaccounts"), - )), - metadata.OwningClusterIndex, - func(ctx context.Context) labels.Selector { - return metadata.SelectorForComponent(CtxClusterNN.MustValue(ctx).Name, metadata.ComponentServiceAccountLabel) - }), - hash.NewObjectHash(), "authzed.com/controller-component-hash"), - CtxClusterNN, - QueueOps, - func(ctx context.Context, apply *applycorev1.ServiceAccountApplyConfiguration) (*corev1.ServiceAccount, error) { - logr.FromContextOrDiscard(ctx).V(4).Info("applying serviceaccount", "namespace", *apply.Namespace, "name", *apply.Name) - return c.kclient.CoreV1().ServiceAccounts(*apply.Namespace).Apply(ctx, apply, metadata.ApplyForceOwned) - }, - func(ctx context.Context, nn types.NamespacedName) error { - logr.FromContextOrDiscard(ctx).V(4).Info("deleting serviceaccount", "namespace", nn.Namespace, "name", nn.Name) - return c.kclient.CoreV1().ServiceAccounts(nn.Namespace).Delete(ctx, nn.Name, metav1.DeleteOptions{}) - }, - func(ctx context.Context) *applycorev1.ServiceAccountApplyConfiguration { - return CtxConfig.MustValue(ctx).ServiceAccount() - }), "ensureServiceAccount") +func (c *Controller) ensureServiceAccount(next ...handler.Handler) handler.Handler { + return handler.NewHandlerFromFunc(func(ctx context.Context) { + component.NewEnsureComponentByHash( + component.NewHashableComponent( + component.NewIndexedComponent( + typed.MustIndexerForKey[*corev1.ServiceAccount]( + c.Registry, + typed.NewRegistryKey( + DependentFactoryKey(CtxCacheNamespace.Value(ctx)), + corev1.SchemeGroupVersion.WithResource("serviceaccounts"), + )), + metadata.OwningClusterIndex, + func(ctx context.Context) labels.Selector { + return metadata.SelectorForComponent(CtxClusterNN.MustValue(ctx).Name, metadata.ComponentServiceAccountLabel) + }), + hash.NewObjectHash(), "authzed.com/controller-component-hash"), + CtxClusterNN, + QueueOps, + func(ctx context.Context, apply *applycorev1.ServiceAccountApplyConfiguration) (*corev1.ServiceAccount, error) { + logr.FromContextOrDiscard(ctx).V(4).Info("applying serviceaccount", "namespace", *apply.Namespace, "name", *apply.Name) + return c.kclient.CoreV1().ServiceAccounts(*apply.Namespace).Apply(ctx, apply, metadata.ApplyForceOwned) + }, + func(ctx context.Context, nn types.NamespacedName) error { + logr.FromContextOrDiscard(ctx).V(4).Info("deleting serviceaccount", "namespace", nn.Namespace, "name", nn.Name) + return c.kclient.CoreV1().ServiceAccounts(nn.Namespace).Delete(ctx, nn.Name, metav1.DeleteOptions{}) + }, + func(ctx context.Context) *applycorev1.ServiceAccountApplyConfiguration { + return CtxConfig.MustValue(ctx).ServiceAccount() + }).Handle(ctx) + if errors.Is(ctx.Err(), context.Canceled) { + return + } + handler.Handlers(next).MustOne().Handle(ctx) + }, "ensureServiceAccount") } -func (c *Controller) ensureRole(...handler.Handler) handler.Handler { - return handler.NewHandler(component.NewEnsureComponentByHash( - component.NewHashableComponent( - component.NewIndexedComponent( - typed.IndexerFor[*rbacv1.Role]( - c.Registry, - typed.NewRegistryKey( - DependentFactoryKey, - rbacv1.SchemeGroupVersion.WithResource("roles"), - )), - metadata.OwningClusterIndex, - func(ctx context.Context) labels.Selector { - return metadata.SelectorForComponent(CtxClusterNN.MustValue(ctx).Name, metadata.ComponentRoleLabel) - }), - hash.NewObjectHash(), "authzed.com/controller-component-hash"), - CtxClusterNN, - QueueOps, - func(ctx context.Context, apply *applyrbacv1.RoleApplyConfiguration) (*rbacv1.Role, error) { - logr.FromContextOrDiscard(ctx).V(4).Info("applying role", "namespace", *apply.Namespace, "name", *apply.Name) - return c.kclient.RbacV1().Roles(*apply.Namespace).Apply(ctx, apply, metadata.ApplyForceOwned) - }, - func(ctx context.Context, nn types.NamespacedName) error { - logr.FromContextOrDiscard(ctx).V(4).Info("deleting role", "namespace", nn.Namespace, "name", nn.Name) - return c.kclient.RbacV1().Roles(nn.Namespace).Delete(ctx, nn.Name, metav1.DeleteOptions{}) - }, - func(ctx context.Context) *applyrbacv1.RoleApplyConfiguration { - return CtxConfig.MustValue(ctx).Role() - }), "ensureRole") +func (c *Controller) ensureRole(next ...handler.Handler) handler.Handler { + return handler.NewHandlerFromFunc(func(ctx context.Context) { + component.NewEnsureComponentByHash( + component.NewHashableComponent( + component.NewIndexedComponent( + typed.MustIndexerForKey[*rbacv1.Role]( + c.Registry, + typed.NewRegistryKey( + DependentFactoryKey(CtxCacheNamespace.Value(ctx)), + rbacv1.SchemeGroupVersion.WithResource("roles"), + )), + metadata.OwningClusterIndex, + func(ctx context.Context) labels.Selector { + return metadata.SelectorForComponent(CtxClusterNN.MustValue(ctx).Name, metadata.ComponentRoleLabel) + }), + hash.NewObjectHash(), "authzed.com/controller-component-hash"), + CtxClusterNN, + QueueOps, + func(ctx context.Context, apply *applyrbacv1.RoleApplyConfiguration) (*rbacv1.Role, error) { + logr.FromContextOrDiscard(ctx).V(4).Info("applying role", "namespace", *apply.Namespace, "name", *apply.Name) + return c.kclient.RbacV1().Roles(*apply.Namespace).Apply(ctx, apply, metadata.ApplyForceOwned) + }, + func(ctx context.Context, nn types.NamespacedName) error { + logr.FromContextOrDiscard(ctx).V(4).Info("deleting role", "namespace", nn.Namespace, "name", nn.Name) + return c.kclient.RbacV1().Roles(nn.Namespace).Delete(ctx, nn.Name, metav1.DeleteOptions{}) + }, + func(ctx context.Context) *applyrbacv1.RoleApplyConfiguration { + return CtxConfig.MustValue(ctx).Role() + }).Handle(ctx) + if errors.Is(ctx.Err(), context.Canceled) { + return + } + handler.Handlers(next).MustOne().Handle(ctx) + }, "ensureRole") } func (c *Controller) ensureRoleBinding(next ...handler.Handler) handler.Handler { @@ -623,10 +677,10 @@ func (c *Controller) ensureRoleBinding(next ...handler.Handler) handler.Handler component.NewEnsureComponentByHash( component.NewHashableComponent( component.NewIndexedComponent( - typed.IndexerFor[*rbacv1.RoleBinding]( + typed.MustIndexerForKey[*rbacv1.RoleBinding]( c.Registry, typed.NewRegistryKey( - DependentFactoryKey, + DependentFactoryKey(CtxCacheNamespace.Value(ctx)), rbacv1.SchemeGroupVersion.WithResource("rolebindings"), )), metadata.OwningClusterIndex, @@ -648,36 +702,45 @@ func (c *Controller) ensureRoleBinding(next ...handler.Handler) handler.Handler return CtxConfig.MustValue(ctx).RoleBinding() }, ).Handle(ctx) + if errors.Is(ctx.Err(), context.Canceled) { + return + } handler.Handlers(next).MustOne().Handle(ctx) }, "ensureRoleBinding") } -func (c *Controller) ensureService(...handler.Handler) handler.Handler { - return handler.NewHandler(component.NewEnsureComponentByHash( - component.NewHashableComponent( - component.NewIndexedComponent( - typed.IndexerFor[*corev1.Service]( - c.Registry, - typed.NewRegistryKey( - DependentFactoryKey, - corev1.SchemeGroupVersion.WithResource("services"), - )), - metadata.OwningClusterIndex, - func(ctx context.Context) labels.Selector { - return metadata.SelectorForComponent(CtxClusterNN.MustValue(ctx).Name, metadata.ComponentServiceLabel) - }), - hash.NewObjectHash(), "authzed.com/controller-component-hash"), - CtxClusterNN, - QueueOps, - func(ctx context.Context, apply *applycorev1.ServiceApplyConfiguration) (*corev1.Service, error) { - logr.FromContextOrDiscard(ctx).V(4).Info("applying service", "namespace", *apply.Namespace, "name", *apply.Name) - return c.kclient.CoreV1().Services(*apply.Namespace).Apply(ctx, apply, metadata.ApplyForceOwned) - }, - func(ctx context.Context, nn types.NamespacedName) error { - logr.FromContextOrDiscard(ctx).V(4).Info("deleting service", "namespace", nn.Namespace, "name", nn.Name) - return c.kclient.CoreV1().Services(nn.Namespace).Delete(ctx, nn.Name, metav1.DeleteOptions{}) - }, - func(ctx context.Context) *applycorev1.ServiceApplyConfiguration { - return CtxConfig.MustValue(ctx).Service() - }), "ensureService") +func (c *Controller) ensureService(next ...handler.Handler) handler.Handler { + return handler.NewHandlerFromFunc(func(ctx context.Context) { + component.NewEnsureComponentByHash( + component.NewHashableComponent( + component.NewIndexedComponent( + typed.MustIndexerForKey[*corev1.Service]( + c.Registry, + typed.NewRegistryKey( + DependentFactoryKey(CtxCacheNamespace.Value(ctx)), + corev1.SchemeGroupVersion.WithResource("services"), + )), + metadata.OwningClusterIndex, + func(ctx context.Context) labels.Selector { + return metadata.SelectorForComponent(CtxClusterNN.MustValue(ctx).Name, metadata.ComponentServiceLabel) + }), + hash.NewObjectHash(), "authzed.com/controller-component-hash"), + CtxClusterNN, + QueueOps, + func(ctx context.Context, apply *applycorev1.ServiceApplyConfiguration) (*corev1.Service, error) { + logr.FromContextOrDiscard(ctx).V(4).Info("applying service", "namespace", *apply.Namespace, "name", *apply.Name) + return c.kclient.CoreV1().Services(*apply.Namespace).Apply(ctx, apply, metadata.ApplyForceOwned) + }, + func(ctx context.Context, nn types.NamespacedName) error { + logr.FromContextOrDiscard(ctx).V(4).Info("deleting service", "namespace", nn.Namespace, "name", nn.Name) + return c.kclient.CoreV1().Services(nn.Namespace).Delete(ctx, nn.Name, metav1.DeleteOptions{}) + }, + func(ctx context.Context) *applycorev1.ServiceApplyConfiguration { + return CtxConfig.MustValue(ctx).Service() + }).Handle(ctx) + if errors.Is(ctx.Err(), context.Canceled) { + return + } + handler.Handlers(next).MustOne().Handle(ctx) + }, "ensureService") } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go new file mode 100644 index 0000000..db44b9b --- /dev/null +++ b/pkg/controller/controller_test.go @@ -0,0 +1,206 @@ +package controller + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/authzed/controller-idioms/typed" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/dynamic/fake" + kfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + + "github.com/authzed/spicedb-operator/pkg/apis/authzed/v1alpha1" + "github.com/authzed/spicedb-operator/pkg/metadata" +) + +type keyRecordingQueue struct { + workqueue.RateLimitingInterface + Items chan any +} + +func newKeyRecordingQueue(queue workqueue.RateLimitingInterface) *keyRecordingQueue { + return &keyRecordingQueue{ + RateLimitingInterface: queue, + Items: make(chan any), + } +} + +func (q *keyRecordingQueue) Add(item any) { + q.Items <- item + q.RateLimitingInterface.Add(item) +} + +func (q *keyRecordingQueue) AddAfter(item any, d time.Duration) { + q.Items <- item + q.RateLimitingInterface.AddAfter(item, d) +} + +func (q *keyRecordingQueue) AddRateLimited(item any) { + q.Items <- item + q.RateLimitingInterface.AddRateLimited(item) +} + +func TestControllerNamespacing(t *testing.T) { + tests := []struct { + name string + watchedNamespaces []string + createNamespaces []string + spiceDBClusters map[string]string + services map[string]string + expectedKeys []string + }{ + { + name: "default to all namespaces", + watchedNamespaces: nil, + createNamespaces: []string{"test", "test2", "test3"}, + spiceDBClusters: map[string]string{"test3": "test3"}, + services: map[string]string{ + "test": "test", + "test2": "test2", + }, + expectedKeys: []string{ + "spicedbclusters.v1alpha1.authzed.com::test/test", + "spicedbclusters.v1alpha1.authzed.com::test2/test2", + "spicedbclusters.v1alpha1.authzed.com::test3/test3", + }, + }, + { + name: "explicitly watch all namespaces", + watchedNamespaces: []string{""}, + createNamespaces: []string{"test", "test2", "test3"}, + spiceDBClusters: map[string]string{"test3": "test3"}, + services: map[string]string{ + "test": "test", + "test2": "test2", + }, + expectedKeys: []string{ + "spicedbclusters.v1alpha1.authzed.com::test/test", + "spicedbclusters.v1alpha1.authzed.com::test2/test2", + "spicedbclusters.v1alpha1.authzed.com::test3/test3", + }, + }, + { + name: "watch one namespace (owned objects)", + watchedNamespaces: []string{"test"}, + createNamespaces: []string{"test", "test2", "test3"}, + spiceDBClusters: map[string]string{"test3": "test3"}, + services: map[string]string{ + "test": "test", + "test2": "test2", + }, + expectedKeys: []string{ + "spicedbclusters.v1alpha1.authzed.com::test/test", + }, + }, + { + name: "watch one namespace (external objects)", + watchedNamespaces: []string{"test2"}, + createNamespaces: []string{"test", "test2", "test3"}, + spiceDBClusters: map[string]string{"test3": "test3"}, + services: map[string]string{ + "test": "test", + "test2": "test2", + }, + expectedKeys: []string{ + "spicedbclusters.v1alpha1.authzed.com::test2/test2", + }, + }, + { + name: "watch multiple namespaces", + watchedNamespaces: []string{"test2", "test3"}, + createNamespaces: []string{"test", "test2", "test3"}, + spiceDBClusters: map[string]string{"test3": "test3"}, + services: map[string]string{ + "test": "test", + "test2": "test2", + }, + expectedKeys: []string{ + "spicedbclusters.v1alpha1.authzed.com::test2/test2", + "spicedbclusters.v1alpha1.authzed.com::test3/test3", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + registry := typed.NewRegistry() + broadcaster := record.NewBroadcaster() + dclient := fake.NewSimpleDynamicClient(scheme.Scheme) + kclient := kfake.NewSimpleClientset() + c, err := NewController(ctx, registry, dclient, kclient, nil, "", broadcaster, tt.watchedNamespaces) + require.NoError(t, err) + queue := newKeyRecordingQueue(workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())) + c.Queue = queue + go c.Start(ctx, 1) + + for _, ns := range tt.createNamespaces { + ns, err := typed.ObjToUnstructuredObj(&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}}) + require.NoError(t, err) + _, err = dclient.Resource(corev1.SchemeGroupVersion.WithResource("namespaces")).Create(ctx, ns, metav1.CreateOptions{}) + require.NoError(t, err) + } + + // test that non-owned objects (i.e. services) are watched in + // the appropriate namespaces as well + for ns, spicedb := range tt.services { + service, err := typed.ObjToUnstructuredObj(&corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: ns, + Labels: map[string]string{ + metadata.OperatorManagedLabelKey: metadata.OperatorManagedLabelValue, + }, + Annotations: map[string]string{ + metadata.OwnerAnnotationKeyPrefix + spicedb: "owned", + }, + }, + }) + require.NoError(t, err) + + _, err = dclient.Resource(corev1.SchemeGroupVersion.WithResource("services")).Namespace(ns).Create(ctx, service, metav1.CreateOptions{}) + require.NoError(t, err) + } + + for ns, spicedb := range tt.spiceDBClusters { + service, err := typed.ObjToUnstructuredObj(&v1alpha1.SpiceDBCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: spicedb, + Namespace: ns, + }, + }) + require.NoError(t, err) + + _, err = dclient.Resource(v1alpha1ClusterGVR).Namespace(ns).Create(ctx, service, metav1.CreateOptions{}) + require.NoError(t, err) + } + + gotKeys := make([]any, 0) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case s := <-queue.Items: + gotKeys = append(gotKeys, s) + if len(gotKeys) == len(tt.expectedKeys) { + return + } + case <-ctx.Done(): + return + } + } + }() + wg.Wait() + require.ElementsMatch(t, tt.expectedKeys, gotKeys) + }) + } +}