diff --git a/src/frontend/src/catalog/purify.rs b/src/frontend/src/catalog/purify.rs index 774ed49cfe9fa..c099c9387f88a 100644 --- a/src/frontend/src/catalog/purify.rs +++ b/src/frontend/src/catalog/purify.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use prost::Message as _; use risingwave_common::bail; use risingwave_common::catalog::{ColumnCatalog, ColumnId}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; @@ -53,12 +54,9 @@ pub fn try_purify_table_source_create_sql_ast( // Filter out columns that are not defined by users in SQL. let defined_columns = columns.iter().filter(|c| c.is_user_defined()); - // If all columns are defined... - // - either the schema is fully specified by the user, - // - the persisted definition is already purified. - // No need to proceed. + // If all columns are defined, check if the count matches. if !column_defs.is_empty() && wildcard_idx.is_none() { - let defined_columns_len = defined_columns.count(); + let defined_columns_len = defined_columns.clone().count(); if column_defs.len() != defined_columns_len { bail /* unlikely */ !( "column count mismatch: defined {} columns, but {} columns in the definition", @@ -66,46 +64,65 @@ pub fn try_purify_table_source_create_sql_ast( column_defs.len() ); } - - return Ok(base); } - // Schema inferred. Now derive the missing columns and constraints. + // Now derive the missing columns and constraints. + // First, remove the wildcard from the definition. *wildcard_idx = None; // Derive `ColumnDef` from `ColumnCatalog`. let mut purified_column_defs = Vec::new(); for column in defined_columns { - // If the column is already defined in the persisted definition, keep it. - if let Some(existing) = column_defs + let mut column_def = if let Some(existing) = column_defs .iter() .find(|c| c.name.real_value() == column.name()) { - purified_column_defs.push(existing.clone()); - continue; - } + // If the column is already defined in the persisted definition, retrieve it. + existing.clone() + } else { + assert!( + !column.is_generated(), + "generated column must not be inferred" + ); - if let Some(c) = &column.column_desc.generated_or_default_column { - match c { - GeneratedOrDefaultColumn::GeneratedColumn(_) => { - unreachable!("generated column must not be inferred"); - } - GeneratedOrDefaultColumn::DefaultColumn(_) => { - // TODO: convert `ExprNode` back to ast can be a bit tricky. - // Fortunately, this case is rare as inferring default values is not - // widely supported. - bail /* unlikely */ !("purifying default value is not supported yet"); - } + // Generate a new `ColumnDef` from the catalog. + ColumnDef { + name: column.name().into(), + data_type: Some(column.data_type().to_ast()), + collation: None, + options: Vec::new(), // pk will be specified with table constraints } + }; + + // Fill in the persisted default value desc. + if let Some(c) = &column.column_desc.generated_or_default_column + && let GeneratedOrDefaultColumn::DefaultColumn(desc) = c + { + let persisted = desc.encode_to_vec().into_boxed_slice(); + + let default_value_option = column_def + .options + .extract_if(|o| matches!(o.option, ColumnOption::DefaultValue { .. })) + .next(); + + let expr = default_value_option.map(|o| match o.option { + ColumnOption::DefaultValue(expr) => expr, + _ => unreachable!(), + }); + + column_def.options.push(ColumnOptionDef { + name: None, + option: ColumnOption::DefaultValuePersisted { persisted, expr }, + }); } - let column_def = ColumnDef { - name: column.name().into(), - data_type: Some(column.data_type().to_ast()), - collation: None, - options: Vec::new(), // pk will be specified with table constraints - }; + // Remove primary key constraint from the column options. It will be specified with + // table constraints later. + column_def + .options + .retain(|o| !matches!(o.option, ColumnOption::Unique { is_primary: true })); + purified_column_defs.push(column_def); } *column_defs = purified_column_defs;