I am trying to back up and restore a topic using a custom ByteArrayConverter implementation. During the restore phase, the headers arrive with a STRING schema instead of BYTES.
Connector configurations (source first, then sink):
{
  "connector.class": "io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector",
  "connect.s3.kcql": "insert into encryptSource select * from kafka:encryptSink STOREAS `JSON` PROPERTIES ('store.envelope'=true); insert into jsonSource select * from kafka:jsonSink STOREAS `JSON` PROPERTIES ('store.envelope'=true)",
  "initial_state": "STOPPED",
  "connect.s3.custom.endpoint": "http://10.x.x.x:8082",
  "topics": "encryptSource,jsonSource",
  "tasks.max": "1",
  "name": "s3-source-encrypt",
  "connect.s3.vhost.bucket": "true",
  "connect.s3.aws.auth.mode": "Credentials",
  "connect.s3.aws.access.key": "none",
  "connect.s3.aws.region": "eu-west-2",
  "connect.s3.aws.secret.key": "none"
}
{
  "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
  "connect.s3.kcql": "insert into kafka select * from `*` STOREAS `JSON` PROPERTIES ('flush.count'=1,'store.envelope'=true)",
  "initial_state": "STOPPED",
  "connect.s3.custom.endpoint": "http://10.x.x.x:8082",
  "topics": "encryptSink,jsonSink",
  "tasks.max": "1",
  "name": "s3-sink-encrypt",
  "connect.s3.vhost.bucket": "true",
  "connect.s3.aws.auth.mode": "Credentials",
  "connect.s3.aws.access.key": "none",
  "connect.s3.aws.region": "eu-west-2",
  "connect.s3.aws.secret.key": "none"
}
S3 data:
{
  "headers": {
    "encryptSinkHeader1": "8En+w2J/uYPrB4+QDjqdAw==",
    "encryptSinkHeader2": "8En+w2J/uYPrB4+QDjqdAw=="
  },
  "metadata": {
    "partition": 0,
    "offset": 0,
    "topic": "encryptSink",
    "timestamp": 1758198306830
  },
  "keyIsArray": true,
  "value": "mGjMWUbEa4AHb/cIXDOMNjwFI/uK7a+wJXD86Or/krs=",
  "key": "mGjMWUbEa4AHb/cIXDOMNlXiB4g/FG266dAXukFfp5I=",
  "valueIsArray": true
}
Because the header schema is STRING, the converter throws the following exception:
Caused by: org.apache.kafka.connect.errors.DataException: Invalid schema type for EncryptedByteArrayConverter: STRING
The relevant part of the converter implementation is:
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
    if (schema != null && schema.type() != Schema.Type.BYTES) {
        throw new DataException("Invalid schema type for EncryptedByteArrayConverter: " + schema.type().toString());
    }
    ...
BTW, everything works fine for keys and values.
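
As a possible stopgap on the converter side (not a fix for the connector), here is a minimal sketch of how a header value could be normalized back to bytes before the schema check. It assumes the restored header value reaches the converter as the same Base64 text that is stored in S3, which I have not verified. `HeaderBytes` and `toBytes` are hypothetical names and not part of Kafka Connect or Stream Reactor.

```java
import java.util.Base64;

// Hypothetical helper, not part of Kafka Connect or Stream Reactor.
final class HeaderBytes {
    private HeaderBytes() {}

    /**
     * Returns the raw bytes for a header value.
     * byte[] values are passed through unchanged.
     * String values are ASSUMED to be Base64 text (as seen in the S3 envelope)
     * and are decoded; invalid Base64 raises IllegalArgumentException.
     */
    static byte[] toBytes(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        if (value instanceof String) {
            return Base64.getDecoder().decode((String) value);
        }
        throw new IllegalArgumentException(
                "Unsupported header value type: " + value.getClass().getName());
    }
}
```

With something like this, `fromConnectData` could accept a STRING schema for headers and call `HeaderBytes.toBytes(value)` instead of throwing. The sink and source would still disagree about the header type, so the underlying behavior would remain.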