Skip to content

Commit

Permalink
Hotfix improve storage classes code (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Feb 10, 2023
1 parent d9bbd17 commit 226ef86
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
import io.debezium.annotation.ThreadSafe;
import io.debezium.common.annotation.Incubating;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.*;
import io.debezium.server.bigquery.BatchUtil;
import io.debezium.util.Collect;
import io.debezium.util.FunctionalReadWriteLock;
import io.debezium.util.Strings;

Expand All @@ -27,7 +25,6 @@
import java.nio.file.Files;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Collection;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -59,15 +56,11 @@ public final class BigquerySchemaHistory extends AbstractDatabaseHistory {
public static final String DATABASE_HISTORY_STORAGE_TABLE_INSERT = "INSERT INTO %s VALUES ( ?, ?, ? )";
public static final String DATABASE_HISTORY_STORAGE_TABLE_SELECT = "SELECT id, history_data, record_insert_ts FROM %s ORDER BY " +
"record_insert_ts ASC";

static final Field SINK_TYPE_FIELD = Field.create("debezium.sink.type").required();
public static Collection<Field> ALL_FIELDS = Collect.arrayListOf(SINK_TYPE_FIELD);

private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
private final DocumentWriter writer = DocumentWriter.defaultWriter();
private final DocumentReader reader = DocumentReader.defaultReader();
private final AtomicBoolean running = new AtomicBoolean();
BigquerySchemaHistoryConfig config;
BigquerySchemaHistoryConfig historyConfig;
BigQuery bqClient;
private String tableFullName;
private TableId tableId;
Expand All @@ -76,16 +69,16 @@ public final class BigquerySchemaHistory extends AbstractDatabaseHistory {
public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) {

super.configure(config, comparator, listener, useCatalogBeforeSchema);
this.config = new BigquerySchemaHistoryConfig(config);
this.historyConfig = new BigquerySchemaHistoryConfig(config);
try {
bqClient = BatchUtil.getBQClient(
Optional.ofNullable(this.config.getBigqueryProject()),
Optional.ofNullable(this.config.getBigqueryDataset()),
Optional.ofNullable(this.config.getBigqueryCredentialsFile()),
this.config.getBigqueryLocation()
Optional.ofNullable(this.historyConfig.getBigqueryProject()),
Optional.ofNullable(this.historyConfig.getBigqueryDataset()),
Optional.ofNullable(this.historyConfig.getBigqueryCredentialsFile()),
this.historyConfig.getBigqueryLocation()
);
tableFullName = String.format("%s.%s", this.config.getBigqueryDataset(), this.config.getBigqueryTable());
tableId = TableId.of(this.config.getBigqueryDataset(), this.config.getBigqueryTable());
tableFullName = String.format("%s.%s", this.historyConfig.getBigqueryDataset(), this.historyConfig.getBigqueryTable());
tableId = TableId.of(this.historyConfig.getBigqueryDataset(), this.historyConfig.getBigqueryTable());
} catch (Exception e) {
throw new DatabaseHistoryException("Failed to connect bigquery database history backing store", e);
}
Expand Down Expand Up @@ -211,9 +204,9 @@ public void initializeStorage() {
BatchUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_TABLE_DDL, tableFullName));
LOG.warn("Created database history storage table {} to store history", tableFullName);

if (!Strings.isNullOrEmpty(config.getMigrateHistoryFile().strip())) {
LOG.warn("Migrating history from file {}", config.getMigrateHistoryFile());
this.loadFileDatabaseHistory(new File(config.getMigrateHistoryFile()));
if (!Strings.isNullOrEmpty(historyConfig.getMigrateHistoryFile().strip())) {
LOG.warn("Migrating history from file {}", historyConfig.getMigrateHistoryFile());
this.loadFileDatabaseHistory(new File(historyConfig.getMigrateHistoryFile()));
}
} catch (Exception e) {
throw new DatabaseHistoryException("Creation of database history topic failed, please create the topic manually", e);
Expand Down Expand Up @@ -251,48 +244,42 @@ public static class BigquerySchemaHistoryConfig {
private final Configuration config;

public BigquerySchemaHistoryConfig(Configuration config) {

if (!config.validateAndRecord(ALL_FIELDS, LOG::error)) {
throw new DatabaseHistoryException(
"Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
config.validateAndRecord(ALL_FIELDS, LOG::error);

this.config = config;
}

public String sinkType() {
String type = this.config.getString(SINK_TYPE_FIELD);
if (type == null) {
throw new DatabaseHistoryException("The config property debezium.sink.type is required " +
"but it could not be found in any config source");
}
return type;
private String getConfig(String configName, String fallbackConfigName, String defaultValue) {
return this.config.getString(configName, this.config.getString(fallbackConfigName, defaultValue));
}

public String getBigqueryProject() {
return this.config.getString(Field.create(String.format("debezium.sink.%s.project", this.sinkType())));
return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.project",
CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.project", null);
}

public String getBigqueryDataset() {
return this.config.getString(Field.create(String.format("debezium.sink.%s.dataset", this.sinkType())));
return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.dataset",
CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.dataset", null);
}

public String getBigqueryTable() {
return this.config.getString(Field.create("database.history.bigquery.table-name").withDefault(
"debezium_database_history_storage"));
return getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquery.table-name",
CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.table-name", "debezium_database_history_storage"
);
}

public String getMigrateHistoryFile() {
return this.config.getString(Field.create("database.history.bigquery.migrate-history-file").withDefault(""));
return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquery.migrate-history-file",
CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.migrate-history-file", "");
}

public String getBigqueryCredentialsFile() {
return this.config.getString(Field.create(String.format("debezium.sink.%s.credentials-file", this.sinkType())).withDefault(""));
return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.credentials-file",
CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.credentials-file", "");
}

public String getBigqueryLocation() {
return this.config.getString(Field.create(String.format("debezium.sink.%s.location", this.sinkType())).withDefault("US"));
return this.getConfig(CONFIGURATION_FIELD_PREFIX_STRING + "bigquerybatch.location",
CONFIGURATION_FIELD_PREFIX_STRING + "bigquerystream.location", "US");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class BigqueryOffsetBackingStore extends MemoryOffsetBackingStore impleme
BigQuery bqClient;
private String tableFullName;
private TableId tableId;
BigqueryOffsetBackingStoreConfig config;
BigqueryOffsetBackingStoreConfig offsetConfig;
protected static final ObjectMapper mapper = new ObjectMapper();
protected Map<String, String> data = new HashMap<>();

Expand All @@ -75,17 +75,17 @@ public String getTableFullName() {
@Override
public void configure(WorkerConfig config) {
super.configure(config);
this.config = new BigqueryOffsetBackingStoreConfig(Configuration.from(config.originalsStrings()));
this.offsetConfig = new BigqueryOffsetBackingStoreConfig(Configuration.from(config.originalsStrings()));

try {
bqClient = BatchUtil.getBQClient(
Optional.ofNullable(this.config.getBigqueryProject()),
Optional.ofNullable(this.config.getBigqueryDataset()),
Optional.ofNullable(this.config.getBigqueryCredentialsFile()),
this.config.getBigqueryLocation()
Optional.ofNullable(this.offsetConfig.getBigqueryProject()),
Optional.ofNullable(this.offsetConfig.getBigqueryDataset()),
Optional.ofNullable(this.offsetConfig.getBigqueryCredentialsFile()),
this.offsetConfig.getBigqueryLocation()
);
tableFullName = String.format("%s.%s", this.config.getBigqueryDataset(), this.config.getBigqueryTable());
tableId = TableId.of(this.config.getBigqueryDataset(), this.config.getBigqueryTable());
tableFullName = String.format("%s.%s", this.offsetConfig.getBigqueryDataset(), this.offsetConfig.getBigqueryTable());
tableId = TableId.of(this.offsetConfig.getBigqueryDataset(), this.offsetConfig.getBigqueryTable());
} catch (Exception e) {
throw new IllegalStateException("Failed to connect bigquery offset backing store", e);
}
Expand All @@ -112,9 +112,9 @@ private void initializeTable() throws SQLException {
BatchUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_DDL, tableFullName));
LOG.warn("Created offset storage table {} to store offset", tableFullName);

if (!Strings.isNullOrEmpty(config.getMigrateOffsetFile().strip())){
LOG.warn("Loading offset from file {}", config.getMigrateOffsetFile());
this.loadFileOffset(new File(config.getMigrateOffsetFile()));
if (!Strings.isNullOrEmpty(offsetConfig.getMigrateOffsetFile().strip())){
LOG.warn("Loading offset from file {}", offsetConfig.getMigrateOffsetFile());
this.loadFileOffset(new File(offsetConfig.getMigrateOffsetFile()));
}
}
}
Expand Down

0 comments on commit 226ef86

Please sign in to comment.