Skip to content

Commit d9bbd17

Browse files
committed
Add storage class to save database history to bq (#77)
1 parent f0ffaca commit d9bbd17

File tree

6 files changed

+502
-23
lines changed

6 files changed

+502
-23
lines changed

debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/BatchUtil.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,17 @@
1212

1313
import java.io.FileInputStream;
1414
import java.io.IOException;
15+
import java.sql.SQLException;
1516
import java.util.HashMap;
17+
import java.util.List;
1618
import java.util.Map;
1719
import java.util.Optional;
1820
import javax.enterprise.inject.Instance;
1921
import javax.enterprise.inject.literal.NamedLiteral;
2022

2123
import com.google.api.gax.retrying.RetrySettings;
2224
import com.google.auth.oauth2.GoogleCredentials;
23-
import com.google.cloud.bigquery.BigQuery;
24-
import com.google.cloud.bigquery.BigQueryOptions;
25+
import com.google.cloud.bigquery.*;
2526
import org.eclipse.microprofile.config.Config;
2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
@@ -102,5 +103,20 @@ public static BigQuery getBQClient(Optional<String> gcpProject, Optional<String>
102103
.getService();
103104

104105
}
106+
107+
public static TableResult executeQuery(BigQuery bqClient, String query, List<QueryParameterValue> parameters) throws SQLException {
108+
try {
109+
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
110+
.setPositionalParameters(parameters)
111+
.build();
112+
return bqClient.query(queryConfig);
113+
} catch (BigQueryException | InterruptedException e) {
114+
throw new SQLException(e);
115+
}
116+
}
117+
118+
public static TableResult executeQuery(BigQuery bqClient, String query) throws SQLException {
119+
return BatchUtil.executeQuery(bqClient, query, null);
120+
}
105121

106122
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
/*
2+
*
3+
* * Copyright memiiso Authors.
4+
* *
5+
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
*/
8+
9+
package io.debezium.server.bigquery.history;
10+
11+
import io.debezium.DebeziumException;
12+
import io.debezium.annotation.ThreadSafe;
13+
import io.debezium.common.annotation.Incubating;
14+
import io.debezium.config.Configuration;
15+
import io.debezium.config.Field;
16+
import io.debezium.document.DocumentReader;
17+
import io.debezium.document.DocumentWriter;
18+
import io.debezium.relational.history.*;
19+
import io.debezium.server.bigquery.BatchUtil;
20+
import io.debezium.util.Collect;
21+
import io.debezium.util.FunctionalReadWriteLock;
22+
import io.debezium.util.Strings;
23+
24+
import java.io.BufferedReader;
25+
import java.io.File;
26+
import java.io.IOException;
27+
import java.nio.file.Files;
28+
import java.sql.SQLException;
29+
import java.sql.Timestamp;
30+
import java.util.Collection;
31+
import java.util.Optional;
32+
import java.util.UUID;
33+
import java.util.concurrent.atomic.AtomicBoolean;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.function.Consumer;
36+
37+
import autovalue.shaded.com.google.common.collect.ImmutableList;
38+
import com.google.cloud.bigquery.*;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
41+
42+
/**
43+
* A {@link DatabaseHistory} implementation that stores the schema history to database table
44+
*
45+
* @author Ismail Simsek
46+
*/
47+
@ThreadSafe
48+
@Incubating
49+
public final class BigquerySchemaHistory extends AbstractDatabaseHistory {
50+
51+
private static final Logger LOG = LoggerFactory.getLogger(BigquerySchemaHistory.class);
52+
53+
public static final String DATABASE_HISTORY_TABLE_DDL = "CREATE TABLE IF NOT EXISTS %s " +
54+
"(id STRING NOT NULL, " +
55+
"history_data STRING, " +
56+
"record_insert_ts TIMESTAMP NOT NULL " +
57+
")";
58+
59+
public static final String DATABASE_HISTORY_STORAGE_TABLE_INSERT = "INSERT INTO %s VALUES ( ?, ?, ? )";
60+
public static final String DATABASE_HISTORY_STORAGE_TABLE_SELECT = "SELECT id, history_data, record_insert_ts FROM %s ORDER BY " +
61+
"record_insert_ts ASC";
62+
63+
static final Field SINK_TYPE_FIELD = Field.create("debezium.sink.type").required();
64+
public static Collection<Field> ALL_FIELDS = Collect.arrayListOf(SINK_TYPE_FIELD);
65+
66+
private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
67+
private final DocumentWriter writer = DocumentWriter.defaultWriter();
68+
private final DocumentReader reader = DocumentReader.defaultReader();
69+
private final AtomicBoolean running = new AtomicBoolean();
70+
BigquerySchemaHistoryConfig config;
71+
BigQuery bqClient;
72+
private String tableFullName;
73+
private TableId tableId;
74+
75+
@Override
76+
public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) {
77+
78+
super.configure(config, comparator, listener, useCatalogBeforeSchema);
79+
this.config = new BigquerySchemaHistoryConfig(config);
80+
try {
81+
bqClient = BatchUtil.getBQClient(
82+
Optional.ofNullable(this.config.getBigqueryProject()),
83+
Optional.ofNullable(this.config.getBigqueryDataset()),
84+
Optional.ofNullable(this.config.getBigqueryCredentialsFile()),
85+
this.config.getBigqueryLocation()
86+
);
87+
tableFullName = String.format("%s.%s", this.config.getBigqueryDataset(), this.config.getBigqueryTable());
88+
tableId = TableId.of(this.config.getBigqueryDataset(), this.config.getBigqueryTable());
89+
} catch (Exception e) {
90+
throw new DatabaseHistoryException("Failed to connect bigquery database history backing store", e);
91+
}
92+
93+
if (running.get()) {
94+
throw new DatabaseHistoryException("Bigquery database history process already initialized table: " + tableFullName);
95+
}
96+
}
97+
98+
@Override
99+
public void start() {
100+
super.start();
101+
lock.write(() -> {
102+
if (running.compareAndSet(false, true)) {
103+
try {
104+
if (!storageExists()) {
105+
initializeStorage();
106+
}
107+
} catch (Exception e) {
108+
throw new DatabaseHistoryException("Unable to create history table: " + tableFullName + " : " + e.getMessage(),
109+
e);
110+
}
111+
}
112+
});
113+
}
114+
115+
public String getTableFullName() {
116+
return tableFullName;
117+
}
118+
119+
@Override
120+
protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
121+
if (record == null) {
122+
return;
123+
}
124+
lock.write(() -> {
125+
if (!running.get()) {
126+
throw new DebeziumException("The history has been stopped and will not accept more records");
127+
}
128+
try {
129+
String recordDocString = writer.write(record.document());
130+
LOG.trace("Saving history data {}", recordDocString);
131+
Timestamp currentTs = new Timestamp(System.currentTimeMillis());
132+
BatchUtil.executeQuery(bqClient,
133+
String.format(DATABASE_HISTORY_STORAGE_TABLE_INSERT, tableFullName),
134+
ImmutableList.of(
135+
QueryParameterValue.string(UUID.randomUUID().toString()),
136+
QueryParameterValue.string(recordDocString),
137+
QueryParameterValue.timestamp(String.valueOf(currentTs))
138+
)
139+
);
140+
LOG.trace("Successfully saved history data to bigquery table");
141+
} catch (IOException | SQLException e) {
142+
throw new DatabaseHistoryException("Failed to store record: " + record, e);
143+
}
144+
});
145+
}
146+
147+
@Override
148+
public void stop() {
149+
running.set(false);
150+
super.stop();
151+
}
152+
153+
@Override
154+
protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
155+
lock.write(() -> {
156+
try {
157+
if (exists()) {
158+
TableResult rs = BatchUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_STORAGE_TABLE_SELECT, tableFullName));
159+
for (FieldValueList row : rs.getValues()) {
160+
String line = row.get("history_data").getStringValue();
161+
if (line == null) {
162+
break;
163+
}
164+
if (!line.isEmpty()) {
165+
records.accept(new HistoryRecord(reader.read(line)));
166+
}
167+
}
168+
}
169+
} catch (IOException | SQLException e) {
170+
throw new DatabaseHistoryException("Failed to recover records", e);
171+
}
172+
});
173+
}
174+
175+
@Override
176+
public boolean storageExists() {
177+
Table table = bqClient.getTable(tableId);
178+
return table != null;
179+
}
180+
181+
@Override
182+
public boolean exists() {
183+
184+
if (!storageExists()) {
185+
return false;
186+
}
187+
188+
int numRows = 0;
189+
try {
190+
TableResult rs = BatchUtil.executeQuery(bqClient, "SELECT COUNT(*) as row_count FROM " + tableFullName);
191+
for (FieldValueList row : rs.getValues()) {
192+
numRows = row.get("row_count").getNumericValue().intValue();
193+
break;
194+
}
195+
} catch (SQLException e) {
196+
throw new DatabaseHistoryException("Failed to check database history storage", e);
197+
}
198+
return numRows > 0;
199+
}
200+
201+
@Override
202+
public String toString() {
203+
return "Bigquery database history storage: " + (tableFullName != null ? tableFullName : "(unstarted)");
204+
}
205+
206+
@Override
207+
public void initializeStorage() {
208+
if (!storageExists()) {
209+
try {
210+
LOG.debug("Creating table {} to store database history", tableFullName);
211+
BatchUtil.executeQuery(bqClient, String.format(DATABASE_HISTORY_TABLE_DDL, tableFullName));
212+
LOG.warn("Created database history storage table {} to store history", tableFullName);
213+
214+
if (!Strings.isNullOrEmpty(config.getMigrateHistoryFile().strip())) {
215+
LOG.warn("Migrating history from file {}", config.getMigrateHistoryFile());
216+
this.loadFileDatabaseHistory(new File(config.getMigrateHistoryFile()));
217+
}
218+
} catch (Exception e) {
219+
throw new DatabaseHistoryException("Creation of database history topic failed, please create the topic manually", e);
220+
}
221+
} else {
222+
LOG.debug("Storage is exists, skipping initialization");
223+
}
224+
}
225+
226+
private void loadFileDatabaseHistory(File file) {
227+
LOG.warn(String.format("Migrating file database history from:'%s' to Bigquery database history storage: %s",
228+
file.toPath(), tableFullName));
229+
AtomicInteger numRecords = new AtomicInteger();
230+
lock.write(() -> {
231+
try (BufferedReader historyReader = Files.newBufferedReader(file.toPath())) {
232+
while (true) {
233+
String line = historyReader.readLine();
234+
if (line == null) {
235+
break;
236+
}
237+
if (!line.isEmpty()) {
238+
this.storeRecord(new HistoryRecord(reader.read(line)));
239+
numRecords.getAndIncrement();
240+
}
241+
}
242+
} catch (IOException e) {
243+
logger.error("Failed to migrate history record from history file at {}", file.toPath(), e);
244+
}
245+
});
246+
LOG.warn("Migrated {} database history record. " +
247+
"Migrating file database history to Bigquery database history storage successfully completed", numRecords.get());
248+
}
249+
250+
public static class BigquerySchemaHistoryConfig {
251+
private final Configuration config;
252+
253+
public BigquerySchemaHistoryConfig(Configuration config) {
254+
255+
if (!config.validateAndRecord(ALL_FIELDS, LOG::error)) {
256+
throw new DatabaseHistoryException(
257+
"Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
258+
}
259+
config.validateAndRecord(ALL_FIELDS, LOG::error);
260+
261+
this.config = config;
262+
}
263+
264+
public String sinkType() {
265+
String type = this.config.getString(SINK_TYPE_FIELD);
266+
if (type == null) {
267+
throw new DatabaseHistoryException("The config property debezium.sink.type is required " +
268+
"but it could not be found in any config source");
269+
}
270+
return type;
271+
}
272+
273+
public String getBigqueryProject() {
274+
return this.config.getString(Field.create(String.format("debezium.sink.%s.project", this.sinkType())));
275+
}
276+
277+
public String getBigqueryDataset() {
278+
return this.config.getString(Field.create(String.format("debezium.sink.%s.dataset", this.sinkType())));
279+
}
280+
281+
public String getBigqueryTable() {
282+
return this.config.getString(Field.create("database.history.bigquery.table-name").withDefault(
283+
"debezium_database_history_storage"));
284+
}
285+
286+
public String getMigrateHistoryFile() {
287+
return this.config.getString(Field.create("database.history.bigquery.migrate-history-file").withDefault(""));
288+
}
289+
290+
public String getBigqueryCredentialsFile() {
291+
return this.config.getString(Field.create(String.format("debezium.sink.%s.credentials-file", this.sinkType())).withDefault(""));
292+
}
293+
294+
public String getBigqueryLocation() {
295+
return this.config.getString(Field.create(String.format("debezium.sink.%s.location", this.sinkType())).withDefault("US"));
296+
}
297+
}
298+
299+
}

