Skip to content

Commit 331e87a

Browse files
committed
always fix raw definition
Signed-off-by: Bugen Zhao <[email protected]>
1 parent fa85b46 commit 331e87a

File tree

2 files changed

+25
-6
lines changed

2 files changed

+25
-6
lines changed

src/frontend/src/catalog/table_catalog.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,8 @@ impl TableVersion {
279279
impl TableCatalog {
280280
/// Returns the SQL definition when the table was created, purified with best effort
281281
/// if it's a table.
282+
///
283+
/// See [`Self::create_sql_ast_purified`] for more details.
282284
pub fn create_sql_purified(&self) -> String {
283285
self.create_sql_ast_purified()
284286
.map(|stmt| stmt.to_string())
@@ -297,7 +299,7 @@ impl TableCatalog {
297299
let name = ast::ObjectName(vec![self.name.as_str().into()]);
298300
ast::Statement::default_create_table(name)
299301
} else {
300-
self.create_sql_ast()?
302+
self.create_sql_ast_raw()?
301303
};
302304

303305
match try_purify_table_source_create_sql_ast(
@@ -316,7 +318,7 @@ impl TableCatalog {
316318
}
317319
}
318320

319-
self.create_sql_ast()
321+
self.create_sql_ast_raw()
320322
}
321323
}
322324

@@ -481,14 +483,33 @@ impl TableCatalog {
481483
}
482484

483485
/// Returns the SQL definition when the table was created.
486+
///
487+
/// See [`Self::create_sql_ast`] for more details.
484488
pub fn create_sql(&self) -> String {
485-
self.definition.clone()
489+
self.create_sql_ast()
490+
.map(|stmt| stmt.to_string())
491+
.unwrap_or_else(|_| self.definition.clone())
486492
}
487493

488494
/// Returns the parsed SQL definition when the table was created.
489495
///
496+
/// Re-create the table with this statement may have different schema if the schema is derived
497+
/// from external systems (like schema registry) or it's created by `CREATE TABLE AS`. If this
498+
/// is not desired, use [`Self::create_sql_ast_purified`] instead.
499+
///
490500
/// Returns error if it's invalid.
491501
pub fn create_sql_ast(&self) -> Result<ast::Statement> {
502+
if let TableType::Table = self.table_type()
503+
&& self.definition.is_empty()
504+
{
505+
// Fix `CREATE TABLE AS`.
506+
self.create_sql_ast_purified()
507+
} else {
508+
self.create_sql_ast_raw()
509+
}
510+
}
511+
512+
fn create_sql_ast_raw(&self) -> Result<ast::Statement> {
492513
Ok(Parser::parse_sql(&self.definition)
493514
.context("unable to parse definition sql")?
494515
.into_iter()

src/frontend/src/handler/create_sink.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -503,9 +503,7 @@ pub(crate) async fn reparse_table_for_sink(
503503
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>)> {
504504
// Retrieve the original table definition and parse it to AST.
505505
let definition = table_catalog.create_sql_ast_purified()?;
506-
let raw_definition = table_catalog
507-
.create_sql_ast()
508-
.unwrap_or_else(|_| definition.clone() /* create table as */);
506+
let raw_definition = table_catalog.create_sql_ast()?;
509507

510508
let Statement::CreateTable {
511509
name,

0 commit comments

Comments
 (0)