Skip to content

Commit f6c922b

Browse files
github-actions[bot]tabVersiontablmatz
authored
fix: handle missing 'connector' field in WITH clause with CONNECTION (#21691) (#21692)
Co-authored-by: Bohan Zhang <[email protected]> Co-authored-by: tab <[email protected]> Co-authored-by: lmatz <[email protected]>
1 parent a0d3248 commit f6c922b

File tree

3 files changed

+20
-2
lines changed

3 files changed

+20
-2
lines changed

e2e_test/source_inline/connection/ddl.slt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ insert into data_table values (1, 'a'), (2, 'b'), (3, 'c');
7070
statement ok
7171
flush;
7272

73+
statement error missing field 'connector' in WITH clause
74+
create sink sink_kafka from data_table with (
75+
connection = conn,
76+
topic = 'connection_ddl_1'
77+
) format plain encode json (
78+
force_append_only='true'
79+
);
80+
7381
statement ok
7482
create sink sink_kafka from data_table with (
7583
connector = 'kafka',

src/frontend/src/handler/create_sink.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,12 @@ pub async fn gen_sink_plan(
123123

124124
// if not using connection, we don't need to check connector match connection type
125125
if !matches!(connection_type, PbConnectionType::Unspecified) {
126-
let connector = resolved_with_options.get_connector().unwrap();
126+
let Some(connector) = resolved_with_options.get_connector() else {
127+
return Err(RwError::from(ErrorCode::ProtocolError(format!(
128+
"missing field '{}' in WITH clause",
129+
CONNECTOR_TYPE_KEY
130+
))));
131+
};
127132
check_connector_match_connection_type(connector.as_str(), &connection_type)?;
128133
}
129134

src/frontend/src/handler/create_source.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,12 @@ pub async fn bind_create_source_or_table_with_connector(
718718

719719
// if not using connection, we don't need to check connector match connection type
720720
if !matches!(connection_type, PbConnectionType::Unspecified) {
721-
let connector = with_properties.get_connector().unwrap();
721+
let Some(connector) = with_properties.get_connector() else {
722+
return Err(RwError::from(ProtocolError(format!(
723+
"missing field '{}' in WITH clause",
724+
UPSTREAM_SOURCE_KEY
725+
))));
726+
};
722727
check_connector_match_connection_type(connector.as_str(), &connection_type)?;
723728
}
724729

0 commit comments

Comments
 (0)