Skip to content

Commit

Permalink
fix: wait for new worker join in migrate workers (#15967) (#15982)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Mar 28, 2024
1 parent 8e96da1 commit 4772af4
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 62 deletions.
12 changes: 3 additions & 9 deletions src/meta/src/barrier/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::{HashMap, HashSet};
use risingwave_pb::common::PbWorkerNode;
use tracing::warn;

use crate::manager::{ActorInfos, WorkerId};
use crate::manager::{ActiveStreamingWorkerNodes, ActorInfos, WorkerId};
use crate::model::ActorId;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -52,14 +52,8 @@ pub struct InflightActorInfo {

impl InflightActorInfo {
/// Resolve inflight actor info from given nodes and actors that are loaded from meta store. It will be used during recovery to rebuild all streaming actors.
pub fn resolve(
all_nodes: impl IntoIterator<Item = PbWorkerNode>,
actor_infos: ActorInfos,
) -> Self {
let node_map = all_nodes
.into_iter()
.map(|node| (node.id, node))
.collect::<HashMap<_, _>>();
pub fn resolve(active_nodes: &ActiveStreamingWorkerNodes, actor_infos: ActorInfos) -> Self {
let node_map = active_nodes.current().clone();

let actor_map = actor_infos
.actor_maps
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use risingwave_hummock_sdk::table_watermark::{
};
use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId};
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::PausedReason;
Expand Down Expand Up @@ -556,6 +555,7 @@ impl GlobalBarrierManager {
changed_worker = self.active_streaming_nodes.changed() => {
#[cfg(debug_assertions)]
{
use risingwave_pb::common::WorkerNode;
match self
.context
.metadata_manager
Expand Down Expand Up @@ -965,18 +965,18 @@ impl GlobalBarrierManagerContext {
/// will create or drop before this barrier flow through them.
async fn resolve_actor_info(
&self,
all_nodes: Vec<WorkerNode>,
active_nodes: &ActiveStreamingWorkerNodes,
) -> MetaResult<InflightActorInfo> {
let info = match &self.metadata_manager {
MetadataManager::V1(mgr) => {
let all_actor_infos = mgr.fragment_manager.load_all_actors().await;

InflightActorInfo::resolve(all_nodes, all_actor_infos)
InflightActorInfo::resolve(active_nodes, all_actor_infos)
}
MetadataManager::V2(mgr) => {
let all_actor_infos = mgr.catalog_controller.load_all_actors().await?;

InflightActorInfo::resolve(all_nodes, all_actor_infos)
InflightActorInfo::resolve(active_nodes, all_actor_infos)
}
};

Expand Down
136 changes: 87 additions & 49 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::config::DefaultParallelism;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_meta_model_v2::StreamingParallelism;
use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::table_fragments::State;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::BarrierKind;
Expand Down Expand Up @@ -378,17 +378,11 @@ impl GlobalBarrierManager {
// This is a quick path to accelerate the process of dropping and canceling streaming jobs.
let _ = self.pre_apply_drop_cancel().await?;

let active_streaming_nodes = ActiveStreamingWorkerNodes::new_snapshot(
let mut active_streaming_nodes = ActiveStreamingWorkerNodes::new_snapshot(
self.context.metadata_manager.clone(),
)
.await?;

let all_nodes = active_streaming_nodes
.current()
.values()
.cloned()
.collect_vec();

let background_streaming_jobs = self
.context
.metadata_manager
Expand All @@ -402,22 +396,22 @@ impl GlobalBarrierManager {
&& background_streaming_jobs.is_empty()
{
self.context
.scale_actors(all_nodes.clone())
.scale_actors(&active_streaming_nodes)
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "scale actors failed");
})?;

self.context
.resolve_actor_info(all_nodes.clone())
.resolve_actor_info(&active_streaming_nodes)
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "resolve actor info failed");
})?
} else {
// Migrate actors in expired CN to newly joined one.
self.context
.migrate_actors(all_nodes.clone())
.migrate_actors(&mut active_streaming_nodes)
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "migrate actors failed");
Expand All @@ -432,7 +426,7 @@ impl GlobalBarrierManager {
if self.pre_apply_drop_cancel().await? {
info = self
.context
.resolve_actor_info(all_nodes.clone())
.resolve_actor_info(&active_streaming_nodes)
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "resolve actor info failed");
Expand Down Expand Up @@ -536,14 +530,20 @@ impl GlobalBarrierManager {

impl GlobalBarrierManagerContext {
/// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated.
async fn migrate_actors(&self, all_nodes: Vec<WorkerNode>) -> MetaResult<InflightActorInfo> {
async fn migrate_actors(
&self,
active_nodes: &mut ActiveStreamingWorkerNodes,
) -> MetaResult<InflightActorInfo> {
match &self.metadata_manager {
MetadataManager::V1(_) => self.migrate_actors_v1(all_nodes).await,
MetadataManager::V2(_) => self.migrate_actors_v2(all_nodes).await,
MetadataManager::V1(_) => self.migrate_actors_v1(active_nodes).await,
MetadataManager::V2(_) => self.migrate_actors_v2(active_nodes).await,
}
}

async fn migrate_actors_v2(&self, all_nodes: Vec<WorkerNode>) -> MetaResult<InflightActorInfo> {
async fn migrate_actors_v2(
&self,
active_nodes: &mut ActiveStreamingWorkerNodes,
) -> MetaResult<InflightActorInfo> {
let mgr = self.metadata_manager.as_v2_ref();

let all_inuse_parallel_units: HashSet<_> = mgr
Expand All @@ -553,8 +553,9 @@ impl GlobalBarrierManagerContext {
.into_iter()
.collect();

let active_parallel_units: HashSet<_> = all_nodes
.iter()
let active_parallel_units: HashSet<_> = active_nodes
.current()
.values()
.flat_map(|node| node.parallel_units.iter().map(|pu| pu.id as i32))
.collect();

Expand All @@ -564,7 +565,7 @@ impl GlobalBarrierManagerContext {
.collect();
if expired_parallel_units.is_empty() {
debug!("no expired parallel units, skipping.");
return self.resolve_actor_info(all_nodes.clone()).await;
return self.resolve_actor_info(active_nodes).await;
}

debug!("start migrate actors.");
Expand All @@ -581,8 +582,9 @@ impl GlobalBarrierManagerContext {
let start = Instant::now();
let mut plan = HashMap::new();
'discovery: while !to_migrate_parallel_units.is_empty() {
let new_parallel_units = all_nodes
.iter()
let new_parallel_units = active_nodes
.current()
.values()
.flat_map(|node| {
node.parallel_units
.iter()
Expand All @@ -605,26 +607,44 @@ impl GlobalBarrierManagerContext {
}
}
}
warn!(
"waiting for new workers to join, elapsed: {}s",
start.elapsed().as_secs()
);

if to_migrate_parallel_units.is_empty() {
break;
}

// wait to get newly joined CN
tokio::time::sleep(Duration::from_millis(100)).await;
let changed = active_nodes
.wait_changed(Duration::from_millis(5000), |active_nodes| {
let current_nodes = active_nodes
.current()
.values()
.map(|node| (node.id, &node.host, &node.parallel_units))
.collect_vec();
warn!(
current_nodes = ?current_nodes,
"waiting for new workers to join, elapsed: {}s",
start.elapsed().as_secs()
);
})
.await;
warn!(?changed, "get worker changed. Retry migrate");
}

mgr.catalog_controller.migrate_actors(plan).await?;

debug!("migrate actors succeed.");

self.resolve_actor_info(all_nodes).await
self.resolve_actor_info(active_nodes).await
}

/// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated.
async fn migrate_actors_v1(&self, all_nodes: Vec<WorkerNode>) -> MetaResult<InflightActorInfo> {
async fn migrate_actors_v1(
&self,
active_nodes: &mut ActiveStreamingWorkerNodes,
) -> MetaResult<InflightActorInfo> {
let mgr = self.metadata_manager.as_v1_ref();

let info = self.resolve_actor_info(all_nodes.clone()).await?;
let info = self.resolve_actor_info(active_nodes).await?;

// 1. get expired workers.
let expired_workers: HashSet<WorkerId> = info
Expand All @@ -640,7 +660,7 @@ impl GlobalBarrierManagerContext {

debug!("start migrate actors.");
let migration_plan = self
.generate_migration_plan(expired_workers, &all_nodes)
.generate_migration_plan(expired_workers, active_nodes)
.await?;
// 2. start to migrate fragment one-by-one.
mgr.fragment_manager
Expand All @@ -650,18 +670,18 @@ impl GlobalBarrierManagerContext {
migration_plan.delete(self.env.meta_store_checked()).await?;
debug!("migrate actors succeed.");

self.resolve_actor_info(all_nodes).await
self.resolve_actor_info(active_nodes).await
}

async fn scale_actors(&self, all_nodes: Vec<WorkerNode>) -> MetaResult<()> {
async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
let _guard = self.scale_controller.reschedule_lock.write().await;
match &self.metadata_manager {
MetadataManager::V1(_) => self.scale_actors_v1(all_nodes).await,
MetadataManager::V2(_) => self.scale_actors_v2(all_nodes).await,
MetadataManager::V1(_) => self.scale_actors_v1(active_nodes).await,
MetadataManager::V2(_) => self.scale_actors_v2(active_nodes).await,
}
}

async fn scale_actors_v2(&self, workers: Vec<WorkerNode>) -> MetaResult<()> {
async fn scale_actors_v2(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
let mgr = self.metadata_manager.as_v2_ref();
debug!("start resetting actors distribution");

Expand All @@ -685,8 +705,9 @@ impl GlobalBarrierManagerContext {
.collect()
};

let schedulable_worker_ids = workers
.iter()
let schedulable_worker_ids = active_nodes
.current()
.values()
.filter(|worker| {
!worker
.property
Expand Down Expand Up @@ -793,8 +814,8 @@ impl GlobalBarrierManagerContext {
}
}

async fn scale_actors_v1(&self, workers: Vec<WorkerNode>) -> MetaResult<()> {
let info = self.resolve_actor_info(workers.clone()).await?;
async fn scale_actors_v1(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
let info = self.resolve_actor_info(active_nodes).await?;

let mgr = self.metadata_manager.as_v1_ref();
debug!("start resetting actors distribution");
Expand Down Expand Up @@ -839,8 +860,9 @@ impl GlobalBarrierManagerContext {
.collect()
};

let schedulable_worker_ids: BTreeSet<_> = workers
.iter()
let schedulable_worker_ids: BTreeSet<_> = active_nodes
.current()
.values()
.filter(|worker| {
!worker
.property
Expand Down Expand Up @@ -915,7 +937,7 @@ impl GlobalBarrierManagerContext {
async fn generate_migration_plan(
&self,
expired_workers: HashSet<WorkerId>,
all_nodes: &Vec<WorkerNode>,
active_nodes: &mut ActiveStreamingWorkerNodes,
) -> MetaResult<MigrationPlan> {
let mgr = self.metadata_manager.as_v1_ref();

Expand Down Expand Up @@ -965,8 +987,9 @@ impl GlobalBarrierManagerContext {
let start = Instant::now();
// if in-used expire parallel units are not empty, should wait for newly joined worker.
'discovery: while !to_migrate_parallel_units.is_empty() {
let mut new_parallel_units = all_nodes
.iter()
let mut new_parallel_units = active_nodes
.current()
.values()
.flat_map(|worker| worker.parallel_units.iter().cloned())
.collect_vec();
new_parallel_units.retain(|pu| !inuse_parallel_units.contains(&pu.id));
Expand All @@ -988,12 +1011,27 @@ impl GlobalBarrierManagerContext {
}
}
}
warn!(
"waiting for new workers to join, elapsed: {}s",
start.elapsed().as_secs()
);

if to_migrate_parallel_units.is_empty() {
break;
}

// wait to get newly joined CN
tokio::time::sleep(Duration::from_millis(100)).await;
let changed = active_nodes
.wait_changed(Duration::from_millis(5000), |active_nodes| {
let current_nodes = active_nodes
.current()
.values()
.map(|node| (node.id, &node.host, &node.parallel_units))
.collect_vec();
warn!(
current_nodes = ?current_nodes,
"waiting for new workers to join, elapsed: {}s",
start.elapsed().as_secs()
);
})
.await;
warn!(?changed, "get worker changed. Retry migrate");
}

// update migration plan, if there is a chain in the plan, update it.
Expand Down
Loading

0 comments on commit 4772af4

Please sign in to comment.