Skip to content

Commit 3b7e459

Browse files
JoshVanL and ItalyPaleAle
authored and committed
PubSub Kafka: Respect Subscribe context
Updates the Kafka consumer group to correctly respect context cancellation for individual topic subscriptions. The consumer is reloaded on every topic subscription or unsubscription, using the same consumer group. Ensures there are no infinite tight loops and that only a single consumer runs at a time. Ensures all goroutines have returned on close. Signed-off-by: joshvanl <[email protected]> Signed-off-by: ItalyPaleAle <[email protected]>
1 parent 0a002b9 commit 3b7e459

File tree

5 files changed

+153
-204
lines changed

5 files changed

+153
-204
lines changed

Diff for: bindings/kafka/kafka.go

+12-15
Original file line numberDiff line numberDiff line change
@@ -100,29 +100,26 @@ func (b *Binding) Read(ctx context.Context, handler bindings.Handler) error {
100100
return nil
101101
}
102102

103-
handlerConfig := kafka.SubscriptionHandlerConfig{
104-
IsBulkSubscribe: false,
105-
Handler: adaptHandler(handler),
106-
}
107-
for _, t := range b.topics {
108-
b.kafka.AddTopicHandler(t, handlerConfig)
109-
}
103+
ctx, cancel := context.WithCancel(ctx)
104+
110105
b.wg.Add(1)
111106
go func() {
112-
defer b.wg.Done()
113-
// Wait for context cancelation or closure.
114107
select {
115108
case <-ctx.Done():
116109
case <-b.closeCh:
117110
}
118-
119-
// Remove the topic handlers.
120-
for _, t := range b.topics {
121-
b.kafka.RemoveTopicHandler(t)
122-
}
111+
cancel()
112+
b.wg.Done()
123113
}()
124114

125-
return b.kafka.Subscribe(ctx)
115+
handlerConfig := kafka.SubscriptionHandlerConfig{
116+
IsBulkSubscribe: false,
117+
Handler: adaptHandler(handler),
118+
}
119+
120+
b.kafka.Subscribe(ctx, handlerConfig, b.topics...)
121+
122+
return nil
126123
}
127124

