From 7b52939be6b29606a16ab1c72a1a34b06807d105 Mon Sep 17 00:00:00 2001 From: tab Date: Tue, 11 Mar 2025 15:10:31 +0800 Subject: [PATCH 1/3] feat(connector): Add configurable sync call timeout for sources --- src/connector/src/with_options.rs | 10 ++++++++++ src/meta/src/stream/source_manager/worker.rs | 18 +++++++++++------- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs index c3dc1c0136920..b466f90afb00e 100644 --- a/src/connector/src/with_options.rs +++ b/src/connector/src/with_options.rs @@ -18,6 +18,7 @@ use std::marker::PhantomData; use risingwave_pb::secret::PbSecretRef; use crate::sink::catalog::SinkFormatDesc; +use std::time::Duration; use crate::source::cdc::MYSQL_CDC_CONNECTOR; use crate::source::cdc::external::CdcTableType; use crate::source::iceberg::ICEBERG_CONNECTOR; @@ -115,6 +116,15 @@ pub trait WithPropertiesExt: Get + Sized { connector == MYSQL_CDC_CONNECTOR } + #[inline(always)] + fn get_sync_call_timeout(&self) -> Option { + const SYNC_CALL_TIMEOUT_KEY: &str = "properties.sync.call.timeout"; // only from kafka props, add more if needed + self.get(SYNC_CALL_TIMEOUT_KEY) + // ignore the error is ok here, because we will parse the field again when building the properties and has more precise error message + .map(|s| duration_str::parse_std(s).ok()) + .flatten() + } + #[inline(always)] fn is_cdc_connector(&self) -> bool { let Some(connector) = self.get_connector() else { diff --git a/src/meta/src/stream/source_manager/worker.rs b/src/meta/src/stream/source_manager/worker.rs index 18549e869e9bd..b1625b7031aa8 100644 --- a/src/meta/src/stream/source_manager/worker.rs +++ b/src/meta/src/stream/source_manager/worker.rs @@ -14,7 +14,7 @@ #[cfg(not(debug_assertions))] use risingwave_connector::error::ConnectorError; -use risingwave_connector::source::AnySplitEnumerator; +use risingwave_connector::{source::AnySplitEnumerator, WithPropertiesExt}; use super::*; @@ -97,6 +97,11 @@ pub async fn create_source_worker( let enable_scale_in = connector_properties.enable_drop_split(); let enable_adaptive_splits = connector_properties.enable_adaptive_splits(); let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel(); + let sync_call_timeout = if let Some(timeout) = source.with_properties.get_sync_call_timeout() { + timeout + } else { + DEFAULT_SOURCE_TICK_TIMEOUT + }; let handle = { let mut worker = ConnectorSourceWorker::create( source, @@ -108,12 +113,11 @@ pub async fn create_source_worker( .await?; // if fail to fetch meta info, will refuse to create source - - // todo: make the timeout configurable, longer than `properties.sync.call.timeout` - // in kafka - tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, worker.tick()) - .await - .ok() + tokio::time::timeout( + sync_call_timeout, + worker.tick(), + ) + .await .with_context(|| { format!( "failed to fetch meta info for source {}, timeout {:?}", From 4868e02a203213c94af723ded02c75822dfd4925 Mon Sep 17 00:00:00 2001 From: tab Date: Tue, 11 Mar 2025 15:13:53 +0800 Subject: [PATCH 2/3] fix format --- src/connector/src/with_options.rs | 7 +++---- src/meta/src/stream/source_manager/worker.rs | 10 ++++------ 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs index b466f90afb00e..e5e3e452a8a84 100644 --- a/src/connector/src/with_options.rs +++ b/src/connector/src/with_options.rs @@ -14,11 +14,11 @@ use std::collections::{BTreeMap, HashMap}; use std::marker::PhantomData; +use std::time::Duration; use risingwave_pb::secret::PbSecretRef; use crate::sink::catalog::SinkFormatDesc; -use std::time::Duration; use crate::source::cdc::MYSQL_CDC_CONNECTOR; use crate::source::cdc::external::CdcTableType; use crate::source::iceberg::ICEBERG_CONNECTOR; @@ -120,9 +120,8 @@ pub trait WithPropertiesExt: Get + Sized { fn get_sync_call_timeout(&self) -> Option { const SYNC_CALL_TIMEOUT_KEY: &str = "properties.sync.call.timeout"; // only from kafka props, add more if needed self.get(SYNC_CALL_TIMEOUT_KEY) - // ignore the error is ok here, because we will parse the field again when building the properties and has more precise error message - .map(|s| duration_str::parse_std(s).ok()) - .flatten() + // ignore the error is ok here, because we will parse the field again when building the properties and has more precise error message + .and_then(|s| duration_str::parse_std(s).ok()) } #[inline(always)] diff --git a/src/meta/src/stream/source_manager/worker.rs b/src/meta/src/stream/source_manager/worker.rs index b1625b7031aa8..14e8e27742224 100644 --- a/src/meta/src/stream/source_manager/worker.rs +++ b/src/meta/src/stream/source_manager/worker.rs @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_connector::WithPropertiesExt; #[cfg(not(debug_assertions))] use risingwave_connector::error::ConnectorError; -use risingwave_connector::{source::AnySplitEnumerator, WithPropertiesExt}; +use risingwave_connector::source::AnySplitEnumerator; use super::*; @@ -113,11 +114,8 @@ pub async fn create_source_worker( .await?; // if fail to fetch meta info, will refuse to create source - tokio::time::timeout( - sync_call_timeout, - worker.tick(), - ) - .await + tokio::time::timeout(sync_call_timeout, worker.tick()) + .await .with_context(|| { format!( "failed to fetch meta info for source {}, timeout {:?}", From 1ebdf2fd698112599768da144c2248ca74a4fa77 Mon Sep 17 00:00:00 2001 From: tab Date: Tue, 11 Mar 2025 15:52:24 +0800 Subject: [PATCH 3/3] fmt --- src/meta/src/stream/source_manager/worker.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/meta/src/stream/source_manager/worker.rs b/src/meta/src/stream/source_manager/worker.rs index 14e8e27742224..8bf985f0e40f8 100644 --- a/src/meta/src/stream/source_manager/worker.rs +++ b/src/meta/src/stream/source_manager/worker.rs @@ -98,11 +98,10 @@ pub async fn create_source_worker( let enable_scale_in = connector_properties.enable_drop_split(); let enable_adaptive_splits = connector_properties.enable_adaptive_splits(); let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel(); - let sync_call_timeout = if let Some(timeout) = source.with_properties.get_sync_call_timeout() { - timeout - } else { - DEFAULT_SOURCE_TICK_TIMEOUT - }; + let sync_call_timeout = source + .with_properties + .get_sync_call_timeout() + .unwrap_or(DEFAULT_SOURCE_TICK_TIMEOUT); let handle = { let mut worker = ConnectorSourceWorker::create( source,