-
Notifications
You must be signed in to change notification settings - Fork 506
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PubSub Kafka: Respect Subscribe context #3363
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3363 +/- ##
==========================================
+ Coverage 35.26% 35.30% +0.03%
==========================================
Files 244 245 +1
Lines 30969 30920 -49
==========================================
- Hits 10921 10915 -6
+ Misses 19116 19072 -44
- Partials 932 933 +1 ☔ View full report in Codecov by Sentry. |
Please note that in components-contrib we open all PR against main, and then cherry-pick in the relevant release branch (this has worked better here). |
3baf1fd
to
3b7e459
Compare
k.subscribeLock.Lock() | ||
defer k.subscribeLock.Unlock() | ||
for _, topic := range topics { | ||
k.subscribeTopics[topic] = handlerConfig |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this check if there's already a value for k.subscribeTopics
? Otherwise, we may be in a bad state, where one subscriber stopping causes the other subscriber to stop receiving messages (see the delete
below)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In principle yes, however this is the same behaviour as today and I don't want to change it for 1.13.
I think this check should exist in runtime, and cover all subscribers.
@@ -54,7 +54,9 @@ func (p *PubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han | |||
Handler: adaptHandler(handler), | |||
ValueSchemaType: valueSchemaType, | |||
} | |||
return p.subscribeUtil(ctx, req, handlerConfig) | |||
|
|||
p.subscribeUtil(ctx, req, handlerConfig) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think we should batch this?
At startup, Dapr will likely invoke Subscribe
multiple times for different topics. This will cause the connection to Kafka to be destroyed and re-created multiple times. Perhaps here we could add a 200ms delay to batch all subscribe calls
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code in 1.13 right now doesn't destroy the connection to Kafka, it only recreates the context that the Consume function uses on every topic subscription. This PR should adhere to the same behavior
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, this code will only close the consumer group when the pubsub component is Close()
ed.
I think adding a batcher is a good idea. I've added one for 200ms. It uses a new struct in kit
, please see the PR here dapr/kit#85
3e51b58
to
aea9183
Compare
Updates the kafka consumer group to correctly respect context cancellation for individual topic subscriptions. The consumer will be reloaded for every topic subscription, or un-subscription, using the same consumer group. Ensures there are no infinite tight loops, and only a singular consumer is run at a time. Ensures all go routines have returned on close. Signed-off-by: joshvanl <[email protected]>
01f2de8
to
fdd54f0
Compare
Signed-off-by: joshvanl <[email protected]>
@JoshVanL please cherry pick to 1.13. |
Cherry pick #3363 Signed-off-by: joshvanl <[email protected]>
Updates the kafka consumer group to correctly respect context cancellation for individual topic subscriptions. The consumer will be reloaded for every topic subscription, or un-subscription, using the same consumer group. Ensures there are no infinite tight loops, and only a singular consumer is run at a time. Ensures all go routines have returned on close.