Skip to content

Commit

Permalink
new trait
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jan 13, 2025
1 parent 8a6de63 commit 4b50846
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 10 deletions.
4 changes: 2 additions & 2 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ macro_rules! impl_connector_properties {

$crate::paste! {
impl ConnectorProperties {
pub async fn create_split_enumerator(self, context: $crate::source::base::SourceEnumeratorContextRef) -> $crate::error::ConnectorResult<Box<dyn SplitEnumerator>> {
let enumerator: Box<dyn SplitEnumerator> = match self {
pub async fn create_split_enumerator(self, context: $crate::source::base::SourceEnumeratorContextRef) -> $crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
let enumerator: Box<dyn AnySplitEnumerator> = match self {
$(
ConnectorProperties::$variant_name(prop) =>
Box::new(prop.create_split_enumerator(context).await?),
Expand Down
27 changes: 24 additions & 3 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions + std::fmt::De
type Split: SplitMetaData
+ TryFrom<SplitImpl, Error = crate::error::ConnectorError>
+ Into<SplitImpl>;
type SplitEnumerator;
type SplitEnumerator: SplitEnumerator<Properties = Self, Split = Self::Split>;;
type SplitReader: SplitReader<Split = Self::Split, Properties = Self>;

/// Load additional info from `PbSource`. Currently only used by CDC.
Expand Down Expand Up @@ -190,13 +190,34 @@ pub async fn create_split_readers<P: SourceProperties>(
/// [`SplitEnumerator`] fetches the split metadata from the external source service.
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
#[async_trait]
pub trait SplitEnumerator: Send {
async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
pub trait SplitEnumerator: Sized {
type Split: SplitMetaData + Into<SplitImpl> + Send;
type Properties;

async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
-> Result<Self>;
async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
}

pub type SourceContextRef = Arc<SourceContext>;
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;

/// Dyn-compatible [`SplitEnumerator`].
#[async_trait]
pub trait AnySplitEnumerator: Send {
async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
}

#[async_trait]
impl<T: SplitEnumerator> AnySplitEnumerator for T {
async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
(self as &mut dyn SplitEnumerator)
.list_splits()
.await
.map(|s| s.into_iter().map(|s| s.into()).collect())
}
}

/// The max size of a chunk yielded by source stream.
pub const MAX_CHUNK_SIZE: usize = 1024;

Expand Down
6 changes: 4 additions & 2 deletions src/meta/service/src/cloud_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use std::sync::LazyLock;
use async_trait::async_trait;
use regex::Regex;
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext, SplitEnumerator};
use risingwave_connector::source::{
AnySplitEnumerator, ConnectorProperties, SourceEnumeratorContext, SplitEnumerator,
};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_pb::cloud_service::cloud_service_server::CloudService;
use risingwave_pb::cloud_service::rw_cloud_validate_source_response::{Error, ErrorType};
Expand Down Expand Up @@ -78,7 +80,7 @@ impl CloudService for CloudServiceImpl {

async fn new_enumerator(
props: ConnectorProperties,
) -> ConnectorResult<Box<dyn SplitEnumerator>> {
) -> ConnectorResult<Box<dyn AnySplitEnumerator>> {
props
.create_split_enumerator(SourceEnumeratorContext::dummy().into())
.await
Expand Down
6 changes: 4 additions & 2 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use risingwave_common::util::stream_graph_visitor::{
use risingwave_common::{bail, bail_not_implemented, hash, must_match};
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::{ConnectorProperties, SourceEnumeratorContext, SplitEnumerator};
use risingwave_connector::source::{
AnySplitEnumerator, ConnectorProperties, SourceEnumeratorContext, SplitEnumerator,
};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
Expand Down Expand Up @@ -651,7 +653,7 @@ impl DdlController {

async fn new_enumerator_for_validate(
source_props: ConnectorProperties,
) -> Result<Box<dyn SplitEnumerator>, ConnectorError> {
) -> Result<Box<dyn AnySplitEnumerator>, ConnectorError> {
source_props
.create_split_enumerator(SourceEnumeratorContext::dummy().into())
.await
Expand Down
4 changes: 3 additions & 1 deletion src/meta/src/stream/source_manager/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_connector::source::AnySplitEnumerator;

use super::*;

const MAX_FAIL_CNT: u32 = 10;
Expand All @@ -30,7 +32,7 @@ pub struct ConnectorSourceWorker {
source_name: String,
current_splits: SharedSplitMapRef,
// XXX: box or arc?
enumerator: Box<dyn SplitEnumerator>,
enumerator: Box<dyn AnySplitEnumerator>,
period: Duration,
metrics: Arc<MetaMetrics>,
connector_properties: ConnectorProperties,
Expand Down

0 comments on commit 4b50846

Please sign in to comment.