Skip to content

Commit

Permalink
new trait, revert impl
Browse files: browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Jan 13, 2025
1 parent 4b50846 commit 3d1a47b
Show file tree
Hide file tree
Showing 34 changed files with 432 additions and 374 deletions.
1 change: 1 addition & 0 deletions src/bench/sink_bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ impl MockDatagenSource {
properties.clone(),
SourceEnumeratorContext::dummy().into(),
)
.await
.unwrap();
let parser_config = ParserConfig {
specific: SpecificParserConfig {
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,11 @@ macro_rules! impl_connector_properties {
$crate::paste! {
impl ConnectorProperties {
pub async fn create_split_enumerator(self, context: $crate::source::base::SourceEnumeratorContextRef) -> $crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
use crate::source::prelude::*;
let enumerator: Box<dyn AnySplitEnumerator> = match self {
$(
ConnectorProperties::$variant_name(prop) =>
Box::new(prop.create_split_enumerator(context).await?),
Box::new( [<$variant_name SplitEnumerator>]::new(*prop, context).await?),
)*
};
Ok(enumerator)
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam};
use crate::source::kafka::{
KafkaContextCommon, KafkaProperties, KafkaSplitEnumerator, RwProducerContext,
};
use crate::source::SourceEnumeratorContext;
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
use crate::{
deserialize_duration_from_string, deserialize_u32_from_string, dispatch_sink_formatter_impl,
};
Expand Down
19 changes: 9 additions & 10 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions + std::fmt::De
type Split: SplitMetaData
+ TryFrom<SplitImpl, Error = crate::error::ConnectorError>
+ Into<SplitImpl>;
type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;;
type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;
type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

/// Load additional info from `PbSource`. Currently only used by CDC.
Expand All @@ -90,10 +90,10 @@ pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions + std::fmt::De
/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
fn init_from_pb_cdc_table_desc(&mut self, _table_desc: &ExternalTableDesc) {}

async fn create_split_enumerator(
self,
context: crate::source::SourceEnumeratorContextRef,
) -> Result<Self::SplitEnumerator>;
// async fn create_split_enumerator(
// self,
// context: crate::source::SourceEnumeratorContextRef,
// ) -> Result<Self::SplitEnumerator>;
}

pub trait UnknownFields {
Expand Down Expand Up @@ -190,8 +190,8 @@ pub async fn create_split_readers<P: SourceProperties>(
/// [`SplitEnumerator`] fetches the split metadata from the external source service.
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
#[async_trait]
pub trait SplitEnumerator: Sized {
type Split: SplitMetaData + Into<SplitImpl> + Send;
pub trait SplitEnumerator: Sized + Send {
type Split: SplitMetaData + Send;
type Properties;

async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
Expand All @@ -209,10 +209,9 @@ pub trait AnySplitEnumerator: Send {
}

#[async_trait]
impl<T: SplitEnumerator> AnySplitEnumerator for T {
impl<T: SplitEnumerator<Split: Into<SplitImpl>>> AnySplitEnumerator for T {
async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
(self as &mut dyn SplitEnumerator)
.list_splits()
SplitEnumerator::list_splits(self)
.await
.map(|s| s.into_iter().map(|s| s.into()).collect())
}
Expand Down
62 changes: 38 additions & 24 deletions src/connector/src/source/cdc/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::source::cdc::{
table_schema_exclude_additional_columns, CdcProperties, CdcSourceTypeTrait, Citus,
DebeziumCdcSplit, Mongodb, Mysql, Postgres, SqlServer,
};
use crate::source::SplitEnumerator;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};

pub const DATABASE_SERVERS_KEY: &str = "database.servers";

Expand All @@ -42,10 +42,17 @@ pub struct DebeziumSplitEnumerator<T: CdcSourceTypeTrait> {
_phantom: PhantomData<T>,
}

impl<T: CdcSourceTypeTrait> DebeziumSplitEnumerator<T> {
pub async fn new(
#[async_trait]
impl<T: CdcSourceTypeTrait> SplitEnumerator for DebeziumSplitEnumerator<T>
where
Self: ListCdcSplits<CdcSourceType = T>,
{
type Properties = CdcProperties<T>;
type Split = DebeziumCdcSplit<T>;

async fn new(
props: CdcProperties<T>,
context: crate::source::SourceEnumeratorContextRef,
context: SourceEnumeratorContextRef,
) -> ConnectorResult<Self> {
let server_addrs = props
.properties
Expand Down Expand Up @@ -112,47 +119,47 @@ impl<T: CdcSourceTypeTrait> DebeziumSplitEnumerator<T> {
_phantom: PhantomData,
})
}
}

#[async_trait]
impl<T: CdcSourceTypeTrait> SplitEnumerator for DebeziumSplitEnumerator<T>
where
Self: ListCdcSplits<CdcSourceType = T>,
{
async fn list_splits(
&mut self,
) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
async fn list_splits(&mut self) -> ConnectorResult<Vec<DebeziumCdcSplit<T>>> {
Ok(self.list_cdc_splits())
}
}

pub trait ListCdcSplits {
type CdcSourceType: CdcSourceTypeTrait;
fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl>;
fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>>;
}

impl ListCdcSplits for DebeziumSplitEnumerator<Mysql> {
type CdcSourceType = Mysql;

fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl> {
fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
// CDC source only supports single split
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(self.source_id, None, None).into()]
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
self.source_id,
None,
None,
)]
}
}

impl ListCdcSplits for DebeziumSplitEnumerator<Postgres> {
type CdcSourceType = Postgres;

fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl> {
fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
// CDC source only supports single split
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(self.source_id, None, None).into()]
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
self.source_id,
None,
None,
)]
}
}

impl ListCdcSplits for DebeziumSplitEnumerator<Citus> {
type CdcSourceType = Citus;

fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl> {
fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
self.worker_node_addrs
.iter()
.enumerate()
Expand All @@ -162,24 +169,31 @@ impl ListCdcSplits for DebeziumSplitEnumerator<Citus> {
None,
Some(addr.to_string()),
)
.into()
})
.collect_vec()
}
}
impl ListCdcSplits for DebeziumSplitEnumerator<Mongodb> {
type CdcSourceType = Mongodb;

fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl> {
fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
// CDC source only supports single split
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(self.source_id, None, None).into()]
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
self.source_id,
None,
None,
)]
}
}

