Skip to content

Commit

Permalink
feat: don't configure Kong Gateways with incompatible router flavor
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek committed Oct 31, 2023
1 parent cc4d68a commit 51acbbb
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 34 deletions.
97 changes: 81 additions & 16 deletions internal/clients/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ import (

"github.com/go-logr/logr"
"github.com/samber/lo"
"github.com/samber/mo"
"golang.org/x/exp/maps"

"github.com/kong/kubernetes-ingress-controller/v3/internal/adminapi"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/configuration"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig"
"github.com/kong/kubernetes-ingress-controller/v3/internal/manager/utils/kongconfig"
"github.com/kong/kubernetes-ingress-controller/v3/internal/util"
"github.com/kong/kubernetes-ingress-controller/v3/internal/util/clock"
)
Expand Down Expand Up @@ -70,12 +74,48 @@ type AdminAPIClientsManager struct {
// This client is used to synchronise configuration with Konnect's Control Plane Admin API.
konnectClient *adminapi.KonnectClient

// clientRequirements are checked against clients before passing those to subscribers.
clientRequirements ClientRequirements

// lock prevents concurrent access to the manager's fields.
lock sync.RWMutex

logger logr.Logger
}

type ClientRequirements struct {
logger logr.Logger
RouterFlavor mo.Option[configuration.RouterFlavor]
}

func (cr ClientRequirements) Validate(ctx context.Context, cl sendconfig.AdminAPIClient) bool {
if routerFlavor, ok := cr.RouterFlavor.Get(); ok {
logger := cr.logger.WithValues("client", cl.BaseRootURL())
root, err := cl.AdminAPIClient().Root(ctx)
if err != nil {
logger.V(util.DebugLevel).Info("Failed fetching configuration root")
return false
}

f, err := kongconfig.RouterFlavorFromRoot(root)
if err != nil {
logger.V(util.DebugLevel).Info("Failed getting router flavor from configuration root")
return false
}

if routerFlavor != f {
logger.Error(err,
"Unexpected router flavor. Not taking it into account for pushing configuration.",
"expected", routerFlavor, "actual", f,
)

return false
}
}

return true
}

type AdminAPIClientsManagerOption func(*AdminAPIClientsManager)

// WithReadinessReconciliationTicker allows to set a custom ticker for readiness reconciliation loop.
Expand All @@ -85,6 +125,21 @@ func WithReadinessReconciliationTicker(ticker Ticker) AdminAPIClientsManagerOpti
}
}

// WithClientRequirements allows to set custom client requirements.
func WithClientRequirements(cr ClientRequirements) AdminAPIClientsManagerOption {
return func(m *AdminAPIClientsManager) {
m.clientRequirements = cr
}
}

// // WithAdminAPIRequirements allows to set a custom Admin API requirement checker
// // which is being used to check the discovered AdminAPI instances.
// func WithClientRequirements() AdminAPIClientsManagerOption {
// return func(m *AdminAPIClientsManager) {
// // m.readinessReconciliationTicker =
// }
// }

