diff --git a/control-plane/pkg/core/config/utils.go b/control-plane/pkg/core/config/utils.go index 736264c5ad..4711857ef9 100644 --- a/control-plane/pkg/core/config/utils.go +++ b/control-plane/pkg/core/config/utils.go @@ -62,6 +62,10 @@ func EgressConfigFromDelivery( egressConfig := &contract.EgressConfig{} if delivery.DeadLetterSink != nil { + destination := *delivery.DeadLetterSink // Do not update object Spec, so copy destination. + if destination.Ref != nil && destination.Ref.Namespace == "" { + destination.Ref.Namespace = parent.GetNamespace() + } deadLetterSinkURL, err := resolver.URIFromDestinationV1(ctx, *delivery.DeadLetterSink, parent) if err != nil { return nil, fmt.Errorf("failed to resolve Spec.Delivery.DeadLetterSink: %w", err) diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index c14aabf0e8..5de3214035 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -217,6 +217,63 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { BootstrapServersConfigMapKey: bootstrapServers, }, }, + { + Name: "Reconciled normal - with DLS - no DLS ref namespace", + Objects: []runtime.Object{ + NewBroker( + WithDelivery(WithNoDeadLetterSinkNamespace), + ), + NewConfigMapFromContract(&contract.Contract{ + Generation: 1, + }, &configs), + NewService(WithServiceNamespace(BrokerNamespace)), + BrokerReceiverPod(configs.SystemNamespace, map[string]string{base.VolumeGenerationAnnotationKey: "3"}), + BrokerDispatcherPod(configs.SystemNamespace, map[string]string{base.VolumeGenerationAnnotationKey: "1"}), + }, + Key: testKey, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(&configs, &contract.Contract{ + Resources: []*contract.Resource{ + { + Uid: BrokerUUID, + Topics: []string{BrokerTopic()}, + Ingress: &contract.Ingress{ContentMode: contract.ContentMode_BINARY, IngressType: &contract.Ingress_Path{Path: receiver.Path(BrokerNamespace, BrokerName)}}, + BootstrapServers: bootstrapServers, + EgressConfig: &contract.EgressConfig{DeadLetter: ServiceURLFrom(BrokerNamespace, ServiceName)}, + }, + }, + Generation: 2, + }), + BrokerReceiverPodUpdate(configs.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "2", + }), + BrokerDispatcherPodUpdate(configs.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "2", + }), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewBroker( + WithDelivery(WithNoDeadLetterSinkNamespace), + reconcilertesting.WithInitBrokerConditions, + BrokerConfigMapUpdatedReady(&configs), + BrokerDataPlaneAvailable, + BrokerTopicReady, + BrokerConfigParsed, + BrokerAddressable(&configs), + ), + }, + }, + OtherTestData: map[string]interface{}{ + BootstrapServersConfigMapKey: bootstrapServers, + }, + }, { Name: "Failed to create topic", Objects: []runtime.Object{ diff --git a/control-plane/pkg/reconciler/testing/objects_broker.go b/control-plane/pkg/reconciler/testing/objects_broker.go index 610c81a4a5..b27705d2ca 100644 --- a/control-plane/pkg/reconciler/testing/objects_broker.go +++ b/control-plane/pkg/reconciler/testing/objects_broker.go @@ -77,7 +77,7 @@ func NewDeletedBroker(options ...reconcilertesting.BrokerOption) runtime.Object ) } -func WithDelivery() func(*eventing.Broker) { +func WithDelivery(mutations ...func(spec *eventingduck.DeliverySpec)) func(*eventing.Broker) { service := NewService() return func(broker *eventing.Broker) { @@ -92,6 +92,15 @@ func WithDelivery() func(*eventing.Broker) { APIVersion: service.APIVersion, }, } + for _, mut := range mutations { + mut(broker.Spec.Delivery) + } + } +} + +func WithNoDeadLetterSinkNamespace(spec *eventingduck.DeliverySpec) { + if spec.DeadLetterSink != nil && spec.DeadLetterSink.Ref != nil { + spec.DeadLetterSink.Ref.Namespace = "" } } diff --git a/control-plane/pkg/reconciler/testing/objects_common.go b/control-plane/pkg/reconciler/testing/objects_common.go index e279298fc0..5268ca1dd4 100644 --- a/control-plane/pkg/reconciler/testing/objects_common.go +++ b/control-plane/pkg/reconciler/testing/objects_common.go @@ -17,6 +17,7 @@ package testing import ( + "fmt" "io/ioutil" "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" @@ -39,9 +40,8 @@ const ( ConfigMapNamespace = "test-namespace-config-map" ConfigMapName = "test-config-cm" - serviceNamespace = "test-service-namespace" - serviceName = "test-service" - ServiceURL = "http://test-service.test-service-namespace.svc.cluster.local" + ServiceNamespace = "test-service-namespace" + ServiceName = "test-service" TriggerUUID = "e7185016-5d98-4b54-84e8-3b1cd4acc6b5" @@ -51,19 +51,35 @@ const ( var ( Formats = []string{base.Protobuf, base.Json} + + ServiceURL = ServiceURLFrom(ServiceNamespace, ServiceName) ) -func NewService() *corev1.Service { - return &corev1.Service{ +func NewService(mutations ...func(*corev1.Service)) *corev1.Service { + s := &corev1.Service{ TypeMeta: metav1.TypeMeta{ Kind: "Service", APIVersion: "v1", }, ObjectMeta: metav1.ObjectMeta{ - Name: serviceName, - Namespace: serviceNamespace, + Name: ServiceName, + Namespace: ServiceNamespace, }, } + for _, mut := range mutations { + mut(s) + } + return s +} + +func WithServiceNamespace(ns string) func(s *corev1.Service) { + return func(s *corev1.Service) { + s.Namespace = ns + } +} + +func ServiceURLFrom(ns, name string) string { + return fmt.Sprintf("http://%s.%s.svc.cluster.local", name, ns) } func NewConfigMap(configs *Configs, data []byte) runtime.Object {