Skip to content

Commit

Permalink
Test coverage for master leader stepdown with reconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
radekg committed Jan 13, 2022
1 parent a14f413 commit d5ca09e
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 3 deletions.
10 changes: 8 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/radekg/yugabyte-db-go-client/configs"
clientErrors "github.com/radekg/yugabyte-db-go-client/errors"
"github.com/radekg/yugabyte-db-go-client/metrics"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"

ybApi "github.com/radekg/yugabyte-db-go-proto/v2/yb/api"
Expand Down Expand Up @@ -137,6 +138,11 @@ func (c *defaultYBClient) Execute(payload, response protoreflect.ProtoMessage) e
return nil
}

// Previous execute call could have resulted
// in an error response.
// Because we are reusing response object, we have to reset it!
proto.Reset(response)

if c.config.MaxExecuteRetries <= configs.NoExecuteRetry {
reportErr := executeErr
if tReconnectError, ok := executeErr.(*clientErrors.RequiresReconnectError); ok {
Expand Down Expand Up @@ -234,15 +240,14 @@ func (c *defaultYBClient) Execute(payload, response protoreflect.ProtoMessage) e
}

if !reconnected {

c.logger.Error("execute: failed reconnect consecutive maximum reconnect attempts",
"max-attempts", c.config.MaxReconnectAttempts,
"reason", tReconnectError.Cause)
return fmt.Errorf("%s: %s", errNotReconnected.Error(), tReconnectError.Cause.Error())
}

// retry:
<-time.After(c.config.RetryInterval)
c.logger.Info("execute: client successfully reconnected")
currentAttempt = currentAttempt + 1
continue

Expand Down Expand Up @@ -358,6 +363,7 @@ func (c *defaultYBClient) connectUnsafe() error {
return &clientErrors.NoLeaderError{}
}
case connectedClient := <-chanConnectedClient:
c.logger.Debug("Setting connected client...")
c.metricsCallback.ClientConnect()
c.connectedClient = connectedClient
c.isConnecting = false
Expand Down
7 changes: 7 additions & 0 deletions client/single_node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var recvChunkSize = 4 * 1024

// YBConnectedClient represents a connected client.
type YBConnectedClient interface {
ClientID() string
// Close closes the connected client.
Close() error
// Execute executes the payload against the service
Expand All @@ -33,6 +34,7 @@ type YBConnectedClient interface {
}

type defaultSingleNodeClient struct {
id string
originalConfig *configs.YBSingleNodeClientConfig
callCounter int
chanConnected chan struct{}
Expand All @@ -44,6 +46,11 @@ type defaultSingleNodeClient struct {
svcRegistry ServiceRegistry
}

// Close closes a connected client.
func (c *defaultSingleNodeClient) ClientID() string {
return c.id
}

// Close closes a connected client.
func (c *defaultSingleNodeClient) Close() error {
return c.closeFunc()
Expand Down
3 changes: 3 additions & 0 deletions client/single_node_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package client

import (
"crypto/tls"
"fmt"
"net"
"time"

"github.com/hashicorp/go-hclog"
"github.com/radekg/yugabyte-db-go-client/configs"
Expand Down Expand Up @@ -64,6 +66,7 @@ func (dcc *defaultClientConnector) connect(cfg *configs.YBSingleNodeClientConfig
return nil, err
}
client := &defaultSingleNodeClient{
id: fmt.Sprintf("client-%d", time.Now().Unix()),
originalConfig: cfg,
chanConnected: make(chan struct{}, 1),
chanConnectErr: make(chan error, 1),
Expand Down
1 change: 0 additions & 1 deletion metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func (p *noop) ClientBytesReceived(n int) {}
func (p *noop) ClientBytesSent(n int) {}
func (p *noop) ClientConnect() {}
func (p *noop) ClientError() {}
func (p *noop) ClientPayloadSent() {}
func (p *noop) ClientMessageSendFailure() {}
func (p *noop) ClientMessageSendSuccess() {}
func (p *noop) ClientReconnectAttempt() {}
Expand Down
122 changes: 122 additions & 0 deletions testutils/common/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package common

import (
"sync"
"testing"
)

// TestMetricsCallback is the metrics callback to use in tests.
type TestMetricsCallback struct {
lock *sync.Mutex

clientBytesReceived int
clientBytesSent int
clientConnects int
clientErrors int
messageSendFailures int
messageSendSuccesses int
reconnectAttempts int
reconnectFailures int
reconnectSuccesses int
}

// NewTestMetricsCallback creates a new configured instance of test metrics callback.
func NewTestMetricsCallback(t *testing.T) *TestMetricsCallback {
return &TestMetricsCallback{lock: &sync.Mutex{}}
}

// -- metrics callback interface

func (p *TestMetricsCallback) ClientBytesReceived(n int) {
p.lock.Lock()
p.clientBytesReceived = p.clientBytesReceived + n
p.lock.Unlock()
}
func (p *TestMetricsCallback) ClientBytesSent(n int) {
p.lock.Lock()
p.clientBytesSent = p.clientBytesSent + n
p.lock.Unlock()
}
func (p *TestMetricsCallback) ClientConnect() {
p.lock.Lock()
p.clientConnects = p.clientConnects + 1
p.lock.Unlock()
}
func (p *TestMetricsCallback) ClientError() {
p.lock.Lock()
p.clientErrors = p.clientErrors + 1
p.lock.Unlock()
}
func (p *TestMetricsCallback) ClientMessageSendFailure() {
p.lock.Lock()
p.messageSendFailures = p.messageSendFailures + 1
p.lock.Unlock()
}
func (p *TestMetricsCallback) ClientMessageSendSuccess() {
p.lock.Lock()
p.messageSendSuccesses = p.messageSendSuccesses + 1
p.lock.Unlock()
}
func (p *TestMetricsCallback) ClientReconnectAttempt() {
p.lock.Lock()
p.reconnectAttempts = p.reconnectAttempts + 1
p.lock.Unlock()
}
func (p *TestMetricsCallback) ClientReconnectFailure() {
p.lock.Lock()
p.reconnectFailures = p.reconnectFailures + 1
p.lock.Unlock()
}
func (p *TestMetricsCallback) ClientReconnectSuccess() {
p.lock.Lock()
p.reconnectSuccesses = p.reconnectSuccesses + 1
p.lock.Unlock()
}

// -- metrics inspect interface

func (p *TestMetricsCallback) InspectClientBytesReceived(t *testing.T) int {
p.lock.Lock()
defer p.lock.Unlock()
return p.clientBytesReceived
}
func (p *TestMetricsCallback) InspectClientBytesSent(t *testing.T) int {
p.lock.Lock()
defer p.lock.Unlock()
return p.clientBytesSent
}
func (p *TestMetricsCallback) InspectClientConnect(t *testing.T) int {
p.lock.Lock()
defer p.lock.Unlock()
return p.clientConnects
}
func (p *TestMetricsCallback) InspectClientError(t *testing.T) int {
p.lock.Lock()
defer p.lock.Unlock()
return p.clientErrors
}
func (p *TestMetricsCallback) InspectClientMessageSendFailure(t *testing.T) int {
p.lock.Lock()
defer p.lock.Unlock()
return p.messageSendFailures
}
func (p *TestMetricsCallback) InspectClientMessageSendSuccess(t *testing.T) int {
p.lock.Lock()
defer p.lock.Unlock()
return p.messageSendSuccesses
}
func (p *TestMetricsCallback) InspectClientReconnectAttempt(t *testing.T) int {
p.lock.Lock()
defer p.lock.Unlock()
return p.reconnectAttempts
}
func (p *TestMetricsCallback) InspectClientReconnectFailure(t *testing.T) int {
p.lock.Lock()
defer p.lock.Unlock()
return p.reconnectFailures
}
func (p *TestMetricsCallback) InspectClientReconnectSuccess(t *testing.T) int {
p.lock.Lock()
defer p.lock.Unlock()
return p.reconnectSuccesses
}
65 changes: 65 additions & 0 deletions testutils/master/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/hashicorp/go-hclog"
"github.com/radekg/yugabyte-db-go-client/client"
"github.com/radekg/yugabyte-db-go-client/configs"
"github.com/radekg/yugabyte-db-go-client/errors"
Expand Down Expand Up @@ -48,6 +49,70 @@ func TestMasterIntegration(t *testing.T) {

}

func TestMasterLeaderStepdown(t *testing.T) {

debugLog := hclog.Default()
debugLog.SetLevel(hclog.Debug)

metricsCallback := common.NewTestMetricsCallback(t)

request := &ybApi.ListMastersRequestPB{}

testCtx := SetupMasters(t, &common.TestMasterConfiguration{
ReplicationFactor: 1,
MasterPrefix: "master-it",
})
defer testCtx.Cleanup()

client := client.NewYBClient(&configs.YBClientConfig{
MasterHostPort: testCtx.MasterExternalAddresses(),
OpTimeout: time.Duration(time.Second * 5),
MaxReconnectAttempts: 20,
}).WithLogger(debugLog).WithMetricsCallback(metricsCallback)

errNotConnected := client.Execute(request, &ybApi.ListMastersResponsePB{})
assert.NotNil(t, errNotConnected, "expected an error")

common.Eventually(t, 15, func() error {
if err := client.Connect(); err != nil {
return err
}
return nil
})

defer client.Close()

common.Eventually(t, 15, func() error {

response := &ybApi.ListMastersResponsePB{}
err := client.Execute(request, response)
if err != nil {
return err
}
t.Log("Received master list", response)
return nil
})

stepdownRequest := &ybApi.LeaderStepDownRequestPB{
TabletId: []byte("00000000000000000000000000000000"),
}
stepdownResponse := &ybApi.LeaderStepDownResponsePB{}
stepdownErr := client.Execute(stepdownRequest, stepdownResponse)
assert.Nil(t, stepdownErr)

t.Log("Received stepdown response", stepdownResponse)

tsRequest := &ybApi.ListTabletServersRequestPB{}
tsResponse := &ybApi.ListTabletServersResponsePB{}
err := client.Execute(tsRequest, tsResponse)
assert.Nil(t, err)
t.Log("Received tablet servers list", tsResponse)

assert.Greater(t, metricsCallback.InspectClientReconnectAttempt(t), 0, "expected some reconnect attempts")
assert.Equal(t, 1, metricsCallback.InspectClientReconnectSuccess(t), "expected one successful reconnect")

}

func TestMasterReconnect(t *testing.T) {

request := &ybApi.ListMastersRequestPB{}
Expand Down

0 comments on commit d5ca09e

Please sign in to comment.