Skip to content

feat(policy): policy should maintain a cache of entitlement policy #2327

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 28 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4923115
feat(policy): cache entitlement policy in authz v2
jakedoublev May 28, 2025
c86d9d7
hook to run after services are started
jakedoublev May 28, 2025
12e2c4d
policy config
jakedoublev May 28, 2025
9096218
consume policy cache
jakedoublev May 28, 2025
0bfd2e3
lint fixes
jakedoublev May 28, 2025
603fb63
fix cache utilization in auth service and lower defaults
jakedoublev May 28, 2025
cc99503
lint fixes
jakedoublev May 28, 2025
5747db1
test: sleep to ensure roundtrip tests hit cached policy
jakedoublev May 28, 2025
281f41d
put back rttests
jakedoublev May 28, 2025
a5859e1
put back authz doing any caching
jakedoublev May 28, 2025
a14f9ae
set cache on relevant policy services and refactor to servicesStarted…
jakedoublev May 28, 2025
a668dd7
improve log
jakedoublev May 28, 2025
9505dd0
working cache
jakedoublev May 29, 2025
3ba044c
lint fixes
jakedoublev May 29, 2025
b1ec09a
fix limit/offset
jakedoublev May 29, 2025
eae73f6
rm debounce on write-triggered refresh
jakedoublev May 29, 2025
f1d2fbb
Merge branch 'main' into feat/authz-v2-cache-policy
jakedoublev May 29, 2025
5abe1fa
Merge branch 'main' into feat/authz-v2-cache-policy
jakedoublev May 30, 2025
ffa528c
rm mutation cache refreshes
jakedoublev May 30, 2025
e18c04a
disable caching by default
jakedoublev May 30, 2025
0103264
fixes
jakedoublev May 30, 2025
f0a56a6
ensure close is called to shut down db clients and caches
jakedoublev May 30, 2025
184cfff
lint fixes
jakedoublev May 30, 2025
f864a33
lint fixes
jakedoublev May 30, 2025
07d6849
Merge remote-tracking branch 'origin' into feat/authz-v2-cache-policy
jakedoublev Jun 2, 2025
a1f8ac6
ristretto cache/go-cache implementation
jakedoublev Jun 2, 2025
b7cd66c
lint fix
jakedoublev Jun 2, 2025
57873d8
fix shutdown panic
jakedoublev Jun 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions opentdf-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ services:
# policy is enabled by default in mode 'all'
# policy:
# enabled: true
# list_request_limit_default: 1000
# list_request_limit_max: 2500
# list_request_limit_default: 1000
# list_request_limit_max: 2500
# cache_refresh_interval_seconds: 15
server:
tls:
enabled: false
Expand Down
1 change: 1 addition & 0 deletions opentdf-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ services:
# enabled: true
# list_request_limit_default: 1000
# list_request_limit_max: 2500
# cache_refresh_interval_seconds: 15
server:
auth:
enabled: true
Expand Down
23 changes: 23 additions & 0 deletions service/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
// ChangeHook is a function invoked when the configuration changes.
type ChangeHook func(configServices ServicesMap) error

// ServicesStartedHook is a function invoked when all service registrations are complete.
type ServicesStartedHook func(context.Context) error

// Config structure holding all services.
type ServicesMap map[string]ServiceConfig

