diff --git a/filters/openpolicyagent/openpolicyagent.go b/filters/openpolicyagent/openpolicyagent.go index 1024208ec4..9e6536b32c 100644 --- a/filters/openpolicyagent/openpolicyagent.go +++ b/filters/openpolicyagent/openpolicyagent.go @@ -21,6 +21,7 @@ import ( "github.com/open-policy-agent/opa/config" "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/plugins" + "github.com/open-policy-agent/opa/plugins/bundle" "github.com/open-policy-agent/opa/plugins/discovery" "github.com/open-policy-agent/opa/rego" "github.com/open-policy-agent/opa/runtime" @@ -50,6 +51,11 @@ const ( DefaultRequestBodyBufferSize = 8 * 1024 // 8 KB spanNameEval = "open-policy-agent" + + GeneralPluginStatusStartupListener = "general-plugin-status-startup" + DiscoveryPluginStartupListener = "skipper-instance-startup-discovery" + PluginStatusStartupListener = "skipper-instance-startup-plugin" + BundlePluginStartupListener = "skipper-instance-startup-bundle" ) type OpenPolicyAgentRegistry struct { @@ -363,16 +369,18 @@ func (registry *OpenPolicyAgentRegistry) newOpenPolicyAgentInstance(bundleName s } type OpenPolicyAgentInstance struct { - manager *plugins.Manager - instanceConfig OpenPolicyAgentInstanceConfig - opaConfig *config.Config - bundleName string - preparedQuery *rego.PreparedEvalQuery - preparedQueryDoOnce *sync.Once - interQueryBuiltinCache iCache.InterQueryCache - once sync.Once - stopped bool - registry *OpenPolicyAgentRegistry + manager *plugins.Manager + instanceConfig OpenPolicyAgentInstanceConfig + opaConfig *config.Config + bundleName string + preparedQuery *rego.PreparedEvalQuery + preparedQueryDoOnce *sync.Once + interQueryBuiltinCache iCache.InterQueryCache + once sync.Once + stopped bool + registry *OpenPolicyAgentRegistry + registerBundleListenerOnce *sync.Once + registerDiscoveryListenerOnce *sync.Once maxBodyBytes int64 bodyReadBufferSize int64 @@ -469,7 +477,9 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes [] preparedQueryDoOnce: new(sync.Once), interQueryBuiltinCache: iCache.NewInterQueryCache(manager.InterQueryBuiltinCacheConfig()), - idGenerator: uniqueIDGenerator, + idGenerator: uniqueIDGenerator, + registerDiscoveryListenerOnce: new(sync.Once), + registerBundleListenerOnce: new(sync.Once), } manager.RegisterCompilerTrigger(opa.compilerUpdated) @@ -480,38 +490,77 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes [] // Start asynchronously starts the policy engine's plugins that download // policies, report status, etc. func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Duration) error { - err := opa.manager.Start(ctx) - if err != nil { - return err - } + discoveryPlugin := discovery.Lookup(opa.manager) - // check readiness of all plugins - pluginsReady := func() bool { - for _, status := range opa.manager.PluginStatus() { - if status != nil && status.State != plugins.StateOK { - return false + done := make(chan struct{}) + failed := make(chan error, 1) + + opa.registerDiscoveryListenerOnce.Do(func() { + + discoveryPlugin.RegisterListener(DiscoveryPluginStartupListener, func(status bundle.Status) { + handleStatusErrors(status, failed, "discovery plugin") + }) + //defer discoveryPlugin.Unregister(DiscoveryPluginStartupListener) //ToDo + }) + + opa.manager.RegisterPluginStatusListener(PluginStatusStartupListener, func(status map[string]*plugins.Status) { + if _, exists := status["bundle"]; exists { + bundlePlugin := bundle.Lookup(opa.manager) + if bundlePlugin != nil { + opa.registerBundleListenerOnce.Do(func() { + bundlePlugin.Register(BundlePluginStartupListener, func(status bundle.Status) { + handleStatusErrors(status, failed, "bundle plugin") + //defer bundlePlugin.Unregister(BundlePluginStartupListener) //ToDo + }) + }) } } - return true - } + }) + defer opa.manager.UnregisterPluginStatusListener(PluginStatusStartupListener) - err = waitFunc(ctx, pluginsReady, 100*time.Millisecond) + // Register listener for general plugin status checks + opa.manager.RegisterPluginStatusListener(GeneralPluginStatusStartupListener, func(status map[string]*plugins.Status) { + for _, pluginStatus := range status { + if pluginStatus != nil && pluginStatus.State != plugins.StateOK { + return + } + } + close(done) + }) + defer opa.manager.UnregisterPluginStatusListener(GeneralPluginStatusStartupListener) + err := opa.manager.Start(ctx) if err != nil { + return err + } + + select { + case <-ctx.Done(): + timeoutErr := ctx.Err() + for pluginName, status := range opa.manager.PluginStatus() { if status != nil && status.State != plugins.StateOK { opa.Logger().WithFields(map[string]interface{}{ "plugin_name": pluginName, "plugin_state": status.State, "error_message": status.Message, - }).Error("Open policy agent plugin did not start in time") + }).Error("Open policy agent plugin: %v did not start in time", pluginName) } } opa.Close(ctx) - return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, err) + if timeoutErr != nil { + return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, timeoutErr) + } + return fmt.Errorf("one or more open policy agent plugins failed to start in %v", timeout) + + case <-done: + return nil + case err := <-failed: + opa.Close(ctx) + + return err } - return nil } func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) { @@ -521,25 +570,6 @@ func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) { }) } -func waitFunc(ctx context.Context, fun func() bool, interval time.Duration) error { - if fun() { - return nil - } - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return fmt.Errorf("timed out while starting: %w", ctx.Err()) - case <-ticker.C: - if fun() { - return nil - } - } - } -} - func configLabelsInfo(opaConfig config.Config) func(*plugins.Manager) { info := ast.NewObject() labels := ast.NewObject() @@ -796,3 +826,46 @@ func (l *QuietLogger) Error(fmt string, a ...interface{}) { func (l *QuietLogger) Warn(fmt string, a ...interface{}) { l.target.Warn(fmt, a) } + +var temporaryClientErrorHTTPCodes = map[int64]struct{}{ + 429: {}, // too many requests + 408: {}, // request timeout +} + +func isTemporaryError(code int64) bool { + _, exists := temporaryClientErrorHTTPCodes[code] + return exists +} + +func handleStatusErrors( + status bundle.Status, + failed chan error, + prefix string, +) { + if status.Code == "bundle_error" { + if status.HTTPCode == "" { + failed <- formatStatusError(prefix, status) + return + } + code, err := status.HTTPCode.Int64() + if err == nil { + if code >= 400 && code < 500 && !isTemporaryError(code) { + // Fail for error codes in the range 400-500 excluding temporary errors + failed <- formatStatusError(prefix, status) + return + } else if code >= 500 { + // Do not fail for 5xx errors and keep retrying + return + } + } + if err != nil { + failed <- formatStatusError(prefix, status) + return + } + } +} + +func formatStatusError(prefix string, status bundle.Status) error { + return fmt.Errorf("%s failed: Name: %s, Code: %s, Message: %s, HTTPCode: %s, Errors: %v", + prefix, status.Name, status.Code, status.Message, status.HTTPCode, status.Errors) +} diff --git a/filters/openpolicyagent/openpolicyagent_test.go b/filters/openpolicyagent/openpolicyagent_test.go index 6c6dc7d923..d80cf82edf 100644 --- a/filters/openpolicyagent/openpolicyagent_test.go +++ b/filters/openpolicyagent/openpolicyagent_test.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "net/http/httptest" "os" "strconv" "testing" @@ -243,7 +244,7 @@ func TestRegistry(t *testing.T) { assert.Error(t, err, "should not work after close") } -func TestOpaEngineStartFailureWithTimeout(t *testing.T) { +func TestOpaEngineStartFailureWithWrongBundle(t *testing.T) { _, config := mockControlPlaneWithDiscoveryBundle("bundles/discovery-with-wrong-bundle") registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) @@ -259,7 +260,7 @@ func TestOpaEngineStartFailureWithTimeout(t *testing.T) { err = engine.Start(ctx, cfg.startupTimeout) assert.True(t, engine.stopped) - assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s") + assert.Contains(t, err.Error(), "bundle plugin failed: Name: bundles/non-existing-bundle, Code: bundle_error, Message: server replied with Not Found, HTTPCode: 404") } func TestOpaActivationSuccessWithDiscovery(t *testing.T) { @@ -340,7 +341,7 @@ func TestOpaActivationTimeOutWithDiscoveryPointingWrongBundle(t *testing.T) { instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter") assert.Nil(t, instance) - assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: timed out while starting: context deadline exceeded") + assert.Contains(t, err.Error(), "bundle plugin failed: Name: bundles/non-existing-bundle, Code: bundle_error, Message: server replied with Not Found, HTTPCode: 404") assert.Equal(t, 0, len(registry.instances)) } @@ -354,7 +355,7 @@ func TestOpaActivationTimeOutWithDiscoveryParsingError(t *testing.T) { instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter") assert.Nil(t, instance) - assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: timed out while starting: context deadline exceeded") + assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded") assert.Equal(t, 0, len(registry.instances)) } @@ -703,3 +704,147 @@ func TestBodyExtractionUnknownBody(t *testing.T) { f1() f2() } + +func TestOpaActivationFailureWithInvalidDiscovery(t *testing.T) { + _, config := mockControlPlaneWithDiscoveryBundle("bundles/invalid-discovery") + + registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config)) + assert.NoError(t, err) + + _, err = registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter") + assert.Error(t, err) + assert.Equal(t, "discovery plugin failed: Name: discovery, Code: bundle_error, Message: server replied with Not Found, HTTPCode: 404, Errors: []", err.Error()) +} + +func TestDiscoveryRetryErrors(t *testing.T) { + tests := []struct { + name string + statusCode int + expectedError string + }{ + {"TemporaryError429", http.StatusTooManyRequests, "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded"}, + {"TemporaryError408", http.StatusRequestTimeout, "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded"}, + {"Retry5xx", http.StatusInternalServerError, "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded"}, + {"Failure403BundleError", http.StatusForbidden, "discovery plugin failed: Name: discovery, Code: bundle_error, Message: server replied with Forbidden, HTTPCode: 403"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testDiscoveryBundleError(t, tt.statusCode, tt.expectedError) + }) + } +} + +func TestResourceBundleRetryErrors(t *testing.T) { + tests := []struct { + name string + statusCode int + expectedError string + }{ + {"TemporaryError429", http.StatusTooManyRequests, "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded"}, + {"TemporaryError408", http.StatusRequestTimeout, "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded"}, + {"Retry5xx", http.StatusInternalServerError, "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded"}, + {"Failure403BundleError", http.StatusForbidden, "bundle plugin failed: Name: test, Code: bundle_error, Message: server replied with Forbidden, HTTPCode: 403"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testResourceBundleError(t, tt.statusCode, tt.expectedError) + }) + } +} + +// Helper function to test discovery bundle temporary errors +func testDiscoveryBundleError(t *testing.T, statusCode int, expectedError string) { + mockDiscoveryBundleServer := mockBundleServerWithStatus("/bundles/discovery", statusCode) + defer mockDiscoveryBundleServer.Close() + + // Create the plugin configuration + config := []byte(fmt.Sprintf(`{ + "services": { + "test": { + "url": %q + } + }, + "labels": { + "environment": "envValue" + }, + "discovery": { + "name": "discovery", + "resource": "/bundles/discovery", + "service": "test" + } + }`, mockDiscoveryBundleServer.URL)) + + registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second)) + assert.NoError(t, err) + + instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter") + assert.Nil(t, instance) + assert.Contains(t, err.Error(), expectedError) +} + +// Helper function to test resource bundle temporary errors +func testResourceBundleError(t *testing.T, statusCode int, expectedError string) { + mockServer := mockBundleServerWithStatus("/bundles/test.tgz", statusCode) + defer mockServer.Close() + + // Create the OPA control plane with the discovery bundle + opaControlPlane := opasdktest.MustNewServer( + opasdktest.MockBundle("/bundles/discovery", map[string]string{ + "data.json": fmt.Sprintf(`{ + "discovery": { + "bundles": { + "test": { + "persist": false, + "resource": "bundles/test.tgz", + "service": "styra-bundles" + } + }, + "services": { + "styra-bundles": { + "url": "%s" + } + } + } + }`, mockServer.URL), + }), + ) + + config := []byte(fmt.Sprintf(`{ + "services": { + "discovery": { + "url": %q + } + }, + "labels": { + "environment": "envValue" + }, + "discovery": { + "name": "discovery", + "resource": "/bundles/discovery", + "service": "discovery" + } + }`, opaControlPlane.URL())) + + registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second)) + assert.NoError(t, err) + + instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter") + assert.Nil(t, instance) + assert.Contains(t, err.Error(), expectedError) +} + +func mockBundleServerWithStatus(path string, statusCode int) *httptest.Server { + handler := http.NewServeMux() + handler.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + http.Error(w, http.StatusText(statusCode), statusCode) + }) + return httptest.NewServer(handler) +}