Skip to content

Commit ed38d5f

Browse files
authored
Enable clients to set flags (#286)
Updates #198 This PR adds a new method to the OpAMPClient that allows clients to set their own flags (such as `RequestInstanceUid`).
1 parent 8f7a652 commit ed38d5f

File tree

8 files changed

+204
-18
lines changed

8 files changed

+204
-18
lines changed

client/client.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,16 @@ type OpAMPClient interface {
107107
// for more details.
108108
SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error
109109

110+
// SetFlags modifies the set of flags supported by the client.
111+
// May be called before or after Start(), including from OnMessage handler.
112+
// The zero value of protobufs.AgentToServerFlags corresponds to FlagsUnspecified
113+
// and is safe to use.
114+
//
115+
// See
116+
// https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#agenttoserverflags
117+
// for more details.
118+
SetFlags(flags protobufs.AgentToServerFlags)
119+
110120
// SendCustomMessage sends the custom message to the Server. May be called anytime after
111121
// Start(), including from OnMessage handler.
112122
//

client/clientimpl_test.go

Lines changed: 112 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func eventually(t *testing.T, f func() bool) {
9393
assert.Eventually(t, f, 5*time.Second, 10*time.Millisecond)
9494
}
9595

96-
func newInstanceUid(t *testing.T) types.InstanceUid {
96+
func genNewInstanceUid(t *testing.T) types.InstanceUid {
9797
uid, err := uuid.NewV7()
9898
require.NoError(t, err)
9999
b, err := uid.MarshalBinary()
@@ -103,7 +103,7 @@ func newInstanceUid(t *testing.T) types.InstanceUid {
103103

104104
func prepareSettings(t *testing.T, settings *types.StartSettings, c OpAMPClient) {
105105
// Autogenerate instance id.
106-
settings.InstanceUid = newInstanceUid(t)
106+
settings.InstanceUid = genNewInstanceUid(t)
107107

108108
// Make sure correct URL scheme is used, based on the type of the OpAMP client.
109109
u, err := url.Parse(settings.OpAMPServerURL)
@@ -630,27 +630,24 @@ func TestAgentIdentification(t *testing.T) {
630630
testClients(t, func(t *testing.T, client OpAMPClient) {
631631
// Start a server.
632632
srv := internal.StartMockServer(t)
633-
newInstanceUid := newInstanceUid(t)
633+
newInstanceUid := genNewInstanceUid(t)
634634
var rcvAgentInstanceUid atomic.Value
635-
var sentInvalidId atomic.Bool
636635
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
637-
rcvAgentInstanceUid.Store(msg.InstanceUid)
638-
if sentInvalidId.Load() {
636+
if msg.Flags&uint64(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid) == 1 {
637+
newInstanceUid = genNewInstanceUid(t)
638+
rcvAgentInstanceUid.Store(newInstanceUid[:])
639639
return &protobufs.ServerToAgent{
640640
InstanceUid: msg.InstanceUid,
641641
AgentIdentification: &protobufs.AgentIdentification{
642-
// If we sent the invalid one first, send a valid one now
642+
// If the RequestInstanceUid flag was set, populate this field.
643643
NewInstanceUid: newInstanceUid[:],
644644
},
645645
}
646646
}
647-
sentInvalidId.Store(true)
647+
rcvAgentInstanceUid.Store(msg.InstanceUid)
648+
// Start by sending just the old instance ID.
648649
return &protobufs.ServerToAgent{
649650
InstanceUid: msg.InstanceUid,
650-
AgentIdentification: &protobufs.AgentIdentification{
651-
// Start by sending an invalid id forcing an error.
652-
NewInstanceUid: nil,
653-
},
654651
}
655652
}
656653

@@ -689,8 +686,8 @@ func TestAgentIdentification(t *testing.T) {
689686
},
690687
)
691688

692-
// Send a dummy message again to get the _new_ id
693-
_ = client.SetAgentDescription(createAgentDescr())
689+
// Set the flags to request a new ID.
690+
client.SetFlags(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)
694691

695692
// When it was sent, the new instance uid should have been used, which should
696693
// have been observed by the Server
@@ -2122,3 +2119,104 @@ func TestSetCustomCapabilities(t *testing.T) {
21222119
assert.NoError(t, err)
21232120
})
21242121
}
2122+
2123+
// TestSetFlags tests the ability for the client to change the set of flags it sends.
2124+
func TestSetFlags(t *testing.T) {
2125+
testClients(t, func(t *testing.T, client OpAMPClient) {
2126+
2127+
// Start a Server.
2128+
srv := internal.StartMockServer(t)
2129+
var rcvCustomFlags atomic.Value
2130+
var flags protobufs.AgentToServerFlags
2131+
2132+
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
2133+
if msg.Flags != 0 {
2134+
rcvCustomFlags.Store(msg.Flags)
2135+
}
2136+
return nil
2137+
}
2138+
2139+
settings := types.StartSettings{}
2140+
settings.OpAMPServerURL = "ws://" + srv.Endpoint
2141+
prepareClient(t, &settings, client)
2142+
2143+
assert.NoError(t, client.Start(context.Background(), settings))
2144+
2145+
// The zero value of AgentToServerFlags is ready to use
2146+
client.SetFlags(flags)
2147+
2148+
// Update flags to send
2149+
flags |= protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid
2150+
client.SetFlags(flags)
2151+
2152+
// Verify new flags were delivered to the server
2153+
eventually(
2154+
t,
2155+
func() bool {
2156+
msg, ok := rcvCustomFlags.Load().(uint64)
2157+
if !ok || msg == 0 {
2158+
return false
2159+
}
2160+
return uint64(flags) == msg
2161+
},
2162+
)
2163+
2164+
// Shutdown the Server.
2165+
srv.Close()
2166+
2167+
// Shutdown the client.
2168+
err := client.Stop(context.Background())
2169+
assert.NoError(t, err)
2170+
})
2171+
}
2172+
2173+
// TestSetFlags tests the ability for the client to set its flags before starting up.
2174+
func TestSetFlagsBeforeStart(t *testing.T) {
2175+
testClients(t, func(t *testing.T, client OpAMPClient) {
2176+
// Start a Server.
2177+
flags := protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid
2178+
srv := internal.StartMockServer(t)
2179+
var rcvCustomFlags atomic.Value
2180+
var isFirstMessage atomic.Bool
2181+
isFirstMessage.Store(true)
2182+
2183+
// Make sure we only record flags from the very first message.
2184+
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
2185+
if isFirstMessage.Load() {
2186+
rcvCustomFlags.Store(msg.Flags)
2187+
}
2188+
isFirstMessage.Store(false)
2189+
return nil
2190+
}
2191+
2192+
settings := types.StartSettings{}
2193+
settings.OpAMPServerURL = "ws://" + srv.Endpoint
2194+
prepareClient(t, &settings, client)
2195+
2196+
// Set up the flags _before_ calling Start to verify that they're
2197+
// handled correctly in PrepareFirstMessage.
2198+
client.SetFlags(flags)
2199+
2200+
// Start the client.
2201+
assert.NoError(t, client.Start(context.Background(), settings))
2202+
2203+
// Verify the flags were delivered to the server during the first message.
2204+
eventually(
2205+
t,
2206+
func() bool {
2207+
msg, ok := rcvCustomFlags.Load().(uint64)
2208+
if !ok || msg == 0 {
2209+
return false
2210+
}
2211+
return uint64(flags) == msg
2212+
},
2213+
)
2214+
2215+
// Shutdown the Server.
2216+
srv.Close()
2217+
2218+
// Shutdown the client.
2219+
err := client.Stop(context.Background())
2220+
assert.NoError(t, err)
2221+
})
2222+
}

client/httpclient.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,16 @@ func (c *httpClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) err
105105
return c.common.SetPackageStatuses(statuses)
106106
}
107107

108-
// SendCustomMessage implements OpAMPClient.SetCustomCapabilities.
108+
// SendCustomCapabilities implements OpAMPClient.SetCustomCapabilities.
109109
func (c *httpClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error {
110110
return c.common.SetCustomCapabilities(customCapabilities)
111111
}
112112

113+
// SetFlags implements OpAMPClient.SetFlags.
114+
func (c *httpClient) SetFlags(flags protobufs.AgentToServerFlags) {
115+
c.common.SetFlags(flags)
116+
}
117+
113118
// SendCustomMessage implements OpAMPClient.SendCustomMessage.
114119
func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
115120
return c.common.SendCustomMessage(message)

client/internal/clientcommon.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {
214214
msg.PackageStatuses = c.ClientSyncedState.PackageStatuses()
215215
msg.Capabilities = uint64(c.Capabilities)
216216
msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities()
217+
msg.Flags = c.ClientSyncedState.Flags()
217218
},
218219
)
219220
return nil
@@ -385,6 +386,19 @@ func (c *ClientCommon) SetCustomCapabilities(customCapabilities *protobufs.Custo
385386
return nil
386387
}
387388

389+
func (c *ClientCommon) SetFlags(flags protobufs.AgentToServerFlags) {
390+
// store the flags to send
391+
c.ClientSyncedState.SetFlags(flags)
392+
393+
// send the new flags to the Server
394+
c.sender.NextMessage().Update(
395+
func(msg *protobufs.AgentToServer) {
396+
msg.Flags = uint64(flags)
397+
},
398+
)
399+
c.sender.ScheduleSend()
400+
}
401+
388402
// SendCustomMessage sends the specified custom message to the server.
389403
func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
390404
if message == nil {

client/internal/clientstate.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ var (
1717
)
1818

1919
// ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to
20-
// have access to synchronize to the Server. 5 messages can be stored in this store:
21-
// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses and CustomCapabilities.
20+
// have access to synchronize to the Server. Six messages can be stored in this store:
21+
// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities and Flags.
2222
//
2323
// See OpAMP spec for more details on how status reporting works:
2424
// https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#status-reporting
@@ -39,6 +39,7 @@ type ClientSyncedState struct {
3939
remoteConfigStatus *protobufs.RemoteConfigStatus
4040
packageStatuses *protobufs.PackageStatuses
4141
customCapabilities *protobufs.CustomCapabilities
42+
flags protobufs.AgentToServerFlags
4243
}
4344

4445
func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription {
@@ -71,6 +72,12 @@ func (s *ClientSyncedState) CustomCapabilities() *protobufs.CustomCapabilities {
7172
return s.customCapabilities
7273
}
7374

75+
func (s *ClientSyncedState) Flags() uint64 {
76+
defer s.mutex.Unlock()
77+
s.mutex.Lock()
78+
return uint64(s.flags)
79+
}
80+
7481
// SetAgentDescription sets the AgentDescription in the state.
7582
func (s *ClientSyncedState) SetAgentDescription(descr *protobufs.AgentDescription) error {
7683
if descr == nil {
@@ -168,3 +175,11 @@ func (s *ClientSyncedState) HasCustomCapability(capability string) bool {
168175

169176
return false
170177
}
178+
179+
// SetFlags sets the flags in the state.
180+
func (s *ClientSyncedState) SetFlags(flags protobufs.AgentToServerFlags) {
181+
defer s.mutex.Unlock()
182+
s.mutex.Lock()
183+
184+
s.flags = flags
185+
}

client/internal/httpsender_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,39 @@ func TestHTTPSenderRetryForFailedRequests(t *testing.T) {
172172
cancel()
173173
srv.Close()
174174
}
175+
176+
func TestRequestInstanceUidFlagReset(t *testing.T) {
177+
ctx, cancel := context.WithCancel(context.Background())
178+
179+
sender := NewHTTPSender(&sharedinternal.NopLogger{})
180+
sender.callbacks = types.CallbacksStruct{}
181+
182+
// Set the RequestInstanceUid flag on the tracked state to request the server for a new ID to use.
183+
clientSyncedState := &ClientSyncedState{}
184+
clientSyncedState.SetFlags(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)
185+
capabilities := protobufs.AgentCapabilities_AgentCapabilities_Unspecified
186+
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, nil, capabilities)
187+
188+
// If we process a message with a nil AgentIdentification, or an incorrect NewInstanceUid.
189+
sender.receiveProcessor.ProcessReceivedMessage(ctx,
190+
&protobufs.ServerToAgent{
191+
AgentIdentification: nil,
192+
})
193+
sender.receiveProcessor.ProcessReceivedMessage(ctx,
194+
&protobufs.ServerToAgent{
195+
AgentIdentification: &protobufs.AgentIdentification{NewInstanceUid: []byte("foo")},
196+
})
197+
198+
// Then the RequestInstanceUid flag stays intact.
199+
assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)
200+
201+
// If we process a message that contains a non-nil AgentIdentification that contains a NewInstanceUid.
202+
sender.receiveProcessor.ProcessReceivedMessage(ctx,
203+
&protobufs.ServerToAgent{
204+
AgentIdentification: &protobufs.AgentIdentification{NewInstanceUid: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}},
205+
})
206+
207+
// Then the flag is reset so we don't request a new instance uid yet again.
208+
assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_Unspecified)
209+
cancel()
210+
}

client/internal/receivedprocessor.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,9 @@ func (r *receivedProcessor) rcvFlags(
190190
msg.RemoteConfigStatus = r.clientSyncedState.RemoteConfigStatus()
191191
msg.PackageStatuses = r.clientSyncedState.PackageStatuses()
192192
msg.CustomCapabilities = r.clientSyncedState.CustomCapabilities()
193+
msg.Flags = r.clientSyncedState.Flags()
193194

194-
// The logic for EffectiveConfig is similar to the previous 5 sub-messages however
195+
// The logic for EffectiveConfig is similar to the previous 6 sub-messages however
195196
// the EffectiveConfig is fetched using GetEffectiveConfig instead of
196197
// from clientSyncedState. We do this to avoid keeping EffectiveConfig in-memory.
197198
msg.EffectiveConfig = cfg
@@ -237,6 +238,9 @@ func (r *receivedProcessor) rcvAgentIdentification(ctx context.Context, agentId
237238
return err
238239
}
239240

241+
// If we set up a new instance ID, reset the RequestInstanceUid flag.
242+
r.clientSyncedState.flags &^= protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid
243+
240244
return nil
241245
}
242246

client/wsclient.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ func (c *wsClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCap
129129
return c.common.SetCustomCapabilities(customCapabilities)
130130
}
131131

132+
func (c *wsClient) SetFlags(flags protobufs.AgentToServerFlags) {
133+
c.common.SetFlags(flags)
134+
}
135+
132136
func (c *wsClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
133137
return c.common.SendCustomMessage(message)
134138
}

0 commit comments

Comments
 (0)