From e6756f9ca73a0e9f66941eaf9c54a13f776f681d Mon Sep 17 00:00:00 2001 From: Justin Reasoner Date: Mon, 28 Oct 2024 13:41:14 -0400 Subject: [PATCH 1/2] add support for http header routing Signed-off-by: Justin Reasoner --- interceptor/config/serving.go | 3 +++ interceptor/handler/static.go | 2 +- interceptor/handler/static_test.go | 4 +++- interceptor/main.go | 2 +- pkg/routing/key.go | 8 ++++++- pkg/routing/key_test.go | 37 +++++++++++++++++++----------- pkg/routing/table.go | 6 +++-- pkg/routing/table_test.go | 11 +++++---- 8 files changed, 49 insertions(+), 24 deletions(-) diff --git a/interceptor/config/serving.go b/interceptor/config/serving.go index d0a2ff5d0..3304c0b7e 100644 --- a/interceptor/config/serving.go +++ b/interceptor/config/serving.go @@ -45,6 +45,9 @@ type Serving struct { TLSCertStorePaths string `envconfig:"KEDA_HTTP_PROXY_TLS_CERT_STORE_PATHS" default:""` // TLSPort is the port that the server should serve on if TLS is enabled TLSPort int `envconfig:"KEDA_HTTP_PROXY_TLS_PORT" default:"8443"` + // RoutingHeader is an optional header that can be used to route requests + // to different backends when set and sent in the HTTP request + RoutingHeader string `envconfig:"KEDA_HTTP_ROUTING_HEADER" default:""` } // Parse parses standard configs using envconfig and returns a pointer to the diff --git a/interceptor/handler/static.go b/interceptor/handler/static.go index 54c77953a..e9e678cf1 100644 --- a/interceptor/handler/static.go +++ b/interceptor/handler/static.go @@ -31,7 +31,7 @@ func (sh *Static) ServeHTTP(w http.ResponseWriter, r *http.Request) { stream := util.StreamFromContext(ctx) statusText := http.StatusText(sh.statusCode) - routingKey := routing.NewKeyFromRequest(r) + routingKey := routing.NewKeyFromRequest(r, "") namespacedName := k8s.NamespacedNameFromObject(httpso) logger.Error(sh.err, statusText, "routingKey", routingKey, "namespacedName", namespacedName, "stream", stream) diff --git a/interceptor/handler/static_test.go b/interceptor/handler/static_test.go index 4cf4b9323..324b2f580 100644 --- a/interceptor/handler/static_test.go +++ b/interceptor/handler/static_test.go @@ -23,6 +23,8 @@ var _ = Describe("ServeHTTP", func() { st = http.StatusText(sc) se = errors.New("test error") + + rh = "" ) BeforeEach(func() { @@ -47,7 +49,7 @@ var _ = Describe("ServeHTTP", func() { err := json.Unmarshal([]byte(obj), &m) Expect(err).NotTo(HaveOccurred()) - rk := routing.NewKeyFromRequest(r) + rk := routing.NewKeyFromRequest(r, rh) Expect(m).To(HaveKeyWithValue("error", se.Error())) Expect(m).To(HaveKeyWithValue("msg", st)) Expect(m).To(HaveKeyWithValue("routingKey", rk.String())) diff --git a/interceptor/main.go b/interceptor/main.go index 5a67887ab..a490c87e4 100644 --- a/interceptor/main.go +++ b/interceptor/main.go @@ -106,7 +106,7 @@ func main() { queues := queue.NewMemory() sharedInformerFactory := informers.NewSharedInformerFactory(httpCl, servingCfg.ConfigMapCacheRsyncPeriod) - routingTable, err := routing.NewTable(sharedInformerFactory, servingCfg.WatchNamespace, queues) + routingTable, err := routing.NewTable(sharedInformerFactory, servingCfg.WatchNamespace, queues, servingCfg.RoutingHeader) if err != nil { setupLog.Error(err, "fetching routing table") os.Exit(1) diff --git a/pkg/routing/key.go b/pkg/routing/key.go index 4f7da0d45..2d5ad095b 100644 --- a/pkg/routing/key.go +++ b/pkg/routing/key.go @@ -33,7 +33,7 @@ func NewKeyFromURL(url *url.URL) Key { return NewKey(url.Host, url.Path) } -func NewKeyFromRequest(req *http.Request) Key { +func NewKeyFromRequest(req *http.Request, httpRoutingHeaderKey string) Key { if req == nil { return nil } @@ -48,6 +48,12 @@ func NewKeyFromRequest(req *http.Request) Key { keyURL.Host = reqHost } + if httpRoutingHeaderKey != "" { + if httpRoutingHeaderValue := req.Header.Get(httpRoutingHeaderKey); httpRoutingHeaderValue != "" { + keyURL.Host = httpRoutingHeaderValue + } + } + return NewKeyFromURL(&keyURL) } diff --git a/pkg/routing/key_test.go b/pkg/routing/key_test.go index 092dcff11..b684a387a 100644 --- a/pkg/routing/key_test.go +++ b/pkg/routing/key_test.go @@ -2,13 +2,11 @@ package routing import ( "fmt" - "net/http" - "net/url" - + httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - - httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1" + "net/http" + "net/url" ) var _ = Describe("Key", func() { @@ -125,23 +123,36 @@ var _ = Describe("Key", func() { }) Context("NewFromRequest", func() { + const ( + host = "kubernetes.io" + path = "abc/def" + norm0 = "//kubernetes.io/abc/def/" + norm1 = "//get-thing/abc/def/" + serviceHeader = "x-service-action-a" + serviceHost = "get-thing" + ) + It("returns expected key for Request", func() { - const ( - host = "kubernetes.io" - path = "abc/def" - norm = "//kubernetes.io/abc/def/" - ) + r, err := http.NewRequest("GET", fmt.Sprintf("https://%s:443/%s?123=456#789", host, path), nil) + Expect(err).NotTo(HaveOccurred()) + Expect(r).NotTo(BeNil()) + + key := NewKeyFromRequest(r, "") + Expect(key).To(Equal(Key(norm0))) + }) + It("returns service host for Request with http routing header", func() { r, err := http.NewRequest("GET", fmt.Sprintf("https://%s:443/%s?123=456#789", host, path), nil) Expect(err).NotTo(HaveOccurred()) Expect(r).NotTo(BeNil()) + r.Header.Set(serviceHeader, serviceHost) - key := NewKeyFromRequest(r) - Expect(key).To(Equal(Key(norm))) + key := NewKeyFromRequest(r, serviceHeader) + Expect(key).To(Equal(Key(norm1))) }) It("returns nil for nil Request", func() { - key := NewKeyFromRequest(nil) + key := NewKeyFromRequest(nil, "") Expect(key).To(BeNil()) }) }) diff --git a/pkg/routing/table.go b/pkg/routing/table.go index 4cf8a5309..c440a4941 100644 --- a/pkg/routing/table.go +++ b/pkg/routing/table.go @@ -42,14 +42,16 @@ type table struct { memoryHolder util.AtomicValue[TableMemory] memorySignaler util.Signaler queueCounter queue.Counter + routingHeader string } -func NewTable(sharedInformerFactory externalversions.SharedInformerFactory, namespace string, counter queue.Counter) (Table, error) { +func NewTable(sharedInformerFactory externalversions.SharedInformerFactory, namespace string, counter queue.Counter, routingHeader string) (Table, error) { httpScaledObjects := informershttpv1alpha1.New(sharedInformerFactory, namespace, nil).HTTPScaledObjects() t := table{ httpScaledObjects: make(map[types.NamespacedName]*httpv1alpha1.HTTPScaledObject), memorySignaler: util.NewSignaler(), + routingHeader: routingHeader, } informer, ok := httpScaledObjects.Informer().(sharedIndexInformer) @@ -134,7 +136,7 @@ func (t *table) Route(req *http.Request) *httpv1alpha1.HTTPScaledObject { return nil } - key := NewKeyFromRequest(req) + key := NewKeyFromRequest(req, t.routingHeader) return tm.Route(key) } diff --git a/pkg/routing/table_test.go b/pkg/routing/table_test.go index e0bbf478b..6f9d17832 100644 --- a/pkg/routing/table_test.go +++ b/pkg/routing/table_test.go @@ -22,7 +22,8 @@ import ( var _ = Describe("Table", func() { const ( - namespace = "default" + namespace = "default" + routingHeader = "X-HTTP-Routing" ) var ( @@ -111,7 +112,7 @@ var _ = Describe("Table", func() { Context("New", func() { It("returns a table with fields initialized", func() { - i, err := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter()) + i, err := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter(), routingHeader) Expect(err).NotTo(HaveOccurred()) Expect(i).NotTo(BeNil()) @@ -136,7 +137,7 @@ var _ = Describe("Table", func() { ) BeforeEach(func() { - i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter()) + i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter(), routingHeader) t = i.(*table) }) @@ -181,7 +182,7 @@ var _ = Describe("Table", func() { ) BeforeEach(func() { - i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter()) + i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter(), routingHeader) t = i.(*table) }) @@ -279,7 +280,7 @@ var _ = Describe("Table", func() { ) BeforeEach(func() { - i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter()) + i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter(), routingHeader) t = i.(*table) }) From 738cc6eb1087898c20a4233f794917de8a8d2888 Mon Sep 17 00:00:00 2001 From: Justin Reasoner Date: Mon, 28 Oct 2024 13:50:32 -0400 Subject: [PATCH 2/2] add changelog Signed-off-by: Justin Reasoner --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 065954515..abdfa1643 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ This changelog keeps track of work items that have been completed and are ready - **General**: Support portName in HTTPScaledObject service scaleTargetRef ([#1174](https://github.com/kedacore/http-add-on/issues/1174)) - **General**: Support setting multiple TLS certs for different domains on the interceptor proxy ([#1116](https://github.com/kedacore/http-add-on/issues/1116)) -- **General**: TODO ([#TODO](https://github.com/kedacore/http-add-on/issues/TODO)) +- **General**: Support for http header routing ([#1177](https://github.com/kedacore/http-add-on/issues/1177)) ### Improvements