Skip to content

Commit 472ed91

Browse files
committed
Add UpstreamClient support for wathing upstream authorities
Signed-off-by: Sorin Dumitru <[email protected]>
1 parent d318264 commit 472ed91

File tree

2 files changed

+135
-15
lines changed

2 files changed

+135
-15
lines changed

pkg/server/ca/upstream_client.go

Lines changed: 98 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,21 @@ type UpstreamClientConfig struct {
3939
type UpstreamClient struct {
4040
c UpstreamClientConfig
4141

42-
mintX509CAMtx sync.Mutex
43-
mintX509CAStream *streamState
44-
publishJWTKeyMtx sync.Mutex
45-
publishJWTKeyStream *streamState
42+
mintX509CAMtx sync.Mutex
43+
mintX509CAStream *streamState
44+
publishJWTKeyMtx sync.Mutex
45+
publishJWTKeyStream *streamState
46+
subscribeToLocalBundleStreamMtx sync.Mutex
47+
subscribeToLocalBundleStream *streamState
4648
}
4749

4850
// NewUpstreamClient returns a new UpstreamAuthority plugin client.
4951
func NewUpstreamClient(config UpstreamClientConfig) *UpstreamClient {
5052
return &UpstreamClient{
51-
c: config,
52-
mintX509CAStream: newStreamState(),
53-
publishJWTKeyStream: newStreamState(),
53+
c: config,
54+
mintX509CAStream: newStreamState(),
55+
publishJWTKeyStream: newStreamState(),
56+
subscribeToLocalBundleStream: newStreamState(),
5457
}
5558
}
5659

@@ -67,6 +70,11 @@ func (u *UpstreamClient) Close() error {
6770
defer u.publishJWTKeyMtx.Unlock()
6871
u.publishJWTKeyStream.Stop()
6972
}()
73+
func() {
74+
u.subscribeToLocalBundleStreamMtx.Lock()
75+
defer u.subscribeToLocalBundleStreamMtx.Unlock()
76+
u.subscribeToLocalBundleStream.Stop()
77+
}()
7078
return nil
7179
}
7280

@@ -96,11 +104,6 @@ func (u *UpstreamClient) MintX509CA(ctx context.Context, csr []byte, ttl time.Du
96104
}
97105
}
98106

99-
// WaitUntilMintX509CAStreamDone waits until the MintX509CA stream has stopped.
100-
func (u *UpstreamClient) WaitUntilMintX509CAStreamDone(ctx context.Context) error {
101-
return u.mintX509CAStream.WaitUntilStopped(ctx)
102-
}
103-
104107
// PublishJWTKey publishes the JWT key to the UpstreamAuthority. It maintains
105108
// an open stream to the UpstreamAuthority plugin to receive and append JWT key
106109
// updates to the bundle. The stream remains open until another call to
@@ -127,9 +130,26 @@ func (u *UpstreamClient) PublishJWTKey(ctx context.Context, jwtKey *common.Publi
127130
}
128131
}
129132

