
Cannot deserialize Array[Byte] fields via Avro with Schema Registry #1330

@willneedham93

Description

Hello,

Firstly, big fan of the library - it has made interfacing with Kafka a breeze! Apologies if this is the wrong place to raise this and I should be looking at the vulcan repository instead.

I have recently hit an issue when trying to deserialize Array[Byte] keys and values using the Vulcan module with a Schema Registry. When referenced in nested models, this data type works fine (see the sketch after the stack trace below). I'm aware this is a slightly odd interface, but unfortunately it is what we have to work with.

vulcan.AvroException$$anon$1: Error decoding Array[Byte]: Got unexpected type byte[], expected type ByteBuffer
  at vulcan.AvroException$.apply(AvroException.scala:18)
  at vulcan.AvroError$ErrorDecodingType.throwable(AvroError.scala:93)
  at fs2.kafka.vulcan.AvroDeserializer$.$anonfun$createDeserializer$extension$2(AvroDeserializer.scala:57)
  at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$createDeserializer$3(AvroDeserializer.scala:40)
  at defer @ fs2.kafka.vulcan.AvroSerializer$.$anonfun$create$3(AvroSerializer.scala:38)
  at product$extension @ fs2.kafka.KafkaProducer$.serializeToBytes(KafkaProducer.scala:242)
  at map @ fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:208)
  at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:275)
  at traverse @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:272)
  at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:277)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$28(KafkaConsumer.scala:280)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$28(KafkaConsumer.scala:280)
  at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$35(KafkaConsumer.scala:317)
  at map @ fs2.kafka.internal.KafkaConsumerActor.records(KafkaConsumerActor.scala:279)
  at delay @ fs2.kafka.internal.Blocking$$anon$2.apply(Blocking.scala:28)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.pollConsumer$1(KafkaConsumerActor.scala:303)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$poll$12(KafkaConsumerActor.scala:423)
  at flatMap @ fs2.kafka.internal.KafkaConsumerActor.<init>(KafkaConsumerActor.scala:420)
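
For contrast, the nested shape that works for us looks roughly like this (names here are illustrative, and the derivation assumes the vulcan-generic module):

import vulcan.Codec
import vulcan.generic._

// Wrapping the bytes in a record means the Array[Byte] field sits inside a
// record schema rather than being the top-level schema, and round-tripping
// through the registry serdes then behaves as expected.
final case class BytesKey(payload: Array[Byte])

object BytesKey {
  implicit val bytesKeyCodec: Codec[BytesKey] = Codec.derive[BytesKey]
}

A key serializer built as avroSerializer[BytesKey].forKey(as) works fine; the error only appears when Array[Byte] itself is the key or value type, as in the failing test below.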

I have reproduced this in a simple test scenario running the Confluent 7.5.3 Kafka stack via Testcontainers, with:

Scala version: 2.12.18
fs2-kafka version: 3.5.1

// Imports assumed for both tests (reconstructed; the suite is an AnyFlatSpec
// with Matchers, and schemaRegistryUrl / bootstrapServers come from the
// Testcontainers setup):
import java.util.UUID
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.kafka._
import fs2.kafka.vulcan._

// Fails
"SerDes" should "Serialize and deserialize ByteArray keys" in {

  val as: AvroSettings[IO] = AvroSettings {
    SchemaRegistryClientSettings[IO](schemaRegistryUrl)
  }

  val producerSettings =
    ProducerSettings[IO, Array[Byte], String](
      avroSerializer[Array[Byte]].forKey(as),
      avroSerializer[String].forValue(as)
    ).withBootstrapServers(bootstrapServers)

  val consumerSettings =
    ConsumerSettings[IO, Array[Byte], String](
      avroDeserializer[Array[Byte]].forKey(as),
      avroDeserializer[String].forValue(as)
    ).withBootstrapServers(bootstrapServers)
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withGroupId(UUID.randomUUID().toString)

  val topic    = "inputTopic1"
  val keyBytes = "123".getBytes
  val value    = "Test"

  val test = for {
    _ <- KafkaProducer
           .resource(producerSettings)
           .use(_.produce(ProducerRecords.one(ProducerRecord(topic, keyBytes, value))))
    _ <- IO.sleep(1.second)
    result <- KafkaConsumer
                .resource(consumerSettings)
                .use { consumer =>
                  consumer.subscribeTo(topic).flatMap { _ =>
                    consumer.records.take(1).compile.toList.map(_.head)
                  }
                }
  } yield (result.record.key, result.record.value)

  val (resultKey, resultValue) = test.unsafeRunSync()
  // Assert on the array at the top level so ScalaTest uses structural array
  // equality; inside a tuple, Array[Byte] would compare by reference.
  resultKey shouldBe keyBytes
  resultValue shouldBe value
}

// Succeeds
it should "Serialize and deserialize String keys and values" in {

  val as: AvroSettings[IO] = AvroSettings {
    SchemaRegistryClientSettings[IO](schemaRegistryUrl)
  }

  val producerSettings =
    ProducerSettings[IO, String, String](
      avroSerializer[String].forKey(as),
      avroSerializer[String].forValue(as)
    ).withBootstrapServers(bootstrapServers)

  val consumerSettings =
    ConsumerSettings[IO, String, String](
      avroDeserializer[String].forKey(as),
      avroDeserializer[String].forValue(as)
    ).withBootstrapServers(bootstrapServers)
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withGroupId(UUID.randomUUID().toString)

  val topic = "inputTopic3"
  val key   = "123"
  val value = "Test"

  val test = for {
    _ <- KafkaProducer
           .resource(producerSettings)
           .use(_.produce(ProducerRecords.one(ProducerRecord(topic, key, value))))
    _ <- IO.sleep(1.second)
    result <- KafkaConsumer
                .resource(consumerSettings)
                .use { consumer =>
                  consumer.subscribeTo(topic).flatMap { _ =>
                    consumer.records.take(1).compile.toList.map(_.head)
                  }
                }
  } yield (result.record.key, result.record.value)

  test.unsafeRunSync() shouldBe ((key, value))
}
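
I haven't dug into the internals, but my reading of the error (a guess, not a confirmed diagnosis) is that vulcan's built-in Codec[Array[Byte]] uses Avro's primitive "bytes" schema and expects to decode from a java.nio.ByteBuffer, while the registry deserializer appears to hand back a raw byte[] when that schema sits at the top level rather than inside a record. A minimal sketch of what I mean by the top-level schema:

import vulcan.Codec

// vulcan's built-in codec for Array[Byte]; its schema is the Avro primitive
// "bytes", so using it directly as a key or value registers that schema on
// its own rather than nested in a record.
val bytesSchema = Codec[Array[Byte]].schema
// Right(...) containing the Avro Schema for "bytes"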
