Skip to content

Commit

Permalink
refactor: Remodel the agent's connection code to be more testable (#27)
Browse files Browse the repository at this point in the history
Signed-off-by: jannfis <[email protected]>
  • Loading branch information
jannfis authored Mar 1, 2024
1 parent d251a8c commit b35c9bc
Showing 1 changed file with 131 additions and 110 deletions.
241 changes: 131 additions & 110 deletions agent/connection.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package agent

import (
"errors"
"io"
"fmt"
"time"

"github.com/jannfis/argocd-agent/internal/event"
"github.com/jannfis/argocd-agent/internal/grpcutil"
"github.com/jannfis/argocd-agent/pkg/api/grpc/eventstreamapi"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

format "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
Expand Down Expand Up @@ -43,144 +41,167 @@ func (a *Agent) maintainConnection() error {
return nil
}

func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error {
logCtx := log().WithFields(logrus.Fields{
"module": "StreamEvent",
"direction": "Send",
"client_addr": grpcutil.AddressFromContext(stream.Context()),
})
q := a.queues.SendQ(a.remote.ClientID())
if q == nil {
return fmt.Errorf("no send queue found for the remote principal")
}
// Get() is blocking until there is at least one item in the
// queue.
logCtx.Tracef("Waiting to grab an item from queue as it appears")
item, shutdown := q.Get()
if shutdown {
logCtx.Tracef("Queue shutdown in progress")
return nil
}
logCtx.Tracef("Grabbed an item")
if item == nil {
// FIXME: Is this really the right thing to do?
return nil
}

ev, ok := item.(*cloudevents.Event)
if !ok {
logCtx.Warnf("invalid data in sendqueue")
return nil
}

logCtx.Tracef("Sending an item to the event stream")

pev, err := format.ToProto(ev)
if err != nil {
logCtx.Warnf("Could not wire event: %v", err)
return nil
}

err = stream.Send(&eventstreamapi.Event{Event: pev})
if err != nil {
if grpcutil.NeedReconnectOnError(err) {
return err
} else {
logCtx.Infof("Error while sending: %v", err)
return nil
}
}

return nil
}

// receiver receives and processes a single event from the event stream. It
// will block until an event has been received, or an error has occured.
func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) error {
logCtx := log().WithFields(logrus.Fields{
"module": "StreamEvent",
"direction": "Recv",
"client_addr": grpcutil.AddressFromContext(stream.Context()),
})
rcvd, err := stream.Recv()
if err != nil {
if grpcutil.NeedReconnectOnError(err) {
return err
} else {
logCtx.Infof("Error while receiving: %v", err)
return nil
}
}
ev, incomingApp, err := event.ApplicationFromWire(rcvd.Event)
if err != nil {
logCtx.Errorf("Could not unwrap event: %v", err)
return nil
}
logCtx.Debugf("Received a new event from stream")
switch ev.Type() {
case event.ApplicationCreated:
_, err := a.createApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error creating application: %v", err)
}
case event.ApplicationSpecUpdated:
_, err = a.updateApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error updating application: %v", err)
}
case event.ApplicationDeleted:
err = a.deleteApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error deleting application: %v", err)
}
default:
logCtx.Warnf("Received an unknown event: %s. Protocol mismatch?", ev.Type())
}

return nil
}

