|
12 | 12 | // See the License for the specific language governing permissions and
|
13 | 13 | // limitations under the License.
|
14 | 14 |
|
| 15 | +use prost::Message as _; |
15 | 16 | use risingwave_common::bail;
|
16 | 17 | use risingwave_common::catalog::{ColumnCatalog, ColumnId};
|
17 | 18 | use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
|
@@ -53,59 +54,75 @@ pub fn try_purify_table_source_create_sql_ast(
|
53 | 54 | // Filter out columns that are not defined by users in SQL.
|
54 | 55 | let defined_columns = columns.iter().filter(|c| c.is_user_defined());
|
55 | 56 |
|
56 |
| - // If all columns are defined... |
57 |
| - // - either the schema is fully specified by the user, |
58 |
| - // - the persisted definition is already purified. |
59 |
| - // No need to proceed. |
| 57 | + // If all columns are defined, check if the count matches. |
60 | 58 | if !column_defs.is_empty() && wildcard_idx.is_none() {
|
61 |
| - let defined_columns_len = defined_columns.count(); |
| 59 | + let defined_columns_len = defined_columns.clone().count(); |
62 | 60 | if column_defs.len() != defined_columns_len {
|
63 | 61 | bail /* unlikely */ !(
|
64 | 62 | "column count mismatch: defined {} columns, but {} columns in the definition",
|
65 | 63 | defined_columns_len,
|
66 | 64 | column_defs.len()
|
67 | 65 | );
|
68 | 66 | }
|
69 |
| - |
70 |
| - return Ok(base); |
71 | 67 | }
|
72 | 68 |
|
73 |
| - // Schema inferred. Now derive the missing columns and constraints. |
| 69 | + // Now derive the missing columns and constraints. |
| 70 | + |
74 | 71 | // First, remove the wildcard from the definition.
|
75 | 72 | *wildcard_idx = None;
|
76 | 73 |
|
77 | 74 | // Derive `ColumnDef` from `ColumnCatalog`.
|
78 | 75 | let mut purified_column_defs = Vec::new();
|
79 | 76 | for column in defined_columns {
|
80 |
| - // If the column is already defined in the persisted definition, keep it. |
81 |
| - if let Some(existing) = column_defs |
| 77 | + let mut column_def = if let Some(existing) = column_defs |
82 | 78 | .iter()
|
83 | 79 | .find(|c| c.name.real_value() == column.name())
|
84 | 80 | {
|
85 |
| - purified_column_defs.push(existing.clone()); |
86 |
| - continue; |
87 |
| - } |
| 81 | + // If the column is already defined in the persisted definition, retrieve it. |
| 82 | + existing.clone() |
| 83 | + } else { |
| 84 | + assert!( |
| 85 | + !column.is_generated(), |
| 86 | + "generated column must not be inferred" |
| 87 | + ); |
88 | 88 |
|
89 |
| - if let Some(c) = &column.column_desc.generated_or_default_column { |
90 |
| - match c { |
91 |
| - GeneratedOrDefaultColumn::GeneratedColumn(_) => { |
92 |
| - unreachable!("generated column must not be inferred"); |
93 |
| - } |
94 |
| - GeneratedOrDefaultColumn::DefaultColumn(_) => { |
95 |
| - // TODO: convert `ExprNode` back to ast can be a bit tricky. |
96 |
| - // Fortunately, this case is rare as inferring default values is not |
97 |
| - // widely supported. |
98 |
| - bail /* unlikely */ !("purifying default value is not supported yet"); |
99 |
| - } |
| 89 | + // Generate a new `ColumnDef` from the catalog. |
| 90 | + ColumnDef { |
| 91 | + name: column.name().into(), |
| 92 | + data_type: Some(column.data_type().to_ast()), |
| 93 | + collation: None, |
| 94 | + options: Vec::new(), // pk will be specified with table constraints |
100 | 95 | }
|
| 96 | + }; |
| 97 | + |
| 98 | + // Fill in the persisted default value desc. |
| 99 | + if let Some(c) = &column.column_desc.generated_or_default_column |
| 100 | + && let GeneratedOrDefaultColumn::DefaultColumn(desc) = c |
| 101 | + { |
| 102 | + let persisted = desc.encode_to_vec().into_boxed_slice(); |
| 103 | + |
| 104 | + let default_value_option = column_def |
| 105 | + .options |
| 106 | + .extract_if(|o| matches!(o.option, ColumnOption::DefaultValue { .. })) |
| 107 | + .next(); |
| 108 | + |
| 109 | + let expr = default_value_option.map(|o| match o.option { |
| 110 | + ColumnOption::DefaultValue(expr) => expr, |
| 111 | + _ => unreachable!(), |
| 112 | + }); |
| 113 | + |
| 114 | + column_def.options.push(ColumnOptionDef { |
| 115 | + name: None, |
| 116 | + option: ColumnOption::DefaultValuePersisted { persisted, expr }, |
| 117 | + }); |
101 | 118 | }
|
102 | 119 |
|
103 |
| - let column_def = ColumnDef { |
104 |
| - name: column.name().into(), |
105 |
| - data_type: Some(column.data_type().to_ast()), |
106 |
| - collation: None, |
107 |
| - options: Vec::new(), // pk will be specified with table constraints |
108 |
| - }; |
| 120 | + // Remove primary key constraint from the column options. It will be specified with |
| 121 | + // table constraints later. |
| 122 | + column_def |
| 123 | + .options |
| 124 | + .retain(|o| !matches!(o.option, ColumnOption::Unique { is_primary: true })); |
| 125 | + |
109 | 126 | purified_column_defs.push(column_def);
|
110 | 127 | }
|
111 | 128 | *column_defs = purified_column_defs;
|
|
0 commit comments