-
-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add AWS' Simple Queue Service support for transport #1018
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First pass
transport/awssqs/consumer.go
Outdated
"github.com/go-kit/kit/transport" | ||
) | ||
|
||
// Consumer wraps an endpoint and provides and provides a handler for sqs msgs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please format all doc comments as full English sentences with punctuation and proper initialism capitalization, e.g.
// Consumer wraps an endpoint and provides and provides a handler for sqs msgs | |
// Consumer wraps an endpoint and provides a handler for SQS messages. |
This comment applies to all doc comments in the PR.
@0marq SQS Consumer. Each sqs message has a default flow: read / process / delete. I see that in your implementation the deleting remain on the user part in different ways and it is not obvious how to do better (ConsumerFinalizerFunc or ConsumerResponseFunc). If a message is not deleted, then it will be processed X time, depends on sqs queue configuration. Here should be a more obvious way: Add a ConsumerOption with DeleteMessage call which can be used beforeDecode or afterEncode, I can imagine when someone wants to remove and take all the responsibility on the processing message or someone want to delete after successful processing. |
SQS (Simple Queue Service), not query. |
transport/awssqs/consumer.go
Outdated
return err | ||
} | ||
|
||
if _, err := c.sqsClient.SendMessageWithContext(ctx, &responseMsg); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will introduce ResponsePublisher which it will responsible to publish the response or just run no operation func. How about that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see what that would improve over my implementation. If you think it really does please help me understand.
I have a WantReplyFunc that can be either based on a message attribute or simply return false or true all the time.
Depending on the output of this func, I proceed to the rest of the function which is in charge of encoding and sending a response or not.
I prefere my approach because I find it closer to gokit's approach (in NATS transport for example) where you have a dedicated encoding response function.
transport/awssqs/consumer.go
Outdated
} | ||
|
||
for _, msg := range msgs { | ||
if c.deleteMessage == BeforeHandle { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this process, before or after handle message can be done inside before or after function, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, because in before/after func you will do for all messages (for example you have read with 10 messages), so it wouldn't be right. Maybe there would be more "readable" to extract delete to another func?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, i see. Before is before you loop the messages, while after is for each message, weird. What do you think for my comment below? I borrow it from amqp transport.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I first thought that it would be up to the developer to delete the message inside an after func but @vrazdalovschi thought in #1018 (comment) that we should be responsible of deleting the message, not the developer given that this is part of the flow in handling an SQS message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xyluet So, yes, it can be done in before/after with some default flow (Detele in After), and to provide these Options as helpers. As it's done for DoNotExtendVisibilityTimeout
@0marq I meant to provide the more clear way how to delete the messages because it wasn't documented and no examples. I sure, that everyone needs a way to remove the messages before/after their processing. So, to add from the box these before/after consumer functions would be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok I get your point now. But I have an issue. After and before functions do not allow errors to be returned.
Lets admit I have the following code
// ConsumerDeleteMessagesBefore returns a ConsumerOption that appends a function
// that delete all received messages from queue to the list of consumer's before functions.
func ConsumerDeleteMessagesBefore() ConsumerOption {
return func(c *Consumer) {
deleteBefore := func(ctx context.Context, msgs *[]*sqs.Message) context.Context {
for _, msg := range *msgs {
if err := deleteMessage(ctx, c.sqsClient, c.queueURL, msg); err != nil {
c.errorHandler.Handle(ctx, err)
c.errorEncoder(ctx, err, msg, c.sqsClient)
}
}
return ctx
}
c.before = append(c.before, deleteBefore)
}
}
// ConsumerDeleteMessageAfter returns a ConsumerOption that appends a function
// that delete a message from queue to the list of consumer's after functions.
func ConsumerDeleteMessageAfter() ConsumerOption {
return func(c *Consumer) {
deleteAfter := func(ctx context.Context, msg *sqs.Message, _ *sqs.SendMessageInput, leftMsgs *[]*sqs.Message, mux *sync.Mutex) context.Context {
if err := deleteMessage(ctx, c.sqsClient, c.queueURL, msg); err != nil {
c.errorHandler.Handle(ctx, err)
c.errorEncoder(ctx, err, msg, c.sqsClient)
}
return ctx
}
c.after = append(c.after, deleteAfter)
}
}
func deleteMessage(ctx context.Context, sqsClient sqsiface.SQSAPI, queueURL string, msg *sqs.Message) error {
_, err := sqsClient.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{
QueueUrl: &queueURL,
ReceiptHandle: msg.ReceiptHandle,
})
return err
}
Is there anything more I can do if an error happens when deleting the message ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want cancellation, you can cancel the context, and your endpoint should select if the context has been cancelled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would require adding a Context.CancelFunc in before and after parameters and calling the function in case of an error right ? I am going to wait for the response of #1018 (comment) before committing any changes here
I actually made my own implementation and using it on my production. The signature like below: //main.go
//build subscriber
subscriber := sqstransport.NewSubscriber(endpoint, dec, enc, opts...)
for {
out, _ := sqsClient.ReceiveMessage(ctx, input) //handle error
for _, msg := range out.Messages {
// Its caller responsibility if want to have many workers because its more easier to scale.
// Now you are free to extend the visibility timeout, or using hearbeat suggested by AWS.
subscriber.ServeMessage(&msg)
}
} What do you think? |
This would lighten the consumer's responsibilities because we would no longer need to handler visibility timeouts, while allowing multiple message processing. In the beginning I was thinking to go towards this approach but I ended up building a code more complete but also more complex, my motivation was that the consumer should not only process the messages but also fetch the messages. |
There's a great deal of complexity in this PR that I'm not qualified to judge, because I'm not a user of SQS. But I'll write my general expectations. And remember: Go kit transports aren't meant to be feature-complete clients of the technology they wrap — they're meant to provide a simple RPC-style (single request, single response) interface to their underlying transport. I would expect any kind of message broker or queue transport package to have a New{Subscriber, Consumer, ...} constructor that took enough configuration information to identify a single topic/stream/whatever of messages of the same schema. I would expect the transport to consume messages one-by-one from the topic. Each message should be fed to a DecodeRequestFunc that took the message in its native type and produced a request interface{}. That request should be fed to the endpoint and the result received and fed to an EncodeResponseFunc that (probably optionally) produced a native message type which would be published as a response. Acking or Nacking the original message is I guess implementation dependent. All of the details I'm seeing in this PR about working with batches of messages, retries, the entire concept of "left messages", handling additional message states, etc. etc. are in my opinion out of scope for a Go kit transport. These details should not be exposed to users. |
I understand.
|
…eiving messages, handling multiple messages and updating visibilitytimeout must be done by the users.
Given the new input from @peterbourgon I have simplified the code base : Receiving messages, handling multiple messages and updating visibilityTimeouts are left to the user. |
transport/awssqs/consumer.go
Outdated
// ServeMessage serves an SQS message. | ||
func (c Consumer) ServeMessage(ctx context.Context, msg *sqs.Message) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this implement an interface expected of something in the AWS SQS package? How should callers use it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No not really.
It should be used as depicted by @xyluet in #1018 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is some conceptual disconnect here.
A Go kit service is comprised of 1 or more endpoints, and each endpoint is exposed via 1 or more transports. By convention every Go kit transport package provides a type that exposes a single endpoint with unique DecodeRequest and EncodeResponse functions. But that type should compose into a larger "unit" which represents an entire service.
For example, transport/http.Server type isn't an sttdlib http.Server itself but an http.Handler, and you're supposed to mount multiple transport/http.Server handlers in a single mux to represent your service. Or, transport/nats.Subscriber isn't it's own client and consumer of the NATS topic, instead it implements nats.MsgHandler so that it can be composed by the caller into a larger consumer that receives multiple message types and dispatches them to the appropriate MsgHandler.
Concretely: users of this package shouldn't have to run ServeMessage loops for every transport/awssqs.Consumer they create (i.e. every endpoint in their service). They should create one SQS client/consumer/whatever with 1 or more transport/awssqs.Consumer types, each of which is fed messages by the "outer" component appropriately. I don't know the architecture of the SQS client lib so I don't know if this is natively supported or would have to be provided in this package too.
Does this make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't implement anything. I just borrow from amqp transport.
kit/transport/amqp/subscriber.go
Line 98 in 6c17021
func (s Subscriber) ServeDelivery(ch Channel) func(deliv *amqp.Delivery) { |
SQS SDK provides almost the same way as RabbitMQ when receiving messages. RabbitMQ is using channel, and we need to for loop the channel and pass a Delivery
. SQS has the same way, but we need to call ReceiveMessage
in for loop and getting array of messages from receive message output and we iterate the messages and pass it to our transport.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any update on how you feel about this, knowing that this pattern is already used in amqp transport @peterbourgon ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just changed the ServeMessage method signature to return a function. Is this what both of you had in mind ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any update guys ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello !
What are your thoughts about the last changes I committed b673cbe ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xyluet @peterbourgon Any update on this ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is some conceptual disconnect here.
A Go kit service is comprised of 1 or more endpoints, and each endpoint is exposed via 1 or more transports. By convention every Go kit transport package provides a type that exposes a single endpoint with unique DecodeRequest and EncodeResponse functions. But that type should compose into a larger "unit" which represents an entire service.
For example, transport/http.Server type isn't an sttdlib http.Server itself but an http.Handler, and you're supposed to mount multiple transport/http.Server handlers in a single mux to represent your service. Or, transport/nats.Subscriber isn't it's own client and consumer of the NATS topic, instead it implements nats.MsgHandler so that it can be composed by the caller into a larger consumer that receives multiple message types and dispatches them to the appropriate MsgHandler.
Concretely: users of this package shouldn't have to run ServeMessage loops for every transport/awssqs.Consumer they create (i.e. every endpoint in their service). They should create one SQS client/consumer/whatever with 1 or more transport/awssqs.Consumer types, each of which is fed messages by the "outer" component appropriately. I don't know the architecture of the SQS client lib so I don't know if this is natively supported or would have to be provided in this package too.
Does this make sense?
@peterbourgon
Regarding your second point above, SQS is just a poll based queue.
- Example SQS request that is direct from my service.
-
queueURLOutput, err := sqsclient.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: &queue}) if err != nil { logger.Info().Msg("failed to get queue url") os.Exit(1) } input := &sqs.ReceiveMessageInput{ QueueUrl: queueURLOutput.QueueUrl, WaitTimeSeconds: aws.Int64(20), // long polling MaxNumberOfMessages: aws.Int64(10), VisibilityTimeout: aws.Int64(2), } // output contains a slice of messages output, err := sqsclient.ReceiveMessageWithContext(ctx, input)
This is the receive message api endpoint from SQS:
ReceiveMessage(input *ReceiveMessageInput) (*ReceiveMessageOutput, error)
SQS has this concept of long polling and short polling. Long polling would be a long lived connection that lives for a maximum of 20 seconds, allowing the poller to return a response only when there is a message available (cheaper due to less network calls). With short polling, a response is returned immediately but may have an empty response(may require multiple requests to check for a message on the queue).
I've seen that most of the supported transports return functions due to the nature of the way the client of those transports are built. Example:
- http has ServerHTTP func which allows it to be used as a handler.
func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
- NATS has a MsgHandler which is a function that takes in a message as input. This gets returned as a function within the ServeMsg function
func (s Subscriber) ServeMsg(nc *nats.Conn) func(msg *nats.Msg)
sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc))
I think that the SQS implementation should have some type of SQS muxer that dispatches SQS messages to the relevant consumers based off some sort of matching much like the other transports in go kit.
The problem with SQS is that is has a large interface and we wouldn't want to satisfy all interface functions.
Is this still being worked on? I'd love to use it for a project |
My two cents: I don't necessarily think that go-kit has to be the central repository of every possible transport implementation. The readme could mention third-party implementations for easy discoverability. Another thing that I'd require from these submissions is that they are already released in a third-party repository, have a few users in production, etc. Go-kit is released about once a year, so it's not the right place for testing new transports IMHO. |
Totally agree. As documented in #843 when generics lands Go kit will begin a process of paring down its core offerings to a lot less than is currently present. |
Hi,
@forepaas, we are currently starting a new project using go-kit and wanted to use AWS' Simple Query Service as a transport layer between some of our microservices.
I took the liberty of forking the project and implementing the support of SQS in go-kit's transport. This was discussed on issue #858 but was never implemented.
If you have any remarks, suggestions or questions please let me know.