Add an sqs destination. #23

Open · wants to merge 3 commits into main
25 changes: 25 additions & 0 deletions cmd/kawad/config.go
@@ -11,6 +11,7 @@ import (
"github.com/runreveal/kawa/cmd/kawad/internal/sources/scanner"
"github.com/runreveal/kawa/cmd/kawad/internal/sources/syslog"
"github.com/runreveal/kawa/cmd/kawad/internal/types"
"github.com/runreveal/kawa/internal/destinations/sqs"
"github.com/runreveal/kawa/x/s3"
"github.com/runreveal/lib/loader"
"golang.org/x/exp/slog"
@@ -34,6 +35,9 @@ func init() {
loader.Register("printer", func() loader.Builder[kawa.Destination[types.Event]] {
return &PrinterConfig{}
})
loader.Register("sqs", func() loader.Builder[kawa.Destination[[]byte]] {
Member commented:

This should be types.Event, which requires implementing a small shim between bytes and types.Event.

See @harpesichord's PR for MQTT for an example:

https://github.com/runreveal/kawa/pull/22/files#diff-c98d0db234d358f55fa681b43ed3ae5cceb2257c3c44c4e77c1475b36af04cc2
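
A minimal sketch of such a shim, assuming types.Event can simply be JSON-encoded; the names byteDestination and eventToBytes are illustrative and not taken from this PR or the MQTT one:

package sqskawad // illustrative package name; in kawad this would sit next to the other destination adapters

import (
	"context"
	"encoding/json"

	"github.com/runreveal/kawa"
	"github.com/runreveal/kawa/cmd/kawad/internal/types"
)

// byteDestination is satisfied by the sqs destination added in this PR.
type byteDestination interface {
	Send(ctx context.Context, ack func(), msgs ...kawa.Message[[]byte]) error
}

// eventToBytes adapts a []byte destination so it can be registered as a
// kawa.Destination[types.Event], like the printer and s3 destinations.
type eventToBytes struct {
	wrapped byteDestination
}

func (e *eventToBytes) Send(ctx context.Context, ack func(), msgs ...kawa.Message[types.Event]) error {
	out := make([]kawa.Message[[]byte], 0, len(msgs))
	for _, m := range msgs {
		// JSON-encode the whole event; a real shim might forward the event's
		// raw payload instead of re-encoding it.
		b, err := json.Marshal(m.Value)
		if err != nil {
			return err
		}
		out = append(out, kawa.Message[[]byte]{Value: b})
	}
	return e.wrapped.Send(ctx, ack, out...)
}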

return &SQSConfig{}
})
loader.Register("s3", func() loader.Builder[kawa.Destination[types.Event]] {
return &S3Config{}
})
@@ -94,6 +98,16 @@ type S3Config struct {
BatchSize int `json:"batchSize"`
}

type SQSConfig struct {
QueueURL string `json:"queueURL"`
Region string `json:"region"`

AccessKeyID string `json:"accessKeyID"`
AccessSecretKey string `json:"accessSecretKey"`

BatchSize int `json:"batchSize"`
}

func (c *S3Config) Configure() (kawa.Destination[types.Event], error) {
slog.Info("configuring s3")
return s3kawad.NewS3(
@@ -107,6 +121,17 @@ func (c *S3Config) Configure() (kawa.Destination[types.Event], error) {
), nil
}

func (c *SQSConfig) Configure() (kawa.Destination[[]byte], error) {
slog.Info("configuring sqs")
return sqs.New(
sqs.WithQueueURL(c.QueueURL),
sqs.WithRegion(c.Region),
sqs.WithAccessKeyID(c.AccessKeyID),
sqs.WithAccessSecretKey(c.AccessSecretKey),
sqs.WithBatchSize(c.BatchSize),
), nil
}

type JournaldConfig struct {
}
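
If the suggestion in the review comment is adopted, the registration and Configure above would presumably switch to types.Event and wrap the byte-based client in that shim. A hedged sketch, reusing the illustrative eventToBytes adapter from the comment thread (not code in this PR):

func init() {
	// ... existing registrations ...
	loader.Register("sqs", func() loader.Builder[kawa.Destination[types.Event]] {
		return &SQSConfig{}
	})
}

func (c *SQSConfig) Configure() (kawa.Destination[types.Event], error) {
	slog.Info("configuring sqs")
	dst := sqs.New(
		sqs.WithQueueURL(c.QueueURL),
		sqs.WithRegion(c.Region),
		sqs.WithAccessKeyID(c.AccessKeyID),
		sqs.WithAccessSecretKey(c.AccessSecretKey),
		sqs.WithBatchSize(c.BatchSize),
	)
	// Wrap the []byte destination so it satisfies kawa.Destination[types.Event].
	return &eventToBytes{wrapped: dst}, nil
}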

129 changes: 129 additions & 0 deletions internal/destinations/sqs/sqs.go
@@ -0,0 +1,129 @@
package sqs

import (
"context"
"errors"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
awssqs "github.com/aws/aws-sdk-go/service/sqs"
"github.com/runreveal/kawa"
batch "github.com/runreveal/kawa/x/batcher"
"github.com/segmentio/ksuid"
)

type Option func(*sqs)

func WithQueueURL(queueURL string) Option {
return func(s *sqs) {
s.queueURL = queueURL
}
}

func WithRegion(region string) Option {
return func(s *sqs) {
s.region = region
}
}

func WithAccessKeyID(accessKeyID string) Option {
return func(s *sqs) {
s.accessKeyID = accessKeyID
}
}

func WithAccessSecretKey(accessSecretKey string) Option {
return func(s *sqs) {
s.accessSecretKey = accessSecretKey
}
}

func WithBatchSize(batchSize int) Option {
return func(s *sqs) {
s.batchSize = batchSize
}
}

type sqs struct {
batcher *batch.Destination[[]byte]

queueURL string
region string

accessKeyID string
accessSecretKey string

batchSize int
}

func New(opts ...Option) *sqs {
ret := &sqs{}
for _, o := range opts {
o(ret)
}
if ret.batchSize == 0 {
ret.batchSize = 100
}
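	// Wrap this destination in kawa's batcher: Send enqueues messages, and the
	// batcher calls Flush once batchSize messages are queued or the 5-second
	// flush interval elapses.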
ret.batcher = batch.NewDestination[[]byte](ret,
batch.FlushLength(ret.batchSize),
batch.FlushFrequency(5*time.Second),
)
return ret
}

func (s *sqs) Run(ctx context.Context) error {
if s.queueURL == "" {
return errors.New("missing queue url")
}

return s.batcher.Run(ctx)
}

func (s *sqs) Send(ctx context.Context, ack func(), msgs ...kawa.Message[[]byte]) error {
return s.batcher.Send(ctx, ack, msgs...)
}

// Flush sends the given messages of type kawa.Message[[]byte] to an SQS queue.
func (s *sqs) Flush(ctx context.Context, msgs []kawa.Message[[]byte]) error {
	var config = &aws.Config{}
	if s.accessKeyID != "" && s.accessSecretKey != "" {
		config.Credentials = credentials.NewStaticCredentials(s.accessKeyID, s.accessSecretKey, "")
	}
	if s.region != "" {
		config.Region = aws.String(s.region)
	}
	sess, err := session.NewSession(config)
	if err != nil {
		return err
	}
	sqsClient := awssqs.New(sess)

	isFIFO := strings.HasSuffix(s.queueURL, ".fifo")

	var entries = []*awssqs.SendMessageBatchRequestEntry{}
	for _, msg := range msgs {
		messageRequest := &awssqs.SendMessageBatchRequestEntry{
			Id:          aws.String(ksuid.New().String()),
			MessageBody: aws.String(string(msg.Value)),
		}
		if isFIFO {
			// FIFO queues require a group and deduplication ID on every message.
			messageRequest.MessageGroupId = messageRequest.Id
			messageRequest.MessageDeduplicationId = messageRequest.Id
		}
		entries = append(entries, messageRequest)
	}

	// SendMessageBatch accepts at most 10 entries per request, so send the
	// batch in chunks; the configured batch size (default 100) can exceed that.
	for len(entries) > 0 {
		n := len(entries)
		if n > 10 {
			n = 10
		}
		out, err := sqsClient.SendMessageBatchWithContext(ctx, &awssqs.SendMessageBatchInput{
			QueueUrl: aws.String(s.queueURL),
			Entries:  entries[:n],
		})
		if err != nil {
			return err
		}
		if len(out.Failed) > 0 {
			return errors.New("sqs: failed to send some messages in batch")
		}
		entries = entries[n:]
	}
	return nil
}
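
For completeness, a standalone usage sketch of the destination as written in this PR. This is not how kawad wires destinations (that goes through the config loader above), and since the package lives under internal/ it only builds inside the kawa module; the queue URL is a placeholder:

package main

import (
	"context"
	"log"
	"time"

	"github.com/runreveal/kawa"
	"github.com/runreveal/kawa/internal/destinations/sqs"
)

func main() {
	dst := sqs.New(
		sqs.WithQueueURL("https://sqs.us-east-2.amazonaws.com/123456789012/example-queue"), // placeholder
		sqs.WithRegion("us-east-2"),
		// With no static credentials set, Flush falls back to the default
		// AWS credential chain (env vars, shared config, instance role).
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Run starts the underlying batcher, which calls Flush whenever a batch
	// fills up or the flush interval elapses.
	go func() {
		if err := dst.Run(ctx); err != nil {
			log.Println("sqs destination stopped:", err)
		}
	}()

	// The ack callback is invoked by the batcher once the batch containing
	// this message has been flushed.
	err := dst.Send(ctx, func() { log.Println("batch flushed") },
		kawa.Message[[]byte]{Value: []byte(`{"hello":"world"}`)},
	)
	if err != nil {
		log.Fatal(err)
	}

	time.Sleep(6 * time.Second) // crude wait for the flush interval; sketch only
}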