Skip to content

Commit

Permalink
Eliminate Callbacks interface
Browse files Browse the repository at this point in the history
The interface has the following downsides:
- Impossible to define non-trivial default behavior. Here is an example where it was needed: open-telemetry#269 (comment)
- Adding new callbacks requires expanding the interface, which is a breaking change for existing client users.

Getting rid of the interface and keeping just a struct
for callbacks solves both problems:
- Arbitrarily complex default behavior can be now defined on the struct if the user does not provide the particular callback func.
- Adding new callback funcs is not a braking change, existing users won't be affected.
  • Loading branch information
tigrannajaryan committed Dec 18, 2024
1 parent f7d56df commit 2204615
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 255 deletions.
72 changes: 36 additions & 36 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ func TestOnConnectFail(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
var connectErr atomic.Value
settings := createNoServerSettings()
settings.Callbacks = types.CallbacksStruct{
OnConnectFailedFunc: func(ctx context.Context, err error) {
settings.Callbacks = types.Callbacks{
OnConnectFailed: func(ctx context.Context, err error) {
connectErr.Store(err)
},
}
Expand Down Expand Up @@ -244,8 +244,8 @@ func TestConnectWithServer(t *testing.T) {
// Start a client.
var connected int64
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnConnectFunc: func(ctx context.Context) {
Callbacks: types.Callbacks{
OnConnect: func(ctx context.Context) {
atomic.StoreInt64(&connected, 1)
},
},
Expand Down Expand Up @@ -282,12 +282,12 @@ func TestConnectWithServer503(t *testing.T) {
var clientConnected int64
var connectErr atomic.Value
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnConnectFunc: func(ctx context.Context) {
Callbacks: types.Callbacks{
OnConnect: func(ctx context.Context) {
atomic.StoreInt64(&clientConnected, 1)
assert.Fail(t, "Client should not be able to connect")
},
OnConnectFailedFunc: func(ctx context.Context, err error) {
OnConnectFailed: func(ctx context.Context, err error) {
connectErr.Store(err)
},
},
Expand Down Expand Up @@ -484,11 +484,11 @@ func TestFirstStatusReport(t *testing.T) {
// Start a client.
var connected, remoteConfigReceived int64
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnConnectFunc: func(ctx context.Context) {
Callbacks: types.Callbacks{
OnConnect: func(ctx context.Context) {
atomic.AddInt64(&connected, 1)
},
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
OnMessage: func(ctx context.Context, msg *types.MessageData) {
// Verify that the client received exactly the remote config that
// the Server sent.
assert.True(t, proto.Equal(remoteConfig, msg.RemoteConfig))
Expand Down Expand Up @@ -537,8 +537,8 @@ func TestIncludesDetailsOnReconnect(t *testing.T) {

var connected int64
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnConnectFunc: func(ctx context.Context) {
Callbacks: types.Callbacks{
OnConnect: func(ctx context.Context) {
atomic.AddInt64(&connected, 1)
},
},
Expand Down Expand Up @@ -589,8 +589,8 @@ func TestSetEffectiveConfig(t *testing.T) {
// Start a client.
sendConfig := createEffectiveConfig()
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
GetEffectiveConfigFunc: func(ctx context.Context) (*protobufs.EffectiveConfig, error) {
Callbacks: types.Callbacks{
GetEffectiveConfig: func(ctx context.Context) (*protobufs.EffectiveConfig, error) {
return sendConfig, nil
},
},
Expand Down Expand Up @@ -822,8 +822,8 @@ func TestServerOfferConnectionSettings(t *testing.T) {

// Start a client.
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
Callbacks: types.Callbacks{
OnMessage: func(ctx context.Context, msg *types.MessageData) {
assert.True(t, proto.Equal(metricsSettings, msg.OwnMetricsConnSettings))
assert.True(t, proto.Equal(tracesSettings, msg.OwnTracesConnSettings))
assert.True(t, proto.Equal(logsSettings, msg.OwnLogsConnSettings))
Expand All @@ -834,7 +834,7 @@ func TestServerOfferConnectionSettings(t *testing.T) {
atomic.AddInt64(&gotOtherSettings, 1)
},

OnOpampConnectionSettingsFunc: func(
OnOpampConnectionSettings: func(
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
) error {
assert.True(t, proto.Equal(opampSettings, settings))
Expand Down Expand Up @@ -891,8 +891,8 @@ func TestClientRequestConnectionSettings(t *testing.T) {

// Start a client.
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnOpampConnectionSettingsFunc: func(
Callbacks: types.Callbacks{
OnOpampConnectionSettings: func(
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
) error {
assert.True(t, proto.Equal(opampSettings, settings))
Expand Down Expand Up @@ -1073,8 +1073,8 @@ func TestReportEffectiveConfig(t *testing.T) {
// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Callbacks: types.CallbacksStruct{
GetEffectiveConfigFunc: func(ctx context.Context) (*protobufs.EffectiveConfig, error) {
Callbacks: types.Callbacks{
GetEffectiveConfig: func(ctx context.Context) (*protobufs.EffectiveConfig, error) {
return clientEffectiveConfig, nil
},
},
Expand Down Expand Up @@ -1139,8 +1139,8 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot
// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Callbacks: types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
Callbacks: types.Callbacks{
OnMessage: func(ctx context.Context, msg *types.MessageData) {
if msg.RemoteConfig != nil {
if successCase {
client.SetRemoteConfigStatus(
Expand Down Expand Up @@ -1355,8 +1355,8 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {
// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Callbacks: types.CallbacksStruct{
OnMessageFunc: onMessageFunc,
Callbacks: types.Callbacks{
OnMessage: onMessageFunc,
},
PackagesStateProvider: localPackageState,
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages |
Expand Down Expand Up @@ -1542,8 +1542,8 @@ func TestMissingCapabilities(t *testing.T) {

// Start a client.
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
Callbacks: types.Callbacks{
OnMessage: func(ctx context.Context, msg *types.MessageData) {
// These fields must not be set since we did not define the capabilities to accept them.
assert.Nil(t, msg.RemoteConfig)
assert.Nil(t, msg.OwnLogsConnSettings)
Expand All @@ -1552,7 +1552,7 @@ func TestMissingCapabilities(t *testing.T) {
assert.Nil(t, msg.OtherConnSettings)
assert.Nil(t, msg.PackagesAvailable)
},
OnOpampConnectionSettingsFunc: func(
OnOpampConnectionSettings: func(
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
) error {
assert.Fail(t, "should not be called since capability is not set to accept it")
Expand Down Expand Up @@ -1613,7 +1613,7 @@ func TestMissingPackagesStateProvider(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
// Start a client.
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{},
Callbacks: types.Callbacks{},
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages |
protobufs.AgentCapabilities_AgentCapabilities_ReportsPackageStatuses,
}
Expand All @@ -1624,7 +1624,7 @@ func TestMissingPackagesStateProvider(t *testing.T) {
// Start a client.
localPackageState := internal.NewInMemPackagesStore()
settings = types.StartSettings{
Callbacks: types.CallbacksStruct{},
Callbacks: types.Callbacks{},
PackagesStateProvider: localPackageState,
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages,
}
Expand All @@ -1634,7 +1634,7 @@ func TestMissingPackagesStateProvider(t *testing.T) {

// Start a client.
settings = types.StartSettings{
Callbacks: types.CallbacksStruct{},
Callbacks: types.Callbacks{},
PackagesStateProvider: localPackageState,
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsPackageStatuses,
}
Expand Down Expand Up @@ -1668,8 +1668,8 @@ func TestOfferUpdatedVersion(t *testing.T) {
// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Callbacks: types.CallbacksStruct{
OnMessageFunc: onMessageFunc,
Callbacks: types.Callbacks{
OnMessage: onMessageFunc,
},
PackagesStateProvider: localPackageState,
Capabilities: protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages |
Expand Down Expand Up @@ -1747,8 +1747,8 @@ func TestReportCustomCapabilities(t *testing.T) {
// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Callbacks: types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
Callbacks: types.Callbacks{
OnMessage: func(ctx context.Context, msg *types.MessageData) {
clientRcvCustomMessage.Store(msg.CustomMessage)
},
},
Expand Down Expand Up @@ -1843,7 +1843,7 @@ func TestReportCustomCapabilities(t *testing.T) {
func TestSendCustomMessage(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{},
Callbacks: types.Callbacks{},
}
prepareClient(t, &settings, client)
clientCustomCapabilities := &protobufs.CustomCapabilities{
Expand Down
5 changes: 1 addition & 4 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,7 @@ func (c *ClientCommon) PrepareStart(

// Prepare callbacks.
c.Callbacks = settings.Callbacks
if c.Callbacks == nil {
// Make sure it is always safe to call Callbacks.
c.Callbacks = types.CallbacksStruct{}
}
c.Callbacks.SetDefaults()

if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat != 0 && settings.HeartbeatInterval != nil {
if err := c.sender.SetHeartbeatInterval(*settings.HeartbeatInterval); err != nil {
Expand Down
26 changes: 14 additions & 12 deletions client/internal/httpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opamp-go/client/types"
sharedinternal "github.com/open-telemetry/opamp-go/internal"
"github.com/open-telemetry/opamp-go/internal/testhelpers"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/stretchr/testify/assert"
)

func TestHTTPSenderRetryForStatusTooManyRequests(t *testing.T) {
Expand Down Expand Up @@ -46,10 +47,10 @@ func TestHTTPSenderRetryForStatusTooManyRequests(t *testing.T) {
}},
}
})
sender.callbacks = types.CallbacksStruct{
OnConnectFunc: func(ctx context.Context) {
sender.callbacks = types.Callbacks{
OnConnect: func(ctx context.Context) {
},
OnConnectFailedFunc: func(ctx context.Context, _ error) {
OnConnectFailed: func(ctx context.Context, _ error) {
},
}
sender.url = url
Expand Down Expand Up @@ -163,10 +164,10 @@ func TestHTTPSenderRetryForFailedRequests(t *testing.T) {
}},
}
})
sender.callbacks = types.CallbacksStruct{
OnConnectFunc: func(ctx context.Context) {
sender.callbacks = types.Callbacks{
OnConnect: func(ctx context.Context) {
},
OnConnectFailedFunc: func(ctx context.Context, _ error) {
OnConnectFailed: func(ctx context.Context, _ error) {
},
}
sender.url = url
Expand Down Expand Up @@ -197,7 +198,8 @@ func TestRequestInstanceUidFlagReset(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

sender := NewHTTPSender(&sharedinternal.NopLogger{})
sender.callbacks = types.CallbacksStruct{}
sender.callbacks = types.Callbacks{}
sender.callbacks.SetDefaults()

// Set the RequestInstanceUid flag on the tracked state to request the server for a new ID to use.
clientSyncedState := &ClientSyncedState{}
Expand Down Expand Up @@ -248,8 +250,8 @@ func TestPackageUpdatesInParallel(t *testing.T) {

var messages atomic.Int32
var mux sync.Mutex
sender.callbacks = types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
sender.callbacks = types.Callbacks{
OnMessage: func(ctx context.Context, msg *types.MessageData) {
err := msg.PackageSyncer.Sync(ctx)
assert.NoError(t, err)
messages.Add(1)
Expand Down Expand Up @@ -320,8 +322,8 @@ func TestPackageUpdatesWithError(t *testing.T) {
localPackageState := types.PackagesStateProvider(nil)
var messages atomic.Int32
var mux sync.Mutex
sender.callbacks = types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
sender.callbacks = types.Callbacks{
OnMessage: func(ctx context.Context, msg *types.MessageData) {
// Make sure the call to Sync will return an error due to a nil PackageStateProvider
err := msg.PackageSyncer.Sync(ctx)
assert.Error(t, err)
Expand Down
Loading

0 comments on commit 2204615

Please sign in to comment.