From b35c9bcb5e98d73b8ae73eede9293a831f1a20f2 Mon Sep 17 00:00:00 2001 From: jannfis Date: Fri, 1 Mar 2024 08:39:57 -0500 Subject: [PATCH] refactor: Remodel the agent's connection code to be more testable (#27) Signed-off-by: jannfis --- agent/connection.go | 241 ++++++++++++++++++++++++-------------------- 1 file changed, 131 insertions(+), 110 deletions(-) diff --git a/agent/connection.go b/agent/connection.go index 427bf22..fbdd97f 100644 --- a/agent/connection.go +++ b/agent/connection.go @@ -1,15 +1,13 @@ package agent import ( - "errors" - "io" + "fmt" "time" "github.com/jannfis/argocd-agent/internal/event" + "github.com/jannfis/argocd-agent/internal/grpcutil" "github.com/jannfis/argocd-agent/pkg/api/grpc/eventstreamapi" "github.com/sirupsen/logrus" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" format "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -43,6 +41,103 @@ func (a *Agent) maintainConnection() error { return nil } +func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error { + logCtx := log().WithFields(logrus.Fields{ + "module": "StreamEvent", + "direction": "Send", + "client_addr": grpcutil.AddressFromContext(stream.Context()), + }) + q := a.queues.SendQ(a.remote.ClientID()) + if q == nil { + return fmt.Errorf("no send queue found for the remote principal") + } + // Get() is blocking until there is at least one item in the + // queue. + logCtx.Tracef("Waiting to grab an item from queue as it appears") + item, shutdown := q.Get() + if shutdown { + logCtx.Tracef("Queue shutdown in progress") + return nil + } + logCtx.Tracef("Grabbed an item") + if item == nil { + // FIXME: Is this really the right thing to do? + return nil + } + + ev, ok := item.(*cloudevents.Event) + if !ok { + logCtx.Warnf("invalid data in sendqueue") + return nil + } + + logCtx.Tracef("Sending an item to the event stream") + + pev, err := format.ToProto(ev) + if err != nil { + logCtx.Warnf("Could not wire event: %v", err) + return nil + } + + err = stream.Send(&eventstreamapi.Event{Event: pev}) + if err != nil { + if grpcutil.NeedReconnectOnError(err) { + return err + } else { + logCtx.Infof("Error while sending: %v", err) + return nil + } + } + + return nil +} + +// receiver receives and processes a single event from the event stream. It +// will block until an event has been received, or an error has occured. +func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) error { + logCtx := log().WithFields(logrus.Fields{ + "module": "StreamEvent", + "direction": "Recv", + "client_addr": grpcutil.AddressFromContext(stream.Context()), + }) + rcvd, err := stream.Recv() + if err != nil { + if grpcutil.NeedReconnectOnError(err) { + return err + } else { + logCtx.Infof("Error while receiving: %v", err) + return nil + } + } + ev, incomingApp, err := event.ApplicationFromWire(rcvd.Event) + if err != nil { + logCtx.Errorf("Could not unwrap event: %v", err) + return nil + } + logCtx.Debugf("Received a new event from stream") + switch ev.Type() { + case event.ApplicationCreated: + _, err := a.createApplication(incomingApp) + if err != nil { + logCtx.Errorf("Error creating application: %v", err) + } + case event.ApplicationSpecUpdated: + _, err = a.updateApplication(incomingApp) + if err != nil { + logCtx.Errorf("Error updating application: %v", err) + } + case event.ApplicationDeleted: + err = a.deleteApplication(incomingApp) + if err != nil { + logCtx.Errorf("Error deleting application: %v", err) + } + default: + logCtx.Warnf("Received an unknown event: %s. Protocol mismatch?", ev.Type()) + } + + return nil +} + func (a *Agent) handleStreamEvents() error { conn := a.remote.Conn() client := eventstreamapi.NewEventStreamClient(conn) @@ -50,137 +145,63 @@ func (a *Agent) handleStreamEvents() error { if err != nil { return err } - syncCh := make(chan struct{}) + logCtx := log().WithFields(logrus.Fields{ + "module": "StreamEvent", + "server_addr": grpcutil.AddressFromContext(stream.Context()), + }) // Receive events from the subscription stream go func() { - logCtx := log().WithFields(logrus.Fields{ - "module": "StreamEvent", - "direction": "Recv", + logCtx := logCtx.WithFields(logrus.Fields{ + "direction": "recv", }) logCtx.Info("Starting to receive events from event stream") - for a.connected.Load() { - rcvd, err := stream.Recv() - if err != nil { - if err == io.EOF { - close(syncCh) - return - } - time.Sleep(100 * time.Millisecond) - continue - } - ev, incomingApp, err := event.ApplicationFromWire(rcvd.Event) + var err error + for a.IsConnected() && err == nil { + err = a.receiver(stream) if err != nil { - logCtx.Errorf("Could not unwrap event: %v", err) - continue - } - logCtx.Debugf("Received a new event from stream") - switch ev.Type() { - case event.ApplicationCreated: - _, err := a.createApplication(incomingApp) - if err != nil { - logCtx.Errorf("Error creating application: %v", err) - } - case event.ApplicationSpecUpdated: - _, err = a.updateApplication(incomingApp) - if err != nil { - logCtx.Errorf("Error updating application: %v", err) - } - case event.ApplicationDeleted: - err = a.deleteApplication(incomingApp) - if err != nil { - logCtx.Errorf("Error deleting application: %v", err) + if grpcutil.NeedReconnectOnError(err) { + a.SetConnected(false) + } else { + logCtx.Errorf("Error while sending to stream: %v", err) } - default: - logCtx.Warnf("Received an unknown event: %s. Protocol mismatch?", ev.Type()) } } }() // Send events in the sendq to the event stream go func() { - logCtx := log().WithFields(logrus.Fields{ - "module": "StreamEvent", - "direction": "Send", + logCtx := logCtx.WithFields(logrus.Fields{ + "direction": "send", }) logCtx.Info("Starting to send events to event stream") - for a.IsConnected() { - select { - case <-a.context.Done(): - logCtx.Info("Context canceled") - return - default: - q := a.queues.SendQ(a.remote.ClientID()) - if q == nil { - logCtx.Warnf("I have no send queue for the remote server") - return - } - // Get() is blocking until there is at least one item in the - // queue. - logCtx.Tracef("Waiting to grab an item from queue as it appears") - item, shutdown := q.Get() - if shutdown { - logCtx.Tracef("Queue shutdown in progress") - return - } - logCtx.Tracef("Grabbed an item") - if item == nil { - return - } - - ev, ok := item.(*cloudevents.Event) - if !ok { - logCtx.Warnf("invalid data in sendqueue") - continue - } - - logCtx.Tracef("Sending an item to the event stream") - - pev, perr := format.ToProto(ev) - if perr != nil { - logCtx.Warnf("Could not wire event: %v", err) - continue - } - - err := stream.Send(&eventstreamapi.Event{Event: pev}) - // TODO: How to handle errors on send? - if err != nil { - status, ok := status.FromError(err) - if !ok { - if errors.Is(err, io.EOF) { - logCtx.Errorf("Remote disappeared") - a.SetConnected(false) - close(syncCh) - return - } else { - logCtx.Errorf("Error sending data: %v", err) - } - continue - } - if status.Code() == codes.Unavailable { - logCtx.Info("Agent has closed the connection during send, closing send loop") - close(syncCh) - return - } + var err error + for a.IsConnected() && err == nil { + err = a.sender(stream) + if err != nil { + if grpcutil.NeedReconnectOnError(err) { + a.SetConnected(false) + } else { + logCtx.Errorf("Error while sending to stream: %v", err) } } } - }() - for { + for a.IsConnected() { select { case <-a.context.Done(): return nil - case <-syncCh: - log().WithField("component", "EventHandler").Info("Stream closed") - err := a.queues.Delete(a.remote.ClientID(), true) - if err != nil { - log().Errorf("Could not remove agent queue: %v", err) - } - return nil default: time.Sleep(100 * time.Millisecond) } } + + log().WithField("component", "EventHandler").Info("Stream closed") + err = a.queues.Delete(a.remote.ClientID(), true) + if err != nil { + log().Errorf("Could not remove agent queue: %v", err) + } + + return nil }