diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/history/BigquerySchemaHistory.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/history/BigquerySchemaHistory.java index 71b3835..5998517 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/history/BigquerySchemaHistory.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/history/BigquerySchemaHistory.java @@ -12,12 +12,10 @@ import io.debezium.annotation.ThreadSafe; import io.debezium.common.annotation.Incubating; import io.debezium.config.Configuration; -import io.debezium.config.Field; import io.debezium.document.DocumentReader; import io.debezium.document.DocumentWriter; import io.debezium.relational.history.*; import io.debezium.server.bigquery.BatchUtil; -import io.debezium.util.Collect; import io.debezium.util.FunctionalReadWriteLock; import io.debezium.util.Strings; @@ -27,7 +25,6 @@ import java.nio.file.Files; import java.sql.SQLException; import java.sql.Timestamp; -import java.util.Collection; import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -59,15 +56,11 @@ public final class BigquerySchemaHistory extends AbstractDatabaseHistory { public static final String DATABASE_HISTORY_STORAGE_TABLE_INSERT = "INSERT INTO %s VALUES ( ?, ?, ? )"; public static final String DATABASE_HISTORY_STORAGE_TABLE_SELECT = "SELECT id, history_data, record_insert_ts FROM %s ORDER BY " + "record_insert_ts ASC"; - - static final Field SINK_TYPE_FIELD = Field.create("debezium.sink.type").required(); - public static Collection ALL_FIELDS = Collect.arrayListOf(SINK_TYPE_FIELD); - private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant(); private final DocumentWriter writer = DocumentWriter.defaultWriter(); private final DocumentReader reader = DocumentReader.defaultReader(); private final AtomicBoolean running = new AtomicBoolean(); - BigquerySchemaHistoryConfig config; + BigquerySchemaHistoryConfig historyConfig; BigQuery bqClient; private String tableFullName; private TableId tableId; @@ -76,16 +69,16 @@ public final class BigquerySchemaHistory extends AbstractDatabaseHistory { public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) { super.configure(config, comparator, listener, useCatalogBeforeSchema); - this.config = new BigquerySchemaHistoryConfig(config); + this.historyConfig = new BigquerySchemaHistoryConfig(config); try { bqClient = BatchUtil.getBQClient( - Optional.ofNullable(this.config.getBigqueryProject()), - Optional.ofNullable(this.config.getBigqueryDataset()), - Optional.ofNullable(this.config.getBigqueryCredentialsFile()), - this.config.getBigqueryLocation() + Optional.ofNullable(this.historyConfig.getBigqueryProject()), + Optional.ofNullable(this.historyConfig.getBigqueryDataset()), + Optional.ofNullable(this.historyConfig.getBigqueryCredentialsFile()), + this.historyConfig.getBigqueryLocation() ); - tableFullName = String.format("%s.%s", this.config.getBigqueryDataset(), this.config.getBigqueryTable()); - tableId = TableId.of(this.config.getBigqueryDataset(), this.config.getBigqueryTable()); + tableFullName = String.format("%s.%s", this.historyConfig.getBigqueryDataset(), this.historyConfig.getBigqueryTable()); + tableId = TableId.of(this.historyConfig.getBigqueryDataset(), this.historyConfig.getBigqueryTable()); } catch (Exception e) { throw new DatabaseHistoryException("Failed to connect bigquery database history backing store", e); } @@ -211,9 +204,9 @@ public void initializeStorage() { BatchUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_TABLE_DDL, tableFullName)); LOG.warn("Created database history storage table {} to store history", tableFullName); - if (!Strings.isNullOrEmpty(config.getMigrateHistoryFile().strip())) { - LOG.warn("Migrating history from file {}", config.getMigrateHistoryFile()); - this.loadFileDatabaseHistory(new File(config.getMigrateHistoryFile())); + if (!Strings.isNullOrEmpty(historyConfig.getMigrateHistoryFile().strip())) { + LOG.warn("Migrating history from file {}", historyConfig.getMigrateHistoryFile()); + this.loadFileDatabaseHistory(new File(historyConfig.getMigrateHistoryFile())); } } catch (Exception e) { throw new DatabaseHistoryException("Creation of database history topic failed, please create the topic manually", e); @@ -251,48 +244,42 @@ public static class BigquerySchemaHistoryConfig { private final Configuration config; public BigquerySchemaHistoryConfig(Configuration config) { - - if (!config.validateAndRecord(ALL_FIELDS, LOG::error)) { - throw new DatabaseHistoryException( - "Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details"); - } - config.validateAndRecord(ALL_FIELDS, LOG::error); - this.config = config; } - public String sinkType() { - String type = this.config.getString(SINK_TYPE_FIELD); - if (type == null) { - throw new DatabaseHistoryException("The config property debezium.sink.type is required " + - "but it could not be found in any config source"); - } - return type; + private String getConfig(String configName, String fallbackConfigName, String defaultValue) { + return this.config.getString(configName, this.config.getString(fallbackConfigName, defaultValue)); } public String getBigqueryProject() { - return this.config.getString(Field.create(String.format("debezium.sink.%s.project", this.sinkType()))); + return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.project", + CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.project", null); } public String getBigqueryDataset() { - return this.config.getString(Field.create(String.format("debezium.sink.%s.dataset", this.sinkType()))); + return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.dataset", + CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.dataset", null); } public String getBigqueryTable() { - return this.config.getString(Field.create("database.history.bigquery.table-name").withDefault( - "debezium_database_history_storage")); + return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquery.table-name", + CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.table-name", "debezium_database_history_storage" + ); } public String getMigrateHistoryFile() { - return this.config.getString(Field.create("database.history.bigquery.migrate-history-file").withDefault("")); + return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquery.migrate-history-file", + CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.migrate-history-file", ""); } public String getBigqueryCredentialsFile() { - return this.config.getString(Field.create(String.format("debezium.sink.%s.credentials-file", this.sinkType())).withDefault("")); + return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.credentials-file", + CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.credentials-file", ""); } public String getBigqueryLocation() { - return this.config.getString(Field.create(String.format("debezium.sink.%s.location", this.sinkType())).withDefault("US")); + return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.location", + CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.location", "US"); } } diff --git a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java index 0bfc45e..9d96eb4 100644 --- a/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java +++ b/debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java @@ -60,7 +60,7 @@ public class BigqueryOffsetBackingStore extends MemoryOffsetBackingStore impleme BigQuery bqClient; private String tableFullName; private TableId tableId; - BigqueryOffsetBackingStoreConfig config; + BigqueryOffsetBackingStoreConfig offsetConfig; protected static final ObjectMapper mapper = new ObjectMapper(); protected Map data = new HashMap<>(); @@ -75,17 +75,17 @@ public String getTableFullName() { @Override public void configure(WorkerConfig config) { super.configure(config); - this.config = new BigqueryOffsetBackingStoreConfig(Configuration.from(config.originalsStrings())); + this.offsetConfig = new BigqueryOffsetBackingStoreConfig(Configuration.from(config.originalsStrings())); try { bqClient = BatchUtil.getBQClient( - Optional.ofNullable(this.config.getBigqueryProject()), - Optional.ofNullable(this.config.getBigqueryDataset()), - Optional.ofNullable(this.config.getBigqueryCredentialsFile()), - this.config.getBigqueryLocation() + Optional.ofNullable(this.offsetConfig.getBigqueryProject()), + Optional.ofNullable(this.offsetConfig.getBigqueryDataset()), + Optional.ofNullable(this.offsetConfig.getBigqueryCredentialsFile()), + this.offsetConfig.getBigqueryLocation() ); - tableFullName = String.format("%s.%s", this.config.getBigqueryDataset(), this.config.getBigqueryTable()); - tableId = TableId.of(this.config.getBigqueryDataset(), this.config.getBigqueryTable()); + tableFullName = String.format("%s.%s", this.offsetConfig.getBigqueryDataset(), this.offsetConfig.getBigqueryTable()); + tableId = TableId.of(this.offsetConfig.getBigqueryDataset(), this.offsetConfig.getBigqueryTable()); } catch (Exception e) { throw new IllegalStateException("Failed to connect bigquery offset backing store", e); } @@ -112,9 +112,9 @@ private void initializeTable() throws SQLException { BatchUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_DDL, tableFullName)); LOG.warn("Created offset storage table {} to store offset", tableFullName); - if (!Strings.isNullOrEmpty(config.getMigrateOffsetFile().strip())){ - LOG.warn("Loading offset from file {}", config.getMigrateOffsetFile()); - this.loadFileOffset(new File(config.getMigrateOffsetFile())); + if (!Strings.isNullOrEmpty(offsetConfig.getMigrateOffsetFile().strip())){ + LOG.warn("Loading offset from file {}", offsetConfig.getMigrateOffsetFile()); + this.loadFileOffset(new File(offsetConfig.getMigrateOffsetFile())); } } }