Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Remodel the agent's connection code to be more testable #27

Merged
merged 1 commit into from
Mar 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 131 additions & 110 deletions agent/connection.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package agent

import (
"errors"
"io"
"fmt"
"time"

"github.com/jannfis/argocd-agent/internal/event"
"github.com/jannfis/argocd-agent/internal/grpcutil"
"github.com/jannfis/argocd-agent/pkg/api/grpc/eventstreamapi"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

format "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
Expand Down Expand Up @@ -43,144 +41,167 @@ func (a *Agent) maintainConnection() error {
return nil
}

// sender takes a single item from the send queue registered for the remote's
// client ID and writes it to the event stream.
//
// It returns an error when no send queue exists for the remote, or when
// stream.Send fails with an error for which grpcutil.NeedReconnectOnError
// reports true. Every other outcome returns nil: the queue shutting down, a
// nil item, an item that is not a *cloudevents.Event, a failed protobuf
// conversion, and a send error that does not require a reconnect. Those
// cases are only logged.
func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error {
	logCtx := log().WithFields(logrus.Fields{
		"module":      "StreamEvent",
		"direction":   "Send",
		"client_addr": grpcutil.AddressFromContext(stream.Context()),
	})

	sendq := a.queues.SendQ(a.remote.ClientID())
	if sendq == nil {
		return fmt.Errorf("no send queue found for the remote principal")
	}

	// Get() blocks until the queue holds at least one item.
	logCtx.Tracef("Waiting to grab an item from queue as it appears")
	next, shuttingDown := sendq.Get()
	if shuttingDown {
		logCtx.Tracef("Queue shutdown in progress")
		return nil
	}
	logCtx.Tracef("Grabbed an item")

	// FIXME: it is an open question whether silently ignoring a nil item is
	// the right thing to do.
	if next == nil {
		return nil
	}

	cev, isEvent := next.(*cloudevents.Event)
	if !isEvent {
		logCtx.Warnf("invalid data in sendqueue")
		return nil
	}

	logCtx.Tracef("Sending an item to the event stream")

	wire, convErr := format.ToProto(cev)
	if convErr != nil {
		logCtx.Warnf("Could not wire event: %v", convErr)
		return nil
	}

	sendErr := stream.Send(&eventstreamapi.Event{Event: wire})
	switch {
	case sendErr == nil:
		return nil
	case grpcutil.NeedReconnectOnError(sendErr):
		// Only errors that call for a reconnect are surfaced to the caller.
		return sendErr
	default:
		logCtx.Infof("Error while sending: %v", sendErr)
		return nil
	}
}

// receiver receives and processes a single event from the event stream. It
// blocks until an event has been received or an error has occurred.
//
// The only error it returns is a stream.Recv error for which
// grpcutil.NeedReconnectOnError reports true. Other receive errors, events
// that cannot be unwrapped, unknown event types and failures while creating,
// updating or deleting the application are logged, and nil is returned.
func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) error {
	logCtx := log().WithFields(logrus.Fields{
		"module":      "StreamEvent",
		"direction":   "Recv",
		"client_addr": grpcutil.AddressFromContext(stream.Context()),
	})

	msg, recvErr := stream.Recv()
	if recvErr != nil {
		if !grpcutil.NeedReconnectOnError(recvErr) {
			logCtx.Infof("Error while receiving: %v", recvErr)
			return nil
		}
		return recvErr
	}

	ev, incomingApp, wireErr := event.ApplicationFromWire(msg.Event)
	if wireErr != nil {
		logCtx.Errorf("Could not unwrap event: %v", wireErr)
		return nil
	}
	logCtx.Debugf("Received a new event from stream")

	// Dispatch on the event type; handler failures are logged, not returned.
	switch evType := ev.Type(); evType {
	case event.ApplicationCreated:
		if _, err := a.createApplication(incomingApp); err != nil {
			logCtx.Errorf("Error creating application: %v", err)
		}
	case event.ApplicationSpecUpdated:
		if _, err := a.updateApplication(incomingApp); err != nil {
			logCtx.Errorf("Error updating application: %v", err)
		}
	case event.ApplicationDeleted:
		if err := a.deleteApplication(incomingApp); err != nil {
			logCtx.Errorf("Error deleting application: %v", err)
		}
	default:
		logCtx.Warnf("Received an unknown event: %s. Protocol mismatch?", evType)
	}

	return nil
}

