diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index 1f104cbd2946b..2341c9f82a144 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -289,6 +289,7 @@ impl MockDatagenSource { properties.clone(), SourceEnumeratorContext::dummy().into(), ) + .await .unwrap(); let parser_config = ParserConfig { specific: SpecificParserConfig { diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index fe113bda87a64..d39ec665bf2da 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -320,10 +320,11 @@ macro_rules! impl_connector_properties { $crate::paste! { impl ConnectorProperties { pub async fn create_split_enumerator(self, context: $crate::source::base::SourceEnumeratorContextRef) -> $crate::error::ConnectorResult> { + use crate::source::prelude::*; let enumerator: Box = match self { $( ConnectorProperties::$variant_name(prop) => - Box::new(prop.create_split_enumerator(context).await?), + Box::new( [<$variant_name SplitEnumerator>]::new(*prop, context).await?), )* }; Ok(enumerator) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 73d497c9d78e0..087e68ee821b1 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -47,7 +47,7 @@ use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam}; use crate::source::kafka::{ KafkaContextCommon, KafkaProperties, KafkaSplitEnumerator, RwProducerContext, }; -use crate::source::SourceEnumeratorContext; +use crate::source::{SourceEnumeratorContext, SplitEnumerator}; use crate::{ deserialize_duration_from_string, deserialize_u32_from_string, dispatch_sink_formatter_impl, }; diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 77ed49d283767..35af6b9cfc1da 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -81,7 +81,7 @@ pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions + std::fmt::De type Split: SplitMetaData + TryFrom + Into; - type SplitEnumerator: SplitEnumerator;; + type SplitEnumerator: SplitEnumerator; type SplitReader: SplitReader; /// Load additional info from `PbSource`. Currently only used by CDC. @@ -90,10 +90,10 @@ pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions + std::fmt::De /// Load additional info from `ExternalTableDesc`. Currently only used by CDC. fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {} - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> Result; + // async fn create_split_enumerator( + // self, + // context: crate::source::SourceEnumeratorContextRef, + // ) -> Result; } pub trait UnknownFields { @@ -190,8 +190,8 @@ pub async fn create_split_readers( /// [`SplitEnumerator`] fetches the split metadata from the external source service. /// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate. #[async_trait] -pub trait SplitEnumerator: Sized { - type Split: SplitMetaData + Into + Send; +pub trait SplitEnumerator: Sized + Send { + type Split: SplitMetaData + Send; type Properties; async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef) @@ -209,10 +209,9 @@ pub trait AnySplitEnumerator: Send { } #[async_trait] -impl AnySplitEnumerator for T { +impl>> AnySplitEnumerator for T { async fn list_splits(&mut self) -> Result> { - (self as &mut dyn SplitEnumerator) - .list_splits() + SplitEnumerator::list_splits(self) .await .map(|s| s.into_iter().map(|s| s.into()).collect()) } diff --git a/src/connector/src/source/cdc/enumerator/mod.rs b/src/connector/src/source/cdc/enumerator/mod.rs index 626bc53e8532a..76648fa2d7cec 100644 --- a/src/connector/src/source/cdc/enumerator/mod.rs +++ b/src/connector/src/source/cdc/enumerator/mod.rs @@ -30,7 +30,7 @@ use crate::source::cdc::{ table_schema_exclude_additional_columns, CdcProperties, CdcSourceTypeTrait, Citus, DebeziumCdcSplit, Mongodb, Mysql, Postgres, SqlServer, }; -use crate::source::SplitEnumerator; +use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; pub const DATABASE_SERVERS_KEY: &str = "database.servers"; @@ -42,10 +42,17 @@ pub struct DebeziumSplitEnumerator { _phantom: PhantomData, } -impl DebeziumSplitEnumerator { - pub async fn new( +#[async_trait] +impl SplitEnumerator for DebeziumSplitEnumerator +where + Self: ListCdcSplits, +{ + type Properties = CdcProperties; + type Split = DebeziumCdcSplit; + + async fn new( props: CdcProperties, - context: crate::source::SourceEnumeratorContextRef, + context: SourceEnumeratorContextRef, ) -> ConnectorResult { let server_addrs = props .properties @@ -112,47 +119,47 @@ impl DebeziumSplitEnumerator { _phantom: PhantomData, }) } -} -#[async_trait] -impl SplitEnumerator for DebeziumSplitEnumerator -where - Self: ListCdcSplits, -{ - async fn list_splits( - &mut self, - ) -> crate::error::ConnectorResult> { + async fn list_splits(&mut self) -> ConnectorResult>> { Ok(self.list_cdc_splits()) } } pub trait ListCdcSplits { type CdcSourceType: CdcSourceTypeTrait; - fn list_cdc_splits(&mut self) -> Vec; + fn list_cdc_splits(&mut self) -> Vec>; } impl ListCdcSplits for DebeziumSplitEnumerator { type CdcSourceType = Mysql; - fn list_cdc_splits(&mut self) -> Vec { + fn list_cdc_splits(&mut self) -> Vec> { // CDC source only supports single split - vec![DebeziumCdcSplit::::new(self.source_id, None, None).into()] + vec![DebeziumCdcSplit::::new( + self.source_id, + None, + None, + )] } } impl ListCdcSplits for DebeziumSplitEnumerator { type CdcSourceType = Postgres; - fn list_cdc_splits(&mut self) -> Vec { + fn list_cdc_splits(&mut self) -> Vec> { // CDC source only supports single split - vec![DebeziumCdcSplit::::new(self.source_id, None, None).into()] + vec![DebeziumCdcSplit::::new( + self.source_id, + None, + None, + )] } } impl ListCdcSplits for DebeziumSplitEnumerator { type CdcSourceType = Citus; - fn list_cdc_splits(&mut self) -> Vec { + fn list_cdc_splits(&mut self) -> Vec> { self.worker_node_addrs .iter() .enumerate() @@ -162,7 +169,6 @@ impl ListCdcSplits for DebeziumSplitEnumerator { None, Some(addr.to_string()), ) - .into() }) .collect_vec() } @@ -170,16 +176,24 @@ impl ListCdcSplits for DebeziumSplitEnumerator { impl ListCdcSplits for DebeziumSplitEnumerator { type CdcSourceType = Mongodb; - fn list_cdc_splits(&mut self) -> Vec { + fn list_cdc_splits(&mut self) -> Vec> { // CDC source only supports single split - vec![DebeziumCdcSplit::::new(self.source_id, None, None).into()] + vec![DebeziumCdcSplit::::new( + self.source_id, + None, + None, + )] } } impl ListCdcSplits for DebeziumSplitEnumerator { type CdcSourceType = SqlServer; - fn list_cdc_splits(&mut self) -> Vec { - vec![DebeziumCdcSplit::::new(self.source_id, None, None).into()] + fn list_cdc_splits(&mut self) -> Vec> { + vec![DebeziumCdcSplit::::new( + self.source_id, + None, + None, + )] } } diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index c9c0f69d9c4ef..cd9f50e17ebab 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -208,13 +208,6 @@ where self.is_cdc_source_job = false; self.is_backfill_table = true; } - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - DebeziumSplitEnumerator::new(self, context).await - } } impl crate::source::UnknownFields for CdcProperties { diff --git a/src/connector/src/source/datagen/enumerator/mod.rs b/src/connector/src/source/datagen/enumerator/mod.rs index 197efc1d9e3b1..8a595bbfa2198 100644 --- a/src/connector/src/source/datagen/enumerator/mod.rs +++ b/src/connector/src/source/datagen/enumerator/mod.rs @@ -16,17 +16,21 @@ use anyhow::Context; use async_trait::async_trait; use crate::source::datagen::{DatagenProperties, DatagenSplit}; -use crate::source::SplitEnumerator; +use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub struct DatagenSplitEnumerator { split_num: i32, } -impl DatagenSplitEnumerator { - pub fn new( +#[async_trait] +impl SplitEnumerator for DatagenSplitEnumerator { + type Properties = DatagenProperties; + type Split = DatagenSplit; + + async fn new( properties: DatagenProperties, - _context: crate::source::SourceEnumeratorContextRef, + _context: SourceEnumeratorContextRef, ) -> crate::error::ConnectorResult { let split_num = properties.split_num.unwrap_or_else(|| "1".to_owned()); let split_num = split_num @@ -34,23 +38,15 @@ impl DatagenSplitEnumerator { .context("failed to parse datagen split num")?; Ok(Self { split_num }) } -} -#[async_trait] -impl SplitEnumerator for DatagenSplitEnumerator { - async fn list_splits( - &mut self, - ) -> crate::error::ConnectorResult> { + async fn list_splits(&mut self) -> crate::error::ConnectorResult> { let mut splits = vec![]; for i in 0..self.split_num { - splits.push( - DatagenSplit { - split_num: self.split_num, - split_index: i, - start_offset: None, - } - .into(), - ); + splits.push(DatagenSplit { + split_num: self.split_num, + split_index: i, + start_offset: None, + }); } Ok(splits) } diff --git a/src/connector/src/source/datagen/mod.rs b/src/connector/src/source/datagen/mod.rs index e2e30888b7311..a34a2d65dc47d 100644 --- a/src/connector/src/source/datagen/mod.rs +++ b/src/connector/src/source/datagen/mod.rs @@ -63,13 +63,6 @@ impl SourceProperties for DatagenProperties { type SplitReader = DatagenSplitReader; const SOURCE_NAME: &'static str = DATAGEN_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - DatagenSplitEnumerator::new(self, context) - } } impl crate::source::UnknownFields for DatagenProperties { diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index 8172cbc4a37d0..a500b2e50a253 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -14,6 +14,8 @@ use std::collections::HashMap; +pub use opendal_enumerator::OpendalEnumerator; + pub mod azblob_source; pub mod gcs_source; pub mod posix_fs_source; @@ -25,7 +27,6 @@ use with_options::WithOptions; pub mod opendal_enumerator; pub mod opendal_reader; -pub use self::opendal_enumerator::OpendalEnumerator; use self::opendal_reader::OpendalReader; use super::file_common::CompressionFormat; pub use super::s3::S3PropertiesCommon; @@ -86,13 +87,6 @@ impl SourceProperties for GcsProperties { type SplitReader = OpendalReader; const SOURCE_NAME: &'static str = GCS_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - OpendalEnumerator::new(self, context) - } } pub trait OpendalSource: Send + Sync + 'static + Clone + PartialEq { @@ -162,13 +156,6 @@ impl SourceProperties for OpendalS3Properties { type SplitReader = OpendalReader; const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - OpendalEnumerator::new(self, context) - } } #[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)] @@ -202,13 +189,6 @@ impl SourceProperties for PosixFsProperties { type SplitReader = OpendalReader; const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - OpendalEnumerator::new(self, context) - } } #[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)] @@ -248,13 +228,6 @@ impl SourceProperties for AzblobProperties { type SplitReader = OpendalReader; const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - OpendalEnumerator::new(self, context) - } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs index 9866376ab1cfd..a770d600282a2 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs @@ -26,7 +26,7 @@ use super::OpendalSource; use crate::error::ConnectorResult; use crate::source::filesystem::file_common::CompressionFormat; use crate::source::filesystem::{FsPageItem, OpendalFsSplit}; -use crate::source::SplitEnumerator; +use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; #[derive(Debug, Clone)] pub struct OpendalEnumerator { @@ -38,14 +38,34 @@ pub struct OpendalEnumerator { pub(crate) compression_format: CompressionFormat, } -impl OpendalEnumerator { - pub fn new( +#[async_trait] +impl SplitEnumerator for OpendalEnumerator { + type Properties = Src::Properties; + type Split = OpendalFsSplit; + + async fn new( properties: Src::Properties, - _context: crate::source::SourceEnumeratorContextRef, + _context: SourceEnumeratorContextRef, ) -> ConnectorResult { Src::new_enumerator(properties) } + async fn list_splits(&mut self) -> ConnectorResult>> { + let empty_split: OpendalFsSplit = OpendalFsSplit::empty_split(); + let prefix = self.prefix.as_deref().unwrap_or("/"); + + match self.op.list(prefix).await { + Ok(_) => return Ok(vec![empty_split]), + Err(e) => { + return Err(anyhow!(e) + .context("fail to create source, please check your config.") + .into()) + } + } + } +} + +impl OpendalEnumerator { pub async fn list(&self) -> ConnectorResult { let prefix = self.prefix.as_deref().unwrap_or("/"); @@ -93,34 +113,4 @@ impl OpendalEnumerator { self.prefix.as_deref().unwrap_or("/") } } - -// we don't use generic since impl From for SplitImpl is implemented for concrete types -macro_rules! impl_split_enumerator { - ($source_type:ty) => { - #[async_trait] - impl SplitEnumerator for OpendalEnumerator<$source_type> { - async fn list_splits( - &mut self, - ) -> crate::error::ConnectorResult> { - let empty_split: OpendalFsSplit<$source_type> = OpendalFsSplit::empty_split(); - let prefix = self.prefix.as_deref().unwrap_or("/"); - - match self.op.list(prefix).await { - Ok(_) => return Ok(vec![empty_split.into()]), - Err(e) => { - return Err(anyhow!(e) - .context("fail to create source, please check your config.") - .into()) - } - } - } - } - }; -} - -impl_split_enumerator!(super::OpendalS3); -impl_split_enumerator!(super::OpendalGcs); -impl_split_enumerator!(super::OpendalAzblob); -impl_split_enumerator!(super::OpendalPosixFs); - pub type ObjectMetadataIter = BoxStream<'static, ConnectorResult>; diff --git a/src/connector/src/source/filesystem/s3/enumerator.rs b/src/connector/src/source/filesystem/s3/enumerator.rs index c9b4d8295632f..ff15b1c00dc37 100644 --- a/src/connector/src/source/filesystem/s3/enumerator.rs +++ b/src/connector/src/source/filesystem/s3/enumerator.rs @@ -20,7 +20,7 @@ use crate::aws_utils::{default_conn_config, s3_client}; use crate::connector_common::AwsAuthProps; use crate::source::filesystem::file_common::FsSplit; use crate::source::filesystem::s3::S3Properties; -use crate::source::{FsListInner, SplitEnumerator}; +use crate::source::{FsListInner, SourceEnumeratorContextRef, SplitEnumerator}; /// Get the prefix from a glob pub fn get_prefix(glob: &str) -> String { @@ -67,10 +67,14 @@ pub struct S3SplitEnumerator { pub(crate) next_continuation_token: Option, } -impl S3SplitEnumerator { - pub async fn new( - properties: S3Properties, - _context: crate::source::SourceEnumeratorContextRef, +#[async_trait] +impl SplitEnumerator for S3SplitEnumerator { + type Properties = S3Properties; + type Split = FsSplit; + + async fn new( + properties: Self::Properties, + _context: SourceEnumeratorContextRef, ) -> crate::error::ConnectorResult { let config = AwsAuthProps::from(&properties); let sdk_config = config.build_config().await?; @@ -93,17 +97,12 @@ impl S3SplitEnumerator { next_continuation_token: None, }) } -} -#[async_trait] -impl SplitEnumerator for S3SplitEnumerator { - async fn list_splits( - &mut self, - ) -> crate::error::ConnectorResult> { + async fn list_splits(&mut self) -> crate::error::ConnectorResult> { let mut objects = Vec::new(); loop { let (files, has_finished) = self.get_next_page::().await?; - objects.extend(files.into_iter().map(|f| f.into())); + objects.extend(files); if has_finished { break; } @@ -111,3 +110,45 @@ impl SplitEnumerator for S3SplitEnumerator { Ok(objects) } } + +#[cfg(test)] +mod tests { + use itertools::Itertools; + + #[test] + fn test_get_prefix() { + assert_eq!(&get_prefix("a/"), "a/"); + assert_eq!(&get_prefix("a/**"), "a/"); + assert_eq!(&get_prefix("[ab]*"), ""); + assert_eq!(&get_prefix("a/{a,b}*"), "a/"); + assert_eq!(&get_prefix(r"a/\{a,b}"), "a/{a,b}"); + assert_eq!(&get_prefix(r"a/\[ab]"), "a/[ab]"); + } + + use super::*; + use crate::source::filesystem::file_common::CompressionFormat; + use crate::source::filesystem::s3::S3PropertiesCommon; + use crate::source::SourceEnumeratorContext; + #[tokio::test] + #[ignore] + async fn test_s3_split_enumerator() { + let props = S3PropertiesCommon { + region_name: "ap-southeast-1".to_owned(), + bucket_name: "mingchao-s3-source".to_owned(), + match_pattern: Some("happy[0-9].csv".to_owned()), + access: None, + secret: None, + endpoint_url: None, + compression_format: CompressionFormat::None, + }; + let mut enumerator = + S3SplitEnumerator::new(props.into(), SourceEnumeratorContext::dummy().into()) + .await + .unwrap(); + let splits = enumerator.list_splits().await.unwrap(); + let names = splits.into_iter().map(|split| split.name).collect_vec(); + assert_eq!(names.len(), 2); + assert!(names.contains(&"happy1.csv".to_owned())); + assert!(names.contains(&"happy2.csv".to_owned())); + } +} diff --git a/src/connector/src/source/filesystem/s3/mod.rs b/src/connector/src/source/filesystem/s3/mod.rs index c05ba722efd8e..76b7e9f1f0ca1 100644 --- a/src/connector/src/source/filesystem/s3/mod.rs +++ b/src/connector/src/source/filesystem/s3/mod.rs @@ -70,13 +70,6 @@ impl SourceProperties for S3Properties { type SplitReader = S3FileReader; const SOURCE_NAME: &'static str = S3_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - S3SplitEnumerator::new(self, context).await - } } impl UnknownFields for S3Properties { diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index 8bef3744305ca..5f2631b913ab4 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -235,3 +235,71 @@ impl S3FileReader { } } } + +#[cfg(test)] +mod tests { + use futures_async_stream::for_await; + use risingwave_common::types::DataType; + + use super::*; + use crate::parser::{ + CommonParserConfig, CsvProperties, EncodingProperties, ProtocolProperties, + SpecificParserConfig, + }; + use crate::source::filesystem::file_common::CompressionFormat; + use crate::source::filesystem::s3::S3PropertiesCommon; + use crate::source::filesystem::S3SplitEnumerator; + use crate::source::{ + SourceColumnDesc, SourceContext, SourceEnumeratorContext, SplitEnumerator, + }; + + #[tokio::test] + #[ignore] + async fn test_s3_split_reader() { + let props: S3Properties = S3PropertiesCommon { + region_name: "ap-southeast-1".to_owned(), + bucket_name: "mingchao-s3-source".to_owned(), + match_pattern: None, + access: None, + secret: None, + endpoint_url: None, + compression_format: CompressionFormat::None, + } + .into(); + let mut enumerator = + S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::dummy().into()) + .await + .unwrap(); + let splits = enumerator.list_splits().await.unwrap(); + println!("splits {:?}", splits); + + let descs = vec![ + SourceColumnDesc::simple("id", DataType::Int64, 1.into()), + SourceColumnDesc::simple("name", DataType::Varchar, 2.into()), + SourceColumnDesc::simple("age", DataType::Int32, 3.into()), + ]; + + let csv_config = CsvProperties { + delimiter: b',', + has_header: true, + }; + + let config = ParserConfig { + common: CommonParserConfig { rw_columns: descs }, + specific: SpecificParserConfig { + encoding_config: EncodingProperties::Csv(csv_config), + protocol_config: ProtocolProperties::Plain, + }, + }; + + let reader = S3FileReader::new(props, splits, config, SourceContext::dummy().into(), None) + .await + .unwrap(); + + let msg_stream = reader.into_stream_inner(); + #[for_await] + for msg in msg_stream { + println!("msg {:?}", msg); + } + } +} diff --git a/src/connector/src/source/google_pubsub/enumerator/client.rs b/src/connector/src/source/google_pubsub/enumerator/client.rs index 6d0d58ed89143..93532a9e3954f 100644 --- a/src/connector/src/source/google_pubsub/enumerator/client.rs +++ b/src/connector/src/source/google_pubsub/enumerator/client.rs @@ -22,16 +22,21 @@ use crate::error::ConnectorResult; use crate::source::base::SplitEnumerator; use crate::source::google_pubsub::split::PubsubSplit; use crate::source::google_pubsub::PubsubProperties; +use crate::source::SourceEnumeratorContextRef; pub struct PubsubSplitEnumerator { subscription: String, split_count: u32, } -impl PubsubSplitEnumerator { - pub async fn new( - properties: PubsubProperties, - _context: crate::source::SourceEnumeratorContextRef, +#[async_trait] +impl SplitEnumerator for PubsubSplitEnumerator { + type Properties = PubsubProperties; + type Split = PubsubSplit; + + async fn new( + properties: Self::Properties, + _context: SourceEnumeratorContextRef, ) -> ConnectorResult { let split_count = properties.parallelism.unwrap_or(1); if split_count < 1 { @@ -77,23 +82,15 @@ impl PubsubSplitEnumerator { split_count, }) } -} -#[async_trait] -impl SplitEnumerator for PubsubSplitEnumerator { - async fn list_splits( - &mut self, - ) -> crate::error::ConnectorResult> { + async fn list_splits(&mut self) -> ConnectorResult> { tracing::debug!("enumerating pubsub splits"); - let splits: Vec<_> = (0..self.split_count) - .map(|i| { - PubsubSplit { - index: i, - subscription: self.subscription.to_owned(), - __deprecated_start_offset: None, - __deprecated_stop_offset: None, - } - .into() + let splits: Vec = (0..self.split_count) + .map(|i| PubsubSplit { + index: i, + subscription: self.subscription.to_owned(), + __deprecated_start_offset: None, + __deprecated_stop_offset: None, }) .collect(); diff --git a/src/connector/src/source/google_pubsub/mod.rs b/src/connector/src/source/google_pubsub/mod.rs index 7e51bbe6e987d..d6517ecb75fb9 100644 --- a/src/connector/src/source/google_pubsub/mod.rs +++ b/src/connector/src/source/google_pubsub/mod.rs @@ -97,13 +97,6 @@ impl SourceProperties for PubsubProperties { type SplitReader = PubsubSplitReader; const SOURCE_NAME: &'static str = GOOGLE_PUBSUB_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - PubsubSplitEnumerator::new(self, context).await - } } impl crate::source::UnknownFields for PubsubProperties { diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index eb616cd2a37d4..42fc791b3a677 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -41,8 +41,8 @@ use crate::connector_common::IcebergCommon; use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::ParserConfig; use crate::source::{ - BoxSourceChunkStream, Column, SourceContextRef, SourceProperties, SplitEnumerator, SplitId, - SplitMetaData, SplitReader, UnknownFields, + BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties, + SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields, }; pub const ICEBERG_CONNECTOR: &str = "iceberg"; @@ -110,13 +110,6 @@ impl SourceProperties for IcebergProperties { type SplitReader = IcebergFileReader; const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - IcebergSplitEnumerator::new(self, context) - } } impl UnknownFields for IcebergProperties { @@ -255,31 +248,28 @@ pub struct IcebergSplitEnumerator { config: IcebergProperties, } -impl IcebergSplitEnumerator { - pub fn new( - properties: IcebergProperties, - context: crate::source::SourceEnumeratorContextRef, +#[async_trait] +impl SplitEnumerator for IcebergSplitEnumerator { + type Properties = IcebergProperties; + type Split = IcebergSplit; + + async fn new( + properties: Self::Properties, + context: SourceEnumeratorContextRef, ) -> ConnectorResult { Ok(Self::new_inner(properties, context)) } - pub fn new_inner( - properties: IcebergProperties, - _context: crate::source::SourceEnumeratorContextRef, - ) -> Self { - Self { config: properties } - } -} - -#[async_trait] -impl SplitEnumerator for IcebergSplitEnumerator { - async fn list_splits( - &mut self, - ) -> crate::error::ConnectorResult> { + async fn list_splits(&mut self) -> ConnectorResult> { // Iceberg source does not support streaming queries Ok(vec![]) } } +impl IcebergSplitEnumerator { + pub fn new_inner(properties: IcebergProperties, _context: SourceEnumeratorContextRef) -> Self { + Self { config: properties } + } +} pub enum IcebergTimeTravelInfo { Version(i64), diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index d1a377e6124b1..416a704dc8bcc 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -33,6 +33,7 @@ use crate::source::kafka::{ KafkaConnectionProps, KafkaContextCommon, KafkaProperties, RwConsumerContext, KAFKA_ISOLATION_LEVEL, }; +use crate::source::SourceEnumeratorContextRef; type KafkaClientType = BaseConsumer; @@ -48,7 +49,7 @@ pub enum KafkaEnumeratorOffset { } pub struct KafkaSplitEnumerator { - context: crate::source::SourceEnumeratorContextRef, + context: SourceEnumeratorContextRef, broker_address: String, topic: String, client: Arc, @@ -61,10 +62,16 @@ pub struct KafkaSplitEnumerator { high_watermark_metrics: HashMap>, } -impl KafkaSplitEnumerator { - pub async fn new( +impl KafkaSplitEnumerator {} + +#[async_trait] +impl SplitEnumerator for KafkaSplitEnumerator { + type Properties = KafkaProperties; + type Split = KafkaSplit; + + async fn new( properties: KafkaProperties, - context: crate::source::SourceEnumeratorContextRef, + context: SourceEnumeratorContextRef, ) -> ConnectorResult { let mut config = rdkafka::ClientConfig::new(); let common_props = &properties.common; @@ -159,13 +166,8 @@ impl KafkaSplitEnumerator { high_watermark_metrics: HashMap::new(), }) } -} -#[async_trait] -impl SplitEnumerator for KafkaSplitEnumerator { - async fn list_splits( - &mut self, - ) -> crate::error::ConnectorResult> { + async fn list_splits(&mut self) -> ConnectorResult> { let topic_partitions = self.fetch_topic_partition().await.with_context(|| { format!( "failed to fetch metadata from kafka ({})", @@ -184,14 +186,11 @@ impl SplitEnumerator for KafkaSplitEnumerator { let ret: Vec<_> = topic_partitions .into_iter() - .map(|partition| { - KafkaSplit { - topic: self.topic.clone(), - partition, - start_offset: start_offsets.remove(&partition).unwrap(), - stop_offset: stop_offsets.remove(&partition).unwrap(), - } - .into() + .map(|partition| KafkaSplit { + topic: self.topic.clone(), + partition, + start_offset: start_offsets.remove(&partition).unwrap(), + stop_offset: stop_offsets.remove(&partition).unwrap(), }) .collect(); diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index 40d6ae3fd1f5a..425011a3e677c 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -168,13 +168,6 @@ impl SourceProperties for KafkaProperties { type SplitReader = KafkaSplitReader; const SOURCE_NAME: &'static str = KAFKA_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - KafkaSplitEnumerator::new(self, context).await - } } impl crate::source::UnknownFields for KafkaProperties { diff --git a/src/connector/src/source/kinesis/enumerator/client.rs b/src/connector/src/source/kinesis/enumerator/client.rs index 02924d32d716b..a8f1e08ec86d9 100644 --- a/src/connector/src/source/kinesis/enumerator/client.rs +++ b/src/connector/src/source/kinesis/enumerator/client.rs @@ -21,7 +21,7 @@ use risingwave_common::bail; use crate::error::ConnectorResult as Result; use crate::source::kinesis::split::KinesisOffset; use crate::source::kinesis::*; -use crate::source::SplitEnumerator; +use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; pub struct KinesisSplitEnumerator { stream_name: String, @@ -30,10 +30,14 @@ pub struct KinesisSplitEnumerator { impl KinesisSplitEnumerator {} -impl KinesisSplitEnumerator { - pub async fn new( +#[async_trait] +impl SplitEnumerator for KinesisSplitEnumerator { + type Properties = KinesisProperties; + type Split = KinesisSplit; + + async fn new( properties: KinesisProperties, - _context: crate::source::SourceEnumeratorContextRef, + _context: SourceEnumeratorContextRef, ) -> Result { let client = properties.common.build_client().await?; let stream_name = properties.common.stream_name.clone(); @@ -42,13 +46,8 @@ impl KinesisSplitEnumerator { client, }) } -} -#[async_trait] -impl SplitEnumerator for KinesisSplitEnumerator { - async fn list_splits( - &mut self, - ) -> crate::error::ConnectorResult> { + async fn list_splits(&mut self) -> Result> { let mut next_token: Option = None; let mut shard_collect: Vec = Vec::new(); @@ -85,14 +84,11 @@ impl SplitEnumerator for KinesisSplitEnumerator { } Ok(shard_collect .into_iter() - .map(|x| { - KinesisSplit { - shard_id: x.shard_id().to_owned().into(), - // handle start with position in reader part - next_offset: KinesisOffset::None, - end_offset: KinesisOffset::None, - } - .into() + .map(|x| KinesisSplit { + shard_id: x.shard_id().to_owned().into(), + // handle start with position in reader part + next_offset: KinesisOffset::None, + end_offset: KinesisOffset::None, }) .collect()) } diff --git a/src/connector/src/source/kinesis/mod.rs b/src/connector/src/source/kinesis/mod.rs index 372be99fc2577..89ccccb007379 100644 --- a/src/connector/src/source/kinesis/mod.rs +++ b/src/connector/src/source/kinesis/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod enumerator; +pub use enumerator::client::KinesisSplitEnumerator; pub mod source; pub mod split; @@ -24,7 +25,6 @@ pub use source::KinesisMeta; use with_options::WithOptions; use crate::connector_common::KinesisCommon; -pub use crate::source::kinesis::enumerator::client::KinesisSplitEnumerator; use crate::source::kinesis::source::reader::KinesisSplitReader; use crate::source::kinesis::split::KinesisSplit; use crate::source::SourceProperties; @@ -55,13 +55,6 @@ impl SourceProperties for KinesisProperties { type SplitReader = KinesisSplitReader; const SOURCE_NAME: &'static str = KINESIS_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - KinesisSplitEnumerator::new(self, context).await - } } impl crate::source::UnknownFields for KinesisProperties { diff --git a/src/connector/src/source/mqtt/enumerator/mod.rs b/src/connector/src/source/mqtt/enumerator/mod.rs index 5cf2d290cadef..738298205c3e3 100644 --- a/src/connector/src/source/mqtt/enumerator/mod.rs +++ b/src/connector/src/source/mqtt/enumerator/mod.rs @@ -26,7 +26,7 @@ use tokio::sync::RwLock; use super::source::MqttSplit; use super::MqttProperties; use crate::error::ConnectorResult; -use crate::source::SplitEnumerator; +use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; pub struct MqttSplitEnumerator { #[expect(dead_code)] @@ -38,10 +38,14 @@ pub struct MqttSplitEnumerator { stopped: Arc, } -impl MqttSplitEnumerator { - pub async fn new( - properties: MqttProperties, - context: crate::source::SourceEnumeratorContextRef, +#[async_trait] +impl SplitEnumerator for MqttSplitEnumerator { + type Properties = MqttProperties; + type Split = MqttSplit; + + async fn new( + properties: Self::Properties, + context: SourceEnumeratorContextRef, ) -> ConnectorResult { let (client, mut eventloop) = properties.common.build_client(context.info.source_id, 0)?; @@ -112,13 +116,8 @@ impl MqttSplitEnumerator { stopped, }) } -} -#[async_trait] -impl SplitEnumerator for MqttSplitEnumerator { - async fn list_splits( - &mut self, - ) -> crate::error::ConnectorResult> { + async fn list_splits(&mut self) -> ConnectorResult> { if !self.connected.load(std::sync::atomic::Ordering::Relaxed) { let start = std::time::Instant::now(); loop { @@ -135,11 +134,7 @@ impl SplitEnumerator for MqttSplitEnumerator { } let topics = self.topics.read().await; - Ok(topics - .iter() - .cloned() - .map(|t| MqttSplit::new(t).into()) - .collect()) + Ok(topics.iter().cloned().map(MqttSplit::new).collect()) } } diff --git a/src/connector/src/source/mqtt/mod.rs b/src/connector/src/source/mqtt/mod.rs index e3116696d6551..17153bf42e23e 100644 --- a/src/connector/src/source/mqtt/mod.rs +++ b/src/connector/src/source/mqtt/mod.rs @@ -13,8 +13,8 @@ // limitations under the License. pub mod enumerator; -pub use enumerator::MqttSplitEnumerator; pub mod source; +pub use enumerator::MqttSplitEnumerator; pub mod split; use std::collections::HashMap; @@ -64,13 +64,6 @@ impl SourceProperties for MqttProperties { type SplitReader = MqttSplitReader; const SOURCE_NAME: &'static str = MQTT_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - MqttSplitEnumerator::new(self, context).await - } } impl crate::source::UnknownFields for MqttProperties { diff --git a/src/connector/src/source/nats/enumerator/mod.rs b/src/connector/src/source/nats/enumerator/mod.rs index 7ccd0567d767b..16bae5e847a1a 100644 --- a/src/connector/src/source/nats/enumerator/mod.rs +++ b/src/connector/src/source/nats/enumerator/mod.rs @@ -20,7 +20,7 @@ use risingwave_common::bail; use super::source::{NatsOffset, NatsSplit}; use super::NatsProperties; use crate::error::ConnectorResult; -use crate::source::{SplitEnumerator, SplitId}; +use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId}; #[derive(Debug, Clone)] pub struct NatsSplitEnumerator { @@ -30,10 +30,14 @@ pub struct NatsSplitEnumerator { client: async_nats::Client, } -impl NatsSplitEnumerator { - pub async fn new( - properties: NatsProperties, - _context: crate::source::SourceEnumeratorContextRef, +#[async_trait] +impl SplitEnumerator for NatsSplitEnumerator { + type Properties = NatsProperties; + type Split = NatsSplit; + + async fn new( + properties: Self::Properties, + _context: SourceEnumeratorContextRef, ) -> ConnectorResult { let client = properties.common.build_client().await?; Ok(Self { @@ -42,13 +46,8 @@ impl NatsSplitEnumerator { client, }) } -} -#[async_trait] -impl SplitEnumerator for NatsSplitEnumerator { - async fn list_splits( - &mut self, - ) -> crate::error::ConnectorResult> { + async fn list_splits(&mut self) -> ConnectorResult> { // Nats currently does not support list_splits API, if we simple return the default 0 without checking the client status, will result executor crash let state = self.client.connection_state(); if state != async_nats::connection::State::Connected { @@ -64,6 +63,6 @@ impl SplitEnumerator for NatsSplitEnumerator { start_sequence: NatsOffset::None, }; - Ok(vec![nats_split.into()]) + Ok(vec![nats_split]) } } diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index a3f363bd96b64..983878d18719b 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod enumerator; +pub use enumerator::NatsSplitEnumerator; pub mod source; pub mod split; @@ -22,7 +23,6 @@ use std::time::Duration; use async_nats::jetstream::consumer::pull::Config; use async_nats::jetstream::consumer::{AckPolicy, ReplayPolicy}; -pub use enumerator::NatsSplitEnumerator; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use thiserror::Error; @@ -287,13 +287,6 @@ impl SourceProperties for NatsProperties { type SplitReader = NatsSplitReader; const SOURCE_NAME: &'static str = NATS_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - NatsSplitEnumerator::new(self, context).await - } } impl crate::source::UnknownFields for NatsProperties { diff --git a/src/connector/src/source/nexmark/enumerator/mod.rs b/src/connector/src/source/nexmark/enumerator/mod.rs index 17ca6e7bdecf4..e05edd1435fcf 100644 --- a/src/connector/src/source/nexmark/enumerator/mod.rs +++ b/src/connector/src/source/nexmark/enumerator/mod.rs @@ -17,37 +17,35 @@ use async_trait::async_trait; use crate::error::ConnectorResult; use crate::source::nexmark::split::NexmarkSplit; use crate::source::nexmark::NexmarkProperties; -use crate::source::SplitEnumerator; +use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; pub struct NexmarkSplitEnumerator { split_num: i32, } -impl NexmarkSplitEnumerator { - pub fn new( +impl NexmarkSplitEnumerator {} + +#[async_trait] +impl SplitEnumerator for NexmarkSplitEnumerator { + type Properties = NexmarkProperties; + type Split = NexmarkSplit; + + async fn new( properties: NexmarkProperties, - _context: crate::source::SourceEnumeratorContextRef, + _context: SourceEnumeratorContextRef, ) -> ConnectorResult { let split_num = properties.split_num; Ok(Self { split_num }) } -} -#[async_trait] -impl SplitEnumerator for NexmarkSplitEnumerator { - async fn list_splits( - &mut self, - ) -> crate::error::ConnectorResult> { + async fn list_splits(&mut self) -> ConnectorResult> { let mut splits = vec![]; for i in 0..self.split_num { - splits.push( - NexmarkSplit { - split_num: self.split_num, - split_index: i, - start_offset: None, - } - .into(), - ); + splits.push(NexmarkSplit { + split_num: self.split_num, + split_index: i, + start_offset: None, + }); } Ok(splits) } diff --git a/src/connector/src/source/nexmark/mod.rs b/src/connector/src/source/nexmark/mod.rs index a05876054e3f9..be10b8478511d 100644 --- a/src/connector/src/source/nexmark/mod.rs +++ b/src/connector/src/source/nexmark/mod.rs @@ -228,13 +228,6 @@ impl SourceProperties for NexmarkProperties { type SplitReader = NexmarkSplitReader; const SOURCE_NAME: &'static str = NEXMARK_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - NexmarkSplitEnumerator::new(self, context) - } } impl crate::source::UnknownFields for NexmarkProperties { diff --git a/src/connector/src/source/nexmark/source/reader.rs b/src/connector/src/source/nexmark/source/reader.rs index 8846e9cc67d11..aeeb2ece91742 100644 --- a/src/connector/src/source/nexmark/source/reader.rs +++ b/src/connector/src/source/nexmark/source/reader.rs @@ -206,3 +206,85 @@ impl NexmarkSplitReader { tracing::debug!(?self.event_type, "nexmark generator finished"); } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::source::nexmark::NexmarkSplitEnumerator; + use crate::source::{SourceContext, SourceEnumeratorContext, SplitEnumerator}; + + #[tokio::test] + async fn test_nexmark_split_reader() -> crate::error::ConnectorResult<()> { + let props = NexmarkProperties { + split_num: 2, + min_event_gap_in_ns: 0, + table_type: Some(EventType::Bid), + max_chunk_size: 5, + ..Default::default() + }; + + let mut enumerator = + NexmarkSplitEnumerator::new(props.clone(), SourceEnumeratorContext::dummy().into()) + .await?; + let list_splits_resp: Vec<_> = enumerator.list_splits().await?.into_iter().collect(); + + assert_eq!(list_splits_resp.len(), 2); + + for split in list_splits_resp { + let state = vec![split]; + let mut reader = NexmarkSplitReader::new( + props.clone(), + state, + Default::default(), + SourceContext::dummy().into(), + None, + ) + .await? + .into_stream(); + let _chunk = reader.next().await.unwrap()?; + } + + Ok(()) + } + + #[tokio::test] + async fn test_nexmark_event_num() -> crate::error::ConnectorResult<()> { + let max_chunk_size = 32; + let event_num = max_chunk_size * 128 + 1; + let props = NexmarkProperties { + split_num: 1, + min_event_gap_in_ns: 0, + table_type: None, + max_chunk_size, + event_num, + ..Default::default() + }; + + let mut enumerator = + NexmarkSplitEnumerator::new(props.clone(), SourceEnumeratorContext::dummy().into()) + .await?; + let list_splits_resp: Vec<_> = enumerator.list_splits().await?.into_iter().collect(); + + for split in list_splits_resp { + let state = vec![split]; + let reader = NexmarkSplitReader::new( + props.clone(), + state, + Default::default(), + SourceContext::dummy().into(), + None, + ) + .await? + .into_stream(); + let (rows_count, chunk_count) = reader + .fold((0, 0), |acc, x| async move { + (acc.0 + x.unwrap().cardinality(), acc.1 + 1) + }) + .await; + assert_eq!(rows_count as u64, event_num); + assert_eq!(chunk_count as u64, event_num / max_chunk_size + 1); + } + + Ok(()) + } +} diff --git a/src/connector/src/source/pulsar/enumerator/client.rs b/src/connector/src/source/pulsar/enumerator/client.rs index b043fc3df1531..6a4f19bd68941 100644 --- a/src/connector/src/source/pulsar/enumerator/client.rs +++ b/src/connector/src/source/pulsar/enumerator/client.rs @@ -23,7 +23,7 @@ use crate::error::ConnectorResult; use crate::source::pulsar::split::PulsarSplit; use crate::source::pulsar::topic::{parse_topic, Topic}; use crate::source::pulsar::PulsarProperties; -use crate::source::SplitEnumerator; +use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; pub struct PulsarSplitEnumerator { client: Pulsar, @@ -39,10 +39,14 @@ pub enum PulsarEnumeratorOffset { Timestamp(i64), } -impl PulsarSplitEnumerator { - pub async fn new( +#[async_trait] +impl SplitEnumerator for PulsarSplitEnumerator { + type Properties = PulsarProperties; + type Split = PulsarSplit; + + async fn new( properties: PulsarProperties, - _context: crate::source::SourceEnumeratorContextRef, + _context: SourceEnumeratorContextRef, ) -> ConnectorResult { let pulsar = properties .common @@ -66,7 +70,7 @@ impl PulsarSplitEnumerator { } }; - if let Some(s) = &properties.time_offset { + if let Some(s) = properties.time_offset { let time_offset = s.parse::().map_err(|e| anyhow!(e))?; scan_start_offset = PulsarEnumeratorOffset::Timestamp(time_offset) } @@ -77,13 +81,8 @@ impl PulsarSplitEnumerator { start_offset: scan_start_offset, }) } -} -#[async_trait] -impl SplitEnumerator for PulsarSplitEnumerator { - async fn list_splits( - &mut self, - ) -> crate::error::ConnectorResult> { + async fn list_splits(&mut self) -> ConnectorResult> { let offset = self.start_offset.clone(); // MessageId is only used when recovering from a State assert!(!matches!(offset, PulsarEnumeratorOffset::MessageId(_))); @@ -97,12 +96,9 @@ impl SplitEnumerator for PulsarSplitEnumerator { let splits = if topic_partitions > 0 { // partitioned topic (0..topic_partitions as i32) - .map(|p| { - PulsarSplit { - topic: self.topic.sub_topic(p).unwrap(), - start_offset: offset.clone(), - } - .into() + .map(|p| PulsarSplit { + topic: self.topic.sub_topic(p).unwrap(), + start_offset: offset.clone(), }) .collect_vec() } else { @@ -110,8 +106,7 @@ impl SplitEnumerator for PulsarSplitEnumerator { vec![PulsarSplit { topic: self.topic.clone(), start_offset: offset.clone(), - } - .into()] + }] }; Ok(splits) diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs index 66f1449170ef5..a6ecfe7f6a738 100644 --- a/src/connector/src/source/pulsar/mod.rs +++ b/src/connector/src/source/pulsar/mod.rs @@ -37,13 +37,6 @@ impl SourceProperties for PulsarProperties { type SplitReader = PulsarSplitReader; const SOURCE_NAME: &'static str = PULSAR_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - PulsarSplitEnumerator::new(self, context).await - } } impl crate::source::UnknownFields for PulsarProperties { diff --git a/src/connector/src/source/test_source.rs b/src/connector/src/source/test_source.rs index 25cb725816b67..cbfa8e7d24e70 100644 --- a/src/connector/src/source/test_source.rs +++ b/src/connector/src/source/test_source.rs @@ -162,33 +162,31 @@ impl SplitMetaData for TestSourceSplit { pub struct TestSourceSplitEnumerator { properties: TestSourceProperties, - context: crate::source::SourceEnumeratorContextRef, + context: SourceEnumeratorContextRef, } -impl TestSourceSplitEnumerator { - pub fn new( - properties: TestSourceProperties, - context: crate::source::SourceEnumeratorContextRef, +#[async_trait] +impl SplitEnumerator for TestSourceSplitEnumerator { + type Properties = TestSourceProperties; + type Split = TestSourceSplit; + + async fn new( + properties: Self::Properties, + context: SourceEnumeratorContextRef, ) -> ConnectorResult { Ok(Self { properties, context, }) } -} -#[async_trait] -impl SplitEnumerator for TestSourceSplitEnumerator { - async fn list_splits( - &mut self, - ) -> crate::error::ConnectorResult> { - let splits = (get_registry() + async fn list_splits(&mut self) -> ConnectorResult> { + (get_registry() .box_source .lock() .as_mut() .expect("should have init") - .list_split)(self.properties.clone(), self.context.clone())?; - Ok(splits.into_iter().map(|s| s.into()).collect()) + .list_split)(self.properties.clone(), self.context.clone()) } } @@ -243,13 +241,6 @@ impl SourceProperties for TestSourceProperties { type SplitReader = TestSourceSplitReader; const SOURCE_NAME: &'static str = TEST_CONNECTOR; - - async fn create_split_enumerator( - self, - context: crate::source::SourceEnumeratorContextRef, - ) -> crate::error::ConnectorResult { - TestSourceSplitEnumerator::new(self, context) - } } impl crate::source::UnknownFields for TestSourceProperties { diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index d7e8041896941..3949f1f2d9f5d 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -43,7 +43,9 @@ use risingwave_connector::source::filesystem::opendal_source::{ use risingwave_connector::source::iceberg::{IcebergSplitEnumerator, IcebergTimeTravelInfo}; use risingwave_connector::source::kafka::KafkaSplitEnumerator; use risingwave_connector::source::reader::reader::build_opendal_fs_list_for_batch; -use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext, SplitImpl}; +use risingwave_connector::source::{ + ConnectorProperties, SourceEnumeratorContext, SplitEnumerator, SplitImpl, +}; use risingwave_pb::batch_plan::iceberg_scan_node::IcebergScanType; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{ExchangeInfo, ScanRange as ScanRangeProto}; @@ -378,7 +380,8 @@ impl SourceScanInfo { SourceFetchParameters::IcebergSpecificInfo(iceberg_specific_info), ) => { let iceberg_enumerator = - IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into())?; + IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into()) + .await?; let time_travel_info = match fetch_info.as_of { Some(AsOf::VersionNum(v)) => Some(IcebergTimeTravelInfo::Version(v)), diff --git a/src/meta/service/src/cloud_service.rs b/src/meta/service/src/cloud_service.rs index 173550896f936..c6aa77e318ab3 100644 --- a/src/meta/service/src/cloud_service.rs +++ b/src/meta/service/src/cloud_service.rs @@ -19,7 +19,7 @@ use async_trait::async_trait; use regex::Regex; use risingwave_connector::error::ConnectorResult; use risingwave_connector::source::{ - AnySplitEnumerator, ConnectorProperties, SourceEnumeratorContext, SplitEnumerator, + AnySplitEnumerator, ConnectorProperties, SourceEnumeratorContext, }; use risingwave_connector::WithOptionsSecResolved; use risingwave_pb::cloud_service::cloud_service_server::CloudService; diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 0391c0e06a546..003e25de042bd 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -33,7 +33,7 @@ use risingwave_common::{bail, bail_not_implemented, hash, must_match}; use risingwave_connector::connector_common::validate_connection; use risingwave_connector::error::ConnectorError; use risingwave_connector::source::{ - AnySplitEnumerator, ConnectorProperties, SourceEnumeratorContext, SplitEnumerator, + AnySplitEnumerator, ConnectorProperties, SourceEnumeratorContext, }; use risingwave_connector::WithOptionsSecResolved; use risingwave_meta_model::object::ObjectType; diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 761d6b768b05d..3375f41c9ca67 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -27,7 +27,7 @@ use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_connector::error::ConnectorResult; use risingwave_connector::source::{ fill_adaptive_split, ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, - SplitEnumerator, SplitId, SplitImpl, SplitMetaData, + SplitId, SplitImpl, SplitMetaData, }; use risingwave_connector::WithOptionsSecResolved; use risingwave_meta_model::SourceId;