Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
config.Consumer.Group.Session.Timeout = meta.SessionTimeout
config.ChannelBufferSize = meta.channelBufferSize

config.Producer.Compression = meta.internalCompression

config.Net.KeepAlive = meta.ClientConnectionKeepAliveInterval
config.Metadata.RefreshFrequency = meta.ClientConnectionTopicMetadataRefreshInterval

Expand Down
15 changes: 14 additions & 1 deletion common/component/kafka/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ const (
certificateAuthType = "certificate"
clientCert = "clientCert"
clientKey = "clientKey"
consumeRetryEnabled = "consumeRetryEnabled"
consumeRetryInterval = "consumeRetryInterval"
authType = "authType"
passwordAuthType = "password"
Expand All @@ -50,6 +49,7 @@ const (
consumerFetchDefault = "consumerFetchDefault"
channelBufferSize = "channelBufferSize"
valueSchemaType = "valueSchemaType"
compression = "compression"

// Kafka client config default values.
// Refresh interval < keep alive time so that way connection can be kept alive indefinitely if desired.
Expand Down Expand Up @@ -102,6 +102,10 @@ type KafkaMetadata struct {
consumerFetchMin int32 `mapstructure:"-"`
consumerFetchDefault int32 `mapstructure:"-"`

// configs for kafka producer
Compression string `mapstructure:"Compression"`
internalCompression sarama.CompressionCodec `mapstructure:"-"`

// schema registry
SchemaRegistryURL string `mapstructure:"schemaRegistryURL"`
SchemaRegistryAPIKey string `mapstructure:"schemaRegistryAPIKey"`
Expand Down Expand Up @@ -149,6 +153,7 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
ConsumeRetryEnabled: k.DefaultConsumeRetryEnabled,
ConsumeRetryInterval: 100 * time.Millisecond,
internalVersion: sarama.V2_0_0_0, //nolint:nosnakecase
internalCompression: sarama.CompressionNone,
channelBufferSize: 256,
consumerFetchMin: 1,
consumerFetchDefault: 1024 * 1024,
Expand Down Expand Up @@ -294,6 +299,14 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
m.internalVersion = version
}

if m.Compression != "" {
compression, err := parseCompression(m.Compression)
if err != nil {
return nil, err
}
m.internalCompression = compression
}

if val, ok := meta[channelBufferSize]; ok && val != "" {
v, err := strconv.Atoi(val)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions common/component/kafka/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,18 +397,21 @@ func TestMetadataProducerValues(t *testing.T) {
require.NoError(t, err)
require.Equal(t, defaultClientConnectionTopicMetadataRefreshInterval, meta.ClientConnectionTopicMetadataRefreshInterval)
require.Equal(t, defaultClientConnectionKeepAliveInterval, meta.ClientConnectionKeepAliveInterval)
require.Equal(t, sarama.CompressionNone, meta.internalCompression)
})

t.Run("setting producer values explicitly", func(t *testing.T) {
k := getKafka()
m := getCompleteMetadata()
m[clientConnectionTopicMetadataRefreshInterval] = "3m0s"
m[clientConnectionKeepAliveInterval] = "4m0s"
m[compression] = "gzip"

meta, err := k.getKafkaMetadata(m)
require.NoError(t, err)
require.Equal(t, 3*time.Minute, meta.ClientConnectionTopicMetadataRefreshInterval)
require.Equal(t, 4*time.Minute, meta.ClientConnectionKeepAliveInterval)
require.Equal(t, sarama.CompressionGZIP, meta.internalCompression)
})

t.Run("setting producer invalid values so defaults take over", func(t *testing.T) {
Expand All @@ -422,6 +425,17 @@ func TestMetadataProducerValues(t *testing.T) {
require.Equal(t, defaultClientConnectionTopicMetadataRefreshInterval, meta.ClientConnectionTopicMetadataRefreshInterval)
require.Equal(t, defaultClientConnectionKeepAliveInterval, meta.ClientConnectionKeepAliveInterval)
})

t.Run("setting producer invalid compression value", func(t *testing.T) {
k := getKafka()
m := getCompleteMetadata()
m[compression] = "invalid"

meta, err := k.getKafkaMetadata(m)
require.Error(t, err)
require.Nil(t, meta)
require.Equal(t, "kafka error: invalid compression: invalid", err.Error())
})
}

func TestMetadataChannelBufferSize(t *testing.T) {
Expand Down
18 changes: 17 additions & 1 deletion common/component/kafka/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ func parseInitialOffset(value string) (initialOffset int64, err error) {
return initialOffset, err
}

// parseCompression parses the compression codec from the given string.
// If the string is empty, it returns the default compression codec.
// If the string is not empty, it returns the parsed compression codec.
// If the string is not empty and not a valid compression codec, it returns an error.
// Supported compression codecs are: none, gzip, snappy, lz4, zstd.
func parseCompression(value string) (compression sarama.CompressionCodec, err error) {
compression = sarama.CompressionNone // Default
if value != "" {
unmarshalErr := compression.UnmarshalText([]byte(value))
if unmarshalErr != nil {
return sarama.CompressionNone, fmt.Errorf("kafka error: invalid compression: %s", value)
}
}
return compression, err
}

// isValidPEM validates the provided input has PEM formatted block.
func isValidPEM(val string) bool {
block, _ := pem.Decode([]byte(val))
Expand All @@ -64,7 +80,7 @@ func isValidPEM(val string) bool {
// TopicHandlerConfig is the map of topics and sruct containing handler and their config.
type TopicHandlerConfig map[string]SubscriptionHandlerConfig

// // TopicList returns the list of topics
// TopicList returns the list of topics
func (tbh TopicHandlerConfig) TopicList() []string {
topics := make([]string, len(tbh))
i := 0
Expand Down