diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs index c3dc1c0136920..e5e3e452a8a84 100644 --- a/src/connector/src/with_options.rs +++ b/src/connector/src/with_options.rs @@ -14,6 +14,7 @@ use std::collections::{BTreeMap, HashMap}; use std::marker::PhantomData; +use std::time::Duration; use risingwave_pb::secret::PbSecretRef; @@ -115,6 +116,14 @@ pub trait WithPropertiesExt: Get + Sized { connector == MYSQL_CDC_CONNECTOR } + #[inline(always)] + fn get_sync_call_timeout(&self) -> Option { + const SYNC_CALL_TIMEOUT_KEY: &str = "properties.sync.call.timeout"; // only from kafka props, add more if needed + self.get(SYNC_CALL_TIMEOUT_KEY) + // ignore the error is ok here, because we will parse the field again when building the properties and has more precise error message + .and_then(|s| duration_str::parse_std(s).ok()) + } + #[inline(always)] fn is_cdc_connector(&self) -> bool { let Some(connector) = self.get_connector() else { diff --git a/src/meta/src/stream/source_manager/worker.rs b/src/meta/src/stream/source_manager/worker.rs index 18549e869e9bd..8bf985f0e40f8 100644 --- a/src/meta/src/stream/source_manager/worker.rs +++ b/src/meta/src/stream/source_manager/worker.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_connector::WithPropertiesExt; #[cfg(not(debug_assertions))] use risingwave_connector::error::ConnectorError; use risingwave_connector::source::AnySplitEnumerator; @@ -97,6 +98,10 @@ pub async fn create_source_worker( let enable_scale_in = connector_properties.enable_drop_split(); let enable_adaptive_splits = connector_properties.enable_adaptive_splits(); let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel(); + let sync_call_timeout = source + .with_properties + .get_sync_call_timeout() + .unwrap_or(DEFAULT_SOURCE_TICK_TIMEOUT); let handle = { let mut worker = ConnectorSourceWorker::create( source, @@ -108,12 +113,8 @@ pub async fn create_source_worker( .await?; // if fail to fetch meta info, will refuse to create source - - // todo: make the timeout configurable, longer than `properties.sync.call.timeout` - // in kafka - tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, worker.tick()) + tokio::time::timeout(sync_call_timeout, worker.tick()) .await - .ok() .with_context(|| { format!( "failed to fetch meta info for source {}, timeout {:?}",