Skip to content

Commit

Permalink
add support for http header routing
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Reasoner <[email protected]>
  • Loading branch information
gjreasoner committed Oct 28, 2024
1 parent fb3e48b commit 8975d99
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 24 deletions.
3 changes: 3 additions & 0 deletions interceptor/config/serving.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type Serving struct {
TLSCertStorePaths string `envconfig:"KEDA_HTTP_PROXY_TLS_CERT_STORE_PATHS" default:""`
// TLSPort is the port that the server should serve on if TLS is enabled
TLSPort int `envconfig:"KEDA_HTTP_PROXY_TLS_PORT" default:"8443"`
// RoutingHeader is an optional header that can be used to route requests
// to different backends when set and sent in the HTTP request
RoutingHeader string `envconfig:"KEDA_HTTP_ROUTING_HEADER" default:""`
}

// Parse parses standard configs using envconfig and returns a pointer to the
Expand Down
2 changes: 1 addition & 1 deletion interceptor/handler/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (sh *Static) ServeHTTP(w http.ResponseWriter, r *http.Request) {
stream := util.StreamFromContext(ctx)

statusText := http.StatusText(sh.statusCode)
routingKey := routing.NewKeyFromRequest(r)
routingKey := routing.NewKeyFromRequest(r, "")
namespacedName := k8s.NamespacedNameFromObject(httpso)
logger.Error(sh.err, statusText, "routingKey", routingKey, "namespacedName", namespacedName, "stream", stream)

Expand Down
4 changes: 3 additions & 1 deletion interceptor/handler/static_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var _ = Describe("ServeHTTP", func() {
st = http.StatusText(sc)

se = errors.New("test error")

rh = ""
)

BeforeEach(func() {
Expand All @@ -47,7 +49,7 @@ var _ = Describe("ServeHTTP", func() {
err := json.Unmarshal([]byte(obj), &m)
Expect(err).NotTo(HaveOccurred())

rk := routing.NewKeyFromRequest(r)
rk := routing.NewKeyFromRequest(r, rh)
Expect(m).To(HaveKeyWithValue("error", se.Error()))
Expect(m).To(HaveKeyWithValue("msg", st))
Expect(m).To(HaveKeyWithValue("routingKey", rk.String()))
Expand Down
2 changes: 1 addition & 1 deletion interceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func main() {
queues := queue.NewMemory()

sharedInformerFactory := informers.NewSharedInformerFactory(httpCl, servingCfg.ConfigMapCacheRsyncPeriod)
routingTable, err := routing.NewTable(sharedInformerFactory, servingCfg.WatchNamespace, queues)
routingTable, err := routing.NewTable(sharedInformerFactory, servingCfg.WatchNamespace, queues, servingCfg.RoutingHeader)
if err != nil {
setupLog.Error(err, "fetching routing table")
os.Exit(1)
Expand Down
8 changes: 7 additions & 1 deletion pkg/routing/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewKeyFromURL(url *url.URL) Key {
return NewKey(url.Host, url.Path)
}

func NewKeyFromRequest(req *http.Request) Key {
func NewKeyFromRequest(req *http.Request, httpRoutingHeaderKey string) Key {
if req == nil {
return nil
}
Expand All @@ -48,6 +48,12 @@ func NewKeyFromRequest(req *http.Request) Key {
keyURL.Host = reqHost
}

if httpRoutingHeaderKey != "" {
if httpRoutingHeaderValue := req.Header.Get(httpRoutingHeaderKey); httpRoutingHeaderValue != "" {
keyURL.Host = httpRoutingHeaderValue
}
}

return NewKeyFromURL(&keyURL)
}

Expand Down
37 changes: 24 additions & 13 deletions pkg/routing/key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package routing

import (
"fmt"
"net/http"
"net/url"

httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1"
"net/http"
"net/url"
)

var _ = Describe("Key", func() {
Expand Down Expand Up @@ -125,23 +123,36 @@ var _ = Describe("Key", func() {
})

Context("NewFromRequest", func() {
const (
host = "kubernetes.io"
path = "abc/def"
norm0 = "//kubernetes.io/abc/def/"
norm1 = "//get-thing/abc/def/"
serviceHeader = "x-service-action-a"
serviceHost = "get-thing"
)

It("returns expected key for Request", func() {
const (
host = "kubernetes.io"
path = "abc/def"
norm = "//kubernetes.io/abc/def/"
)
r, err := http.NewRequest("GET", fmt.Sprintf("https://%s:443/%s?123=456#789", host, path), nil)
Expect(err).NotTo(HaveOccurred())
Expect(r).NotTo(BeNil())

key := NewKeyFromRequest(r, "")
Expect(key).To(Equal(Key(norm0)))
})

It("returns service host for Request with http routing header", func() {
r, err := http.NewRequest("GET", fmt.Sprintf("https://%s:443/%s?123=456#789", host, path), nil)
Expect(err).NotTo(HaveOccurred())
Expect(r).NotTo(BeNil())
r.Header.Set(serviceHeader, serviceHost)

key := NewKeyFromRequest(r)
Expect(key).To(Equal(Key(norm)))
key := NewKeyFromRequest(r, serviceHeader)
Expect(key).To(Equal(Key(norm1)))
})

It("returns nil for nil Request", func() {
key := NewKeyFromRequest(nil)
key := NewKeyFromRequest(nil, "")
Expect(key).To(BeNil())
})
})
Expand Down
6 changes: 4 additions & 2 deletions pkg/routing/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,16 @@ type table struct {
memoryHolder util.AtomicValue[TableMemory]
memorySignaler util.Signaler
queueCounter queue.Counter
routingHeader string
}

func NewTable(sharedInformerFactory externalversions.SharedInformerFactory, namespace string, counter queue.Counter) (Table, error) {
func NewTable(sharedInformerFactory externalversions.SharedInformerFactory, namespace string, counter queue.Counter, routingHeader string) (Table, error) {
httpScaledObjects := informershttpv1alpha1.New(sharedInformerFactory, namespace, nil).HTTPScaledObjects()

t := table{
httpScaledObjects: make(map[types.NamespacedName]*httpv1alpha1.HTTPScaledObject),
memorySignaler: util.NewSignaler(),
routingHeader: routingHeader,
}

informer, ok := httpScaledObjects.Informer().(sharedIndexInformer)
Expand Down Expand Up @@ -134,7 +136,7 @@ func (t *table) Route(req *http.Request) *httpv1alpha1.HTTPScaledObject {
return nil
}

key := NewKeyFromRequest(req)
key := NewKeyFromRequest(req, t.routingHeader)
return tm.Route(key)
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/routing/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (

var _ = Describe("Table", func() {
const (
namespace = "default"
namespace = "default"
routingHeader = "X-HTTP-Routing"
)

var (
Expand Down Expand Up @@ -111,7 +112,7 @@ var _ = Describe("Table", func() {

Context("New", func() {
It("returns a table with fields initialized", func() {
i, err := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter())
i, err := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter(), routingHeader)
Expect(err).NotTo(HaveOccurred())
Expect(i).NotTo(BeNil())

Expand All @@ -136,7 +137,7 @@ var _ = Describe("Table", func() {
)

BeforeEach(func() {
i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter())
i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter(), routingHeader)
t = i.(*table)
})

Expand Down Expand Up @@ -181,7 +182,7 @@ var _ = Describe("Table", func() {
)

BeforeEach(func() {
i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter())
i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter(), routingHeader)
t = i.(*table)
})

Expand Down Expand Up @@ -279,7 +280,7 @@ var _ = Describe("Table", func() {
)

BeforeEach(func() {
i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter())
i, _ := NewTable(sharedInformerFactory, namespace, queue.NewFakeCounter(), routingHeader)
t = i.(*table)
})

Expand Down

0 comments on commit 8975d99

Please sign in to comment.