impl ListCdcSplits for DebeziumSplitEnumerator<SqlServer> {
type CdcSourceType = SqlServer;

fn list_cdc_splits(&mut self) -> Vec<crate::source::base::SplitImpl> {
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(self.source_id, None, None).into()]
fn list_cdc_splits(&mut self) -> Vec<DebeziumCdcSplit<Self::CdcSourceType>> {
vec![DebeziumCdcSplit::<Self::CdcSourceType>::new(
self.source_id,
None,
None,
)]
}
}
7 changes: 0 additions & 7 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,6 @@ where
self.is_cdc_source_job = false;
self.is_backfill_table = true;
}

async fn create_split_enumerator(
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
DebeziumSplitEnumerator::new(self, context).await
}
}

impl<T: CdcSourceTypeTrait> crate::source::UnknownFields for CdcProperties<T> {
Expand Down
32 changes: 14 additions & 18 deletions src/connector/src/source/datagen/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,37 @@ use anyhow::Context;
use async_trait::async_trait;

use crate::source::datagen::{DatagenProperties, DatagenSplit};
use crate::source::SplitEnumerator;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct DatagenSplitEnumerator {
split_num: i32,
}

impl DatagenSplitEnumerator {
pub fn new(
#[async_trait]
impl SplitEnumerator for DatagenSplitEnumerator {
type Properties = DatagenProperties;
type Split = DatagenSplit;

async fn new(
properties: DatagenProperties,
_context: crate::source::SourceEnumeratorContextRef,
_context: SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<DatagenSplitEnumerator> {
let split_num = properties.split_num.unwrap_or_else(|| "1".to_owned());
let split_num = split_num
.parse::<i32>()
.context("failed to parse datagen split num")?;
Ok(Self { split_num })
}
}

#[async_trait]
impl SplitEnumerator for DatagenSplitEnumerator {
async fn list_splits(
&mut self,
) -> crate::error::ConnectorResult<Vec<crate::source::base::SplitImpl>> {
async fn list_splits(&mut self) -> crate::error::ConnectorResult<Vec<DatagenSplit>> {
let mut splits = vec![];
for i in 0..self.split_num {
splits.push(
DatagenSplit {
split_num: self.split_num,
split_index: i,
start_offset: None,
}
.into(),
);
splits.push(DatagenSplit {
split_num: self.split_num,
split_index: i,
start_offset: None,
});
}
Ok(splits)
}
Expand Down
7 changes: 0 additions & 7 deletions src/connector/src/source/datagen/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,6 @@ impl SourceProperties for DatagenProperties {
type SplitReader = DatagenSplitReader;

const SOURCE_NAME: &'static str = DATAGEN_CONNECTOR;

async fn create_split_enumerator(
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
DatagenSplitEnumerator::new(self, context)
}
}

impl crate::source::UnknownFields for DatagenProperties {
Expand Down
31 changes: 2 additions & 29 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::collections::HashMap;

pub use opendal_enumerator::OpendalEnumerator;

pub mod azblob_source;
pub mod gcs_source;
pub mod posix_fs_source;
Expand All @@ -25,7 +27,6 @@ use with_options::WithOptions;
pub mod opendal_enumerator;
pub mod opendal_reader;

pub use self::opendal_enumerator::OpendalEnumerator;
use self::opendal_reader::OpendalReader;
use super::file_common::CompressionFormat;
pub use super::s3::S3PropertiesCommon;
Expand Down Expand Up @@ -86,13 +87,6 @@ impl SourceProperties for GcsProperties {
type SplitReader = OpendalReader<OpendalGcs>;

const SOURCE_NAME: &'static str = GCS_CONNECTOR;

async fn create_split_enumerator(
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
OpendalEnumerator::new(self, context)
}
}

pub trait OpendalSource: Send + Sync + 'static + Clone + PartialEq {
Expand Down Expand Up @@ -162,13 +156,6 @@ impl SourceProperties for OpendalS3Properties {
type SplitReader = OpendalReader<OpendalS3>;

const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;

async fn create_split_enumerator(
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
OpendalEnumerator::new(self, context)
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
Expand Down Expand Up @@ -202,13 +189,6 @@ impl SourceProperties for PosixFsProperties {
type SplitReader = OpendalReader<OpendalPosixFs>;

const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;

async fn create_split_enumerator(
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
OpendalEnumerator::new(self, context)
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
Expand Down Expand Up @@ -248,13 +228,6 @@ impl SourceProperties for AzblobProperties {
type SplitReader = OpendalReader<OpendalAzblob>;

const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;

async fn create_split_enumerator(
self,
context: crate::source::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Self::SplitEnumerator> {
OpendalEnumerator::new(self, context)
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down
Loading

0 comments on commit 3d1a47b

Please sign in to comment.