-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Description
Search before asking
- I searched in the issues and found nothing similar.
Version
all
Minimal reproduce step
1、create producer1 by schema [Schema.JSON(String.class)]
Producer<String> producer = client.newProducer(Schema.JSON(String.class)) .topic("test-tenant/test-ns/test-topic").enableBatching(false).create();
and send a message
2、create producer2 by schema [Schema.AUTO_PRODUCE_BYTES()]
Producer<byte[]> producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES()) .topic("test-tenant/test-ns/test-topic").enableBatching(false).create();
we will encounter an error:

3、execute schemas admin: sh bin/pulsar-admin schemas get test-tenant/test-ns/test-topic
We will see the following error:

What did you expect to see?
Create producer use Schema.AUTO_PRODUCE_BYTES() will success and admin schema get return a result.
What did you see instead?
Throw exception: java.lang.IllegalStateException: Not a JSON Object: "string"
Anything else?
1、There is no problem if the user send string type message uses Schema.STRING instead of Schema.JSON(String.class). The Schema.JSON(String.class) unable to reverse parse.
2、This will also cause problems when use geo-replication if the remote topics has a incorrect schema:
The incorrect schema:
{
"name" : "test-topic",
"schema" : "InN0cmluZyI=",
"type" : "JSON",
"properties" : {
"__jsr310ConversionEnabled" : "false",
"__alwaysAllowNull" : "true"
},
"schemaDefinition" : ""string""
}
The current logic is create geo producer:
this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) // .topic(remoteTopicName) .messageRoutingMode(MessageRoutingMode.SinglePartition) .enableBatching(false) .sendTimeout(0, TimeUnit.SECONDS) // .maxPendingMessages(producerQueueSize) // .producerName(getProducerName());
Are you willing to submit a PR?
- I'm willing to submit a PR!