Skip to content

Commit cc45977

Browse files
authored
kafka consumer: Limit MaxBrokerReadBytes (#606)
Limits the `MaxBrokerReadBytes` and `MaxFetchBytes` to the maximum value that is accepted by `franz-go` library. --------- Signed-off-by: Marc Lopez Rubio <[email protected]>
1 parent 8dfaf4c commit cc45977

File tree

2 files changed

+175
-0
lines changed

2 files changed

+175
-0
lines changed

kafka/consumer.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"context"
2222
"errors"
2323
"fmt"
24+
"math"
2425
"strings"
2526
"sync"
2627
"time"
@@ -150,6 +151,34 @@ func (cfg *ConsumerConfig) finalize() error {
150151
if cfg.FetchMinBytes < 0 {
151152
errs = append(errs, errors.New("kafka: fetch min bytes cannot be negative"))
152153
}
154+
if cfg.BrokerMaxReadBytes < 0 {
155+
errs = append(errs, errors.New("kafka: broker max read bytes cannot be negative"))
156+
}
157+
if cfg.MaxPollPartitionBytes > 1<<30 {
158+
cfg.Logger.Info("kafka: MaxPollPartitionBytes exceeds 1GiB, setting to 1GiB")
159+
cfg.MaxPollPartitionBytes = 1 << 30
160+
}
161+
if cfg.BrokerMaxReadBytes > 1<<30 {
162+
cfg.Logger.Info("kafka: BrokerMaxReadBytes exceeds 1GiB, setting to 1GiB")
163+
cfg.BrokerMaxReadBytes = 1 << 30
164+
}
165+
if cfg.MaxPollBytes > 0 {
166+
// math.MaxInt32 is 1<<31-1.
167+
if cfg.MaxPollBytes > 1<<30 {
168+
cfg.Logger.Info("kafka: MaxPollBytes exceeds 1GiB, setting to 1GiB")
169+
cfg.MaxPollBytes = 1 << 30
170+
}
171+
if cfg.BrokerMaxReadBytes == 0 {
172+
cfg.Logger.Info("kafka: BrokerMaxReadBytes unset, setting to MaxPollBytes * 2 or 1GiB, whichever is smallest")
173+
cfg.BrokerMaxReadBytes = int32(math.Min(float64(cfg.MaxPollBytes)*2, 1<<30))
174+
}
175+
if cfg.BrokerMaxReadBytes > 0 && cfg.BrokerMaxReadBytes < cfg.MaxPollBytes {
176+
errs = append(errs, fmt.Errorf(
177+
"kafka: BrokerMaxReadBytes (%d) cannot be less than MaxPollBytes (%d)",
178+
cfg.BrokerMaxReadBytes, cfg.MaxPollBytes,
179+
))
180+
}
181+
}
153182
return errors.Join(errs...)
154183
}
155184

kafka/consumer_test.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,152 @@ func TestConsumerTopicLogFieldFunc(t *testing.T) {
763763
})
764764
}
765765