130-
// WaitUntilPublishJWTKeyStreamDone waits until the MintX509CA stream has stopped.
131-
func (u *UpstreamClient) WaitUntilPublishJWTKeyStreamDone(ctx context.Context) error {
132-
return u.publishJWTKeyStream.WaitUntilStopped(ctx)
133+
func (u *UpstreamClient) SubscribeToLocalBundle(ctx context.Context) (err error) {
134+
u.subscribeToLocalBundleStreamMtx.Lock()
135+
defer u.subscribeToLocalBundleStreamMtx.Unlock()
136+
137+
firstResultCh := make(chan bundleUpdatesResult, 1)
138+
u.subscribeToLocalBundleStream.Start(func(streamCtx context.Context) {
139+
u.runSubscribeToLocalBundleStream(streamCtx, firstResultCh)
140+
})
141+
defer func() {
142+
if err != nil {
143+
u.subscribeToLocalBundleStream.Stop()
144+
}
145+
}()
146+
147+
select {
148+
case result := <-firstResultCh:
149+
return result.err
150+
case <-ctx.Done():
151+
return ctx.Err()
152+
}
133153
}
134154

135155
func (u *UpstreamClient) runMintX509CAStream(ctx context.Context, csr []byte, ttl time.Duration, validateX509CA ValidateX509CAFunc, firstResultCh chan<- mintX509CAResult) {
@@ -223,6 +243,63 @@ func (u *UpstreamClient) runPublishJWTKeyStream(ctx context.Context, jwtKey *com
223243
}
224244
}
225245

246+
func (u *UpstreamClient) runSubscribeToLocalBundleStream(ctx context.Context, firstResultCh chan<- bundleUpdatesResult) {
247+
x509CAs, jwtKeys, authorityStream, err := u.c.UpstreamAuthority.SubscribeToLocalBundle(ctx)
248+
if err != nil {
249+
firstResultCh <- bundleUpdatesResult{err: err}
250+
return
251+
}
252+
defer authorityStream.Close()
253+
254+
err = u.c.BundleUpdater.SyncX509Roots(ctx, x509CAs)
255+
if err != nil {
256+
firstResultCh <- bundleUpdatesResult{err: err}
257+
return
258+
}
259+
updatedKeys, err := u.c.BundleUpdater.AppendJWTKeys(ctx, jwtKeys)
260+
if err != nil {
261+
firstResultCh <- bundleUpdatesResult{err: err}
262+
return
263+
}
264+
265+
x509CA := []*x509.Certificate{}
266+
for _, ca := range x509CAs {
267+
x509CA = append(x509CA, ca.Certificate)
268+
}
269+
270+
firstResultCh <- bundleUpdatesResult{
271+
x509CA: x509CA,
272+
jwtKeys: updatedKeys,
273+
}
274+
275+
for {
276+
x509CA, jwtKeys, err := authorityStream.RecvLocalBundleUpdate()
277+
if err != nil {
278+
switch {
279+
case errors.Is(err, io.EOF):
280+
// This is normal if the plugin does not support streaming
281+
// bundle updates.
282+
case status.Code(err) == codes.Canceled:
283+
// This is normal. This client cancels this stream when opening
284+
// a new stream.
285+
default:
286+
u.c.BundleUpdater.LogError(err, "The upstream authority plugin stopped streaming authorities updates prematurely. Please report this bug. Will retry later.")
287+
}
288+
return
289+
}
290+
291+
if err := u.c.BundleUpdater.SyncX509Roots(ctx, x509CA); err != nil {
292+
u.c.BundleUpdater.LogError(err, "Failed to store X.509 CAs received by the upstream authority plugin.")
293+
continue
294+
}
295+
296+
if _, err := u.c.BundleUpdater.AppendJWTKeys(ctx, jwtKeys); err != nil {
297+
u.c.BundleUpdater.LogError(err, "Failed to store JWT keys received by the upstream authority plugin.")
298+
continue
299+
}
300+
}
301+
}
302+
226303
type mintX509CAResult struct {
227304
x509CA []*x509.Certificate
228305
err error
@@ -233,6 +310,12 @@ type publishJWTKeyResult struct {
233310
err error
234311
}
235312

313+
type bundleUpdatesResult struct {
314+
x509CA []*x509.Certificate
315+
jwtKeys []*common.PublicKey
316+
err error
317+
}
318+
236319
// streamState manages the state for open streams to the plugin that are
237320
// receiving bundle updates. It is protected by the respective mutexes in
238321
// the UpstreamClient.

pkg/server/ca/upstream_client_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,43 @@ func TestUpstreamClientPublishJWTKey_NotImplemented(t *testing.T) {
151151
require.Nil(t, jwtKeys)
152152
}
153153

154+
func TestUpstreamClientSubscribeToLocalBundle(t *testing.T) {
155+
client, updater, ua := setUpUpstreamClientTest(t, fakeupstreamauthority.Config{
156+
TrustDomain: trustDomain,
157+
UseSubscribeToLocalBundle: true,
158+
})
159+
160+
err := client.SubscribeToLocalBundle(t.Context())
161+
require.NoError(t, err)
162+
163+
// We should get an update with the initial CA and a list of empty JWT keys since
164+
// the fakeupstreamauthority does not create one by default.
165+
require.Equal(t, ua.X509Roots(), updater.WaitForAppendedX509Roots(t))
166+
spiretest.RequireProtoListEqual(t, []*common.PublicKey{}, updater.WaitForAppendedJWTKeys(t))
167+
168+
// Trigger an update to the upstream bundle by rotating the root
169+
// certificate and wait for the bundle updater to receive the update.
170+
ua.RotateX509CA()
171+
require.Equal(t, ua.X509Roots(), updater.WaitForAppendedX509Roots(t))
172+
spiretest.RequireProtoListEqual(t, []*common.PublicKey{}, updater.WaitForAppendedJWTKeys(t))
173+
174+
key1 := makePublicKey(t, "KEY1")
175+
ua.AppendJWTKey(key1)
176+
require.Equal(t, ua.X509Roots(), updater.WaitForAppendedX509Roots(t))
177+
spiretest.RequireProtoListEqual(t, []*common.PublicKey{key1}, updater.WaitForAppendedJWTKeys(t))
178+
179+
// Trigger an update to the upstream bundle by rotating the root
180+
// certificate and wait for the bundle updater to receive the update.
181+
ua.RotateX509CA()
182+
require.Equal(t, ua.X509Roots(), updater.WaitForAppendedX509Roots(t))
183+
spiretest.RequireProtoListEqual(t, []*common.PublicKey{key1}, updater.WaitForAppendedJWTKeys(t))
184+
185+
key2 := makePublicKey(t, "KEY2")
186+
ua.AppendJWTKey(key2)
187+
require.Equal(t, ua.X509Roots(), updater.WaitForAppendedX509Roots(t))
188+
spiretest.RequireProtoListEqual(t, []*common.PublicKey{key1, key2}, updater.WaitForAppendedJWTKeys(t))
189+
}
190+
154191
func setUpUpstreamClientTest(t *testing.T, config fakeupstreamauthority.Config) (*ca.UpstreamClient, *fakeBundleUpdater, *fakeupstreamauthority.UpstreamAuthority) {
155192
plugin, upstreamAuthority := fakeupstreamauthority.Load(t, config)
156193
updater := newFakeBundleUpdater()

0 commit comments

Comments
 (0)