Expand Down Expand Up @@ -47,6 +50,8 @@ type Config struct {
// Trace is for configuring open telemetry based tracing.
Trace tracing.Config `mapstructure:"trace"`

// onServicesStartedHooks is a list of functions to call when all service registrations are complete.
onServicesStartedHooks []ServicesStartedHook
// onConfigChangeHooks is a list of functions to call when the configuration changes.
onConfigChangeHooks []ChangeHook
// loaders is a list of configuration loaders.
Expand Down Expand Up @@ -110,6 +115,11 @@ func (c *Config) AddOnConfigChangeHook(hook ChangeHook) {
c.onConfigChangeHooks = append(c.onConfigChangeHooks, hook)
}

// AddOnServicesStartedHook adds a hook to the list of hooks to call when all service registrations are complete.
func (c *Config) AddOnServicesStartedHook(hook ServicesStartedHook) {
c.onServicesStartedHooks = append(c.onServicesStartedHooks, hook)
}

// Watch starts watching the configuration for changes in all config loaders.
func (c *Config) Watch(ctx context.Context) error {
if len(c.loaders) == 0 {
Expand All @@ -123,6 +133,19 @@ func (c *Config) Watch(ctx context.Context) error {
return nil
}

// RunServicesStartedHooks triggers the service hooks that run once all services are live.
func (c *Config) RunServicesStartedHooks(ctx context.Context) error {
if len(c.onServicesStartedHooks) == 0 {
return nil
}
for _, hook := range c.onServicesStartedHooks {
if err := hook(ctx); err != nil {
return err
}
}
return nil
}

// Close invokes close method on all config loaders.
func (c *Config) Close(_ context.Context) error {
if len(c.loaders) == 0 {
Expand Down
4 changes: 4 additions & 0 deletions service/pkg/server/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ func startServices(ctx context.Context, cfg *config.Config, otdf *server.OpenTDF
return func() {}, fmt.Errorf("failed to register config update hook: %w", err)
}

if err := svc.RegisterOnServicesStartedHook(ctx, cfg.AddOnServicesStartedHook); err != nil {
return func() {}, fmt.Errorf("failed to register on complete service registration hook: %w", err)
}

// Register Connect RPC Services
if err := svc.RegisterConnectRPCServiceHandler(ctx, otdf.ConnectRPC); err != nil {
logger.Info("service did not register a connect-rpc handler", slog.String("namespace", ns))
Expand Down
5 changes: 5 additions & 0 deletions service/pkg/server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ func Start(f ...StartOptions) error {
}
defer cfg.Close(ctx)

// Run the services started hooks
if err := cfg.RunServicesStartedHooks(ctx); err != nil {
Copy link
Preview

Copilot AI Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RunServicesStartedHooks call appears twice in this function; remove the earlier invocation to avoid running the hooks twice.

Copilot uses AI. Check for mistakes.

return fmt.Errorf("failed to run service registration complete hooks: %w", err)
}

// Start the server
logger.Info("starting opentdf")
if err := otdf.Start(); err != nil {
Expand Down
21 changes: 21 additions & 0 deletions service/pkg/serviceregistry/serviceregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type (
RegisterFunc[S any] func(RegistrationParams) (impl S, HandlerServer HandlerServer)
// Allow services to implement handling for config changes as direced by caller
OnConfigUpdateHook func(context.Context, config.ServiceConfig) error
// Allow services to implement a callback to run when all services are registered
OnServicesStartedHook func(context.Context) error
)

// DBRegister is a struct that holds the information needed to register a service with a database
Expand All @@ -81,6 +83,7 @@ type IService interface {
IsStarted() bool
Shutdown() error
RegisterConfigUpdateHook(ctx context.Context, hookAppender func(config.ChangeHook)) error
RegisterOnServicesStartedHook(ctx context.Context, hookAppender func(config.ServicesStartedHook)) error
RegisterConnectRPCServiceHandler(context.Context, *server.ConnectRPC) error
RegisterGRPCGatewayHandler(context.Context, *runtime.ServeMux, *grpc.ClientConn) error
RegisterHTTPHandlers(context.Context, *runtime.ServeMux) error
Expand Down Expand Up @@ -110,6 +113,8 @@ type ServiceOptions[S any] struct {
ServiceDesc *grpc.ServiceDesc
// OnConfigUpdate is a hook to handle in-service actions when config changes
OnConfigUpdate OnConfigUpdateHook
// OnServicesStarted is a hook to handle in-service actions that should run when all services are registered
OnServicesStarted OnServicesStartedHook
// RegisterFunc is the function that will be called to register the service
RegisterFunc RegisterFunc[S]
// HTTPHandlerFunc is the function that will be called to register extra http handlers
Expand Down Expand Up @@ -192,6 +197,22 @@ func (s Service[S]) RegisterConfigUpdateHook(ctx context.Context, hookAppender f
return nil
}

// RegisterOnServicesStartedHook appends a registered service's onServicesStartedHook to any watching services.
func (s Service[S]) RegisterOnServicesStartedHook(_ context.Context, hookAppender func(config.ServicesStartedHook)) error {
// If no hook is registered, exit
if s.OnServicesStarted != nil {
var onChange config.ServicesStartedHook = func(ctx context.Context) error {
slog.Debug("OnServicesStarted hook called",
slog.String("namespace", s.GetNamespace()),
slog.String("service", s.GetServiceDesc().ServiceName),
)
return s.OnServicesStarted(ctx)
}
hookAppender(onChange)
}
return nil
}

func (s Service[S]) RegisterConnectRPCServiceHandler(_ context.Context, connectRPC *server.ConnectRPC) error {
if s.ConnectRPCFunc == nil {
return errors.New("service did not register a handler")
Expand Down
74 changes: 66 additions & 8 deletions service/policy/attributes/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"

"connectrpc.com/connect"
"github.com/opentdf/platform/protocol/go/common"
"github.com/opentdf/platform/protocol/go/policy"
"github.com/opentdf/platform/protocol/go/policy/attributes"
"github.com/opentdf/platform/protocol/go/policy/attributes/attributesconnect"
Expand All @@ -24,6 +25,14 @@ type AttributesService struct { //nolint:revive // AttributesService is a valid
logger *logger.Logger
config *policyconfig.Config
trace.Tracer
cache *policyconfig.EntitlementPolicyCache // Cache for attributes and subject mappings
}

func OnServicesStarted(svc *AttributesService) serviceregistry.OnServicesStartedHook {
return func(ctx context.Context) error {
svc.cache = policyconfig.GetSharedEntitlementPolicyCache(ctx, svc.dbClient, svc.logger, svc.config)
return nil
}
}

func OnConfigUpdate(as *AttributesService) serviceregistry.OnConfigUpdateHook {
Expand All @@ -44,16 +53,18 @@ func OnConfigUpdate(as *AttributesService) serviceregistry.OnConfigUpdateHook {
func NewRegistration(ns string, dbRegister serviceregistry.DBRegister) *serviceregistry.Service[attributesconnect.AttributesServiceHandler] {
as := new(AttributesService)
onUpdateConfigHook := OnConfigUpdate(as)
onStartHook := OnServicesStarted(as)

return &serviceregistry.Service[attributesconnect.AttributesServiceHandler]{
Close: as.Close,
ServiceOptions: serviceregistry.ServiceOptions[attributesconnect.AttributesServiceHandler]{
Namespace: ns,
DB: dbRegister,
ServiceDesc: &attributes.AttributesService_ServiceDesc,
ConnectRPCFunc: attributesconnect.NewAttributesServiceHandler,
GRPCGatewayFunc: attributes.RegisterAttributesServiceHandler,
OnConfigUpdate: onUpdateConfigHook,
Namespace: ns,
DB: dbRegister,
ServiceDesc: &attributes.AttributesService_ServiceDesc,
ConnectRPCFunc: attributesconnect.NewAttributesServiceHandler,
GRPCGatewayFunc: attributes.RegisterAttributesServiceHandler,
OnConfigUpdate: onUpdateConfigHook,
OnServicesStarted: onStartHook,
RegisterFunc: func(srp serviceregistry.RegistrationParams) (attributesconnect.AttributesServiceHandler, serviceregistry.HandlerServer) {
logger := srp.Logger
cfg, err := policyconfig.GetSharedPolicyConfig(srp.Config)
Expand All @@ -65,15 +76,19 @@ func NewRegistration(ns string, dbRegister serviceregistry.DBRegister) *servicer
as.logger = logger
as.dbClient = policydb.NewClient(srp.DBClient, logger, int32(cfg.ListRequestLimitMax), int32(cfg.ListRequestLimitDefault))
as.config = cfg

return as, nil
},
},
}
}

// Close gracefully shuts down the service, closing the database client.
// Close gracefully shuts down the attributes service's cache and database client.
func (s *AttributesService) Close() {
s.logger.Info("gracefully shutting down attributes service")
if s.cache != nil {
s.cache.Stop()
}
s.dbClient.Close()
}

Expand Down Expand Up @@ -120,6 +135,50 @@ func (s *AttributesService) ListAttributes(ctx context.Context,
state := req.Msg.GetState().String()
s.logger.Debug("listing attribute definitions", slog.String("state", state))

// If active state and caching enabled, return from cache instead of DB
Copy link
Preview

Copilot AI Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The caching logic in ListAttributes closely mirrors ListSubjectMappings; consider extracting shared pagination+cache handling into a helper to reduce duplication.

Copilot uses AI. Check for mistakes.

isActiveState := req.Msg.GetState() == common.ActiveStateEnum_ACTIVE_STATE_ENUM_ACTIVE
if s.cache.IsEnabled() && isActiveState {
s.logger.Debug("returning cached attributes")

limit := req.Msg.GetPagination().GetLimit()
if limit <= 0 {
limit = int32(s.config.ListRequestLimitDefault)
}
offset := req.Msg.GetPagination().GetOffset()

// Validate limit against the max configured value
maxLimit := int32(s.config.ListRequestLimitMax)
if maxLimit > 0 && limit > maxLimit {
return nil, db.StatusifyError(db.ErrListLimitTooLarge, db.ErrTextListLimitTooLarge)
}

// Get all cached attributes
cachedAttrs, total, err := s.cache.ListCachedAttributes(ctx, limit, offset)
if err != nil {
s.logger.Error("failed to retrieve cached attributes", slog.Any("error", err))
return nil, db.StatusifyError(err, db.ErrTextListRetrievalFailed)
}

// Calculate next offset using the same logic as the DB implementation
var nextOffset int32
next := offset + limit
if next < total {
nextOffset = next
} // else nextOffset remains 0

rsp := &attributes.ListAttributesResponse{
Attributes: cachedAttrs,
Pagination: &policy.PageResponse{
CurrentOffset: offset,
Total: total,
NextOffset: nextOffset,
},
}
return connect.NewResponse(rsp), nil
}

s.logger.Debug("querying database for attributes")

rsp, err := s.dbClient.ListAttributes(ctx, req.Msg)
if err != nil {
return nil, db.StatusifyError(err, db.ErrTextListRetrievalFailed)
Expand Down Expand Up @@ -365,7 +424,6 @@ func (s *AttributesService) DeactivateAttributeValue(ctx context.Context, req *c
s.logger.Audit.PolicyCRUDSuccess(ctx, auditParams)

rsp.Value = updated

return connect.NewResponse(rsp), nil
}

Expand Down
Loading
Loading