// handleStreamEvents creates an event stream client on the agent's remote
// connection and subscribes to the event stream using a.context. If Subscribe
// fails, that error is returned. Otherwise two goroutines are started, one
// receiving events from the stream and one sending queued events to it, and
// the function waits. Before returning nil it logs "Stream closed" and asks
// a.queues to delete the queue for the remote's client ID.
//
// NOTE(review): this text appears to come from a rendered diff ("131
// additions & 110 deletions") in which removed and added lines are
// interleaved without +/- markers; for example, logCtx is declared twice in
// a row inside each goroutine and two consecutive for headers precede the
// final wait loop. Presumably the body has to be separated into its pre- and
// post-change versions before it compiles — confirm against the repository.
func (a *Agent) handleStreamEvents() error {
conn := a.remote.Conn()
client := eventstreamapi.NewEventStreamClient(conn)
stream, err := client.Subscribe(a.context)
if err != nil {
return err
}
syncCh := make(chan struct{})
logCtx := log().WithFields(logrus.Fields{
"module": "StreamEvent",
"server_addr": grpcutil.AddressFromContext(stream.Context()),
})

// Receive events from the subscription stream
go func() {
logCtx := log().WithFields(logrus.Fields{
"module": "StreamEvent",
"direction": "Recv",
logCtx := logCtx.WithFields(logrus.Fields{
"direction": "recv",
})
logCtx.Info("Starting to receive events from event stream")
for a.connected.Load() {
rcvd, err := stream.Recv()
if err != nil {
if err == io.EOF {
close(syncCh)
return
}
time.Sleep(100 * time.Millisecond)
continue
}
ev, incomingApp, err := event.ApplicationFromWire(rcvd.Event)
var err error
for a.IsConnected() && err == nil {
err = a.receiver(stream)
if err != nil {
logCtx.Errorf("Could not unwrap event: %v", err)
continue
}
logCtx.Debugf("Received a new event from stream")
switch ev.Type() {
case event.ApplicationCreated:
_, err := a.createApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error creating application: %v", err)
}
case event.ApplicationSpecUpdated:
_, err = a.updateApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error updating application: %v", err)
}
case event.ApplicationDeleted:
err = a.deleteApplication(incomingApp)
if err != nil {
logCtx.Errorf("Error deleting application: %v", err)
if grpcutil.NeedReconnectOnError(err) {
a.SetConnected(false)
} else {
logCtx.Errorf("Error while sending to stream: %v", err)
}
default:
logCtx.Warnf("Received an unknown event: %s. Protocol mismatch?", ev.Type())
}
}
}()

// Send events in the sendq to the event stream
go func() {
logCtx := log().WithFields(logrus.Fields{
"module": "StreamEvent",
"direction": "Send",
logCtx := logCtx.WithFields(logrus.Fields{
"direction": "send",
})
logCtx.Info("Starting to send events to event stream")
for a.IsConnected() {
select {
case <-a.context.Done():
logCtx.Info("Context canceled")
return
default:
q := a.queues.SendQ(a.remote.ClientID())
if q == nil {
logCtx.Warnf("I have no send queue for the remote server")
return
}
// Get() is blocking until there is at least one item in the
// queue.
logCtx.Tracef("Waiting to grab an item from queue as it appears")
item, shutdown := q.Get()
if shutdown {
logCtx.Tracef("Queue shutdown in progress")
return
}
logCtx.Tracef("Grabbed an item")
if item == nil {
return
}

ev, ok := item.(*cloudevents.Event)
if !ok {
logCtx.Warnf("invalid data in sendqueue")
continue
}

logCtx.Tracef("Sending an item to the event stream")

pev, perr := format.ToProto(ev)
if perr != nil {
logCtx.Warnf("Could not wire event: %v", err)
continue
}

err := stream.Send(&eventstreamapi.Event{Event: pev})
// TODO: How to handle errors on send?
if err != nil {
status, ok := status.FromError(err)
if !ok {
if errors.Is(err, io.EOF) {
logCtx.Errorf("Remote disappeared")
a.SetConnected(false)
close(syncCh)
return
} else {
logCtx.Errorf("Error sending data: %v", err)
}
continue
}
if status.Code() == codes.Unavailable {
logCtx.Info("Agent has closed the connection during send, closing send loop")
close(syncCh)
return
}
var err error
for a.IsConnected() && err == nil {
err = a.sender(stream)
if err != nil {
if grpcutil.NeedReconnectOnError(err) {
a.SetConnected(false)
} else {
logCtx.Errorf("Error while sending to stream: %v", err)
}
}
}

}()

for {
for a.IsConnected() {
select {
case <-a.context.Done():
return nil
case <-syncCh:
log().WithField("component", "EventHandler").Info("Stream closed")
err := a.queues.Delete(a.remote.ClientID(), true)
if err != nil {
log().Errorf("Could not remove agent queue: %v", err)
}
return nil
default:
time.Sleep(100 * time.Millisecond)
}
}

log().WithField("component", "EventHandler").Info("Stream closed")
err = a.queues.Delete(a.remote.ClientID(), true)
if err != nil {
log().Errorf("Could not remove agent queue: %v", err)
}

return nil
}
Loading