diff --git a/pkg/pubsub/sqs/publisher.go b/pkg/pubsub/sqs/publisher.go new file mode 100644 index 00000000..af00e5f2 --- /dev/null +++ b/pkg/pubsub/sqs/publisher.go @@ -0,0 +1,29 @@ +package sqs + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/service/sqs" +) + +type ( + Publisher struct { + client *sqs.Client + queueURL string + } +) + +func NewPublisher(sqsClient *sqs.Client, queueURL string) *Publisher { + return &Publisher{ + client: sqsClient, + queueURL: queueURL, + } +} + +func (p *Publisher) Publish(ctx context.Context, msg *sqs.SendMessageInput) (*sqs.SendMessageOutput, error) { + if msg.QueueUrl == nil { + msg.QueueUrl = &p.queueURL + } + + return p.client.SendMessage(ctx, msg) +} diff --git a/pkg/pubsub/sqs/subscriber.go b/pkg/pubsub/sqs/subscriber.go new file mode 100644 index 00000000..8f082321 --- /dev/null +++ b/pkg/pubsub/sqs/subscriber.go @@ -0,0 +1,71 @@ +package sqs + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + + "github.com/scribd/go-sdk/pkg/pubsub" + "github.com/scribd/go-sdk/pkg/pubsub/pool" +) + +type ( + Subscriber struct { + client *sqs.Client + queueURL string + handler MsgHandler + maxMessages int + pool *pool.Pool + } + + SubscriberConfig struct { + SQSClient *sqs.Client + MsgHandler MsgHandler + SQSConfig pubsub.SQS + } + + MsgHandler func(msg types.Message) +) + +func NewSubscriber(c SubscriberConfig) *Subscriber { + return &Subscriber{ + client: c.SQSClient, + handler: c.MsgHandler, + maxMessages: c.SQSConfig.Subscriber.MaxMessages, + queueURL: c.SQSConfig.Subscriber.QueueURL, + } +} + +func (s *Subscriber) Subscribe(ctx context.Context) chan error { + ch := make(chan error) + + go func() { + defer close(ch) + + for { + response, err := s.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ + QueueUrl: aws.String(s.queueURL), + MaxNumberOfMessages: int32(s.maxMessages), + MessageAttributeNames: []string{"All"}, + AttributeNames: []types.QueueAttributeName{"All"}, + }) + + if err != nil { + ch <- err + + return + } + + for _, message := range response.Messages { + s.pool.Schedule(func() { + s.handler(message) + }) + } + } + + }() + + return ch +}