diff --git a/internal/adminapi/konnect.go b/internal/adminapi/konnect.go index 5ab899cea5..e5fae99620 100644 --- a/internal/adminapi/konnect.go +++ b/internal/adminapi/konnect.go @@ -114,3 +114,30 @@ func KonnectHTTPDoer() kong.Doer { return resp, nil } } + +// KonnectClientFactory is a factory to create KonnectClient instances. +type KonnectClientFactory struct { + konnectConfig KonnectConfig + logger logr.Logger +} + +// NewKonnectClientFactory creates a new KonnectClientFactory instance. +func NewKonnectClientFactory(konnectConfig KonnectConfig, logger logr.Logger) *KonnectClientFactory { + return &KonnectClientFactory{ + konnectConfig: konnectConfig, + logger: logger, + } +} + +// NewKonnectClient create a new KonnectClient instance, ensuring the connection to Konnect Admin API. +// Please note it may block for a few seconds while trying to connect to Konnect Admin API. +func (f *KonnectClientFactory) NewKonnectClient(ctx context.Context) (*KonnectClient, error) { + konnectAdminAPIClient, err := NewKongClientForKonnectControlPlane(f.konnectConfig) + if err != nil { + return nil, fmt.Errorf("failed creating Konnect Control Plane Admin API client: %w", err) + } + if err := EnsureKonnectConnection(ctx, konnectAdminAPIClient.AdminAPIClient(), f.logger); err != nil { + return nil, fmt.Errorf("failed to ensure connection to Konnect Admin API: %w", err) + } + return konnectAdminAPIClient, nil +} diff --git a/internal/clients/config_status.go b/internal/clients/config_status.go index 84bbe000d3..38a48dd3ea 100644 --- a/internal/clients/config_status.go +++ b/internal/clients/config_status.go @@ -32,6 +32,7 @@ type GatewayConfigApplyStatus struct { // KonnectConfigUploadStatus stores the status of uploading configuration to Konnect. type KonnectConfigUploadStatus struct { + // Failed indicates whether the config upload to Konnect failed. Failed bool } diff --git a/internal/clients/manager.go b/internal/clients/manager.go index f71283e0f1..9dd3fa8e0e 100644 --- a/internal/clients/manager.go +++ b/internal/clients/manager.go @@ -31,7 +31,6 @@ type ClientFactory interface { // AdminAPIClientsProvider allows fetching the most recent list of Admin API clients of Gateways that // we should configure. type AdminAPIClientsProvider interface { - KonnectClient() *adminapi.KonnectClient GatewayClients() []*adminapi.Client GatewayClientsToConfigure() []*adminapi.Client } @@ -75,10 +74,6 @@ type AdminAPIClientsManager struct { // readinessReconciliationTicker is used to run readiness reconciliation loop. readinessReconciliationTicker Ticker - // konnectClient represents a special-case of the data-plane which is Konnect cloud. - // This client is used to synchronise configuration with Konnect's Control Plane Admin API. - konnectClient *adminapi.KonnectClient - // lock prevents concurrent access to the manager's fields. lock sync.RWMutex @@ -177,20 +172,6 @@ func (c *AdminAPIClientsManager) Notify(discoveredAPIs []adminapi.DiscoveredAdmi } } -// SetKonnectClient sets a client that will be used to communicate with Konnect Control Plane Admin API. -// If called multiple times, it will override the client. -func (c *AdminAPIClientsManager) SetKonnectClient(client *adminapi.KonnectClient) { - c.lock.Lock() - defer c.lock.Unlock() - c.konnectClient = client -} - -func (c *AdminAPIClientsManager) KonnectClient() *adminapi.KonnectClient { - c.lock.RLock() - defer c.lock.RUnlock() - return c.konnectClient -} - // GatewayClients returns a copy of current client's slice. Konnect client won't be included. // This method can be used when some actions need to be performed only against Kong Gateway clients. func (c *AdminAPIClientsManager) GatewayClients() []*adminapi.Client { diff --git a/internal/clients/manager_test.go b/internal/clients/manager_test.go index 8657d17d8a..89d9f68aa4 100644 --- a/internal/clients/manager_test.go +++ b/internal/clients/manager_test.go @@ -207,12 +207,6 @@ func TestAdminAPIClientsManager_Clients(t *testing.T) { require.Len(t, m.GatewayClients(), 1, "expecting one initial client") require.Equal(t, m.GatewayClientsCount(), 1, "expecting one initial client") require.Len(t, m.GatewayClientsToConfigure(), 1, "Expecting one initial client") - - konnectTestClient := &adminapi.KonnectClient{} - m.SetKonnectClient(konnectTestClient) - require.Len(t, m.GatewayClients(), 1, "konnect client should not be returned from GatewayClients") - require.Equal(t, m.GatewayClientsCount(), 1, "konnect client should not be counted in GatewayClientsCount") - require.Equal(t, konnectTestClient, m.KonnectClient(), "konnect client should be returned from KonnectClient") } func TestAdminAPIClientsManager_Clients_DBMode(t *testing.T) { diff --git a/internal/dataplane/deckgen/deckgen.go b/internal/dataplane/deckgen/deckgen.go index 38e02bd727..dfa4a07509 100644 --- a/internal/dataplane/deckgen/deckgen.go +++ b/internal/dataplane/deckgen/deckgen.go @@ -14,7 +14,13 @@ import ( // GenerateSHA generates a SHA256 checksum of targetContent, with the purpose // of change detection. func GenerateSHA(targetContent *file.Content, customEntities map[string][]custom.Object) ([]byte, error) { + defer func() { + if r := recover(); r != nil { + fmt.Println("Recovered in GenerateSHA", r) + } + }() jsonConfig, err := gojson.Marshal(targetContent) + // jsonConfig, err := []byte(`{}`), error(nil) if err != nil { return nil, fmt.Errorf("marshaling Kong declarative configuration to JSON: %w", err) } diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index 2bf4bd42d3..c042e4be8e 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -35,7 +35,6 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/translator" "github.com/kong/kubernetes-ingress-controller/v3/internal/diagnostics" - "github.com/kong/kubernetes-ingress-controller/v3/internal/konnect" "github.com/kong/kubernetes-ingress-controller/v3/internal/logging" "github.com/kong/kubernetes-ingress-controller/v3/internal/metrics" "github.com/kong/kubernetes-ingress-controller/v3/internal/store" @@ -113,7 +112,7 @@ type KongClient struct { // prometheusMetrics is the client for shipping metrics information // updates to the prometheus exporter. - prometheusMetrics *metrics.CtrlFuncMetrics + prometheusMetrics metrics.Recorder // kubernetesObjectReportLock is a mutex for thread-safety of // kubernetes object reporting functionality. @@ -180,9 +179,12 @@ type KongClient struct { // lastValidCacheSnapshot can also represent the fallback cache snapshot that was successfully synced with gateways. lastValidCacheSnapshot *store.CacheStores - // konnectConfigSynchronizer receives latest successfully applied Kong configuration from KongClient - // and uploads it to Konnect. - konnectConfigSynchronizer *konnect.ConfigSynchronizer + // konnectKongStateUpdater is used to update the state of the Kong Gateway in Konnect. + konnectKongStateUpdater KonnectKongStateUpdater +} + +type KonnectKongStateUpdater interface { + UpdateKongState(ctx context.Context, kongState *kongstate.KongState, isFallback bool) } // NewKongClient provides a new KongClient object after connecting to the @@ -201,12 +203,13 @@ func NewKongClient( kongConfigBuilder KongConfigBuilder, cacheStores *store.CacheStores, fallbackConfigGenerator FallbackConfigGenerator, + metricsRecorder metrics.Recorder, ) (*KongClient, error) { c := &KongClient{ logger: logger, requestTimeout: timeout, diagnostic: diagnostic, - prometheusMetrics: metrics.NewCtrlFuncMetrics(), + prometheusMetrics: metricsRecorder, cache: cacheStores, kongConfig: kongConfig, eventRecorder: eventRecorder, @@ -491,7 +494,7 @@ func (c *KongClient) Update(ctx context.Context) error { const isFallback = false shas, gatewaysSyncErr := c.sendOutToGatewayClients(ctx, parsingResult.KongState, c.kongConfig, isFallback) - // Taking into account the results of syncing configuration with Gateways and Konnect, and potential translation + // Taking into account the results of syncing configuration with Gateways and potential translation // failures, calculate the config status and update it. c.configStatusNotifier.NotifyGatewayConfigStatus(ctx, clients.GatewayConfigApplyStatus{ TranslationFailuresOccurred: len(parsingResult.TranslationFailures) > 0, @@ -513,7 +516,7 @@ func (c *KongClient) Update(ctx context.Context) error { } // Send configuration to Konnect ONLY when successfully applied configuration to gateways. - c.maybeSendOutToKonnectClient(ctx, parsingResult.KongState, c.kongConfig, isFallback) + c.maybeSendOutToKonnectClient(ctx, parsingResult.KongState, isFallback) // Gateways were successfully synced with the current configuration, so we can update the last valid cache snapshot. c.maybePreserveTheLastValidConfigCache(cacheSnapshot) @@ -633,7 +636,7 @@ func (c *KongClient) tryRecoveringWithFallbackConfiguration( if gatewaysSyncErr != nil { return fmt.Errorf("failed to sync fallback configuration with gateways: %w", gatewaysSyncErr) } - c.maybeSendOutToKonnectClient(ctx, fallbackParsingResult.KongState, c.kongConfig, isFallback) + c.maybeSendOutToKonnectClient(ctx, fallbackParsingResult.KongState, isFallback) // Configuration was successfully recovered with the fallback configuration. Store the last valid configuration. c.maybePreserveTheLastValidConfigCache(fallbackCache) @@ -735,29 +738,12 @@ func (c *KongClient) sendOutToGatewayClients( func (c *KongClient) maybeSendOutToKonnectClient( ctx context.Context, s *kongstate.KongState, - config sendconfig.Config, - _ bool, + isFallback bool, ) { - if c.konnectConfigSynchronizer == nil { + if c.konnectKongStateUpdater == nil { return } - konnectClient := c.clientsProvider.KonnectClient() - if konnectClient == nil { - return - } - - if config.SanitizeKonnectConfigDumps { - s = s.SanitizedCopy(util.DefaultUUIDGenerator{}) - } - - deckGenParams := deckgen.GenerateDeckContentParams{ - SelectorTags: config.FilterTags, - ExpressionRoutes: config.ExpressionRoutes, - PluginSchemas: konnectClient.PluginSchemaStore(), - AppendStubEntityWhenConfigEmpty: false, - } - targetContent := deckgen.ToDeckContent(ctx, c.logger, s, deckGenParams) - c.konnectConfigSynchronizer.SetTargetContent(targetContent) + c.konnectKongStateUpdater.UpdateKongState(ctx, s, isFallback) } func (c *KongClient) sendToClient( @@ -769,16 +755,11 @@ func (c *KongClient) sendToClient( ) (string, error) { logger := c.logger.WithValues("url", client.AdminAPIClient().BaseRootURL()) - // If the client is Konnect and the feature flag is turned on, - // we should sanitize the configuration before sending it out. - if client.IsKonnect() && config.SanitizeKonnectConfigDumps { - s = s.SanitizedCopy(util.DefaultUUIDGenerator{}) - } deckGenParams := deckgen.GenerateDeckContentParams{ SelectorTags: config.FilterTags, ExpressionRoutes: config.ExpressionRoutes, PluginSchemas: client.PluginSchemaStore(), - AppendStubEntityWhenConfigEmpty: !client.IsKonnect() && config.InMemory, + AppendStubEntityWhenConfigEmpty: config.InMemory, } targetContent := deckgen.ToDeckContent(ctx, logger, s, deckGenParams) customEntities := make(sendconfig.CustomEntitiesByType) @@ -806,11 +787,7 @@ func (c *KongClient) sendToClient( &c.diagnostic, isFallback, ) - // Only record events on applying configuration to Kong gateway here. - // Nil error is expected to be passed to indicate success. - if !client.IsKonnect() { - c.recordApplyConfigurationEvents(err, client.BaseRootURL(), isFallback) - } + c.recordApplyConfigurationEvents(err, client.BaseRootURL(), isFallback) if err != nil { var ( rawResponseBody []byte @@ -845,19 +822,17 @@ func (c *KongClient) sendToClient( } // SetConfigStatusNotifier sets a notifier which notifies subscribers about configuration sending results. -// Currently it is used for uploading the node status to konnect control plane. +// Currently, it is used for uploading the node status to Konnect control plane. func (c *KongClient) SetConfigStatusNotifier(n clients.ConfigStatusNotifier) { c.lock.Lock() defer c.lock.Unlock() - c.configStatusNotifier = n } -func (c *KongClient) SetKonnectConfigSynchronizer(s *konnect.ConfigSynchronizer) { +func (c *KongClient) SetKonnectKongStateUpdater(u KonnectKongStateUpdater) { c.lock.Lock() defer c.lock.Unlock() - - c.konnectConfigSynchronizer = s + c.konnectKongStateUpdater = u } // ----------------------------------------------------------------------------- diff --git a/internal/dataplane/kong_client_golden_test.go b/internal/dataplane/kong_client_golden_test.go index 76c57c0aad..b0abfa27ec 100644 --- a/internal/dataplane/kong_client_golden_test.go +++ b/internal/dataplane/kong_client_golden_test.go @@ -311,6 +311,7 @@ func runKongClientGoldenTest(t *testing.T, tc kongClientGoldenTestCase) { p, &cacheStores, fallbackConfigGenerator, + mocks.MetricsRecorder{}, ) require.NoError(t, err) diff --git a/internal/dataplane/kong_client_test.go b/internal/dataplane/kong_client_test.go index 36b158f987..fc78acd828 100644 --- a/internal/dataplane/kong_client_test.go +++ b/internal/dataplane/kong_client_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "net/http" "slices" "strings" "sync" @@ -18,7 +17,6 @@ import ( "github.com/kong/go-database-reconciler/pkg/utils" "github.com/kong/go-kong/kong" "github.com/samber/lo" - "github.com/samber/mo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -45,18 +43,12 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/translator" "github.com/kong/kubernetes-ingress-controller/v3/internal/diagnostics" - "github.com/kong/kubernetes-ingress-controller/v3/internal/konnect" - "github.com/kong/kubernetes-ingress-controller/v3/internal/metrics" "github.com/kong/kubernetes-ingress-controller/v3/internal/store" "github.com/kong/kubernetes-ingress-controller/v3/internal/versions" "github.com/kong/kubernetes-ingress-controller/v3/test/helpers" "github.com/kong/kubernetes-ingress-controller/v3/test/mocks" ) -var defaultKongStatus = kong.Status{ - ConfigurationHash: sendconfig.WellKnownInitialHash, -} - func TestUniqueObjects(t *testing.T) { t.Log("generating some objects to test the de-duplication of objects") ing1 := &netv1.Ingress{ @@ -152,14 +144,9 @@ var ( // mockGatewayClientsProvider is a mock implementation of dataplane.AdminAPIClientsProvider. type mockGatewayClientsProvider struct { gatewayClients []*adminapi.Client - konnectClient *adminapi.KonnectClient dbMode dpconf.DBMode } -func (p *mockGatewayClientsProvider) KonnectClient() *adminapi.KonnectClient { - return p.konnectClient -} - func (p *mockGatewayClientsProvider) GatewayClients() []*adminapi.Client { return p.gatewayClients } @@ -174,166 +161,6 @@ func (p *mockGatewayClientsProvider) GatewayClientsToConfigure() []*adminapi.Cli return p.gatewayClients[:1] } -// mockUpdateStrategy is a mock implementation of sendconfig.UpdateStrategyResolver. -type mockUpdateStrategyResolver struct { - updateCalledForURLs []string - lastUpdatedContentForURLs map[string]sendconfig.ContentWithHash - errorsToReturnOnUpdate map[string][]error - lock sync.RWMutex -} - -func newMockUpdateStrategyResolver() *mockUpdateStrategyResolver { - return &mockUpdateStrategyResolver{ - errorsToReturnOnUpdate: map[string][]error{}, - lastUpdatedContentForURLs: map[string]sendconfig.ContentWithHash{}, - } -} - -func (f *mockUpdateStrategyResolver) ResolveUpdateStrategy( - c sendconfig.UpdateClient, - _ *diagnostics.ClientDiagnostic, -) sendconfig.UpdateStrategy { - f.lock.Lock() - defer f.lock.Unlock() - - url := c.AdminAPIClient().BaseRootURL() - return &mockUpdateStrategy{onUpdate: f.updateCalledForURLCallback(url)} -} - -// returnErrorOnUpdate will cause the mockUpdateStrategy with a given Admin API URL to return an error on Update(). -// Errors will be returned following FIFO order. Each call to this function adds a new error to the queue. -func (f *mockUpdateStrategyResolver) returnErrorOnUpdate(url string) { - f.lock.Lock() - defer f.lock.Unlock() - - f.errorsToReturnOnUpdate[url] = append(f.errorsToReturnOnUpdate[url], errors.New("error on update")) -} - -// returnSpecificErrorOnUpdate will cause the mockUpdateStrategy with a given Admin API URL to return a specific error -// on Update() call. Errors will be returned following FIFO order. Each call to this function adds a new error to the queue. -func (f *mockUpdateStrategyResolver) returnSpecificErrorOnUpdate(url string, err error) { - f.lock.Lock() - defer f.lock.Unlock() - - f.errorsToReturnOnUpdate[url] = append(f.errorsToReturnOnUpdate[url], err) -} - -const mockUpdateReturnedConfigSize = 22 - -// updateCalledForURLCallback returns a function that will be called when the mockUpdateStrategy is called. -// That enables us to track which URLs were called. -func (f *mockUpdateStrategyResolver) updateCalledForURLCallback(url string) func(sendconfig.ContentWithHash) (mo.Option[int], error) { - return func(content sendconfig.ContentWithHash) (mo.Option[int], error) { - f.lock.Lock() - defer f.lock.Unlock() - - f.updateCalledForURLs = append(f.updateCalledForURLs, url) - f.lastUpdatedContentForURLs[url] = content - if errsToReturn, ok := f.errorsToReturnOnUpdate[url]; ok { - if len(errsToReturn) > 0 { - err := errsToReturn[0] - f.errorsToReturnOnUpdate[url] = errsToReturn[1:] - return mo.None[int](), err - } - return mo.Some(mockUpdateReturnedConfigSize), nil - } - return mo.Some(mockUpdateReturnedConfigSize), nil - } -} - -// assertUpdateCalledForURLs asserts that the mockUpdateStrategy was called for the given URLs. -func (f *mockUpdateStrategyResolver) assertUpdateCalledForURLs(t *testing.T, urls []string, msgAndArgs ...any) { - t.Helper() - - f.lock.RLock() - defer f.lock.RUnlock() - - if len(msgAndArgs) == 0 { - msgAndArgs = []any{"update was not called for all URLs"} - } - require.ElementsMatch(t, urls, f.updateCalledForURLs, msgAndArgs...) -} - -func (f *mockUpdateStrategyResolver) assertUpdateCalledForURLsWithGivenCount(t *testing.T, urlToCount map[string]int, msgAndArgs ...any) { - t.Helper() - - f.lock.RLock() - defer f.lock.RUnlock() - actualURLToCount := lo.CountValues(f.updateCalledForURLs) - for url, callCount := range urlToCount { - m := []any{ - fmt.Sprintf("URL %s should receive %d update calls", url, callCount), - } - m = append(m, msgAndArgs...) - require.Equal(t, callCount, actualURLToCount[url], m...) - } -} - -func (f *mockUpdateStrategyResolver) assertNoUpdateCalled(t *testing.T) { - t.Helper() - - f.lock.RLock() - defer f.lock.RUnlock() - - require.Empty(t, f.updateCalledForURLs, "update was called") -} - -func (f *mockUpdateStrategyResolver) lastUpdatedContentForURL(url string) (sendconfig.ContentWithHash, bool) { - f.lock.RLock() - defer f.lock.RUnlock() - c, ok := f.lastUpdatedContentForURLs[url] - return c, ok -} - -func (f *mockUpdateStrategyResolver) eventuallyGetLastUpdatedContentForURL( - t *testing.T, url string, waitTime, waitTick time.Duration, msgAndArgs ...any, -) sendconfig.ContentWithHash { - t.Helper() - - var content sendconfig.ContentWithHash - if len(msgAndArgs) == 0 { - msgAndArgs = []any{"update was not called for URL " + url} - } - require.Eventually(t, func() bool { - c, ok := f.lastUpdatedContentForURL(url) - if ok { - content = c - return true - } - return false - }, waitTime, waitTick, msgAndArgs...) - return content -} - -// mockUpdateStrategy is a mock implementation of sendconfig.UpdateStrategy. -type mockUpdateStrategy struct { - onUpdate func(content sendconfig.ContentWithHash) (mo.Option[int], error) -} - -func (m *mockUpdateStrategy) Update(_ context.Context, targetContent sendconfig.ContentWithHash) (n mo.Option[int], err error) { - return m.onUpdate(targetContent) -} - -func (m *mockUpdateStrategy) MetricsProtocol() metrics.Protocol { - return metrics.ProtocolDBLess -} - -func (m *mockUpdateStrategy) Type() string { - return "Mock" -} - -// mockConfigurationChangeDetector is a mock implementation of sendconfig.ConfigurationChangeDetector. -type mockConfigurationChangeDetector struct { - hasConfigurationChanged bool - status kong.Status -} - -func (m mockConfigurationChangeDetector) HasConfigurationChanged( - context.Context, []byte, []byte, *file.Content, sendconfig.KonnectAwareClient, sendconfig.StatusClient, -) (bool, error) { - return m.hasConfigurationChanged, nil -} - // mockKongLastValidConfigFetcher is a mock implementation of FallbackConfigGenerator interface. type mockFallbackConfigGenerator struct { GenerateResult store.CacheStores @@ -366,7 +193,6 @@ func (m *mockFallbackConfigGenerator) GenerateBackfillingBrokenObjects( func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *testing.T) { var ( ctx = context.Background() - testKonnectClient = mustSampleKonnectClient(t) testGatewayClients = []*adminapi.Client{ mustSampleGatewayClient(t), mustSampleGatewayClient(t), @@ -376,22 +202,15 @@ func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *tes testCases := []struct { name string gatewayClients []*adminapi.Client - konnectClient *adminapi.KonnectClient + withKonnectUpdater bool errorOnUpdateForURLs []string expectError bool }{ { - name: "2 gateway clients and konnect with no errors", - gatewayClients: testGatewayClients, - konnectClient: testKonnectClient, - expectError: false, - }, - { - name: "2 gateway clients and konnect with error on konnect", - gatewayClients: testGatewayClients, - konnectClient: testKonnectClient, - errorOnUpdateForURLs: []string{testKonnectClient.BaseRootURL()}, - expectError: false, + name: "2 gateway clients and konnect with no errors", + gatewayClients: testGatewayClients, + withKonnectUpdater: true, + expectError: false, }, { name: "2 gateway clients with error on one of them", @@ -400,24 +219,9 @@ func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *tes expectError: true, }, { - name: "2 gateway clients and konnect with error on one of gateways and konnect", - gatewayClients: testGatewayClients, - errorOnUpdateForURLs: []string{ - testGatewayClients[0].BaseRootURL(), - testKonnectClient.BaseRootURL(), - }, - expectError: true, - }, - { - name: "only konnect client with no error", - konnectClient: testKonnectClient, - expectError: false, - }, - { - name: "only konnect client with error on it", - konnectClient: testKonnectClient, - errorOnUpdateForURLs: []string{testKonnectClient.BaseRootURL()}, - expectError: false, + name: "only konnect client", + withKonnectUpdater: true, + expectError: false, }, { name: "no clients at all", @@ -429,23 +233,24 @@ func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *tes t.Run(tc.name, func(t *testing.T) { clientsProvider := &mockGatewayClientsProvider{ gatewayClients: tc.gatewayClients, - konnectClient: tc.konnectClient, } - updateStrategyResolver := newMockUpdateStrategyResolver() + updateStrategyResolver := mocks.NewUpdateStrategyResolver() for _, url := range tc.errorOnUpdateForURLs { - updateStrategyResolver.returnErrorOnUpdate(url) + updateStrategyResolver.ReturnErrorOnUpdate(url) } // always return true for HasConfigurationChanged to trigger an update - configChangeDetector := mockConfigurationChangeDetector{ - hasConfigurationChanged: true, - status: defaultKongStatus, + configChangeDetector := mocks.ConfigurationChangeDetector{ + ConfigurationChanged: true, } configBuilder := newMockKongConfigBuilder() kongRawStateGetter := &mockKongLastValidConfigFetcher{} kongClient := setupTestKongClient(t, updateStrategyResolver, clientsProvider, configChangeDetector, configBuilder, nil, kongRawStateGetter) - // Set Konnect client - if tc.konnectClient != nil { - attachKonnectConfigSynchronizer(ctx, t, kongClient, updateStrategyResolver, clientsProvider, configChangeDetector, clients.NoOpConfigStatusNotifier{}) + + // Set KonnectKongStateUpdater if needed. + var konnectUpdater *mocks.KonnectKongStateUpdater + if tc.withKonnectUpdater { + konnectUpdater = &mocks.KonnectKongStateUpdater{} + kongClient.SetKonnectKongStateUpdater(konnectUpdater) } err := kongClient.Update(ctx) @@ -459,11 +264,9 @@ func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *tes expectedURLsCalled := lo.SliceToMap(clientsProvider.GatewayClients(), func(c *adminapi.Client) (string, int) { return c.BaseRootURL(), 1 }) - updateStrategyResolver.assertUpdateCalledForURLsWithGivenCount(t, expectedURLsCalled) - // Verify that Konnect client is called eventually. - if tc.konnectClient != nil { - // Should eventually get content in Konnect client if Konnect client enabled. - _ = updateStrategyResolver.eventuallyGetLastUpdatedContentForURL(t, tc.konnectClient.BaseRootURL(), testKonenctUploadWait, testKonnectUploadPeriod) + updateStrategyResolver.AssertUpdateCalledForURLsWithGivenCount(t, expectedURLsCalled) + if tc.withKonnectUpdater { + require.Len(t, konnectUpdater.Calls(), 1, "expected Konnect updater to be called") } }) } @@ -475,14 +278,12 @@ func TestKongClientUpdate_WhenNoChangeInConfigNoClientGetsCalled(t *testing.T) { mustSampleGatewayClient(t), mustSampleGatewayClient(t), }, - konnectClient: mustSampleKonnectClient(t), } - updateStrategyResolver := newMockUpdateStrategyResolver() + updateStrategyResolver := mocks.NewUpdateStrategyResolver() // no change in config, we'll expect no update to be called - configChangeDetector := mockConfigurationChangeDetector{ - hasConfigurationChanged: false, - status: defaultKongStatus, + configChangeDetector := mocks.ConfigurationChangeDetector{ + ConfigurationChanged: false, } configBuilder := newMockKongConfigBuilder() kongRawStateGetter := &mockKongLastValidConfigFetcher{} @@ -492,7 +293,7 @@ func TestKongClientUpdate_WhenNoChangeInConfigNoClientGetsCalled(t *testing.T) { err := kongClient.Update(ctx) require.NoError(t, err) - updateStrategyResolver.assertNoUpdateCalled(t) + updateStrategyResolver.AssertNoUpdateCalled(t) } type mockConfigStatusQueue struct { @@ -617,15 +418,13 @@ func (p *mockKongConfigBuilder) returnTranslationFailuresForAllButFirstCall(fail func TestKongClientUpdate_ConfigStatusIsNotified(t *testing.T) { var ( ctx = context.Background() - testKonnectClient = mustSampleKonnectClient(t) testGatewayClient = mustSampleGatewayClient(t) clientsProvider = &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{testGatewayClient}, - konnectClient: testKonnectClient, } - configChangeDetector = mockConfigurationChangeDetector{hasConfigurationChanged: true} + configChangeDetector = mocks.ConfigurationChangeDetector{ConfigurationChanged: true} configBuilder = newMockKongConfigBuilder() ) @@ -652,64 +451,33 @@ func TestKongClientUpdate_ConfigStatusIsNotified(t *testing.T) { translationFailures: true, expectedStatus: clients.ConfigStatusTranslationErrorHappened, }, - { - name: "konnect failure", - konnectFailuresCount: 2, - translationFailures: false, - expectedStatus: clients.ConfigStatusOKKonnectApplyFailed, - }, - { - name: "both gateway and konnect failure", - gatewayFailuresCount: 2, - konnectFailuresCount: 2, - translationFailures: false, - expectedStatus: clients.ConfigStatusApplyFailedKonnectApplyFailed, - }, - { - name: "translation failures and konnect failure", - konnectFailuresCount: 2, - translationFailures: true, - expectedStatus: clients.ConfigStatusTranslationErrorHappenedKonnectApplyFailed, - }, + // TODO(czeslavo): move this test to Konnect synchronizer tests + // { + // name: "konnect failure", + // konnectFailuresCount: 2, + // translationFailures: false, + // expectedStatus: clients.ConfigStatusOKKonnectApplyFailed, + // }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { var ( kongRawStateGetter = &mockKongLastValidConfigFetcher{} - updateStrategyResolver = newMockUpdateStrategyResolver() + updateStrategyResolver = mocks.NewUpdateStrategyResolver() statusQueue = newMockConfigStatusQueue() kongClient = setupTestKongClient(t, updateStrategyResolver, clientsProvider, configChangeDetector, configBuilder, nil, kongRawStateGetter) ) - attachKonnectConfigSynchronizer(ctx, t, kongClient, updateStrategyResolver, clientsProvider, configChangeDetector, statusQueue) - // Set an initial content in Konnect syncer to avoid that failure on gateway update causing no target content saved in Konnect config syncer - // thus uploading config to Konnect is not triggered. - kongClient.konnectConfigSynchronizer.SetTargetContent(&file.Content{}) kongClient.SetConfigStatusNotifier(statusQueue) for range tc.gatewayFailuresCount { - updateStrategyResolver.returnErrorOnUpdate(testGatewayClient.BaseRootURL()) - } - for range tc.konnectFailuresCount { - updateStrategyResolver.returnErrorOnUpdate(testKonnectClient.BaseRootURL()) + updateStrategyResolver.ReturnErrorOnUpdate(testGatewayClient.BaseRootURL()) } configBuilder.returnTranslationFailures(tc.translationFailures) _ = kongClient.Update(ctx) gatewayNotifications := statusQueue.GatewayConfigStatusNotifications() require.Len(t, gatewayNotifications, 1, "Should receive gateway configuration status right after update") - require.Eventually( - t, func() bool { - konnectNotifications := statusQueue.KonnectConfigStatusNotifications() - return len(konnectNotifications) > 0 - }, 10*testKonnectUploadPeriod, testKonnectUploadPeriod, - "Should receive Konnect config status in time", - ) - - konnectNotifications := statusQueue.KonnectConfigStatusNotifications() - require.Equal(t, tc.expectedStatus, clients.CalculateConfigStatus( - gatewayNotifications[0], konnectNotifications[0], - )) }) } } @@ -719,8 +487,8 @@ func TestKongClient_ApplyConfigurationEvents(t *testing.T) { clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{testGatewayClient}, } - updateStrategyResolver := newMockUpdateStrategyResolver() - configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true} + updateStrategyResolver := mocks.NewUpdateStrategyResolver() + configChangeDetector := mocks.ConfigurationChangeDetector{ConfigurationChanged: true} configBuilder := newMockKongConfigBuilder() testCases := []struct { @@ -777,7 +545,7 @@ func TestKongClient_KubernetesEvents(t *testing.T) { t.Setenv("POD_NAME", "test-pod") ctx := context.Background() - configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true} + configChangeDetector := mocks.ConfigurationChangeDetector{ConfigurationChanged: true} testIngress := helpers.WithTypeMeta(t, &netv1.Ingress{ ObjectMeta: metav1.ObjectMeta{ Name: "obj-1", @@ -886,7 +654,7 @@ func TestKongClient_KubernetesEvents(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - updateStrategyResolver := newMockUpdateStrategyResolver() + updateStrategyResolver := mocks.NewUpdateStrategyResolver() configBuilder := newMockKongConfigBuilder() eventRecorder := mocks.NewEventRecorder() lastValidConfigFetcher := &mockKongLastValidConfigFetcher{} @@ -905,7 +673,7 @@ func TestKongClient_KubernetesEvents(t *testing.T) { } if tc.updateError { if tc.entityErrors { - updateStrategyResolver.returnSpecificErrorOnUpdate(testGatewayClient.BaseRootURL(), sendconfig.NewUpdateErrorWithoutResponseBody( + updateStrategyResolver.ReturnSpecificErrorOnUpdate(testGatewayClient.BaseRootURL(), sendconfig.NewUpdateErrorWithoutResponseBody( []failures.ResourceFailure{ lo.Must(failures.NewResourceFailure("violated constraint", testIngress)), lo.Must(failures.NewResourceFailure("violated constraint", testService)), @@ -913,11 +681,11 @@ func TestKongClient_KubernetesEvents(t *testing.T) { errors.New("error on update"), )) } else { - updateStrategyResolver.returnErrorOnUpdate(testGatewayClient.BaseRootURL()) + updateStrategyResolver.ReturnErrorOnUpdate(testGatewayClient.BaseRootURL()) } } if tc.updateError && tc.fallbackConfigurationUpdateError { - updateStrategyResolver.returnSpecificErrorOnUpdate(testGatewayClient.BaseRootURL(), sendconfig.NewUpdateErrorWithoutResponseBody( + updateStrategyResolver.ReturnSpecificErrorOnUpdate(testGatewayClient.BaseRootURL(), sendconfig.NewUpdateErrorWithoutResponseBody( []failures.ResourceFailure{ lo.Must(failures.NewResourceFailure("violated constraint", testIngress)), }, errors.New("error on update"), @@ -952,28 +720,27 @@ func TestKongClient_KubernetesEvents(t *testing.T) { func TestKongClient_EmptyConfigUpdate(t *testing.T) { var ( ctx = context.Background() - testKonnectClient = mustSampleKonnectClient(t) testGatewayClient = mustSampleGatewayClient(t) clientsProvider = &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{testGatewayClient}, - konnectClient: testKonnectClient, } - updateStrategyResolver = newMockUpdateStrategyResolver() - configChangeDetector = mockConfigurationChangeDetector{hasConfigurationChanged: true} + updateStrategyResolver = mocks.NewUpdateStrategyResolver() + configChangeDetector = mocks.ConfigurationChangeDetector{ConfigurationChanged: true} configBuilder = newMockKongConfigBuilder() kongRawStateGetter = &mockKongLastValidConfigFetcher{} kongClient = setupTestKongClient(t, updateStrategyResolver, clientsProvider, configChangeDetector, configBuilder, nil, kongRawStateGetter) + konnectUpdater = &mocks.KonnectKongStateUpdater{} ) - attachKonnectConfigSynchronizer(ctx, t, kongClient, updateStrategyResolver, clientsProvider, configChangeDetector, newMockConfigStatusQueue()) + kongClient.SetKonnectKongStateUpdater(konnectUpdater) t.Run("dbless", func(t *testing.T) { kongClient.kongConfig.InMemory = true err := kongClient.Update(ctx) require.NoError(t, err) - gwContent, ok := updateStrategyResolver.lastUpdatedContentForURL(testGatewayClient.BaseRootURL()) + gwContent, ok := updateStrategyResolver.LastUpdatedContentForURL(testGatewayClient.BaseRootURL()) require.True(t, ok) assert.Equal(t, gwContent.Content, &file.Content{ FormatVersion: versions.DeckFileFormatVersion, @@ -986,15 +753,9 @@ func TestKongClient_EmptyConfigUpdate(t *testing.T) { }, }, "gateway content should have appended stub upstream") - var konnectContent sendconfig.ContentWithHash - require.Eventually(t, func() bool { - c, ok := updateStrategyResolver.lastUpdatedContentForURL(testKonnectClient.BaseRootURL()) - if ok { - konnectContent = c - } - return ok - }, testKonenctUploadWait, testKonnectUploadPeriod, "Konnect client should be updated in time") - require.True(t, deckgen.IsContentEmpty(konnectContent.Content), "konnect content should be empty") + konnectCalls := konnectUpdater.Calls() + require.NotEmpty(t, konnectCalls) + require.Equal(t, &kongstate.KongState{}, konnectCalls[0].KongState, "konnect config should be empty") }) t.Run("db", func(t *testing.T) { @@ -1002,24 +763,20 @@ func TestKongClient_EmptyConfigUpdate(t *testing.T) { err := kongClient.Update(ctx) require.NoError(t, err) - gwContent, ok := updateStrategyResolver.lastUpdatedContentForURL(testGatewayClient.BaseRootURL()) + gwContent, ok := updateStrategyResolver.LastUpdatedContentForURL(testGatewayClient.BaseRootURL()) require.True(t, ok) require.True(t, deckgen.IsContentEmpty(gwContent.Content), "konnect content should be empty") - var konnectContent sendconfig.ContentWithHash - var konnectContentOK bool - require.Eventually(t, func() bool { - konnectContent, konnectContentOK = updateStrategyResolver.lastUpdatedContentForURL(testKonnectClient.BaseRootURL()) - return konnectContentOK - }, testKonenctUploadWait, testKonnectUploadPeriod, "Konnect client should be updated in time") - require.True(t, deckgen.IsContentEmpty(konnectContent.Content), "konnect content should be empty") + konnectCalls := konnectUpdater.Calls() + require.NotEmpty(t, konnectCalls) + require.Equal(t, &kongstate.KongState{}, konnectCalls[0].KongState, "konnect config should be empty") }) } // setupTestKongClient creates a KongClient with mocked dependencies. func setupTestKongClient( t *testing.T, - updateStrategyResolver *mockUpdateStrategyResolver, + updateStrategyResolver *mocks.UpdateStrategyResolver, clientsProvider *mockGatewayClientsProvider, configChangeDetector sendconfig.ConfigurationChangeDetector, configBuilder *mockKongConfigBuilder, @@ -1052,42 +809,12 @@ func setupTestKongClient( configBuilder, &cacheStores, newMockFallbackConfigGenerator(), + mocks.MetricsRecorder{}, ) require.NoError(t, err) return kongClient } -const ( - testKonnectUploadPeriod = 20 * time.Millisecond - testKonenctUploadWait = 5 * testKonnectUploadPeriod -) - -func attachKonnectConfigSynchronizer( - ctx context.Context, - t *testing.T, - kc *KongClient, - updateStrategyResolver *mockUpdateStrategyResolver, - clientsProvider *mockGatewayClientsProvider, - configChangeDetector sendconfig.ConfigurationChangeDetector, - configStatusNotifier clients.ConfigStatusNotifier, -) { - config := sendconfig.Config{ - SanitizeKonnectConfigDumps: true, - } - konnectConfigSynchronizer := konnect.NewConfigSynchronizer( - logr.Discard(), - config, - testKonnectUploadPeriod, - clientsProvider, - updateStrategyResolver, - configChangeDetector, - configStatusNotifier, - ) - kc.SetKonnectConfigSynchronizer(konnectConfigSynchronizer) - err := konnectConfigSynchronizer.Start(ctx) - require.NoError(t, err) -} - func mustSampleGatewayClient(t *testing.T) *adminapi.Client { t.Helper() c, err := adminapi.NewTestClient(fmt.Sprintf("https://%s:8080", uuid.NewString())) @@ -1095,16 +822,6 @@ func mustSampleGatewayClient(t *testing.T) *adminapi.Client { return c } -func mustSampleKonnectClient(t *testing.T) *adminapi.KonnectClient { - t.Helper() - - c, err := adminapi.NewKongAPIClient(fmt.Sprintf("https://%s.konghq.tech", uuid.NewString()), &http.Client{}) - require.NoError(t, err) - - rgID := uuid.NewString() - return adminapi.NewKonnectClient(c, rgID, false) -} - type mockKongLastValidConfigFetcher struct { kongRawState *utils.KongRawState lastKongState *kongstate.KongState @@ -1139,7 +856,7 @@ func TestKongClientUpdate_FetchStoreAndPushLastValidConfig(t *testing.T) { }, } - configChangeDetector = mockConfigurationChangeDetector{hasConfigurationChanged: true} + configChangeDetector = mocks.ConfigurationChangeDetector{ConfigurationChanged: true} lastKongRawState = &utils.KongRawState{ Services: []*kong.Service{ { @@ -1225,13 +942,12 @@ func TestKongClientUpdate_FetchStoreAndPushLastValidConfig(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - updateStrategyResolver := newMockUpdateStrategyResolver() + updateStrategyResolver := mocks.NewUpdateStrategyResolver() for range tc.gatewayFailuresCount { - updateStrategyResolver.returnSpecificErrorOnUpdate(clientsProvider.gatewayClients[0].BaseRootURL(), tc.errorOnGatewayFailures) - updateStrategyResolver.returnSpecificErrorOnUpdate(clientsProvider.gatewayClients[1].BaseRootURL(), tc.errorOnGatewayFailures) + updateStrategyResolver.ReturnSpecificErrorOnUpdate(clientsProvider.gatewayClients[0].BaseRootURL(), tc.errorOnGatewayFailures) + updateStrategyResolver.ReturnSpecificErrorOnUpdate(clientsProvider.gatewayClients[1].BaseRootURL(), tc.errorOnGatewayFailures) } - configChangeDetector.status.ConfigurationHash = tc.lastKongStatusHash kongRawStateGetter := &mockKongLastValidConfigFetcher{ kongRawState: tc.lastValidKongRawState, } @@ -1259,49 +975,50 @@ func TestKongClientUpdate_FetchStoreAndPushLastValidConfig(t *testing.T) { } } -func TestKongClientUpdate_KonnectUpdatesAreSanitized(t *testing.T) { - ctx := context.Background() - clientsProvider := &mockGatewayClientsProvider{ - gatewayClients: []*adminapi.Client{mustSampleGatewayClient(t)}, - konnectClient: mustSampleKonnectClient(t), - } - updateStrategyResolver := newMockUpdateStrategyResolver() - configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true} - configBuilder := newMockKongConfigBuilder() - configBuilder.kongState = &kongstate.KongState{ - Certificates: []kongstate.Certificate{ - { - Certificate: kong.Certificate{ - ID: kong.String("new_cert"), - Key: kong.String(`private-key-string`), // This should be redacted. - }, - }, - }, - } - - kongRawStateGetter := &mockKongLastValidConfigFetcher{} - kongClient := setupTestKongClient( - t, - updateStrategyResolver, - clientsProvider, - configChangeDetector, - configBuilder, - nil, - kongRawStateGetter, - ) - attachKonnectConfigSynchronizer(ctx, t, kongClient, updateStrategyResolver, clientsProvider, configChangeDetector, clients.NoOpConfigStatusNotifier{}) - require.NoError(t, kongClient.Update(ctx)) - - konnectContent := updateStrategyResolver.eventuallyGetLastUpdatedContentForURL(t, clientsProvider.konnectClient.BaseRootURL(), testKonenctUploadWait, testKonnectUploadPeriod) - require.Len(t, konnectContent.Content.Certificates, 1, "expected Konnect to have 1 certificate") - cert := konnectContent.Content.Certificates[0] - require.NotNil(t, cert.Key, "expected Konnect to have certificate key") - require.Equal(t, "{vault://redacted-value}", *cert.Key, "expected Konnect to have redacted certificate key") +// TODO(czeslavo): move this test to the KonnectConfigSynchronizer tests as it doesn't belong here. +func TestKongClientUpdate_KonnectUpdatesAreSanitized(_ *testing.T) { + // ctx := context.Background() + // konnectClient := mustSampleKonnectClient(t) + // clientsProvider := &mockGatewayClientsProvider{ + // gatewayClients: []*adminapi.Client{mustSampleGatewayClient(t)}, + // } + // updateStrategyResolver := mocks.NewUpdateStrategyResolver() + // configChangeDetector := mocks.ConfigurationChangeDetector{ConfigurationChanged: true} + // configBuilder := newMockKongConfigBuilder() + // configBuilder.kongState = &kongstate.KongState{ + // Certificates: []kongstate.Certificate{ + // { + // Certificate: kong.Certificate{ + // ID: kong.String("new_cert"), + // Key: kong.String(`private-key-string`), // This should be redacted. + // }, + // }, + // }, + // } + // + // kongRawStateGetter := &mockKongLastValidConfigFetcher{} + // kongClient := setupTestKongClient( + // t, + // updateStrategyResolver, + // clientsProvider, + // configChangeDetector, + // configBuilder, + // nil, + // kongRawStateGetter, + // ) + // attachKonnectConfigSynchronizer(ctx, t, konnectClient, kongClient, updateStrategyResolver, configChangeDetector, clients.NoOpConfigStatusNotifier{}) + // require.NoError(t, kongClient.Update(ctx)) + // + // konnectContent := updateStrategyResolver.eventuallyGetLastUpdatedContentForURL(t, konnectClient.BaseRootURL(), testKonenctUploadWait, testKonnectUploadPeriod) + // require.Len(t, konnectContent.Content.Certificates, 1, "expected Konnect to have 1 certificate") + // cert := konnectContent.Content.Certificates[0] + // require.NotNil(t, cert.Key, "expected Konnect to have certificate key") + // require.Equal(t, "{vault://redacted-value}", *cert.Key, "expected Konnect to have redacted certificate key") } func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { ctx := context.Background() - configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true} + configChangeDetector := mocks.ConfigurationChangeDetector{ConfigurationChanged: true} lastValidConfigFetcher := &mockKongLastValidConfigFetcher{} diagnosticsCh := make(chan diagnostics.ConfigDump, 10) // make it buffered to avoid blocking @@ -1331,14 +1048,12 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - updateStrategyResolver := newMockUpdateStrategyResolver() + updateStrategyResolver := mocks.NewUpdateStrategyResolver() configBuilder := newMockKongConfigBuilder() fallbackConfigGenerator := newMockFallbackConfigGenerator() gwClient := mustSampleGatewayClient(t) - konnectClient := mustSampleKonnectClient(t) clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{gwClient}, - konnectClient: konnectClient, } kongClient, err := NewKongClient( zapr.NewLogger(zap.NewNop()), @@ -1359,6 +1074,7 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { configBuilder, &originalCache, fallbackConfigGenerator, + mocks.MetricsRecorder{}, ) require.NoError(t, err) @@ -1366,7 +1082,7 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { kongClient.lastValidCacheSnapshot = &lastValidCache t.Log("Setting update strategy to return an error on the first call to trigger fallback configuration generation") - updateStrategyResolver.returnSpecificErrorOnUpdate(gwClient.BaseRootURL(), sendconfig.NewUpdateErrorWithoutResponseBody( + updateStrategyResolver.ReturnSpecificErrorOnUpdate(gwClient.BaseRootURL(), sendconfig.NewUpdateErrorWithoutResponseBody( []failures.ResourceFailure{ lo.Must(failures.NewResourceFailure("violated constraint", brokenConsumer)), }, @@ -1424,7 +1140,7 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { "expected cache to be updated with the fallback snapshot on second call") t.Log("Verifying that the update strategy was called twice for gateway") - updateStrategyResolver.assertUpdateCalledForURLsWithGivenCount( + updateStrategyResolver.AssertUpdateCalledForURLsWithGivenCount( t, map[string]int{ gwClient.BaseRootURL(): 2, @@ -1461,13 +1177,11 @@ func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { func TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync(t *testing.T) { ctx := context.Background() gwClient := mustSampleGatewayClient(t) - konnectClient := mustSampleKonnectClient(t) clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{gwClient}, - konnectClient: konnectClient, } - updateStrategyResolver := newMockUpdateStrategyResolver() - configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true} + updateStrategyResolver := mocks.NewUpdateStrategyResolver() + configChangeDetector := mocks.ConfigurationChangeDetector{ConfigurationChanged: true} configBuilder := newMockKongConfigBuilder() lastValidConfigFetcher := &mockKongLastValidConfigFetcher{} fallbackConfigGenerator := newMockFallbackConfigGenerator() @@ -1494,6 +1208,7 @@ func TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync(t *testing.T) { configBuilder, &originalCache, fallbackConfigGenerator, + mocks.MetricsRecorder{}, ) require.NoError(t, err) @@ -1505,7 +1220,7 @@ func TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync(t *testing.T) { require.Len(t, configBuilder.updateCacheCalls, 1) t.Log("Verifying that the update strategy was called once for gateway") - updateStrategyResolver.assertUpdateCalledForURLsWithGivenCount(t, map[string]int{gwClient.BaseRootURL(): 1}) + updateStrategyResolver.AssertUpdateCalledForURLsWithGivenCount(t, map[string]int{gwClient.BaseRootURL(): 1}) }) t.Run("without clients change, on second update clients are not updated", func(t *testing.T) { @@ -1516,7 +1231,7 @@ func TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync(t *testing.T) { require.Len(t, configBuilder.updateCacheCalls, 1) t.Log("Verifying that the update strategy was not called again") - updateStrategyResolver.assertUpdateCalledForURLsWithGivenCount(t, map[string]int{gwClient.BaseRootURL(): 1}) + updateStrategyResolver.AssertUpdateCalledForURLsWithGivenCount(t, map[string]int{gwClient.BaseRootURL(): 1}) }) newGwClient := mustSampleGatewayClient(t) @@ -1531,7 +1246,7 @@ func TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync(t *testing.T) { require.Len(t, configBuilder.updateCacheCalls, 1) t.Log("Verifying that the update strategies were called for the client that was added") - updateStrategyResolver.assertUpdateCalledForURLsWithGivenCount( + updateStrategyResolver.AssertUpdateCalledForURLsWithGivenCount( t, map[string]int{ gwClient.BaseRootURL(): 2, // First series of updates + Second series of updates @@ -1544,7 +1259,7 @@ func TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync(t *testing.T) { require.NoError(t, originalCache.Add(someConsumer(t, "broken"))) // Add a consumer to cache to change the cache hash. t.Log("Setting update strategy to return an error on the first call to trigger fallback configuration generation") - updateStrategyResolver.returnSpecificErrorOnUpdate(gwClient.BaseRootURL(), sendconfig.NewUpdateErrorWithoutResponseBody( + updateStrategyResolver.ReturnSpecificErrorOnUpdate(gwClient.BaseRootURL(), sendconfig.NewUpdateErrorWithoutResponseBody( []failures.ResourceFailure{ lo.Must(failures.NewResourceFailure("violated constraint", someConsumer(t, "invalid"))), }, @@ -1555,7 +1270,7 @@ func TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync(t *testing.T) { require.Error(t, kongClient.Update(ctx)) t.Log("Verifying that the update strategy was called again for all gateways") - updateStrategyResolver.assertUpdateCalledForURLsWithGivenCount( + updateStrategyResolver.AssertUpdateCalledForURLsWithGivenCount( t, map[string]int{ gwClient.BaseRootURL(): 4, // First series of updates + Second series of updates + rejected/fallback @@ -1567,7 +1282,7 @@ func TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync(t *testing.T) { t.Run("when fallback was used before and config is still broken, after discovering a new client, all clients are updated", func(t *testing.T) { t.Log("Adding a new client to the provider") clientsProvider.gatewayClients = append(clientsProvider.gatewayClients, anotherNewGwClient) - updateStrategyResolver.returnSpecificErrorOnUpdate(gwClient.BaseRootURL(), sendconfig.NewUpdateErrorWithoutResponseBody( + updateStrategyResolver.ReturnSpecificErrorOnUpdate(gwClient.BaseRootURL(), sendconfig.NewUpdateErrorWithoutResponseBody( []failures.ResourceFailure{ lo.Must(failures.NewResourceFailure("violated constraint", someConsumer(t, "invalid"))), }, @@ -1578,7 +1293,7 @@ func TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync(t *testing.T) { require.Error(t, kongClient.Update(ctx)) t.Log("Verifying that the update strategy was called again for all gateways") - updateStrategyResolver.assertUpdateCalledForURLsWithGivenCount( + updateStrategyResolver.AssertUpdateCalledForURLsWithGivenCount( t, map[string]int{ gwClient.BaseRootURL(): 6, // First series of updates + Second series of updates + rejected/fallback * 2 @@ -1594,7 +1309,7 @@ func TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync(t *testing.T) { require.NoError(t, kongClient.Update(ctx)) t.Log("Verifying that the update strategy was called again for all gateways") - updateStrategyResolver.assertUpdateCalledForURLsWithGivenCount( + updateStrategyResolver.AssertUpdateCalledForURLsWithGivenCount( t, map[string]int{ gwClient.BaseRootURL(): 7, // First series of updates + Second series of updates + rejected/fallback * 2 + third update @@ -1606,13 +1321,11 @@ func TestKongClient_FallbackConfiguration_SkipsUpdateWhenInSync(t *testing.T) { func TestKongClient_FallbackConfiguration_FailedRecovery(t *testing.T) { ctx := context.Background() gwClient := mustSampleGatewayClient(t) - konnectClient := mustSampleKonnectClient(t) clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{gwClient}, - konnectClient: konnectClient, } - updateStrategyResolver := newMockUpdateStrategyResolver() - configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true} + updateStrategyResolver := mocks.NewUpdateStrategyResolver() + configChangeDetector := mocks.ConfigurationChangeDetector{ConfigurationChanged: true} configBuilder := newMockKongConfigBuilder() lastValidConfigFetcher := &mockKongLastValidConfigFetcher{} fallbackConfigGenerator := newMockFallbackConfigGenerator() @@ -1640,11 +1353,12 @@ func TestKongClient_FallbackConfiguration_FailedRecovery(t *testing.T) { configBuilder, &originalCache, fallbackConfigGenerator, + mocks.MetricsRecorder{}, ) require.NoError(t, err) t.Log("Setting update strategy to return an error on the first call to trigger fallback configuration generation") - updateStrategyResolver.returnSpecificErrorOnUpdate(gwClient.BaseRootURL(), sendconfig.NewUpdateErrorWithoutResponseBody( + updateStrategyResolver.ReturnSpecificErrorOnUpdate(gwClient.BaseRootURL(), sendconfig.NewUpdateErrorWithoutResponseBody( []failures.ResourceFailure{ lo.Must(failures.NewResourceFailure("violated constraint", brokenConsumer)), }, @@ -1652,14 +1366,14 @@ func TestKongClient_FallbackConfiguration_FailedRecovery(t *testing.T) { )) t.Log("Setting update strategy to return an error on the second call (fallback) to trigger a failed recovery") - updateStrategyResolver.returnErrorOnUpdate(gwClient.BaseRootURL()) + updateStrategyResolver.ReturnErrorOnUpdate(gwClient.BaseRootURL()) t.Log("Calling KongClient.Update") err = kongClient.Update(ctx) require.Error(t, err) t.Log("Verifying that the update strategy was called twice for gateway") - updateStrategyResolver.assertUpdateCalledForURLsWithGivenCount( + updateStrategyResolver.AssertUpdateCalledForURLsWithGivenCount( t, map[string]int{gwClient.BaseRootURL(): 2}, "expected update to be called twice: first with the initial config, then with the fallback one") @@ -1684,8 +1398,8 @@ func TestKongClient_FallbackConfiguration_FailedRecovery(t *testing.T) { func TestKongClient_LastValidCacheSnapshot(t *testing.T) { var ( ctx = context.Background() - updateStrategyResolver = newMockUpdateStrategyResolver() - configChangeDetector = mockConfigurationChangeDetector{hasConfigurationChanged: true} + updateStrategyResolver = mocks.NewUpdateStrategyResolver() + configChangeDetector = mocks.ConfigurationChangeDetector{ConfigurationChanged: true} configBuilder = newMockKongConfigBuilder() lastValidConfigFetcher = &mockKongLastValidConfigFetcher{} originalCache = cacheStoresFromObjs(t) @@ -1725,11 +1439,9 @@ func TestKongClient_LastValidCacheSnapshot(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - testKonnectClient := mustSampleKonnectClient(t) testGatewayClient := mustSampleGatewayClient(t) clientsProvider := &mockGatewayClientsProvider{ gatewayClients: []*adminapi.Client{testGatewayClient}, - konnectClient: testKonnectClient, } kongClient, err := NewKongClient( @@ -1749,6 +1461,7 @@ func TestKongClient_LastValidCacheSnapshot(t *testing.T) { configBuilder, &originalCache, fallbackConfigGenerator, + mocks.MetricsRecorder{}, ) require.NoError(t, err) @@ -1783,10 +1496,9 @@ func TestKongClient_ConfigDumpSanitization(t *testing.T) { gatewayClients: []*adminapi.Client{ mustSampleGatewayClient(t), }, - konnectClient: mustSampleKonnectClient(t), } - updateStrategyResolver := newMockUpdateStrategyResolver() - configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true} + updateStrategyResolver := mocks.NewUpdateStrategyResolver() + configChangeDetector := mocks.ConfigurationChangeDetector{ConfigurationChanged: true} configBuilder := newMockKongConfigBuilder() kongRawStateGetter := &mockKongLastValidConfigFetcher{} @@ -1846,7 +1558,7 @@ func TestKongClient_ConfigDumpSanitization(t *testing.T) { func TestKongClient_RecoveringFromGatewaySyncError(t *testing.T) { ctx := context.Background() - configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true} + configChangeDetector := mocks.ConfigurationChangeDetector{ConfigurationChanged: true} fallbackConfigGenerator := newMockFallbackConfigGenerator() originalCache := cacheStoresFromObjs(t) @@ -1919,11 +1631,11 @@ func TestKongClient_RecoveringFromGatewaySyncError(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("Preparing %d gateway clients", len(tc.errorsFromGateways)) - updateStrategyResolver := newMockUpdateStrategyResolver() + updateStrategyResolver := mocks.NewUpdateStrategyResolver() gwClients := make([]*adminapi.Client, len(tc.errorsFromGateways)) for i := range gwClients { gwClients[i] = mustSampleGatewayClient(t) - updateStrategyResolver.returnSpecificErrorOnUpdate(gwClients[i].BaseRootURL(), tc.errorsFromGateways[i]) + updateStrategyResolver.ReturnSpecificErrorOnUpdate(gwClients[i].BaseRootURL(), tc.errorsFromGateways[i]) } clientsProvider := &mockGatewayClientsProvider{ gatewayClients: gwClients, @@ -1971,6 +1683,7 @@ func TestKongClient_RecoveringFromGatewaySyncError(t *testing.T) { configBuilder, &originalCache, fallbackConfigGenerator, + mocks.MetricsRecorder{}, ) require.NoError(t, err) @@ -1985,7 +1698,7 @@ func TestKongClient_RecoveringFromGatewaySyncError(t *testing.T) { expectedUpdatedURLs = slices.Concat(expectedUpdatedURLs, expectedUpdatedURLs) } t.Logf("Ensuring that the update strategy was called %d times", len(expectedUpdatedURLs)) - updateStrategyResolver.assertUpdateCalledForURLs(t, expectedUpdatedURLs) + updateStrategyResolver.AssertUpdateCalledForURLs(t, expectedUpdatedURLs) expectedContent := func(consumerUsername string) *file.Content { return &file.Content{ @@ -1999,7 +1712,7 @@ func TestKongClient_RecoveringFromGatewaySyncError(t *testing.T) { }, } } - receivedContent, ok := updateStrategyResolver.lastUpdatedContentForURL(expectedUpdatedURLs[0]) + receivedContent, ok := updateStrategyResolver.LastUpdatedContentForURL(expectedUpdatedURLs[0]) require.True(t, ok) if tc.expectRecoveryByApplyingLastValidConfig { t.Log("Verifying that the last valid config was applied") diff --git a/internal/dataplane/kongstate/consumer_test.go b/internal/dataplane/kongstate/consumer_test.go index aecf3e04a6..44add182a9 100644 --- a/internal/dataplane/kongstate/consumer_test.go +++ b/internal/dataplane/kongstate/consumer_test.go @@ -7,8 +7,6 @@ import ( "github.com/stretchr/testify/assert" kongv1 "github.com/kong/kubernetes-configuration/api/configuration/v1" - - "github.com/kong/kubernetes-ingress-controller/v3/test/mocks" ) func int64Ptr(i int64) *int64 { @@ -85,17 +83,17 @@ func TestConsumer_SanitizedCopy(t *testing.T) { }, HMACAuths: []*HMACAuth{ { - HMACAuth: kong.HMACAuth{ID: kong.String("1"), Secret: redactedString}, + HMACAuth: kong.HMACAuth{ID: kong.String("1"), Secret: RedactedString}, }, }, JWTAuths: []*JWTAuth{ { - JWTAuth: kong.JWTAuth{ID: kong.String("1"), Secret: redactedString}, + JWTAuth: kong.JWTAuth{ID: kong.String("1"), Secret: RedactedString}, }, }, BasicAuths: []*BasicAuth{ { - BasicAuth: kong.BasicAuth{ID: kong.String("1"), Password: redactedString}, + BasicAuth: kong.BasicAuth{ID: kong.String("1"), Password: RedactedString}, }, }, ACLGroups: []*ACLGroup{ @@ -105,7 +103,7 @@ func TestConsumer_SanitizedCopy(t *testing.T) { }, Oauth2Creds: []*Oauth2Credential{ { - Oauth2Credential: kong.Oauth2Credential{ID: kong.String("1"), ClientSecret: redactedString}, + Oauth2Credential: kong.Oauth2Credential{ID: kong.String("1"), ClientSecret: RedactedString}, }, }, MTLSAuths: []*MTLSAuth{ @@ -118,7 +116,7 @@ func TestConsumer_SanitizedCopy(t *testing.T) { }, } { t.Run(tt.name, func(t *testing.T) { - got := tt.in.SanitizedCopy(mocks.StaticUUIDGenerator{UUID: "52fdfc07-2182-454f-963f-5f0f9a621d72"}) + got := tt.in.SanitizedCopy(StaticUUIDGenerator{UUID: "52fdfc07-2182-454f-963f-5f0f9a621d72"}) assert.Equal(t, tt.want, got) }) } diff --git a/internal/dataplane/kongstate/credentials.go b/internal/dataplane/kongstate/credentials.go index 04ca1543a1..2c234abde0 100644 --- a/internal/dataplane/kongstate/credentials.go +++ b/internal/dataplane/kongstate/credentials.go @@ -9,10 +9,10 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/util" ) -// redactedString is used to redact sensitive values in the KongState. +// RedactedString is used to redact sensitive values in the KongState. // It uses a vault URI to pass Konnect Admin API validations (e.g. when a TLS key is expected, it's only possible // to pass a valid key or a vault URI). -var redactedString = kong.String("{vault://redacted-value}") +var RedactedString = kong.String("{vault://redacted-value}") // randRedactedString is used to redact sensitive values in the KongState when the value must be random to avoid // collisions. @@ -181,7 +181,7 @@ func (c *HMACAuth) SanitizedCopy() *HMACAuth { CreatedAt: c.CreatedAt, ID: c.ID, Username: c.Username, - Secret: redactedString, + Secret: RedactedString, Tags: c.Tags, }, } @@ -197,7 +197,7 @@ func (c *JWTAuth) SanitizedCopy() *JWTAuth { Algorithm: c.Algorithm, Key: c.Key, // despite field name, "key" is an identifier RSAPublicKey: c.RSAPublicKey, - Secret: redactedString, + Secret: RedactedString, Tags: c.Tags, }, } @@ -211,7 +211,7 @@ func (c *BasicAuth) SanitizedCopy() *BasicAuth { CreatedAt: c.CreatedAt, ID: c.ID, Username: c.Username, - Password: redactedString, + Password: RedactedString, Tags: c.Tags, }, } @@ -226,7 +226,7 @@ func (c *Oauth2Credential) SanitizedCopy() *Oauth2Credential { ID: c.ID, Name: c.Name, ClientID: c.ClientID, - ClientSecret: redactedString, + ClientSecret: RedactedString, RedirectURIs: c.RedirectURIs, Tags: c.Tags, }, diff --git a/internal/dataplane/kongstate/credentials_test.go b/internal/dataplane/kongstate/credentials_test.go index 5b6d08e419..54d5ba6ed6 100644 --- a/internal/dataplane/kongstate/credentials_test.go +++ b/internal/dataplane/kongstate/credentials_test.go @@ -1,4 +1,4 @@ -package kongstate +package kongstate_test import ( "testing" @@ -6,18 +6,18 @@ import ( "github.com/kong/go-kong/kong" "github.com/stretchr/testify/assert" - "github.com/kong/kubernetes-ingress-controller/v3/test/mocks" + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/kongstate" ) func TestKeyAuth_SanitizedCopy(t *testing.T) { for _, tt := range []struct { name string - in KeyAuth - want KeyAuth + in kongstate.KeyAuth + want kongstate.KeyAuth }{ { name: "fills all fields but Consumer and sanitizes key", - in: KeyAuth{ + in: kongstate.KeyAuth{ KeyAuth: kong.KeyAuth{ Consumer: &kong.Consumer{Username: kong.String("foo")}, CreatedAt: kong.Int(1), @@ -26,7 +26,7 @@ func TestKeyAuth_SanitizedCopy(t *testing.T) { Tags: []*string{kong.String("4.1"), kong.String("4.2")}, }, }, - want: KeyAuth{ + want: kongstate.KeyAuth{ KeyAuth: kong.KeyAuth{ CreatedAt: kong.Int(1), ID: kong.String("2"), @@ -37,7 +37,7 @@ func TestKeyAuth_SanitizedCopy(t *testing.T) { }, } { t.Run(tt.name, func(t *testing.T) { - got := *tt.in.SanitizedCopy(mocks.StaticUUIDGenerator{UUID: "52fdfc07-2182-454f-963f-5f0f9a621d72"}) + got := *tt.in.SanitizedCopy(kongstate.StaticUUIDGenerator{UUID: "52fdfc07-2182-454f-963f-5f0f9a621d72"}) assert.Equal(t, tt.want, got) }) } @@ -46,12 +46,12 @@ func TestKeyAuth_SanitizedCopy(t *testing.T) { func TestHMACAuth_SanitizedCopy(t *testing.T) { for _, tt := range []struct { name string - in HMACAuth - want HMACAuth + in kongstate.HMACAuth + want kongstate.HMACAuth }{ { name: "fills all fields but Consumer and sanitizes secret", - in: HMACAuth{ + in: kongstate.HMACAuth{ HMACAuth: kong.HMACAuth{ Consumer: &kong.Consumer{Username: kong.String("foo")}, CreatedAt: kong.Int(1), @@ -61,12 +61,12 @@ func TestHMACAuth_SanitizedCopy(t *testing.T) { Tags: []*string{kong.String("5.1"), kong.String("5.2")}, }, }, - want: HMACAuth{ + want: kongstate.HMACAuth{ HMACAuth: kong.HMACAuth{ CreatedAt: kong.Int(1), ID: kong.String("2"), Username: kong.String("3"), - Secret: redactedString, + Secret: kongstate.RedactedString, Tags: []*string{kong.String("5.1"), kong.String("5.2")}, }, }, @@ -82,12 +82,12 @@ func TestHMACAuth_SanitizedCopy(t *testing.T) { func TestJWTAuth_SanitizedCopy(t *testing.T) { for _, tt := range []struct { name string - in JWTAuth - want JWTAuth + in kongstate.JWTAuth + want kongstate.JWTAuth }{ { name: "fills all fields but Consumer and sanitizes secret", - in: JWTAuth{ + in: kongstate.JWTAuth{ JWTAuth: kong.JWTAuth{ Consumer: &kong.Consumer{Username: kong.String("foo")}, CreatedAt: kong.Int(1), @@ -99,14 +99,14 @@ func TestJWTAuth_SanitizedCopy(t *testing.T) { Tags: []*string{kong.String("7.1"), kong.String("7.2")}, }, }, - want: JWTAuth{ + want: kongstate.JWTAuth{ JWTAuth: kong.JWTAuth{ CreatedAt: kong.Int(1), ID: kong.String("2"), Algorithm: kong.String("3"), Key: kong.String("4"), RSAPublicKey: kong.String("5"), - Secret: redactedString, + Secret: kongstate.RedactedString, Tags: []*string{kong.String("7.1"), kong.String("7.2")}, }, }, @@ -122,12 +122,12 @@ func TestJWTAuth_SanitizedCopy(t *testing.T) { func TestBasicAuth_SanitizedCopy(t *testing.T) { for _, tt := range []struct { name string - in BasicAuth - want BasicAuth + in kongstate.BasicAuth + want kongstate.BasicAuth }{ { name: "fills all fields but Consumer and sanitizes password", - in: BasicAuth{ + in: kongstate.BasicAuth{ BasicAuth: kong.BasicAuth{ Consumer: &kong.Consumer{Username: kong.String("foo")}, CreatedAt: kong.Int(1), @@ -137,12 +137,12 @@ func TestBasicAuth_SanitizedCopy(t *testing.T) { Tags: []*string{kong.String("5.1"), kong.String("5.2")}, }, }, - want: BasicAuth{ + want: kongstate.BasicAuth{ BasicAuth: kong.BasicAuth{ CreatedAt: kong.Int(1), ID: kong.String("2"), Username: kong.String("3"), - Password: redactedString, + Password: kongstate.RedactedString, Tags: []*string{kong.String("5.1"), kong.String("5.2")}, }, }, @@ -158,12 +158,12 @@ func TestBasicAuth_SanitizedCopy(t *testing.T) { func TestOauth2Credential_SanitizedCopy(t *testing.T) { for _, tt := range []struct { name string - in Oauth2Credential - want Oauth2Credential + in kongstate.Oauth2Credential + want kongstate.Oauth2Credential }{ { name: "fills all fields but Consumer and sanitizes client secret", - in: Oauth2Credential{ + in: kongstate.Oauth2Credential{ Oauth2Credential: kong.Oauth2Credential{ Consumer: &kong.Consumer{Username: kong.String("foo")}, CreatedAt: kong.Int(1), @@ -175,13 +175,13 @@ func TestOauth2Credential_SanitizedCopy(t *testing.T) { Tags: []*string{kong.String("7.1"), kong.String("7.2")}, }, }, - want: Oauth2Credential{ + want: kongstate.Oauth2Credential{ Oauth2Credential: kong.Oauth2Credential{ CreatedAt: kong.Int(1), ID: kong.String("2"), Name: kong.String("3"), ClientID: kong.String("4"), - ClientSecret: redactedString, + ClientSecret: kongstate.RedactedString, RedirectURIs: []*string{kong.String("6.1"), kong.String("6.2")}, Tags: []*string{kong.String("7.1"), kong.String("7.2")}, }, diff --git a/internal/dataplane/kongstate/kongstate_test.go b/internal/dataplane/kongstate/kongstate_test.go index f7b0991213..d67140a3c7 100644 --- a/internal/dataplane/kongstate/kongstate_test.go +++ b/internal/dataplane/kongstate/kongstate_test.go @@ -36,7 +36,6 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/labels" "github.com/kong/kubernetes-ingress-controller/v3/internal/store" "github.com/kong/kubernetes-ingress-controller/v3/internal/util" - "github.com/kong/kubernetes-ingress-controller/v3/test/mocks" ) var kongConsumerTypeMeta = metav1.TypeMeta{ @@ -105,7 +104,7 @@ func TestKongState_SanitizedCopy(t *testing.T) { want: KongState{ Services: []Service{{Service: kong.Service{ID: kong.String("1")}}}, Upstreams: []Upstream{{Upstream: kong.Upstream{ID: kong.String("1")}}}, - Certificates: []Certificate{{Certificate: kong.Certificate{ID: kong.String("1"), Key: redactedString}}}, + Certificates: []Certificate{{Certificate: kong.Certificate{ID: kong.String("1"), Key: RedactedString}}}, CACertificates: []kong.CACertificate{{ID: kong.String("1")}}, Plugins: []Plugin{{Plugin: kong.Plugin{ID: kong.String("1"), Config: map[string]interface{}{"key": "secret"}}}}, // We don't redact plugins' config. Consumers: []Consumer{ @@ -117,7 +116,7 @@ func TestKongState_SanitizedCopy(t *testing.T) { }, }, }, - Licenses: []License{{kong.License{ID: kong.String("1"), Payload: redactedString}}}, + Licenses: []License{{kong.License{ID: kong.String("1"), Payload: RedactedString}}}, ConsumerGroups: []ConsumerGroup{{ ConsumerGroup: kong.ConsumerGroup{ID: kong.String("1"), Name: kong.String("consumer-group")}, }}, @@ -152,7 +151,7 @@ func TestKongState_SanitizedCopy(t *testing.T) { } { t.Run(tt.name, func(t *testing.T) { testedFields.Insert(extractNotEmptyFieldNames(tt.in)...) - got := *tt.in.SanitizedCopy(mocks.StaticUUIDGenerator{UUID: "52fdfc07-2182-454f-963f-5f0f9a621d72"}) + got := *tt.in.SanitizedCopy(StaticUUIDGenerator{UUID: "52fdfc07-2182-454f-963f-5f0f9a621d72"}) assert.Equal(t, tt.want, got) }) } @@ -195,7 +194,7 @@ func BenchmarkSanitizedCopy(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - ret := ks.SanitizedCopy(mocks.StaticUUIDGenerator{UUID: "52fdfc07-2182-454f-963f-5f0f9a621d72"}) + ret := ks.SanitizedCopy(StaticUUIDGenerator{UUID: "52fdfc07-2182-454f-963f-5f0f9a621d72"}) _ = ret } } diff --git a/internal/dataplane/kongstate/license.go b/internal/dataplane/kongstate/license.go index 52b064da33..3ed30cc4af 100644 --- a/internal/dataplane/kongstate/license.go +++ b/internal/dataplane/kongstate/license.go @@ -14,7 +14,7 @@ func (l License) SanitizedCopy() License { return License{ License: kong.License{ ID: l.ID, - Payload: redactedString, + Payload: RedactedString, CreatedAt: l.CreatedAt, UpdatedAt: l.UpdatedAt, }, diff --git a/internal/dataplane/kongstate/types.go b/internal/dataplane/kongstate/types.go index 9c47856b80..5b7a397ba4 100644 --- a/internal/dataplane/kongstate/types.go +++ b/internal/dataplane/kongstate/types.go @@ -59,7 +59,7 @@ func (c *Certificate) SanitizedCopy() Certificate { kong.Certificate{ ID: c.ID, Cert: c.Cert, - Key: redactedString, + Key: RedactedString, CreatedAt: c.CreatedAt, SNIs: c.SNIs, Tags: c.Tags, diff --git a/internal/dataplane/kongstate/types_test.go b/internal/dataplane/kongstate/types_test.go index bf74b9ba54..8a99454c2c 100644 --- a/internal/dataplane/kongstate/types_test.go +++ b/internal/dataplane/kongstate/types_test.go @@ -26,7 +26,7 @@ func TestCertificate_SanitizedCopy(t *testing.T) { want: Certificate{kong.Certificate{ ID: kong.String("1"), Cert: kong.String("2"), - Key: redactedString, + Key: RedactedString, CreatedAt: int64Ptr(4), SNIs: []*string{kong.String("5.1"), kong.String("5.2")}, Tags: []*string{kong.String("6.1"), kong.String("6.2")}, diff --git a/internal/dataplane/kongstate/uuid.go b/internal/dataplane/kongstate/uuid.go new file mode 100644 index 0000000000..cce56a0dcb --- /dev/null +++ b/internal/dataplane/kongstate/uuid.go @@ -0,0 +1,10 @@ +package kongstate + +// StaticUUIDGenerator is a UUIDGenerator that always returns the same UUID. It is used for testing. +type StaticUUIDGenerator struct { + UUID string +} + +func (s StaticUUIDGenerator) NewString() string { + return s.UUID +} diff --git a/internal/dataplane/sendconfig/sendconfig.go b/internal/dataplane/sendconfig/sendconfig.go index 9ea909f66c..224e5886f4 100644 --- a/internal/dataplane/sendconfig/sendconfig.go +++ b/internal/dataplane/sendconfig/sendconfig.go @@ -46,7 +46,7 @@ func PerformUpdate( config Config, targetContent *file.Content, customEntities CustomEntitiesByType, - promMetrics *metrics.CtrlFuncMetrics, + promMetrics metrics.Recorder, updateStrategyResolver UpdateStrategyResolver, configChangeDetector ConfigurationChangeDetector, diagnostic *diagnostics.ClientDiagnostic, diff --git a/internal/dataplane/sendconfig/strategy.go b/internal/dataplane/sendconfig/strategy.go index cfaca6addf..1caf7b3709 100644 --- a/internal/dataplane/sendconfig/strategy.go +++ b/internal/dataplane/sendconfig/strategy.go @@ -98,6 +98,7 @@ func (r DefaultUpdateStrategyResolver) resolveUpdateStrategy( ) UpdateStrategy { adminAPIClient := client.AdminAPIClient() + // TODO(czeslavo): move it to Konnect synchronizer // In case the client communicates with Konnect Admin API, we know it has to use DB-mode. There's no need to check // config.InMemory that is meant for regular Kong Gateway clients. if client.IsKonnect() { diff --git a/internal/konnect/config_synchronizer.go b/internal/konnect/config_synchronizer.go index 10e35ab615..34673741fc 100644 --- a/internal/konnect/config_synchronizer.go +++ b/internal/konnect/config_synchronizer.go @@ -14,8 +14,12 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/adminapi" "github.com/kong/kubernetes-ingress-controller/v3/internal/clients" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/deckerrors" + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/deckgen" + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/kongstate" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" + "github.com/kong/kubernetes-ingress-controller/v3/internal/logging" "github.com/kong/kubernetes-ingress-controller/v3/internal/metrics" + "github.com/kong/kubernetes-ingress-controller/v3/internal/util" ) const ( @@ -25,106 +29,170 @@ const ( DefaultConfigUploadPeriod = 30 * time.Second ) +type ClientFactory interface { + NewKonnectClient(ctx context.Context) (*adminapi.KonnectClient, error) +} + // ConfigSynchronizer runs a loop to upload the traslated Kong configuration to Konnect in the given period. type ConfigSynchronizer struct { - logger logr.Logger - syncTicker *time.Ticker - kongConfig sendconfig.Config - clientsProvider clients.AdminAPIClientsProvider - prometheusMetrics *metrics.CtrlFuncMetrics + logger logr.Logger + kongConfig sendconfig.Config + konnectClientFactory ClientFactory + + prometheusMetrics metrics.Recorder updateStrategyResolver sendconfig.UpdateStrategyResolver configChangeDetector sendconfig.ConfigurationChangeDetector configStatusNotifier clients.ConfigStatusNotifier - targetContent *file.Content + konnectAdminClient *adminapi.KonnectClient + syncTicker *time.Ticker + + // targetConfig is the latest configuration to be uploaded to Konnect. + targetConfig targetConfig + // configLock is used to prevent data + configLock sync.RWMutex +} + +type targetConfig struct { + // Content the configuration to be uploaded to Konnect. It represents the latest state of the configuration + // received from the KongClient. + Content *file.Content - lock sync.RWMutex + // IsFallback indicates whether the configuration is a fallback configuration. + IsFallback bool } func NewConfigSynchronizer( logger logr.Logger, kongConfig sendconfig.Config, configUploadPeriod time.Duration, - clientsProvider clients.AdminAPIClientsProvider, + konnectClientFactory ClientFactory, updateStrategyResolver sendconfig.UpdateStrategyResolver, configChangeDetector sendconfig.ConfigurationChangeDetector, configStatusNotifier clients.ConfigStatusNotifier, + prometheusMetrics metrics.Recorder, ) *ConfigSynchronizer { return &ConfigSynchronizer{ logger: logger, - syncTicker: time.NewTicker(configUploadPeriod), kongConfig: kongConfig, - clientsProvider: clientsProvider, - prometheusMetrics: metrics.NewCtrlFuncMetrics(), + syncTicker: time.NewTicker(configUploadPeriod), + konnectClientFactory: konnectClientFactory, updateStrategyResolver: updateStrategyResolver, configChangeDetector: configChangeDetector, configStatusNotifier: configStatusNotifier, + prometheusMetrics: prometheusMetrics, } } -var _ manager.Runnable = &ConfigSynchronizer{} +var _ manager.LeaderElectionRunnable = &ConfigSynchronizer{} // Start starts the loop to receive configuration and uplaod configuration to Konnect. func (s *ConfigSynchronizer) Start(ctx context.Context) error { - s.logger.Info("Started Konnect configuration synchronizer") - go s.runKonnectUpdateServer(ctx) + s.logger.Info("Starting Konnect configuration synchronizer") + + konnectAdminClient, err := s.konnectClientFactory.NewKonnectClient(ctx) + if err != nil { + s.logger.Error(err, "Failed to create Konnect client, skipping Konnect configuration synchronization") + + // We failed to set up Konnect client. We cannot proceed with running the synchronizer. + // As it's a manager runnable, we'll wait for the context to be done and return only then to not hijack the + // manager's start process. + <-ctx.Done() + return ctx.Err() + } + + // Set the Konnect client to be used to upload configuration and start the synchronizer main loop. + s.konnectAdminClient = konnectAdminClient + s.logger.Info("Konnect client initialized, starting Konnect configuration synchronization") + s.run(ctx) + return nil } -// SetTargetContent stores the latest configuration in `file.Content` format. -// REVIEW: should we use channel to receive the configuration? -func (s *ConfigSynchronizer) SetTargetContent(targetContent *file.Content) { - s.lock.Lock() - defer s.lock.Unlock() - s.targetContent = targetContent +// NeedLeaderElection returns true to indicate that this runnable requires leader election. +// This is required to ensure that only one instance of the synchronizer is running at a time. +func (s *ConfigSynchronizer) NeedLeaderElection() bool { + return true } -// GetTargetContentCopy returns a copy of the latest configuration in `file.Content` format -// to prevent data race and long duration of occupying lock. -func (s *ConfigSynchronizer) GetTargetContentCopy() *file.Content { - s.lock.RLock() - defer s.lock.RUnlock() - return s.targetContent.DeepCopy() +func (s *ConfigSynchronizer) UpdateKongState( + ctx context.Context, + ks *kongstate.KongState, + isFallbackConfig bool, +) { + go func() { + if s.konnectAdminClient == nil { + s.logger.Info("Konnect client not initialized yet, skipping Kong state update") + return + } + + if s.kongConfig.SanitizeKonnectConfigDumps { + ks = ks.SanitizedCopy(util.DefaultUUIDGenerator{}) + } + + deckGenParams := deckgen.GenerateDeckContentParams{ + SelectorTags: s.kongConfig.FilterTags, + ExpressionRoutes: s.kongConfig.ExpressionRoutes, + PluginSchemas: s.konnectAdminClient.PluginSchemaStore(), + AppendStubEntityWhenConfigEmpty: false, + } + targetContent := deckgen.ToDeckContent(ctx, s.logger, ks, deckGenParams) + + s.configLock.Lock() + defer s.configLock.Unlock() + s.targetConfig = targetConfig{ + Content: targetContent, + IsFallback: isFallbackConfig, + } + }() } -// runKonnectUpdateServer starts the loop to receive configuration and send configuration to Konenct. -func (s *ConfigSynchronizer) runKonnectUpdateServer(ctx context.Context) { +// run starts the loop to receive configuration and send configuration to Konnect. +func (s *ConfigSynchronizer) run(ctx context.Context) { for { select { case <-ctx.Done(): - s.logger.Info("Context done: shutting down the Konnect update server") + s.logger.Info("Context done: shutting down the Konnect configuration synchronizer") s.syncTicker.Stop() case <-s.syncTicker.C: - s.logger.Info("Start uploading to Konnect") - client := s.clientsProvider.KonnectClient() - if client == nil { - s.logger.Info("Konnect client not ready, skipping") - continue - } - // Copy target content to upload here because uploading full configuration to Konnect may cost too much time. - targetContent := s.GetTargetContentCopy() - if targetContent == nil { - s.logger.Info("No target content received, skipping") - continue - } - err := s.uploadConfig(ctx, client, targetContent) - if err != nil { - s.logger.Error(err, "failed to upload configuration to Konnect") - logKonnectErrors(s.logger, err) - } - s.configStatusNotifier.NotifyKonnectConfigStatus(ctx, clients.KonnectConfigUploadStatus{ - Failed: err != nil, - }) + s.handleConfigSynchronizationTick(ctx) } } } +func (s *ConfigSynchronizer) handleConfigSynchronizationTick(ctx context.Context) { + s.logger.V(logging.DebugLevel).Info("Start uploading configuration to Konnect") + + // Get the latest configuration copy to upload to Konnect. We don't want to hold the lock for a long time to prevent + // blocking the update of the configuration thus copying the configuration. + targetCfg := s.getTargetConfigCopy() + if targetCfg.Content == nil { + s.logger.Info("No configuration received yet, skipping Konnect configuration synchronization") + return + } + + // Upload the configuration to Konnect. + err := s.uploadConfig(ctx, s.konnectAdminClient, targetCfg) + if err != nil { + s.logger.Error(err, "Failed to upload configuration to Konnect") + logKonnectErrors(s.logger, err) + } + + // Notify the status of the configuration upload to the system reporting it. + s.configStatusNotifier.NotifyKonnectConfigStatus(ctx, clients.KonnectConfigUploadStatus{ + Failed: err != nil, + }) +} + // uploadConfig sends the given configuration to Konnect. -func (s *ConfigSynchronizer) uploadConfig(ctx context.Context, client *adminapi.KonnectClient, targetContent *file.Content) error { - const isFallback = false +func (s *ConfigSynchronizer) uploadConfig( + ctx context.Context, + client *adminapi.KonnectClient, + targetCfg targetConfig, +) error { // Remove consumers in target content if consumer sync is disabled. if client.ConsumersSyncDisabled() { - targetContent.Consumers = []file.FConsumer{} + targetCfg.Content.Consumers = []file.FConsumer{} } newSHA, err := sendconfig.PerformUpdate( @@ -132,14 +200,14 @@ func (s *ConfigSynchronizer) uploadConfig(ctx context.Context, client *adminapi. s.logger, client, s.kongConfig, - targetContent, + targetCfg.Content, // Konnect client does not upload custom entities. sendconfig.CustomEntitiesByType{}, s.prometheusMetrics, s.updateStrategyResolver, s.configChangeDetector, nil, - isFallback, + targetCfg.IsFallback, ) if err != nil { return err @@ -148,10 +216,18 @@ func (s *ConfigSynchronizer) uploadConfig(ctx context.Context, client *adminapi. return nil } +// getTargetConfigCopy returns a copy of the latest configuration in `file.Content` format +// to prevent data race and long duration of occupying lock. +func (s *ConfigSynchronizer) getTargetConfigCopy() targetConfig { + s.configLock.RLock() + defer s.configLock.RUnlock() + return targetConfig{ + s.targetConfig.Content.DeepCopy(), + s.targetConfig.IsFallback, + } +} + // logKonnectErrors logs details of each error response returned from Konnect API. -// TODO: This is copied from internal/dataplane package. -// Remove the definition in dataplane package after using separate loop to upload config to Konnect: -// https://github.com/Kong/kubernetes-ingress-controller/issues/6338 func logKonnectErrors(logger logr.Logger, err error) { if crudActionErrors := deckerrors.ExtractCRUDActionErrors(err); len(crudActionErrors) > 0 { for _, actionErr := range crudActionErrors { diff --git a/internal/konnect/config_synchronizer_test.go b/internal/konnect/config_synchronizer_test.go index 691d9c0a0b..cca1ae24d8 100644 --- a/internal/konnect/config_synchronizer_test.go +++ b/internal/konnect/config_synchronizer_test.go @@ -1,170 +1,26 @@ package konnect import ( - "bytes" "context" "fmt" "net/http" - "sync" "testing" "time" - "github.com/go-logr/logr" + "github.com/go-logr/logr/testr" "github.com/google/uuid" "github.com/kong/go-database-reconciler/pkg/file" "github.com/kong/go-kong/kong" - "github.com/samber/mo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/kong/kubernetes-ingress-controller/v3/internal/adminapi" "github.com/kong/kubernetes-ingress-controller/v3/internal/clients" - dpconf "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/config" + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/kongstate" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" - "github.com/kong/kubernetes-ingress-controller/v3/internal/diagnostics" - "github.com/kong/kubernetes-ingress-controller/v3/internal/metrics" + "github.com/kong/kubernetes-ingress-controller/v3/test/mocks" ) -func TestConfigSynchronizer_GetTargetContentCopy(t *testing.T) { - content := &file.Content{ - FormatVersion: "3.0", - Services: []file.FService{ - { - Service: kong.Service{ - Name: kong.String("service1"), - Host: kong.String("example.com"), - }, - Routes: []*file.FRoute{ - { - Route: kong.Route{ - Name: kong.String("route1"), - Expression: kong.String("http.path == \"/foo\""), - }, - }, - }, - }, - }, - } - - s := &ConfigSynchronizer{} - s.SetTargetContent(content) - copiedContent := s.GetTargetContentCopy() - require.Equal(t, content, copiedContent, "Copied content should have values with same fields with original content") - require.NotSame(t, content, copiedContent, "Copied content should not point to the same object with the original content") -} - -// ---------------------------------------------------------------------------- -// Mocks: Mock interfaces to run sendconfig.PerformUpdate in tests. -// TODO: These are (mostly) copied from internal/dataplane package, but there are also differences because we do not need so man features for the tests here. -// Should we extract the mock interfaces for sendconfig.PerformUpdate to common packages? -// ---------------------------------------------------------------------------- - -// mockGatewayClientsProvider is a mock implementation of dataplane.AdminAPIClientsProvider. -type mockGatewayClientsProvider struct { - gatewayClients []*adminapi.Client - konnectClient *adminapi.KonnectClient - dbMode dpconf.DBMode -} - -func (p *mockGatewayClientsProvider) KonnectClient() *adminapi.KonnectClient { - return p.konnectClient -} - -func (p *mockGatewayClientsProvider) GatewayClients() []*adminapi.Client { - return p.gatewayClients -} - -func (p *mockGatewayClientsProvider) GatewayClientsToConfigure() []*adminapi.Client { - if p.dbMode.IsDBLessMode() { - return p.gatewayClients - } - if len(p.gatewayClients) == 0 { - return []*adminapi.Client{} - } - return p.gatewayClients[:1] -} - -// mockUpdateStrategy is a mock implementation of sendconfig.UpdateStrategyResolver. -type mockUpdateStrategyResolver struct { - updateCalledForURLs []string - lastUpdatedContentForURLs map[string]sendconfig.ContentWithHash - lock sync.RWMutex -} - -func newMockUpdateStrategyResolver() *mockUpdateStrategyResolver { - return &mockUpdateStrategyResolver{ - lastUpdatedContentForURLs: map[string]sendconfig.ContentWithHash{}, - } -} - -func (f *mockUpdateStrategyResolver) ResolveUpdateStrategy( - c sendconfig.UpdateClient, - _ *diagnostics.ClientDiagnostic, -) sendconfig.UpdateStrategy { - f.lock.Lock() - defer f.lock.Unlock() - - url := c.AdminAPIClient().BaseRootURL() - return &mockUpdateStrategy{onUpdate: f.updateCalledForURLCallback(url)} -} - -// updateCalledForURLCallback returns a function that will be called when the mockUpdateStrategy is called. -// That enables us to track which URLs were called. -func (f *mockUpdateStrategyResolver) updateCalledForURLCallback(url string) func(sendconfig.ContentWithHash) (mo.Option[int], error) { - return func(content sendconfig.ContentWithHash) (mo.Option[int], error) { - f.lock.Lock() - defer f.lock.Unlock() - - f.updateCalledForURLs = append(f.updateCalledForURLs, url) - f.lastUpdatedContentForURLs[url] = content - // Mock returned config size. - return mo.Some(22), nil - } -} - -// getUpdateCalledForURLs returns the called URLs. -func (f *mockUpdateStrategyResolver) getUpdateCalledForURLs() []string { - f.lock.RLock() - defer f.lock.RUnlock() - - urls := make([]string, 0, len(f.updateCalledForURLs)) - urls = append(urls, f.updateCalledForURLs...) - return urls -} - -func (f *mockUpdateStrategyResolver) lastUpdatedContentForURL(url string) (sendconfig.ContentWithHash, bool) { - f.lock.RLock() - defer f.lock.RUnlock() - c, ok := f.lastUpdatedContentForURLs[url] - return c, ok -} - -// mockUpdateStrategy is a mock implementation of sendconfig.UpdateStrategy. -type mockUpdateStrategy struct { - onUpdate func(content sendconfig.ContentWithHash) (mo.Option[int], error) -} - -func (m *mockUpdateStrategy) Update(_ context.Context, targetContent sendconfig.ContentWithHash) (n mo.Option[int], err error) { - return m.onUpdate(targetContent) -} - -func (m *mockUpdateStrategy) MetricsProtocol() metrics.Protocol { - return metrics.ProtocolDBLess -} - -func (m *mockUpdateStrategy) Type() string { - return "Mock" -} - -// mockConfigurationChangeDetector is a mock implementation of sendconfig.ConfigurationChangeDetector. -type mockConfigurationChangeDetector struct{} - -func (m mockConfigurationChangeDetector) HasConfigurationChanged( - _ context.Context, oldSHA []byte, newSHA []byte, _ *file.Content, _ sendconfig.KonnectAwareClient, _ sendconfig.StatusClient, -) (bool, error) { - return !bytes.Equal(oldSHA, newSHA), nil -} - func mustSampleKonnectClient(t *testing.T) *adminapi.KonnectClient { t.Helper() @@ -175,38 +31,36 @@ func mustSampleKonnectClient(t *testing.T) *adminapi.KonnectClient { return adminapi.NewKonnectClient(c, rgID, false) } -// ---------------------------------------------------------------------------- -// End of Mocks -// ---------------------------------------------------------------------------- - func TestConfigSynchronizer_RunKonnectUpdateServer(t *testing.T) { sendConfigPeriod := 10 * time.Millisecond testKonnectClient := mustSampleKonnectClient(t) - resolver := newMockUpdateStrategyResolver() - - s := &ConfigSynchronizer{ - logger: logr.Discard(), - syncTicker: time.NewTicker(sendConfigPeriod), - clientsProvider: &mockGatewayClientsProvider{ - konnectClient: testKonnectClient, - }, - prometheusMetrics: metrics.NewCtrlFuncMetrics(), - updateStrategyResolver: resolver, - configChangeDetector: mockConfigurationChangeDetector{}, - configStatusNotifier: clients.NoOpConfigStatusNotifier{}, - } + resolver := mocks.NewUpdateStrategyResolver() + kongConfig := sendconfig.Config{} + logger := testr.New(t) + s := NewConfigSynchronizer( + logger, + kongConfig, + sendConfigPeriod, + &mocks.KonnectClientFactory{Client: testKonnectClient}, + resolver, + mocks.ConfigurationChangeDetector{}, + clients.NoOpConfigStatusNotifier{}, + mocks.MetricsRecorder{}, + ) ctx, cancel := context.WithCancel(context.Background()) - err := s.Start(ctx) - require.NoError(t, err) + go func() { + err := s.Start(ctx) + require.NoError(t, err) + }() t.Logf("Verifying that no URL are updated when no configuration received") require.Never(t, func() bool { - return len(resolver.getUpdateCalledForURLs()) != 0 + return len(resolver.GetUpdateCalledForURLs()) != 0 }, 10*sendConfigPeriod, sendConfigPeriod, "Should not update any URL when no configuration received") t.Logf("Verifying that the new config updated when received") - content := &file.Content{ + expectedContent := &file.Content{ FormatVersion: "3.0", Services: []file.FService{ { @@ -214,55 +68,50 @@ func TestConfigSynchronizer_RunKonnectUpdateServer(t *testing.T) { Name: kong.String("service1"), Host: kong.String("example.com"), }, - Routes: []*file.FRoute{ - { - Route: kong.Route{ - Name: kong.String("route1"), - Expression: kong.String("http.path == \"/foo\""), - }, - }, + }, + }, + } + kongState := &kongstate.KongState{ + Services: []kongstate.Service{ + { + Service: kong.Service{ + Name: kong.String("service1"), + Host: kong.String("example.com"), }, }, }, } - s.SetTargetContent(content) - require.Eventually(t, func() bool { - urls := resolver.getUpdateCalledForURLs() - if len(urls) != 1 { - return false - } + s.UpdateKongState(ctx, kongState, false) + // TODO(czeslavo): WHY DOES IT FAIL? + require.EventuallyWithT(t, func(t *assert.CollectT) { + urls := resolver.GetUpdateCalledForURLs() + require.Len(t, urls, 1) + url := urls[0] - contentWithHash, ok := resolver.lastUpdatedContentForURL(url) - if !ok { - return false - } - return assert.ObjectsAreEqual(content, contentWithHash.Content) + contentWithHash, ok := resolver.LastUpdatedContentForURL(url) + require.True(t, ok) + assert.ObjectsAreEqual(expectedContent, contentWithHash.Content) }, 10*sendConfigPeriod, sendConfigPeriod, "Should send expected configuration in time after received configuration") t.Logf("Verifying that update is not called when config not changed") - l := len(resolver.getUpdateCalledForURLs()) - s.SetTargetContent(content) + l := len(resolver.GetUpdateCalledForURLs()) + s.UpdateKongState(ctx, kongState, false) require.Never(t, func() bool { - return len(resolver.getUpdateCalledForURLs()) != l + return len(resolver.GetUpdateCalledForURLs()) != l }, 10*sendConfigPeriod, sendConfigPeriod) t.Logf("Verifying that new config are not sent after context cancelled") cancel() <-ctx.Done() - // modify content - newContent := content.DeepCopy() - newContent.Consumers = []file.FConsumer{ - { - Consumer: kong.Consumer{ - Username: kong.String("consumer-1"), - }, - }, - } - s.SetTargetContent(newContent) + // modify kong state + kongState.Services[0].Host = kong.String("example.org") + expectedContent.Services[0].Host = kong.String("example.org") + + s.UpdateKongState(ctx, kongState, false) // The latest updated content should always be the content in the previous update - // because it should not update new content after context cancelled. + // because it should not update new expectedContent after context cancelled. require.Never(t, func() bool { - urls := resolver.getUpdateCalledForURLs() + urls := resolver.GetUpdateCalledForURLs() l := len(urls) if l == 0 { return false @@ -271,10 +120,10 @@ func TestConfigSynchronizer_RunKonnectUpdateServer(t *testing.T) { if url != testKonnectClient.BaseRootURL() { return false } - contentWithHash, ok := resolver.lastUpdatedContentForURL(url) + contentWithHash, ok := resolver.LastUpdatedContentForURL(url) if !ok { return false } - return !(assert.ObjectsAreEqual(content, contentWithHash.Content)) + return assert.ObjectsAreEqual(expectedContent, contentWithHash.Content) }, 10*sendConfigPeriod, sendConfigPeriod, "Should not send new updates after context cancelled") } diff --git a/internal/manager/run.go b/internal/manager/run.go index e882f303b5..b53aa4652b 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -38,6 +38,7 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/manager/metadata" "github.com/kong/kubernetes-ingress-controller/v3/internal/manager/telemetry" "github.com/kong/kubernetes-ingress-controller/v3/internal/manager/utils/kongconfig" + "github.com/kong/kubernetes-ingress-controller/v3/internal/metrics" "github.com/kong/kubernetes-ingress-controller/v3/internal/store" "github.com/kong/kubernetes-ingress-controller/v3/internal/util" "github.com/kong/kubernetes-ingress-controller/v3/internal/util/kubernetes/object/status" @@ -200,6 +201,7 @@ func Run( configurationChangeDetector := sendconfig.NewDefaultConfigurationChangeDetector(logger) kongConfigFetcher := configfetcher.NewDefaultKongLastGoodConfigFetcher(translatorFeatureFlags.FillIDs, c.KongWorkspace) fallbackConfigGenerator := fallback.NewGenerator(fallback.NewDefaultCacheGraphProvider(), logger) + metricsRecoreder := metrics.NewCtrlFuncMetrics() dataplaneClient, err := dataplane.NewKongClient( logger, time.Duration(c.ProxyTimeoutSeconds*float32(time.Second)), @@ -214,6 +216,7 @@ func Run( configTranslator, &cache, fallbackConfigGenerator, + metricsRecoreder, ) if err != nil { return fmt.Errorf("failed to initialize kong data-plane client: %w", err) @@ -276,10 +279,6 @@ func Run( // In case of failures when building Konnect related objects, we're not returning errors as Konnect is not // considered critical feature, and it should not break the basic functionality of the controller. - // Run the Konnect Admin API client initialization in a separate goroutine to not block while ensuring - // connection. - go setupKonnectAdminAPIClientWithClientsMgr(ctx, c.Konnect, clientsManager, setupLog) - // Set channel to send config status. configStatusNotifier := clients.NewChannelConfigNotifier(logger) dataplaneClient.SetConfigStatusNotifier(configStatusNotifier) @@ -287,17 +286,19 @@ func Run( // Setup Konnect config synchronizer. konnectConfigSynchronizer, err := setupKonnectConfigSynchronizer( ctx, + c, mgr, c.Konnect.UploadConfigPeriod, kongConfig, - clientsManager, updateStrategyResolver, configStatusNotifier, + metricsRecoreder, ) if err != nil { setupLog.Error(err, "Failed to setup Konnect configuration synchronizer with manager, skipping") + } else { + dataplaneClient.SetKonnectKongStateUpdater(konnectConfigSynchronizer) } - dataplaneClient.SetKonnectConfigSynchronizer(konnectConfigSynchronizer) // Setup Konnect NodeAgent with manager. if err := setupKonnectNodeAgentWithMgr( @@ -447,28 +448,6 @@ func setupKonnectNodeAgentWithMgr( return nil } -// setupKonnectAdminAPIClientWithClientsMgr initializes Konnect Admin API client and sets it to clientsManager. -// If it fails to initialize the client, it logs the error and returns. -func setupKonnectAdminAPIClientWithClientsMgr( - ctx context.Context, - config adminapi.KonnectConfig, - clientsManager *clients.AdminAPIClientsManager, - logger logr.Logger, -) { - konnectAdminAPIClient, err := adminapi.NewKongClientForKonnectControlPlane(config) - if err != nil { - logger.Error(err, "Failed creating Konnect Control Plane Admin API client, skipping synchronisation") - return - } - if err := adminapi.EnsureKonnectConnection(ctx, konnectAdminAPIClient.AdminAPIClient(), logger); err != nil { - logger.Error(err, "Failed to ensure connection to Konnect Admin API, skipping synchronisation") - return - } - - clientsManager.SetKonnectClient(konnectAdminAPIClient) - logger.Info("Initialized Konnect Admin API client") -} - type IsReady interface { IsReady() bool } diff --git a/internal/manager/setup.go b/internal/manager/setup.go index b2b022002e..da88057ef4 100644 --- a/internal/manager/setup.go +++ b/internal/manager/setup.go @@ -39,6 +39,7 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/license" "github.com/kong/kubernetes-ingress-controller/v3/internal/logging" "github.com/kong/kubernetes-ingress-controller/v3/internal/manager/scheme" + "github.com/kong/kubernetes-ingress-controller/v3/internal/metrics" "github.com/kong/kubernetes-ingress-controller/v3/internal/store" "github.com/kong/kubernetes-ingress-controller/v3/internal/util/kubernetes/object/status" ) @@ -489,22 +490,25 @@ func setupLicenseGetter( // setupKonnectConfigSynchronizer sets up Konnect config sychronizer and adds it to the manager runnables. func setupKonnectConfigSynchronizer( ctx context.Context, + cfg *Config, mgr manager.Manager, configUploadPeriod time.Duration, kongConfig sendconfig.Config, - clientsProvider clients.AdminAPIClientsProvider, updateStrategyResolver sendconfig.UpdateStrategyResolver, configStatusNotifier clients.ConfigStatusNotifier, + metricsRecorder metrics.Recorder, ) (*konnect.ConfigSynchronizer, error) { logger := ctrl.LoggerFrom(ctx).WithName("konnect-config-synchronizer") + konnectClientFactory := adminapi.NewKonnectClientFactory(cfg.Konnect, logger.WithName("konnect-client-factory")) s := konnect.NewConfigSynchronizer( ctrl.LoggerFrom(ctx).WithName("konnect-config-synchronizer"), kongConfig, configUploadPeriod, - clientsProvider, + konnectClientFactory, updateStrategyResolver, sendconfig.NewDefaultConfigurationChangeDetector(logger), configStatusNotifier, + metricsRecorder, ) err := mgr.Add(s) if err != nil { diff --git a/internal/metrics/prometheus.go b/internal/metrics/prometheus.go index ae7107903e..0bee962151 100644 --- a/internal/metrics/prometheus.go +++ b/internal/metrics/prometheus.go @@ -14,6 +14,25 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/deckerrors" ) +// Recorder is an interface for recording metrics. +type Recorder interface { + RecordPushFailure(p Protocol, duration time.Duration, size mo.Option[int], dataplane string, brokenResourcesCount int, err error) + RecordPushSuccess(protocol Protocol, duration time.Duration, size mo.Option[int], target string) + RecordFallbackPushSuccess(protocol Protocol, duration time.Duration, size mo.Option[int], target string) + RecordFallbackPushFailure(protocol Protocol, duration time.Duration, size mo.Option[int], target string, failedResources int, err error) + RecordProcessedConfigSnapshotCacheHit() + RecordProcessedConfigSnapshotCacheMiss() + RecordTranslationFailure(duration time.Duration) + RecordTranslationBrokenResources(count int) + RecordTranslationSuccess(duration time.Duration) + RecordFallbackTranslationBrokenResources(count int) + RecordFallbackTranslationFailure(duration time.Duration) + RecordFallbackTranslationSuccess(duration time.Duration) + RecordFallbackCacheGenerationDuration(since time.Duration, err error) +} + +var _ Recorder = &CtrlFuncMetrics{} + // descriptions of these metrics are found below, where their help text is set in NewCtrlFuncMetrics() type CtrlFuncMetrics struct { diff --git a/test/mocks/configuration_change_detector.go b/test/mocks/configuration_change_detector.go new file mode 100644 index 0000000000..506c33f4bc --- /dev/null +++ b/test/mocks/configuration_change_detector.go @@ -0,0 +1,20 @@ +package mocks + +import ( + "context" + + "github.com/kong/go-database-reconciler/pkg/file" + + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" +) + +// ConfigurationChangeDetector is a mock implementation of sendconfig.ConfigurationChangeDetector. +type ConfigurationChangeDetector struct { + ConfigurationChanged bool +} + +func (m ConfigurationChangeDetector) HasConfigurationChanged( + context.Context, []byte, []byte, *file.Content, sendconfig.KonnectAwareClient, sendconfig.StatusClient, +) (bool, error) { + return m.ConfigurationChanged, nil +} diff --git a/test/mocks/konnect_client_factory.go b/test/mocks/konnect_client_factory.go new file mode 100644 index 0000000000..3b3689e2e1 --- /dev/null +++ b/test/mocks/konnect_client_factory.go @@ -0,0 +1,15 @@ +package mocks + +import ( + "context" + + "github.com/kong/kubernetes-ingress-controller/v3/internal/adminapi" +) + +type KonnectClientFactory struct { + Client *adminapi.KonnectClient +} + +func (f *KonnectClientFactory) NewKonnectClient(context.Context) (*adminapi.KonnectClient, error) { + return f.Client, nil +} diff --git a/test/mocks/konnect_kongstate_updater.go b/test/mocks/konnect_kongstate_updater.go new file mode 100644 index 0000000000..bfd76796e8 --- /dev/null +++ b/test/mocks/konnect_kongstate_updater.go @@ -0,0 +1,27 @@ +package mocks + +import ( + "context" + + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/kongstate" +) + +type KonnectKongStateUpdater struct { + calls []KonnectKongStateUpdaterCall +} + +type KonnectKongStateUpdaterCall struct { + KongState *kongstate.KongState + IsFallback bool +} + +func (k *KonnectKongStateUpdater) UpdateKongState(_ context.Context, kongState *kongstate.KongState, isFallback bool) { + k.calls = append(k.calls, KonnectKongStateUpdaterCall{ + KongState: kongState, + IsFallback: isFallback, + }) +} + +func (k *KonnectKongStateUpdater) Calls() []KonnectKongStateUpdaterCall { + return k.calls +} diff --git a/test/mocks/metrics_recorder.go b/test/mocks/metrics_recorder.go new file mode 100644 index 0000000000..ec079d14e9 --- /dev/null +++ b/test/mocks/metrics_recorder.go @@ -0,0 +1,50 @@ +package mocks + +import ( + "time" + + "github.com/samber/mo" + + "github.com/kong/kubernetes-ingress-controller/v3/internal/metrics" +) + +type MetricsRecorder struct{} + +func (m MetricsRecorder) RecordPushFailure(metrics.Protocol, time.Duration, mo.Option[int], string, int, error) { +} + +func (m MetricsRecorder) RecordPushSuccess(metrics.Protocol, time.Duration, mo.Option[int], string) { +} + +func (m MetricsRecorder) RecordFallbackPushSuccess(metrics.Protocol, time.Duration, mo.Option[int], string) { +} + +func (m MetricsRecorder) RecordFallbackPushFailure(metrics.Protocol, time.Duration, mo.Option[int], string, int, error) { +} + +func (m MetricsRecorder) RecordProcessedConfigSnapshotCacheHit() { +} + +func (m MetricsRecorder) RecordProcessedConfigSnapshotCacheMiss() { +} + +func (m MetricsRecorder) RecordTranslationFailure(time.Duration) { +} + +func (m MetricsRecorder) RecordTranslationBrokenResources(int) { +} + +func (m MetricsRecorder) RecordTranslationSuccess(time.Duration) { +} + +func (m MetricsRecorder) RecordFallbackTranslationBrokenResources(int) { +} + +func (m MetricsRecorder) RecordFallbackTranslationFailure(time.Duration) { +} + +func (m MetricsRecorder) RecordFallbackTranslationSuccess(time.Duration) { +} + +func (m MetricsRecorder) RecordFallbackCacheGenerationDuration(time.Duration, error) { +} diff --git a/test/mocks/update_strategy.go b/test/mocks/update_strategy.go new file mode 100644 index 0000000000..7a29a8bff3 --- /dev/null +++ b/test/mocks/update_strategy.go @@ -0,0 +1,27 @@ +package mocks + +import ( + "context" + + "github.com/samber/mo" + + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" + "github.com/kong/kubernetes-ingress-controller/v3/internal/metrics" +) + +// UpdateStrategy is a mock implementation of sendconfig.UpdateStrategy. +type UpdateStrategy struct { + onUpdate func(content sendconfig.ContentWithHash) (mo.Option[int], error) +} + +func (m *UpdateStrategy) Update(_ context.Context, targetContent sendconfig.ContentWithHash) (n mo.Option[int], err error) { + return m.onUpdate(targetContent) +} + +func (m *UpdateStrategy) MetricsProtocol() metrics.Protocol { + return metrics.ProtocolDBLess +} + +func (m *UpdateStrategy) Type() string { + return "Mock" +} diff --git a/test/mocks/update_strategy_resolver.go b/test/mocks/update_strategy_resolver.go new file mode 100644 index 0000000000..c37e387164 --- /dev/null +++ b/test/mocks/update_strategy_resolver.go @@ -0,0 +1,154 @@ +package mocks + +import ( + "errors" + "fmt" + "sync" + "testing" + "time" + + "github.com/samber/lo" + "github.com/samber/mo" + "github.com/stretchr/testify/require" + + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" + "github.com/kong/kubernetes-ingress-controller/v3/internal/diagnostics" +) + +const mockUpdateReturnedConfigSize = 22 + +// UpdateStrategyResolver is a mock implementation of sendconfig.UpdateStrategyResolver. +type UpdateStrategyResolver struct { + updateCalledForURLs []string + lastUpdatedContentForURLs map[string]sendconfig.ContentWithHash + errorsToReturnOnUpdate map[string][]error + lock sync.RWMutex +} + +func NewUpdateStrategyResolver() *UpdateStrategyResolver { + return &UpdateStrategyResolver{ + lastUpdatedContentForURLs: map[string]sendconfig.ContentWithHash{}, + errorsToReturnOnUpdate: map[string][]error{}, + } +} + +func (f *UpdateStrategyResolver) ResolveUpdateStrategy(c sendconfig.UpdateClient, _ *diagnostics.ClientDiagnostic) sendconfig.UpdateStrategy { + f.lock.Lock() + defer f.lock.Unlock() + + url := c.AdminAPIClient().BaseRootURL() + return &UpdateStrategy{onUpdate: f.updateCalledForURLCallback(url)} +} + +// ReturnErrorOnUpdate will cause the mockUpdateStrategy with a given Admin API URL to return an error on Update(). +// Errors will be returned following FIFO order. Each call to this function adds a new error to the queue. +func (f *UpdateStrategyResolver) ReturnErrorOnUpdate(url string) { + f.lock.Lock() + defer f.lock.Unlock() + + f.errorsToReturnOnUpdate[url] = append(f.errorsToReturnOnUpdate[url], errors.New("error on update")) +} + +// ReturnSpecificErrorOnUpdate will cause the mockUpdateStrategy with a given Admin API URL to return a specific error +// on Update() call. Errors will be returned following FIFO order. Each call to this function adds a new error to the queue. +func (f *UpdateStrategyResolver) ReturnSpecificErrorOnUpdate(url string, err error) { + f.lock.Lock() + defer f.lock.Unlock() + + f.errorsToReturnOnUpdate[url] = append(f.errorsToReturnOnUpdate[url], err) +} + +// GetUpdateCalledForURLs returns the called URLs. +func (f *UpdateStrategyResolver) GetUpdateCalledForURLs() []string { + f.lock.RLock() + defer f.lock.RUnlock() + + urls := make([]string, 0, len(f.updateCalledForURLs)) + urls = append(urls, f.updateCalledForURLs...) + return urls +} + +func (f *UpdateStrategyResolver) LastUpdatedContentForURL(url string) (sendconfig.ContentWithHash, bool) { + f.lock.RLock() + defer f.lock.RUnlock() + c, ok := f.lastUpdatedContentForURLs[url] + return c, ok +} + +// AssertUpdateCalledForURLs asserts that the mockUpdateStrategy was called for the given URLs. +func (f *UpdateStrategyResolver) AssertUpdateCalledForURLs(t *testing.T, urls []string, msgAndArgs ...any) { + t.Helper() + + f.lock.RLock() + defer f.lock.RUnlock() + + if len(msgAndArgs) == 0 { + msgAndArgs = []any{"update was not called for all URLs"} + } + require.ElementsMatch(t, urls, f.updateCalledForURLs, msgAndArgs...) +} + +func (f *UpdateStrategyResolver) AssertUpdateCalledForURLsWithGivenCount(t *testing.T, urlToCount map[string]int, msgAndArgs ...any) { + t.Helper() + + f.lock.RLock() + defer f.lock.RUnlock() + actualURLToCount := lo.CountValues(f.updateCalledForURLs) + for url, callCount := range urlToCount { + m := []any{ + fmt.Sprintf("URL %s should receive %d update calls", url, callCount), + } + m = append(m, msgAndArgs...) + require.Equal(t, callCount, actualURLToCount[url], m...) + } +} + +func (f *UpdateStrategyResolver) AssertNoUpdateCalled(t *testing.T) { + t.Helper() + + f.lock.RLock() + defer f.lock.RUnlock() + + require.Empty(t, f.updateCalledForURLs, "update was called") +} + +func (f *UpdateStrategyResolver) EventuallyGetLastUpdatedContentForURL( + t *testing.T, url string, waitTime, waitTick time.Duration, msgAndArgs ...any, +) sendconfig.ContentWithHash { + t.Helper() + + var content sendconfig.ContentWithHash + if len(msgAndArgs) == 0 { + msgAndArgs = []any{"update was not called for URL " + url} + } + require.Eventually(t, func() bool { + c, ok := f.LastUpdatedContentForURL(url) + if ok { + content = c + return true + } + return false + }, waitTime, waitTick, msgAndArgs...) + return content +} + +// updateCalledForURLCallback returns a function that will be called when the mockUpdateStrategy is called. +// That enables us to track which URLs were called. +func (f *UpdateStrategyResolver) updateCalledForURLCallback(url string) func(sendconfig.ContentWithHash) (mo.Option[int], error) { + return func(content sendconfig.ContentWithHash) (mo.Option[int], error) { + f.lock.Lock() + defer f.lock.Unlock() + + f.updateCalledForURLs = append(f.updateCalledForURLs, url) + f.lastUpdatedContentForURLs[url] = content + if errsToReturn, ok := f.errorsToReturnOnUpdate[url]; ok { + if len(errsToReturn) > 0 { + err := errsToReturn[0] + f.errorsToReturnOnUpdate[url] = errsToReturn[1:] + return mo.None[int](), err + } + return mo.Some(mockUpdateReturnedConfigSize), nil + } + return mo.Some(mockUpdateReturnedConfigSize), nil + } +} diff --git a/test/mocks/uuid.go b/test/mocks/uuid.go deleted file mode 100644 index f0cd6d3752..0000000000 --- a/test/mocks/uuid.go +++ /dev/null @@ -1,13 +0,0 @@ -package mocks - -import "github.com/kong/kubernetes-ingress-controller/v3/internal/util" - -var _ = util.UUIDGenerator(&StaticUUIDGenerator{}) - -type StaticUUIDGenerator struct { - UUID string -} - -func (s StaticUUIDGenerator) NewString() string { - return s.UUID -}