Message
The AMQP protocol has a well-defined data type system and metadata for describing content, in addition to an opaque body that can be encoded using the same AMQP data type system or handled as raw data bytes. Apache Kafka offers no such features: it transfers messages as raw bytes.
To translate messages between an AMQP client and Apache Kafka, a MessageConverter interface is defined with the following two methods:
- toKafkaRecord : converts an AMQP message to a Kafka record;
- toAmqpMessage : converts a Kafka record to an AMQP message.
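As a rough illustration of this contract, the two methods can be sketched as an abstract base class. This is a language-neutral sketch in Python, not the bridge's own code: the `KafkaRecord` stand-in, the snake_case method names and the parameter lists are all assumptions made for the example.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class KafkaRecord:
    """Hypothetical stand-in for a Kafka record (string key, bytes value)."""
    topic: str
    key: Optional[str]
    value: bytes
    partition: Optional[int] = None
    offset: Optional[int] = None


class MessageConverter(ABC):
    """Two-way translation between AMQP messages and Kafka records."""

    @abstractmethod
    def to_kafka_record(self, topic: str, amqp_message: Any) -> KafkaRecord:
        """AMQP message -> Kafka record (mirrors toKafkaRecord)."""

    @abstractmethod
    def to_amqp_message(self, record: KafkaRecord) -> Any:
        """Kafka record -> AMQP message (mirrors toAmqpMessage)."""
```

A concrete converter subclasses `MessageConverter` and implements both directions; instantiating the base class directly raises `TypeError`.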
The message converter is pluggable through the message.convert property in the bridge configuration file (bridge.properties).
The bridge provides a DefaultMessageConverter (used by default) and two pluggable alternatives, JsonMessageConverter and RawMessageConverter.
From a Kafka point of view, all records (produced and consumed) are defined with a String for the key and a byte[] array for the value.
The DefaultMessageConverter is the simplest converter and works in the following way.
From AMQP message to Kafka record:
- All properties, application properties and message annotations are lost. They are not encoded in any way in the Kafka record;
- If partition and key are specified as message annotations, they are used to set the partition and key for the destination topic in the Kafka record;
- The AMQP body is always handled as bytes and placed in the Kafka record value. The converter supports AMQP value and raw data/binary encoding.
From Kafka record to AMQP message:
- No properties, application properties or message annotations are generated, with the exception below;
- Only the partition, offset and key message annotations are filled, using the corresponding information from the Kafka record;
- The AMQP body is encoded as raw data/binary from the corresponding Kafka record value.
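The behaviour described above can be sketched in a few lines. This is a minimal Python illustration, not the bridge's implementation: `AmqpMessage` and `KafkaRecord` are simplified, hypothetical stand-ins, the annotation names `partition`, `offset` and `key` are assumed, and turning an AMQP value body into bytes via UTF-8 is an assumption as well.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Union


@dataclass
class AmqpMessage:
    """Hypothetical, simplified AMQP message."""
    body: Union[bytes, str] = b""
    properties: Dict[str, Any] = field(default_factory=dict)
    application_properties: Dict[str, Any] = field(default_factory=dict)
    message_annotations: Dict[str, Any] = field(default_factory=dict)


@dataclass
class KafkaRecord:
    """Hypothetical stand-in for a Kafka record (string key, bytes value)."""
    topic: str
    key: Optional[str]
    value: bytes
    partition: Optional[int] = None
    offset: Optional[int] = None


def to_kafka_record(topic: str, message: AmqpMessage) -> KafkaRecord:
    """AMQP -> Kafka: keep only the body, plus partition/key for routing."""
    annotations = message.message_annotations
    if isinstance(message.body, bytes):
        value = message.body  # raw data section, passed through unchanged
    else:
        value = str(message.body).encode("utf-8")  # AMQP value (encoding assumed)
    return KafkaRecord(
        topic=topic,
        key=annotations.get("key"),
        value=value,
        partition=annotations.get("partition"),
    )


def to_amqp_message(record: KafkaRecord) -> AmqpMessage:
    """Kafka -> AMQP: raw body and only the three record-related annotations."""
    return AmqpMessage(
        body=record.value,
        message_annotations={
            "partition": record.partition,
            "offset": record.offset,
            "key": record.key,
        },
    )
```

Note that the round trip is lossy by design: properties and application properties set by the sender never reach the consumer.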
The JsonMessageConverter carries all the main AMQP message information (metadata and body) in JSON format and works in the following way.
From AMQP message to Kafka record:
The converter generates a JSON document with the following structure:
- All main properties (messageId, to, subject, ...) are converted into a JSON map named "properties" with property name/property value pairs;
- All application properties are converted into a JSON map named "applicationProperties" with property name/property value pairs;
- All message annotations are converted into a JSON map named "messageAnnotations" with annotation name/annotation value pairs. If partition and key are specified as message annotations, they are used to set the partition and key for the destination topic in the Kafka record;
- The body is encoded in a JSON map named "body" with a "type" field, which specifies whether it is AMQP value or raw data encoded, and a "section" field containing the body content. A raw data bytes section is Base64 encoded.
From Kafka record to AMQP message:
Starting from the JSON document received in the Kafka record, the converter produces an AMQP message in the following way:
- All main properties (messageId, to, subject, ...) are filled from the corresponding JSON map named "properties";
- All application properties are filled from the corresponding JSON map (if present) named "applicationProperties";
- All message annotations are filled from the corresponding JSON map (if present) named "messageAnnotations". The annotations related to partition, offset and key are always filled;
- The body is encoded as AMQP value or raw data bytes, as specified by the "type" field of "body", and the content is taken from the "section" field.
{
  "properties": {
    "to": ...
    "messageId": ...
    "subject": ...
    "replyTo": ...
    "correlationId": ...
  }
  "applicationProperties": {
    "prop1": ...
    ...
    "propN": ...
  }
  "messageAnnotations": {
    "partition": ...
    "offset": ...
    "key": ...
    "ann1": ...
    ...
    "annN": ...
  }
  "body": {
    "type": ...
    "section": ...
  }
}
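To make the layout concrete, here is a minimal Python sketch that builds such a document and reads it back. It is an illustration only, not the bridge's implementation: the function names are hypothetical, and the "type" values `"data"` and `"value"` are assumptions, since the actual values are not given here.

```python
import base64
import json
from typing import Any, Dict, Optional


def amqp_to_json(properties: Dict[str, Any],
                 application_properties: Dict[str, Any],
                 message_annotations: Dict[str, Any],
                 body: Any) -> bytes:
    """Build the JSON document used as the Kafka record value."""
    if isinstance(body, (bytes, bytearray)):
        # Raw data section: Base64 encoded so it fits in a JSON string.
        body_doc = {"type": "data",  # assumed type name
                    "section": base64.b64encode(bytes(body)).decode("ascii")}
    else:
        # AMQP value section: stored as a plain JSON value.
        body_doc = {"type": "value", "section": body}  # assumed type name
    doc = {
        "properties": properties,
        "applicationProperties": application_properties,
        "messageAnnotations": message_annotations,
        "body": body_doc,
    }
    return json.dumps(doc).encode("utf-8")


def json_to_amqp(value: bytes, partition: int, offset: int,
                 key: Optional[str]) -> Dict[str, Any]:
    """Rebuild the AMQP message parts from a Kafka record value."""
    doc = json.loads(value.decode("utf-8"))
    annotations = dict(doc.get("messageAnnotations", {}))
    # Partition, offset and key are always filled from the record itself.
    annotations.update({"partition": partition, "offset": offset, "key": key})
    body_doc = doc["body"]
    if body_doc["type"] == "data":
        body = base64.b64decode(body_doc["section"])
    else:
        body = body_doc["section"]
    return {
        "properties": doc.get("properties", {}),
        "applicationProperties": doc.get("applicationProperties", {}),
        "messageAnnotations": annotations,
        "body": body,
    }
```

Unlike the default converter, this round trip preserves properties, application properties and custom annotations, at the cost of a larger record value and Base64 overhead for binary bodies.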
The RawMessageConverter doesn't apply any real conversion and works in the following way.
From AMQP message to Kafka record:
- If partition and key are specified as message annotations, they are used to set the partition and key for the destination topic in the Kafka record;
- The message is encoded as raw bytes, which are placed in the value of the Kafka record.
From Kafka record to AMQP message:
- The message is decoded from the raw bytes that make up the value of the Kafka record;
- The annotations related to partition, offset and key are filled.