Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions controllers/aga/eventhandlers/reference_grant_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package eventhandlers

import (
"context"
"fmt"
"github.com/go-logr/logr"
"k8s.io/client-go/util/workqueue"
agaapi "sigs.k8s.io/aws-load-balancer-controller/apis/aga/v1beta1"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
"sigs.k8s.io/aws-load-balancer-controller/pkg/shared_constants"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
gwbeta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

// NewEnqueueRequestsForReferenceGrantEvent creates handler for ReferenceGrant resources
func NewEnqueueRequestsForReferenceGrantEvent(
k8sClient client.Client,
logger logr.Logger,
) handler.TypedEventHandler[*gwbeta1.ReferenceGrant, reconcile.Request] {
return &enqueueRequestsForReferenceGrantEvent{
k8sClient: k8sClient,
logger: logger,
}
}

var _ handler.TypedEventHandler[*gwbeta1.ReferenceGrant, reconcile.Request] = (*enqueueRequestsForReferenceGrantEvent)(nil)

// enqueueRequestsForReferenceGrantEvent handles ReferenceGrant events
type enqueueRequestsForReferenceGrantEvent struct {
k8sClient client.Client
logger logr.Logger
}

func (h *enqueueRequestsForReferenceGrantEvent) Create(ctx context.Context, e event.TypedCreateEvent[*gwbeta1.ReferenceGrant], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
refGrant := e.Object
h.logger.V(1).Info("enqueue reference grant create event", "reference grant", refGrant.Name)
h.enqueueImpactedGlobalAccelerators(ctx, refGrant, nil, queue)
}

func (h *enqueueRequestsForReferenceGrantEvent) Update(ctx context.Context, e event.TypedUpdateEvent[*gwbeta1.ReferenceGrant], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
refGrantNew := e.ObjectNew
refGrantOld := e.ObjectOld
h.logger.V(1).Info("enqueue reference grant update event", "reference grant", refGrantNew.Name)
h.enqueueImpactedGlobalAccelerators(ctx, refGrantNew, refGrantOld, queue)
}

func (h *enqueueRequestsForReferenceGrantEvent) Delete(ctx context.Context, e event.TypedDeleteEvent[*gwbeta1.ReferenceGrant], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
refGrant := e.Object
h.logger.V(1).Info("enqueue reference grant delete event", "reference grant", refGrant.Name)
h.enqueueImpactedGlobalAccelerators(ctx, refGrant, nil, queue)
}

func (h *enqueueRequestsForReferenceGrantEvent) Generic(ctx context.Context, e event.TypedGenericEvent[*gwbeta1.ReferenceGrant], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
refGrant := e.Object
h.logger.V(1).Info("enqueue reference grant generic event", "reference grant", refGrant.Name)
h.enqueueImpactedGlobalAccelerators(ctx, refGrant, nil, queue)
}

// enqueueImpactedGlobalAccelerators finds and enqueues GlobalAccelerators impacted by a ReferenceGrant change
func (h *enqueueRequestsForReferenceGrantEvent) enqueueImpactedGlobalAccelerators(
ctx context.Context,
newRefGrant *gwbeta1.ReferenceGrant,
oldRefGrant *gwbeta1.ReferenceGrant,
queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {

// Collect all relevant namespaces from both old and new ReferenceGrant
impactedFroms := make(map[string]gwbeta1.ReferenceGrantFrom)

// Process new reference grant
for i, from := range newRefGrant.Spec.From {
if from.Group == shared_constants.GlobalAcceleratorResourcesGroup && from.Kind == shared_constants.GlobalAcceleratorKind {
key := generateGrantFromKey(from)
impactedFroms[key] = newRefGrant.Spec.From[i]
}
}

// Also process old reference grant if it exists (for updates)
if oldRefGrant != nil {
for i, from := range oldRefGrant.Spec.From {
if from.Group == shared_constants.GlobalAcceleratorResourcesGroup && from.Kind == shared_constants.GlobalAcceleratorKind {
key := generateGrantFromKey(from)
impactedFroms[key] = oldRefGrant.Spec.From[i]
}
}
}

// If no GlobalAccelerator references found, nothing to do
if len(impactedFroms) == 0 {
h.logger.V(1).Info("ReferenceGrant doesn't reference GlobalAccelerators, skipping",
"referenceGrant", k8s.NamespacedName(newRefGrant))
return
}

totalMatched := 0

// Process each impacted GlobalAccelerator namespace
for _, from := range impactedFroms {

var gaList agaapi.GlobalAcceleratorList
if err := h.k8sClient.List(ctx, &gaList, &client.ListOptions{Namespace: string(from.Namespace)}); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any caching you can do here to prevent calling List() on the same namespace multiple times? I think the client should handle this case, but adding an explicit cache might make things cleaner here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh are you concerned they might have multiple same namespace references in single grant or you thinking across grants?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a look at the Reference Grant structure, and the comment I made doesn't make sense (sorry). I see the existing reference grant logic also does this same listing.

h.logger.Error(err, "Failed to list GlobalAccelerators for ReferenceGrant",
"referenceGrant", k8s.NamespacedName(newRefGrant),
"from namespace", from.Namespace)
continue
}

// Check each GA to see if it references resources in the target namespace
for i := range gaList.Items {
ga := &gaList.Items[i]

// Only check GAs that reference resources in the ReferenceGrant's namespace
hasRelevantCrossNamespaceRef := h.hasCrossNamespaceReferences(
ga,
newRefGrant.Namespace,
)

if hasRelevantCrossNamespaceRef {
totalMatched++

// Enqueue reconcile request for this GA
request := reconcile.Request{
NamespacedName: k8s.NamespacedName(ga),
}

h.logger.V(1).Info("Enqueueing GlobalAccelerator for reconcile due to ReferenceGrant change",
"globalAccelerator", request.NamespacedName,
"referenceGrant", k8s.NamespacedName(newRefGrant))

queue.Add(request)
}
}
}

h.logger.V(1).Info("ReferenceGrant event processing completed",
"referenceGrant", k8s.NamespacedName(newRefGrant),
"totalMatchedGAs", totalMatched)
}

// hasCrossNamespaceReferences checks if a GlobalAccelerator has cross-namespace references to resources in the target namespace
func (h *enqueueRequestsForReferenceGrantEvent) hasCrossNamespaceReferences(
ga *agaapi.GlobalAccelerator,
targetNamespace string) bool {

// Go through all endpoints in the GA spec
if ga.Spec.Listeners == nil {
return false
}

for _, listener := range *ga.Spec.Listeners {
if listener.EndpointGroups == nil {
continue
}

for _, endpointGroup := range *listener.EndpointGroups {
if endpointGroup.Endpoints == nil {
continue
}

for _, endpoint := range *endpointGroup.Endpoints {
// Check for cross-namespace references
if endpoint.Namespace != nil && *endpoint.Namespace == targetNamespace && *endpoint.Namespace != ga.Namespace {
return true // Found a cross-namespace reference to target namespace
}
}
}
}

return false
}

// generateGrantFromKey creates a unique key for a ReferenceGrantFrom
func generateGrantFromKey(from gwbeta1.ReferenceGrantFrom) string {
return fmt.Sprintf("%s-%s", from.Kind, from.Namespace)
}
40 changes: 26 additions & 14 deletions controllers/aga/globalaccelerator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"
gwbeta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
"time"

"github.com/aws/aws-sdk-go-v2/service/globalaccelerator/types"
Expand Down Expand Up @@ -60,10 +61,6 @@ const (
controllerName = "globalAccelerator"
agaTagPrefix = "aga.k8s.aws"

// the groupVersion of used GlobalAccelerator resource.
agaResourcesGroupVersion = "aga.k8s.aws/v1beta1"
globalAcceleratorKind = "GlobalAccelerator"

// Requeue constants for state monitoring
// requeueReasonAcceleratorInProgress indicates that the reconciliation is being requeued because
// the Global Accelerator is still in progress state
Expand Down Expand Up @@ -128,8 +125,11 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
logger.Error(err, "Failed to create DNS resolver")
}

// Create unified endpoint loader
endpointLoader := aga.NewEndpointLoader(k8sClient, dnsToLoadBalancerResolver, logger.WithName("endpoint-loader"))
// Create cross-namespace validator for ReferenceGrants
crossNamespaceValidator := aga.NewReferenceGrantValidator(k8sClient, logger.WithName("reference-grant-validator"))

// Create unified endpoint loader with validator
endpointLoader := aga.NewEndpointLoader(k8sClient, dnsToLoadBalancerResolver, logger.WithName("endpoint-loader"), crossNamespaceValidator)
return &globalAcceleratorReconciler{
k8sClient: k8sClient,
eventRecorder: eventRecorder,
Expand Down Expand Up @@ -189,6 +189,7 @@ type globalAcceleratorReconciler struct {
//+kubebuilder:rbac:groups=aga.k8s.aws,resources=globalaccelerators,verbs=get;list;watch;patch
//+kubebuilder:rbac:groups=aga.k8s.aws,resources=globalaccelerators/status,verbs=update;patch
//+kubebuilder:rbac:groups=aga.k8s.aws,resources=globalaccelerators/finalizers,verbs=update;patch
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=referencegrants,verbs=get;list;watch

func (r *globalAcceleratorReconciler) Reconcile(ctx context.Context, req reconcile.Request) (ctrl.Result, error) {
r.reconcileTracker(req.NamespacedName)
Expand Down Expand Up @@ -283,11 +284,9 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
// Track referenced endpoints
r.referenceTracker.UpdateDesiredEndpointReferencesForGA(ga, endpoints)

// Update resource watches with the endpointResourcesManager
r.endpointResourcesManager.MonitorEndpointResources(ga, endpoints)

// Validate and load endpoint status using the endpoint loader
loadedEndpoints, fatalErrors := r.endpointLoader.LoadEndpoints(ctx, ga, endpoints)

if len(fatalErrors) > 0 {
err := fmt.Errorf("failed to load endpoints: %v", fatalErrors[0])
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedEndpointLoad, fmt.Sprintf("Failed to reconcile due to %v", err))
Expand All @@ -299,6 +298,10 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
return err
}

// Update resource watches with the endpointResourcesManager
// Do this after loading endpoints so we have more accurate status information
r.endpointResourcesManager.MonitorEndpointResources(ga, loadedEndpoints)

var stack core.Stack
var accelerator *agamodel.Accelerator
var err error
Expand Down Expand Up @@ -424,12 +427,12 @@ func (r *globalAcceleratorReconciler) cleanupGlobalAcceleratorResources(ctx cont

func (r *globalAcceleratorReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, clientSet *kubernetes.Clientset) error {
// Check if GlobalAccelerator CRD is available
resList, err := clientSet.ServerResourcesForGroupVersion(agaResourcesGroupVersion)
resList, err := clientSet.ServerResourcesForGroupVersion(shared_constants.GlobalAcceleratorResourcesGroupVersion)
if err != nil {
r.logger.Info("GlobalAccelerator CRD is not available, skipping controller setup")
return nil
}
globalAcceleratorResourceAvailable := k8s.IsResourceKindAvailable(resList, globalAcceleratorKind)
globalAcceleratorResourceAvailable := k8s.IsResourceKindAvailable(resList, shared_constants.GlobalAcceleratorKind)
if !globalAcceleratorResourceAvailable {
r.logger.Info("GlobalAccelerator CRD is not available, skipping controller setup")
return nil
Expand Down Expand Up @@ -476,15 +479,15 @@ func (r *globalAcceleratorReconciler) SetupWithManager(ctx context.Context, mgr
}

// Setup watches for resource events
if err := r.setupGlobalAcceleratorWatches(ctrl); err != nil {
if err := r.setupGlobalAcceleratorWatches(ctrl, mgr); err != nil {
return err
}

return nil
}

// setupGlobalAcceleratorWatches sets up watches for resources that can trigger reconciliation of GlobalAccelerator objects
func (r *globalAcceleratorReconciler) setupGlobalAcceleratorWatches(c controller.Controller) error {
func (r *globalAcceleratorReconciler) setupGlobalAcceleratorWatches(c controller.Controller, mgr ctrl.Manager) error {
loggerPrefix := r.logger.WithName("eventHandlers")

// Create handlers for our dedicated watchers
Expand Down Expand Up @@ -521,8 +524,17 @@ func (r *globalAcceleratorReconciler) setupGlobalAcceleratorWatches(c controller
if err := c.Watch(source.Channel(r.gatewayEventChan, gatewayHandler)); err != nil {
return err
}

referenceGrantHandler := eventhandlers.NewEnqueueRequestsForReferenceGrantEvent(
r.k8sClient,
loggerPrefix.WithName("referencegrant-handler"),
)

if err := c.Watch(source.Kind(mgr.GetCache(), &gwbeta1.ReferenceGrant{}, referenceGrantHandler)); err != nil {
r.logger.Info("Failed to set up watch for ReferenceGrant resources, cross-namespace validation may be delayed", "error", err)
}
} else {
r.logger.Info("Gateway API CRDs not found, skipping Gateway event watch setup")
r.logger.Info("Gateway API CRDs not found, skipping Gateway and ReferenceGrant event watch setup")
}

return nil
Expand Down
60 changes: 54 additions & 6 deletions docs/guide/globalaccelerator/aga-controller.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,60 @@ endpointGroups:

For more information about when and how to use port overrides, see [AWS Global Accelerator Port Overrides](https://docs.aws.amazon.com/global-accelerator/latest/dg/about-endpoint-groups-port-override.html) in the AWS documentation.

> **Note**: The AWS Global Accelerator Controller handles all port override constraints automatically, ensuring your configuration is valid.
!!!note "Note"
The AWS Global Accelerator Controller handles all port override constraints automatically, ensuring your configuration is valid.

## Cross-Namespace Endpoint References


The AWS Global Accelerator controller supports cross-namespace references for endpoints using the [Gateway API ReferenceGrant](https://gateway-api.sigs.k8s.io/api-types/referencegrant/) approach, which is a common pattern for secure cross-namespace references in Kubernetes. Cross-namespace references allow a GlobalAccelerator resource in one namespace (e.g., `accelerator-ns`) to reference resources in another namespace (e.g., `web-ns`), provided that a ReferenceGrant exists in the target namespace explicitly allowing the reference.

### Using Cross-Namespace References

#### Step 1: Create a ReferenceGrant in the Target Namespace

To allow a GlobalAccelerator in namespace A to reference a resource in namespace B, create a ReferenceGrant in namespace B:

```yaml
apiVersion: gateway.networking.k8s.io/v1beta1
kind: ReferenceGrant
metadata:
name: allow-accelerator-references
namespace: web-ns # Target namespace containing the service
spec:
from:
- group: aga.k8s.aws
kind: GlobalAccelerator
namespace: accelerator-ns # Source namespace containing the GlobalAccelerator
to:
- group: ""
kind: Service
name: web-service
```

#### Step 2: Reference the Resource in your GlobalAccelerator

Once the ReferenceGrant is in place, you can reference the resource in your GlobalAccelerator:

```yaml
apiVersion: aga.k8s.aws/v1beta1
kind: GlobalAccelerator
metadata:
name: my-accelerator
namespace: accelerator-ns # Source namespace
spec:
listeners:
- endpointGroups:
- endpoints:
- namespace: web-ns # Target namespace
name: web-service # Service in target namespace
type: Service
```

!!!note "To use cross-namespace references"

1. The Gateway API CRDs must be installed in your cluster (specifically the ReferenceGrant CRD)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can be explicit here and say that Reference grant is installed via "standard crds" https://kubernetes-sigs.github.io/aws-load-balancer-controller/latest/guide/gateway/gateway/#prerequisites

2. The controller must be granted permission to read ReferenceGrants cluster-wide

## Sample CRDs

Expand Down Expand Up @@ -283,11 +336,6 @@ Ensure your Service/Ingress/Gateway resources:
2. Have been successfully provisioned with actual AWS load balancers
3. Are in the same namespace as specified in the endpoint

## Current Limitations and Future Enhancements

### Cross-Namespace Reference Limitations

The initial release of the AWS Global Accelerator Controller does not support cross-namespace endpoint references. This means that all endpoint resources (Services, Ingresses, Gateways) must be in the same namespace as the GlobalAccelerator resource that references them.

## References

Expand Down
Loading
Loading