Commit 06270e8

Merge pull request #113 from data-integrations/fix/CDAP-17847-clustering-key
CDAP-17847: Ignore unsupported primary key types while creating clustering on BigQuery tables.
2 parents: 879c98a + 334c09d
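
BigQuery permits clustering only on a limited set of column types (see the
limitations link in the diff below). Before this change, the clustering spec
was built from the first primary keys regardless of type, so a table whose
keys map to an unsupported type such as BYTES could not be created. The fix
filters the keys first and skips clustering altogether when none qualify.
A condensed sketch of the new flow, using the variable names from the diff
(illustrative, not standalone code):

    // Keep only primary keys whose BigQuery type supports clustering.
    List<String> clusteringSupportedKeys = new ArrayList<>();
    for (String key : primaryKeys) {
      if (Schemas.isClusteringSupported(recordSchema.getField(key))) {
        clusteringSupportedKeys.add(key);
      }
    }
    // Fall back to an unclustered table when no key qualifies, instead of
    // handing BigQuery a clustering spec it would reject.
    Clustering clustering = maxClusteringColumns <= 0 || clusteringSupportedKeys.isEmpty() ? null :
      Clustering.newBuilder()
        .setFields(clusteringSupportedKeys.subList(0, Math.min(maxClusteringColumns,
                                                               clusteringSupportedKeys.size())))
        .build();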

File tree: 3 files changed (+159, -46 lines)

src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java

Lines changed: 16 additions & 3 deletions

@@ -226,7 +226,7 @@ public class BigQueryEventConsumer implements EventConsumer {
     // https://cloud.google.com/bigquery/docs/creating-clustered-tables#limitations
     this.maxClusteringColumns = maxClusteringColumnsStr == null ? 4 : Integer.parseInt(maxClusteringColumnsStr);
     this.sourceRowIdSupported =
-      context.getSourceProperties() == null ? false : context.getSourceProperties().isRowIdSupported();
+      context.getSourceProperties() != null && context.getSourceProperties().isRowIdSupported();
     this.sourceEventOrdering = context.getSourceProperties() == null ? SourceProperties.Ordering.ORDERED :
       context.getSourceProperties().getOrdering();
     this.datasetName = datasetName;
@@ -371,9 +371,11 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
     updatePrimaryKeys(tableId, primaryKeys);
     // TODO: check schema of table if it exists already
     if (table == null) {
-      Clustering clustering = maxClusteringColumns <= 0 ? null :
+      List<String> clusteringSupportedKeys = getClusteringSupportedKeys(primaryKeys, event.getSchema());
+      Clustering clustering = maxClusteringColumns <= 0 || clusteringSupportedKeys.isEmpty() ? null :
         Clustering.newBuilder()
-          .setFields(primaryKeys.subList(0, Math.min(maxClusteringColumns, primaryKeys.size())))
+          .setFields(clusteringSupportedKeys.subList(0, Math.min(maxClusteringColumns,
+                                                                 clusteringSupportedKeys.size())))
           .build();
       TableDefinition tableDefinition = StandardTableDefinition.newBuilder()
         .setSchema(Schemas.convert(addSupplementaryColumnsToTargetSchema(event.getSchema())))
@@ -477,6 +479,17 @@ private void handleDDL(DDLEvent event, String normalizedDatabaseName, String nor
     }
   }

+  @VisibleForTesting
+  static List<String> getClusteringSupportedKeys(List<String> primaryKeys, Schema recordSchema) {
+    List<String> result = new ArrayList<>();
+    for (String key : primaryKeys) {
+      if (Schemas.isClusteringSupported(recordSchema.getField(key))) {
+        result.add(key);
+      }
+    }
+    return result;
+  }
+
   private void updatePrimaryKeys(TableId tableId, List<String> primaryKeys) throws DeltaFailureException, IOException {
     if (primaryKeys.isEmpty()) {
       throw new DeltaFailureException(
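
To illustrate the new helper with a hypothetical schema (field names invented
for the example; Schema.Type.INT converts to INT64, which BigQuery can cluster
on, while BYTES cannot be clustered):

    Schema schema = Schema.recordOf("orders",
                                    Schema.Field.of("order_id", Schema.of(Schema.Type.INT)),
                                    Schema.Field.of("checksum", Schema.of(Schema.Type.BYTES)));
    List<String> keys = getClusteringSupportedKeys(Arrays.asList("order_id", "checksum"), schema);
    // keys == ["order_id"]; if every key were BYTES the list would be empty
    // and the table would be created with clustering == null.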

src/main/java/io/cdap/delta/bigquery/Schemas.java

Lines changed: 67 additions & 43 deletions

@@ -22,66 +22,37 @@
 import io.cdap.cdap.api.data.schema.Schema;

 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import javax.annotation.Nullable;

 /**
  * Utilities around BigQuery schemas.
  */
 public class Schemas {

+  // Set of BigQuery types supported for clustering per definition
+  // https://cloud.google.com/bigquery/docs/creating-clustered-tables#limitations
+  private static final Set<StandardSQLTypeName> CLUSTERING_SUPPORTED_TYPES
+    = new HashSet<>(Arrays.asList(StandardSQLTypeName.DATE, StandardSQLTypeName.BOOL, StandardSQLTypeName.GEOGRAPHY,
+                                  StandardSQLTypeName.INT64, StandardSQLTypeName.NUMERIC, StandardSQLTypeName.STRING,
+                                  StandardSQLTypeName.TIMESTAMP, StandardSQLTypeName.DATETIME));
+
   private Schemas() {
     // no-op
   }

+
   public static com.google.cloud.bigquery.Schema convert(Schema schema) {
     return com.google.cloud.bigquery.Schema.of(convertFields(schema.getFields()));
   }

   private static List<Field> convertFields(List<Schema.Field> fields) {
     List<Field> output = new ArrayList<>();
     for (Schema.Field field : fields) {
-      String name = field.getName();
-      boolean isNullable = field.getSchema().isNullable();
-      Schema fieldSchema = field.getSchema();
-      fieldSchema = isNullable ? fieldSchema.getNonNullable() : fieldSchema;
-      Schema.LogicalType logicalType = fieldSchema.getLogicalType();
-      Field.Mode fieldMode = isNullable ? Field.Mode.NULLABLE : Field.Mode.REQUIRED;
-      if (logicalType != null) {
-        StandardSQLTypeName bqType = convertLogicalType(logicalType);
-        // TODO: figure out what the correct behavior should be
-        if (bqType == null) {
-          throw new IllegalArgumentException(
-            String.format("Field '%s' is of type '%s', which is not supported in BigQuery.",
-                          name, logicalType.getToken()));
-        }
-        output.add(Field.newBuilder(name, bqType).setMode(fieldMode).build());
-        continue;
-      }
-
-      Schema.Type type = isNullable ? field.getSchema().getNonNullable().getType() : field.getSchema().getType();
-      if (type == Schema.Type.ARRAY) {
-        Schema componentSchema = fieldSchema.getComponentSchema();
-        componentSchema = componentSchema.isNullable() ? componentSchema.getNonNullable() : componentSchema;
-        StandardSQLTypeName bqType = convertType(componentSchema.getType());
-        if (bqType == null) {
-          throw new IllegalArgumentException(
-            String.format("Field '%s' is an array of '%s', which is not supported in BigQuery.",
-                          name, logicalType.getToken()));
-        }
-        output.add(Field.newBuilder(name, bqType).setMode(Field.Mode.REPEATED).build());
-      } else if (type == Schema.Type.RECORD) {
-        List<Field> subFields = convertFields(fieldSchema.getFields());
-        output.add(Field.newBuilder(name, StandardSQLTypeName.STRUCT, FieldList.of(subFields)).build());
-      } else {
-        StandardSQLTypeName bqType = convertType(type);
-        if (bqType == null) {
-          throw new IllegalArgumentException(
-            String.format("Field '%s' is of type '%s', which is not supported in BigQuery.",
-                          name, type.name().toLowerCase()));
-        }
-        output.add(Field.newBuilder(name, bqType).setMode(fieldMode).build());
-      }
+      output.add(convertToBigQueryField(field));
     }
     return output;
   }
@@ -96,13 +67,12 @@ private static StandardSQLTypeName convertType(Schema.Type type) {
       case DOUBLE:
         return StandardSQLTypeName.FLOAT64;
       case STRING:
+      case ENUM:
         return StandardSQLTypeName.STRING;
       case BOOLEAN:
         return StandardSQLTypeName.BOOL;
       case BYTES:
         return StandardSQLTypeName.BYTES;
-      case ENUM:
-        return StandardSQLTypeName.STRING;
     }
     return null;
   }
@@ -125,4 +95,58 @@ private static StandardSQLTypeName convertLogicalType(Schema.LogicalType logical
     }
     return null;
   }
+
+  /**
+   * Check if the BigQuery data type associated with the {@link Schema.Field} can be added
+   * as a clustering column while creating a BigQuery table.
+   */
+  public static boolean isClusteringSupported(Schema.Field field) {
+    Field bigQueryField = convertToBigQueryField(field);
+    return CLUSTERING_SUPPORTED_TYPES.contains(bigQueryField.getType().getStandardType());
+  }
+
+  private static Field convertToBigQueryField(Schema.Field field) {
+    String name = field.getName();
+    boolean isNullable = field.getSchema().isNullable();
+    Schema fieldSchema = field.getSchema();
+    fieldSchema = isNullable ? fieldSchema.getNonNullable() : fieldSchema;
+    Schema.LogicalType logicalType = fieldSchema.getLogicalType();
+    Field.Mode fieldMode = isNullable ? Field.Mode.NULLABLE : Field.Mode.REQUIRED;
+    if (logicalType != null) {
+      StandardSQLTypeName bqType = convertLogicalType(logicalType);
+      // TODO: figure out what the correct behavior should be
+      if (bqType == null) {
+        throw new IllegalArgumentException(
+          String.format("Field '%s' is of type '%s', which is not supported in BigQuery.",
+                        name, logicalType.getToken()));
+      }
+      return Field.newBuilder(name, bqType).setMode(fieldMode).build();
+    }
+
+    Field output;
+    Schema.Type type = isNullable ? field.getSchema().getNonNullable().getType() : field.getSchema().getType();
+    if (type == Schema.Type.ARRAY) {
+      Schema componentSchema = fieldSchema.getComponentSchema();
+      componentSchema = componentSchema.isNullable() ? componentSchema.getNonNullable() : componentSchema;
+      StandardSQLTypeName bqType = convertType(componentSchema.getType());
+      if (bqType == null) {
+        throw new IllegalArgumentException(
+          String.format("Field '%s' is an array of '%s', which is not supported in BigQuery.",
+                        name, logicalType.getToken()));
+      }
+      output = Field.newBuilder(name, bqType).setMode(Field.Mode.REPEATED).build();
+    } else if (type == Schema.Type.RECORD) {
+      List<Field> subFields = convertFields(fieldSchema.getFields());
+      output = Field.newBuilder(name, StandardSQLTypeName.STRUCT, FieldList.of(subFields)).build();
+    } else {
+      StandardSQLTypeName bqType = convertType(type);
+      if (bqType == null) {
+        throw new IllegalArgumentException(
+          String.format("Field '%s' is of type '%s', which is not supported in BigQuery.",
+                        name, type.name().toLowerCase()));
+      }
+      output = Field.newBuilder(name, bqType).setMode(fieldMode).build();
+    }
+    return output;
+  }
 }
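
A quick illustration of the new predicate (hypothetical fields; the results
follow from convertType and CLUSTERING_SUPPORTED_TYPES above):

    // STRING maps to StandardSQLTypeName.STRING, which is in the supported set.
    Schemas.isClusteringSupported(Schema.Field.of("name", Schema.of(Schema.Type.STRING)));  // true
    // BYTES maps to StandardSQLTypeName.BYTES, which is not clusterable.
    Schemas.isClusteringSupported(Schema.Field.of("blob", Schema.of(Schema.Type.BYTES)));   // false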

src/test/java/io/cdap/delta/bigquery/BigQueryEventConsumerTest.java

Lines changed: 76 additions & 0 deletions

@@ -179,6 +179,82 @@ public void testCreateTableWithClustering() throws Exception {
     cleanupTest(bucket, dataset, eventConsumer);
   }

+  @Test
+  public void testCreateTableWithInvalidTypesForClustering() throws Exception {
+    String bucketName = "bqtest-" + UUID.randomUUID().toString();
+    Bucket bucket = storage.create(BucketInfo.of(bucketName));
+    BigQueryEventConsumer eventConsumer = new BigQueryEventConsumer(new MockContext(300, Collections.emptyMap()),
+                                                                    storage, bigQuery, bucket, project, 0,
+                                                                    STAGING_TABLE_PREFIX, true, null, 1L, null);
+
+    String dataset = "testInvalidTypesForClustering";
+    String allinvalidsTableName = "allinvalids";
+    TableId allInvalidsTable = TableId.of(dataset, allinvalidsTableName);
+    String someInvalidsTableName = "someinvalids";
+    TableId someInvalidsTable = TableId.of(dataset, someInvalidsTableName);
+
+    try {
+      bigQuery.create(DatasetInfo.newBuilder(dataset).build());
+
+      // Primary keys with all un-supported types for clustering
+      List<String> primaryKeys = new ArrayList<>();
+      primaryKeys.add("id1");
+      Schema schema = Schema.recordOf(allinvalidsTableName,
+                                      Schema.Field.of("id1", Schema.of(Schema.Type.BYTES)));
+
+      DDLEvent allInvalidsCreateTable = DDLEvent.builder()
+        .setOperation(DDLOperation.Type.CREATE_TABLE)
+        .setDatabaseName(dataset)
+        .setTableName(allinvalidsTableName)
+        .setSchema(schema)
+        .setPrimaryKey(primaryKeys)
+        .setOffset(new Offset())
+        .build();
+      eventConsumer.applyDDL(new Sequenced<>(allInvalidsCreateTable, 0));
+
+      Table table = bigQuery.getTable(allInvalidsTable);
+      StandardTableDefinition tableDefinition = table.getDefinition();
+      Clustering clustering = tableDefinition.getClustering();
+      // No clustering should be added
+      Assert.assertNull(clustering);
+      bigQuery.delete(allInvalidsTable);
+
+      // Primary keys with some un-supported types for clustering
+      primaryKeys = new ArrayList<>();
+      primaryKeys.add("id1");
+      primaryKeys.add("id2");
+      primaryKeys.add("id3");
+      primaryKeys.add("id4");
+      primaryKeys.add("id5");
+      schema = Schema.recordOf(allinvalidsTableName,
+                               Schema.Field.of("id1", Schema.of(Schema.Type.BYTES)),
+                               Schema.Field.of("id2", Schema.of(Schema.Type.BYTES)),
+                               Schema.Field.of("id3", Schema.of(Schema.Type.BYTES)),
+                               Schema.Field.of("id4", Schema.of(Schema.Type.BYTES)),
+                               // add one valid clustering key
+                               Schema.Field.of("id5", Schema.of(Schema.Type.INT)));
+
+      DDLEvent someInvalidsTableCreate = DDLEvent.builder()
+        .setOperation(DDLOperation.Type.CREATE_TABLE)
+        .setDatabaseName(dataset)
+        .setTableName(someInvalidsTableName)
+        .setSchema(schema)
+        .setPrimaryKey(primaryKeys)
+        .setOffset(new Offset())
+        .build();
+      eventConsumer.applyDDL(new Sequenced<>(someInvalidsTableCreate, 0));
+
+      table = bigQuery.getTable(someInvalidsTable);
+      tableDefinition = table.getDefinition();
+      clustering = tableDefinition.getClustering();
+      Assert.assertNotNull(clustering);
+      Assert.assertEquals(primaryKeys.subList(4, 5), clustering.getFields());
+      bigQuery.delete(someInvalidsTable);
+    } finally {
+      cleanupTest(bucket, dataset, eventConsumer);
+    }
+  }
+
   @Test
   public void testManualDropRetries() throws Exception {
     String bucketName = "bqtest-" + UUID.randomUUID().toString();