debezium-server-bigquery-sinks/src/main/java/io/debezium/server/bigquery/offset/BigqueryOffsetBackingStore.java

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -71,21 +71,6 @@ public BigqueryOffsetBackingStore() {
7171
public String getTableFullName() {
7272
return tableFullName;
7373
}
74-
75-
private TableResult executeQuery(String query, List<QueryParameterValue> parameters) throws SQLException {
76-
try {
77-
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
78-
.setPositionalParameters(parameters)
79-
.build();
80-
return bqClient.query(queryConfig);
81-
} catch (BigQueryException | InterruptedException e) {
82-
throw new SQLException(e);
83-
}
84-
}
85-
86-
private TableResult executeQuery(String query) throws SQLException {
87-
return this.executeQuery(query, null);
88-
}
8974

9075
@Override
9176
public void configure(WorkerConfig config) {
@@ -124,7 +109,7 @@ private void initializeTable() throws SQLException {
124109
Table table = bqClient.getTable(tableId);
125110
if (table == null) {
126111
LOG.debug("Creating table {} to store offset", tableFullName);
127-
executeQuery(String.format(OFFSET_STORAGE_TABLE_DDL, tableFullName));
112+
BatchUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_DDL, tableFullName));
128113
LOG.warn("Created offset storage table {} to store offset", tableFullName);
129114

130115
if (!Strings.isNullOrEmpty(config.getMigrateOffsetFile().strip())){
@@ -137,11 +122,11 @@ private void initializeTable() throws SQLException {
137122
protected void save() {
138123
LOG.debug("Saving offset data to bigquery table...");
139124
try {
140-
this.executeQuery(String.format(OFFSET_STORAGE_TABLE_DELETE, tableFullName));
125+
BatchUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_DELETE, tableFullName));
141126
String dataJson = mapper.writeValueAsString(data);
142127
LOG.debug("Saving offset data {}", dataJson);
143128
Timestamp currentTs = new Timestamp(System.currentTimeMillis());
144-
this.executeQuery(
129+
BatchUtil.executeQuery(bqClient,
145130
String.format(OFFSET_STORAGE_TABLE_INSERT, tableFullName),
146131
ImmutableList.of(
147132
QueryParameterValue.string(UUID.randomUUID().toString()),
@@ -159,7 +144,7 @@ protected void save() {
159144
private void load() {
160145
try {
161146
String dataJsonString = null;
162-
TableResult rs = this.executeQuery(String.format(OFFSET_STORAGE_TABLE_SELECT, tableFullName));
147+
TableResult rs = BatchUtil.executeQuery(bqClient, String.format(OFFSET_STORAGE_TABLE_SELECT, tableFullName));
163148
for (FieldValueList row : rs.getValues()) {
164149
dataJsonString = row.get("offset_data").getStringValue();
165150
break;

0 commit comments

Comments
 (0)