Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully shutdown of the websocket client #213

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion client/internal/wsreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type wsReceiver struct {
sender *WSSender
callbacks types.Callbacks
processor receivedProcessor

// Indicates that the receiver has fully stopped.
stopped chan struct{}
}

// NewWSReceiver creates a new Receiver that uses WebSocket to receive
Expand All @@ -36,25 +39,40 @@ func NewWSReceiver(
sender: sender,
callbacks: callbacks,
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities),
stopped: make(chan struct{}),
}

return w
}

// ReceiverLoop runs the receiver loop. To stop the receiver cancel the context.
// Start starts the receiver loop.
func (r *wsReceiver) Start(ctx context.Context) {
go r.ReceiverLoop(ctx)
}

// IsStopped returns a channel that's closed when the receiver is stopped.
func (r *wsReceiver) IsStopped() <-chan struct{} {
haoqixu marked this conversation as resolved.
Show resolved Hide resolved
return r.stopped
}

// ReceiverLoop runs the receiver loop.
// To stop the receiver cancel the context and close the websocket connection
func (r *wsReceiver) ReceiverLoop(ctx context.Context) {
type receivedMessage struct {
message *protobufs.ServerToAgent
err error
}

defer func() { close(r.stopped) }()

for {
select {
case <-ctx.Done():
return
default:
result := make(chan receivedMessage, 1)

// To stop this goroutine, close the websocket connection
go func() {
var message protobufs.ServerToAgent
err := r.receiveMessage(&message)
Expand Down
33 changes: 29 additions & 4 deletions client/internal/wssender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"context"
"time"

"github.com/gorilla/websocket"
"google.golang.org/protobuf/proto"
Expand All @@ -11,13 +12,19 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
)

const (
defaultSendCloseMessageTimeout = 5 * time.Second
)

// WSSender implements the WebSocket client's sending portion of OpAMP protocol.
type WSSender struct {
SenderCommon
conn *websocket.Conn
logger types.Logger

// Indicates that the sender has fully stopped.
stopped chan struct{}
err error
}

// NewSender creates a new Sender that uses WebSocket to send
Expand All @@ -37,15 +44,22 @@ func (s *WSSender) Start(ctx context.Context, conn *websocket.Conn) error {

// Run the sender in the background.
s.stopped = make(chan struct{})
s.err = nil
go s.run(ctx)

return err
}

// WaitToStop blocks until the sender is stopped. To stop the sender cancel the context
// that was passed to Start().
func (s *WSSender) WaitToStop() {
<-s.stopped
// IsStopped returns a channel that's closed when the sender is stopped.
func (s *WSSender) IsStopped() <-chan struct{} {
return s.stopped
}

// StoppingErr returns an error if there was a problem with stopping the sender.
// If stopping was successful will return nil.
// StoppingErr() can be called only after IsStopped() is signalled.
func (s *WSSender) StoppingErr() error {
return s.err
}

func (s *WSSender) run(ctx context.Context) {
Expand All @@ -56,13 +70,24 @@ out:
s.sendNextMessage(ctx)

case <-ctx.Done():
if err := s.sendCloseMessage(); err != nil && err != websocket.ErrCloseSent {
s.err = err
}
break out
}
}

close(s.stopped)
}

func (s *WSSender) sendCloseMessage() error {
return s.conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Normal closure"),
time.Now().Add(defaultSendCloseMessageTimeout),
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
)
}

func (s *WSSender) sendNextMessage(ctx context.Context) error {
msgToSend := s.nextMessage.PopPending()
if msgToSend != nil && !proto.Equal(msgToSend, &protobufs.AgentToServer{}) {
Expand Down
87 changes: 57 additions & 30 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
"github.com/open-telemetry/opamp-go/protobufs"
)

const (
defaultShutdownTimeout = 5 * time.Second
)

// wsClient is an OpAMP Client implementation for WebSocket transport.
// See specification: https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#websocket-transport
type wsClient struct {
Expand All @@ -40,6 +44,10 @@
// last non-nil internal error that was encountered in the conn retry loop,
// currently used only for testing.
lastInternalErr atomic.Pointer[error]

// Network connection timeout used for the WebSocket closing handshake.
// This field is currently only modified during testing.
connShutdownTimeout time.Duration
}

// NewWebSocket creates a new OpAMP Client that uses WebSocket transport.
Expand All @@ -50,8 +58,9 @@

sender := internal.NewSender(logger)
w := &wsClient{
common: internal.NewClientCommon(logger, sender),
sender: sender,
common: internal.NewClientCommon(logger, sender),
sender: sender,
connShutdownTimeout: defaultShutdownTimeout,
}
return w
}
Expand Down Expand Up @@ -85,15 +94,6 @@
}

