-
Notifications
You must be signed in to change notification settings - Fork 5
feat: add support for Avro message production with new schema versions (V3 and V4) #100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
❯ just -f append-scenario/justfile create-topic
docker exec automq /opt/automq/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic orders --partitions 16 --config automq.table.topic.enable=true --config automq.table.topic.commit.interval.ms=1000 --config automq.table.topic.convert.value.type=by_schema_id --config automq.table.topic.transform.value.type=flatten --config automq.table.topic.namespace=default || true
Created topic orders.❯ just -f append-scenario/justfile send-auto
Producing 10 Avro messages (auto-register) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 bash append-scenario/scripts/produce-avro-auto.sh orders 10
[append] Producing 10 Avro messages to topic orders (auto-register)❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
Create Table
------------------------------------------------------------------------------------------------------------
CREATE TABLE iceberg.default.orders (
order_id bigint NOT NULL,
product_name varchar NOT NULL,
order_description varchar NOT NULL,
_kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
_kafka_key varchar COMMENT 'Kafka record key',
_kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata'
)
WITH (
format = 'PARQUET',
format_version = 2,
location = 's3://warehouse/default/orders',
object_store_layout_enabled = true
)
(1 row)❯ just -f append-scenario/justfile send-auto-v2
Producing 10 Avro messages (OrderV2 aggregated evolution) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 SCHEMA_FILE=append-scenario/schemas/OrderV2.avsc bash append-scenario/scripts/produce-avro-auto.sh orders 10
[append] Producing 10 Avro messages to topic orders (auto-register)❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
Create Table
-------------------------------------------------------------------------------------------------------------
CREATE TABLE iceberg.default.orders (
order_id bigint NOT NULL,
product_name varchar NOT NULL,
order_description varchar,
_kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
_kafka_key varchar COMMENT 'Kafka record key',
_kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata',
f_v2 varchar
)
WITH (
format = 'PARQUET',
format_version = 2,
location = 's3://warehouse/default/orders',
object_store_layout_enabled = true
)
(1 row)❯ just -f append-scenario/justfile send-auto-v3
Producing 10 Avro messages (OrderV3 aggregated evolution) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 SCHEMA_FILE=append-scenario/schemas/OrderV3.avsc bash append-scenario/scripts/produce-avro-auto.sh orders 10
[append] Producing 10 Avro messages to topic orders (auto-register)❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
Create Table
-------------------------------------------------------------------------------------------------------------
CREATE TABLE iceberg.default.orders (
order_id bigint NOT NULL,
product_name varchar NOT NULL,
order_description varchar,
_kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
_kafka_key varchar COMMENT 'Kafka record key',
_kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata',
f_v2 varchar,
f_v3 varchar
)
WITH (
format = 'PARQUET',
format_version = 2,
location = 's3://warehouse/default/orders',
object_store_layout_enabled = true
)
(1 row)❯ just -f append-scenario/justfile send-auto-v4
Producing 10 Avro messages (OrderV4 aggregated evolution) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 SCHEMA_FILE=append-scenario/schemas/OrderV4.avsc bash append-scenario/scripts/produce-avro-auto.sh orders 10
[append] Producing 10 Avro messages to topic orders (auto-register)❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
Create Table
-------------------------------------------------------------------------------------------------------------
CREATE TABLE iceberg.default.orders (
order_id bigint NOT NULL,
product_name varchar NOT NULL,
order_description varchar,
_kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
_kafka_key varchar COMMENT 'Kafka record key',
_kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata',
f_v2 varchar,
f_v3 varchar,
f_v4 varchar
)
WITH (
format = 'PARQUET',
format_version = 2,
location = 's3://warehouse/default/orders',
object_store_layout_enabled = true
)
(1 row)❯ just trino-sql "SELECT order_id, product_name, order_description, f_v2, f_v3, f_v4 FROM iceberg.default.\"orders\" ORDER BY _kafka_metadata.timestamp"
Executing SQL (Trino): SELECT order_id, product_name, order_description, f_v2, f_v3, f_v4 FROM iceberg.default."orders" ORDER BY _kafka_metadata.timestamp
docker compose -f docker-compose.yml exec trino trino --execute 'SELECT order_id, product_name, order_description, f_v2, f_v3, f_v4 FROM iceberg.default."orders" ORDER BY _kafka_metadata.timestamp' --output-format ALIGNED --catalog iceberg --schema default
order_id | product_name | order_description | f_v2 | f_v3 | f_v4
----------+---------------+-------------------+---------------+---------------+---------------
1 | str_1_xaji0y | str_1_6dpbhs | NULL | NULL | NULL
2 | str_2_ahxthv | str_2_3a3zmf | NULL | NULL | NULL
4 | str_4_3w5uzb | str_4_ikcidk | NULL | NULL | NULL
3 | str_3_8mdd4v | str_3_30t9nt | NULL | NULL | NULL
5 | str_5_wnnhj7 | str_5_xvg0fn | NULL | NULL | NULL
8 | str_8_oh9sdb | str_8_dw2pcn | NULL | NULL | NULL
6 | str_6_9xuy41 | str_6_ibljh7 | NULL | NULL | NULL
9 | str_9_9t84az | str_9_ytjxep | NULL | NULL | NULL
7 | str_7_5lxo6q | str_7_jiujv6 | NULL | NULL | NULL
10 | str_10_q85jsg | str_10_65kxvf | NULL | NULL | NULL
1 | str_1_xaji0y | str_1_6dpbhs | str_1_ahxthv | NULL | NULL
2 | str_2_3a3zmf | str_2_8mdd4v | str_2_30t9nt | NULL | NULL
4 | str_4_xvg0fn | str_4_9xuy41 | str_4_ibljh7 | NULL | NULL
3 | str_3_3w5uzb | str_3_ikcidk | str_3_wnnhj7 | NULL | NULL
5 | str_5_5lxo6q | str_5_jiujv6 | str_5_oh9sdb | NULL | NULL
6 | str_6_dw2pcn | str_6_9t84az | str_6_ytjxep | NULL | NULL
7 | str_7_q85jsg | str_7_65kxvf | str_7_1t2tal | NULL | NULL
8 | str_8_a753lc | str_8_58drc1 | str_8_1ertj5 | NULL | NULL
9 | str_9_pht0hl | str_9_9xpsei | str_9_mvihcw | NULL | NULL
10 | str_10_i64ciy | str_10_he7ur2 | str_10_3gdppq | NULL | NULL
1 | str_1_xaji0y | str_1_6dpbhs | NULL | str_1_ahxthv | NULL
4 | str_4_xvg0fn | str_4_9xuy41 | NULL | str_4_ibljh7 | NULL
2 | str_2_3a3zmf | str_2_8mdd4v | NULL | str_2_30t9nt | NULL
5 | str_5_5lxo6q | str_5_jiujv6 | NULL | str_5_oh9sdb | NULL
3 | str_3_3w5uzb | str_3_ikcidk | NULL | str_3_wnnhj7 | NULL
8 | str_8_a753lc | str_8_58drc1 | NULL | str_8_1ertj5 | NULL
6 | str_6_dw2pcn | str_6_9t84az | NULL | str_6_ytjxep | NULL
9 | str_9_pht0hl | str_9_9xpsei | NULL | str_9_mvihcw | NULL
7 | str_7_q85jsg | str_7_65kxvf | NULL | str_7_1t2tal | NULL
10 | str_10_i64ciy | str_10_he7ur2 | NULL | str_10_3gdppq | NULL
1 | str_1_xaji0y | str_1_6dpbhs | NULL | str_1_ahxthv | str_1_3a3zmf
2 | str_2_8mdd4v | str_2_30t9nt | NULL | str_2_3w5uzb | str_2_ikcidk
3 | str_3_wnnhj7 | str_3_xvg0fn | NULL | str_3_9xuy41 | str_3_ibljh7
4 | str_4_5lxo6q | str_4_jiujv6 | NULL | str_4_oh9sdb | str_4_dw2pcn
8 | str_8_he7ur2 | str_8_3gdppq | NULL | str_8_0y9dom | str_8_5igqpk
5 | str_5_9t84az | str_5_ytjxep | NULL | str_5_q85jsg | str_5_65kxvf
6 | str_6_1t2tal | str_6_a753lc | NULL | str_6_58drc1 | str_6_1ertj5
7 | str_7_pht0hl | str_7_9xpsei | NULL | str_7_mvihcw | str_7_i64ciy
9 | str_9_i7p5tb | str_9_94874f | NULL | str_9_rhocn9 | str_9_j2qp89
10 | str_10_uzfk8u | str_10_t0cvs4 | NULL | str_10_f8cgvy | str_10_ie6ivw
(40 rows) |
| {"name": "order_description", "type": ["null", "string"], "default": null}, | ||
| {"name": "price", "type": "double", "default": 0.0}, | ||
| {"name": "quantity", "type": "long", "default": 0} | ||
| {"name": "f_v2", "type": ["null", "string"], "default": null} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice idea with this field versioning — it is easy to follow and see what is new, and if more changes are needed it will still be possible to follow them without making diffs on those files.
| {"name": "order_id", "type": "long"}, | ||
| {"name": "product_name", "type": "string"}, | ||
| {"name": "order_description", "type": ["null", "string"], "default": null}, | ||
| {"name": "f_v3", "type": ["null", "string"], "default": null} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my example I used an obligatory (required) field. I think I first tried to specify it as optional and later redefine it as obligatory, but that would indeed be an incompatible schema evolution. (Not sure if that matters, but just mentioning it.)
| {"name": "order_id", "type": "long"}, | ||
| {"name": "product_name", "type": "string"}, | ||
| {"name": "order_description", "type": ["null", "string"], "default": null}, | ||
| {"name": "f_v2", "type": ["null", "string"], "default": null}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the schemas it looks correct — so my case matches what I see here in v2, v3, and v5.
|
As I wrote above, the use case I have been thinking about and testing is the one you named v5, so the return data from the SQL statement would be |
|
@dumbNickname This PR is just for demonstration purposes and isn't intended to be merged. In the comments above, I actually tested v5 as well, but the query output was getting too long, so to save some effort, I only copied the results up to the v4 run. Everything passed in my tests. |
❯ just -f append-scenario/justfile create-topic
docker exec automq /opt/automq/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic orders --partitions 16 --config automq.table.topic.enable=true --config automq.table.topic.commit.interval.ms=1000 --config automq.table.topic.convert.value.type=by_schema_id --config automq.table.topic.transform.value.type=flatten --config automq.table.topic.namespace=default || true
Created topic orders.
❯ just -f append-scenario/justfile send-auto
Producing 2 Avro messages (auto-register) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 bash append-scenario/scripts/produce-avro-auto.sh orders 2
[append] Producing 2 Avro messages to topic orders (auto-register)
❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
WARN[0000] The "KAFKA_HEAP_OPTS" variable is not set. Defaulting to a blank string.
Create Table
------------------------------------------------------------------------------------------------------------
CREATE TABLE iceberg.default.orders (
order_id bigint NOT NULL,
product_name varchar NOT NULL,
order_description varchar NOT NULL,
_kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
_kafka_key varchar COMMENT 'Kafka record key',
_kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata'
)
WITH (
format = 'PARQUET',
format_version = 2,
location = 's3://warehouse/default/orders',
object_store_layout_enabled = true
)
(1 row)
❯ just -f append-scenario/justfile send-auto-v2
Producing 2 Avro messages (OrderV2 aggregated evolution) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 SCHEMA_FILE=append-scenario/schemas/OrderV2.avsc bash append-scenario/scripts/produce-avro-auto.sh orders 2
[append] Producing 2 Avro messages to topic orders (auto-register)
❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
WARN[0000] The "KAFKA_HEAP_OPTS" variable is not set. Defaulting to a blank string.
Create Table
-------------------------------------------------------------------------------------------------------------
CREATE TABLE iceberg.default.orders (
order_id bigint NOT NULL,
product_name varchar NOT NULL,
order_description varchar,
_kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
_kafka_key varchar COMMENT 'Kafka record key',
_kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata',
f_v2 varchar
)
WITH (
format = 'PARQUET',
format_version = 2,
location = 's3://warehouse/default/orders',
object_store_layout_enabled = true
)
(1 row)
❯ just -f append-scenario/justfile send-auto-v3
Producing 2 Avro messages (OrderV3 aggregated evolution) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 SCHEMA_FILE=append-scenario/schemas/OrderV3.avsc bash append-scenario/scripts/produce-avro-auto.sh orders 2
[append] Producing 2 Avro messages to topic orders (auto-register)
❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
WARN[0000] The "KAFKA_HEAP_OPTS" variable is not set. Defaulting to a blank string.
Create Table
-------------------------------------------------------------------------------------------------------------
CREATE TABLE iceberg.default.orders (
order_id bigint NOT NULL,
product_name varchar NOT NULL,
order_description varchar,
_kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
_kafka_key varchar COMMENT 'Kafka record key',
_kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata',
f_v2 varchar,
f_v3 varchar
)
WITH (
format = 'PARQUET',
format_version = 2,
location = 's3://warehouse/default/orders',
object_store_layout_enabled = true
)
(1 row)
❯ just -f append-scenario/justfile send-auto-v4
error: Justfile does not contain recipe `send-auto-v4`
Did you mean `send-auto-v2`?
❯ just -f append-scenario/justfile send-auto-v4
Producing 2 Avro messages (OrderV4 aggregated evolution) to orders
SCHEMA_REGISTRY_URL=http://schema-registry:8081 SCHEMA_FILE=append-scenario/schemas/OrderV4.avsc bash append-scenario/scripts/produce-avro-auto.sh orders 2
[append] Producing 2 Avro messages to topic orders (auto-register)
❯ just -f append-scenario/justfile show-ddl
Executing SQL (Trino): SHOW CREATE TABLE iceberg.default."orders"
docker compose -f docker-compose.yml exec trino trino --execute 'SHOW CREATE TABLE iceberg.default."orders"' --output-format ALIGNED --catalog iceberg --schema default
WARN[0000] The "KAFKA_HEAP_OPTS" variable is not set. Defaulting to a blank string.
Create Table
-------------------------------------------------------------------------------------------------------------
CREATE TABLE iceberg.default.orders (
order_id bigint NOT NULL,
product_name varchar NOT NULL,
order_description varchar,
_kafka_header map(varchar, varbinary) COMMENT 'Kafka record headers',
_kafka_key varchar COMMENT 'Kafka record key',
_kafka_metadata ROW(partition integer, offset bigint, timestamp bigint) COMMENT 'Kafka record metadata',
f_v2 varchar,
f_v3 varchar
)
WITH (
format = 'PARQUET',
format_version = 2,
location = 's3://warehouse/default/orders',
object_store_layout_enabled = true
)
(1 row) |
No description provided.