diff --git a/bitsail-base/src/main/java/com/bytedance/bitsail/base/connector/reader/v1/SourceReader.java b/bitsail-base/src/main/java/com/bytedance/bitsail/base/connector/reader/v1/SourceReader.java index 1962b038e..f16ecda65 100644 --- a/bitsail-base/src/main/java/com/bytedance/bitsail/base/connector/reader/v1/SourceReader.java +++ b/bitsail-base/src/main/java/com/bytedance/bitsail/base/connector/reader/v1/SourceReader.java @@ -23,7 +23,7 @@ public interface SourceReader extends Serializable, AutoCloseable { - void start(); + void start() throws Exception; void pollNext(SourcePipeline pipeline) throws Exception; diff --git a/bitsail-common/src/main/java/com/bytedance/bitsail/common/option/ReaderOptions.java b/bitsail-common/src/main/java/com/bytedance/bitsail/common/option/ReaderOptions.java index f6d6d6306..62af9a729 100644 --- a/bitsail-common/src/main/java/com/bytedance/bitsail/common/option/ReaderOptions.java +++ b/bitsail-common/src/main/java/com/bytedance/bitsail/common/option/ReaderOptions.java @@ -88,6 +88,10 @@ interface BaseReaderOptions { key(READER_PREFIX + "db_name") .noDefaultValue(String.class); + ConfigOption CONNECTION_TIMEZONE = + key(READER_PREFIX + "connection_timezone") + .defaultValue("UTC"); + ConfigOption TABLE_NAME = key(READER_PREFIX + "table_name") .noDefaultValue(String.class); diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/pom.xml b/bitsail-connectors/connector-cdc/connector-cdc-base/pom.xml index 13877c03a..5bf4fff33 100644 --- a/bitsail-connectors/connector-cdc/connector-cdc-base/pom.xml +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/pom.xml @@ -33,7 +33,6 @@ - \ No newline at end of file diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/reader/BinlogSourceReader.java b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/reader/BinlogSourceReader.java index 6a7a6c0f0..76bb53913 100644 --- a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/reader/BinlogSourceReader.java +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/reader/BinlogSourceReader.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayDeque; import java.util.List; import java.util.Queue; @@ -56,7 +57,7 @@ public BinlogSourceReader(BitSailConfiguration jobConf, SourceReader.Context rea public abstract BinlogSplitReader getReader(); @Override - public void start() { + public void start() throws Exception{ //start debezium streaming reader and send data to queue } @@ -115,7 +116,7 @@ public void close() { } } - private void submitSplit() { + private void submitSplit() throws IOException, InterruptedException { if (!remainSplits.isEmpty()) { BinlogSplit curSplit = remainSplits.poll(); LOG.info("submit split to binlog reader: {}, size of the remaining splits: {}", curSplit.toString(), remainSplits.size()); diff --git a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/reader/BinlogSplitReader.java b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/reader/BinlogSplitReader.java index 0eb7192da..55f70dd25 100644 --- 
a/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/reader/BinlogSplitReader.java +++ b/bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/reader/BinlogSplitReader.java @@ -18,11 +18,12 @@ import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit; +import java.io.IOException; import java.io.Serializable; import java.util.Map; public interface BinlogSplitReader extends Serializable { - void readSplit(BinlogSplit split); + void readSplit(BinlogSplit split) throws IOException, InterruptedException; Map getOffset(); diff --git a/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/pom.xml b/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/pom.xml new file mode 100644 index 000000000..38d6651f7 --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/pom.xml @@ -0,0 +1,64 @@ + + + + + com.bytedance.bitsail + connector-cdc + ${revision} + + 4.0.0 + + connector-cdc-jdbc-base + + + 8 + 8 + 1.6.4.Final + + + + com.bytedance.bitsail + connector-cdc-base + ${revision} + + + io.debezium + debezium-embedded + ${debezium.version} + + + jakarta.activation + jakarta.activation-api + + + org.apache.kafka + kafka-clients + + + + + io.debezium + debezium-core + ${debezium.version} + test-jar + test + + + + \ No newline at end of file diff --git a/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/config/AbstractJdbcDebeziumConfig.java b/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/config/AbstractJdbcDebeziumConfig.java new file mode 100644 index 000000000..6704983ce --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/config/AbstractJdbcDebeziumConfig.java @@ -0,0 +1,91 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.bytedance.bitsail.connector.cdc.jdbc.source.config; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.cdc.error.BinlogReaderErrorCode; +import com.bytedance.bitsail.connector.cdc.jdbc.source.constant.DebeziumConstant; +import com.bytedance.bitsail.connector.cdc.model.ClusterInfo; +import com.bytedance.bitsail.connector.cdc.model.ConnectionInfo; +import com.bytedance.bitsail.connector.cdc.option.BinlogReaderOptions; + +import io.debezium.config.Configuration; +import io.debezium.relational.RelationalDatabaseConnectorConfig; +import lombok.Getter; +import org.apache.commons.lang.StringUtils; + +import java.time.ZoneId; +import java.util.List; +import java.util.Properties; + +@Getter +public abstract class AbstractJdbcDebeziumConfig { + + private static final long serialVersionUID = 1L; + + public static final String DEBEZIUM_PREFIX = "job.reader.debezium."; + + private final String hostname; + private final int port; + private final String username; + private final String password; + + // debezium configuration + private final Properties dbzProperties; + private final Configuration dbzConfiguration; + private final RelationalDatabaseConnectorConfig dbzJdbcConnectorConfig; + private String dbName; + + public AbstractJdbcDebeziumConfig(BitSailConfiguration jobConf) { + List clusterInfo = jobConf.getNecessaryOption(BinlogReaderOptions.CONNECTIONS, BinlogReaderErrorCode.REQUIRED_VALUE); + //Only support one DB + assert (clusterInfo.size() == 1); + ConnectionInfo connectionInfo = clusterInfo.get(0).getMaster(); + assert (connectionInfo != null); + this.dbzProperties = extractProps(jobConf); + this.hostname = connectionInfo.getHost(); + this.port = connectionInfo.getPort(); + this.username = jobConf.getNecessaryOption(BinlogReaderOptions.USER_NAME, BinlogReaderErrorCode.REQUIRED_VALUE); + this.password = jobConf.getNecessaryOption(BinlogReaderOptions.PASSWORD, BinlogReaderErrorCode.REQUIRED_VALUE); + this.dbName = jobConf.getNecessaryOption(BinlogReaderOptions.DB_NAME, BinlogReaderErrorCode.REQUIRED_VALUE); + String timezone = jobConf.get(BinlogReaderOptions.CONNECTION_TIMEZONE); + fillConnectionInfo(jobConf, this.dbzProperties, connectionInfo, timezone); + + this.dbzConfiguration = Configuration.from(this.dbzProperties); + this.dbzJdbcConnectorConfig = getJdbcConnectorConfig(this.dbzConfiguration); + } + + public static Properties extractProps(BitSailConfiguration jobConf) { + Properties props = new Properties(); + jobConf.getKeys().stream() + .filter(s -> s.startsWith(DEBEZIUM_PREFIX)) + .map(s -> StringUtils.substringAfter(s, DEBEZIUM_PREFIX)) + .forEach(s -> props.setProperty(s, jobConf.getString(DEBEZIUM_PREFIX + s))); + return props; + } + + public abstract RelationalDatabaseConnectorConfig getJdbcConnectorConfig(Configuration config); + + public void fillConnectionInfo(BitSailConfiguration jobConf, Properties props, ConnectionInfo connectionInfo, String timezone) { + props.put(DebeziumConstant.DATABASE_HOSTNAME, connectionInfo.getHost()); + props.put(DebeziumConstant.DATABASE_PORT, String.valueOf(connectionInfo.getPort())); + props.put(DebeziumConstant.DATABASE_USER, username); + props.put(DebeziumConstant.DATABASE_PASSWORD, password); + props.put(DebeziumConstant.DATABASE_NAME, dbName); + props.put(DebeziumConstant.DATABASE_TIMEZONE, ZoneId.of(timezone).toString()); + } + +} diff --git 
a/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/constant/DebeziumConstant.java b/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/constant/DebeziumConstant.java new file mode 100644 index 000000000..3b08aa2fd --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/constant/DebeziumConstant.java @@ -0,0 +1,25 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.bytedance.bitsail.connector.cdc.jdbc.source.constant; + +public class DebeziumConstant { + public static final String DATABASE_HOSTNAME = "database.hostname"; + public static final String DATABASE_PORT = "database.port"; + public static final String DATABASE_USER = "database.user"; + public static final String DATABASE_PASSWORD = "database.password"; + public static final String DATABASE_NAME = "database.dbname"; + public static final String DATABASE_TIMEZONE = "database.serverTimezone"; +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/reader/AbstractJdbcChangeEventSplitReader.java b/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/reader/AbstractJdbcChangeEventSplitReader.java new file mode 100644 index 000000000..f0e9280d7 --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/reader/AbstractJdbcChangeEventSplitReader.java @@ -0,0 +1,185 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
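[Editor's illustrative sketch, not part of the patch] A minimal standalone example of how the `job.reader.debezium.*` pass-through in AbstractJdbcDebeziumConfig.extractProps and the `database.*` keys in DebeziumConstant are intended to combine. The class name and the option values are hypothetical; only the prefix-stripping logic and the key names mirror the code above.

    import java.time.ZoneId;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Properties;

    public class DebeziumPropsSketch {
      private static final String DEBEZIUM_PREFIX = "job.reader.debezium.";

      public static void main(String[] args) {
        // Hypothetical flattened job configuration entries.
        Map<String, String> jobConf = new LinkedHashMap<>();
        jobConf.put("job.reader.connection_timezone", "UTC");               // ReaderOptions.CONNECTION_TIMEZONE (default "UTC")
        jobConf.put("job.reader.debezium.snapshot.mode", "never");          // forwarded verbatim to Debezium
        jobConf.put("job.reader.debezium.offset.flush.interval.ms", "1000");

        // Same prefix stripping that extractProps performs on the BitSailConfiguration keys.
        Properties dbzProps = new Properties();
        jobConf.forEach((k, v) -> {
          if (k.startsWith(DEBEZIUM_PREFIX)) {
            dbzProps.setProperty(k.substring(DEBEZIUM_PREFIX.length()), v);
          }
        });

        // fillConnectionInfo then adds the connection keys defined in DebeziumConstant (values made up here).
        dbzProps.put("database.hostname", "localhost");
        dbzProps.put("database.port", "5432");
        dbzProps.put("database.serverTimezone", ZoneId.of(jobConf.get("job.reader.connection_timezone")).toString());

        System.out.println(dbzProps); // e.g. {snapshot.mode=never, offset.flush.interval.ms=1000, database.hostname=localhost, ...}
      }
    }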
+ */ +package com.bytedance.bitsail.connector.cdc.jdbc.source.reader; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.cdc.jdbc.source.config.AbstractJdbcDebeziumConfig; +import com.bytedance.bitsail.connector.cdc.jdbc.source.streaming.AbstractSplitChangeEventStreamingTaskContext; +import com.bytedance.bitsail.connector.cdc.jdbc.source.streaming.SplitChangeEventStreamingTaskController; +import com.bytedance.bitsail.connector.cdc.source.reader.BinlogSplitReader; +import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit; +import com.bytedance.bitsail.common.row.Row; +import io.debezium.connector.base.ChangeEventQueue; +import io.debezium.pipeline.DataChangeEvent; +import io.debezium.relational.RelationalDatabaseConnectorConfig; +import lombok.Getter; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@Getter +public abstract class AbstractJdbcChangeEventSplitReader implements BinlogSplitReader { + private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcChangeEventSplitReader.class); + + protected final AbstractJdbcDebeziumConfig jdbcDebeziumConfig; + + protected ChangeEventQueue queue; + + protected RelationalDatabaseConnectorConfig connectorConfig; + + protected List batch; + + protected Iterator recordIterator; + + protected Map offset; + + protected SplitChangeEventStreamingTaskController splitChangeEventStreamingTaskController; + + protected AbstractSplitChangeEventStreamingTaskContext splitChangeEventStreamingTaskContext; + + private final int subtaskId; + + public AbstractJdbcChangeEventSplitReader(BitSailConfiguration jobConf, int subtaskId) { + jdbcDebeziumConfig = getJdbcDebeziumConfig(jobConf); + connectorConfig = jdbcDebeziumConfig.getDbzJdbcConnectorConfig(); + this.subtaskId = subtaskId; + this.offset = new HashMap<>(); + } + + public abstract AbstractJdbcDebeziumConfig getJdbcDebeziumConfig(BitSailConfiguration jobConf); + + public abstract AbstractSplitChangeEventStreamingTaskContext getSplitReaderTaskContext(BinlogSplit split, RelationalDatabaseConnectorConfig connectorConfig); + + public void inititialzeSplitReader(BinlogSplit split) { + splitChangeEventStreamingTaskContext = getSplitReaderTaskContext(split, connectorConfig); + this.offset = new HashMap<>(); + this.queue = new ChangeEventQueue.Builder() + .pollInterval(connectorConfig.getPollInterval()) + .maxBatchSize(connectorConfig.getMaxBatchSize()) + .maxQueueSize(connectorConfig.getMaxQueueSize()) + .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes()) + .loggingContextSupplier(() -> splitChangeEventStreamingTaskContext.getDbzTaskContext() + .configureLoggingContext(splitChangeEventStreamingTaskContext.threadNamePrefix())) + .buffering() + .build(); + this.batch = new ArrayList<>(); + this.recordIterator = this.batch.iterator(); + splitChangeEventStreamingTaskContext.attachStreamingToQueue(this.queue); + splitChangeEventStreamingTaskController = new SplitChangeEventStreamingTaskController(splitChangeEventStreamingTaskContext, this.subtaskId); + } + + /** + * Try to start streaming task to drain change event into target queue + * @param split + * @throws IOException + * @throws InterruptedException + */ + @Override + public void readSplit(BinlogSplit split) 
throws IOException, InterruptedException { + inititialzeSplitReader(split); + splitChangeEventStreamingTaskContext.testConnectionAndValidBinlogConfiguration(); + splitChangeEventStreamingTaskController.launchSplitReaderTask(); + } + + /** + * get the binlog offset being processed + * @return + */ + @Override + public Map getOffset() { + Map offsetToStore = new HashMap<>(); + this.offset.forEach((k, v) -> offsetToStore.put(k, v.toString())); + return offsetToStore; + } + + /** + * close task and resources + */ + @Override + public void close() { + try { + splitChangeEventStreamingTaskController.closeTask(); + } catch (Exception e) { + LOG.error("Failed to close change event streaming task: {}", e.getMessage()); + } + + try { + splitChangeEventStreamingTaskContext.closeContextResources(); + } catch (Exception e) { + LOG.error("Failed to close resources of streaming task context: {}", e.getMessage()); + } + } + + @Override + public Row poll() { + SourceRecord record = this.recordIterator.next(); + this.offset = record.sourceOffset(); + LOG.info("OFFSET:" + record.sourceOffset()); + LOG.info("poll one record {}", record.value()); + // TODO: Build BitSail row and return + return null; + } + + /** + * To judge whether current split has next record + * @return + * @throws Exception + */ + @Override + public boolean hasNext() { + if (this.recordIterator.hasNext()) { + return true; + } else { + return pollNextBatch(); + } + } + + @Override + public boolean isCompleted() { + return !splitChangeEventStreamingTaskController.isRunning(); + } + + private boolean pollNextBatch() { + if (splitChangeEventStreamingTaskController.isRunning()) { + try { + List dbzRecords = queue.poll(); + while (dbzRecords.isEmpty()) { + //sleep 10s + LOG.info("No record found, sleep for 5s in reader"); + TimeUnit.SECONDS.sleep(5); + dbzRecords = queue.poll(); + } + this.batch = new ArrayList<>(); + for (DataChangeEvent event : dbzRecords) { + this.batch.add(event.getRecord()); + } + this.recordIterator = this.batch.iterator(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return true; + } + return false; + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/streaming/AbstractSplitChangeEventStreamingTaskContext.java b/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/streaming/AbstractSplitChangeEventStreamingTaskContext.java new file mode 100644 index 000000000..0fa69b71d --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/streaming/AbstractSplitChangeEventStreamingTaskContext.java @@ -0,0 +1,206 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
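[Editor's illustrative sketch, not part of the patch] A rough sketch of how a caller such as BinlogSourceReader presumably drives the split reader above. Only the method names (readSplit, hasNext, poll, isCompleted, getOffset, close) come from this patch; the drain helper and the emit callback are invented, and type parameters are omitted because they are not visible in the flattened diff, hence the cast.

    import com.bytedance.bitsail.common.row.Row;
    import com.bytedance.bitsail.connector.cdc.source.reader.BinlogSplitReader;
    import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit;

    import java.util.Map;
    import java.util.function.Consumer;

    public class SplitDrainSketch {
      @SuppressWarnings("unchecked")
      public static void drain(BinlogSplitReader reader, BinlogSplit split, Consumer<Row> emit) throws Exception {
        reader.readSplit(split);                  // launches the Debezium streaming task
        while (!reader.isCompleted()) {           // false once the streaming task stops running
          if (reader.hasNext()) {                 // refills the local batch from the ChangeEventQueue when it is exhausted
            Row row = (Row) reader.poll();        // returns null for now: row conversion is still a TODO in this patch
            if (row != null) {
              emit.accept(row);
            }
          }
        }
        Map<String, String> offset = reader.getOffset(); // the map that snapshotState() later persists
        reader.close();
      }
    }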
+ */ +package com.bytedance.bitsail.connector.cdc.jdbc.source.streaming; + +import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit; +import io.debezium.config.Configuration; +import io.debezium.connector.base.ChangeEventQueue; +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.jdbc.JdbcValueConverters; +import io.debezium.pipeline.DataChangeEvent; +import io.debezium.pipeline.ErrorHandler; +import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.source.spi.EventMetadataProvider; +import io.debezium.pipeline.source.spi.StreamingChangeEventSource; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.relational.RelationalDatabaseSchema; +import io.debezium.relational.TableId; +import io.debezium.schema.TopicSelector; +import io.debezium.util.SchemaNameAdjuster; +import lombok.Getter; +import org.apache.kafka.connect.source.SourceTask; + +import java.sql.SQLException; + +@Getter +public abstract class AbstractSplitChangeEventStreamingTaskContext { + + /** + * The split being processed + */ + public BinlogSplit split; + + /** + * OffsetContext includes the context and operations on the offset + */ + public OffsetContext offsetContext; + + /** + * EventDispatcher is responsible for recognizing events and emitting them to the target ChangeEventQueue + */ + public EventDispatcher eventDispatcher; + + /** + * StreamingChangeEventSource is responsible for handling the different change events and delivering them to the EventDispatcher + */ + public StreamingChangeEventSource streamingChangeEventSource; + + /** + * Implementations return names for Kafka topics (data and meta-data). + */ + public TopicSelector topicSelector; + + /** + * Adjuster to convert database schema names (e.g. field names) to valid Avro names + */ + public SchemaNameAdjuster schemaNameAdjuster; + + /** + * ErrorHandler is used when the StreamingChangeEventSource has problems reading change events from the database + */ + public volatile ErrorHandler errorHandler; + + /** + * RelationalDatabaseSchema is the schema definition of the database + */ + public volatile RelationalDatabaseSchema schema; + + /** + * JdbcValueConverters is used to convert different types of values + */ + public JdbcValueConverters valueConverters; + + /** + * EventMetadataProvider is used to get metadata (e.g. 
tableName, tableId) from a change event + */ + public EventMetadataProvider metadataProvider; + + /** + * Custom Debezium configuration used to start the Debezium task + */ + public Configuration dbzConfiguration; + + /** + * Contains contextual information and objects scoped to the lifecycle of Debezium's {@link SourceTask} implementations + */ + public volatile CdcSourceTaskContext dbzTaskContext; + + /** + * All related config of the database + */ + public RelationalDatabaseConnectorConfig connectorConfig; + + /** + * Message queue that receives change events from the event dispatcher + */ + public ChangeEventQueue queue; + + /** + * JDBC connection instance + */ + public JdbcConnection jdbcConnection; + + public AbstractSplitChangeEventStreamingTaskContext(BinlogSplit split, RelationalDatabaseConnectorConfig connectorConfig) { + this.split = split; + this.connectorConfig = connectorConfig; + this.jdbcConnection = tryEstablishedConnection(connectorConfig); + this.offsetContext = buildOffsetContext(); + this.topicSelector = buildTopicSelector(); + this.valueConverters = buildValueConverters(connectorConfig); + this.schema = buildRelationalDatabaseSchema(connectorConfig); + this.errorHandler = buildErrorHandler(connectorConfig, queue); + this.schemaNameAdjuster = SchemaNameAdjuster.create(); + this.dbzTaskContext = buildCdcSourceTaskContext(connectorConfig, schema); + this.metadataProvider = buildEventMetadataProvider(); + } + + protected abstract ErrorHandler buildErrorHandler(RelationalDatabaseConnectorConfig connectorConfig, ChangeEventQueue queue); + + public void attachStreamingToQueue(ChangeEventQueue queue) { + this.eventDispatcher = new EventDispatcher<>( + connectorConfig, + topicSelector, + schema, + queue, + connectorConfig.getTableFilters().dataCollectionFilter(), + DataChangeEvent::new, + metadataProvider, + schemaNameAdjuster); + + this.streamingChangeEventSource = buildStreamingChangeEventSource(); + } + + public void closeContextResources() throws SQLException { + this.closeStreamingChangeEventSource(); + if (this.jdbcConnection.isConnected()) { + this.jdbcConnection.close(); + } + } + + public abstract void testConnectionAndValidBinlogConfiguration(); + + /** + * Establish a connection to the JDBC database using the given config + * @param connectorConfig + * @return JdbcConnection + */ + public abstract JdbcConnection tryEstablishedConnection(RelationalDatabaseConnectorConfig connectorConfig); + + /** + * Give the thread name prefix of the streaming task + * @return + */ + public abstract String threadNamePrefix(); + + /** + * Try to build the offset context that implements all operations on the offset + * @return OffsetContext + */ + public abstract OffsetContext buildOffsetContext(); + + /** + * Implementations return names for Kafka topics (data and meta-data). 
+ * @return TopicSelector + */ + public abstract TopicSelector buildTopicSelector(); + + /** + * StreamingChangeEventSource is used to read change events (e.g. MySQL binlog) from the database and emit them via the EventDispatcher + * @return + */ + public abstract StreamingChangeEventSource buildStreamingChangeEventSource(); + + + /** + * Close the change event streaming task + * @return + */ + public abstract void closeStreamingChangeEventSource(); + + /** + * Build the RelationalDatabaseSchema based on the config + * @return RelationalDatabaseSchema + */ + public abstract RelationalDatabaseSchema buildRelationalDatabaseSchema(RelationalDatabaseConnectorConfig connectorConfig); + + public abstract CdcSourceTaskContext buildCdcSourceTaskContext(RelationalDatabaseConnectorConfig connectorConfig, RelationalDatabaseSchema schema); + + public abstract EventMetadataProvider buildEventMetadataProvider(); + + public abstract JdbcValueConverters buildValueConverters(RelationalDatabaseConnectorConfig connectorConfig); +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/streaming/SplitChangeEventStreamingTaskController.java b/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/streaming/SplitChangeEventStreamingTaskController.java new file mode 100644 index 000000000..7fdc2b2ea --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/src/main/java/com/bytedance/bitsail/connector/cdc/jdbc/source/streaming/SplitChangeEventStreamingTaskController.java @@ -0,0 +1,76 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
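[Editor's illustrative sketch, not part of the patch] To show how the three abstractions above compose, here is a hypothetical skeleton of a connector built on connector-cdc-jdbc-base, loosely modeled on the MySQL module that this patch rewires to depend on it. The class name, the use of Debezium's MySqlConnectorConfig, and the elided task context are assumptions; the two hook methods are the ones declared by AbstractJdbcChangeEventSplitReader.

    import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
    import com.bytedance.bitsail.connector.cdc.jdbc.source.config.AbstractJdbcDebeziumConfig;
    import com.bytedance.bitsail.connector.cdc.jdbc.source.reader.AbstractJdbcChangeEventSplitReader;
    import com.bytedance.bitsail.connector.cdc.jdbc.source.streaming.AbstractSplitChangeEventStreamingTaskContext;
    import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit;

    import io.debezium.config.Configuration;
    import io.debezium.connector.mysql.MySqlConnectorConfig;
    import io.debezium.relational.RelationalDatabaseConnectorConfig;

    // Hypothetical MySQL adaptation; mirrors what the Postgres classes later in this patch do.
    public class MysqlChangeEventSplitReaderSketch extends AbstractJdbcChangeEventSplitReader {

      public MysqlChangeEventSplitReaderSketch(BitSailConfiguration jobConf, int subtaskId) {
        super(jobConf, subtaskId);
      }

      @Override
      public AbstractJdbcDebeziumConfig getJdbcDebeziumConfig(BitSailConfiguration jobConf) {
        // Hook 1: translate BitSail options into the database-specific Debezium connector config.
        return new AbstractJdbcDebeziumConfig(jobConf) {
          @Override
          public RelationalDatabaseConnectorConfig getJdbcConnectorConfig(Configuration config) {
            return new MySqlConnectorConfig(config); // assumed Debezium class from the mysql module's dependency
          }
        };
      }

      @Override
      public AbstractSplitChangeEventStreamingTaskContext getSplitReaderTaskContext(BinlogSplit split,
                                                                                    RelationalDatabaseConnectorConfig connectorConfig) {
        // Hook 2: a task context subclass would implement buildOffsetContext(), buildTopicSelector(),
        // buildStreamingChangeEventSource(), buildRelationalDatabaseSchema(), etc.; elided in this sketch.
        throw new UnsupportedOperationException("task context sketch elided");
      }
    }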
+ */ +package com.bytedance.bitsail.connector.cdc.jdbc.source.streaming; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.debezium.pipeline.source.spi.ChangeEventSource; +import io.debezium.pipeline.source.spi.StreamingChangeEventSource; +import lombok.Getter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +@Getter +public class SplitChangeEventStreamingTaskController { + public AbstractSplitChangeEventStreamingTaskContext splitReaderTaskContext; + private ExecutorService executorService; + public int subTaskId; + private boolean isRunning; + private static final Logger LOG = LoggerFactory.getLogger(SplitChangeEventStreamingTaskController.class); + + public SplitChangeEventStreamingTaskController(AbstractSplitChangeEventStreamingTaskContext splitReaderTaskContext, int subTaskId) { + this.splitReaderTaskContext = splitReaderTaskContext; + this.subTaskId = subTaskId; + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(splitReaderTaskContext.threadNamePrefix() + this.subTaskId).build(); + this.executorService = Executors.newSingleThreadExecutor(threadFactory); + } + + public void launchSplitReaderTask () throws InterruptedException { + StreamingChangeEventSource dbzSource = splitReaderTaskContext.getStreamingChangeEventSource(); + dbzSource.init(); + this.isRunning = true; + this.executorService.submit(() -> { + try { + dbzSource.execute( + new BinlogChangeEventSourceContext(), + splitReaderTaskContext.getOffsetContext()); + } catch (Exception e) { + this.isRunning = false; + LOG.error("Execute debezium binlog reader failed", e); + }}); + } + + /** + * close execution service + */ + public void closeTask() { + if (executorService != null) { + executorService.shutdown(); + } + isRunning = false; + } + + private class BinlogChangeEventSourceContext + implements ChangeEventSource.ChangeEventSourceContext { + @Override + public boolean isRunning() { + return isRunning; + } + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-mysql/pom.xml b/bitsail-connectors/connector-cdc/connector-cdc-mysql/pom.xml index d5a6ce717..c75b48b2d 100644 --- a/bitsail-connectors/connector-cdc/connector-cdc-mysql/pom.xml +++ b/bitsail-connectors/connector-cdc/connector-cdc-mysql/pom.xml @@ -43,25 +43,15 @@ - io.debezium - debezium-connector-mysql - ${debezium.version} + com.bytedance.bitsail + connector-cdc-jdbc-base + ${revision} io.debezium - debezium-embedded + debezium-connector-mysql ${debezium.version} - - - jakarta.activation - jakarta.activation-api - - - org.apache.kafka - kafka-clients - - @@ -91,14 +81,6 @@ ${revision} test - - - io.debezium - debezium-core - ${debezium.version} - test-jar - test - \ No newline at end of file diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/pom.xml b/bitsail-connectors/connector-cdc/connector-cdc-postgres/pom.xml new file mode 100644 index 000000000..6ab533ef9 --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/pom.xml @@ -0,0 +1,82 @@ + + + + + connector-cdc + com.bytedance.bitsail + ${revision} + + 4.0.0 + + connector-cdc-postgres + + + 8 + 8 + UTF-8 + 1.6.4.Final + 3.0.8 + + + + com.bytedance.bitsail + connector-cdc-base + ${revision} + + + com.bytedance.bitsail + connector-cdc-jdbc-base + ${revision} + + + io.debezium + debezium-connector-postgres + ${debezium.version} + + + + jakarta.activation + jakarta.activation-api + 
1.2.2 + + + + + com.bytedance.bitsail + bitsail-connector-test + ${revision} + test + + + + com.bytedance.bitsail + bitsail-connector-print + ${revision} + test + + + + org.testcontainers + postgresql + 1.12.1 + test + + + + \ No newline at end of file diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/PostgresChangeEventSource.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/PostgresChangeEventSource.java new file mode 100644 index 000000000..a380c26f2 --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/PostgresChangeEventSource.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.bytedance.bitsail.connector.cdc.postgres.source; + +import com.bytedance.bitsail.base.connector.reader.v1.SourceReader; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.connector.cdc.postgres.source.reader.PostgresChangeEventReader; +import com.bytedance.bitsail.connector.cdc.source.BinlogSource; +import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit; + +public class PostgresChangeEventSource extends BinlogSource { + @Override + public String getReaderName() { + return "postgres-cdc"; + } + + @Override + public SourceReader createReader(SourceReader.Context readerContext) { + return new PostgresChangeEventReader(jobConf, readerContext); + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/config/PostgresConfig.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/config/PostgresConfig.java new file mode 100644 index 000000000..7297953c5 --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/config/PostgresConfig.java @@ -0,0 +1,50 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.bytedance.bitsail.connector.cdc.postgres.source.config; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.cdc.jdbc.source.config.AbstractJdbcDebeziumConfig; +import com.bytedance.bitsail.connector.cdc.model.ConnectionInfo; +import com.bytedance.bitsail.connector.cdc.postgres.source.option.PostgresChangeEventOptions; +import io.debezium.config.Configuration; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.relational.RelationalDatabaseConnectorConfig; + +import java.util.Properties; + +public class PostgresConfig extends AbstractJdbcDebeziumConfig { + private String pluginName; + private String slotName; + private static final String PLUGIN_NAME_KEY = "plugin.name"; + private static final String SLOT_NAME_KEY = "slot.name"; + + public PostgresConfig(BitSailConfiguration jobConf) { + super(jobConf); + } + + @Override + public RelationalDatabaseConnectorConfig getJdbcConnectorConfig(Configuration config) { + return new PostgresConnectorConfig(config); + } + + public void fillConnectionInfo(BitSailConfiguration jobConf, Properties props, ConnectionInfo connectionInfo, String timezone) { + super.fillConnectionInfo(jobConf, props, connectionInfo, timezone); + this.pluginName = jobConf.get(PostgresChangeEventOptions.PLUGIN_NAME); + this.slotName = jobConf.get(PostgresChangeEventOptions.SLOT_NAME); + props.put(PLUGIN_NAME_KEY, pluginName); + props.put(SLOT_NAME_KEY, slotName); + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/constant/PostgresConstant.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/constant/PostgresConstant.java new file mode 100644 index 000000000..4b2c3a643 --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/constant/PostgresConstant.java @@ -0,0 +1,21 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.bytedance.bitsail.connector.cdc.postgres.source.constant; + +public class PostgresConstant { + public static final String LSN = "lsn"; + public static final String TS_USEC = "ts_usec"; +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/option/PostgresChangeEventOptions.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/option/PostgresChangeEventOptions.java new file mode 100644 index 000000000..5fb6cdf1f --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/option/PostgresChangeEventOptions.java @@ -0,0 +1,32 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. 
and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.bytedance.bitsail.connector.cdc.postgres.source.option; + +import com.bytedance.bitsail.common.option.ConfigOption; +import com.bytedance.bitsail.connector.cdc.option.BinlogReaderOptions; + +import static com.bytedance.bitsail.common.option.ConfigOptions.key; +import static com.bytedance.bitsail.common.option.ReaderOptions.READER_PREFIX; + +public interface PostgresChangeEventOptions extends BinlogReaderOptions { + ConfigOption PLUGIN_NAME = + key(READER_PREFIX + "plugin_name") + .defaultValue("wal2json"); + + ConfigOption SLOT_NAME = + key(READER_PREFIX + "slot_name") + .defaultValue("dts"); +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/reader/PostgresChangeEventReader.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/reader/PostgresChangeEventReader.java new file mode 100644 index 000000000..efa607264 --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/reader/PostgresChangeEventReader.java @@ -0,0 +1,56 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
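[Editor's illustrative sketch, not part of the patch] A possible set of reader options for a Postgres CDC job, using only keys whose spellings appear in this patch and assuming the usual `job.reader.` prefix behind READER_PREFIX; the values and the connections block (whose JSON shape is not shown here) are hypothetical.

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class PostgresCdcOptionsSketch {
      public static Map<String, String> sampleReaderOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("job.reader.db_name", "test_db");                      // ReaderOptions.DB_NAME
        options.put("job.reader.connection_timezone", "UTC");              // ReaderOptions.CONNECTION_TIMEZONE (default "UTC")
        options.put("job.reader.plugin_name", "wal2json");                 // PostgresChangeEventOptions.PLUGIN_NAME default
        options.put("job.reader.slot_name", "dts");                        // PostgresChangeEventOptions.SLOT_NAME default
        options.put("job.reader.debezium.schema.include.list", "public");  // pass-through to Debezium via extractProps
        return options;
      }
    }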
+ */ +package com.bytedance.bitsail.connector.cdc.postgres.source.reader; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.common.row.Row; +import com.bytedance.bitsail.connector.cdc.postgres.source.constant.PostgresConstant; +import com.bytedance.bitsail.connector.cdc.source.offset.BinlogOffset; +import com.bytedance.bitsail.connector.cdc.source.reader.BinlogSourceReader; +import com.bytedance.bitsail.connector.cdc.source.reader.BinlogSplitReader; +import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class PostgresChangeEventReader extends BinlogSourceReader { + public PostgresChangeEventReader(BitSailConfiguration jobConf, Context readerContext) { + super(jobConf, readerContext); + } + + @Override + public BinlogSplitReader getReader() { + return new PostgresChangeEventSplitReader(jobConf, readerContext.getIndexOfSubtask()); + } + + @Override + public List snapshotState(long checkpointId) { + // store the latest offset + Map readerOffset = this.reader.getOffset(); + BinlogOffset offset = BinlogOffset.specified(); + offset.addProps(PostgresConstant.LSN, readerOffset.get(PostgresConstant.LSN)); + offset.addProps(PostgresConstant.TS_USEC, readerOffset.get(PostgresConstant.TS_USEC)); + List splits = new ArrayList<>(); + BinlogSplit split = BinlogSplit.builder() + .splitId("binlog-0") + .beginOffset(offset) + .endOffset(BinlogOffset.boundless()) + .build(); + splits.add(split); + return splits; + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/reader/PostgresChangeEventSplitReader.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/reader/PostgresChangeEventSplitReader.java new file mode 100644 index 000000000..dfd933a58 --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/reader/PostgresChangeEventSplitReader.java @@ -0,0 +1,44 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
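[Editor's illustrative sketch, not part of the patch] The offset round trip implied by snapshotState above and buildPostgresOffsetProperties further down: the reader's lsn/ts_usec map is stored in a BinlogSplit at checkpoint time and later rebuilt into a Debezium offset map. The method and builder calls are copied from this patch; the helper class and the exact generic types of BinlogOffset.getProps()/addProps() are assumptions.

    import com.bytedance.bitsail.connector.cdc.postgres.source.constant.PostgresConstant;
    import com.bytedance.bitsail.connector.cdc.source.offset.BinlogOffset;
    import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit;

    import java.util.HashMap;
    import java.util.Map;

    public class PostgresOffsetRoundTripSketch {
      // What snapshotState() stores from BinlogSplitReader.getOffset().
      public static BinlogSplit checkpoint(Map<String, String> readerOffset) {
        BinlogOffset offset = BinlogOffset.specified();
        offset.addProps(PostgresConstant.LSN, readerOffset.get(PostgresConstant.LSN));
        offset.addProps(PostgresConstant.TS_USEC, readerOffset.get(PostgresConstant.TS_USEC));
        return BinlogSplit.builder()
            .splitId("binlog-0")
            .beginOffset(offset)
            .endOffset(BinlogOffset.boundless())
            .build();
      }

      // What buildPostgresOffsetProperties() reads back before loading the PostgresOffsetContext.
      public static Map<String, String> restore(BinlogSplit split) {
        Map<String, String> dbzOffset = new HashMap<>();
        dbzOffset.put(PostgresConstant.LSN, String.valueOf(split.getBeginOffset().getProps().get(PostgresConstant.LSN)));
        dbzOffset.put(PostgresConstant.TS_USEC, String.valueOf(split.getBeginOffset().getProps().get(PostgresConstant.TS_USEC)));
        return dbzOffset;
      }
    }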
+ */ +package com.bytedance.bitsail.connector.cdc.postgres.source.reader; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.cdc.jdbc.source.config.AbstractJdbcDebeziumConfig; +import com.bytedance.bitsail.connector.cdc.jdbc.source.reader.AbstractJdbcChangeEventSplitReader; +import com.bytedance.bitsail.connector.cdc.jdbc.source.streaming.AbstractSplitChangeEventStreamingTaskContext; +import com.bytedance.bitsail.connector.cdc.postgres.source.config.PostgresConfig; +import com.bytedance.bitsail.connector.cdc.postgres.source.streaming.PostgresChangeEventStreamingTaskContext; +import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit; +import io.debezium.relational.RelationalDatabaseConnectorConfig; + +import java.io.IOException; +import java.sql.SQLException; + +public class PostgresChangeEventSplitReader extends AbstractJdbcChangeEventSplitReader { + public PostgresChangeEventSplitReader(BitSailConfiguration jobConf, int subtaskId) { + super(jobConf, subtaskId); + } + + @Override + public AbstractJdbcDebeziumConfig getJdbcDebeziumConfig(BitSailConfiguration jobConf) { + return new PostgresConfig(jobConf); + } + + @Override + public AbstractSplitChangeEventStreamingTaskContext getSplitReaderTaskContext(BinlogSplit split, RelationalDatabaseConnectorConfig connectorConfig) { + return new PostgresChangeEventStreamingTaskContext(split, connectorConfig); + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/streaming/PostgresChangeEventStreamingTaskContext.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/streaming/PostgresChangeEventStreamingTaskContext.java new file mode 100644 index 000000000..3554134f6 --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/com/bytedance/bitsail/connector/cdc/postgres/source/streaming/PostgresChangeEventStreamingTaskContext.java @@ -0,0 +1,217 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.bytedance.bitsail.connector.cdc.postgres.source.streaming; + +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.connector.cdc.error.BinlogReaderErrorCode; +import com.bytedance.bitsail.connector.cdc.jdbc.source.streaming.AbstractSplitChangeEventStreamingTaskContext; +import com.bytedance.bitsail.connector.cdc.postgres.source.constant.PostgresConstant; +import com.bytedance.bitsail.connector.cdc.source.offset.BinlogOffset; +import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit; +import io.debezium.connector.base.ChangeEventQueue; +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresErrorHandler; +import io.debezium.connector.postgresql.PostgresEventMetadataProvider; +import io.debezium.connector.postgresql.PostgresOffsetContext; +import io.debezium.connector.postgresql.PostgresSchema; +import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource; +import io.debezium.connector.postgresql.PostgresTaskContext; +import io.debezium.connector.postgresql.PostgresTopicSelector; +import io.debezium.connector.postgresql.PostgresValueConverter; +import io.debezium.connector.postgresql.TypeRegistry; +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.connector.postgresql.connection.ReplicationConnection; +import io.debezium.connector.postgresql.snapshot.NeverSnapshotter; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.jdbc.JdbcValueConverters; +import io.debezium.pipeline.DataChangeEvent; +import io.debezium.pipeline.ErrorHandler; +import io.debezium.pipeline.source.spi.EventMetadataProvider; +import io.debezium.pipeline.source.spi.StreamingChangeEventSource; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.relational.RelationalDatabaseSchema; +import io.debezium.schema.TopicSelector; +import io.debezium.util.Clock; +import io.debezium.util.Metronome; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.sql.SQLException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +public class PostgresChangeEventStreamingTaskContext extends AbstractSplitChangeEventStreamingTaskContext { + private static final Logger LOG = LoggerFactory.getLogger(PostgresChangeEventStreamingTaskContext.class); + private ReplicationConnection replicationConnection; + + public PostgresChangeEventStreamingTaskContext (BinlogSplit split, RelationalDatabaseConnectorConfig connectorConfig) { + super(split, connectorConfig); + this.replicationConnection = createReplicationConnection((PostgresTaskContext)dbzTaskContext, false, 3, + Duration.ofMillis(Duration.ofSeconds(10).toMillis())); + } + + @Override + protected ErrorHandler buildErrorHandler(RelationalDatabaseConnectorConfig connectorConfig, ChangeEventQueue queue) { + return new PostgresErrorHandler(connectorConfig.getLogicalName(), queue); + } + + @Override + public JdbcConnection tryEstablishedConnection(RelationalDatabaseConnectorConfig connectorConfig) { + PostgresConnection connection = new PostgresConnection(connectorConfig.getJdbcConfig()); + return new PostgresConnection(connectorConfig.getJdbcConfig(), new TypeRegistry(connection)); + } + + @Override + public String threadNamePrefix() { + return "postgres-connector-task"; + } + + @Override + public 
OffsetContext buildOffsetContext() { + final PostgresOffsetContext offsetContext; + + switch (split.getBeginOffset().getOffsetType()) { + case SPECIFIED: + Map offset = buildPostgresOffsetProperties(split.getBeginOffset()); + offsetContext = new PostgresOffsetContext.Loader((PostgresConnectorConfig) connectorConfig).load(offset); + break; + //TODO: Add other offset context + case LATEST: + offsetContext = PostgresOffsetContext.initialContext((PostgresConnectorConfig) connectorConfig, + (PostgresConnection) jdbcConnection, Clock.SYSTEM); + break; + default: + throw new BitSailException(BinlogReaderErrorCode.UNSUPPORTED_ERROR, + String.format("the begin binlog type %s is not supported", split.getBeginOffset().getOffsetType())); + } + return offsetContext; + } + + public Map buildPostgresOffsetProperties(BinlogOffset offset) { + Map offsetProperties = new HashMap<>(); + offsetProperties.put(PostgresConstant.LSN, offset.getProps().get(PostgresConstant.LSN)); + offsetProperties.put(PostgresConstant.TS_USEC, offset.getProps().get(PostgresConstant.TS_USEC)); + return offsetProperties; + } + + @Override + public TopicSelector buildTopicSelector() { + return PostgresTopicSelector.create((PostgresConnectorConfig) connectorConfig); + } + + @Override + public StreamingChangeEventSource buildStreamingChangeEventSource() { + return new PostgresStreamingChangeEventSource( + (PostgresConnectorConfig) connectorConfig, + new NeverSnapshotter(), + (PostgresConnection)jdbcConnection, + eventDispatcher, + errorHandler, + Clock.SYSTEM, + (PostgresSchema) schema, + (PostgresTaskContext) dbzTaskContext, + this.replicationConnection); + } + + @Override + public void closeContextResources() { + try { + if (this.replicationConnection != null && this.replicationConnection.isConnected()) { + this.replicationConnection.close(); + } + } catch (Exception e) { + LOG.error("Failed to close replication connection of pg streaming task context: {}", e.getMessage()); + } + } + + @Override + public void testConnectionAndValidBinlogConfiguration() { + try { + String testSql = "SELECT version()"; + jdbcConnection.connect(); + jdbcConnection.execute(testSql); + LOG.info("Success to connect postgres!"); + } catch (SQLException e) { + throw new RuntimeException("Failed to connect", e); + } + } + + public ReplicationConnection createReplicationConnection(PostgresTaskContext taskContext, boolean doSnapshot, + int maxRetries, Duration retryDelay) + throws RuntimeException { + final Metronome metronome = Metronome.parker(retryDelay, Clock.SYSTEM); + short retryCount = 0; + ReplicationConnection replicationConnection = null; + while (retryCount <= maxRetries) { + try { + return taskContext.createReplicationConnection(doSnapshot); + } + catch (SQLException ex) { + retryCount++; + if (retryCount > maxRetries) { + LOG.error("Too many errors connecting to server. All {} retries failed.", maxRetries); + throw new RuntimeException(ex); + } + + LOG.warn("Error connecting to server; will attempt retry {} of {} after {} " + + "seconds. 
Exception message: {}", retryCount, maxRetries, retryDelay.getSeconds(), ex.getMessage()); + try { + metronome.pause(); + } + catch (InterruptedException e) { + LOG.warn("Connection retry sleep interrupted by exception: " + e); + Thread.currentThread().interrupt(); + } + } + } + return replicationConnection; + } + + @Override + public void closeStreamingChangeEventSource() { + LOG.info("Closing pg streaming task"); + } + + @Override + public RelationalDatabaseSchema buildRelationalDatabaseSchema(RelationalDatabaseConnectorConfig connectorConfig) { + return new PostgresSchema((PostgresConnectorConfig)connectorConfig, + new TypeRegistry((PostgresConnection) jdbcConnection), + topicSelector, + (PostgresValueConverter) valueConverters); + } + + @Override + public CdcSourceTaskContext buildCdcSourceTaskContext(RelationalDatabaseConnectorConfig connectorConfig, RelationalDatabaseSchema schema) { + return new PostgresTaskContext((PostgresConnectorConfig) connectorConfig, (PostgresSchema) schema, topicSelector); + } + + @Override + public EventMetadataProvider buildEventMetadataProvider() { + return new PostgresEventMetadataProvider(); + } + + @Override + public JdbcValueConverters buildValueConverters(RelationalDatabaseConnectorConfig connectorConfig) { + return PostgresValueConverter.of( + (PostgresConnectorConfig) connectorConfig, + Charset.defaultCharset(), + new TypeRegistry((PostgresConnection) jdbcConnection)); + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresEventMetadataProvider.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresEventMetadataProvider.java new file mode 100644 index 000000000..1726e4f1d --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresEventMetadataProvider.java @@ -0,0 +1,79 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.debezium.connector.postgresql; + +import java.time.Instant; +import java.util.Map; + +import org.apache.kafka.connect.data.Struct; + +import io.debezium.data.Envelope; +import io.debezium.pipeline.source.spi.EventMetadataProvider; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.schema.DataCollectionId; +import io.debezium.time.Conversions; +import io.debezium.util.Collect; + +public class PostgresEventMetadataProvider implements EventMetadataProvider { + + @Override + public Instant getEventTimestamp(DataCollectionId source, OffsetContext offset, Object key, Struct value) { + if (value == null) { + return null; + } + final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE); + if (source == null) { + return null; + } + if (sourceInfo.schema().field(SourceInfo.TIMESTAMP_USEC_KEY) != null) { + final Long timestamp = sourceInfo.getInt64(SourceInfo.TIMESTAMP_USEC_KEY); + return timestamp == null ? 
null : Conversions.toInstantFromMicros(timestamp); + } + final Long timestamp = sourceInfo.getInt64(SourceInfo.TIMESTAMP_KEY); + return timestamp == null ? null : Instant.ofEpochMilli(timestamp); + } + + @Override + public Map getEventSourcePosition(DataCollectionId source, OffsetContext offset, Object key, Struct value) { + if (value == null) { + return null; + } + final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE); + if (source == null) { + return null; + } + Long xmin = sourceInfo.getInt64(SourceInfo.XMIN_KEY); + + Map r = Collect.hashMapOf( + SourceInfo.LSN_KEY, Long.toString(sourceInfo.getInt64(SourceInfo.LSN_KEY))); + if (xmin != null) { + r.put(SourceInfo.XMIN_KEY, Long.toString(xmin)); + } + return r; + } + + @Override + public String getTransactionId(DataCollectionId source, OffsetContext offset, Object key, Struct value) { + if (value == null) { + return null; + } + final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE); + if (source == null) { + return null; + } + return Long.toString(sourceInfo.getInt64(SourceInfo.TXID_KEY)); + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java new file mode 100644 index 000000000..fcf489504 --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java @@ -0,0 +1,269 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.debezium.connector.postgresql; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.connect.data.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.annotation.NotThreadSafe; +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.connector.postgresql.connection.ServerInfo; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.RelationalDatabaseSchema; +import io.debezium.relational.Table; +import io.debezium.relational.TableId; +import io.debezium.relational.TableSchemaBuilder; +import io.debezium.relational.Tables; +import io.debezium.schema.TopicSelector; +import io.debezium.util.SchemaNameAdjuster; + +/** + * Component that records the schema information for the {@link PostgresConnector}. The schema information contains + * the {@link Tables table definitions} and the Kafka Connect {@link #schemaFor(TableId) Schema}s for each table, where the + * {@link Schema} excludes any columns that have been {@link PostgresConnectorConfig#COLUMN_EXCLUDE_LIST specified} in the + * configuration. 
+ *
+ * @author Horia Chiorean
+ */
+@NotThreadSafe
+public class PostgresSchema extends RelationalDatabaseSchema {
+
+    protected final static String PUBLIC_SCHEMA_NAME = "public";
+    private final static Logger LOGGER = LoggerFactory.getLogger(PostgresSchema.class);
+
+    private final TypeRegistry typeRegistry;
+
+    private final Map<TableId, List<String>> tableIdToToastableColumns;
+    private final Map<Integer, TableId> relationIdToTableId;
+    private final boolean readToastableColumns;
+
+    /**
+     * Create a schema component given the supplied {@link PostgresConnectorConfig Postgres connector configuration}.
+     *
+     * @param config the connector configuration, which is presumed to be valid
+     */
+    public PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegistry,
+                          TopicSelector<TableId> topicSelector, PostgresValueConverter valueConverter) {
+        super(config, topicSelector, new Filters(config).tableFilter(),
+                config.getColumnFilter(), getTableSchemaBuilder(config, valueConverter), false,
+                config.getKeyMapper());
+
+        this.typeRegistry = typeRegistry;
+        this.tableIdToToastableColumns = new HashMap<>();
+        this.relationIdToTableId = new HashMap<>();
+        this.readToastableColumns = config.skipRefreshSchemaOnMissingToastableData();
+    }
+
+    private static TableSchemaBuilder getTableSchemaBuilder(PostgresConnectorConfig config, PostgresValueConverter valueConverter) {
+        return new TableSchemaBuilder(valueConverter, SchemaNameAdjuster.create(), config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(),
+                config.getSanitizeFieldNames());
+    }
+
+    /**
+     * Initializes the content for this schema by reading all the database information from the supplied connection.
+     *
+     * @param connection a {@link JdbcConnection} instance, never {@code null}
+     * @param printReplicaIdentityInfo whether or not to look and print out replica identity information about the tables
+     * @return this object so methods can be chained together; never null
+     * @throws SQLException if there is a problem obtaining the schema from the database server
+     */
+    protected PostgresSchema refresh(PostgresConnection connection, boolean printReplicaIdentityInfo) throws SQLException {
+        // read all the information from the DB
+        connection.readSchema(tables(), null, null, getTableFilter(), null, true);
+        if (printReplicaIdentityInfo) {
+            // print out all the replica identity info
+            tableIds().forEach(tableId -> printReplicaIdentityInfo(connection, tableId));
+        }
+        // and then refresh the schemas
+        refreshSchemas();
+        if (readToastableColumns) {
+            tableIds().forEach(tableId -> refreshToastableColumnsMap(connection, tableId));
+        }
+        return this;
+    }
+
+    private void printReplicaIdentityInfo(PostgresConnection connection, TableId tableId) {
+        try {
+            ServerInfo.ReplicaIdentity replicaIdentity = connection.readReplicaIdentityInfo(tableId);
+            LOGGER.info("REPLICA IDENTITY for '{}' is '{}'; {}", tableId, replicaIdentity, replicaIdentity.description());
+        }
+        catch (SQLException e) {
+            LOGGER.warn("Cannot determine REPLICA IDENTITY info for '{}'", tableId);
+        }
+    }
+
+    /**
+     * Refreshes this schema's content for a particular table
+     *
+     * @param connection a {@link JdbcConnection} instance, never {@code null}
+     * @param tableId the table identifier; may not be null
+     * @param refreshToastableColumns refreshes the cache of toastable columns for `tableId`, if {@code true}
+     * @throws SQLException if there is a problem refreshing the schema from the database server
+     */
+    protected void refresh(PostgresConnection connection, TableId tableId, boolean refreshToastableColumns)
            throws SQLException {
+        Tables temp = new Tables();
+        connection.readSchema(temp, null, null, tableId::equals, null, true);
+
+        // the table could be deleted before the event was processed
+        if (temp.size() == 0) {
+            LOGGER.warn("Refresh of {} was requested but the table no longer exists", tableId);
+            return;
+        }
+        // overwrite (add or update) our cached view of the table
+        tables().overwriteTable(temp.forTable(tableId));
+        // refresh the schema
+        refreshSchema(tableId);
+
+        if (refreshToastableColumns) {
+            // and refresh toastable columns info
+            refreshToastableColumnsMap(connection, tableId);
+        }
+    }
+
+    protected boolean isFilteredOut(TableId id) {
+        return !getTableFilter().isIncluded(id);
+    }
+
+    /**
+     * Discard any currently-cached schemas and rebuild them using the filters.
+     */
+    protected void refreshSchemas() {
+        clearSchemas();
+
+        // Create TableSchema instances for any existing table ...
+        tableIds().forEach(this::refreshSchema);
+    }
+
+    private void refreshToastableColumnsMap(PostgresConnection connection, TableId tableId) {
+        // This method populates the list of 'toastable' columns for `tableId`.
+        // A toastable column is one that has storage strategy 'x' (inline-compressible + secondary storage enabled),
+        // 'e' (secondary storage enabled), or 'm' (inline-compressible).
+        //
+        // Note that, rather confusingly, the 'm' storage strategy does in fact permit secondary storage, but only as a
+        // last resort.
+        //
+        // Also, we cannot account for the possibility that future versions of PostgreSQL introduce new storage strategies
+        // that include secondary storage. We should move to native decoding in PG 10 and get rid of this hacky code
+        // before that possibility is realized.
+
+        // Collect the non-system (attnum > 0), present (not attisdropped) column names that are toastable.
+        //
+        // NOTE (Ian Axelrod):
+        // I would prefer to use data provided by PgDatabaseMetaData, but the PG JDBC driver does not expose storage type
+        // information. Thus, we need to make a separate query. If we are refreshing schemas rarely, this is not a big
+        // deal.
+        List<String> toastableColumns = new ArrayList<>();
+        String relName = tableId.table();
+        String schema = tableId.schema() != null && tableId.schema().length() > 0 ? tableId.schema() : "public";
+        String statement = "select att.attname" +
+                " from pg_attribute att " +
+                " join pg_class tbl on tbl.oid = att.attrelid" +
+                " join pg_namespace ns on tbl.relnamespace = ns.oid" +
+                " where tbl.relname = ?" +
+                " and ns.nspname = ?" +
+                " and att.attnum > 0" +
+                " and att.attstorage in ('x', 'e', 'm')" +
+                " and not att.attisdropped;";
+
+        try {
+            connection.prepareQuery(statement, stmt -> {
+                stmt.setString(1, relName);
+                stmt.setString(2, schema);
+            }, rs -> {
+                while (rs.next()) {
+                    toastableColumns.add(rs.getString(1));
+                }
+            });
+            if (!connection.connection().getAutoCommit()) {
+                connection.connection().commit();
+            }
+        }
+        catch (SQLException e) {
+            throw new RuntimeException("Unable to refresh table columns mapping", e);
+        }
+
+        tableIdToToastableColumns.put(tableId, Collections.unmodifiableList(toastableColumns));
+    }
+
+    protected static TableId parse(String table) {
+        TableId tableId = TableId.parse(table, false);
+        if (tableId == null) {
+            return null;
+        }
+        return tableId.schema() == null ?
                new TableId(tableId.catalog(), PUBLIC_SCHEMA_NAME, tableId.table()) : tableId;
+    }
+
+    public TypeRegistry getTypeRegistry() {
+        return typeRegistry;
+    }
+
+    public List<String> getToastableColumnsForTableId(TableId tableId) {
+        return tableIdToToastableColumns.getOrDefault(tableId, Collections.emptyList());
+    }
+
+    /**
+     * Applies schema changes for the specified table.
+     *
+     * @param relationId the postgres relation unique identifier for the table
+     * @param table externally constructed table, typically from the decoder; must not be null
+     */
+    public void applySchemaChangesForTable(int relationId, Table table) {
+        assert table != null;
+
+        if (isFilteredOut(table.id())) {
+            LOGGER.trace("Skipping schema refresh for table '{}' with relation '{}' as table is filtered", table.id(), relationId);
+            return;
+        }
+
+        relationIdToTableId.put(relationId, table.id());
+        refresh(table);
+    }
+
+    /**
+     * Resolve a {@link Table} based on a supplied table relation unique identifier.
+     * <p>

+ * This implementation relies on a prior call to {@link #applySchemaChangesForTable(int, Table)} to have + * applied schema changes from a replication stream with the {@code relationId} for the relationship to exist + * and be capable of lookup. + * + * @param relationId the unique table relation identifier + * @return the resolved table or null + */ + public Table tableFor(int relationId) { + TableId tableId = relationIdToTableId.get(relationId); + if (tableId == null) { + LOGGER.debug("Relation '{}' is unknown, cannot resolve to table", relationId); + return null; + } + LOGGER.debug("Relation '{}' resolved to table '{}'", relationId, tableId); + return tableFor(tableId); + } + + @Override + public boolean tableInformationComplete() { + // PostgreSQL does not support HistorizedDatabaseSchema - so no tables are recovered + return false; + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java new file mode 100644 index 000000000..d6a3e6a23 --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java @@ -0,0 +1,136 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.debezium.connector.postgresql; + +import java.sql.SQLException; +import java.util.Collections; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.annotation.ThreadSafe; +import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.connector.postgresql.connection.ReplicationConnection; +import io.debezium.connector.postgresql.spi.SlotState; +import io.debezium.relational.TableId; +import io.debezium.schema.TopicSelector; +import io.debezium.util.Clock; +import io.debezium.util.ElapsedTimeStrategy; + +/** + * The context of a {@link PostgresConnectorTask}. This deals with most of the brunt of reading various configuration options + * and creating other objects with these various options. 
+ *
+ * @author Horia Chiorean (hchiorea@redhat.com)
+ */
+@ThreadSafe
+public class PostgresTaskContext extends CdcSourceTaskContext {
+
+    protected final static Logger LOGGER = LoggerFactory.getLogger(PostgresTaskContext.class);
+
+    private final PostgresConnectorConfig config;
+    private final TopicSelector<TableId> topicSelector;
+    private final PostgresSchema schema;
+
+    private ElapsedTimeStrategy refreshXmin;
+    private Long lastXmin;
+
+    public PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicSelector<TableId> topicSelector) {
+        super(config.getContextName(), config.getLogicalName(), Collections::emptySet);
+
+        this.config = config;
+        if (config.xminFetchInterval().toMillis() > 0) {
+            this.refreshXmin = ElapsedTimeStrategy.constant(Clock.SYSTEM, config.xminFetchInterval().toMillis());
+        }
+        this.topicSelector = topicSelector;
+        assert schema != null;
+        this.schema = schema;
+    }
+
+    protected TopicSelector<TableId> topicSelector() {
+        return topicSelector;
+    }
+
+    protected PostgresSchema schema() {
+        return schema;
+    }
+
+    protected PostgresConnectorConfig config() {
+        return config;
+    }
+
+    protected void refreshSchema(PostgresConnection connection, boolean printReplicaIdentityInfo) throws SQLException {
+        schema.refresh(connection, printReplicaIdentityInfo);
+    }
+
+    Long getSlotXmin(PostgresConnection connection) throws SQLException {
+        // when xmin fetch is set to 0, we don't track it, to avoid the cost of querying the
+        // slot periodically
+        if (config.xminFetchInterval().toMillis() <= 0) {
+            return null;
+        }
+        assert (this.refreshXmin != null);
+
+        if (this.refreshXmin.hasElapsed()) {
+            lastXmin = getCurrentSlotState(connection).slotCatalogXmin();
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Fetched new xmin from slot of {}", lastXmin);
+            }
+        }
+        else {
+            if (LOGGER.isTraceEnabled()) {
+                LOGGER.trace("reusing xmin value of {}", lastXmin);
+            }
+        }
+
+        return lastXmin;
+    }
+
+    private SlotState getCurrentSlotState(PostgresConnection connection) throws SQLException {
+        return connection.getReplicationSlotState(config.slotName(), config.plugin().getPostgresPluginName());
+    }
+
+    public ReplicationConnection createReplicationConnection(boolean doSnapshot) throws SQLException {
+        final boolean dropSlotOnStop = config.dropSlotOnStop();
+        if (dropSlotOnStop) {
+            LOGGER.warn(
+                    "Connector has enabled automated replication slot removal upon restart ({} = true).
" + + "This setting is not recommended for production environments, as a new replication slot " + + "will be created after a connector restart, resulting in missed data change events.", + PostgresConnectorConfig.DROP_SLOT_ON_STOP.name()); + } + return ReplicationConnection.builder(config) + .withSlot(config.slotName()) + .withPublication(config.publicationName()) + .withTableFilter(config.getTableFilters()) + .withPublicationAutocreateMode(config.publicationAutocreateMode()) + .withPlugin(config.plugin()) + .withTruncateHandlingMode(config.truncateHandlingMode()) + .dropSlotOnClose(dropSlotOnStop) + .streamParams(config.streamParams()) + .statusUpdateInterval(config.statusUpdateInterval()) + .withTypeRegistry(schema.getTypeRegistry()) + .doSnapshot(doSnapshot) + .withSchema(schema) + .build(); + } + + PostgresConnectorConfig getConfig() { + return config; + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/container/PostgresContainerMariadbAdapter.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/container/PostgresContainerMariadbAdapter.java new file mode 100644 index 000000000..47670556e --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/container/PostgresContainerMariadbAdapter.java @@ -0,0 +1,36 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.bytedance.bitsail.connector.cdc.postgres.container; + +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.DockerImageName; + +public class PostgresContainerMariadbAdapter> extends PostgreSQLContainer { + + public PostgresContainerMariadbAdapter(DockerImageName dockerImageName) { + super(String.valueOf(dockerImageName)); + } + + @Override + public String getDriverClassName() { + return "org.mariadb.jdbc.Driver"; + } + + @Override + public String getJdbcUrl() { + return super.getJdbcUrl(); + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/source/config/PostgresConfigTest.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/source/config/PostgresConfigTest.java new file mode 100644 index 000000000..9f10c7e8c --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/source/config/PostgresConfigTest.java @@ -0,0 +1,62 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.bytedance.bitsail.connector.cdc.postgres.source.config; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.cdc.model.ClusterInfo; +import com.bytedance.bitsail.connector.cdc.model.ConnectionInfo; +import com.bytedance.bitsail.connector.cdc.option.BinlogReaderOptions; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class PostgresConfigTest { + private static String HOST_NAME = "192.168"; + private static String URL = "123"; + private static int PORT = 123; + private static String USERNAME = "gary"; + private static String PASSWORD = "666"; + private static String DATABASE = "test_cdc"; + + @Test + public void testMysqlConfigConversion() { + BitSailConfiguration jobConf = BitSailConfiguration.newDefault(); + List cluster = new ArrayList<>(); + ClusterInfo instant = new ClusterInfo(); + instant.setMaster(new ConnectionInfo(HOST_NAME, URL, PORT)); + cluster.add(instant); + jobConf.set(BinlogReaderOptions.USER_NAME, USERNAME); + jobConf.set(BinlogReaderOptions.PASSWORD, PASSWORD); + jobConf.set(BinlogReaderOptions.DB_NAME, DATABASE); + jobConf.set(BinlogReaderOptions.CONNECTIONS, cluster); + + //add debezium properties + jobConf.set(PostgresConfig.DEBEZIUM_PREFIX + "key1", "value1"); + jobConf.set(PostgresConfig.DEBEZIUM_PREFIX + "key2", "value2"); + + PostgresConfig postgresConfig = new PostgresConfig(jobConf); + Assert.assertEquals(HOST_NAME, postgresConfig.getHostname()); + Assert.assertEquals(PORT, postgresConfig.getPort()); + Assert.assertEquals(USERNAME, postgresConfig.getUsername()); + Assert.assertEquals(DATABASE, postgresConfig.getDbName()); + Assert.assertEquals(PASSWORD, postgresConfig.getPassword()); + Assert.assertEquals("value1", postgresConfig.getDbzProperties().getProperty("key1")); + Assert.assertEquals("value2", postgresConfig.getDbzProperties().getProperty("key2")); + } +} + diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/source/debezium/PostgresChangeEventSplitContainerTest.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/source/debezium/PostgresChangeEventSplitContainerTest.java new file mode 100644 index 000000000..743bacb91 --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/source/debezium/PostgresChangeEventSplitContainerTest.java @@ -0,0 +1,116 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.bytedance.bitsail.connector.cdc.postgres.source.debezium; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.cdc.model.ClusterInfo; +import com.bytedance.bitsail.connector.cdc.model.ConnectionInfo; +import com.bytedance.bitsail.connector.cdc.option.BinlogReaderOptions; +import com.bytedance.bitsail.connector.cdc.postgres.container.PostgresContainerMariadbAdapter; +import com.bytedance.bitsail.connector.cdc.postgres.source.reader.PostgresChangeEventSplitReader; +import com.bytedance.bitsail.connector.cdc.postgres.source.util.TestDatabase; +import com.bytedance.bitsail.connector.cdc.source.offset.BinlogOffset; +import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit; +import com.google.common.collect.Lists; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +public class PostgresChangeEventSplitContainerTest { + private static final Logger LOG = LoggerFactory.getLogger(PostgresChangeEventSplitContainerTest.class); + + private static final String POSTGRESQL_DOCKER_IMAGER = "postgres:9.6.12"; + + private static final String TEST_USERNAME = "user1"; + private static final String TEST_PASSWORD = "password1"; + private static final String TEST_DATABASE = "test"; + + private PostgreSQLContainer container; + + @Before + public void before() { + container = new PostgresContainerMariadbAdapter<>(DockerImageName.parse(POSTGRESQL_DOCKER_IMAGER)) + .withInitScript("scripts/jdbc_to_print.sql") + .withUsername(TEST_USERNAME) + .withPassword(TEST_PASSWORD) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + Startables.deepStart(Stream.of(container)).join(); + } + + @After + public void after() { + container.close(); + } + + @Test + @Ignore + public void testDatabaseConnection() { + TestDatabase db = new TestDatabase(container, "test", TEST_USERNAME, TEST_PASSWORD); + db.executeSql("SHOW TABLES;"); + } + + @Test + @Ignore + public void testBinlogReader() throws InterruptedException, IOException { + BitSailConfiguration jobConf = BitSailConfiguration.newDefault(); + TestDatabase database = new TestDatabase(container, "test", TEST_USERNAME, TEST_PASSWORD); + + ConnectionInfo connectionInfo = ConnectionInfo.builder() + .host(database.getPostgresSqlContainer().getHost()) + .port(database.getPostgresSqlContainer().getFirstMappedPort()) + .url(database.getPostgresSqlContainer().getJdbcUrl()) + .build(); + ClusterInfo clusterInfo = ClusterInfo.builder() + .master(connectionInfo) + .build(); + + jobConf.set(BinlogReaderOptions.CONNECTIONS, Lists.newArrayList(clusterInfo)); + jobConf.set(BinlogReaderOptions.USER_NAME, database.getUsername()); + jobConf.set(BinlogReaderOptions.PASSWORD, database.getPassword()); + jobConf.set(BinlogReaderOptions.DB_NAME, database.getDatabaseName()); + jobConf.set("job.reader.debezium.database.allowPublicKeyRetrieval", "true"); + jobConf.set("job.reader.debezium.database.server.id", "123"); + jobConf.set("job.reader.debezium.database.server.name", "abc"); + 
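// NOTE: the "gtid.source.filter.dml.events" and "database.history" keys set below, like
+        // "database.server.id" and "database.allowPublicKeyRetrieval" above, are MySQL-oriented
+        // Debezium settings that look carried over from the MySQL CDC test setup; the Postgres
+        // connector most likely ignores them (assumption about the test scaffolding, not verified).
+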
jobConf.set("job.reader.debezium.gtid.source.filter.dml.events", "false"); + jobConf.set("job.reader.debezium.schema.history.internal", "io.debezium.relational.history.MemorySchemaHistory"); + jobConf.set("job.reader.debezium.database.history", "io.debezium.relational.history.MemoryDatabaseHistory"); + + PostgresChangeEventSplitReader reader = new PostgresChangeEventSplitReader(jobConf, 0); + BinlogSplit split = new BinlogSplit("split-1", BinlogOffset.earliest(), BinlogOffset.boundless()); + reader.readSplit(split); + int maxPeriod = 0; + while (maxPeriod <= 20) { + if (reader.hasNext()) { + reader.poll(); + } + maxPeriod++; + TimeUnit.SECONDS.sleep(1); + } + reader.close(); + } +} diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/source/reader/PostgresChangeEventReaderTest.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/source/reader/PostgresChangeEventReaderTest.java new file mode 100644 index 000000000..42c6b63c4 --- /dev/null +++ b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/source/reader/PostgresChangeEventReaderTest.java @@ -0,0 +1,96 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.bytedance.bitsail.connector.cdc.postgres.source.reader; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.cdc.model.ClusterInfo; +import com.bytedance.bitsail.connector.cdc.model.ConnectionInfo; +import com.bytedance.bitsail.connector.cdc.option.BinlogReaderOptions; +import com.bytedance.bitsail.connector.cdc.source.offset.BinlogOffset; +import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit; +import com.google.common.collect.Lists; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.TimeUnit; + +public class PostgresChangeEventReaderTest { + private static final Logger LOG = LoggerFactory.getLogger(PostgresChangeEventReaderTest.class); + String username = "postgres"; + String password = "paas"; + String host = "10.37.141.239"; + int port = 5400; + String dbName = "test_cdc"; + + @Test + @Ignore + public void testConnection() throws SQLException { + Connection connection = DriverManager.getConnection( + getJdbcUrl(), username, password); + Statement statement = connection.createStatement(); + statement.execute("SELECT VERSION();"); + } + + @Test + @Ignore + public void testReader() throws Exception { + BitSailConfiguration jobConf = BitSailConfiguration.newDefault(); + + ConnectionInfo connectionInfo = ConnectionInfo.builder() + .host(host) + .port(port) + .url(getJdbcUrl()) + .build(); + ClusterInfo clusterInfo = ClusterInfo.builder() + .master(connectionInfo) + .build(); + + jobConf.set(BinlogReaderOptions.CONNECTIONS, Lists.newArrayList(clusterInfo)); + jobConf.set(BinlogReaderOptions.USER_NAME, username); + jobConf.set(BinlogReaderOptions.DB_NAME, dbName); + jobConf.set(BinlogReaderOptions.PASSWORD, password); + jobConf.set("job.reader.debezium.database.useSSL", "false"); + jobConf.set("job.reader.debezium.database.allowPublicKeyRetrieval", "true"); + jobConf.set("job.reader.debezium.database.server.id", "123"); + jobConf.set("job.reader.debezium.database.server.name", "dts"); + jobConf.set("job.reader.debezium.schema.history.internal", "io.debezium.relational.history.MemorySchemaHistory"); + jobConf.set("job.reader.debezium.database.history", "io.debezium.relational.history.MemoryDatabaseHistory"); + System.out.println(jobConf); + PostgresChangeEventSplitReader reader = new PostgresChangeEventSplitReader(jobConf, 0); + BinlogSplit split = new BinlogSplit("split-1", BinlogOffset.latest(), BinlogOffset.boundless()); + reader.readSplit(split); + int maxPeriod = 0; + while (maxPeriod <= 25) { + if (reader.hasNext()) { + reader.poll(); + maxPeriod++; + } + TimeUnit.SECONDS.sleep(1); + } + } + + public String getJdbcUrl() { + return String.format( + "jdbc:postgresql://%s:%s/%s?sslmode=require&rewriteBatchedStatements=true&autoReconnect=true&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull", + host, port, dbName); + } +} \ No newline at end of file diff --git a/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/source/util/TestDatabase.java b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/source/util/TestDatabase.java new file mode 100644 index 000000000..fce64a83a --- /dev/null +++ 
b/bitsail-connectors/connector-cdc/connector-cdc-postgres/src/test/java/com/bytedance/bitsail/connector/cdc/postgres/source/util/TestDatabase.java @@ -0,0 +1,57 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.cdc.postgres.source.util; + +import lombok.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.PostgreSQLContainer; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +/** + * A mock database instance for testing. + */ + +@Data +public class TestDatabase { + private static final Logger LOG = LoggerFactory.getLogger(TestDatabase.class); + + private final PostgreSQLContainer postgresSqlContainer; + + private final String databaseName; + + private final String username; + + private final String password; + + public void executeSql(String sql) { + try { + Connection connection = DriverManager.getConnection( + postgresSqlContainer.getJdbcUrl(), username, password); + Statement statement = connection.createStatement(); + LOG.info("executing sql: {}", sql); + boolean result = statement.execute(sql); + LOG.info("executing sql completed: {}", result); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/bitsail-connectors/connector-cdc/pom.xml b/bitsail-connectors/connector-cdc/pom.xml index 49b71610f..02f947bf1 100644 --- a/bitsail-connectors/connector-cdc/pom.xml +++ b/bitsail-connectors/connector-cdc/pom.xml @@ -30,6 +30,8 @@ connector-cdc-base connector-cdc-mysql + connector-cdc-postgres + connector-cdc-jdbc-base diff --git a/bitsail-cores/bitsail-core-flink-bridge/src/main/java/com/bytedance/bitsail/core/flink/bridge/reader/delegate/DelegateFlinkSourceReader.java b/bitsail-cores/bitsail-core-flink-bridge/src/main/java/com/bytedance/bitsail/core/flink/bridge/reader/delegate/DelegateFlinkSourceReader.java index 4b72ff25f..ee329d22e 100644 --- a/bitsail-cores/bitsail-core-flink-bridge/src/main/java/com/bytedance/bitsail/core/flink/bridge/reader/delegate/DelegateFlinkSourceReader.java +++ b/bitsail-cores/bitsail-core-flink-bridge/src/main/java/com/bytedance/bitsail/core/flink/bridge/reader/delegate/DelegateFlinkSourceReader.java @@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import lombok.SneakyThrows; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; @@ -137,6 +138,7 @@ public void sendSplitRequest() { } } + @SneakyThrows @Override public void start() { this.metricManager.start();
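     // NOTE: @SneakyThrows is presumably here because the delegated reader's start() can throw
     // checked exceptions, while Flink's SourceReader#start() declares none; Lombok rethrows them
     // unchecked. A hand-written equivalent (illustrative sketch only, the field name below is
     // hypothetical) would be:
     //   try {
     //     delegateReader.start();
     //   } catch (Exception e) {
     //     throw new RuntimeException("Failed to start the delegated source reader.", e);
     //   }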