128125
func adaptHandler(handler bindings.Handler) kafka.EventHandler {

Diff for: common/component/kafka/consumer.go

+2-146
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,10 @@ limitations under the License.
1414
package kafka
1515

1616
import (
17-
"context"
1817
"errors"
1918
"fmt"
2019
"strconv"
2120
"sync"
22-
"sync/atomic"
2321
"time"
2422

2523
"github.com/IBM/sarama"
@@ -29,15 +27,8 @@ import (
2927
)
3028

3129
type consumer struct {
32-
k *Kafka
33-
ready chan bool
34-
running chan struct{}
35-
stopped atomic.Bool
36-
once sync.Once
37-
mutex sync.Mutex
38-
skipConsume bool
39-
consumeCtx context.Context
40-
consumeCancel context.CancelFunc
30+
k *Kafka
31+
mutex sync.Mutex
4132
}
4233

4334
func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
@@ -233,27 +224,9 @@ func (consumer *consumer) Cleanup(sarama.ConsumerGroupSession) error {
233224
}
234225

235226
func (consumer *consumer) Setup(sarama.ConsumerGroupSession) error {
236-
consumer.once.Do(func() {
237-
close(consumer.ready)
238-
})
239-
240227
return nil
241228
}
242229

243-
// AddTopicHandler adds a handler and configuration for a topic
244-
func (k *Kafka) AddTopicHandler(topic string, handlerConfig SubscriptionHandlerConfig) {
245-
k.subscribeLock.Lock()
246-
k.subscribeTopics[topic] = handlerConfig
247-
k.subscribeLock.Unlock()
248-
}
249-
250-
// RemoveTopicHandler removes a topic handler
251-
func (k *Kafka) RemoveTopicHandler(topic string) {
252-
k.subscribeLock.Lock()
253-
delete(k.subscribeTopics, topic)
254-
k.subscribeLock.Unlock()
255-
}
256-
257230
// checkBulkSubscribe checks if a bulk handler and config are correctly registered for provided topic
258231
func (k *Kafka) checkBulkSubscribe(topic string) bool {
259232
if bulkHandlerConfig, ok := k.subscribeTopics[topic]; ok &&
@@ -275,120 +248,3 @@ func (k *Kafka) GetTopicHandlerConfig(topic string) (SubscriptionHandlerConfig,
275248
return SubscriptionHandlerConfig{},
276249
fmt.Errorf("any handler for messages of topic %s not found", topic)
277250
}
278-
279-
// Subscribe to topic in the Kafka cluster, in a background goroutine
280-
func (k *Kafka) Subscribe(ctx context.Context) error {
281-
if k.consumerGroup == "" {
282-
return errors.New("kafka: consumerGroup must be set to subscribe")
283-
}
284-
285-
k.subscribeLock.Lock()
286-
defer k.subscribeLock.Unlock()
287-
288-
topics := k.subscribeTopics.TopicList()
289-
if len(topics) == 0 {
290-
// Nothing to subscribe to
291-
return nil
292-
}
293-
k.consumer.skipConsume = true
294-
295-
ctxCreateFn := func() {
296-
consumeCtx, cancel := context.WithCancel(context.Background())
297-
298-
k.consumer.consumeCtx = consumeCtx
299-
k.consumer.consumeCancel = cancel
300-
301-
k.consumer.skipConsume = false
302-
}
303-
304-
if k.cg == nil {
305-
cg, err := sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config)
306-
if err != nil {
307-
return err
308-
}
309-
310-
k.cg = cg
311-
312-
ready := make(chan bool)
313-
k.consumer = consumer{
314-
k: k,
315-
ready: ready,
316-
running: make(chan struct{}),
317-
}
318-
319-
ctxCreateFn()
320-
321-
go func() {
322-
k.logger.Debugf("Subscribed and listening to topics: %s", topics)
323-
324-
for {
325-
// If the context was cancelled, as is the case when handling SIGINT and SIGTERM below, then this pops
326-
// us out of the consume loop
327-
if ctx.Err() != nil {
328-
k.logger.Info("Consume context cancelled")
329-
break
330-
}
331-
332-
k.logger.Debugf("Starting loop to consume.")
333-
334-
if k.consumer.skipConsume {
335-
continue
336-
}
337-
338-
topics = k.subscribeTopics.TopicList()
339-
340-
// Consume the requested topics
341-
bo := backoff.WithContext(backoff.NewConstantBackOff(k.consumeRetryInterval), ctx)
342-
innerErr := retry.NotifyRecover(func() error {
343-
if ctxErr := ctx.Err(); ctxErr != nil {
344-
return backoff.Permanent(ctxErr)
345-
}
346-
return k.cg.Consume(k.consumer.consumeCtx, topics, &(k.consumer))
347-
}, bo, func(err error, t time.Duration) {
348-
k.logger.Errorf("Error consuming %v. Retrying...: %v", topics, err)
349-
}, func() {
350-
k.logger.Infof("Recovered consuming %v", topics)
351-
})
352-
if innerErr != nil && !errors.Is(innerErr, context.Canceled) {
353-
k.logger.Errorf("Permanent error consuming %v: %v", topics, innerErr)
354-
}
355-
}
356-
357-
k.logger.Debugf("Closing ConsumerGroup for topics: %v", topics)
358-
err := k.cg.Close()
359-
if err != nil {
360-
k.logger.Errorf("Error closing consumer group: %v", err)
361-
}
362-
363-
// Ensure running channel is only closed once.
364-
if k.consumer.stopped.CompareAndSwap(false, true) {
365-
close(k.consumer.running)
366-
}
367-
}()
368-
369-
<-ready
370-
} else {
371-
// The consumer group is already created and consuming topics. This means a new subscription is being added
372-
k.consumer.consumeCancel()
373-
ctxCreateFn()
374-
}
375-
376-
return nil
377-
}
378-
379-
// Close down consumer group resources, refresh once.
380-
func (k *Kafka) closeSubscriptionResources() {
381-
if k.cg != nil {
382-
err := k.cg.Close()
383-
if err != nil {
384-
k.logger.Errorf("Error closing consumer group: %v", err)
385-
}
386-
387-
k.consumer.once.Do(func() {
388-
// Wait for shutdown to be complete
389-
<-k.consumer.running
390-
close(k.consumer.ready)
391-
k.consumer.once = sync.Once{}
392-
})
393-
}
394-
}

Diff for: common/component/kafka/kafka.go

+42-23
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"strings"
2222
"sync"
23+
"sync/atomic"
2324
"time"
2425

2526
"github.com/IBM/sarama"
@@ -34,19 +35,24 @@ import (
3435

3536
// Kafka allows reading/writing to a Kafka consumer group.
3637
type Kafka struct {
37-
producer sarama.SyncProducer
38-
consumerGroup string
39-
brokers []string
40-
logger logger.Logger
41-
authType string
42-
saslUsername string
43-
saslPassword string
44-
initialOffset int64
38+
producer sarama.SyncProducer
39+
consumerGroup string
40+
brokers []string
41+
logger logger.Logger
42+
authType string
43+
saslUsername string
44+
saslPassword string
45+
initialOffset int64
46+
config *sarama.Config
47+
4548
cg sarama.ConsumerGroup
46-
consumer consumer
47-
config *sarama.Config
4849
subscribeTopics TopicHandlerConfig
4950
subscribeLock sync.Mutex
51+
consumerCancel context.CancelFunc
52+
consumerWG sync.WaitGroup
53+
closeCh chan struct{}
54+
closed atomic.Bool
55+
wg sync.WaitGroup
5056

5157
// schema registry settings
5258
srClient srclient.ISchemaRegistryClient
@@ -106,7 +112,7 @@ func NewKafka(logger logger.Logger) *Kafka {
106112
return &Kafka{
107113
logger: logger,
108114
subscribeTopics: make(TopicHandlerConfig),
109-
subscribeLock: sync.Mutex{},
115+
closeCh: make(chan struct{}),
110116
}
111117
}
112118

@@ -184,11 +190,11 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
184190

185191
// Default retry configuration is used if no
186192
// backOff properties are set.
187-
if err := retry.DecodeConfigWithPrefix(
193+
if rerr := retry.DecodeConfigWithPrefix(
188194
&k.backOffConfig,
189195
metadata,
190-
"backOff"); err != nil {
191-
return err
196+
"backOff"); rerr != nil {
197+
return rerr
192198
}
193199
k.consumeRetryEnabled = meta.ConsumeRetryEnabled
194200
k.consumeRetryInterval = meta.ConsumeRetryInterval
@@ -207,22 +213,35 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
207213
}
208214
k.logger.Debug("Kafka message bus initialization complete")
209215

216+
k.cg, err = sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config)
217+
if err != nil {
218+
return err
219+
}
220+
210221
return nil
211222
}
212223

213-
func (k *Kafka) Close() (err error) {
214-
k.closeSubscriptionResources()
224+
func (k *Kafka) Close() error {
225+
defer k.wg.Wait()
226+
defer k.consumerWG.Wait()
215227

216-
if k.producer != nil {
217-
err = k.producer.Close()
218-
k.producer = nil
219-
}
228+
errs := make([]error, 2)
229+
if k.closed.CompareAndSwap(false, true) {
230+
close(k.closeCh)
231+
232+
if k.producer != nil {
233+
errs[0] = k.producer.Close()
234+
k.producer = nil
235+
}
236+
237+
if k.internalContext != nil {
238+
k.internalContextCancel()
239+
}
220240

221-
if k.internalContext != nil {
222-
k.internalContextCancel()
241+
errs[1] = k.cg.Close()
223242
}
224243

225-
return err
244+
return errors.Join(errs...)
226245
}
227246

228247
func getSchemaSubject(topic string) string {

0 commit comments

Comments
 (0)