Skip to content

Commit

Permalink
fill persisted
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 2, 2025
1 parent ec9d8b1 commit 4e2ba8c
Showing 1 changed file with 47 additions and 30 deletions.
77 changes: 47 additions & 30 deletions src/frontend/src/catalog/purify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prost::Message as _;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnId};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
Expand Down Expand Up @@ -53,59 +54,75 @@ pub fn try_purify_table_source_create_sql_ast(
// Filter out columns that are not defined by users in SQL.
let defined_columns = columns.iter().filter(|c| c.is_user_defined());

// If all columns are defined...
// - either the schema is fully specified by the user,
// - the persisted definition is already purified.
// No need to proceed.
// If all columns are defined, check if the count matches.
if !column_defs.is_empty() && wildcard_idx.is_none() {
let defined_columns_len = defined_columns.count();
let defined_columns_len = defined_columns.clone().count();
if column_defs.len() != defined_columns_len {
bail /* unlikely */ !(
"column count mismatch: defined {} columns, but {} columns in the definition",
defined_columns_len,
column_defs.len()
);
}

return Ok(base);
}

// Schema inferred. Now derive the missing columns and constraints.
// Now derive the missing columns and constraints.

// First, remove the wildcard from the definition.
*wildcard_idx = None;

// Derive `ColumnDef` from `ColumnCatalog`.
let mut purified_column_defs = Vec::new();
for column in defined_columns {
// If the column is already defined in the persisted definition, keep it.
if let Some(existing) = column_defs
let mut column_def = if let Some(existing) = column_defs
.iter()
.find(|c| c.name.real_value() == column.name())
{
purified_column_defs.push(existing.clone());
continue;
}
// If the column is already defined in the persisted definition, retrieve it.
existing.clone()
} else {
assert!(
!column.is_generated(),
"generated column must not be inferred"
);

if let Some(c) = &column.column_desc.generated_or_default_column {
match c {
GeneratedOrDefaultColumn::GeneratedColumn(_) => {
unreachable!("generated column must not be inferred");
}
GeneratedOrDefaultColumn::DefaultColumn(_) => {
// TODO: convert `ExprNode` back to ast can be a bit tricky.
// Fortunately, this case is rare as inferring default values is not
// widely supported.
bail /* unlikely */ !("purifying default value is not supported yet");
}
// Generate a new `ColumnDef` from the catalog.
ColumnDef {
name: column.name().into(),
data_type: Some(column.data_type().to_ast()),
collation: None,
options: Vec::new(), // pk will be specified with table constraints
}
};

// Fill in the persisted default value desc.
if let Some(c) = &column.column_desc.generated_or_default_column
&& let GeneratedOrDefaultColumn::DefaultColumn(desc) = c
{
let persisted = desc.encode_to_vec().into_boxed_slice();

let default_value_option = column_def
.options
.extract_if(|o| matches!(o.option, ColumnOption::DefaultValue { .. }))
.next();

let expr = default_value_option.map(|o| match o.option {
ColumnOption::DefaultValue(expr) => expr,
_ => unreachable!(),
});

column_def.options.push(ColumnOptionDef {
name: None,
option: ColumnOption::DefaultValuePersisted { persisted, expr },
});
}

let column_def = ColumnDef {
name: column.name().into(),
data_type: Some(column.data_type().to_ast()),
collation: None,
options: Vec::new(), // pk will be specified with table constraints
};
// Remove primary key constraint from the column options. It will be specified with
// table constraints later.
column_def
.options
.retain(|o| !matches!(o.option, ColumnOption::Unique { is_primary: true }));

purified_column_defs.push(column_def);
}
*column_defs = purified_column_defs;
Expand Down

0 comments on commit 4e2ba8c

Please sign in to comment.