Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ redundant_explicit_links = "allow"
lto = "off"
# use parallel frontend to speed up build
# TODO: may consider applying to release/production profile as well
rustflags = ["-Z", "threads=8"]
#rustflags = ["-Z", "threads=8"]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be reverted once approved


[profile.release]
debug = "full"
Expand Down
1 change: 1 addition & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ message Database {
uint32 id = 1;
string name = 2;
uint32 owner = 3;
string resource_group = 4;
}

message Comment {
Expand Down
9 changes: 6 additions & 3 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ message WorkerNode {
bool is_unschedulable = 3;
// This is used for frontend node to register its rpc address
string internal_rpc_host_addr = 4;
// Meta may assign labels to worker nodes to partition workload by label.
// This is used for serverless backfilling of materialized views.
optional string node_label = 5;

reserved 5;
reserved "node_label";

uint32 parallelism = 6;

// resource group for scheduling
optional string resource_group = 7;
}
message Resource {
string rw_version = 1;
Expand Down
12 changes: 12 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ message CreateMaterializedViewRequest {

// The list of object IDs that this materialized view depends on.
repeated uint32 dependencies = 4;

// The specific resource group to use for the materialized view. If not set, the database resource group is used.
optional string specific_resource_group = 5;
}

message CreateMaterializedViewResponse {
Expand Down Expand Up @@ -270,6 +273,14 @@ message AlterParallelismRequest {

message AlterParallelismResponse {}

message AlterResourceGroupRequest {
uint32 table_id = 1;
optional string resource_group = 2;
bool deferred = 3;
}

message AlterResourceGroupResponse {}

message AlterOwnerResponse {
common.Status status = 1;
WaitVersion version = 2;
Expand Down Expand Up @@ -549,6 +560,7 @@ service DdlService {
rpc AlterOwner(AlterOwnerRequest) returns (AlterOwnerResponse);
rpc AlterSetSchema(AlterSetSchemaRequest) returns (AlterSetSchemaResponse);
rpc AlterParallelism(AlterParallelismRequest) returns (AlterParallelismResponse);
rpc AlterResourceGroup(AlterResourceGroupRequest) returns (AlterResourceGroupResponse);
rpc DropTable(DropTableRequest) returns (DropTableResponse);
rpc RisectlListStateTables(RisectlListStateTablesRequest) returns (RisectlListStateTablesResponse);
rpc CreateView(CreateViewRequest) returns (CreateViewResponse);
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/util/worker_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

pub type WorkerNodeId = u32;

pub const DEFAULT_COMPUTE_NODE_LABEL: &str = "default";
pub const DEFAULT_RESOURCE_GROUP: &str = "default";
12 changes: 6 additions & 6 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::util::worker_util::DEFAULT_COMPUTE_NODE_LABEL;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use serde::{Deserialize, Serialize};

/// If `total_memory_bytes` is not specified, the default memory limit will be set to
Expand Down Expand Up @@ -113,9 +113,9 @@ pub struct ComputeNodeOpts {
#[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
pub parallelism: usize,

/// The parallelism that the compute node will register to the scheduler of the meta service.
#[clap(long, env = "RW_NODE_LABEL", default_value_t = default_node_label())]
pub node_label: String,
/// Resource group for scheduling, default value is "default"
#[clap(long, env = "RW_RESOURCE_GROUP", default_value_t = default_resource_group())]
pub resource_group: String,

/// Decides whether the compute node can be used for streaming and serving.
#[clap(long, env = "RW_COMPUTE_NODE_ROLE", value_enum, default_value_t = default_role())]
Expand Down Expand Up @@ -262,8 +262,8 @@ pub fn default_parallelism() -> usize {
total_cpu_available().ceil() as usize
}

pub fn default_node_label() -> String {
DEFAULT_COMPUTE_NODE_LABEL.to_owned()
pub fn default_resource_group() -> String {
DEFAULT_RESOURCE_GROUP.to_owned()
}

pub fn default_role() -> Role {
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub async fn compute_node_serve(
is_serving: opts.role.for_serving(),
is_unschedulable: false,
internal_rpc_host_addr: "".to_owned(),
node_label: Some(opts.node_label.clone()),
resource_group: Some(opts.resource_group.clone()),
},
&config.meta,
)
Expand Down
40 changes: 37 additions & 3 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ impl CatalogReader {
/// [observer](`crate::observer::FrontendObserverNode`).
#[async_trait::async_trait]
pub trait CatalogWriter: Send + Sync {
async fn create_database(&self, db_name: &str, owner: UserId) -> Result<()>;
async fn create_database(
&self,
db_name: &str,
owner: UserId,
resource_group: &str,
) -> Result<()>;

async fn create_schema(
&self,
Expand All @@ -81,6 +86,7 @@ pub trait CatalogWriter: Send + Sync {
table: PbTable,
graph: StreamFragmentGraph,
dependencies: HashSet<ObjectId>,
specific_resource_group: Option<String>,
) -> Result<()>;

async fn create_table(
Expand Down Expand Up @@ -214,6 +220,13 @@ pub trait CatalogWriter: Send + Sync {
deferred: bool,
) -> Result<()>;

async fn alter_resource_group(
&self,
table_id: u32,
resource_group: Option<String>,
deferred: bool,
) -> Result<()>;

async fn alter_set_schema(
&self,
object: alter_set_schema_request::Object,
Expand All @@ -232,13 +245,19 @@ pub struct CatalogWriterImpl {

#[async_trait::async_trait]
impl CatalogWriter for CatalogWriterImpl {
async fn create_database(&self, db_name: &str, owner: UserId) -> Result<()> {
async fn create_database(
&self,
db_name: &str,
owner: UserId,
resource_group: &str,
) -> Result<()> {
let version = self
.meta_client
.create_database(PbDatabase {
name: db_name.to_owned(),
id: 0,
owner,
resource_group: resource_group.to_owned(),
})
.await?;
self.wait_version(version).await
Expand Down Expand Up @@ -268,11 +287,12 @@ impl CatalogWriter for CatalogWriterImpl {
table: PbTable,
graph: StreamFragmentGraph,
dependencies: HashSet<ObjectId>,
specific_resource_group: Option<String>,
) -> Result<()> {
let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
let version = self
.meta_client
.create_materialized_view(table, graph, dependencies)
.create_materialized_view(table, graph, dependencies, specific_resource_group)
.await?;
if matches!(create_type, PbCreateType::Foreground) {
self.wait_version(version).await?
Expand Down Expand Up @@ -579,6 +599,20 @@ impl CatalogWriter for CatalogWriterImpl {
.await?;
self.wait_version(version).await
}

async fn alter_resource_group(
&self,
table_id: u32,
resource_group: Option<String>,
deferred: bool,
) -> Result<()> {
self.meta_client
.alter_resource_group(table_id, resource_group, deferred)
.await
.map_err(|e| anyhow!(e))?;

Ok(())
}
}

impl CatalogWriterImpl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct RwWorkerNode {
system_total_memory_bytes: Option<i64>,
system_total_cpu_cores: Option<i64>,
started_at: Option<Timestamptz>,
label: Option<String>,
resource_group: Option<String>,
}

#[system_catalog(table, "rw_catalog.rw_worker_nodes")]
Expand Down Expand Up @@ -82,8 +82,8 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result<Vec<
started_at: worker
.started_at
.map(|ts| Timestamptz::from_secs(ts as i64).unwrap()),
label: if is_compute {
property.and_then(|p| p.node_label.clone())
resource_group: if is_compute {
property.and_then(|p| p.resource_group.clone())
} else {
None
},
Expand Down
99 changes: 99 additions & 0 deletions src/frontend/src/handler/alter_resource_group.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::StatementType;
use risingwave_common::bail;
use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};

use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result};
use crate::Binder;

pub async fn handle_alter_resource_group(
handler_args: HandlerArgs,
obj_name: ObjectName,
resource_group: Option<SetVariableValue>,
stmt_type: StatementType,
deferred: bool,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let (schema_name, real_table_name) =
Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let table_id = {
let reader = session.env().catalog_reader().read_guard();

match stmt_type {
StatementType::ALTER_MATERIALIZED_VIEW => {
let (table, schema_name) =
reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;

match (table.table_type(), stmt_type) {
(TableType::MaterializedView, StatementType::ALTER_MATERIALIZED_VIEW) => {}
_ => {
return Err(ErrorCode::InvalidInputSyntax(format!(
"cannot alter resource group of {} {} by {}",
table.table_type().to_prost().as_str_name(),
table.name(),
stmt_type,
))
.into());
}
}

session.check_privilege_for_drop_alter(schema_name, &**table)?;
table.id.table_id()
}
_ => bail!(
"invalid statement type for alter resource group: {:?}",
stmt_type
),
}
};

let resource_group = match resource_group {
None => None,
Some(SetVariableValue::Single(SetVariableValueSingle::Ident(ident))) => {
Some(ident.real_value())
}
Some(SetVariableValue::Single(SetVariableValueSingle::Literal(
Value::SingleQuotedString(v),
))) => Some(v),
_ => {
return Err(ErrorCode::InvalidInputSyntax(
"target parallelism must be a valid number or adaptive".to_owned(),
)
.into());
}
};

let mut builder = RwPgResponse::builder(stmt_type);

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_resource_group(table_id, resource_group, deferred)
.await?;

if deferred {
builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
}

Ok(builder.into())
}
4 changes: 3 additions & 1 deletion src/frontend/src/handler/create_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use risingwave_sqlparser::ast::ObjectName;

use super::RwPgResponse;
Expand Down Expand Up @@ -73,7 +74,8 @@ pub async fn handle_create_database(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.create_database(&database_name, database_owner)
// TODO: add support for create database with resource_group
.create_database(&database_name, database_owner, DEFAULT_RESOURCE_GROUP)
.await?;

Ok(PgResponse::empty_result(StatementType::CREATE_DATABASE))
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ It only indicates the physical clustering of the data, which may improve the per
let session = session.clone();
let catalog_writer = session.catalog_writer()?;
catalog_writer
.create_materialized_view(table, graph, dependencies)
.create_materialized_view(table, graph, dependencies, None)
.await?;

Ok(PgResponse::empty_result(
Expand Down
19 changes: 19 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::utils::WithOptions;
mod alter_owner;
mod alter_parallelism;
mod alter_rename;
mod alter_resource_group;
mod alter_secret;
mod alter_set_schema;
mod alter_source_column;
Expand Down Expand Up @@ -840,6 +841,24 @@ pub async fn handle(
)
.await
}
Statement::AlterView {
materialized,
name,
operation:
AlterViewOperation::SetResourceGroup {
resource_group,
deferred,
},
} if materialized => {
alter_resource_group::handle_alter_resource_group(
handler_args,
name,
resource_group,
StatementType::ALTER_MATERIALIZED_VIEW,
deferred,
)
.await
}
Statement::AlterView {
materialized,
name,
Expand Down
Loading
Loading