func (c *wsClient) Stop(ctx context.Context) error {
// Close connection if any.
c.connMutex.RLock()
conn := c.conn
c.connMutex.RUnlock()

if conn != nil {
_ = conn.Close()
}

return c.common.Stop(ctx)
}

Expand Down Expand Up @@ -224,19 +224,25 @@
// runOneCycle performs the following actions:
// 1. connect (try until succeeds).
// 2. send first status report.
// 3. receive and process messages until error happens.
// 3. start the sender to wait for scheduled messages and send them to the server.
// 4. start the receiver to receive and process messages until an error happens.
// 5. wait until both the sender and receiver are stopped.
//
// If it encounters an error it closes the connection and returns.
// Will stop and return if Stop() is called (ctx is cancelled, isStopping is set).
// runOneCycle will close the connection it created before it return.
//
// When Stop() is called (ctx is cancelled, isStopping is set), wsClient will shutdown gracefully:
// 1. sender will be cancelled by the ctx, send the close message to server and return the error via sender.Err().
// 2. runOneCycle will handle that error and wait for the close message from server until timeout.
func (c *wsClient) runOneCycle(ctx context.Context) {
if err := c.ensureConnected(ctx); err != nil {
// Can't connect, so can't move forward. This currently happens when we
// are being stopped.
return
}
// Close the underlying connection.
defer c.conn.Close()

if c.common.IsStopping() {
_ = c.conn.Close()
return
}

Expand All @@ -248,15 +254,14 @@
}

// Create a cancellable context for background processors.
procCtx, procCancel := context.WithCancel(ctx)
senderCtx, stopSender := context.WithCancel(ctx)
defer stopSender()

// Connected successfully. Start the sender. This will also send the first
// status report.
if err := c.sender.Start(procCtx, c.conn); err != nil {
c.common.Logger.Errorf(procCtx, "Failed to send first status report: %v", err)
if err := c.sender.Start(senderCtx, c.conn); err != nil {
c.common.Logger.Errorf(senderCtx, "Failed to send first status report: %v", err)

Check warning on line 263 in client/wsclient.go

View check run for this annotation

Codecov / codecov/patch

client/wsclient.go#L263

Added line #L263 was not covered by tests
// We could not send the report, the only thing we can do is start over.
_ = c.conn.Close()
procCancel()
return
}

Expand All @@ -270,19 +275,41 @@
c.common.PackagesStateProvider,
c.common.Capabilities,
)
r.ReceiverLoop(ctx)

// Stop the background processors.
procCancel()

// If we exited receiverLoop it means there is a connection error, we cannot
// read messages anymore. We need to start over.
// When the wsclient is closed, the context passed to runOneCycle will be canceled.
// The receiver should keep running and processing messages
// until it received a Close message from the server which means the server has no more messages.
receiverCtx, stopReceiver := context.WithCancel(context.Background())
defer stopReceiver()
r.Start(receiverCtx)

select {
case <-c.sender.IsStopped():
// sender will send close message to initiate the close handshake
if err := c.sender.StoppingErr(); err != nil {
c.common.Logger.Debugf(ctx, "Error stopping the sender: %v", err)

Check warning on line 290 in client/wsclient.go

View check run for this annotation

Codecov / codecov/patch

client/wsclient.go#L290

Added line #L290 was not covered by tests

stopReceiver()
<-r.IsStopped()
break

Check warning on line 294 in client/wsclient.go

View check run for this annotation

Codecov / codecov/patch

client/wsclient.go#L292-L294

Added lines #L292 - L294 were not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we break here? If the sender stopping failed shouldn't we still try to stop the receiver and close the connection?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this break was introduced during the updates of the PR. The initial code does not wait for the receiver to stop, leaving the closing of the connection to the defer. I update the branch to close the conneciton explicitly and wait for the receiver to stop before break.

}

// Close the connection to unblock the WSSender as well.
_ = c.conn.Close()
c.common.Logger.Debugf(ctx, "Waiting for receiver to stop.")
select {
case <-r.IsStopped():
c.common.Logger.Debugf(ctx, "Receiver stopped.")
case <-time.After(c.connShutdownTimeout):
c.common.Logger.Debugf(ctx, "Timeout waiting for receiver to stop.")
stopReceiver()
<-r.IsStopped()
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
}
case <-r.IsStopped():
// If we exited receiverLoop it means there is a connection error, we cannot
// read messages anymore. We need to start over.

// Wait for WSSender to stop.
c.sender.WaitToStop()
stopSender()
<-c.sender.IsStopped()
haoqixu marked this conversation as resolved.
Show resolved Hide resolved
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (c *wsClient) runUntilStopped(ctx context.Context) {
Expand Down
Loading
Loading