Description
Currently, the Kafka source plugin does not retrieve key, partition, offset or header information from Kafka messages, even though the underlying library (github.com/segmentio/kafka-go) already supports these parameters. This omission prevents users from accessing important metadata that can be crucial for debugging, routing, or processing logic.
Proposed Solution:
Extend the existing Kafka source plugin to include the retrieval of:
- Message Key
- Partition
- Offset
- Headers
- Topic
Update any existing data structures or interfaces within the plugin to store and return these additional metadata fields.
Ensure that backward compatibility is preserved if the plugin is already being used without this metadata.
Example Reference: Below is a sample code snippet using kafka-go that demonstrates how to read the topic, partition, offset, key, value and headers:
import (
"context"
"fmt"
"log"
"os"
"strings"
kafka "github.com/segmentio/kafka-go"
)
func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
brokers := strings.Split(kafkaURL, ",")
return kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID,
Topic: topic,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
}
func main() {
kafkaURL := os.Getenv("kafkaURL")
topic := os.Getenv("topic")
groupID := os.Getenv("groupID")
reader := getKafkaReader(kafkaURL, topic, groupID)
defer reader.Close()
fmt.Println("start consuming ... !!")
for {
m, err := reader.ReadMessage(context.Background())
if err != nil {
log.Fatalln(err)
}
fmt.Printf("message at topic:%v partition:%v offset:%v %s = %s headers:%v\n",
m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value), m.Headers)
}
}
Event on kafka topic:
key: dev000001
header : {
"app.id": "best-app-ever"
}
value: { "temperature":32.7063, "createTime":1735896706186, "correlationId":"bb889204-4ad7-4652-b18a-bc465592302d"}
output:
consumer-logger-1 | message at topic:connect.edge.raw.temperature partition:0 offset:31106 dev000001 = {"temperature":32.7063,"createTime":1735896706186,"correlationId":"bb889204-4ad7-4652-b18a-bc465592302d"} headers:[{app.id best-app-ever}]