Skip to content

Commit

Permalink
feat(PubSub): Add SQS PubSub wrappers
Browse files Browse the repository at this point in the history
  • Loading branch information
Neurostep committed Nov 1, 2024
1 parent 29b512f commit 0c1c94b
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 0 deletions.
29 changes: 29 additions & 0 deletions pkg/pubsub/sqs/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package sqs

import (
"context"

"github.com/aws/aws-sdk-go-v2/service/sqs"
)

type (
Publisher struct {
client *sqs.Client
queueURL string
}
)

func NewPublisher(sqsClient *sqs.Client, queueURL string) *Publisher {
return &Publisher{
client: sqsClient,
queueURL: queueURL,
}
}

func (p *Publisher) Publish(ctx context.Context, msg *sqs.SendMessageInput) (*sqs.SendMessageOutput, error) {
if msg.QueueUrl == nil {
msg.QueueUrl = &p.queueURL
}

return p.client.SendMessage(ctx, msg)
}
71 changes: 71 additions & 0 deletions pkg/pubsub/sqs/subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package sqs

import (
"context"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"

"github.com/scribd/go-sdk/pkg/pubsub"
"github.com/scribd/go-sdk/pkg/pubsub/pool"
)

type (
Subscriber struct {
client *sqs.Client
queueURL string
handler MsgHandler
maxMessages int
pool *pool.Pool
}

SubscriberConfig struct {
SQSClient *sqs.Client
MsgHandler MsgHandler
SQSConfig pubsub.SQS
}

MsgHandler func(msg types.Message)
)

func NewSubscriber(c SubscriberConfig) *Subscriber {
return &Subscriber{
client: c.SQSClient,
handler: c.MsgHandler,
maxMessages: c.SQSConfig.Subscriber.MaxMessages,
queueURL: c.SQSConfig.Subscriber.QueueURL,
}
}

func (s *Subscriber) Subscribe(ctx context.Context) chan error {
ch := make(chan error)

go func() {
defer close(ch)

for {
response, err := s.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: aws.String(s.queueURL),
MaxNumberOfMessages: int32(s.maxMessages),
MessageAttributeNames: []string{"All"},
AttributeNames: []types.QueueAttributeName{"All"},
})

if err != nil {
ch <- err

return
}

for _, message := range response.Messages {
s.pool.Schedule(func() {
s.handler(message)
})
}
}

}()

return ch
}

0 comments on commit 0c1c94b

Please sign in to comment.