Skip to content

Commit 422f99d

Browse files
committed
Address review comments
Signed-off-by: Marc Lopez Rubio <[email protected]>
1 parent 7acaf46 commit 422f99d

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

kafka/consumer.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"context"
2222
"errors"
2323
"fmt"
24+
"math"
2425
"strings"
2526
"sync"
2627
"time"
@@ -150,18 +151,24 @@ func (cfg *ConsumerConfig) finalize() error {
150151
if cfg.FetchMinBytes < 0 {
151152
errs = append(errs, errors.New("kafka: fetch min bytes cannot be negative"))
152153
}
154+
if cfg.BrokerMaxReadBytes < 0 {
155+
errs = append(errs, errors.New("kafka: broker max read bytes cannot be negative"))
156+
}
157+
if cfg.BrokerMaxReadBytes > 1<<30 {
158+
cfg.Logger.Info("kafka: BrokerMaxReadBytes exceeds 1GiB, setting to 1GiB")
159+
cfg.BrokerMaxReadBytes = 1 << 30
160+
}
153161
if cfg.MaxPollBytes > 0 {
154162
// math.MaxInt32 is 1<<31-1.
155163
if cfg.MaxPollBytes > 1<<30 {
164+
cfg.Logger.Info("kafka: MaxPollBytes exceeds 1GiB, setting to 1GiB")
156165
cfg.MaxPollBytes = 1 << 30
157166
}
158167
if cfg.BrokerMaxReadBytes == 0 {
159-
cfg.BrokerMaxReadBytes = cfg.MaxPollBytes * 2
168+
cfg.Logger.Info("kafka: BrokerMaxReadBytes unset, setting to MaxPollBytes * 2 or 1GiB, whichever is smallest")
169+
cfg.BrokerMaxReadBytes = int32(math.Min(float64(cfg.MaxPollBytes)*2, 1<<30))
160170
}
161171
}
162-
if cfg.BrokerMaxReadBytes < 0 || cfg.BrokerMaxReadBytes > 1<<30 {
163-
cfg.BrokerMaxReadBytes = 1 << 30
164-
}
165172
return errors.Join(errs...)
166173
}
167174

0 commit comments

Comments
 (0)