Skip to content

Commit 058dbb1

Browse files
committed
fix: Use unique aliases in MERGE statements to avoid conflicts with column names
1 parent 21e6101 commit 058dbb1

File tree

1 file changed

+20
-20
lines changed

1 file changed

+20
-20
lines changed

src/main/java/io/cdap/delta/bigquery/BigQueryEventConsumer.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,34 +1190,34 @@ static String createDiffQuery(TableId stagingTable, List<String> primaryKeys, lo
11901190
* This makes sure event in B happens later than event in A
11911191
*/
11921192
if (sourceRowIdSupported) {
1193-
joinCondition = "A._row_id = B._row_id ";
1194-
whereClause = " B._row_id IS NULL ";
1193+
joinCondition = "__cdap_A__._row_id = __cdap_B__._row_id ";
1194+
whereClause = " __cdap_B__._row_id IS NULL ";
11951195
} else {
11961196
joinCondition = primaryKeys.stream()
1197-
.map(name -> String.format("A.`%s` = B.`_before_%s`", name, name))
1197+
.map(name -> String.format("__cdap_A__.`%s` = __cdap_B__.`_before_%s`", name, name))
11981198
.collect(Collectors.joining(" AND "));
11991199

12001200
whereClause = primaryKeys.stream()
1201-
.map(name -> String.format("B.`_before_%s` IS NULL", name))
1201+
.map(name -> String.format("__cdap_B__.`_before_%s` IS NULL", name))
12021202
.collect(Collectors.joining(" AND "));
12031203
}
12041204

12051205
if (sourceEventsOrdering == SourceProperties.Ordering.ORDERED) {
1206-
joinCondition += String.format(" AND A.%s < B.%1$s\n", Constants.SEQUENCE_NUM);
1206+
joinCondition += String.format(" AND __cdap_A__.%s < __cdap_B__.%1$s\n", Constants.SEQUENCE_NUM);
12071207

12081208
} else {
1209-
joinCondition += getOrderingCondition(sortKeys, "A", "B");
1209+
joinCondition += getOrderingCondition(sortKeys, "__cdap_A__", "__cdap_B__");
12101210
}
1211-
return "SELECT A.* FROM\n" +
1211+
return "SELECT __cdap_A__.* FROM\n" +
12121212
"(SELECT * FROM " +
12131213
BigQueryUtils.wrapInBackTick(stagingTable.getProject(), stagingTable.getDataset(), stagingTable.getTable()) +
12141214
" WHERE _batch_id = " + batchId +
1215-
" AND _sequence_num > " + latestSequenceNumInTargetTable + ") as A\n" +
1215+
" AND _sequence_num > " + latestSequenceNumInTargetTable + ") as __cdap_A__\n" +
12161216
"LEFT OUTER JOIN\n" +
12171217
"(SELECT * FROM " +
12181218
BigQueryUtils.wrapInBackTick(stagingTable.getProject(), stagingTable.getDataset(), stagingTable.getTable()) +
12191219
" WHERE _batch_id = " + batchId +
1220-
" AND _sequence_num > " + latestSequenceNumInTargetTable + ") as B\n" +
1220+
" AND _sequence_num > " + latestSequenceNumInTargetTable + ") as __cdap_B__\n" +
12211221
"ON " + joinCondition +
12221222
" WHERE " + whereClause;
12231223
}
@@ -1277,12 +1277,12 @@ static String createMergeQuery(TableId targetTableId, List<String> primaryKeys,
12771277

12781278
if (sourceRowIdSupported) {
12791279
// if source supports row Id , we use row id to match the row
1280-
mergeCondition = " T._row_id = D._row_id ";
1280+
mergeCondition = " __cdap_T__._row_id = __cdap_D__._row_id ";
12811281

12821282
} else {
12831283
// if source doesn't support row Id, we use primary keys to match the row
12841284
mergeCondition = primaryKeys.stream()
1285-
.map(name -> String.format("T.`%s` = D.`_before_%s`", name, name))
1285+
.map(name -> String.format("__cdap_T__.`%s` = __cdap_D__.`_before_%s`", name, name))
12861286
.collect(Collectors.joining(" AND "));
12871287

12881288
}
@@ -1332,37 +1332,37 @@ static String createMergeQuery(TableId targetTableId, List<String> primaryKeys,
13321332
deleteOperation = " UPDATE SET " + targetSchema.getFields().stream()
13331333
.filter(predicate)
13341334
.map(Schema.Field::getName)
1335-
.map(name -> String.format("`%s` = D.`%s`", name, name))
1335+
.map(name -> String.format("`%s` = __cdap_D__.`%s`", name, name))
13361336
.collect(Collectors.joining(", ")) + ", " + Constants.IS_DELETED + " = true ";
13371337
// if events are unordered , sort keys can decide the ordering
13381338
// if an event happening earlier comes later , it's possible that some events happening later against the same
13391339
// row has already been merged, so this late coming event should be ignored.
1340-
updateAndDeleteCondition = getOrderingCondition(sortKeys, "T", "D");
1340+
updateAndDeleteCondition = getOrderingCondition(sortKeys, "__cdap_T__", "__cdap_D__");
13411341
}
13421342

13431343
String mergeQuery = "MERGE " +
13441344
BigQueryUtils.wrapInBackTick(targetTableId.getProject(), targetTableId.getDataset(), targetTableId.getTable()) +
1345-
" as T\n" +
1346-
"USING (" + diffQuery + ") as D\n" +
1345+
" as __cdap_T__\n" +
1346+
"USING (" + diffQuery + ") as __cdap_D__\n" +
13471347
"ON " + mergeCondition + "\n" +
1348-
"WHEN MATCHED AND D._op = \"DELETE\" " + updateAndDeleteCondition + "THEN\n" +
1348+
"WHEN MATCHED AND __cdap_D__._op = \"DELETE\" " + updateAndDeleteCondition + "THEN\n" +
13491349
deleteOperation + "\n" +
13501350
// In a case when a replicator is paused for too long and crashed when resumed
13511351
// user will create a new replicator against the same target
13521352
// in this case the target already has some data
13531353
// so the new repliator's snapshot will generate insert events that match some existing data in the
13541354
// targe. That's why in the match case, we still need the insert opertion.
1355-
"WHEN MATCHED AND D._op IN (\"INSERT\", \"UPDATE\") " + updateAndDeleteCondition + "THEN\n" +
1355+
"WHEN MATCHED AND __cdap_D__._op IN (\"INSERT\", \"UPDATE\") " + updateAndDeleteCondition + "THEN\n" +
13561356
" UPDATE SET " +
13571357
targetSchema.getFields().stream()
13581358
.filter(predicate)
13591359
.map(Schema.Field::getName)
1360-
.map(name -> String.format("`%s` = D.`%s`", name, name))
1360+
.map(name -> String.format("`%s` = __cdap_D__.`%s`", name, name))
13611361
// explicitly set "_is_deleted" to null for the case when this row was previously deleted and the
13621362
// "_is_deleted" column was set to "true" and now a new insert is to insert the same row , we need to
13631363
// reset "_is_deleted" back to null.
13641364
.collect(Collectors.joining(", ")) + ", " + Constants.IS_DELETED + " = null\n" +
1365-
"WHEN NOT MATCHED AND D._op IN (\"INSERT\", \"UPDATE\") THEN\n" +
1365+
"WHEN NOT MATCHED AND __cdap_D__._op IN (\"INSERT\", \"UPDATE\") THEN\n" +
13661366
" INSERT (" +
13671367
targetSchema.getFields().stream()
13681368
.filter(predicate)
@@ -1375,7 +1375,7 @@ static String createMergeQuery(TableId targetTableId, List<String> primaryKeys,
13751375
.collect(Collectors.joining(", ")) + ")";
13761376

13771377
if (sourceEventOrdering == SourceProperties.Ordering.UN_ORDERED) {
1378-
mergeQuery += "\nWHEN NOT MATCHED AND D._op = \"DELETE\" THEN\n" +
1378+
mergeQuery += "\nWHEN NOT MATCHED AND __cdap_D__._op = \"DELETE\" THEN\n" +
13791379
" INSERT (" +
13801380
targetSchema.getFields().stream()
13811381
.filter(predicate)

0 commit comments

Comments
 (0)