From 4772af43f994143f363d16ede0a41786b2112ae3 Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Thu, 28 Mar 2024 16:46:40 +0800
Subject: [PATCH] fix: wait for new worker join in migrate workers (#15967)
 (#15982)

---
 src/meta/src/barrier/info.rs     |  12 +--
 src/meta/src/barrier/mod.rs      |   8 +-
 src/meta/src/barrier/recovery.rs | 136 ++++++++++++++++++++-----------
 src/meta/src/manager/metadata.rs |  19 +++++
 4 files changed, 113 insertions(+), 62 deletions(-)

diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs
index 7e845be71d987..c98b2cdd64d6f 100644
--- a/src/meta/src/barrier/info.rs
+++ b/src/meta/src/barrier/info.rs
@@ -17,7 +17,7 @@ use std::collections::{HashMap, HashSet};
 use risingwave_pb::common::PbWorkerNode;
 use tracing::warn;
 
-use crate::manager::{ActorInfos, WorkerId};
+use crate::manager::{ActiveStreamingWorkerNodes, ActorInfos, WorkerId};
 use crate::model::ActorId;
 
 #[derive(Debug, Clone)]
@@ -52,14 +52,8 @@ pub struct InflightActorInfo {
 impl InflightActorInfo {
     /// Resolve inflight actor info from given nodes and actors that are loaded from meta store. It will be used during recovery to rebuild all streaming actors.
-    pub fn resolve(
-        all_nodes: impl IntoIterator<Item = WorkerNode>,
-        actor_infos: ActorInfos,
-    ) -> Self {
-        let node_map = all_nodes
-            .into_iter()
-            .map(|node| (node.id, node))
-            .collect::<HashMap<_, _>>();
+    pub fn resolve(active_nodes: &ActiveStreamingWorkerNodes, actor_infos: ActorInfos) -> Self {
+        let node_map = active_nodes.current().clone();
 
         let actor_map = actor_infos
             .actor_maps
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 3bad3cbd15a2d..ffacfe3a31a99 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -32,7 +32,6 @@ use risingwave_hummock_sdk::table_watermark::{
 };
 use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId};
 use risingwave_pb::catalog::table::TableType;
-use risingwave_pb::common::WorkerNode;
 use risingwave_pb::ddl_service::DdlProgress;
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
 use risingwave_pb::meta::PausedReason;
@@ -556,6 +555,7 @@ impl GlobalBarrierManager {
                 changed_worker = self.active_streaming_nodes.changed() => {
                     #[cfg(debug_assertions)]
                     {
+                        use risingwave_pb::common::WorkerNode;
                         match self
                             .context
                             .metadata_manager
@@ -965,18 +965,18 @@ impl GlobalBarrierManagerContext {
     /// will create or drop before this barrier flow through them.
     async fn resolve_actor_info(
         &self,
-        all_nodes: Vec<WorkerNode>,
+        active_nodes: &ActiveStreamingWorkerNodes,
     ) -> MetaResult<InflightActorInfo> {
         let info = match &self.metadata_manager {
             MetadataManager::V1(mgr) => {
                 let all_actor_infos = mgr.fragment_manager.load_all_actors().await;
 
-                InflightActorInfo::resolve(all_nodes, all_actor_infos)
+                InflightActorInfo::resolve(active_nodes, all_actor_infos)
             }
             MetadataManager::V2(mgr) => {
                 let all_actor_infos = mgr.catalog_controller.load_all_actors().await?;
 
-                InflightActorInfo::resolve(all_nodes, all_actor_infos)
+                InflightActorInfo::resolve(active_nodes, all_actor_infos)
             }
         };
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index fdcde0a8e1340..84c91dfc261f2 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -22,7 +22,7 @@ use risingwave_common::catalog::TableId;
 use risingwave_common::config::DefaultParallelism;
 use risingwave_hummock_sdk::compaction_group::StateTableId;
 use risingwave_meta_model_v2::StreamingParallelism;
-use risingwave_pb::common::{ActorInfo, WorkerNode};
+use risingwave_pb::common::ActorInfo;
 use risingwave_pb::meta::table_fragments::State;
 use risingwave_pb::meta::PausedReason;
 use risingwave_pb::stream_plan::barrier::BarrierKind;
@@ -378,17 +378,11 @@ impl GlobalBarrierManager {
         // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
         let _ = self.pre_apply_drop_cancel().await?;
 
-        let active_streaming_nodes = ActiveStreamingWorkerNodes::new_snapshot(
+        let mut active_streaming_nodes = ActiveStreamingWorkerNodes::new_snapshot(
             self.context.metadata_manager.clone(),
         )
         .await?;
 
-        let all_nodes = active_streaming_nodes
-            .current()
-            .values()
-            .cloned()
-            .collect_vec();
-
         let background_streaming_jobs = self
@@ -402,14 +396,14 @@ impl GlobalBarrierManager {
             && background_streaming_jobs.is_empty()
         {
             self.context
-                .scale_actors(all_nodes.clone())
+                .scale_actors(&active_streaming_nodes)
                 .await
                 .inspect_err(|err| {
                     warn!(error = %err.as_report(), "scale actors failed");
                 })?;
 
             self.context
-                .resolve_actor_info(all_nodes.clone())
+                .resolve_actor_info(&active_streaming_nodes)
                 .await
                 .inspect_err(|err| {
                     warn!(error = %err.as_report(), "resolve actor info failed");
@@ -417,7 +411,7 @@ impl GlobalBarrierManager {
         } else {
             // Migrate actors in expired CN to newly joined one.
             self.context
-                .migrate_actors(all_nodes.clone())
+                .migrate_actors(&mut active_streaming_nodes)
                 .await
                 .inspect_err(|err| {
                     warn!(error = %err.as_report(), "migrate actors failed");
@@ -432,7 +426,7 @@ impl GlobalBarrierManager {
             if self.pre_apply_drop_cancel().await? {
                 info = self
                     .context
-                    .resolve_actor_info(all_nodes.clone())
+                    .resolve_actor_info(&active_streaming_nodes)
                     .await
                     .inspect_err(|err| {
                         warn!(error = %err.as_report(), "resolve actor info failed");
@@ -536,14 +530,20 @@ impl GlobalBarrierManager {
 
 impl GlobalBarrierManagerContext {
     /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated.
-    async fn migrate_actors(&self, all_nodes: Vec<WorkerNode>) -> MetaResult<InflightActorInfo> {
+    async fn migrate_actors(
+        &self,
+        active_nodes: &mut ActiveStreamingWorkerNodes,
+    ) -> MetaResult<InflightActorInfo> {
         match &self.metadata_manager {
-            MetadataManager::V1(_) => self.migrate_actors_v1(all_nodes).await,
-            MetadataManager::V2(_) => self.migrate_actors_v2(all_nodes).await,
+            MetadataManager::V1(_) => self.migrate_actors_v1(active_nodes).await,
+            MetadataManager::V2(_) => self.migrate_actors_v2(active_nodes).await,
         }
     }
 
-    async fn migrate_actors_v2(&self, all_nodes: Vec<WorkerNode>) -> MetaResult<InflightActorInfo> {
+    async fn migrate_actors_v2(
+        &self,
+        active_nodes: &mut ActiveStreamingWorkerNodes,
+    ) -> MetaResult<InflightActorInfo> {
         let mgr = self.metadata_manager.as_v2_ref();
 
         let all_inuse_parallel_units: HashSet<_> = mgr
@@ -553,8 +553,9 @@ impl GlobalBarrierManagerContext {
             .into_iter()
             .collect();
 
-        let active_parallel_units: HashSet<_> = all_nodes
-            .iter()
+        let active_parallel_units: HashSet<_> = active_nodes
+            .current()
+            .values()
             .flat_map(|node| node.parallel_units.iter().map(|pu| pu.id as i32))
             .collect();
 
@@ -564,7 +565,7 @@ impl GlobalBarrierManagerContext {
             .collect();
         if expired_parallel_units.is_empty() {
             debug!("no expired parallel units, skipping.");
-            return self.resolve_actor_info(all_nodes.clone()).await;
+            return self.resolve_actor_info(active_nodes).await;
         }
 
         debug!("start migrate actors.");
@@ -581,8 +582,9 @@ impl GlobalBarrierManagerContext {
         let start = Instant::now();
         let mut plan = HashMap::new();
         'discovery: while !to_migrate_parallel_units.is_empty() {
-            let new_parallel_units = all_nodes
-                .iter()
+            let new_parallel_units = active_nodes
+                .current()
+                .values()
                 .flat_map(|node| {
                     node.parallel_units
                         .iter()
@@ -605,26 +607,44 @@ impl GlobalBarrierManagerContext {
                 }
             }
-            warn!(
-                "waiting for new workers to join, elapsed: {}s",
-                start.elapsed().as_secs()
-            );
+
+            if to_migrate_parallel_units.is_empty() {
+                break;
+            }
+
             // wait to get newly joined CN
-            tokio::time::sleep(Duration::from_millis(100)).await;
+            let changed = active_nodes
+                .wait_changed(Duration::from_millis(5000), |active_nodes| {
+                    let current_nodes = active_nodes
+                        .current()
+                        .values()
+                        .map(|node| (node.id, &node.host, &node.parallel_units))
+                        .collect_vec();
+                    warn!(
+                        current_nodes = ?current_nodes,
+                        "waiting for new workers to join, elapsed: {}s",
+                        start.elapsed().as_secs()
+                    );
+                })
+                .await;
+            warn!(?changed, "get worker changed. Retry migrate");
         }
 
         mgr.catalog_controller.migrate_actors(plan).await?;
 
         debug!("migrate actors succeed.");
 
-        self.resolve_actor_info(all_nodes).await
+        self.resolve_actor_info(active_nodes).await
     }
 
     /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated.
-    async fn migrate_actors_v1(&self, all_nodes: Vec<WorkerNode>) -> MetaResult<InflightActorInfo> {
+    async fn migrate_actors_v1(
+        &self,
+        active_nodes: &mut ActiveStreamingWorkerNodes,
+    ) -> MetaResult<InflightActorInfo> {
         let mgr = self.metadata_manager.as_v1_ref();
 
-        let info = self.resolve_actor_info(all_nodes.clone()).await?;
+        let info = self.resolve_actor_info(active_nodes).await?;
 
         // 1. get expired workers.
         let expired_workers: HashSet<WorkerId> = info
@@ -640,7 +660,7 @@ impl GlobalBarrierManagerContext {
         debug!("start migrate actors.");
 
         let migration_plan = self
-            .generate_migration_plan(expired_workers, &all_nodes)
+            .generate_migration_plan(expired_workers, active_nodes)
             .await?;
         // 2. start to migrate fragment one-by-one.
         mgr.fragment_manager
             .migrate_fragment_actors(&migration_plan)
             .await?;
         // 3. remove the migration plan.
         migration_plan.delete(self.env.meta_store_checked()).await?;
 
         debug!("migrate actors succeed.");
-        self.resolve_actor_info(all_nodes).await
+        self.resolve_actor_info(active_nodes).await
     }
 
-    async fn scale_actors(&self, all_nodes: Vec<WorkerNode>) -> MetaResult<()> {
+    async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
         let _guard = self.scale_controller.reschedule_lock.write().await;
         match &self.metadata_manager {
-            MetadataManager::V1(_) => self.scale_actors_v1(all_nodes).await,
-            MetadataManager::V2(_) => self.scale_actors_v2(all_nodes).await,
+            MetadataManager::V1(_) => self.scale_actors_v1(active_nodes).await,
+            MetadataManager::V2(_) => self.scale_actors_v2(active_nodes).await,
         }
     }
 
-    async fn scale_actors_v2(&self, workers: Vec<WorkerNode>) -> MetaResult<()> {
+    async fn scale_actors_v2(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
         let mgr = self.metadata_manager.as_v2_ref();
         debug!("start resetting actors distribution");
 
@@ -685,8 +705,9 @@ impl GlobalBarrierManagerContext {
                 .collect()
         };
 
-        let schedulable_worker_ids = workers
-            .iter()
+        let schedulable_worker_ids = active_nodes
+            .current()
+            .values()
             .filter(|worker| {
                 !worker
                     .property
@@ -793,8 +814,8 @@ impl GlobalBarrierManagerContext {
         }
     }
 
-    async fn scale_actors_v1(&self, workers: Vec<WorkerNode>) -> MetaResult<()> {
-        let info = self.resolve_actor_info(workers.clone()).await?;
+    async fn scale_actors_v1(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
+        let info = self.resolve_actor_info(active_nodes).await?;
 
         let mgr = self.metadata_manager.as_v1_ref();
         debug!("start resetting actors distribution");
@@ -839,8 +860,9 @@ impl GlobalBarrierManagerContext {
                 .collect()
         };
 
-        let schedulable_worker_ids: BTreeSet<_> = workers
-            .iter()
+        let schedulable_worker_ids: BTreeSet<_> = active_nodes
+            .current()
+            .values()
             .filter(|worker| {
                 !worker
                     .property
@@ -915,7 +937,7 @@ impl GlobalBarrierManagerContext {
     async fn generate_migration_plan(
         &self,
         expired_workers: HashSet<WorkerId>,
-        all_nodes: &Vec<WorkerNode>,
+        active_nodes: &mut ActiveStreamingWorkerNodes,
     ) -> MetaResult<MigrationPlan> {
         let mgr = self.metadata_manager.as_v1_ref();
 
@@ -965,8 +987,9 @@ impl GlobalBarrierManagerContext {
         let start = Instant::now();
         // if in-used expire parallel units are not empty, should wait for newly joined worker.
         'discovery: while !to_migrate_parallel_units.is_empty() {
-            let mut new_parallel_units = all_nodes
-                .iter()
+            let mut new_parallel_units = active_nodes
+                .current()
+                .values()
                 .flat_map(|worker| worker.parallel_units.iter().cloned())
                 .collect_vec();
             new_parallel_units.retain(|pu| !inuse_parallel_units.contains(&pu.id));
@@ -988,12 +1011,27 @@ impl GlobalBarrierManagerContext {
                 }
             }
-            warn!(
-                "waiting for new workers to join, elapsed: {}s",
-                start.elapsed().as_secs()
-            );
+
+            if to_migrate_parallel_units.is_empty() {
+                break;
+            }
+
             // wait to get newly joined CN
-            tokio::time::sleep(Duration::from_millis(100)).await;
+            let changed = active_nodes
+                .wait_changed(Duration::from_millis(5000), |active_nodes| {
+                    let current_nodes = active_nodes
+                        .current()
+                        .values()
+                        .map(|node| (node.id, &node.host, &node.parallel_units))
+                        .collect_vec();
+                    warn!(
+                        current_nodes = ?current_nodes,
+                        "waiting for new workers to join, elapsed: {}s",
+                        start.elapsed().as_secs()
+                    );
+                })
+                .await;
+            warn!(?changed, "get worker changed. Retry migrate");
         }
 
         // update migration plan, if there is a chain in the plan, update it.
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 4e92a931967a3..699841c258a06 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -13,7 +13,10 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap, HashSet}; +use std::pin::pin; +use std::time::Duration; +use futures::future::{select, Either}; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_meta_model_v2::SourceId; use risingwave_pb::catalog::{PbSource, PbTable}; @@ -23,6 +26,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, PbFragment}; use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamActor, StreamActor}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; +use tokio::time::sleep; use tracing::warn; use crate::barrier::Reschedule; @@ -90,6 +94,21 @@ impl ActiveStreamingWorkerNodes { &self.worker_nodes } + pub(crate) async fn wait_changed( + &mut self, + verbose_internal: Duration, + verbose_fn: impl Fn(&Self), + ) -> ActiveStreamingWorkerChange { + loop { + if let Either::Left((change, _)) = + select(pin!(self.changed()), pin!(sleep(verbose_internal))).await + { + break change; + } + verbose_fn(self) + } + } + pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange { let ret = loop { let notification = self
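
The patch replaces the fixed 100 ms polling sleep in both migration paths with the new ActiveStreamingWorkerNodes::wait_changed helper: recovery now blocks until the worker set actually changes and only wakes every 5 s to log the workers it is still waiting for. Below is a minimal, self-contained sketch of that select-plus-periodic-logging pattern. It is not part of the patch: the Nodes type and its channel are hypothetical stand-ins for the meta manager's notification stream, only the control flow mirrors wait_changed, and it assumes the tokio and futures crates are available.

use std::pin::pin;
use std::time::Duration;

use futures::future::{select, Either};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::time::sleep;

// Hypothetical stand-in for ActiveStreamingWorkerNodes: worker-set changes are
// delivered over a plain channel instead of the meta manager's notifications.
struct Nodes {
    rx: UnboundedReceiver<u32>,
}

impl Nodes {
    // Stand-in for ActiveStreamingWorkerNodes::changed(): resolves on the next change.
    async fn changed(&mut self) -> u32 {
        self.rx.recv().await.expect("notification sender dropped")
    }

    // Same control flow as the patched wait_changed(): race the next change against a
    // timer and run `verbose_fn` (e.g. a warn! listing current workers) on every tick.
    // The parameter name `verbose_internal` is kept as it appears in the patch.
    async fn wait_changed(
        &mut self,
        verbose_internal: Duration,
        verbose_fn: impl Fn(&Self),
    ) -> u32 {
        loop {
            if let Either::Left((change, _)) =
                select(pin!(self.changed()), pin!(sleep(verbose_internal))).await
            {
                break change;
            }
            verbose_fn(self);
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = unbounded_channel();
    let mut nodes = Nodes { rx };

    // Simulate a worker joining after 12 s while the caller logs every 5 s, mirroring
    // the 5000 ms interval used by the migrate loops in recovery.rs.
    tokio::spawn(async move {
        sleep(Duration::from_secs(12)).await;
        let _ = tx.send(42);
    });

    let changed = nodes
        .wait_changed(Duration::from_millis(5000), |_nodes| {
            println!("waiting for new workers to join");
        })
        .await;
    println!("worker set changed: {changed}");
}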