From 5e7f55aeec6bc02f3c56ee9bddf2f703b0f84395 Mon Sep 17 00:00:00 2001 From: Evan Johnson Date: Sun, 13 Aug 2023 09:14:36 -0700 Subject: [PATCH 1/3] Add an sqs destination. --- cmd/kawad/config.go | 25 +++++++ internal/destinations/sqs/sqs.go | 124 +++++++++++++++++++++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 internal/destinations/sqs/sqs.go diff --git a/cmd/kawad/config.go b/cmd/kawad/config.go index 2978867..1338a54 100644 --- a/cmd/kawad/config.go +++ b/cmd/kawad/config.go @@ -11,6 +11,7 @@ import ( "github.com/runreveal/kawa/cmd/kawad/internal/sources/scanner" "github.com/runreveal/kawa/cmd/kawad/internal/sources/syslog" "github.com/runreveal/kawa/cmd/kawad/internal/types" + "github.com/runreveal/kawa/internal/destinations/sqs" "github.com/runreveal/kawa/x/s3" "github.com/runreveal/lib/loader" "golang.org/x/exp/slog" @@ -34,6 +35,9 @@ func init() { loader.Register("printer", func() loader.Builder[kawa.Destination[types.Event]] { return &PrinterConfig{} }) + loader.Register("sqs", func() loader.Builder[kawa.Destination[types.Event]] { + return &SQSConfig{} + }) loader.Register("s3", func() loader.Builder[kawa.Destination[types.Event]] { return &S3Config{} }) @@ -94,6 +98,16 @@ type S3Config struct { BatchSize int `json:"batchSize"` } +type SQSConfig struct { + QueueURL string `json:"queueURL"` + Region string `json:"region"` + + AccessKeyID string `json:"accessKeyID"` + AccessSecretKey string `json:"accessSecretKey"` + + BatchSize int `json:"batchSize"` +} + func (c *S3Config) Configure() (kawa.Destination[types.Event], error) { slog.Info("configuring s3") return s3kawad.NewS3( @@ -107,6 +121,17 @@ func (c *S3Config) Configure() (kawa.Destination[types.Event], error) { ), nil } +func (c *SQSConfig) Configure() (kawa.Destination[types.Event], error) { + slog.Info("configuring sqs") + return sqs.New( + sqs.WithQueueURL(c.QueueURL), + sqs.WithRegion(c.Region), + sqs.WithAccessKeyID(c.AccessKeyID), + sqs.WithAccessSecretKey(c.AccessSecretKey), + sqs.WithBatchSize(c.BatchSize), + ), nil +} + type JournaldConfig struct { } diff --git a/internal/destinations/sqs/sqs.go b/internal/destinations/sqs/sqs.go new file mode 100644 index 0000000..be3f6e3 --- /dev/null +++ b/internal/destinations/sqs/sqs.go @@ -0,0 +1,124 @@ +package sqs + +import ( + "context" + "errors" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + awssqs "github.com/aws/aws-sdk-go/service/sqs" + "github.com/runreveal/kawa" + "github.com/runreveal/kawa/internal/types" + batch "github.com/runreveal/kawa/x/batcher" + "github.com/segmentio/ksuid" +) + +type Option func(*sqs) + +func WithQueueURL(queueURL string) Option { + return func(s *sqs) { + s.queueURL = queueURL + } +} + +func WithRegion(region string) Option { + return func(s *sqs) { + s.region = region + } +} + +func WithAccessKeyID(accessKeyID string) Option { + return func(s *sqs) { + s.accessKeyID = accessKeyID + } +} + +func WithAccessSecretKey(accessSecretKey string) Option { + return func(s *sqs) { + s.accessSecretKey = accessSecretKey + } +} + +func WithBatchSize(batchSize int) Option { + return func(s *sqs) { + s.batchSize = batchSize + } +} + +type sqs struct { + batcher *batch.Destination[types.Event] + + queueURL string + region string + + accessKeyID string + accessSecretKey string + + batchSize int +} + +func New(opts ...Option) *sqs { + ret := &sqs{} + for _, o := range opts { + o(ret) + } + if ret.batchSize == 0 { + ret.batchSize = 100 + } + ret.batcher = batch.NewDestination[types.Event](ret, + batch.FlushLength(ret.batchSize), + batch.FlushFrequency(5*time.Second), + ) + return ret +} + +func (s *sqs) Run(ctx context.Context) error { + if s.queueURL == "" { + return errors.New("missing queue url") + } + + return s.batcher.Run(ctx) +} + +func (s *sqs) Send(ctx context.Context, ack func(), msgs ...kawa.Message[types.Event]) error { + return s.batcher.Send(ctx, ack, msgs...) +} + +// Flush sends the given messages of type kawa.Message[type.Event] to an sqs queue +func (s *sqs) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error { + + var config = &aws.Config{} + if s.accessKeyID != "" && s.accessSecretKey != "" { + config.Credentials = credentials.NewStaticCredentials(s.accessKeyID, s.accessSecretKey, "") + } + if s.region != "" { + config.Region = aws.String(s.region) + } + sess, err := session.NewSession(config) + if err != nil { + return err + } + sqsClient := awssqs.New(sess) + + var entries = []*awssqs.SendMessageBatchRequestEntry{} + for _, msg := range msgs { + entries = append(entries, &awssqs.SendMessageBatchRequestEntry{ + Id: aws.String(ksuid.New().String()), + MessageBody: aws.String(string(msg.Value.RawLog)), + }) + } + + sendMessageInput := &awssqs.SendMessageBatchInput{ + QueueUrl: aws.String(s.queueURL), + Entries: entries, + } + + // Upload the file to S3 + _, err = sqsClient.SendMessageBatch(sendMessageInput) + if err != nil { + return err + } + return nil +} From 86a43a452a11fde7ca73ba3749e3f84f47d3b447 Mon Sep 17 00:00:00 2001 From: Evan Johnson Date: Mon, 14 Aug 2023 14:40:30 -0700 Subject: [PATCH 2/3] Fixes to work with fifo queues. --- internal/destinations/sqs/sqs.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/internal/destinations/sqs/sqs.go b/internal/destinations/sqs/sqs.go index be3f6e3..69cd6cd 100644 --- a/internal/destinations/sqs/sqs.go +++ b/internal/destinations/sqs/sqs.go @@ -3,6 +3,7 @@ package sqs import ( "context" "errors" + "strings" "time" "github.com/aws/aws-sdk-go/aws" @@ -104,10 +105,15 @@ func (s *sqs) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error var entries = []*awssqs.SendMessageBatchRequestEntry{} for _, msg := range msgs { - entries = append(entries, &awssqs.SendMessageBatchRequestEntry{ + messageRequest := &awssqs.SendMessageBatchRequestEntry{ Id: aws.String(ksuid.New().String()), MessageBody: aws.String(string(msg.Value.RawLog)), - }) + } + if strings.HasSuffix(s.queueURL, ".fifo") { + messageRequest.MessageGroupId = messageRequest.Id + messageRequest.MessageDeduplicationId = messageRequest.Id + } + entries = append(entries, messageRequest) } sendMessageInput := &awssqs.SendMessageBatchInput{ From 9fde325e39b65d115c92f4e8fa97d9679e21c80a Mon Sep 17 00:00:00 2001 From: Evan Johnson Date: Sun, 27 Aug 2023 12:26:29 -0500 Subject: [PATCH 3/3] Convert types.Event -> []byte for sqs dest --- cmd/kawad/config.go | 4 ++-- internal/destinations/sqs/sqs.go | 13 ++++++------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/cmd/kawad/config.go b/cmd/kawad/config.go index 1338a54..f8b4d1e 100644 --- a/cmd/kawad/config.go +++ b/cmd/kawad/config.go @@ -35,7 +35,7 @@ func init() { loader.Register("printer", func() loader.Builder[kawa.Destination[types.Event]] { return &PrinterConfig{} }) - loader.Register("sqs", func() loader.Builder[kawa.Destination[types.Event]] { + loader.Register("sqs", func() loader.Builder[kawa.Destination[[]byte]] { return &SQSConfig{} }) loader.Register("s3", func() loader.Builder[kawa.Destination[types.Event]] { @@ -121,7 +121,7 @@ func (c *S3Config) Configure() (kawa.Destination[types.Event], error) { ), nil } -func (c *SQSConfig) Configure() (kawa.Destination[types.Event], error) { +func (c *SQSConfig) Configure() (kawa.Destination[[]byte], error) { slog.Info("configuring sqs") return sqs.New( sqs.WithQueueURL(c.QueueURL), diff --git a/internal/destinations/sqs/sqs.go b/internal/destinations/sqs/sqs.go index 69cd6cd..e890efc 100644 --- a/internal/destinations/sqs/sqs.go +++ b/internal/destinations/sqs/sqs.go @@ -11,7 +11,6 @@ import ( "github.com/aws/aws-sdk-go/aws/session" awssqs "github.com/aws/aws-sdk-go/service/sqs" "github.com/runreveal/kawa" - "github.com/runreveal/kawa/internal/types" batch "github.com/runreveal/kawa/x/batcher" "github.com/segmentio/ksuid" ) @@ -49,7 +48,7 @@ func WithBatchSize(batchSize int) Option { } type sqs struct { - batcher *batch.Destination[types.Event] + batcher *batch.Destination[[]byte] queueURL string region string @@ -68,7 +67,7 @@ func New(opts ...Option) *sqs { if ret.batchSize == 0 { ret.batchSize = 100 } - ret.batcher = batch.NewDestination[types.Event](ret, + ret.batcher = batch.NewDestination[[]byte](ret, batch.FlushLength(ret.batchSize), batch.FlushFrequency(5*time.Second), ) @@ -83,12 +82,12 @@ func (s *sqs) Run(ctx context.Context) error { return s.batcher.Run(ctx) } -func (s *sqs) Send(ctx context.Context, ack func(), msgs ...kawa.Message[types.Event]) error { +func (s *sqs) Send(ctx context.Context, ack func(), msgs ...kawa.Message[[]byte]) error { return s.batcher.Send(ctx, ack, msgs...) } -// Flush sends the given messages of type kawa.Message[type.Event] to an sqs queue -func (s *sqs) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error { +// Flush sends the given messages of type kawa.Message[[]byte] to an sqs queue +func (s *sqs) Flush(ctx context.Context, msgs []kawa.Message[[]byte]) error { var config = &aws.Config{} if s.accessKeyID != "" && s.accessSecretKey != "" { @@ -107,7 +106,7 @@ func (s *sqs) Flush(ctx context.Context, msgs []kawa.Message[types.Event]) error for _, msg := range msgs { messageRequest := &awssqs.SendMessageBatchRequestEntry{ Id: aws.String(ksuid.New().String()), - MessageBody: aws.String(string(msg.Value.RawLog)), + MessageBody: aws.String(string(msg.Value)), } if strings.HasSuffix(s.queueURL, ".fifo") { messageRequest.MessageGroupId = messageRequest.Id