func (a *Agent) handleStreamEvents() error {
conn := a.remote.Conn()
client := eventstreamapi.NewEventStreamClient(conn)
stream, err := client.Subscribe(a.context)
if err != nil {
return err
}
syncCh := make(chan struct{})
logCtx := log().WithFields(logrus.Fields{
"module": "StreamEvent",
"server_addr": grpcutil.AddressFromContext(stream.Context()),
})

// Receive events from the subscription stream
go func() {
logCtx := log().WithFields(logrus.Fields{
"module": "StreamEvent",
"direction": "Recv",
logCtx := logCtx.WithFields(logrus.Fields{
"direction": "recv",
})
logCtx.Info("Starting to receive events from event stream")
for a.connected.Load() {
rcvd, err := stream.Recv()
if err != nil {
if err == io.EOF {
close(syncCh)
return
}
time.Sleep(100 * time.Millisecond)
continue
}
ev, incomingApp, err := event.ApplicationFromWire(rcvd.Event)
var err error
for a.IsConnected() && err == nil {
err = a.receiver(stream)
if err != nil {
logCtx.Errorf("Could not unwrap event: %v", err)
continue
}
logCtx.Debugf("Received a new event from stream")
switch ev.Type() {
case event.ApplicationCreated:
_, err := a.createApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error creating application: %v", err)
}
case event.ApplicationSpecUpdated:
_, err = a.updateApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error updating application: %v", err)
}
case event.ApplicationDeleted:
err = a.deleteApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error deleting application: %v", err)
if grpcutil.NeedReconnectOnError(err) {
a.SetConnected(false)
} else {
logCtx.Errorf("Error while sending to stream: %v", err)
}
default:
logCtx.Warnf("Received an unknown event: %s. Protocol mismatch?", ev.Type())
}
}
}()

// Send events in the sendq to the event stream
go func() {
logCtx := log().WithFields(logrus.Fields{
"module": "StreamEvent",
"direction": "Send",
logCtx := logCtx.WithFields(logrus.Fields{
"direction": "send",
})
logCtx.Info("Starting to send events to event stream")
for a.IsConnected() {
select {
case <-a.context.Done():
logCtx.Info("Context canceled")
return
default:
q := a.queues.SendQ(a.remote.ClientID())
if q == nil {
logCtx.Warnf("I have no send queue for the remote server")
return
}
// Get() is blocking until there is at least one item in the
// queue.
logCtx.Tracef("Waiting to grab an item from queue as it appears")
item, shutdown := q.Get()
if shutdown {
logCtx.Tracef("Queue shutdown in progress")
return
}
logCtx.Tracef("Grabbed an item")
if item == nil {
return
}

ev, ok := item.(*cloudevents.Event)
if !ok {
logCtx.Warnf("invalid data in sendqueue")
continue
}

logCtx.Tracef("Sending an item to the event stream")

pev, perr := format.ToProto(ev)
if perr != nil {
logCtx.Warnf("Could not wire event: %v", err)
continue
}

err := stream.Send(&eventstreamapi.Event{Event: pev})
// TODO: How to handle errors on send?
if err != nil {
status, ok := status.FromError(err)
if !ok {
if errors.Is(err, io.EOF) {
logCtx.Errorf("Remote disappeared")
a.SetConnected(false)
close(syncCh)
return
} else {
logCtx.Errorf("Error sending data: %v", err)
}
continue
}
if status.Code() == codes.Unavailable {
logCtx.Info("Agent has closed the connection during send, closing send loop")
close(syncCh)
return
}
var err error
for a.IsConnected() && err == nil {
err = a.sender(stream)
if err != nil {
if grpcutil.NeedReconnectOnError(err) {
a.SetConnected(false)
} else {
logCtx.Errorf("Error while sending to stream: %v", err)
}
}
}

}()

for {
for a.IsConnected() {
select {
case <-a.context.Done():
return nil
case <-syncCh:
log().WithField("component", "EventHandler").Info("Stream closed")
err := a.queues.Delete(a.remote.ClientID(), true)
if err != nil {
log().Errorf("Could not remove agent queue: %v", err)
}
return nil
default:
time.Sleep(100 * time.Millisecond)
}
}

log().WithField("component", "EventHandler").Info("Stream closed")
err = a.queues.Delete(a.remote.ClientID(), true)
if err != nil {
log().Errorf("Could not remove agent queue: %v", err)
}

return nil
}

0 comments on commit b35c9bc

Please sign in to comment.