func NewAdminAPIClientsManager(
ctx context.Context,
logger logr.Logger,
Expand Down Expand Up @@ -126,7 +181,7 @@ func (c *AdminAPIClientsManager) Running() chan struct{} {
// It should only be called when Gateway Discovery is enabled.
func (c *AdminAPIClientsManager) Run() {
c.onceNotifyLoopRunning.Do(func() {
go c.gatewayClientsReconciliationLoop()
go c.gatewayClientsReconciliationLoop(c.ctx)

c.lock.Lock()
defer c.lock.Unlock()
Expand Down Expand Up @@ -206,45 +261,45 @@ func (c *AdminAPIClientsManager) SubscribeToGatewayClientsChanges() (<-chan stru
// gatewayClientsReconciliationLoop is an inner loop listening on:
// - discoveredAdminAPIsNotifyChan - triggered on every Notify() call.
// - readinessReconciliationTicker - triggered on every readinessReconciliationTicker tick.
func (c *AdminAPIClientsManager) gatewayClientsReconciliationLoop() {
func (c *AdminAPIClientsManager) gatewayClientsReconciliationLoop(ctx context.Context) {
c.readinessReconciliationTicker.Reset(DefaultReadinessReconciliationInterval)
defer c.readinessReconciliationTicker.Stop()

close(c.runningChan)
for {
select {
case <-c.ctx.Done():
c.logger.V(util.InfoLevel).Info("closing AdminAPIClientsManager", "reason", c.ctx.Err())
case <-ctx.Done():
c.logger.V(util.InfoLevel).Info("closing AdminAPIClientsManager", "reason", ctx.Err())
c.closeGatewayClientsSubscribers()
return
case discoveredAdminAPIs := <-c.discoveredAdminAPIsNotifyChan:
c.onDiscoveredAdminAPIsNotification(discoveredAdminAPIs)
c.onDiscoveredAdminAPIsNotification(ctx, discoveredAdminAPIs)
case <-c.readinessReconciliationTicker.Channel():
c.onReadinessReconciliationTick()
c.onReadinessReconciliationTick(ctx)
}
}
}

// onDiscoveredAdminAPIsNotification is called when a new notification about Admin API addresses change is received.
// It will adjust lists of gateway clients and notify subscribers about the change if readyGatewayClients list has
// changed.
func (c *AdminAPIClientsManager) onDiscoveredAdminAPIsNotification(discoveredAdminAPIs []adminapi.DiscoveredAdminAPI) {
func (c *AdminAPIClientsManager) onDiscoveredAdminAPIsNotification(ctx context.Context, discoveredAdminAPIs []adminapi.DiscoveredAdminAPI) {
c.logger.V(util.DebugLevel).Info("received notification about Admin API addresses change")

clientsChanged := c.adjustGatewayClients(discoveredAdminAPIs)
readinessChanged := c.reconcileGatewayClientsReadiness()
readinessChanged := c.reconcileGatewayClientsReadiness(ctx)
if clientsChanged || readinessChanged {
c.notifyGatewayClientsSubscribers()
c.notifyGatewayClientsSubscribers(ctx)
}
}

// onReadinessReconciliationTick is called on every readinessReconciliationTicker tick. It will reconcile readiness
// of all gateway clients and notify subscribers about the change if readyGatewayClients list has changed.
func (c *AdminAPIClientsManager) onReadinessReconciliationTick() {
func (c *AdminAPIClientsManager) onReadinessReconciliationTick(ctx context.Context) {
c.logger.V(util.DebugLevel).Info("reconciling readiness of gateway clients")

if changed := c.reconcileGatewayClientsReadiness(); changed {
c.notifyGatewayClientsSubscribers()
if changed := c.reconcileGatewayClientsReadiness(ctx); changed {
c.notifyGatewayClientsSubscribers(ctx)
}
}

Expand Down Expand Up @@ -305,7 +360,7 @@ func (c *AdminAPIClientsManager) adjustGatewayClients(discoveredAdminAPIs []admi
// If any of the clients is not ready anymore, it will be moved to the pendingGatewayClients list. If any of the clients
// is not pending anymore, it will be moved to the readyGatewayClients list. It returns true if any transition has been
// made, false otherwise.
func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness() bool {
func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness(ctx context.Context) bool {
// Reset the ticker after each readiness reconciliation despite the trigger (whether it was a tick or a notification).
// It's to ensure that the readiness is not reconciled too often when we receive a lot of notifications.
defer c.readinessReconciliationTicker.Reset(DefaultReadinessReconciliationInterval)
Expand All @@ -319,7 +374,7 @@ func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness() bool {
}

readinessCheckResult := c.readinessChecker.CheckReadiness(
c.ctx,
ctx,
lo.MapToSlice(c.readyGatewayClients, func(_ string, cl *adminapi.Client) AlreadyCreatedClient { return cl }),
lo.Values(c.pendingGatewayClients),
)
Expand All @@ -333,15 +388,25 @@ func (c *AdminAPIClientsManager) reconcileGatewayClientsReadiness() bool {
c.pendingGatewayClients[cl.Address] = cl
}

readinessCheckResult.ClientsTurnedReady = lo.Filter(
readinessCheckResult.ClientsTurnedReady, func(cl *adminapi.Client, _ int) bool {
if ok := c.clientRequirements.Validate(ctx, cl); !ok {
delete(c.readyGatewayClients, cl.BaseRootURL())
return false
}
return true
},
)

return readinessCheckResult.HasChanges()
}

// notifyGatewayClientsSubscribers sends notifications to all subscribers that have called SubscribeToGatewayClientsChanges.
func (c *AdminAPIClientsManager) notifyGatewayClientsSubscribers() {
func (c *AdminAPIClientsManager) notifyGatewayClientsSubscribers(ctx context.Context) {
c.logger.V(util.DebugLevel).Info("notifying subscribers about gateway clients change")
for _, sub := range c.gatewayClientsChangesSubscribers {
select {
case <-c.ctx.Done():
case <-ctx.Done():
c.logger.V(util.InfoLevel).Info("not sending notification to subscribers as the context is done")
return
case sub <- struct{}{}:
Expand Down
15 changes: 15 additions & 0 deletions internal/dataplane/configuration/router_flavor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package configuration

// RouterFlavor is the type for Kong Gateway router flavors.
// Ref: https://docs.konghq.com/kubernetes-ingress-controller/latest/references/supported-router-flavors
type RouterFlavor string

const (
// RouterFlavorTraditional is one of Kong Gateway router flavors.
RouterFlavorTraditional RouterFlavor = "traditional"
// RouterFlavorTraditionalCompatible is one of Kong Gateway router flavors.
RouterFlavorTraditionalCompatible RouterFlavor = "traditional_compatible"
// RouterFlavorExpressions is one of Kong Gateway router flavors.
// Ref: https://docs.konghq.com/gateway/latest/reference/router-expressions-language/
RouterFlavorExpressions RouterFlavor = "expressions"
)
11 changes: 4 additions & 7 deletions internal/dataplane/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kong/kubernetes-ingress-controller/v3/internal/annotations"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/configuration"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/failures"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/kongstate"
"github.com/kong/kubernetes-ingress-controller/v3/internal/gatewayapi"
Expand All @@ -36,10 +37,6 @@ import (

const (
KindGateway = gatewayapi.Kind("Gateway")

// kongRouterFlavorExpressions is the value used in router_flavor of kong configuration
// to enable expression based router of kong.
kongRouterFlavorExpressions = "expressions"
)

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -67,7 +64,7 @@ type FeatureFlags struct {
func NewFeatureFlags(
logger logr.Logger,
featureGates featuregates.FeatureGates,
routerFlavor string,
routerFlavor configuration.RouterFlavor,
updateStatusFlag bool,
) FeatureFlags {
return FeatureFlags{
Expand All @@ -80,9 +77,9 @@ func NewFeatureFlags(

func shouldEnableParserExpressionRoutes(
logger logr.Logger,
routerFlavor string,
routerFlavor configuration.RouterFlavor,
) bool {
if routerFlavor != kongRouterFlavorExpressions {
if routerFlavor != configuration.RouterFlavorExpressions {
logger.V(util.InfoLevel).Info("Gateway is running with non-expression router flavor", "flavor", routerFlavor)
return false
}
Expand Down
7 changes: 4 additions & 3 deletions internal/dataplane/parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kong/kubernetes-ingress-controller/v3/internal/annotations"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/configuration"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/kongstate"
"github.com/kong/kubernetes-ingress-controller/v3/internal/store"
"github.com/kong/kubernetes-ingress-controller/v3/internal/util"
Expand Down Expand Up @@ -4505,7 +4506,7 @@ func TestNewFeatureFlags(t *testing.T) {
name string

featureGates map[string]bool
routerFlavor string
routerFlavor configuration.RouterFlavor
updateStatusFlag bool

expectedFeatureFlags FeatureFlags
Expand All @@ -4514,15 +4515,15 @@ func TestNewFeatureFlags(t *testing.T) {
{
name: "default",
featureGates: map[string]bool{},
routerFlavor: "traditional",
routerFlavor: configuration.RouterFlavorTraditional,
updateStatusFlag: true,
expectedFeatureFlags: FeatureFlags{
ReportConfiguredKubernetesObjects: true,
},
},
{
name: "expression routes feature gate enabled and router flavor matches",
routerFlavor: kongRouterFlavorExpressions,
routerFlavor: configuration.RouterFlavorExpressions,
expectedFeatureFlags: FeatureFlags{
ExpressionRoutes: true,
},
Expand Down
4 changes: 4 additions & 0 deletions internal/manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/avast/retry-go/v4"
"github.com/blang/semver/v4"
"github.com/go-logr/logr"
"github.com/samber/mo"
"k8s.io/apimachinery/pkg/util/sets"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand Down Expand Up @@ -153,6 +154,9 @@ func Run(
logger,
initialKongClients,
readinessChecker,
clients.WithClientRequirements(clients.ClientRequirements{
RouterFlavor: mo.Some(routerFlavor),
}),
)
if err != nil {
return fmt.Errorf("failed to create AdminAPIClientsManager: %w", err)
Expand Down
7 changes: 4 additions & 3 deletions internal/manager/utils/kongconfig/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (
"golang.org/x/sync/errgroup"

"github.com/kong/kubernetes-ingress-controller/v3/internal/adminapi"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/configuration"
)

// KongStartUpOptions includes start up configurations of Kong that could change behavior of Kong Ingress Controller.
// The fields are extracted from results of Kong gateway configuration root.
type KongStartUpOptions struct {
DBMode string
RouterFlavor string
RouterFlavor configuration.RouterFlavor
Version kong.Version
}

Expand Down Expand Up @@ -94,7 +95,7 @@ func DBModeFromRoot(r Root) (string, error) {
return dbModeStr, nil
}

func RouterFlavorFromRoot(r Root) (string, error) {
func RouterFlavorFromRoot(r Root) (configuration.RouterFlavor, error) {
rootConfig, err := extractConfigurationFromRoot(r)
if err != nil {
return "", err
Expand All @@ -109,7 +110,7 @@ func RouterFlavorFromRoot(r Root) (string, error) {
if !ok {
return "", fmt.Errorf("invalid %q type, expected a string, got %T", routerFlavorKey, routerFlavor)
}
return routerFlavorStr, nil
return configuration.RouterFlavor(routerFlavorStr), nil
}

func KongVersionFromRoot(r Root) (kong.Version, error) {
Expand Down
5 changes: 3 additions & 2 deletions internal/manager/utils/kongconfig/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/configuration"
"github.com/kong/kubernetes-ingress-controller/v3/internal/versions"
)

Expand All @@ -22,14 +23,14 @@ func TestValidateRoots(t *testing.T) {
name string
configStr string
expectedDBMode string
expectedRouterFlavor string
expectedRouterFlavor configuration.RouterFlavor
expectedKongVersion string
}{
{
name: "dbless config with version 3.4.1",
configStr: dblessConfigJSON3_4_1,
expectedDBMode: "off",
expectedRouterFlavor: "traditional_compatible",
expectedRouterFlavor: configuration.RouterFlavorTraditionalCompatible,
expectedKongVersion: versions.KICv3VersionCutoff.String(),
},
}
Expand Down
5 changes: 3 additions & 2 deletions test/integration/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/configuration"
"github.com/kong/kubernetes-ingress-controller/v3/test"
"github.com/kong/kubernetes-ingress-controller/v3/test/consts"
"github.com/kong/kubernetes-ingress-controller/v3/test/internal/helpers"
Expand Down Expand Up @@ -101,12 +102,12 @@ func eventuallyGetKongDBMode(t *testing.T, adminURL *url.URL) string {
return dbmode
}

func eventuallyGetKongRouterFlavor(t *testing.T, adminURL *url.URL) string {
func eventuallyGetKongRouterFlavor(t *testing.T, adminURL *url.URL) configuration.RouterFlavor {
t.Helper()

var (
err error
routerFlavor string
routerFlavor configuration.RouterFlavor
)

require.EventuallyWithT(t, func(t *assert.CollectT) {
Expand Down
Loading

0 comments on commit 51acbbb

Please sign in to comment.