766+
func TestConsumerConfigFinalizer(t *testing.T) {
767+
proc := apmqueue.ProcessorFunc(func(context.Context, apmqueue.Record) error { return nil })
768+
ccfg := CommonConfig{
769+
Brokers: []string{"localhost:9092"},
770+
Logger: zapTest(t),
771+
}
772+
t.Run("MaxPollBytes set to 1 << 20", func(t *testing.T) {
773+
cfg := ConsumerConfig{
774+
CommonConfig: ccfg,
775+
Processor: proc,
776+
Topics: []apmqueue.Topic{"topic"},
777+
GroupID: "groupid",
778+
MaxPollBytes: 1 << 20,
779+
MaxPollPartitionBytes: 1 << 20,
780+
}
781+
err := cfg.finalize()
782+
require.NoError(t, err)
783+
assert.NotNil(t, cfg.Processor)
784+
cfg.Processor = nil
785+
assert.NotNil(t, cfg.Logger)
786+
cfg.Logger = nil
787+
788+
assert.Equal(t, ConsumerConfig{
789+
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
790+
Topics: []apmqueue.Topic{"topic"},
791+
GroupID: "groupid",
792+
MaxPollBytes: 1 << 20,
793+
MaxPollPartitionBytes: 1 << 20,
794+
BrokerMaxReadBytes: 1 << 21,
795+
}, cfg)
796+
})
797+
t.Run("MaxPollBytes set to 1 << 28", func(t *testing.T) {
798+
cfg := ConsumerConfig{
799+
CommonConfig: ccfg,
800+
Processor: proc,
801+
Topics: []apmqueue.Topic{"topic"},
802+
GroupID: "groupid",
803+
MaxPollBytes: 1 << 28,
804+
MaxPollPartitionBytes: 1 << 28,
805+
}
806+
err := cfg.finalize()
807+
require.NoError(t, err)
808+
assert.NotNil(t, cfg.Processor)
809+
cfg.Processor = nil
810+
assert.NotNil(t, cfg.Logger)
811+
cfg.Logger = nil
812+
813+
assert.Equal(t, ConsumerConfig{
814+
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
815+
Topics: []apmqueue.Topic{"topic"},
816+
GroupID: "groupid",
817+
MaxPollBytes: 1 << 28,
818+
MaxPollPartitionBytes: 1 << 28,
819+
BrokerMaxReadBytes: 1 << 29,
820+
}, cfg)
821+
})
822+
t.Run("MaxPollBytes set to 1 << 29", func(t *testing.T) {
823+
cfg := ConsumerConfig{
824+
CommonConfig: ccfg,
825+
Processor: proc,
826+
Topics: []apmqueue.Topic{"topic"},
827+
GroupID: "groupid",
828+
MaxPollBytes: 1 << 29,
829+
MaxPollPartitionBytes: 1 << 29,
830+
}
831+
err := cfg.finalize()
832+
require.NoError(t, err)
833+
assert.NotNil(t, cfg.Processor)
834+
cfg.Processor = nil
835+
assert.NotNil(t, cfg.Logger)
836+
cfg.Logger = nil
837+
838+
assert.Equal(t, ConsumerConfig{
839+
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
840+
Topics: []apmqueue.Topic{"topic"},
841+
GroupID: "groupid",
842+
MaxPollBytes: 1 << 29,
843+
MaxPollPartitionBytes: 1 << 29,
844+
BrokerMaxReadBytes: 1 << 30,
845+
}, cfg)
846+
})
847+
t.Run("MaxPollBytes set to 1 << 30", func(t *testing.T) {
848+
cfg := ConsumerConfig{
849+
CommonConfig: ccfg,
850+
Processor: proc,
851+
Topics: []apmqueue.Topic{"topic"},
852+
GroupID: "groupid",
853+
MaxPollBytes: 1 << 30,
854+
MaxPollPartitionBytes: 1 << 30,
855+
}
856+
err := cfg.finalize()
857+
require.NoError(t, err)
858+
assert.NotNil(t, cfg.Processor)
859+
cfg.Processor = nil
860+
assert.NotNil(t, cfg.Logger)
861+
cfg.Logger = nil
862+
863+
assert.Equal(t, ConsumerConfig{
864+
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
865+
Topics: []apmqueue.Topic{"topic"},
866+
GroupID: "groupid",
867+
MaxPollBytes: 1 << 30,
868+
MaxPollPartitionBytes: 1 << 30,
869+
BrokerMaxReadBytes: 1 << 30,
870+
}, cfg)
871+
})
872+
t.Run("MaxPollBytes set to 1 << 31-1", func(t *testing.T) {
873+
cfg := ConsumerConfig{
874+
CommonConfig: ccfg,
875+
Processor: proc,
876+
Topics: []apmqueue.Topic{"topic"},
877+
GroupID: "groupid",
878+
MaxPollBytes: 1<<31 - 1,
879+
MaxPollPartitionBytes: 1<<31 - 1,
880+
}
881+
err := cfg.finalize()
882+
require.NoError(t, err)
883+
assert.NotNil(t, cfg.Processor)
884+
cfg.Processor = nil
885+
assert.NotNil(t, cfg.Logger)
886+
cfg.Logger = nil
887+
888+
assert.Equal(t, ConsumerConfig{
889+
CommonConfig: CommonConfig{Brokers: []string{"localhost:9092"}},
890+
Topics: []apmqueue.Topic{"topic"},
891+
GroupID: "groupid",
892+
MaxPollBytes: 1 << 30,
893+
MaxPollPartitionBytes: 1 << 30,
894+
BrokerMaxReadBytes: 1 << 30,
895+
}, cfg)
896+
})
897+
t.Run("BrokerMaxReadBytes is less than MaxPollBytes", func(t *testing.T) {
898+
cfg := ConsumerConfig{
899+
CommonConfig: ccfg,
900+
Processor: proc,
901+
Topics: []apmqueue.Topic{"topic"},
902+
GroupID: "groupid",
903+
BrokerMaxReadBytes: 1,
904+
MaxPollBytes: 1<<31 - 1,
905+
MaxPollPartitionBytes: 1<<31 - 1,
906+
}
907+
err := cfg.finalize()
908+
assert.EqualError(t, err, "kafka: BrokerMaxReadBytes (1) cannot be less than MaxPollBytes (1073741824)")
909+
})
910+
}
911+
766912
func newConsumer(t testing.TB, cfg ConsumerConfig) *Consumer {
767913
if cfg.MaxPollWait <= 0 {
768914
// Lower MaxPollWait, ShutdownGracePeriod to speed up execution.

0 commit comments

Comments
 (0)