Skip to content

Commit 0bcb6ab

Browse files
authored
feat(source): Avro with AWS Glue Schema Registry (#17605) (#17677)
1 parent fa06faa commit 0bcb6ab

File tree

13 files changed

+637
-132
lines changed

13 files changed

+637
-132
lines changed

Cargo.lock

+23
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ aws-config = { version = "1", default-features = false, features = [
9393
aws-credential-types = { version = "1", default-features = false, features = [
9494
"hardcoded-credentials",
9595
] }
96+
aws-sdk-glue = "1"
9697
aws-sdk-kinesis = { version = "1", default-features = false, features = [
9798
"rt-tokio",
9899
"rustls",
+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
control substitution on
2+
3+
system ok
4+
rpk topic delete 'glue-sample-my-event'
5+
6+
system ok
7+
rpk topic create 'glue-sample-my-event'
8+
9+
system ok
10+
rpk topic produce -f '%v{hex}\\n' 'glue-sample-my-event' <<EOF
11+
03005af405ef11b5444281a2e0563e5a734606666f6f80868dc8ebd98404
12+
EOF
13+
14+
statement ok
15+
create source t with (
16+
connector = 'kafka',
17+
properties.bootstrap.server='${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}',
18+
topic = 'glue-sample-my-event')
19+
format plain encode avro (
20+
aws.glue.schema_arn = 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent',
21+
aws.glue.mock_config = '{
22+
"by_id":{
23+
"5af405ef-11b5-4442-81a2-e0563e5a7346": {
24+
"type": "record",
25+
"name": "MyEvent",
26+
"fields": [
27+
{
28+
"name": "f1",
29+
"type": "string"
30+
},
31+
{
32+
"name": "f2",
33+
"type": {
34+
"type": "long",
35+
"logicalType": "timestamp-micros"
36+
}
37+
}
38+
]
39+
}
40+
},
41+
"arn_to_latest_id":{
42+
"arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent": "5af405ef-11b5-4442-81a2-e0563e5a7346"
43+
}
44+
}');
45+
46+
query TT
47+
select * from t;
48+
----
49+
foo 2006-01-02 22:04:05.123456+00:00
50+
51+
statement ok
52+
alter source t format plain encode avro (
53+
aws.glue.schema_arn = 'arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent',
54+
aws.glue.mock_config = '{
55+
"by_id":{
56+
"5af405ef-11b5-4442-81a2-e0563e5a7346": {
57+
"type": "record",
58+
"name": "MyEvent",
59+
"fields": [
60+
{
61+
"name": "f1",
62+
"type": "string"
63+
},
64+
{
65+
"name": "f2",
66+
"type": {
67+
"type": "long",
68+
"logicalType": "timestamp-micros"
69+
}
70+
}
71+
]
72+
},
73+
"4516411b-b1e7-4e67-839f-3ef1b8c29280": {
74+
"type": "record",
75+
"name": "MyEvent",
76+
"fields": [
77+
{
78+
"name": "f1",
79+
"type": "string"
80+
},
81+
{
82+
"name": "f2",
83+
"type": {
84+
"type": "long",
85+
"logicalType": "timestamp-micros"
86+
}
87+
},
88+
{
89+
"name": "f3",
90+
"type": ["null", "bytes"],
91+
"default": null
92+
}
93+
]
94+
}
95+
},
96+
"arn_to_latest_id":{
97+
"arn:aws:glue:ap-southeast-1:123456123456:schema/default-registry/MyEvent": "4516411b-b1e7-4e67-839f-3ef1b8c29280"
98+
}
99+
}');
100+
101+
query TTT
102+
select * from t;
103+
----
104+
foo 2006-01-02 22:04:05.123456+00:00 NULL
105+
106+
system ok
107+
rpk topic produce -f '%v{hex}\\n' 'glue-sample-my-event' <<EOF
108+
03004516411bb1e74e67839f3ef1b8c292800441428089b5e9a886ee050208deadbeef
109+
EOF
110+
111+
query TTT
112+
select * from t order by 2;
113+
----
114+
foo 2006-01-02 22:04:05.123456+00:00 NULL
115+
AB 2022-04-08 00:00:00.123456+00:00 \xdeadbeef
116+
117+
statement ok
118+
drop source t;
119+
120+
system ok
121+
rpk topic delete 'glue-sample-my-event'

proto/catalog.proto

+5
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ message StreamSourceInfo {
5252
// deprecated
5353
plan_common.RowFormatType row_format = 1;
5454
string row_schema_location = 2;
55+
// This means *use **confluent** schema registry* and is `false` for **aws glue** schema registry.
56+
// Eventually we will deprecate it and rely on `enum SchemaLocation` derived from `format_encode_options` below.
57+
// * schema.location false
58+
// * schema.registry true
59+
// * aws.glue.schema_arn false
5560
bool use_schema_registry = 3;
5661
string proto_message_name = 4;
5762
int32 csv_delimiter = 5;

src/connector/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ aws-config = { workspace = true }
3535
aws-credential-types = { workspace = true }
3636
aws-msk-iam-sasl-signer = "1.0.0"
3737
aws-sdk-dynamodb = "1"
38+
aws-sdk-glue = { workspace = true }
3839
aws-sdk-kinesis = { workspace = true }
3940
aws-sdk-s3 = { workspace = true }
4041
aws-smithy-http = { workspace = true }

0 commit comments

Comments
 (0)