
Commit d3d55a0

attempt to separate each DDL event handler to make the code a little more manageable
1 parent 06270e8 commit d3d55a0

12 files changed: +437 -178 lines

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@
   <properties>
     <avro.version>1.8.2</avro.version>
     <bigquery.version>1.78.0</bigquery.version>
-    <cdap.version>6.4.0-SNAPSHOT</cdap.version>
+    <cdap.version>6.4.0</cdap.version>
     <delta.version>0.4.0-SNAPSHOT</delta.version>
     <failsafe.version>2.3.3</failsafe.version>
     <gcs.version>1.78.0</gcs.version>

src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java

Lines changed: 33 additions & 177 deletions
Large diffs are not rendered by default.
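
Because the BigQueryEventConsumer diff is not rendered here, the following is only a hedged sketch of how the consumer could wire the new pieces together: build one shared EventHandlerConfig from its existing state and register one handler per DDL operation with the EventDispatcher. The field names and the TruncateTable constructor signature are assumptions; TruncateTable is among the 12 changed files but is not shown in this view.

    // Hedged sketch only, not the commit's actual code: assumes BigQueryEventConsumer keeps fields
    // with these names and builds the dispatcher once, for example in its constructor.
    this.handlerConfig = new EventHandlerConfig(
      context, bigQuery, project, bucket, primaryKeyStore,
      requireManualDrops, maxClusteringColumns, encryptionConfig);
    this.dispatcher = new EventDispatcher(
      new DropDatabase(this, handlerConfig),
      new CreateDatabase(this, handlerConfig),
      new DropTable(this, handlerConfig),
      new CreateTable(this, handlerConfig),
      new AlterTable(this, handlerConfig),
      new TruncateTable(this, handlerConfig),   // constructor signature assumed to match the other handlers
      new RenameTable(this, handlerConfig));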
src/main/java/io/cdap/delta/bigquery/event/AlterTable.java

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
package io.cdap.delta.bigquery.event;

import com.google.cloud.bigquery.*;
import com.google.cloud.storage.Bucket;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DeltaFailureException;
import io.cdap.delta.bigquery.BigQueryEventConsumer;
import io.cdap.delta.bigquery.BigQueryTableState;
import io.cdap.delta.bigquery.Schemas;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class AlterTable extends DDLEventHandler {

  public AlterTable(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }

  @Override
  public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
                        String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
    // Flush any pending changes before altering the table so that everything written before the
    // schema change is in the table when it is altered.
    consumer.flush();
    // After a flush, the staging table is gone, so there is no need to alter it.
    TableId tableId = TableId.of(config.project, normalizedDatabaseName, normalizedTableName);
    Table table = config.bigQuery.getTable(tableId);
    List<String> primaryKeys = event.getPrimaryKey();
    Clustering clustering = config.maxClusteringColumns <= 0 ? null :
      Clustering.newBuilder()
        .setFields(primaryKeys.subList(0, Math.min(config.maxClusteringColumns, primaryKeys.size())))
        .build();
    TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
      .setSchema(Schemas.convert(BigQueryEventConsumer.addSupplementaryColumnsToTargetSchema(event.getSchema())))
      .setClustering(clustering)
      .build();
    TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
    if (config.encryptionConfig != null) {
      builder.setEncryptionConfiguration(config.encryptionConfig);
    }
    TableInfo tableInfo = builder.build();
    if (table == null) {
      config.bigQuery.create(tableInfo);
    } else {
      config.bigQuery.update(tableInfo);
    }
    consumer.updatePrimaryKeys(tableId, primaryKeys);
  }
}
src/main/java/io/cdap/delta/bigquery/event/CreateDatabase.java

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
package io.cdap.delta.bigquery.event;

import com.google.cloud.bigquery.*;
import com.google.cloud.storage.Bucket;
import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DeltaFailureException;
import io.cdap.delta.bigquery.BigQueryEventConsumer;
import io.cdap.delta.bigquery.BigQueryTarget;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class CreateDatabase extends DDLEventHandler {

  public CreateDatabase(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }

  @Override
  public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
                        String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
    DatasetId datasetId = DatasetId.of(config.project, normalizedDatabaseName);
    if (config.bigQuery.getDataset(datasetId) == null) {
      DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(config.bucket.getLocation()).build();
      try {
        config.bigQuery.create(datasetInfo);
      } catch (BigQueryException e) {
        // When multiple worker instances are running, the dataset may have been created by another
        // worker after this one determined that it did not exist. Ignore the error if the dataset
        // was created in the meantime.
        if (e.getCode() != BigQueryTarget.CONFLICT) {
          throw e;
        }
      }
    }
  }
}
src/main/java/io/cdap/delta/bigquery/event/CreateTable.java

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
package io.cdap.delta.bigquery.event;

import com.google.cloud.bigquery.*;
import com.google.cloud.storage.Bucket;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DeltaFailureException;
import io.cdap.delta.bigquery.BigQueryEventConsumer;
import io.cdap.delta.bigquery.Schemas;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class CreateTable extends DDLEventHandler {

  public CreateTable(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }

  @Override
  public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
                        String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
    TableId tableId = TableId.of(config.project, normalizedDatabaseName, normalizedTableName);
    Table table = config.bigQuery.getTable(tableId);
    // SNAPSHOT data is loaded directly into the target table. Check whether such a direct load was
    // in progress for the current table when the target received the CREATE_TABLE DDL. That would
    // mean the snapshot was abandoned because of a failure, so delete the existing table if any.
    byte[] state = config.context.getState(String.format(BigQueryEventConsumer.DIRECT_LOADING_IN_PROGRESS_PREFIX + "%s-%s",
                                                         normalizedDatabaseName, normalizedTableName));
    if (table != null && state != null && Bytes.toBoolean(state)) {
      config.bigQuery.delete(tableId);
    }
    List<String> primaryKeys = event.getPrimaryKey();
    consumer.updatePrimaryKeys(tableId, primaryKeys);
    // TODO: check schema of table if it exists already
    if (table == null) {
      List<String> clusteringSupportedKeys = BigQueryEventConsumer.getClusteringSupportedKeys(primaryKeys, event.getSchema());
      Clustering clustering = config.maxClusteringColumns <= 0 || clusteringSupportedKeys.isEmpty() ? null :
        Clustering.newBuilder()
          .setFields(clusteringSupportedKeys.subList(0, Math.min(config.maxClusteringColumns,
                                                                 clusteringSupportedKeys.size())))
          .build();
      TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
        .setSchema(Schemas.convert(BigQueryEventConsumer.addSupplementaryColumnsToTargetSchema(event.getSchema())))
        .setClustering(clustering)
        .build();

      TableInfo.Builder builder = TableInfo.newBuilder(tableId, tableDefinition);
      if (config.encryptionConfig != null) {
        builder.setEncryptionConfiguration(config.encryptionConfig);
      }
      TableInfo tableInfo = builder.build();
      config.bigQuery.create(tableInfo);
    }
  }
}
src/main/java/io/cdap/delta/bigquery/event/DDLEventHandler.java

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
package io.cdap.delta.bigquery.event;

import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DeltaFailureException;
import io.cdap.delta.bigquery.BigQueryEventConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public abstract class DDLEventHandler {

  protected static final Logger LOG = LoggerFactory.getLogger(DDLEventHandler.class);

  protected final BigQueryEventConsumer consumer;
  protected final EventHandlerConfig config;

  public DDLEventHandler(BigQueryEventConsumer consumer, EventHandlerConfig config) {
    this.consumer = consumer;
    this.config = config;
  }

  public abstract void handleDDL(
    DDLEvent event,
    String normalizedDatabaseName,
    String normalizedTableName,
    String normalizedStagingTableName)
    throws IOException, DeltaFailureException, InterruptedException;

}
src/main/java/io/cdap/delta/bigquery/event/DropDatabase.java

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
package io.cdap.delta.bigquery.event;

import com.google.cloud.bigquery.DatasetId;
import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DeltaFailureException;
import io.cdap.delta.bigquery.BigQueryEventConsumer;

import java.io.IOException;

public class DropDatabase extends DDLEventHandler {

  public DropDatabase(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }

  @Override
  public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
                        String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
    DatasetId datasetId = DatasetId.of(config.project, normalizedDatabaseName);
    config.primaryKeyStore.clear();
    if (config.bigQuery.getDataset(datasetId) != null) {
      if (config.requireManualDrops) {
        String message = String.format("Encountered an event to drop dataset '%s' in project '%s', " +
                                         "but the target is configured to require manual drops. " +
                                         "Please manually drop the dataset to make progress.",
                                       normalizedDatabaseName, config.project);
        LOG.error(message);
        throw new RuntimeException(message);
      }
      config.bigQuery.delete(datasetId);
    }
  }
}
src/main/java/io/cdap/delta/bigquery/event/DropTable.java

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
package io.cdap.delta.bigquery.event;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.storage.Bucket;
import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DeltaFailureException;
import io.cdap.delta.bigquery.BigQueryEventConsumer;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class DropTable extends DDLEventHandler {

  public DropTable(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }

  @Override
  public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
                        String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
    // Flush changes before dropping the table; otherwise the next flush would write data that
    // should no longer exist.
    consumer.flush();
    TableId tableId = TableId.of(config.project, normalizedDatabaseName, normalizedTableName);
    config.primaryKeyStore.remove(tableId);
    Table table = config.bigQuery.getTable(tableId);
    if (table != null) {
      if (config.requireManualDrops) {
        String message = String.format("Encountered an event to drop table '%s' in dataset '%s' in project '%s', " +
                                         "but the target is configured to require manual drops. " +
                                         "Please manually drop the table to make progress.",
                                       normalizedTableName, normalizedDatabaseName, config.project);
        LOG.error(message);
        throw new RuntimeException(message);
      }
      config.bigQuery.delete(tableId);
    }
    TableId stagingTableId = TableId.of(config.project, normalizedDatabaseName, normalizedStagingTableName);
    Table stagingTable = config.bigQuery.getTable(stagingTableId);
    if (stagingTable != null) {
      config.bigQuery.delete(stagingTableId);
    }
  }
}
src/main/java/io/cdap/delta/bigquery/event/EventDispatcher.java

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
package io.cdap.delta.bigquery.event;

import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DDLOperation;
import io.cdap.delta.api.DeltaFailureException;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class EventDispatcher {

  private final Map<DDLOperation.Type, DDLEventHandler> handlers = new HashMap<>();

  public EventDispatcher(
    DropDatabase dropDatabase,
    CreateDatabase createDatabase,
    DropTable dropTable,
    CreateTable createTable,
    AlterTable alterTable,
    TruncateTable truncateTable,
    RenameTable renameTable
  ) {
    handlers.put(DDLOperation.Type.DROP_DATABASE, dropDatabase);
    handlers.put(DDLOperation.Type.CREATE_DATABASE, createDatabase);
    handlers.put(DDLOperation.Type.DROP_TABLE, dropTable);
    handlers.put(DDLOperation.Type.CREATE_TABLE, createTable);
    handlers.put(DDLOperation.Type.ALTER_TABLE, alterTable);
    handlers.put(DDLOperation.Type.TRUNCATE_TABLE, truncateTable);
    handlers.put(DDLOperation.Type.RENAME_TABLE, renameTable);
  }

  public DDLEventHandler handler(DDLEvent event) {
    return handlers.get(event.getOperation().getType());
  }

}
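
Usage note: the dispatcher is a plain lookup table keyed by DDLOperation.Type, and handler(event) returns null for any operation type that was never registered. Since the BigQueryEventConsumer changes are not rendered in this view, the caller-side null check below is only an illustrative sketch; the exception choice is an assumption, not the commit's actual behaviour.

    DDLEventHandler handler = dispatcher.handler(event);
    if (handler == null) {
      // Illustrative fallback; how the consumer really reacts to unmapped types is not shown in this diff.
      throw new IllegalArgumentException("Unsupported DDL operation: " + event.getOperation().getType());
    }
    handler.handleDDL(event, normalizedDatabaseName, normalizedTableName, normalizedStagingTableName);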
src/main/java/io/cdap/delta/bigquery/event/EventHandlerConfig.java

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
package io.cdap.delta.bigquery.event;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.EncryptionConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.storage.Bucket;
import io.cdap.delta.api.DeltaTargetContext;

import java.util.List;
import java.util.Map;

public class EventHandlerConfig {

  public final DeltaTargetContext context;
  public final BigQuery bigQuery;
  public final String project;
  public final Bucket bucket;
  public final Map<TableId, List<String>> primaryKeyStore;
  public final boolean requireManualDrops;
  public final int maxClusteringColumns;
  public final EncryptionConfiguration encryptionConfig;

  public EventHandlerConfig(
    DeltaTargetContext context,
    BigQuery bigQuery,
    String project,
    Bucket bucket,
    Map<TableId, List<String>> primaryKeyStore,
    boolean requireManualDrops,
    int maxClusteringColumns,
    EncryptionConfiguration encryptionConfig
  ) {
    this.context = context;
    this.bigQuery = bigQuery;
    this.project = project;
    this.bucket = bucket;
    this.primaryKeyStore = primaryKeyStore;
    this.requireManualDrops = requireManualDrops;
    this.maxClusteringColumns = maxClusteringColumns;
    this.encryptionConfig = encryptionConfig;
  }
}
src/main/java/io/cdap/delta/bigquery/event/RenameTable.java

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
package io.cdap.delta.bigquery.event;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.storage.Bucket;
import io.cdap.delta.api.DDLEvent;
import io.cdap.delta.api.DeltaFailureException;
import io.cdap.delta.bigquery.BigQueryEventConsumer;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class RenameTable extends DDLEventHandler {

  public RenameTable(BigQueryEventConsumer consumer, EventHandlerConfig config) { super(consumer, config); }

  @Override
  public void handleDDL(DDLEvent event, String normalizedDatabaseName, String normalizedTableName,
                        String normalizedStagingTableName) throws IOException, DeltaFailureException, InterruptedException {
    // TODO: flush changes, execute a copy job, delete the previous table, drop the old staging table,
    //       remove the old entry from primaryKeyStore, and put in a new entry
    LOG.warn("Rename DDL events are not supported. Ignoring rename event in database {} from table {} to table {}.",
             event.getOperation().getDatabaseName(), event.getOperation().getPrevTableName(),
             event.getOperation().getTableName());
  }
}
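
The TODO above lists the steps a real rename would need. A hedged sketch of those steps using the standard BigQuery copy-job API (Job, JobInfo, CopyJobConfiguration from com.google.cloud.bigquery) might look like the following; the handleRename method, its parameters, and the primary-key bookkeeping are illustrative assumptions, not part of this commit.

    // Illustrative only: roughly the steps the TODO describes, assuming the previous table name has
    // already been normalized the same way as the new one.
    private void handleRename(TableId prevTableId, TableId newTableId, TableId prevStagingTableId)
      throws IOException, DeltaFailureException, InterruptedException {
      consumer.flush();                                                    // flush pending changes
      Job copyJob = config.bigQuery.create(
        JobInfo.of(CopyJobConfiguration.of(newTableId, prevTableId)));     // copy data into the new table
      copyJob.waitFor();
      config.bigQuery.delete(prevTableId);                                 // delete the previous table
      config.bigQuery.delete(prevStagingTableId);                          // drop the old staging table
      List<String> keys = config.primaryKeyStore.remove(prevTableId);      // move the primaryKeyStore entry
      if (keys != null) {
        config.primaryKeyStore.put(newTableId, keys);
      }
    }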
