Skip to content

Use UpstreamAuthority.SubscribeToLocalBundle RPC #6090

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ require (
github.com/sirupsen/logrus v1.9.3
github.com/spiffe/go-spiffe/v2 v2.5.0
github.com/spiffe/spire-api-sdk v1.2.5-0.20250109200630-101d5e7de758
github.com/spiffe/spire-plugin-sdk v1.4.4-0.20240701180828-594312f4444d
github.com/spiffe/spire-plugin-sdk v1.4.4-0.20250606112051-68609d83ce7c
github.com/stretchr/testify v1.10.0
github.com/uber-go/tally/v4 v4.1.17
github.com/valyala/fastjson v1.6.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1662,8 +1662,8 @@ github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8W
github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g=
github.com/spiffe/spire-api-sdk v1.2.5-0.20250109200630-101d5e7de758 h1:7zZbXRN0z8IFjx8CQbkK25nypdaFq+ntCiRQGTNfdvY=
github.com/spiffe/spire-api-sdk v1.2.5-0.20250109200630-101d5e7de758/go.mod h1:4uuhFlN6KBWjACRP3xXwrOTNnvaLp1zJs8Lribtr4fI=
github.com/spiffe/spire-plugin-sdk v1.4.4-0.20240701180828-594312f4444d h1:Upcyq8u1aWFHTQSEskwxBE2PehobpY+M21LXXDS/mPw=
github.com/spiffe/spire-plugin-sdk v1.4.4-0.20240701180828-594312f4444d/go.mod h1:GA6o2PVLwyJdevT6KKt5ZXCY/ziAPna13y/seGk49Ik=
github.com/spiffe/spire-plugin-sdk v1.4.4-0.20250606112051-68609d83ce7c h1:Y2C0USw8YgFfzZpt/Tm+dYuf0swSbcDy5sOF7FHtCyE=
github.com/spiffe/spire-plugin-sdk v1.4.4-0.20250606112051-68609d83ce7c/go.mod h1:GA6o2PVLwyJdevT6KKt5ZXCY/ziAPna13y/seGk49Ik=
github.com/ssgreg/nlreturn/v2 v2.2.1 h1:X4XDI7jstt3ySqGU86YGAURbxw3oTDPK9sPEi6YEwQ0=
github.com/ssgreg/nlreturn/v2 v2.2.1/go.mod h1:E/iiPB78hV7Szg2YfRgyIrk1AD6JVMTRkkxBiELzh2I=
github.com/stbenjam/no-sprintf-host-port v0.2.0 h1:i8pxvGrt1+4G0czLr/WnmyH7zbZ8Bg8etvARQ1rpyl4=
Expand Down
17 changes: 17 additions & 0 deletions pkg/server/ca/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type AuthorityManager interface {
IsUpstreamAuthority() bool
PublishJWTKey(ctx context.Context, jwtKey *common.PublicKey) ([]*common.PublicKey, error)
NotifyTaintedX509Authority(ctx context.Context, authorityID string) error
SubscribeToLocalBundle(ctx context.Context) error
}

type Config struct {
Expand Down Expand Up @@ -447,6 +448,22 @@ func (m *Manager) PublishJWTKey(ctx context.Context, jwtKey *common.PublicKey) (
return bundle.JwtSigningKeys, nil
}

func (m *Manager) SubscribeToLocalBundle(ctx context.Context) error {
if m.upstreamClient == nil {
return nil
}

err := m.upstreamClient.SubscribeToLocalBundle(ctx)
switch {
case status.Code(err) == codes.Unimplemented:
return nil
case err != nil:
return err
default:
return nil
}
}

func (m *Manager) PruneBundle(ctx context.Context) (err error) {
counter := telemetry_server.StartCAManagerPruneBundleCall(m.c.Metrics)
defer counter.Done(&err)
Expand Down
31 changes: 31 additions & 0 deletions pkg/server/ca/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,37 @@ func TestUpstreamAuthorityWithPublishJWTKeyImplemented(t *testing.T) {
)
}

func TestUpstreamAuthorityWithSubscribeToBundleUpdate(t *testing.T) {
ctx := context.Background()
test := setupTest(t)
upstreamAuthority, ua := test.newFakeUpstreamAuthority(t, fakeupstreamauthority.Config{
TrustDomain: testTrustDomain,
UseSubscribeToLocalBundle: true,
UseIntermediate: true,
})
bundle := test.createBundle(ctx)
require.Len(t, bundle.JwtSigningKeys, 0)

test.initAndActivateUpstreamSignedManager(ctx, upstreamAuthority)

// X509 CA should be set up to be an intermediate and have two certs in
// its chain: itself and the upstream intermediate that signed it.
x509CA := test.currentX509CA()
assert.NotNil(t, x509CA.Signer)
if assert.NotNil(t, x509CA.Certificate) {
assert.Equal(t, ua.X509Intermediate().Subject, x509CA.Certificate.Issuer)
}
if assert.Len(t, x509CA.UpstreamChain, 2) {
assert.Equal(t, x509CA.Certificate, x509CA.UpstreamChain[0])
assert.Equal(t, ua.X509Intermediate(), x509CA.UpstreamChain[1])
}

// The trust bundle should contain the upstream root
test.requireBundleRootCAs(ctx, t, ua.X509Root())

spiretest.AssertProtoListEqual(t, ua.JWTKeys(), test.fetchBundle(ctx).JwtSigningKeys)
}

func TestX509CARotation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
Expand Down
17 changes: 17 additions & 0 deletions pkg/server/ca/rotator/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type CAManager interface {
ActivateJWTKey(ctx context.Context)
RotateJWTKey(ctx context.Context)

SubscribeToLocalBundle(ctx context.Context) error

PruneBundle(ctx context.Context) error
PruneCAJournals(ctx context.Context) error
}
Expand Down Expand Up @@ -77,10 +79,25 @@ func (r *Rotator) Run(ctx context.Context) error {
if err := r.c.Manager.NotifyBundleLoaded(ctx); err != nil {
return err
}

err := util.RunTasks(ctx,
func(ctx context.Context) error {
return r.rotateEvery(ctx, rotateInterval)
},
func(ctx context.Context) error {
var lastError error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may we move this to Subscribe?
and only start when we really need to subscribe

for {
select {
case <-ctx.Done():
return lastError
case <-time.After(5 * time.Second):
lastError = r.c.Manager.SubscribeToLocalBundle(ctx)
if lastError == nil {
return nil
}
}
}
},
func(ctx context.Context) error {
return r.pruneBundleEvery(ctx, pruneBundleInterval)
},
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/ca/rotator/rotator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,10 @@ func (f *fakeCAManager) RotateJWTKey(context.Context) {
f.jwtKeyCh <- struct{}{}
}

func (f *fakeCAManager) SubscribeToLocalBundle(ctx context.Context) error {
return nil
}

func (f *fakeCAManager) PruneBundle(context.Context) error {
defer func() {
f.pruneBundleCh <- struct{}{}
Expand Down
113 changes: 98 additions & 15 deletions pkg/server/ca/upstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,21 @@ type UpstreamClientConfig struct {
type UpstreamClient struct {
c UpstreamClientConfig

mintX509CAMtx sync.Mutex
mintX509CAStream *streamState
publishJWTKeyMtx sync.Mutex
publishJWTKeyStream *streamState
mintX509CAMtx sync.Mutex
mintX509CAStream *streamState
publishJWTKeyMtx sync.Mutex
publishJWTKeyStream *streamState
subscribeToLocalBundleStreamMtx sync.Mutex
subscribeToLocalBundleStream *streamState
}

// NewUpstreamClient returns a new UpstreamAuthority plugin client.
func NewUpstreamClient(config UpstreamClientConfig) *UpstreamClient {
return &UpstreamClient{
c: config,
mintX509CAStream: newStreamState(),
publishJWTKeyStream: newStreamState(),
c: config,
mintX509CAStream: newStreamState(),
publishJWTKeyStream: newStreamState(),
subscribeToLocalBundleStream: newStreamState(),
}
}

Expand All @@ -67,6 +70,11 @@ func (u *UpstreamClient) Close() error {
defer u.publishJWTKeyMtx.Unlock()
u.publishJWTKeyStream.Stop()
}()
func() {
u.subscribeToLocalBundleStreamMtx.Lock()
defer u.subscribeToLocalBundleStreamMtx.Unlock()
u.subscribeToLocalBundleStream.Stop()
}()
return nil
}

Expand Down Expand Up @@ -96,11 +104,6 @@ func (u *UpstreamClient) MintX509CA(ctx context.Context, csr []byte, ttl time.Du
}
}

// WaitUntilMintX509CAStreamDone waits until the MintX509CA stream has stopped.
func (u *UpstreamClient) WaitUntilMintX509CAStreamDone(ctx context.Context) error {
return u.mintX509CAStream.WaitUntilStopped(ctx)
}

// PublishJWTKey publishes the JWT key to the UpstreamAuthority. It maintains
// an open stream to the UpstreamAuthority plugin to receive and append JWT key
// updates to the bundle. The stream remains open until another call to
Expand All @@ -127,9 +130,26 @@ func (u *UpstreamClient) PublishJWTKey(ctx context.Context, jwtKey *common.Publi
}
}

// WaitUntilPublishJWTKeyStreamDone waits until the MintX509CA stream has stopped.
func (u *UpstreamClient) WaitUntilPublishJWTKeyStreamDone(ctx context.Context) error {
return u.publishJWTKeyStream.WaitUntilStopped(ctx)
func (u *UpstreamClient) SubscribeToLocalBundle(ctx context.Context) (err error) {
u.subscribeToLocalBundleStreamMtx.Lock()
defer u.subscribeToLocalBundleStreamMtx.Unlock()

firstResultCh := make(chan bundleUpdatesResult, 1)
u.subscribeToLocalBundleStream.Start(func(streamCtx context.Context) {
u.runSubscribeToLocalBundleStream(streamCtx, firstResultCh)
})
defer func() {
if err != nil {
u.subscribeToLocalBundleStream.Stop()
}
}()

select {
case result := <-firstResultCh:
return result.err
case <-ctx.Done():
return ctx.Err()
}
}

func (u *UpstreamClient) runMintX509CAStream(ctx context.Context, csr []byte, ttl time.Duration, validateX509CA ValidateX509CAFunc, firstResultCh chan<- mintX509CAResult) {
Expand Down Expand Up @@ -223,6 +243,63 @@ func (u *UpstreamClient) runPublishJWTKeyStream(ctx context.Context, jwtKey *com
}
}

func (u *UpstreamClient) runSubscribeToLocalBundleStream(ctx context.Context, firstResultCh chan<- bundleUpdatesResult) {
x509CAs, jwtKeys, authorityStream, err := u.c.UpstreamAuthority.SubscribeToLocalBundle(ctx)
if err != nil {
firstResultCh <- bundleUpdatesResult{err: err}
return
}
defer authorityStream.Close()

err = u.c.BundleUpdater.SyncX509Roots(ctx, x509CAs)
if err != nil {
firstResultCh <- bundleUpdatesResult{err: err}
return
}
updatedKeys, err := u.c.BundleUpdater.AppendJWTKeys(ctx, jwtKeys)
if err != nil {
firstResultCh <- bundleUpdatesResult{err: err}
return
}

x509CA := []*x509.Certificate{}
for _, ca := range x509CAs {
x509CA = append(x509CA, ca.Certificate)
}

firstResultCh <- bundleUpdatesResult{
x509CA: x509CA,
jwtKeys: updatedKeys,
}

for {
x509CA, jwtKeys, err := authorityStream.RecvLocalBundleUpdate()
if err != nil {
switch {
case errors.Is(err, io.EOF):
// This is normal if the plugin does not support streaming
// bundle updates.
case status.Code(err) == codes.Canceled:
// This is normal. This client cancels this stream when opening
// a new stream.
default:
u.c.BundleUpdater.LogError(err, "The upstream authority plugin stopped streaming authorities updates prematurely. Please report this bug. Will retry later.")
}
return
}

if err := u.c.BundleUpdater.SyncX509Roots(ctx, x509CA); err != nil {
u.c.BundleUpdater.LogError(err, "Failed to store X.509 CAs received by the upstream authority plugin.")
continue
}

if _, err := u.c.BundleUpdater.AppendJWTKeys(ctx, jwtKeys); err != nil {
u.c.BundleUpdater.LogError(err, "Failed to store JWT keys received by the upstream authority plugin.")
continue
}
}
}

type mintX509CAResult struct {
x509CA []*x509.Certificate
err error
Expand All @@ -233,6 +310,12 @@ type publishJWTKeyResult struct {
err error
}

type bundleUpdatesResult struct {
x509CA []*x509.Certificate
jwtKeys []*common.PublicKey
err error
}

// streamState manages the state for open streams to the plugin that are
// receiving bundle updates. It is protected by the respective mutexes in
// the UpstreamClient.
Expand Down
37 changes: 37 additions & 0 deletions pkg/server/ca/upstream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,43 @@ func TestUpstreamClientPublishJWTKey_NotImplemented(t *testing.T) {
require.Nil(t, jwtKeys)
}

func TestUpstreamClientSubscribeToLocalBundle(t *testing.T) {
client, updater, ua := setUpUpstreamClientTest(t, fakeupstreamauthority.Config{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
client, updater, ua := setUpUpstreamClientTest(t, fakeupstreamauthority.Config{
client, updater, ua := setupUpstreamClientTest(t, fakeupstreamauthority.Config{

TrustDomain: trustDomain,
UseSubscribeToLocalBundle: true,
})

err := client.SubscribeToLocalBundle(t.Context())
require.NoError(t, err)

// We should get an update with the initial CA and a list of empty JWT keys since
// the fakeupstreamauthority does not create one by default.
require.Equal(t, ua.X509Roots(), updater.WaitForAppendedX509Roots(t))
spiretest.RequireProtoListEqual(t, []*common.PublicKey{}, updater.WaitForAppendedJWTKeys(t))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

require.Empty?


// Trigger an update to the upstream bundle by rotating the root
// certificate and wait for the bundle updater to receive the update.
ua.RotateX509CA()
require.Equal(t, ua.X509Roots(), updater.WaitForAppendedX509Roots(t))
spiretest.RequireProtoListEqual(t, []*common.PublicKey{}, updater.WaitForAppendedJWTKeys(t))

key1 := makePublicKey(t, "KEY1")
ua.AppendJWTKey(key1)
require.Equal(t, ua.X509Roots(), updater.WaitForAppendedX509Roots(t))
spiretest.RequireProtoListEqual(t, []*common.PublicKey{key1}, updater.WaitForAppendedJWTKeys(t))

// Trigger an update to the upstream bundle by rotating the root
// certificate and wait for the bundle updater to receive the update.
ua.RotateX509CA()
require.Equal(t, ua.X509Roots(), updater.WaitForAppendedX509Roots(t))
spiretest.RequireProtoListEqual(t, []*common.PublicKey{key1}, updater.WaitForAppendedJWTKeys(t))

key2 := makePublicKey(t, "KEY2")
ua.AppendJWTKey(key2)
require.Equal(t, ua.X509Roots(), updater.WaitForAppendedX509Roots(t))
spiretest.RequireProtoListEqual(t, []*common.PublicKey{key1, key2}, updater.WaitForAppendedJWTKeys(t))
}

func setUpUpstreamClientTest(t *testing.T, config fakeupstreamauthority.Config) (*ca.UpstreamClient, *fakeBundleUpdater, *fakeupstreamauthority.UpstreamAuthority) {
plugin, upstreamAuthority := fakeupstreamauthority.Load(t, config)
updater := newFakeBundleUpdater()
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/plugin/upstreamauthority/awspca/pca.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,10 @@ func (*PCAPlugin) PublishJWTKeyAndSubscribe(*upstreamauthorityv1.PublishJWTKeyRe
return status.Error(codes.Unimplemented, "publishing upstream is unsupported")
}

func (p *PCAPlugin) SubscribeToLocalBundle(req *upstreamauthorityv1.SubscribeToLocalBundleRequest, stream upstreamauthorityv1.UpstreamAuthority_SubscribeToLocalBundleServer) error {
return status.Error(codes.Unimplemented, "fetching upstream trust bundle is unsupported")
}

func (p *PCAPlugin) getConfig() (*configuration, error) {
p.mtx.Lock()
defer p.mtx.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/plugin/upstreamauthority/awssecret/awssecret.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ func (p *Plugin) PublishJWTKeyAndSubscribe(*upstreamauthorityv1.PublishJWTKeyReq
return status.Error(codes.Unimplemented, "publishing upstream is unsupported")
}

func (p *Plugin) SubscribeToLocalBundle(req *upstreamauthorityv1.SubscribeToLocalBundleRequest, stream upstreamauthorityv1.UpstreamAuthority_SubscribeToLocalBundleServer) error {
return status.Error(codes.Unimplemented, "fetching upstream trust bundle is unsupported")
}

func (p *Plugin) loadUpstreamCAAndCerts(trustDomain spiffeid.TrustDomain, keyPEMstr, certsPEMstr, bundleCertsPEMstr string) (*x509svid.UpstreamCA, []*x509.Certificate, []*x509.Certificate, error) {
key, err := pemutil.ParsePrivateKey([]byte(keyPEMstr))
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ func (*Plugin) PublishJWTKeyAndSubscribe(*upstreamauthorityv1.PublishJWTKeyReque
return status.Error(codes.Unimplemented, "publishing upstream is unsupported")
}

func (p *Plugin) SubscribeToLocalBundle(req *upstreamauthorityv1.SubscribeToLocalBundleRequest, stream upstreamauthorityv1.UpstreamAuthority_SubscribeToLocalBundleServer) error {
return status.Error(codes.Unimplemented, "fetching upstream trust bundle is unsupported")
}

func newCertManagerClient(configPath string) (client.Client, error) {
config, err := getKubeConfig(configPath)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/plugin/upstreamauthority/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ func (*Plugin) PublishJWTKeyAndSubscribe(*upstreamauthorityv1.PublishJWTKeyReque
return status.Error(codes.Unimplemented, "publishing upstream is unsupported")
}

func (p *Plugin) SubscribeToLocalBundle(req *upstreamauthorityv1.SubscribeToLocalBundleRequest, stream upstreamauthorityv1.UpstreamAuthority_SubscribeToLocalBundleServer) error {
return status.Error(codes.Unimplemented, "fetching upstream trust bundle is unsupported")
}

func (p *Plugin) reloadCA() (*x509svid.UpstreamCA, *caCerts, error) {
p.mtx.Lock()
defer p.mtx.Unlock()
Expand Down
6 changes: 5 additions & 1 deletion pkg/server/plugin/upstreamauthority/ejbca/ejbca.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,14 @@ func (p *Plugin) MintX509CAAndSubscribe(req *upstreamauthorityv1.MintX509CAReque
}

// The EJBCA UpstreamAuthority plugin does not support publishing JWT keys.
func (p *Plugin) PublishJWTKeyAndSubscribe(_ *upstreamauthorityv1.PublishJWTKeyRequest, _ upstreamauthorityv1.UpstreamAuthority_PublishJWTKeyAndSubscribeServer) error {
func (p *Plugin) PublishJWTKeyAndSubscribe(*upstreamauthorityv1.PublishJWTKeyRequest, upstreamauthorityv1.UpstreamAuthority_PublishJWTKeyAndSubscribeServer) error {
return status.Error(codes.Unimplemented, "publishing JWT keys is not supported by the EJBCA UpstreamAuthority plugin")
}

func (p *Plugin) SubscribeToLocalBundle(req *upstreamauthorityv1.SubscribeToLocalBundleRequest, stream upstreamauthorityv1.UpstreamAuthority_SubscribeToLocalBundleServer) error {
return status.Error(codes.Unimplemented, "fetching upstream trust bundle is unsupported")
}

// setConfig replaces the configuration atomically under a write lock.
func (p *Plugin) setConfig(config *Config) {
p.configMtx.Lock()
Expand Down
Loading
Loading