[BUG] Exception thrown by iceberg when using table topic when event schema changed #3006

@dumbNickname

Description

AutoMQ Version

v1.5.5

Operating System

Docker on MacOS

Installation Method

docker

Hardware Configuration

No response

Other Relevant Software

https://raw.githubusercontent.com/AutoMQ/automq/refs/tags/1.5.5/docker/table_topic/docker-compose.yml

with AKHQ added

What Went Wrong?

While experimenting with producing events that seemed to follow compatible schema evolution (at least according to my understanding), the event is accepted by AutoMQ and Schema Registry, but an error is thrown when Iceberg tries to process it. Afterwards no further events get processed, and the error keeps being thrown in the background, apparently triggered from an EventLoop:

automq           | java.lang.ArrayIndexOutOfBoundsException: Index 9 out of bounds for length 9
automq           |      at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:114)
automq           |      at org.apache.iceberg.data.parquet.GenericParquetWriter$RecordWriter.get(GenericParquetWriter.java:48)
automq           |      at org.apache.iceberg.data.parquet.GenericParquetWriter$RecordWriter.get(GenericParquetWriter.java:41)
automq           |      at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:580)
automq           |      at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
automq           |      at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
automq           |      at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:392)
automq           |      at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:375)
automq           |      at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:307)
automq           |      at org.apache.iceberg.io.PartitionedFanoutWriter.write(PartitionedFanoutWriter.java:63)
automq           |      at kafka.automq.table.worker.IcebergWriter.write0(IcebergWriter.java:149)
automq           |      at kafka.automq.table.worker.IcebergWriter.write(IcebergWriter.java:96)
automq           |      at kafka.automq.table.worker.PartitionWriteTask.handleReadResult(PartitionWriteTask.java:121)
automq           |      at kafka.automq.table.worker.PartitionWriteTask.lambda$run0$0(PartitionWriteTask.java:70)
automq           |      at kafka.automq.table.worker.EventLoops$EventLoopRef.lambda$execute$0(EventLoops.java:142)
automq           |      at kafka.automq.table.worker.EventLoops$EventLoopRef.lambda$execute$1(EventLoops.java:154)
automq           |      at com.automq.stream.utils.threads.EventLoop.run(EventLoop.java:69)
automq           | [2025-11-13 22:05:10,462] ERROR [HANDLE_READ_RESULT_FAIL],dedw000000000128_events_page_view-0,2-3 (kafka.automq.table.worker.PartitionWriteTask)
automq           | java.io.IOException: java.lang.ArrayIndexOutOfBoundsException: Index 9 out of bounds for length 9
automq           |      at kafka.automq.table.worker.IcebergWriter.write(IcebergWriter.java:115)
automq           |      at kafka.automq.table.worker.PartitionWriteTask.handleReadResult(PartitionWriteTask.java:121)
automq           |      at kafka.automq.table.worker.PartitionWriteTask.lambda$run0$0(PartitionWriteTask.java:70)
automq           |      at kafka.automq.table.worker.EventLoops$EventLoopRef.lambda$execute$0(EventLoops.java:142)
automq           |      at kafka.automq.table.worker.EventLoops$EventLoopRef.lambda$execute$1(EventLoops.java:154)
automq           |      at com.automq.stream.utils.threads.EventLoop.run(EventLoop.java:69)
automq           | Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 9 out of bounds for length 9
automq           |      at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:114)
automq           |      at org.apache.iceberg.data.parquet.GenericParquetWriter$RecordWriter.get(GenericParquetWriter.java:48)
automq           |      at org.apache.iceberg.data.parquet.GenericParquetWriter$RecordWriter.get(GenericParquetWriter.java:41)
automq           |      at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:580)
automq           |      at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
automq           |      at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
automq           |      at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:392)
automq           |      at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:375)
automq           |      at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:307)
automq           |      at org.apache.iceberg.io.PartitionedFanoutWriter.write(PartitionedFanoutWriter.java:63)
automq           |      at kafka.automq.table.worker.IcebergWriter.write0(IcebergWriter.java:149)
automq           |      at kafka.automq.table.worker.IcebergWriter.write(IcebergWriter.java:96)
automq           |      ... 5 more

What Should Have Happened Instead?

To be honest I am not sure. I am new to Iceberg and to using schemas with message queues; I was just exploring the toy example to understand what the setup requires. My expectation would be that the event is either rejected early, or, if it has already been accepted into the topic (after validation in Schema Registry), that it is processed properly - or at the very least that it does not cause AutoMQ (more precisely Iceberg) to stop processing subsequent events.

Steps to Reproduce

  1. Run the "toy" example from https://raw.githubusercontent.com/AutoMQ/automq/refs/tags/1.5.5/docker/table_topic/docker-compose.yml
  2. Open the example notebook
  3. In my case I simplified the topic config - probably not relevant, but sharing for completeness:
topic_config = {
    'automq.table.topic.enable': 'true',
    'automq.table.topic.commit.interval.ms': '10000', # 10s
    'automq.table.topic.schema.type': 'schema',
    'automq.table.topic.upsert.enable': 'false', 
    'automq.table.topic.partition.by': '[truncate(event.type, 100), hour(collect_timestamp)]'
}
create_topic(TOPIC_NAME, config=topic_config)
  4. Produce any base event - I named it "original"
original_events = json.loads(json.dumps(json_events))
original_events[0]["event"]["id"] = "original"

# I slightly simplified the produce method: I do not need a key serializer, so I moved the Avro serializer inside it (a hedged sketch of this helper follows the steps below). I hope the example is still easy to follow.
produce_events(
    producer=producer,
    topic_name=TOPIC_NAME,
    events_data=original_events,
    schema_str=schema_str
)
  5. Insert a compatible event with one new field and a matching schema (no problems here)
compatible_event = json.loads(json.dumps(json_events))[0]
compatible_event["event"]["id"] = "compatible_event_new_field_new_schema"
compatible_event["this_field_is_new"] = "new kid on the block"

next_schema_1 = json.loads(json.dumps(avro_schema))
next_schema_1['fields'].extend([
    {
        "name": "this_field_is_new", 
        "type": ["null", "string"], # optional
        "default": None, 
        "doc": "Testing things"
    }
])


produce_events(
    producer=producer,
    topic_name=TOPIC_NAME,
    events_data=[compatible_event],
    schema_str=json.dumps(next_schema_1, indent=2)
)
  6. Add an event with another new field - but this time without the field that was added in the previous step
compatible_event = json.loads(json.dumps(json_events))[0]
compatible_event["event"]["id"] = "maybe_compatible_event_another_new_field_without_first_added_field"
compatible_event["another_field_is_new"] = "another kid on the block"

next_schema_2 = json.loads(json.dumps(avro_schema))
next_schema_2['fields'].extend([
    {
        "name": "another_field_is_new", 
        "type": "string", # obligatory
        "default": None, 
        "doc": "Testing things"
    }
])


produce_events(
    producer=producer,
    topic_name=TOPIC_NAME,
    events_data=[compatible_event],
    schema_str=json.dumps(next_schema_2, indent=2)
)
  7. The event is properly added to the topic and the new schema is registered, so on the message-queue side everything looks fine (see the compatibility-check sketch below for how this can be verified against the registry). Unfortunately the exception shown above starts appearing in the logs, and events produced afterwards are ignored - by "ignored" I mean that I no longer see them when reading the Iceberg data via DuckDB (a sketch of that query is also included below).
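
For reference, a minimal sketch of the simplified produce_events helper mentioned in step 4 - this is not the exact code from my notebook, and the Schema Registry URL is an assumption (take it from the compose file):

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

def produce_events(producer, topic_name, events_data, schema_str):
    # Schema Registry URL is an assumption - use the one exposed by the compose file
    sr_client = SchemaRegistryClient({"url": "http://localhost:8081"})
    value_serializer = AvroSerializer(sr_client, schema_str)
    for event in events_data:
        producer.produce(
            topic=topic_name,
            value=value_serializer(event, SerializationContext(topic_name, MessageField.VALUE)),
        )
    producer.flush()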
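
The registry's view of the second schema can be double-checked like this (a sketch, not from the notebook; the registry URL and the <topic>-value subject name from the default TopicNameStrategy are assumptions, and json / TOPIC_NAME / next_schema_2 are the variables from the steps above):

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

sr_client = SchemaRegistryClient({"url": "http://localhost:8081"})  # assumed URL
subject = f"{TOPIC_NAME}-value"  # assumes the default TopicNameStrategy
candidate = Schema(json.dumps(next_schema_2), schema_type="AVRO")
# True means the registry considers next_schema_2 compatible with the latest registered version
print(sr_client.test_compatibility(subject, candidate))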
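
To be concrete about what I mean by reading the data via DuckDB, a sketch of the kind of query I used - the endpoint, credentials and table path are placeholders/assumptions (the real values come from the compose file and the Iceberg catalog), and depending on the table layout iceberg_scan may need to point at a specific metadata JSON file:

import duckdb

con = duckdb.connect()
for stmt in ("INSTALL iceberg", "LOAD iceberg", "INSTALL httpfs", "LOAD httpfs"):
    con.sql(stmt)
# MinIO endpoint and credentials are assumptions - take them from the compose file
con.sql("""
    CREATE SECRET minio (
        TYPE S3,
        KEY_ID 'minioadmin',
        SECRET 'minioadmin',
        ENDPOINT 'localhost:9000',
        URL_STYLE 'path',
        USE_SSL false
    )
""")
table_path = "s3://warehouse/<namespace>/<table>"  # placeholder - use the real table location
con.sql(f"SELECT * FROM iceberg_scan('{table_path}')").show()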

Additional Information

I am not sure whether I provided all the needed information - please let me know if something is missing. It would be great to hear whether this is a bug and to get some hints on what the expected behavior should be. Then I can try to look at the code myself and, if I manage to find the cause, help implement a fix.

Thanks in advance for your help!

Metadata

Labels

bug (Something isn't working)
